diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -137,6 +137,77 @@ namespace Implab.Test } [TestMethod] + public void MTQueueTest() { + var queue = new MTQueue(); + var pool = new WorkerPool(5, 20); + + int res; + + queue.Enqueue(10); + Assert.IsTrue(queue.TryDequeue(out res)); + Assert.AreEqual(10, res); + Assert.IsFalse(queue.TryDequeue(out res)); + + for (int i = 0; i < 1000; i++) + queue.Enqueue(i); + + for (int i = 0; i < 1000; i++) { + queue.TryDequeue(out res); + Assert.AreEqual(i, res); + } + + int writers = 0; + int readers = 0; + var stop = new ManualResetEvent(false); + int total = 0; + + int itemsPerWriter = 1000; + int writersCount = 3; + + for (int i = 0; i < writersCount; i++) { + Interlocked.Increment(ref writers); + var wn = i; + AsyncPool + .InvokeNewThread(() => { + Console.WriteLine("Started writer: {0}", wn); + for (int ii = 0; ii < itemsPerWriter; ii++) { + queue.Enqueue(1); + Thread.Sleep(1); + } + Console.WriteLine("Stopped writer: {0}", wn); + return 1; + }) + .Then(x => Interlocked.Decrement(ref writers) ); + } + + for (int i = 0; i < 10; i++) { + Interlocked.Increment(ref readers); + var wn = i; + AsyncPool + .InvokeNewThread(() => { + int t; + Console.WriteLine("Started reader: {0}", wn); + do { + while (queue.TryDequeue(out t)) + Interlocked.Add(ref total, t); + Thread.Sleep(0); + } while (writers > 0); + Console.WriteLine("Stopped reader: {0}", wn); + return 1; + }) + .Then(x => { + Interlocked.Decrement(ref readers); + if (readers == 0) + stop.Set(); + }); + } + + stop.WaitOne(); + + Assert.AreEqual(itemsPerWriter * writersCount, total); + } + + [TestMethod] public void ComplexCase1Test() { var flags = new bool[3]; diff --git a/Implab.suo b/Implab.suo index 39be6a30b2af76bbf8a5b2d784ab6eedfd7ed4db..34b35ad721068c074ea216725753a1e954db220f GIT binary patch literal 50688 zc%1D$3wT^*m4BK((l*!@3sfE&*odWOrZbsmQmUP2TS-f3($a;L_Ri#Ha@)zwFpo5$ z1$=^nEJy_rmDVDU6%=t53Zk$T5LdQ^bM&J9FpGow<*h zDM{&{@8mw_zyCS^|DXT-=bZob;tNZD{`rT``~_2m_pny>=c)N@uHs#b1QZrCHVZDT z0DqpEnv!2noeVQ7ybdrM>g_CmIRNtju+HZLv;mv~umE5oK)YD&>@5I`0G0tP0XP-l ztpG~_P6Id{;0%Db0h|fo0B`}EE%D0R;rb4M4uEq3&I4Ep&Da4EnRfXe~40&oC-39t=d2w*$F z6;c`OkghVkQDGBH!(SfiBEnK|&9X^mCY%mjBd67=6L;|oAN|&=KR(ieGUNRo@m)V) zOtYO(&M7tmcM&#W6923+nPc?3#nN>UYAMHdL468v%|qSh7!Neb?{hlobh243*apnt zM;Y2y#4oT6lmg<+Ni^_K7IF9=1{enpcM$h$hZTSy+wHp~8jRpaxh+SO`48&^<-i

NlW)% zm#zi4<`sB%OZQj9^(yIpuXMczuGa#*AK+8>uK2@YTYTdqE1nO1{Jy?dwD_e~I2M|3 z_LK5}jR6@~RxH|)JFnQ13-hr;qCd;$_*95kEPY-o>X@#~iK^&xmit=^XYrI2SjsMaGO_c431M z&!%(fk-UA2kSlP>fqY?DNZW~o1fMP545rh`ONG>MdUwtq=JMR2kjdHmg;a`*C3$*? zOyv1gPKXIfAwOyFM@n<>CeFXRk8b!Zem9U$XL-!$!je8!NAYzqV7=lbS_;hLk8 zLO%bi<4yy%mn2{qAqr$lXcuG+J%N2jo4qX=bAN?Z-=ZW_h1NZF&u16WHuH!A_ z!KfY1gZv{uqYY*hKk7@e+_OVj?^em3i%I*DrOwpUD$*RaWs}IA7`Kh=wnoaPRm3Tc zOwrCZ0mVsb;mBKi;p-GgnhuEB5Bz3_nhzE=;2^{tW=iS%-iFgtl0Rl?5qp9VNDx1y zg5V@t++Yj)AT7Le1C|in1%VbX>!x=hxZ+nnpfCXcy+DnZXwl<@5>_wZh{E@nl7lc{ zUW4!sG5G1N4N^x+d{hEqh~b0xE&wmZ#!@nhHPCRZjo^bu(o-)VI3P8wQwPcnv5F=!=M$og1Upa}#FePjQ9>5!c97T!ekS65K zkfKE+HDS1quuD47pVCM^7Im^lrzuOzIO)y1i*~k9W@ig3s~af^oBAe0ay3K!9`-z_ z3CJ39&}2#+^K@a*aNQWZ~IkJr$@aH5O$Vp{ZqJNO;G*4*<*wz;o(C?+UG6rLoMQ}rO z@GRoYjlj2qP&%S5#}~Pz3#gI3%UEWs**MfcQid2pL|2Zq4S~L@rfF&i^nasY7j+sd z`j@OnpdO56lqUTOC<;WHtR_QhR9i~14@S)3N4dHIe&;|&m6wRu0)8BuUMJnx;y;v$PMx$+{-P<3eVtMKm;>35 zfz2=ieMn_Li@i`r_CWP^pL_29BQ454DHs|Ey@lO#s`zf|&-fol_WFpU@}x7OR2w9Z zPKLM&c^v&4lkn>Z{E7MIi;_n6M&sy!++%MhZ=tfcPm-ioe}f~G5t3h+_5j7^sV~GZ zq|mUHImIwcY03dP6X}Xc^x}AUH81(vVB@szMpz_`v;yk2NV*|C7hsOSGg`Ygxa%hS z2WfEv6&|vV@V78o4L*w5L)K9@#OS6ca~4S>u{NSc8cC1!8bKX1Ks~Gg+8+;JjnD`) zdTp>Hs-E&HvTS?E+aYp`tE@)!EfJ(`qG%1_>>J}&jQFjcq+yOmdgN66QYvU?%=wjb zW#X5&1rBVxbj`lQr_z8ls&{Jpqne!~2enn__oOz~Y90$mJ;c2)6 zyB1k>6boR%LdU+Zu3eIT;F+zLo$9+QO_N_#p$hNI0U2e4#%Pk%&xT2kZl*aRYs}L> zMyi%DyFl@xB4T2r~C#k6shYqYMk2S_8%P?@UBBC3)bI5z!H5;l1(4h3#Oy2o~ufj?Oj zA0p2|ocw1)MZLdt=OjxGzwC$nO;8O7$Zt0a-^kc)Gfp^kJ$&0! z-fLg}J^R`Mt#T~}6wg}j=jZQe|KmN6fBK+n;E$pG{oZ5qv{Kn+KL18V*~51^UR(M6 zZpWiv_(pX8bEn^}$sI|MR5|!itW2EkF#$}5NsiNuAAK(r`X3iDbN;srS;?X<_y2_{ z3pnScqHk0M(1zLqS5a=(4KCLjl# z|FMvnD}P)oaldpg*#EfiXl&`Lr*x$6e&VI>Z(ebVW~)(#+8S1gm*f&XgPF6&w8y06FU5YY zT!ZakXrUOzFaTf0C9211ECSO+&qsn?1poC`gUI~9j5VkIxeVY(ucO}teq68f5rg>M z2JoA0|7cmtl2zaTTiXMCz6Ja!SLHG74*-0SiE~0f%*496PP%?ny50cS8>Rc3r0dOa zy$9eE0Jj3{1NbDsZ2-3eddAY8x1N^2ax0{0ICd=21lr1y_V*GJ)c2;doj$E9~qNY`&k*QccGVd?s` zbbS`C-vM|I;JW}v0KNzCJiy;eaee?-9Qb|-;QP|MAHwyhbbm6O3^O61rL_tG#|%VW zXKD&n)r{1Sk(XUHNm^@!gh}6utG?1StFfBTs-()+GxE~69%Zw6^}~u25wUG5Vynj% zwlYt{UO`;|aO7z$Ek^wZU)0%e_vfkQ_dgT{gI77diKX!;}2F-v==q+ zFRwZ+|E;@^w21T3YRzSlL#^?A(dQ{!L~f~<)ZBytv0rAMJq8n2LaP|S}OMw*$zF|Qi+t$PQY}Qn@mT4Murt$FuFFxyoIbc zYJe%e-DHa8L&a8Tsy!0}Q?xgr1gno^Achrnw^yxsDOPyOnEX1=BE9wV{1z%#<80}fs{QNPZPP_ZG@Gx=+{nB)vvY*HpYpwOH3a6c1Op+|tJU*~AC&ZJ zb8=HTY_0viTr=*xJh|iFUe;QxVOBS8XG-0872w+hUxk>sbjGBfY-$VvXEy5V$))_) zr!kw;W{u6e#&o+Zr(4~#JrngnZ=K~}PDhqy&5fpIjrCoQLR4F0R8Bt154Ib`T##f` z+_2QLm*G@mw3DvM(bsvJ@FnU?Sk~$ZEtU28=JBnc^gW-{VVbAIG>f~i$5g&&s**e@ z&$Q(k_AbW{t6FzBQQ9r)dfG8sR5_m6E!wOOuU_+a;B--kH|LpJvGOoH+T9rf9y-?zwf>!qC(Vv$i?pJ4SMVz*WYfEzstVix@ zJ^IoQKl$j-AN~G{4}a~IWm@yV&x&2M^llUL-}T(Qr_TPVH`adIkN)B47yk9S2tV1X zfwRKS4%Y8(d~IRk%%d+pbjIml|LQwt@&EX$*1Y>N8svZxhnhLm^pjm1=lrDiP;B$t z=AHMT>zo_3*3i^x37c3yV8qG#f3NTO%^&(~2d?R09@5OTs zu z5zrr(mhQ#*WH>T}yHJT^cH-V1Ms{Bp*I%3~wH%}P(NiezKct`k+AB~n|G1x!+XVgx z0Irwrw*bC=z_bZ?sGl^ni{SrScr!pS04?}pI&nfftgx{E5S~*b=fC3oh1q!XyOx2; zT%I4ZhtkO;AIIYta`v@&e1Q3JPg_<%_`}~iIDkpM8Qh7d` z$qG3hkK)MLLrE@|3-dW40oiKA$sp~>2J6M5Xh6>AV6HY;$&5V|j!b0I*?d^wlIcV+ zJu$%N^H4LWiJqRx^5a6fkV{U6M)~+Iet135n_Dcu?n}q%$dt3?7wgBk1RqUj$GAMk zT%Q`|Czhti`D|7g<|7kA4pWW_NgiAON)7%Q$nbF>1X{wd#37|$vB=t5u-fJ$adT;3 z9jekf_-r;gEV9|dT%Kz^TjR?>Hp@-oSt^KiKF;2njsJ9`&fP9x5zX121Eg)+$Hkfj zQrp_*&r`_8#qz^KV_Kz)_Rhk6oH}W4P?*MpI8w3W6$VJ<3t$erem1TlwGo;xpIVtd{*<+S9ePa|kxd`qv!~vQZ}PsyAH0t$3o>)23|~6Z~Ur8sY1{KRtBgXh-46 zSNE=&J?pyv)?C3|LRgROgl%8kYhi8TlCdNEm+kkg^jvbr_!l*;K5ai;eX>J)Z(`I( zjqW5iGcBiTwxS&4&EvB^@m2{e(x_EsJaz6*+*NR=Rg^^aZ`7r7Vyt*vSWyDv@9Ny1 zXshWKSIKuNrrx-fdQZ$J&Jf3~tgM-?J_D`%*K!S~?LF?}JeJk?d1$zApV77Kjhc5+>=65!3AAOnr z9n-X?7O)4-!F8#hZ3*`};BSxD747K`IYKtCGZ?kG{UL`f;PUxw{;<~;`&E+OjalFexjMQkhIPl#9sgT%LaJz6HVjEF)qa=_+j(SqeWQFbW2(f8Dz(EK@j1LuVqU-BX>+?GL0eC^FJg;ELmr1K5cLLv zp(26NiJ{_MZQ@2IhPae4Ml(G`;>Am z(t2kzn=w>#Uf|J`%JE~ddcB75hBHPhQ5J%a`91B1QJ9;7ff%nfrsPmj$r z9D{0ibdNw$@-DoKIel?|+!J$OEynj7jvt<$wxwE^Tt;=)u6WZaUBlD62DLMCu=s3$ z>TmHsX5_dVWtPcT2Amd$Go!=P*6`NO!*azNEPgysDLh>rK-yWDt9XOcqHrL;iS4-_ zuZ%t)y+yi)r=cCFPkoyN;b~`|>C=v5*LdDpxQQMr{qt{h*FTu{Kj7ILvR6kRzef*0 z`G4?CWuy3U?M1>ce*N<|a9ne>3H%>p;<+2f@awL>Fi<4gs{f!g4vBMmQ>q;6 zt-Zp4=WpSeM)EXUBlxdm;%T=S^zz@Mla`h9chIA{-5`FyLHzQuvPSS{00e{hyY>C= zEjZW7X#)R;n0N*mhUvV^P84L2hfbb$N~5rTIvv|fqp$(mQ3pq1y);UJzhZpcYbOK{ zgt$>USNp`@WoM@5>&?CY`OKb`KArXeIUch^7N}?te^pa*a^}*>U7DuD?g%qA$0yhL z9N#rQ4RW~*73sAy!xR%8-HUHZv4P4 z^BtdFpt+~UE_3_e=pPLAo%OfQ+V(FuJ#hcn{_MQ|gSTn!!KqxEJlKrm#10*Jdg{VO z=Unx<$N$uN<{|zCO)5mMm8tzlHI8)B*FS4p0H}ZBiF3Woxc>W1#)|8#_3?M>;zzs= z6ZoHGX2zduj{sR2f5VAIyA9%Z8O;A0W<3Aj2Dny!C>`nb_=^h;-tzREdkeq%{pQHw z+rOj9b?UIG@yBzGY-vCGt`+s(U=1x?N+HhD@yv*!~tV~_)5iRZegsR15HB!~=)~5H&Wi<Teonn$)CP;NUWmUHP@sXtWP22ZpAV!9=JxX$v^?|&M<{_pPqelOkY{u>?Y!GFBlkJgTLhyM7DJ*(!w#{ToM$DX+(d|Fbecp!~2 zR^7i@@BS{LlTp8&Rc>Q%ZXXh*0jUgc^!eLw{GKL8_xPGQBbv4MI>BCDJ4UTNy&}LT JX4|%r@c*sQPf-8> diff --git a/Implab/Implab.csproj b/Implab/Implab.csproj --- a/Implab/Implab.csproj +++ b/Implab/Implab.csproj @@ -38,6 +38,7 @@ + diff --git a/Implab/Parallels/AsyncPool.cs b/Implab/Parallels/AsyncPool.cs --- a/Implab/Parallels/AsyncPool.cs +++ b/Implab/Parallels/AsyncPool.cs @@ -16,13 +16,30 @@ namespace Implab.Parallels { ThreadPool.QueueUserWorkItem(param => { try { - p.Resolve(func()); + p.Resolve(func()); } catch(Exception e) { p.Reject(e); } }); return p; - } + } + + public static Promise InvokeNewThread(Func func) { + var p = new Promise(); + + var worker = new Thread(() => { + try { + p.Resolve(func()); + } catch (Exception e) { + p.Reject(e); + } + }); + worker.IsBackground = true; + + worker.Start(); + + return p; + } } } diff --git a/Implab/Parallels/MTQueue.cs b/Implab/Parallels/MTQueue.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/MTQueue.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Parallels { + public class MTQueue { + class Node { + public Node(T value) { + this.value = value; + } + public readonly T value; + public Node next; + } + + Node m_first; + Node m_last; + + public void Enqueue(T value) { + var last = m_last; + var next = new Node(value); + + while (last != Interlocked.CompareExchange(ref m_last, next, last)) + last = m_last; + + if (last != null) + last.next = next; + else + m_first = next; + } + + public bool TryDequeue(out T value) { + Node first; + Node next = null; + value = default(T); + + do { + first = m_first; + if (first == null) + return false; + next = first.next; + if (next == null) { + // this is the last element, + // then try to update tail + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + // this is inconsistent situation which means that the queue is empty + if (m_last == null) + return false; + // tail has been changed, that means that we need to restart + continue; + } + + // tail succesfully updated and first.next will never be changed + // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null + // but the writer may update the m_first since the m_last is null + + // so we need to fix inconsistency by setting m_first to null, but if it already has been + // updated by a writer then we should just give up + Interlocked.CompareExchange(ref m_first, null, first); + break; + + } else { + if (first == Interlocked.CompareExchange(ref m_first, next, first)) + // head succesfully updated + break; + } + } while (true); + + value = first.value; + return true; + } + } +} diff --git a/Implab/Promise.cs b/Implab/Promise.cs --- a/Implab/Promise.cs +++ b/Implab/Promise.cs @@ -88,7 +88,7 @@ namespace Implab { /// Результат выполнения. /// Данное обещание уже выполнено public void Resolve(T result) { - lock (this) { + lock (m_lock) { if (m_state == PromiseState.Cancelled) return; if (m_state != PromiseState.Unresolved) @@ -106,7 +106,7 @@ namespace Implab { /// Исключение возникшее при выполнении операции /// Данное обещание уже выполнено public void Reject(Exception error) { - lock (this) { + lock (m_lock) { if (m_state == PromiseState.Cancelled) return; if (m_state != PromiseState.Unresolved)