# HG changeset patch # User cin # Date 2013-11-06 23:41:32 # Node ID 0f982f9b7d4d0bf83e1700783d6b0eb8fe01aec5 # Parent e943453e5039617dcc330948d1aa9a17b3ce4c70 implemented parallel map and foreach for arrays rewritten WorkerPool with MTQueue for more efficiency diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -4,71 +4,64 @@ using System.Reflection; using System.Threading; using Implab.Parallels; -namespace Implab.Test -{ - [TestClass] - public class AsyncTests - { - [TestMethod] - public void ResolveTest () - { - int res = -1; - var p = new Promise (); - p.Then (x => res = x); - p.Resolve (100); +namespace Implab.Test { + [TestClass] + public class AsyncTests { + [TestMethod] + public void ResolveTest() { + int res = -1; + var p = new Promise(); + p.Then(x => res = x); + p.Resolve(100); - Assert.AreEqual (res, 100); - } + Assert.AreEqual(res, 100); + } [TestMethod] - public void RejectTest () - { - int res = -1; - Exception err = null; + public void RejectTest() { + int res = -1; + Exception err = null; - var p = new Promise (); - p.Then (x => res = x, e => err = e); - p.Reject (new ApplicationException ("error")); + var p = new Promise(); + p.Then(x => res = x, e => err = e); + p.Reject(new ApplicationException("error")); - Assert.AreEqual (res, -1); - Assert.AreEqual (err.Message, "error"); + Assert.AreEqual(res, -1); + Assert.AreEqual(err.Message, "error"); - } + } [TestMethod] - public void JoinSuccessTest () - { - var p = new Promise (); - p.Resolve (100); - Assert.AreEqual (p.Join (), 100); - } + public void JoinSuccessTest() { + var p = new Promise(); + p.Resolve(100); + Assert.AreEqual(p.Join(), 100); + } [TestMethod] - public void JoinFailTest () - { - var p = new Promise (); - p.Reject (new ApplicationException ("failed")); + public void JoinFailTest() { + var p = new Promise(); + p.Reject(new ApplicationException("failed")); - try { - p.Join (); - throw new ApplicationException ("WRONG!"); - } catch (TargetInvocationException err) { - Assert.AreEqual (err.InnerException.Message, "failed"); - } catch { - Assert.Fail ("Got wrong excaption"); - } - } + try { + p.Join(); + throw new ApplicationException("WRONG!"); + } catch (TargetInvocationException err) { + Assert.AreEqual(err.InnerException.Message, "failed"); + } catch { + Assert.Fail("Got wrong excaption"); + } + } [TestMethod] - public void MapTest () - { - var p = new Promise (); + public void MapTest() { + var p = new Promise(); - var p2 = p.Map (x => x.ToString ()); - p.Resolve (100); + var p2 = p.Map(x => x.ToString()); + p.Resolve(100); - Assert.AreEqual (p2.Join (), "100"); - } + Assert.AreEqual(p2.Join(), "100"); + } [TestMethod] public void FixErrorTest() { @@ -82,65 +75,90 @@ namespace Implab.Test } [TestMethod] - public void ChainTest () - { - var p1 = new Promise (); + public void ChainTest() { + var p1 = new Promise(); - var p3 = p1.Chain (x => { - var p2 = new Promise (); - p2.Resolve (x.ToString ()); - return p2; - }); + var p3 = p1.Chain(x => { + var p2 = new Promise(); + p2.Resolve(x.ToString()); + return p2; + }); - p1.Resolve (100); + p1.Resolve(100); - Assert.AreEqual (p3.Join (), "100"); - } + Assert.AreEqual(p3.Join(), "100"); + } [TestMethod] - public void PoolTest () - { - var pid = Thread.CurrentThread.ManagedThreadId; - var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId); + public void PoolTest() { + var pid = Thread.CurrentThread.ManagedThreadId; + var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId); - Assert.AreNotEqual (pid, p.Join ()); - } + Assert.AreNotEqual(pid, p.Join()); + } [TestMethod] public void WorkerPoolSizeTest() { - var pool = new WorkerPool(5,10); + var pool = new WorkerPool(5, 10); Assert.AreEqual(5, pool.ThreadCount); - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); Assert.AreEqual(5, pool.ThreadCount); for (int i = 0; i < 100; i++) - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + Thread.Sleep(100); Assert.AreEqual(10, pool.ThreadCount); + + pool.Dispose(); } [TestMethod] public void WorkerPoolCorrectTest() { - var pool = new WorkerPool(5, 20); + var pool = new WorkerPool(); + + int iterations = 1000; + int pending = iterations; + var stop = new ManualResetEvent(false); var count = 0; - for (int i = 0; i < 1000; i++) + for (int i = 0; i < iterations; i++) { pool .Invoke(() => 1) - .Then(x => Interlocked.Add(ref count, x)); + .Then(x => Interlocked.Add(ref count, x)) + .Then(x => Math.Log10(x)) + .Anyway(() => { + Interlocked.Decrement(ref pending); + if (pending == 0) + stop.Set(); + }); + } + + stop.WaitOne(); - Assert.AreEqual(1000, count); + Assert.AreEqual(iterations, count); + Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads); + pool.Dispose(); + + } + + [TestMethod] + public void WorkerPoolDisposeTest() { + var pool = new WorkerPool(5, 20); + Assert.AreEqual(5, pool.ThreadCount); + pool.Dispose(); + Thread.Sleep(100); + Assert.AreEqual(0, pool.ThreadCount); + pool.Dispose(); } [TestMethod] public void MTQueueTest() { var queue = new MTQueue(); - var pool = new WorkerPool(5, 20); - int res; queue.Enqueue(10); @@ -169,33 +187,27 @@ namespace Implab.Test var wn = i; AsyncPool .InvokeNewThread(() => { - Console.WriteLine("Started writer: {0}", wn); for (int ii = 0; ii < itemsPerWriter; ii++) { queue.Enqueue(1); - Thread.Sleep(1); } - Console.WriteLine("Stopped writer: {0}", wn); return 1; }) - .Then(x => Interlocked.Decrement(ref writers) ); + .Anyway(() => Interlocked.Decrement(ref writers)); } - + for (int i = 0; i < 10; i++) { Interlocked.Increment(ref readers); var wn = i; AsyncPool .InvokeNewThread(() => { int t; - Console.WriteLine("Started reader: {0}", wn); do { while (queue.TryDequeue(out t)) Interlocked.Add(ref total, t); - Thread.Sleep(0); } while (writers > 0); - Console.WriteLine("Stopped reader: {0}", wn); return 1; }) - .Then(x => { + .Anyway(() => { Interlocked.Decrement(ref readers); if (readers == 0) stop.Set(); @@ -208,6 +220,55 @@ namespace Implab.Test } [TestMethod] + public void ParallelMapTest() { + + int count = 100000; + + double[] args = new double[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = rand.NextDouble(); + + var t = Environment.TickCount; + var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join(); + + Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); + + t = Environment.TickCount; + for (int i = 0; i < count; i++) + Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + } + + [TestMethod] + public void ParallelForEachTest() { + + int count = 100000; + + int[] args = new int[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = (int)(rand.NextDouble() * 100); + + int result = 0; + + var t = Environment.TickCount; + args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join(); + + Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result); + + int result2 = 0; + + t = Environment.TickCount; + for (int i = 0; i < count; i++) + result2 += args[i]; + Assert.AreEqual(result2, result); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + } + + [TestMethod] public void ComplexCase1Test() { var flags = new bool[3]; @@ -219,7 +280,7 @@ namespace Implab.Test .Chain(x => PromiseHelper .Sleep(200, "Hi, " + x) - .Map( y => y ) + .Map(y => y) .Cancelled(() => flags[1] = true) ) .Cancelled(() => flags[2] = true); @@ -228,13 +289,13 @@ namespace Implab.Test try { Assert.AreEqual(p.Join(), "Hi, Alan"); Assert.Fail("Shouldn't get here"); - } catch(OperationCanceledException) { + } catch (OperationCanceledException) { } Assert.IsFalse(flags[0]); Assert.IsTrue(flags[1]); Assert.IsTrue(flags[2]); } - } + } } diff --git a/Implab.v11.suo b/Implab.v11.suo index 89d4ac29ba1a27fd1eba1015282feda7105746af..d05880eb554a9716b818e81f9e8e949a3388219a GIT binary patch literal 106496 zc%1Eh3wRvWb?{xk@I$V#!9W}*D2baAA}uRRT1m#HUhPV@f@~|6EGQASN4q0=?bYtG zJL?CrLqbYOLJ6&%5JCtT2qi59LYneOptw9zNC<)aKW!-qf8b9`zO-KoZ9_>YR?j(i zu6A~Ic4zMHO4^m?>bsiRd7b+>_q^`8=RWu2K0c0@Jdl46^0=F7CT(X4_|sTD>tb;@rr04fDilp;u=x}H{cf9QKRI|_ zpy#D$J%5Df9sF7!KuoZ`tOuZvz*!eNC<%W~4!${g=bss&FQoydur%ApdRY{X8ruQ? zRVb6Vp0@ec8aBss7R%zi6Zo$}UN`|i%979u2s5phL4~#m!~YQE{j7=k*oO;#>s)|; zE*$?rFM|{KCk@YShx`uwUZB7KQT=#Qug^05c}#t9K2QIR@BBW?9qVVAelLRGh!a23 zufNN5e+#pw6-)H?afZQDpFU_PTOFM5fP6ON^bxO}X)60Y@cUzshag`Hc_-vukat5? zAP++hLykaJA&)>Fg}euH6!Kok`yj_4k3o(@PC!mVz6^2-GUBNQIRkk=)M#z5()AAm0f2tB`Lpyd(0haP2n8w?qCqojw zu?_Er{7u6*zNMdkTR;8{9RC{f{g5Ak{2=6qAb%I~_aHwE`4PkWC-n2j^y3q7d{RIE zJN@{ye*B?+{E>eAv3`6Wjz59?fArrk=*NG6<3B=v5%NDle#un#{|CQ+4*3j5#Oo{D0=se{tSP{T~BJ3PA0LqXvIbI3KmGJEnSo zk8M9s0zT5L#rpY2p#^ve(r}j1TSI~R?}PuNP#!O|g92qz*oE-RjGZk2eIsE&3$Qchm&!q+YH}ZDGkp+hPCvCHm*xO{Zwa-*4r^67c_uu?Z>C9)7DGeChT_{pWyw?vDSOAOF(ujpM%`+C8lI!hWc4gmjewEa2$+rxh^Lp{_veAIKUatOcp=+Dw0nf6D0LG-U3{3iG-mOfYl ze8lM^j^W$WKZJ`@E8}ZK!f0IrJlQ|_R zoX!EvpU>p@Z!!4&MDsL<&`YDgM)V${kcXw2cMLb9_7B6A12NyXha>2Ac$ku^keGxU=J~-=vCQFj}WZ2>t zCv?c{(Cm_Co5|!8Ap;nie8O;TWU%gqdtN=~LpPq%{VH3gjQZa*kWYjBEaasn zv_~~2v;DiD?a?Vvm^gz2`?oax!J#kg-`y+#e8vY{rVD6Z6Ke#H?}KAEoFVTE!S7uY zQewVzw5|mBmq7lfgEH_3;P)1|uK+R0v;RuN$Fx~L+VkJy3FQy8L8AT9DSWJfkpE`@ z|7WOwO(XyJ)H44slYfT<|88yVZV9$@wsrejj~wn9OU9JpA%5_N)5%m~@8R&ep#zE3 zzM&|;sUNn=@NdyrK~e^BvNCar^Y^_lh!4VlG37Eu--*!l&JK7|TyN6lfQObna-$ku ze*OGCk-NOT{`>0;-TVURP`{jO==MSahY>t}G^tZEFX_1|8ug8`NAlQpW8Np8zjXOg z^;fTa^=iawYuN+&56%yMM5g^X866{1ukwLjiTOXo7?Ir21|uam*QemIUyz*~CWc?&LluecUr>B zg_hCJhk$D*;V;Cb=*J)&k*FeT+fUQ%8WYA-(H`^g4K6vw0rHxKBhm|ucd%AHq?)$!--TZI^0k{sHW2J*oGFb5C0nKgPE9? zNvRv+YDP;bv4;A-%y29kzEC|hkl3fjHw?G6DXrmFe^WgA8N%);R zfsZzifdcS5iT)@sB9Z_%s?$JYNpe$53H8T0cY*)d#~MM76LZ4$kC9A}ONk0m<#oFL zC46g~z(`5n%{c(aL*%9^u8A6) z^B;3x-n(Yr<8O~V?|<^8&);BK1H@C**b;K`yHP=ln6z0A{VMgEM}FzKw(I21?Hhj+ z?CG+EZY;D!{fEd#kY|A~Y4Go~y?yUBd+IZfzIk}#yg8rw&jLN@Wm{o<53qsq<}bFA z^L7-ps;Hq1x6gmirq5nCcg1Pr%l@Kg>#<{&Wk?gJ#nWpnZd)bv|3Y6b;s5ogkpB!v z9At!H^#3yZG70~$uQBe^|xvyUD5cS|^h2{Yh(4)5BLVx zTpQsRUiHD>F7hMWMXitZNf5$g+s=UbAI~(;|J1bq#~|6=`8Yyt{~*l)q{tU6?7#~R z?N7KiL7~^(a1x`lC_Hd?nyNt|1Amu!k>Xl45?yUcLXt8zGB%0`F=Wr#x-_C3bbP(uHAL;iw}0NVij_5#+UP-^buL9*-=Pe6Y?=#wPY)H#8V zTCp&h*zt+uL7d;iK>270fAo`|*eKS!ktq3LkwQNA+_v|GS7_huNMyPY3H%Un2iIy#Rb` z{#!Vq{}rx|$9XcR@X>bFPyjx*Im*3GDG;_lu7eh9f1SWb`4K%I?N4W$oy|@k>{RD8X64z_sd}Tdkq}}gloPQu4{e7wvk7s~*jq@59-vIj< z`wV?uP}f7B8yp$f?>Iu64fVho8=R9u?uoh~TCUDF+{g6-xL#``L-RYX!C^#WLWXBH zF+NAz1NkG6FNE9+8P}{MHQvVfdU#y-j_cep7S@3N{-A!m7>*e00{u-ZvHp8Z|887A z;`;A{`Z><0X@>G2(9coNzFa>)tRHa=xLE%^9s1u-nXCU*i~ctQ{C}kR503Y@UH>0S zom;vGkT#3?sH~sk0o}IQV8X)<6ZTweLd;$84JUA|(#1dKo}k$kkZBmxgJiXAA}HJg zCK1<{8fE8Wn{YaIP2LJh>-Te=fQKfqyq+NqlyxE=GL1%~0bWcVa6T&YZOaK-;5Y-o zWzcCcGcjet`RR74a$8T{b_VKc9KC8&cnsJ8r3s>Rk(G)LLuuns+IlESAFZ6^3FVX#vaH8QE(rnlcEbHl z!088|Wl)o%*TzVU2g?^oHYz*?k58anW;aQM&uyRR<8o$zN}(QJssW#c z9)|B7g5MgAs3^6bMzzY8^xYFR;&sr7U!`??HhxFU8}N5SUA;Z{{_csIW~TwMtn~() zTRq5}2WfuhL+~8V-i#0|5s|Cj1WS})3rR~UvX>>`9UKGsZBeY9h9=r&2(G7TjwYq! zguoWW&{5)6!_)&F14evUYOD@HMro%Qu3E!Hx%}fGL3F@q>j14Y1SJlERNev5yWv01 zV|CLAYlLxsvX5D7l`sSCxn^Oj33B1Iq z6#9+^?UY^H;U!56uL^3F-^xzfaA%F;N%5K?joK0n_D;oUHeSbM%D9GE5y* zD=A>6&`gxyte@~$)VlU5fbqJyhDZr=bAJ$J^tTqgw%}8H{xx{?6XQ?Ly?5`!H=e;| z^mC;sh0tECmPHz6^Y2D}ePHIu=4gksm(6)AlqZ*$Dx=nxB#w6808uPsY$hum7@uBh z`E>UF3RX;05nmEg{BVtqem>9XM8b*HXbp3{HN0fUN|7$(q}ID@rq-osXQMS!J8P`% zrM|t))kLefuVggQJvCEDD+xPDt5lI6Cmq$cR(n+NdM9-pso6T>vj_CKHzzgC)ND;f zZ;CVSUn4vK;}epZGmcsNdQi1nLS`P7#90BmXRUPZTBV@nSsG>&rFm~&%gB6XCgF^k z>*nK(cw+Whr)Hj{vCN`(jn7%Nz|7UqSLU@w;diWBW@cDtmgKbpzi(CJnWovFTA=RP zC!JcX=l`PVywO*`6gYnAMH`PjwqlY0Om4AmVe%I1!oDTBcXO@SFCKYy23fJubJnR9 zyHb88XA?FkIs_GU>m>)=}3r^DJl6&!rAsT};SRcTxb=z2j`^V~$O*4nkMyViBry6#%nomtmw zhsqVV*C3U%e!{6W{jE3pe$##Rb04|>^s9FDUU}DlFrvb?>?C|JK6@s?$X^MZUGbS4`4Zn* zN#A!C9=Kdj^;HGR7b2N2$wUO1{8HA=T)+}$_DZ+!ADjV5(jLYN;e+ve=;dx7zFPF8 z({!-5%Q4g44>FK1yOaPn}huM1^Cqid6#6Hry2L<7{-) z8NQp}diWiyZGLh`0WWcc2N>W5Tt=oVoGeXLJ1bTiEo*f4`YcZ~S-M zjcq#PzoT!{OpO1ID~oU(7~{WVo$TVj<24cg9ar?={w&7$@8Z2`ivN!Mvz++vc>E+| z2B|mRrEXV0@VqzVU-@J4UpPOUxjkn(D z^Wg9>PtJ?);iQhuvvCCYCv|8%fo zsTBVAIOBVuW3uOeO?L2!{vi!;3jh0z$KZDYzs=@b7=Iq=*9HmrPcfPO&z1E@On3KC?wBLW;UjG@_d8$+VyO(A4KRfuvsZVzQ4Y4=06ZrQr&VQZ2Z?+$Q()91I z7;_|BJ593ijp<7L{ZBK#FVIzxKf}0w`dP@IgZz2OUpB#Ul5p?$uP`oeoA03xa2)cj zkZ)srPtdPJz60`~L%x&oJxcC2y^GuWqU?e=^ex8w^4pBJDe8594f%e^4?unp@Pa39%1KBo(*Un6Xo`U0g2Z1GZJGjM&k)YuH; zo;qE6v}w?)^5}-cV9PG zs+&%!&W)yD9n{^GN?ob6w&Sa~Sn5i|u0&id5^ z%Hrk#bz{)FJG@n5hqv1G=D-!HPWMoA?ZvLWczW%{ zGnWgww$|#eUHP_F*LSHpbYj<5Tm?4JLN{*E#|>t`^MDmzU8%%s`{7h+5pj6F0ye!gylG(N-eXss$|*Dr|4Xjt!;5HM=y-OP9`GYb;6U8oYw!tikGLb4b84B923xf)(&66 z+(c!;J*V5?TX@z&noT3T?T5d$bPb~_25HCY4x&izJdrz3koK__;c4o*RlRKYL8rQCj9$h&pqI^9X_aW;ddhOS-5BDW&2MAoFMvmV|Wo?6V zoIS){QxIR90UkukIMmX5q&oJ;cD*2Gmls5>T7+a~)|;g`s93@6jE@>U z)U3H_N}^K&{r=o!xS0x7^t+cXrF! z>{jhix#E@vq;l3z*v)3t?y4u;bFa{-M#Z(uslx)xRt}*ShjG9rathL6Jq?S^m0>Ko27|NwUxzrY=v1KGo zz(rqpygp3tHrQ8gt|s#~Uty6X+LM~x+=3V6+=Hk)K0-Y}WOfVwamk80|E zHLfYCQ5il6^2&r4Q*>i>^X#Q8RCNTyq~x))m}idpny&v$tUYs5%81>k9mV;R^bY;ePg^D&$i37_^9SPsLExN+b4lco*Y4Wlkar zmBWgsMtfv~TV+8KJ=>QD}mw!8m!TzoxN34h2_>W?&-Xqp>MQE z==uIhHu;_gK7Ez~_k|v!cX75O4aYDX8T-N%m%&{v;I0-Zf3?8Om=B#9o_nsRwVbDQ z6D!A0*I5+Fsq)d?E!~4+MyZ+BU7#OaK99t{y?`wRxi_C0&u{Yjyx4@SwE44(<(oby zgyeRa_p=+Sa>8c4&FZ&qIew(`x|bjO(h2_+Fa8(%4vPDED>5fip@5gq+)($sJHGps z+nW1-7rdp<|N1FbU)d*Lf8zqkVvOLvn|*Jc_sjv`17H7M_oAoHxP6`#{FpR1HP1hO zq|E+f*$|+wFM4s!udn>ZJDvAkx#huQZI3RrZueY(TTAKxp*#o2CcO;*cIOMjr@y(h zK5_fQ&#!%C=Tgg7GbXvwQfh0r(A>A|*enei`WN7uy5i6H6XaZlWXLfv+r(s|7Nb5-3NK;d! z)u%Kob#s}!YHSMRQ<+s`QyVk4@@uZ$)f(<9F?W^N>{}%^ozYpRLW5syDhI!7RT`H9 zy6e^4^=j^VHFv$5yI##*ujZ~-o38b0BKqE2Z}k1fxVq)%3!Pi%3(HW(=z2R}de5@% zm-3_Q$*-0(M^`Lkg`6oBUm=I%%YI+5@@B-*VBrOBNgK9gW5R3d&i#&Rb+`gEx&`_4-GzPV&u;TM|cM+obUFv{84 zY*m5sg-GU0GO+?Jzm&D(3$TQlcQMP|;S6W{=CPmj!&%k5UFhMB19(LCZk11?zHFFV6CR4MwC$6cf5hbjq zh1EH}p$X@G;g3FmvtlLMud`ojs1IghS|+7#h^rYbrNkQQ`!d6^X!t_)&_H6J8sA`L zzh+5WP({lx4|P7)biF;)U?Q%i60w+?@@~(>`&CVg#z)g3MN^RDty$lb?vBP({dyp* zMfa;0?Ax`gQwi^b3*AvQ7J2`AcoRPv7=apeW}>l(`H{2N4=AZoRqKl>+DIZbX1?FB zC6U&)t6?>+#SZOAs}TfBrT8!uQ$4tCq*qC6T`_eGp4d?M;q_ywOfWGvroOrkH zn%2yqko_(2cmnoK({)f&e9JOiuWXZ@K6luj%|W3e0VzXpD?jQ3KR3n4`>*O#qi;@ zxYw`7)Uc-Jq1u$9(gF`*0JZ8OEa`ImhgP_yin-#akCIIvu(9+6>?&mul-sU zvwd0Vy2e!g!jNu6NGIDt z`gX2k7>|-MC}}Zqrd9LxAv-18NY0YLS0i>y!6;-k;I@w+R6rkJOTMq|Y>lO*WsN}x zZziu-jCZq!S}{Z=Q%Ls<=k;wq26>C)y`GGPq#!6FW>;ux}wb!b- zA5musSZJ;{TEVuKJ>s#2lM~-uElHP#Y%Cg#_Bp$xEQ(Rg{gyA=TrXAPT+0k!F(yNC z@=I9A&6Tf<+EdoIov$!qw9Da7D?l&ASrgj@IKxMB6v~A?9AM~zjeD00=#AvZhjqRj zo=Os)IeCXuoF+%liyAH~37f3=rj;`}UkEIR|HZ9=jJ;2iV_J{WVpJ8^A%#k!e`bnu zdPyU}+y>SyyF!|l+FKTR-~t(LD@q)s-$?nhV->9v+a=r43B7-pWX+vX7Sw+)F7c~{ z`%8R%rQ|rX#@fP?&DoMX+v%bbZeto|5na)#t z3G+0HjE7@G(Ch@`^JG!_+BW#s2 zPpi~oj3}M;yq-v-kAGiz&OBB6igxn0-~-Pszh@OY{`9i($Mdu&(nc3E^Ls)?#p%qm zuD$lz%Y#Fq`-giQUb}Jsn@07uuHm=gN0k$sZ$EG?!T0EBV*Va}=dA^=E%?-)e+?e} z#Q2kQ@7?=wv1iR2u6gufY5s=QyXHDuO%RA=e&1XlyY>V0TkPRI=Rq>Jx}W+tGr)hb~Ue9@-&#S_x`ix?w{)JpEtYr&vWCa z*LM7LA>Ni6|9iH_Uw7j_)OP%ba^vUE_V^E#jsGtn|DkyGMfJARk!wlm9Gh6@MEw6M z4tG0=QS-8K7E>wZkaQH=s)|Ecg<)o^DZFI$#$K$}IHJ`ZbhbLfLRW3HTMeMTlL5^bq5sY`>_!aCqvQw6P!=FGdM2wFSM*mO)0v}|5Siue#TD5Z?(oRaN~El z@jE2u?=2TGUNU)0eji%)$c<`r`StVnMDFtT`tPq}Szmw`ppL(soBdsI9gY#aF=m=q zmk3OYa+1fc8}mN#{H4o}s=s>Wt5;Ka3rqh4;YlFje_^>K>?zMoS;c$bZ|B#){gca% z-gNhatFAle*f)($hv%cl-xG_bD;__lsNV&)l1f}{81yIfhjrt}Ohfz_3!YhOGRBW7 KDNS?ynEwTg8UGpp diff --git a/Implab/Implab.csproj b/Implab/Implab.csproj --- a/Implab/Implab.csproj +++ b/Implab/Implab.csproj @@ -38,6 +38,8 @@ + + diff --git a/Implab/Parallels/ArrayTraits.cs b/Implab/Parallels/ArrayTraits.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/ArrayTraits.cs @@ -0,0 +1,152 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Parallels { + public static class ArrayTraits { + class ArrayIterator : DispatchPool { + readonly Action m_action; + readonly TSrc[] m_source; + readonly Promise m_promise = new Promise(); + + int m_pending; + int m_next; + + public ArrayIterator(TSrc[] source, Action action, int threads) + : base(threads) { + + Debug.Assert(source != null); + Debug.Assert(action != null); + + m_next = 0; + m_source = source; + m_pending = source.Length; + m_action = action; + + m_promise.Anyway(() => Dispose()); + m_promise.Cancelled(() => Dispose()); + + InitPool(); + } + + public Promise Promise { + get { + return m_promise; + } + } + + protected override bool TryDequeue(out int unit) { + int index; + unit = -1; + do { + index = m_next; + if (index >= m_source.Length) + return false; + } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); + + unit = index; + return true; + } + + protected override void InvokeUnit(int unit) { + try { + m_action(m_source[unit]); + int pending; + do { + pending = m_pending; + } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); + pending--; + if (pending == 0) + m_promise.Resolve(m_source.Length); + } catch (Exception e) { + m_promise.Reject(e); + } + } + } + + class ArrayMapper: DispatchPool { + readonly Func m_transform; + readonly TSrc[] m_source; + readonly TDst[] m_dest; + readonly Promise m_promise = new Promise(); + + int m_pending; + int m_next; + + public ArrayMapper(TSrc[] source, Func transform, int threads) + : base(threads) { + + Debug.Assert (source != null); + Debug.Assert( transform != null); + + m_next = 0; + m_source = source; + m_dest = new TDst[source.Length]; + m_pending = source.Length; + m_transform = transform; + + m_promise.Anyway(() => Dispose()); + m_promise.Cancelled(() => Dispose()); + + InitPool(); + } + + public Promise Promise { + get { + return m_promise; + } + } + + protected override bool TryDequeue(out int unit) { + int index; + unit = -1; + do { + index = m_next; + if (index >= m_source.Length) + return false; + } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); + + unit = index; + return true; + } + + protected override void InvokeUnit(int unit) { + try { + m_dest[unit] = m_transform(m_source[unit]); + int pending; + do { + pending = m_pending; + } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); + pending --; + if (pending == 0) + m_promise.Resolve(m_dest); + } catch (Exception e) { + m_promise.Reject(e); + } + } + } + + public static Promise ParallelMap (this TSrc[] source, Func transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + + var mapper = new ArrayMapper(source, transform, threads); + return mapper.Promise; + } + + public static Promise ParallelForEach(this TSrc[] source, Action action, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (action == null) + throw new ArgumentNullException("action"); + + var iter = new ArrayIterator(source, action, threads); + return iter.Promise; + } + } +} diff --git a/Implab/Parallels/DispatchPool.cs b/Implab/Parallels/DispatchPool.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/DispatchPool.cs @@ -0,0 +1,171 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Diagnostics; + +namespace Implab.Parallels { + public abstract class DispatchPool : IDisposable { + readonly int m_minThreads; + readonly int m_maxThreads; + int m_runningThreads = 0; + int m_maxRunningThreads = 0; + int m_suspended = 0; + int m_exitRequired = 0; + AutoResetEvent m_hasTasks = new AutoResetEvent(false); + + protected DispatchPool(int min, int max) { + if (min < 0) + throw new ArgumentOutOfRangeException("min"); + if (max <= 0) + throw new ArgumentOutOfRangeException("max"); + + if (min > max) + min = max; + m_minThreads = min; + m_maxThreads = max; + } + + protected DispatchPool(int threads) + : this(threads, threads) { + } + + protected DispatchPool() { + int maxThreads, maxCP; + ThreadPool.GetMaxThreads(out maxThreads, out maxCP); + + m_minThreads = 0; + m_maxThreads = maxThreads; + } + + protected void InitPool() { + for (int i = 0; i < m_minThreads; i++) + StartWorker(); + } + + public int ThreadCount { + get { + return m_runningThreads; + } + } + + public int MaxRunningThreads { + get { + return m_maxRunningThreads; + } + } + + protected bool IsDisposed { + get { + return m_exitRequired != 0; + } + } + + bool StartWorker() { + var current = m_runningThreads; + // use spins to allocate slot for the new thread + do { + if (current >= m_maxThreads || m_exitRequired != 0) + // no more slots left or the pool has been disposed + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); + + m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); + + // slot successfully allocated + + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + + return true; + } + + protected abstract bool TryDequeue(out TUnit unit); + + protected virtual void WakeNewWorker() { + if (m_suspended > 0) + m_hasTasks.Set(); + else + StartWorker(); + } + + bool FetchTask(out TUnit unit) { + do { + // exit if requested + if (m_exitRequired != 0) { + // release the thread slot + int running; + do { + running = m_runningThreads; + } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); + running--; + + if (running == 0) // it was the last worker + m_hasTasks.Dispose(); + else + m_hasTasks.Set(); // release next worker + unit = default(TUnit); + return false; + } + + // fetch task + if (TryDequeue(out unit)) { + WakeNewWorker(); + return true; + } + + //no tasks left, exit if the thread is no longer needed + int runningThreads; + bool exit = true; + do { + runningThreads = m_runningThreads; + if (runningThreads <= m_minThreads) { + exit = false; + break; + } + } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); + + if (exit) { + Interlocked.Decrement(ref m_runningThreads); + return false; + } + + // keep this thread and wait + Interlocked.Increment(ref m_suspended); + m_hasTasks.WaitOne(); + Interlocked.Decrement(ref m_suspended); + } while (true); + } + + protected abstract void InvokeUnit(TUnit unit); + + void Worker() { + TUnit unit; + while (FetchTask(out unit)) + InvokeUnit(unit); + } + + protected virtual void Dispose(bool disposing) { + if (disposing) { + if (m_exitRequired == 0) { + if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) + return; + + // wake sleeping threads + m_hasTasks.Set(); + GC.SuppressFinalize(this); + } + } + } + + public void Dispose() { + Dispose(true); + } + + ~DispatchPool() { + Dispose(false); + } + } +} diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs +++ b/Implab/Parallels/WorkerPool.cs @@ -6,66 +6,35 @@ using System.Threading; using System.Diagnostics; namespace Implab.Parallels { - public class WorkerPool : IDisposable { - readonly int m_minThreads; - readonly int m_maxThreads; - int m_runningThreads; - object m_lock = new object(); - - bool m_disposed = false; - - // this event will signal that workers can try to fetch a task from queue or the pool has been disposed - ManualResetEvent m_hasTasks = new ManualResetEvent(false); - Queue m_queue = new Queue(); + public class WorkerPool : DispatchPool { - public WorkerPool(int min, int max) { - if (min < 0) - throw new ArgumentOutOfRangeException("min"); - if (max <= 0) - throw new ArgumentOutOfRangeException("max"); + MTQueue m_queue = new MTQueue(); + int m_queueLength = 0; - if (min > max) - min = max; - m_minThreads = min; - m_maxThreads = max; - - InitPool(); + public WorkerPool(int minThreads, int maxThreads) + : base(minThreads, maxThreads) { + InitPool(); } - public WorkerPool(int max) - : this(0, max) { + public WorkerPool(int threads) + : base(threads) { + InitPool(); } - public WorkerPool() { - int maxThreads, maxCP; - ThreadPool.GetMaxThreads(out maxThreads, out maxCP); - - m_minThreads = 0; - m_maxThreads = maxThreads; - - InitPool(); - } - - void InitPool() { - for (int i = 0; i < m_minThreads; i++) - StartWorker(); - } - - public int ThreadCount { - get { - return m_runningThreads; - } + public WorkerPool() + : base() { + InitPool(); } public Promise Invoke(Func task) { - if (m_disposed) - throw new ObjectDisposedException(ToString()); if (task == null) throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); var promise = new Promise(); - var queueLen = EnqueueTask(delegate() { + EnqueueTask(delegate() { try { promise.Resolve(task()); } catch (Exception e) { @@ -73,99 +42,28 @@ namespace Implab.Parallels { } }); - if (queueLen > 1) - StartWorker(); - return promise; } - bool StartWorker() { - var current = m_runningThreads; - // use spins to allocate slot for the new thread - do { - if (current >= m_maxThreads) - // no more slots left - return false; - } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); - - // slot successfully allocated - - var worker = new Thread(this.Worker); - worker.IsBackground = true; - worker.Start(); - - return true; - } - - int EnqueueTask(Action task) { - Debug.Assert(task != null); - lock (m_queue) { - m_queue.Enqueue(task); - m_hasTasks.Set(); - return m_queue.Count; - } + protected void EnqueueTask(Action unit) { + Debug.Assert(unit != null); + Interlocked.Increment(ref m_queueLength); + m_queue.Enqueue(unit); + // if there are sleeping threads in the pool wake one + // probably this will lead a dry run + WakeNewWorker(); } - bool FetchTask(out Action task) { - task = null; - - while (true) { - - m_hasTasks.WaitOne(); - - if (m_disposed) - return false; - - lock (m_queue) { - if (m_queue.Count > 0) { - task = m_queue.Dequeue(); - return true; - } - - // no tasks left - // signal that no more tasks left, current lock ensures that this event won't suppress newly added task - m_hasTasks.Reset(); - } - - bool exit = true; - - var current = m_runningThreads; - do { - if (current <= m_minThreads) { - exit = false; // this thread should return and wait for the new events - break; - } - } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); - - if (exit) - return false; + protected override bool TryDequeue(out Action unit) { + if (m_queue.TryDequeue(out unit)) { + Interlocked.Decrement(ref m_queueLength); + return true; } + return false; } - void Worker() { - Action task; - while (FetchTask(out task)) - task(); - } - - protected virtual void Dispose(bool disposing) { - if (disposing) { - lock (m_lock) { - if (m_disposed) - return; - m_disposed = true; - } - m_hasTasks.Set(); - GC.SuppressFinalize(this); - } - } - - public void Dispose() { - Dispose(true); - } - - ~WorkerPool() { - Dispose(false); + protected override void InvokeUnit(Action unit) { + unit(); } } } diff --git a/Implab/Promise.cs b/Implab/Promise.cs --- a/Implab/Promise.cs +++ b/Implab/Promise.cs @@ -539,5 +539,6 @@ namespace Implab { return result; } + } }