diff --git a/.hgignore b/.hgignore --- a/.hgignore +++ b/.hgignore @@ -10,3 +10,4 @@ Implab.Fx/obj/ Implab.Fx/bin/ Implab.Fx.Test/bin/ Implab.Fx.Test/obj/ +_ReSharper.Implab/ diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -1,101 +1,333 @@ using System; using Microsoft.VisualStudio.TestTools.UnitTesting; -using Implab; using System.Reflection; using System.Threading; +using Implab.Parallels; -namespace Implab.Tests -{ - [TestClass] - public class AsyncTests - { - [TestMethod] - public void ResolveTest () - { - int res = -1; - var p = new Promise (); - p.Then (x => res = x); - p.Resolve (100); +namespace Implab.Test { + [TestClass] + public class AsyncTests { + [TestMethod] + public void ResolveTest() { + int res = -1; + var p = new Promise(); + p.Then(x => res = x); + p.Resolve(100); + + Assert.AreEqual(res, 100); + } + + [TestMethod] + public void RejectTest() { + int res = -1; + Exception err = null; + + var p = new Promise(); + p.Then(x => res = x, e => err = e); + p.Reject(new ApplicationException("error")); + + Assert.AreEqual(res, -1); + Assert.AreEqual(err.Message, "error"); + + } + + [TestMethod] + public void JoinSuccessTest() { + var p = new Promise(); + p.Resolve(100); + Assert.AreEqual(p.Join(), 100); + } - Assert.AreEqual (res, 100); - } + [TestMethod] + public void JoinFailTest() { + var p = new Promise(); + p.Reject(new ApplicationException("failed")); + + try { + p.Join(); + throw new ApplicationException("WRONG!"); + } catch (TargetInvocationException err) { + Assert.AreEqual(err.InnerException.Message, "failed"); + } catch { + Assert.Fail("Got wrong excaption"); + } + } + + [TestMethod] + public void MapTest() { + var p = new Promise(); + + var p2 = p.Map(x => x.ToString()); + p.Resolve(100); + + Assert.AreEqual(p2.Join(), "100"); + } + + [TestMethod] + public void FixErrorTest() { + var p = new Promise(); + + var p2 = p.Error(e => 101); + + p.Reject(new Exception()); + + Assert.AreEqual(p2.Join(), 101); + } [TestMethod] - public void RejectTest () - { - int res = -1; - Exception err = null; + public void ChainTest() { + var p1 = new Promise(); + + var p3 = p1.Chain(x => { + var p2 = new Promise(); + p2.Resolve(x.ToString()); + return p2; + }); + + p1.Resolve(100); - var p = new Promise (); - p.Then (x => res = x, e => err = e); - p.Reject (new ApplicationException ("error")); + Assert.AreEqual(p3.Join(), "100"); + } - Assert.AreEqual (res, -1); - Assert.AreEqual (err.Message, "error"); + [TestMethod] + public void PoolTest() { + var pid = Thread.CurrentThread.ManagedThreadId; + var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId); - } + Assert.AreNotEqual(pid, p.Join()); + } [TestMethod] - public void JoinSuccessTest () - { - var p = new Promise (); - p.Resolve (100); - Assert.AreEqual (p.Join (), 100); - } + public void WorkerPoolSizeTest() { + var pool = new WorkerPool(5, 10, 0); + + Assert.AreEqual(5, pool.ThreadCount); + + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + + Assert.AreEqual(5, pool.ThreadCount); + + for (int i = 0; i < 100; i++) + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + Thread.Sleep(100); + Assert.AreEqual(10, pool.ThreadCount); + + pool.Dispose(); + } + + [TestMethod] + public void WorkerPoolCorrectTest() { + var pool = new WorkerPool(0,1000,100); + + int iterations = 1000; + int pending = iterations; + var stop = new ManualResetEvent(false); + + var count = 0; + for (int i = 0; i < iterations; i++) { + pool + .Invoke(() => 1) + .Then(x => Interlocked.Add(ref count, x)) + .Then(x => Math.Log10(x)) + .Anyway(() => { + Interlocked.Decrement(ref pending); + if (pending == 0) + stop.Set(); + }); + } + + stop.WaitOne(); + + Assert.AreEqual(iterations, count); + Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads); + pool.Dispose(); + + } + + [TestMethod] + public void WorkerPoolDisposeTest() { + var pool = new WorkerPool(5, 20); + Assert.AreEqual(5, pool.ThreadCount); + pool.Dispose(); + Thread.Sleep(100); + Assert.AreEqual(0, pool.ThreadCount); + pool.Dispose(); + } [TestMethod] - public void JoinFailTest () - { - var p = new Promise (); - p.Reject (new ApplicationException ("failed")); + public void MTQueueTest() { + var queue = new MTQueue(); + int res; + + queue.Enqueue(10); + Assert.IsTrue(queue.TryDequeue(out res)); + Assert.AreEqual(10, res); + Assert.IsFalse(queue.TryDequeue(out res)); + + for (int i = 0; i < 1000; i++) + queue.Enqueue(i); + + for (int i = 0; i < 1000; i++) { + queue.TryDequeue(out res); + Assert.AreEqual(i, res); + } + + int writers = 0; + int readers = 0; + var stop = new ManualResetEvent(false); + int total = 0; + + int itemsPerWriter = 1000; + int writersCount = 3; - try { - p.Join (); - throw new ApplicationException ("WRONG!"); - } catch (TargetInvocationException err) { - Assert.AreEqual (err.InnerException.Message, "failed"); - } catch { - Assert.Fail ("Got wrong excaption"); - } - } + for (int i = 0; i < writersCount; i++) { + Interlocked.Increment(ref writers); + var wn = i; + AsyncPool + .InvokeNewThread(() => { + for (int ii = 0; ii < itemsPerWriter; ii++) { + queue.Enqueue(1); + } + return 1; + }) + .Anyway(() => Interlocked.Decrement(ref writers)); + } + + for (int i = 0; i < 10; i++) { + Interlocked.Increment(ref readers); + var wn = i; + AsyncPool + .InvokeNewThread(() => { + int t; + do { + while (queue.TryDequeue(out t)) + Interlocked.Add(ref total, t); + } while (writers > 0); + return 1; + }) + .Anyway(() => { + Interlocked.Decrement(ref readers); + if (readers == 0) + stop.Set(); + }); + } + + stop.WaitOne(); + + Assert.AreEqual(itemsPerWriter * writersCount, total); + } [TestMethod] - public void MapTest () - { - var p = new Promise (); + public void ParallelMapTest() { + + int count = 100000; + + double[] args = new double[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = rand.NextDouble(); - var p2 = p.Map (x => x.ToString ()); - p.Resolve (100); + var t = Environment.TickCount; + var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join(); + + Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); - Assert.AreEqual (p2.Join (), "100"); - } + t = Environment.TickCount; + for (int i = 0; i < count; i++) + Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + } [TestMethod] - public void ChainTest () - { - var p1 = new Promise (); + public void ChainedMapTest() { + + using (var pool = new WorkerPool(8,100,0)) { + int count = 10000; + + double[] args = new double[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = rand.NextDouble(); - var p3 = p1.Chain (x => { - var p2 = new Promise (); - p2.Resolve (x.ToString ()); - return p2; - }); + var t = Environment.TickCount; + var res = args + .ChainedMap( + x => pool.Invoke( + () => Math.Sin(x * x) + ), + 4 + ) + .Join(); - p1.Resolve (100); + Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); - Assert.AreEqual (p3.Join (), "100"); - } + t = Environment.TickCount; + for (int i = 0; i < count; i++) + Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads); + } + } [TestMethod] - public void PoolTest () - { - var pid = Thread.CurrentThread.ManagedThreadId; - var p = AsyncPool.Invoke (() => { - return Thread.CurrentThread.ManagedThreadId; - }); + public void ParallelForEachTest() { + + int count = 100000; + + int[] args = new int[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = (int)(rand.NextDouble() * 100); + + int result = 0; + + var t = Environment.TickCount; + args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join(); + + Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result); + + int result2 = 0; + + t = Environment.TickCount; + for (int i = 0; i < count; i++) + result2 += args[i]; + Assert.AreEqual(result2, result); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + } - Assert.AreNotEqual (pid, p.Join ()); - } - } + [TestMethod] + public void ComplexCase1Test() { + var flags = new bool[3]; + + // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map) + + var p = PromiseHelper + .Sleep(200, "Alan") + .Cancelled(() => flags[0] = true) + .Chain(x => + PromiseHelper + .Sleep(200, "Hi, " + x) + .Map(y => y) + .Cancelled(() => flags[1] = true) + ) + .Cancelled(() => flags[2] = true); + Thread.Sleep(300); + p.Cancel(); + try { + Assert.AreEqual(p.Join(), "Hi, Alan"); + Assert.Fail("Shouldn't get here"); + } catch (OperationCanceledException) { + } + + Assert.IsFalse(flags[0]); + Assert.IsTrue(flags[1]); + Assert.IsTrue(flags[2]); + } + } } diff --git a/Implab.Test/Implab.Test.csproj b/Implab.Test/Implab.Test.csproj --- a/Implab.Test/Implab.Test.csproj +++ b/Implab.Test/Implab.Test.csproj @@ -46,6 +46,7 @@ + diff --git a/Implab.Test/PromiseHelper.cs b/Implab.Test/PromiseHelper.cs new file mode 100644 --- /dev/null +++ b/Implab.Test/PromiseHelper.cs @@ -0,0 +1,17 @@ +using Implab.Parallels; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Test { + class PromiseHelper { + public static Promise Sleep(int timeout, T retVal) { + return AsyncPool.Invoke(() => { + Thread.Sleep(timeout); + return retVal; + }); + } + } +} diff --git a/Implab.suo b/Implab.suo index 61301b443c112c10f3b3ee2fe11cecb25781ac9c..453df4ad2d602dd29c42a1de303bdb9f56e12f58 GIT binary patch literal 75776 zc%1D$4R{;Xk?SN*oRC0B1BBliYEnooMY1i)PDqKYpAc{o6FWZzhh#0UZ4p@#lAPEf zKq>SGD5WKZ0xcv2C|qgF(Q7GfY00HPq2&U!&9yYW{vAE|dUq6#pK@?CH#56WTCG>w zeJjaWW}m;YcBS1n^Zw?|o0&K7*=LUa!<`SF@JB(DJ}OKV{(g9-FkSWC6o49bGgrQ! z3h?*ChYz!_F9A4gr>aTs0!)K8I}xB3UP*a>hMz~ul}0DPQiPrzRW+9Cw`W*|R&!+bgxdJXGK4F=EdyK`jW z!9xdXkY_CS$=`JX#srjbC1CG|s}NDmgMUgMzcKsXT;;PHS}6^++W|G5hVrk2{}Rx| zzE2zE(;!T#!7;#tA9-k97Jn4lv6FD76&fU{iwOJ=0&IgG?xNnWpB4dr9Je1w_9jg4+DA(sJpIw5?AMhG^ zzy{^|W6I}+$|v#yN^-i7qzx35b zZ|rwORzI}p>A+|2?Re2l|52w`0yBAE$xk&(0hnMh8++$P>(fE0cX*&PC8ec!M$AML z@lG+aRUD8S>(@)EG~6$9yPWjbSRWXUWrkDIvbZ#yNr|z>`p)6rSTu5;G}@imD#e%e zx|;hv{l0#;yU*hio5dM53^S%_!>gi^R3e?|&p6jd)5Bt{D>K{|O*n~!0V$Qe=uRYJ z8=~>P#P+l^C}za&Xfo~WjK_ke_@J&SeNC#a=DSWRh zlSoNe&LeX2R0-8rFJS*&0&~m;;GiksT0PKjhk&odfQ$9g+=J;T<#{zurn@I#!FiqUZf@INLan8lAWPGbT5 zIR7B;O#gz#^%VJf znwb|&B+TdI0_%tJtUdt$Md*k1RH_*DsCsCz5#Y>TknDOunuX%rePCPE(_H8T=z*sK@E_mJz-O9j3j6uz#+?QnFEPN-PZTgp*!{$zNd9&r zP!uKkCPvRoS?**#O)2wtJgHRvjq=kfsEHWy8>~f<`t&eKg|cjnQp;3&dMT}SE`@gT zz@G~~-2km16}7^Orv+Ne0ssBLRa|h@2KR2b$6xXJFkA-!+TqzSJmG=WKwn7 zY61MX<~@M}KR(a+Kh8H9!W^fOlpBGp^(7c98ECfz(IRPP#3g_q<@4h?@aNZm0`eb* zI+t@+m$Z|{d$j0G=u+k%p7uwX`^b7@{A?{2B>o}l=H&$ZIG@e7SpF{R;ltT7%Jtk2 zcLOy4irHWEQ=^6ckNT0lGvwEIdgOhYk}ezjd1K+JK^m(aGH8} zvJ?876I#AKD*?B|lYN4k`<`8PdUE`Smlk;@2ml9Zgf@$)dE-oaMcd9 zc!ehVE&!kSmJcZO!+$SO<0V>*dO{AX7jT5(eay*47_hAE@Eu}krEeXOJ5u7K8VJG@ zKKQ;Fz)MeKEt$pYFFV$9^n-HJQ))kOL2lTlE^5zKqNNr7+)9rK!J7`E)d$xOcqR<_ zV=rz7eAom0@I-*KMNqSgZ@H*&sR_b$NI1V9^{(aY z$FfXT?mQW{jF8;CJ-g11=B=}%npHPa66AU(NqjX)^B>C;-$_4Al2 zACy{HPC((Tw_fdhBmC)S&%Js6CpUMTcSS9GYQgo_AJ`G-2|m=@(fHzR+umf~&u;wR z$fM%EbMD%H{lJf(5T6y4l4|pLYx;}RuN?f(z>Sw5ethZ!7eAu+TmzH}HPBmuc9BYz zrnQ9&{@k>Na1%~eE+@3e2vE-E%8k#=qTjKENkxl_t%R})>KWD>Xhzysg}YCrmR$|) z-VM3SYauCH%Z-o+)6OE!Wx_V7KctN5QbLf2dHTDlO*7OUe&L>Z@(SYc-|;_cMmvZn zWoXQyB+yL~agwM(3@9It!ms`C7b9s&%1TIVHILdAR^oNE+Da(e{e;pV`jmG9L|2-m zjVP76kff=fQ2&j3U6g68$X`s3KsgxeC_(afO5r~l1MLUlC!KIT2Y%EQEl~s@tR2KE z(pt1nBi9&fz)mZ1{c{e~dLQAALz!dnKTK`Zr}Vi>VKR14~1+(>Un4|y~aMX|@ zM>G@9L>bCW@=>#}IonId*3J?O_|XP+^|d_sab8#jzl*@5@@vFv0Y6G?=P1|3`XBN{ zw?SUWf3Z9`*O|qSC184t+?;_Hz7UOnoLjnRwvcEp#ME@^sk;x(y+>sizj}@P_b9d#FddsmQD5n%&>WZKV@@odnA-KoYrUR~;X#Ih-xPb}} ztxoW_AW7{$da{jHCr$7~6W#Hx2+E1I94%5#daSny%BX&t;i4ctO7K=cd3d1K1ZS_p zIWIN9Z@dyf+D5Xg5ww1rUq#68I*A*l$7zg0sm5W*J>CnQO)_>DZenE_RYI5mOdGZQ*_Q^)xNK^4z`GEPeKptuLs>r0jIH zoDV-v!$Tx0ND7wU{_U^Ze--c!dEZ!d^gD9~_b$@IqF(plswWL~gR7EmVGBQg?c&%^ z%NL2v6*#pp(NU&v))B5<-&%2W;{L}sUNFyhM`DUTeR^Y$CG2SZWTT1EJR2lFzK;A) zdP;4-LY)O^pV#)8GZwf8y-hpK9Cv+AoHtzcEdx&@AP~ z-$kX?e~`FAGCTR@6V-U)npeMbXxE`LPrUO!@gIKKa-4n-PSc>pt{<`{c;m$Sf2 zR1fFCFLCNQ*upGl)UdR3<*ZL3_K8leeAxPsX@zncq#Si<`@Dc7S6PRrZjLq7>M8nt zH2Xcv|42@rhIav~#|U*(`P-PP9#C6#7UjsKkvzEp_{EC$u0Vr{b%XP7B8ND-ovxczxkDeFYo`u1NZ6;c(!i->e9iN9zH;xQuzpjJot5Zz4bO< zM0-X%A+(t8!u@(yTk4Fi7`nNVA=XaK`{|izP(DoiDDvVRTFr@~4e&-6- z^v_26mnMU}8Z7ufpf5htB6TY|mT*@4n$vdX+a2;bFiYtFafHBE|L8AspK_hab{lt= zH=!@ZphN~(7(ciE7xxV@&T6dxEv^AxXo3GAUu8CJ^}AOKa@%}L`TUIXiK_DJ zl++QG1b@~}R^EAM}sGdc`_j3S0H+cUy z@cSVFN4YT`JK1SGQ+uGRQ>!=aInMn0r|@JOA+>-k-5rpgPVGFTxBjr2qcb}{esk$D znIGMCpa$(xC9K@z)KcKLKAIWX6_Wr_fKMu`YT5RYpHN&uKHjU?xpFfn z)-x(T66EXIqoGh+%Dz0B^l~k3TKY_>L@ET z71z^Yj?OaFU}2p(Vpvo%SJ2bhTIo~fxa4UT=`Fp_ucvw)rj<$I*@1rs;xmc`Lmev- z&z3A6v%A=E-tDK_WIOEZHQa{r;j#POdUUv0F%H+0dEtp(LyRK5subl#l`Q@G9(`!cs8{d2zY= z@ZnPAYS}8tN zFC8oE-1&I3+UBhPeZJPb;lkLK|9F1vJ`VGG^E#5&`^Z-$PO=2b``z%ipLWh-;?$#S zLa-FqqoXIBIX!x=21=9u`6P35PKKCoqvd?_=N`55Q8h}N-V*D9Pu^$ThId`mOV)XF zwi_H>7{BmRp$-1pX^%;ecERJ`p8&-Sz}+Wd8W$KBz)!p79TX!1pY+2Y?oMx~x7rkI zJ<@$0H?yD31uY>3*|M`guzJz^eKxNF2Tzjfx}RAY3KBrek18In$9X%hR0)=g(@ zKP)sqZmF8t#Fk{Lvs{HGnM(2&oITaw3!2)r`GY=xz*%Peuxe$9$uf4)GgSYH8cm6- zdsJ6eLq1zjYsi;!NY6Ujat`TP-l<`%Ew?4-_mAZ46m|$vqS&0zuuP}h)bq^48TSY9 z?+?;*6d$|MVtr*g!b5jVJ^0)&zxdrheD~*zuKxCm3-tWJPwc&X{#_jY??&q4dDDL9 z?VWwhzy9jr-GBa6NE)50gL4c&XfIuD`PLBwCmekCf#Z(-&bK}|Mf&xddVcqLG^qIj z=l)kd!`Pv%t7{J}dAN7o@iR{Qdh^NG>g}N^al~@$A26e={l9m&{N;@f$DWI>dh+7? zov~%LdTppim{vIcfimzMEBjFSSI)MCIhj_V#`_Ok1!j0T;r@dY1^LW|Jmyd&jSBMl zC73n>RWY(xU=%9Z_9w>{F%x}V-v5F2%K04lQKDh-hm6Z#dLe-fJovu?zUd<9fA?)1w@_YOn{c+Bq2yB+NDL*#l$5GNY+cbEk}y@M zyRlfMD4(HAhDAL3fn~78UC|oaM9nyptpTRt#Y7|%i914<2kD>xF;d_}j!--!rIM*= zTEe3^(#}9kOs9iVIywNwD#ytnv#}ejl&hi)C9lO&t+G-WXCN3FNhVU6U{s7H2HF!N zT~a0kErXKivB{LQEt(ik$3_E#Qe>;tw-V@0&t+eCBqDTV%1P{tl|$ly6i%du#0);U zGTtYR%uj5SQmJU46dH-9G3RhJCgJE`tkZwGl2QZ+fsxRs^pM=ISY>rJ*laVAxaky^ zhnln&?|HMMC}4eJMx1(*&YS*JN*u+rR1oV-v^`D3e}<1v-yFp%s@-IHO#AUW{m*$HKXmOavkOyzi6ef}*loI44|3Sgb>vW&zZdv> zNB!_NDSGtHwS#^4J3G7&sVO-Ked?vDs%_U+_b-`U9Lgu^^;FPrM~U-g)4|o5;Bhkir4dw*)@#(aw};soh+L7$!|5QRPVAaX12x5wwO&6 ziy8BO)5g;Lb))S6#^*WAECqUEo?6;k;%uY%SJmtHrncM__Y;syUz+8v#ZoI+-G5O% z7B@c64jwy}6<>d&fJ*$cYJN-Vna8vqA?d1Gy#r%a+8X&M{W-H1hL?Q<7?A&>&-y86?wD?;cu275L(Gm`L9j&cizr*Ei zX=-hDd0M=_mdn004y56aQi_wXuQ0;A&EdADfGgney4%B!mezpF;cxc&9IZjGFVGzD zdfh=6Bh1+?r87OM>!Kt&Gx)Ta9*sxvD$PONwUw*3RAMNamN;m+WgMDNM0I~AB}Tbw z>E`jE44aR$Z+&qb=o-pgGHh)BE*=5V%K1;*gk1bVrqE?)>;?q+Jk}w&W-A2){1pEX zIatX&N(!elN<4o{i@%HbcE%+3($8O(hIz(u<$7flej7+h>2y4i ziS|dO6bDr|R*tHSnBE#m#4{s0MD#hxvsQLj15axNZM6-)-))_`zA{NR(ex9 zBW8HI)b8;TSrz&QrC5@y#gNpr8(5@XlwF?9N}=l<7)PYz5ez< zmcZ~xPxh)fas4AbVmvxT?lCfPJe};e%2StDaXhqrgVEL$Zgz)STpmZL$?J2pxZ6Aq zzsnPJczi9cw&p;SyUEq2qAjm=m0Ke!a4+?sA5lyk?N6kJILQ6ldqwV$7>#q% zdRrxjF*I`~D&Z<2Ee-X?Mv*!W+G>OvW=M!GpTMj5ud*8SZ!?UAnbZ`Rq4E<8%4*&SjJ`70Tg{2s;RWaYHNWTE;;{+R& z&)~#X8R-|~ui5*B`%v?VM$_O#TyyD@uTE&Q#oYqI39D1oFN0V0%~C=31q_x&b!B}1 zjp6aPP%{Y5d0LZ{T@G%Q)~~18}8j{D%8)xa5Y53p4knQIAihqQNk#95e%a_*kf4^3$QwXz7yZ;|q5f|}P|9!I{@$W_ zS!w@!g8zi5g|%m;mKvP^cC%lIkvB`C*D8a>yuAXI7(I83b?Ll#Z6g-kqtN?d}!Z7Hn~kq ze**misMC?G029#6Z%loi0>K+gm(6uyt@kU^dPhEXlrG1?0hlqy&qP6gQr%{!QMOJ{YZ>Q z;#}o3IN_XhjA_RlFZ0)Kyuex;gUmVfvuKfH=*R8X3V`^;Wi+^K$z|>Dx-Z*8`zh8g z<;Q4(SFoGh(O}+ z|A{vEUnmyS#Pns=_A+@IJ34#+%A*)_6xY$gDsy|)FWJ=W{oH*Rs|j~A1;@ohRxc*7 z)mT`CrgA|*4N+Kkj66|O`93}#WxQv{uG!a}`RXV{k{A2#ev&jV*MGEifLBc8 zFaP}iJiyzg@f+*^f`ZIJ%;N6>cv-m~@A0E|^6JBH|KQT4Gv5;a`+MJe{FdM`F}2`< zH1-OG`2=R`r$ zhhp^iwVY8pjl-Axtv23XPI|zuvr(06nRZBN*>VxFc4F`F(=SNQ2mX~u zaeQN8f)jaHn0_(T2^rF<*}Ld+`mh`NebUg`WJe9OqXybh1IrUNaPr#8?dW?Gr$pAG z@7b~2?TCSu^QX6W3{CWM**k{bZ##zGtMe2lA9u5N3)#Dc?A=24ZXtWOknJu0{`MBP zt%bbSLU!Iw?F;V31An^7zh~2i<-7LJn>lTb4tH5c3cBkfcg>9vhG$8I z@f045R%|(H%=x1~pHo}4q4Dz?hOMncY*uV{;}Ov$rLXxZ9mTN_=9HrfiWH{+{2Ui!@~Rnt;twds}LDN}x%WcBb%J_|s#mdn9Fm>q1bxve#Sulc^MHMh0q)m1uMYi?`JZLN7}T65eR5ejs5+U@_|mDV`^ zq;dT3uz=^tnvXw;){rZeYh(Q7KY#iH!F>Gj<_es@yG4-qTa+vD;#Vg;J88XzHAa5q zI=xsw@>w@LdF#i~)QdVON3`OJx~JB+=VJJ>?{YMwywXv|XiD-)$6HFSh2n7w@fA}sR`cq&Of*33JyJ>G5Vc~-)sEcH*b5&UFNj{{|>!HE+`7v&<} zF_+JAa#9$}@i@*Ij`z3^0i&?Z3!KR@NYK0m`q zN{8-fe Wo_uO04<2jYWh|n%-Tsrk{r?{w1?ye_ diff --git a/Implab.v11.suo b/Implab.v11.suo new file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0324d71b1911ff2672c0c8ae7c7d19bceabeae2e GIT binary patch literal 121344 zc%1D$4R{;Hl_T<>ID`bkPs)#hlm@UB$v?84ABZelNx(^rD`5q#tT_&C4_4`ma&NEH5LQcc=P1g>#1tZxc=!_j0)s=hx~AnsL%FBuwn`FY1D z@V-TP)(IG+WHVU}*!$tGoous+|1byQK1OYKX1@6&q0N_uZW1I3vW0XI5w0>>5B~+A zi9JsgNNXYBKbEB1ycztj158ih7fB3C0dXc&8U!ed5d3cg*h<{QMb6Bb))~+%we@KR z|3OW>lL0=0-&55*%vY}mbF>%j^HbEg3)L$thvVUT62OT7*e3pqU_B9X9Kala$p9w+ ztXJb>-7O#rf2mUG!Eh_wpHA~gXbIE+)B-pG76Z5d+yHd|^#Bb3jQ}2iCV*xDY-URU zmI5pTSWb|)5pp)b3IH!aD}WE64WJ#s53mwo6~JnMa{$f-=m1!&(!7q=T{f@lfoF38 z&Ih;v;37?&Ubx?8@Xc5T?9Er`a9*wXjrZ56ZQ|2#y;i+P_^f*WdAME=@Xr7{0KNcl z1Hg>{Uj*2x`Q|3LzZu}m0Ji{q1>jcA``h9E4uCrWb_3i6@HK!v0AC0A2Ee}n+^v4| zEx7&%zuRl|-KUc3WsMiz?({| zX+_{io?Gis?pgf{K!psY5eFKu3|M=JlR-*_2=@_#v|LdBC{N5J=Er|+{?`FMnMja+ zgu(y`&^*hu{6)Z@KP)UF|0pxdhb#B3wEu0Pxs1a#jon;0z6ks{&Ms2#wciHR-#Gqa z8KF$@PXz0jkS>53L@7HQqvvVj&W8J=0FG7PYXkCx!_~OQk?gUr)8H9~={cU;Ao21o*I1sDRjOoQ`sxZe)&N%h&4aJ|aLWz56)1TOzgZ22z$oF}A? z!drpcgsCLb1^)+N&Lb3*22uXO`PN~i^y4qeg7qfw<2B@IDA*Zf1#iiolOyvU)9C z|M>&9Kh&k5zQrtl%)yX)ugISgU<(5F09<7%Z6hUx>K;Qi;6Fyqk6HXEv&>ZQ&x5j6 zC`iCvQk@McbCMv?AP{G29%96g8r1f`*aH3o7+1YNpXN0T6bwNPV;Lw|-VDNhR{c9LFH9yhgZ~wx=(#X-(-h?lIuNr2*oJAlz_+GA&=hYw_^-jv=m0ryGuz85%k4F1?S&`U>#5Zp;{k0aM0yzM1*z=b`e8R}v+eS%|G zAB5E3>cvO;vxU*9V4ohM}ZVSpmWaDQpTb=Mq5=Kt2 zcL6~XLb4D(Ys-cWt-;V1F*4v61u0y!CYq4Dgpd%CrQ!7nA?z0=p%mrc06ZG&pqTRK z^S_Bb|3lpme)lQ3GX2kI0ImZ-{m-? z5Y+!5FUR(TJoO=BPXB{?qQ?RD0{jTzIe;etnEvN!0M!3H1AzLUp8{z0KhFcaQABxI z4Sxn4M~*R`LhY~4#Q1{};u2!M|6zG2)%(Ku3(FrEquBy}tUt!r3gh<_#IG$+Gx+zB zwESC`|J4=3Ph}+ZlQ4__UrAd2DBbugeX!&Dvwwfa?!z}fQhnq*2hcE*G5+>zey7L3 z1dgO_Q4OI*Ygag-1vz{QEys<>8kg#nzgp%{%R+NXk zqfsg0>=tA2FDb@4A?LvXp{BY=h$qBoWVzezbm3o3wJ#~j$+)mQA|&N_P^zi!O!i4) z=v)EP*dj!h_tA!?RN%bH2RwaM^MCd0{a^p%XW9kYL-gz-OIBH;h%c;JWHurFuK-;8 zQ}|{tUOe^OhpW!J`K|qbed5*o?>59&NHSV$1%D51%t_j{wo)^@oK^ml@#bwuU^pQQ zgCP1zD9k~jCY&pUh!7V;&Nh)g2*!su)?IXw8ZjWpq4CvJ4<Unzp3Bt4mY}j^}#8a`{j8=V&kUg z3?!r2L9u^g3MVhaxB)?yp->X|M&CnO|9i_fqu8!TLzk8MU#$wdsO(Z^0IR6VFH9L+ zTy2;AJyrYfdwtRAw@z!m{n^HWR}9-F3kP30^`80nUAyjq=eNx&s{bG2FcJ0tsQcpj zeW zS3CdL<6lFy8d2(ps4bIRZLvm`znPDb8T>f6?aqO}l@1L-XnirjD^d|5kwHmjXec88 zIKO865Bs=GF-NFyg5@vKHj!x=@lpGStO_mZ(paMApZQjq!H;wOThx0dYg~-t@ibMA zm=5A~EEpH!(^1+8h?fau5}Cd;pHQS;+b>llRF=R-_n1mZ7t>;#!4{@W#v415iNxg) z4ZKjz19WPIvd2F@+W41uJ8E}d@~LMw-{X|fTir+|PU`_Lv?BjxPw09(&42GqF)I|m z6LjhrN4s2dH`N{_BNG7kc5AVNzQ+4w3_qMN?eAs9H zFGmeJ$y(_9-K2YT^6Ogaye-14O4QJ>XUYjHKYPQZc}I@S`PS;SyLK7Ukf5Blu>4Wj z{(;FgSl~bCpQ!ggVLaAH3p5FoMN&54Lilk#N38|?+VMw^f2C>%k7ZDjJ1heHQ;1^I zEd~C7Dt(*7KKk)Dk3#3JM;u4G-`aQj&bM#5yH;pL4PSOEYAvSyX7<_l-ucs|&%Y77 z|CK$zx%#ACSNxA5o?P;OuQmRc*?U&gMlWl$0h!$Rp$mWZr8|Dm{n4j?e%6-v7oS#R zNZIJ*|DNLbZ)Q2JqTD>D;Q;gEKki$3$KgwUwDzO;-xv69H>8~6r*gMa?I`VSqjP{b zwFQMtcp;nu`JY2FHGPG`?=<<(lycf=21TfX2&JNM!1)K(%MoO3>`(c%uf^Ga&=;FY zaNShe90a+KdEcbHU3yZhLZT{XOb!|{`UhO!_cwL^vmSCOss2uBp@Ox?!ZYAgM#+Cy zQJZU+mUaLriO`lHsdm`0$KOUJ$t0cGMs)K~91aqcK`JfKh#FTB?H~Q3+vgR=&osaJ z{Lq|$A2opZ0+fEw24tnAYvE}l zaCI!|$<5;b z62K30=s&SOQ0|T89re-y`ycLk!q)eh!H@HCR{y#CUzCnNg=BCx|7PnY&EQ9$p0y_V zpW6ig!F8giTELHE2bG42g#b(e*U=H%IGh3aaes{27Vx9LD$BoiO&ZS0 zP?v%AIZcg&vK3q3P^IEQT|3VCaL$E(w^doV%Gl-s_AOi|87&Cp+A+Z zp>0ULM?KqR>iu^0ife$``d{wwj@f!*P|Z|JAQy#W5e{e$+&L z%OpM)&|eLC<_i|T8|#36eA@NDxK0_@|Kj=p#veBk(~Ec zo#6Uk+?(W2B-8KnFL3{+L7A2!aQ)xJ*8jDy@pt3c%Pl0qKxfK-MGmr( zwxNL)B!+H~642=|nKF4O@=!++bbDz#)vk$TDPlczSxOPxUaK$A`TpCT@A|LXch1d6 zUAv*<+$*QN3WDFt)_@P?F>B@>ZR$u>Ozqv5ynMnO|I5!i9GaR3%s!MP(=(v$Gi*{%=>**eCO3z;kt)z*z6?Rt#cXhNCDLS-~5tL|Ss@Wt) zWu?r3(k}9WK3ap?eXy`sp$J)kKBg;aFkD zG>vhOp*XTzY1*KlX#~V?=F{}enVvEJ$5Q!FdX!9KFf5AQxeqW5r!2hbZx`gCuZ^0z zVkzxJHkDne);gNCRK@T?XI8$vRBsxmb)TeTiMGCGrOGGM5Q^_`E>+mgfz^sxeuMFk z$ikW)>C&;V&1Sl3vu{^Z|3_tKGD&UmQbw#}b5NAq#Ef4ZW!Som9CKOum}}`(nsklJ z*~lF{yLTOzv&Y2gvthbwJxF^g`>w6MV>N!eG1Vs1`NLLKt;Xx%c5pJ&QK!5e*lxbCHFSqOP-7v(MjiQuKToQc=bx-}$8?Wv5>z6=l+%yDUa0Bca5Y z+xZY<%q9A9F55k(yyS&q@! za^6y`wG^}wjUQH{k*req7S6MbW9a3u%IJkwB+cSy(z4l)#F4zgUd+^{Ivo!!uKwKTDbQ?u3F%(0j}7?YM~sx z@D5i7`{60Z^uk{w#B8NswWzCFDv7&usgz5lqnAp@vNvm=W6+Mtyp?i^m`lWVl8Bk^ z_5G=DOuc&3o4%c&9C>Qe{hJ?su13+lo?(eoFspd>vrL`q_rrf3suZ28Rx;18b5+V= zbdvdKY`Ntli=z!P+q1&RD+FWDHjJHVTg*LI(c1|7%C!G0p4(<*oLWnLIz9U5Z{6c7 z&r4-mOPOnEoYc=OPG@9hYHnlM`qxV%+@tVQqq~Z5vTlTLuT|LV=WOC z(uq_ar$qZep}0n~?{ej$qtiJ|ByMwJR>@dNrIOl=e&o$=Gc3l$(^FNRv@yq^cD3=0 zN}zIQx%0D1BlkB(B~uw%nps(;pu9KAaiz_UkI0etbj-^2=D?L1g}phrzL@Ka?VvBN z+);?@TJ4{^(d$~d?~*;{#9UWg?lsDF#avfB(RIa#(s!=%xFpvTb3HNF6LUQ=*AsI+ zG1n6(`2PQ7V(g6XlFXHXb;$cCRc4%SI{A z|6pe?n&m%FkdmJNF)IF7g7ZJ<`6E{L-?r2FAC+?co0aoV`T1{J{0f)k=fCjtU-3Zl`jBhbMT5){rf9^u~Y5PYmZ5IF2MA?Jg4F0A9X`%iB&o5bS0sk`qNt5{NOv+zY z%KxV%z5l!&XXKS+&(72ywyOMrdqGW1FR1cqx5azb)D!F4EwkgO)!Doz5Zenee45d;viFbS=f73P`EORv zzvbt@@$=u<`ETR1%UQXOd))tU;`-ek>NVQqBX{ugzxerI{QNHr{QNI|{ue+0i=Y3+ z&;R1utaKhHhOArux#Z`6@$6n@%)omZ5We=p?c ze{rdfOLb!{)$u)a?7`i+RLZ5&^4`OQw^A+;iI@_k+TzOH;< zSM%q8ak~Y#TNEcllxwc}D5xC;`M$pV{6B6l;`Smt*^4Sy3bmiDCA$anSy1J+zwB8M zx3!ht*2aBT?J=L>w!(6+QEn?7Yg-}Lg5^KmrhIx%{+(uV7HS;B-qss#E9U2aayv1% z6LUK;*N}5NG1riAJF(p)q4}7%XqlNX+f^%B?v+h<QNwO&f+Bm?V;9Aj5d3& zE+KewuisMIxz6-etlFKH(kgr=t+LC&PfK9AuA|($nJhHaj4jkytLrJ>b^||ONXxB_ z<~K+w3uU#K>~`tQb2hDV22N`cEk<81lP#Ba;S+8N@Cx+98(mv7I%SuTQvueFy2oRS ziKWs+>;HwL>fiaS|Ae52ik0)!dkdUrOz3)mWtLLRJU5w;871lu4Im3(pE%V0;o}y$ zTXr4lrHM3m#`)f+QKfC(^MlY;K))+N&9FN+*#M=bl$i*0VQG2^?%;uIiWv4TZ#4j-Q2F$gwCO?*;_Jg zV*y%XGD%xYPA1kYz{XameD({J58H~enCUHgxc|3xONhPAxx-w%mJr3XkOXZNZ zFy*d2mil~)b!W~thLc_^w$ohYoy!{l=d4yN$!V$+nl>$?cWgUU}%( ze{uhBGydP)|9@iFKllIV{{P(npZouF|Nn{U|Ih6|-2TJuKivMq?LQOK{$s`Kk&sC= z*S?*f9C>Qe{hJ?!@`4=BB>Hk=Q*=Bk;@BNKp1;i3+xFeQj+!@a-TIz(`d(G@LFn<| z-c@^scF^*7s70nv57T*n>Kjw9-t=Y)cfPp#gEt&%C(@^3^sTnd&i9+vrYMToYHv=&CpXbYLYVg|fp zo{Cv4Zk*BP@Ii`83ltfgoN zrKD%H?Vxl%?N(}}lvqnqBc-Hgv^7!(>7g#H9e!hPuZ6ohTK`B3>d5dJ6YsQQTh%n> zoHm?;7Gm#Snsytc)JS=d3(~EpecVlTb}bfb)Ew33u$H1m3(+&h?Q}WO5s4I6eiw{V zMb%Z@v^C$f?SVUYU(tHQfhTU->$&o!cgRCCjYi~D$fY8C?v|>*f8+aK`C5J8@4h=b zJ#S4jN@a`n{F_yYKX`}h{l(7>xxV-HANXfHbM&6cM))O5?a8P|o>K}HqeAgpGhSNw z*K5A{QR{=(ta*4>)8o^P%RLj}+R`dp{`>CzeMh}FyE?k((HH9;yJWUuscC~1j^w4i zshyT^JLQS3^uC^IdEKhq;-&Ar>IA||N46F!?I_%`-lD9~2w(eYiYS$|K)N^@x>G#4 zneyajTJv7Df4ixq)I>SAn^NF4jTO6#mrA7~ZJs_Tn*miqolWwsBVJ0*f*3PtI#_M> zbS%fJk?C)xik7KUfC!D9zxrAAcG0UcOG+5%#}r!?i(O0GygOAdSQAY^C5}=X=#vc) zqY>!YO3z>mHOCbc>=bJ!_w*SjK0Q+Z=66T<~|f zr{3RE=X3dLJ?>V2Z9}uqRqL&9YN~B+^ECPDeIAdy&2`20)q^o9*yrpP5^}HR-Wf{7 z;?d39LrZ#xqVX-gB7LP^oHE7^CWa#+yh}jj?b|Q+dVEa{4c_KjS9^oEw!!c7)HXMJ zytOV*LtS&dtFgh;)NsXD7-?U5~M_LSWLu%a%fX58kH=N_T_OP4POdzi-g@W4ul2c@!+r=4~mv* z^ZGHPOoVrXVnVP$%ooRun89EqI3R?rll1wqt2xWGjA1K^M+f3UA|XaZS=cH>A`m2o)Op_G3VV;*vq_tc2X4JAbS!azh;&=}!QO||n>PCnrN)`~ znC3|@%Y;4ol5qwmX`Y~0CHfw<*;djr&clhCCqAP19< zLhs97`Yoz+6L1YtLFWtOP6KLvCBV>6DPWYa-}a$Md8((#sIyt2me+rI*Z-lQNTg;r ziN+UF)01?F2uYX*Nj24-$v#O8ohuA?N4E%(<$bRD{>J{Mez!Z^=nB>ct0txQ;v8X9 z;wo&>!WuCYk0zr1va?4_B!f~wPKL!O<)7z^k#KaVrn*OnC&VazR!s`oDHRW}IVA0& zrVs2n($iHiyYNs%#gpIKA^5U^fc50^As%GY38+A9`hFeO&c7VzC!d4se`~ zlK1=0JNxNrznptxfb3nj=diojb1knh8yvd(OnxN^M!DW1Mh5(%Acap^)|HH4oqPLb zA>Nu4rSMr>Hf(4OhEh>zqG;7Cvi(GsyU_v;e>K&YclcVdeh6-2a#R|8oCd?*GgEf4ToJ_y6Vozuf;Jnr{a`PiR*Ak--2a#R|60$(Cp1r%5!GGO-W}FBBX4TD#C!11Nx1(n_y6Vozc!h` z6sDW||8oD|vGxDup*#ck|KbSvO3L1i4cR%Kg#!p#FW$nrK4q5<)PA zl7`nOgfJqdRGbN93ftE8cLWo1yCe)kjODo_FB^;}ebGUf8-%-sZE}a0kh7WsG|Z_E zhqrLiv~QDzNJ8urB~c!RUqUD;L#41hV6{$H8tRmPK+_t1idbdSwg#o7uwet4uGNjq zz{;45|M2PTNwd_fOm$ESXse!qujb-E6yIjgoYf68>V&gfj3u0hvi)OAv|ztCJQn4` zu{$#8kvWguDu{D$p0X)?kF&$`-71py1=tUz`R7UL-#G}*L?5EoV#V*(sh%}KGB>D? z?Yd#m`QsNap1V`{%WGe~4izoN{s+vGZo&V;@CXLp|9SuC{eR;2|7RSPa$eViA55wK z@E4cvyzSnH7u>LD*SEDrhxY^8-{}M0S}aLI&>@E<;2p?QutS{*Y|tx77&*b-1q4Y5 z$wC&ZGh}6@D1UpL4<&uDtL0!1LQ?6;=O-IQ%~u*9_DE diff --git a/Implab/AsyncPool.cs b/Implab/AsyncPool.cs deleted file mode 100644 --- a/Implab/AsyncPool.cs +++ /dev/null @@ -1,28 +0,0 @@ -using System; -using System.Threading; - -namespace Implab { - /// - /// Класс для распаралеливания задач. - /// - /// - /// Используя данный класс и лямда выражения можно распараллелить - /// вычисления, для этого используется концепция обещаний. - /// - public static class AsyncPool { - - public static Promise Invoke(Func func) { - var p = new Promise(); - - ThreadPool.QueueUserWorkItem(param => { - try { - p.Resolve(func()); - } catch(Exception e) { - p.Reject(e); - } - }); - - return p; - } - } -} diff --git a/Implab/ICancellable.cs b/Implab/ICancellable.cs new file mode 100644 --- /dev/null +++ b/Implab/ICancellable.cs @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + public interface ICancellable { + bool Cancel(); + } +} diff --git a/Implab/IProgressHandler.cs b/Implab/IProgressHandler.cs new file mode 100644 --- /dev/null +++ b/Implab/IProgressHandler.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + public interface IProgressHandler { + string Message { + get; + set; + } + float CurrentProgress { + get; + set; + } + void InitProgress(float current, float max, string message); + } +} diff --git a/Implab/IProgressNotifier.cs b/Implab/IProgressNotifier.cs new file mode 100644 --- /dev/null +++ b/Implab/IProgressNotifier.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + public interface IProgressNotifier + { + event EventHandler> MessageUpdated; + event EventHandler> ProgressUpdated; + event EventHandler ProgressInit; + } +} diff --git a/Implab/IPromise.cs b/Implab/IPromise.cs new file mode 100644 --- /dev/null +++ b/Implab/IPromise.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + public interface IPromise: ICancellable + { + /// + /// Check whereather the promise has no more than one dependent promise. + /// + bool IsExclusive + { + get; + } + + /// + /// The current state of the promise. + /// + PromiseState State + { + get; + } + + /// + /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the + /// handler will be invoked immediatelly. + /// + /// The handler + void HandleCancelled(Action handler); + } +} diff --git a/Implab/ITaskController.cs b/Implab/ITaskController.cs new file mode 100644 --- /dev/null +++ b/Implab/ITaskController.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + public interface ITaskController: IProgressHandler { + bool Cancelled { + get; + } + } +} diff --git a/Implab/Implab.csproj b/Implab/Implab.csproj --- a/Implab/Implab.csproj +++ b/Implab/Implab.csproj @@ -32,13 +32,25 @@ + + + + + + + + + + + + + - + + - - - + \ No newline at end of file diff --git a/Implab/ManagedPromise.cs b/Implab/ManagedPromise.cs new file mode 100644 --- /dev/null +++ b/Implab/ManagedPromise.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + + /*public class ManagedPromise: Promise, ITaskController, IProgressNotifier { + + }*/ +} diff --git a/Implab/Parallels/ArrayTraits.cs b/Implab/Parallels/ArrayTraits.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/ArrayTraits.cs @@ -0,0 +1,171 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Parallels { + public static class ArrayTraits { + class ArrayIterator : DispatchPool { + readonly Action m_action; + readonly TSrc[] m_source; + readonly Promise m_promise = new Promise(); + + int m_pending; + int m_next; + + public ArrayIterator(TSrc[] source, Action action, int threads) + : base(threads) { + + Debug.Assert(source != null); + Debug.Assert(action != null); + + m_next = 0; + m_source = source; + m_pending = source.Length; + m_action = action; + + m_promise.Anyway(() => Dispose()); + m_promise.Cancelled(() => Dispose()); + + InitPool(); + } + + public Promise Promise { + get { + return m_promise; + } + } + + protected override bool TryDequeue(out int unit) { + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; + } + + protected override void InvokeUnit(int unit) { + try { + m_action(m_source[unit]); + var pending = Interlocked.Decrement(ref m_pending); + if (pending == 0) + m_promise.Resolve(m_source.Length); + } catch (Exception e) { + m_promise.Reject(e); + } + } + } + + class ArrayMapper: DispatchPool { + readonly Func m_transform; + readonly TSrc[] m_source; + readonly TDst[] m_dest; + readonly Promise m_promise = new Promise(); + + int m_pending; + int m_next; + + public ArrayMapper(TSrc[] source, Func transform, int threads) + : base(threads) { + + Debug.Assert (source != null); + Debug.Assert( transform != null); + + m_next = 0; + m_source = source; + m_dest = new TDst[source.Length]; + m_pending = source.Length; + m_transform = transform; + + m_promise.Anyway(() => Dispose()); + m_promise.Cancelled(() => Dispose()); + + InitPool(); + } + + public Promise Promise { + get { + return m_promise; + } + } + + protected override bool TryDequeue(out int unit) { + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; + } + + protected override void InvokeUnit(int unit) { + try { + m_dest[unit] = m_transform(m_source[unit]); + var pending = Interlocked.Decrement(ref m_pending); + if (pending == 0) + m_promise.Resolve(m_dest); + } catch (Exception e) { + m_promise.Reject(e); + } + } + } + + public static Promise ParallelMap (this TSrc[] source, Func transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + + var mapper = new ArrayMapper(source, transform, threads); + return mapper.Promise; + } + + public static Promise ParallelForEach(this TSrc[] source, Action action, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (action == null) + throw new ArgumentNullException("action"); + + var iter = new ArrayIterator(source, action, threads); + return iter.Promise; + } + + public static Promise ChainedMap(this TSrc[] source, ChainedOperation transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + if (threads <= 0) + throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); + + var promise = new Promise(); + var res = new TDst[source.Length]; + var pending = source.Length; + var semaphore = new Semaphore(threads, threads); + + AsyncPool.InvokeNewThread(() => { + for (int i = 0; i < source.Length; i++) { + if(promise.State != PromiseState.Unresolved) + break; // stop processing in case of error or cancellation + var idx = i; + semaphore.WaitOne(); + try { + var p1 = transform(source[i]); + p1.Anyway(() => semaphore.Release()); + p1.Cancelled(() => semaphore.Release()); + p1.Then( + x => { + res[idx] = x; + var left = Interlocked.Decrement(ref pending); + if (left == 0) + promise.Resolve(res); + }, + e => promise.Reject(e) + ); + + } catch (Exception e) { + promise.Reject(e); + } + } + return 0; + }); + + return promise.Anyway(() => semaphore.Dispose()); + } + } +} diff --git a/Implab/Parallels/AsyncPool.cs b/Implab/Parallels/AsyncPool.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/AsyncPool.cs @@ -0,0 +1,44 @@ +using System; +using System.Threading; + +namespace Implab.Parallels { + /// + /// Класс для распаралеливания задач. + /// + /// + /// Используя данный класс и лямда выражения можно распараллелить + /// вычисления, для этого используется концепция обещаний. + /// + public static class AsyncPool { + + public static Promise Invoke(Func func) { + var p = new Promise(); + + ThreadPool.QueueUserWorkItem(param => { + try { + p.Resolve(func()); + } catch(Exception e) { + p.Reject(e); + } + }); + + return p; + } + + public static Promise InvokeNewThread(Func func) { + var p = new Promise(); + + var worker = new Thread(() => { + try { + p.Resolve(func()); + } catch (Exception e) { + p.Reject(e); + } + }); + worker.IsBackground = true; + worker.Start(); + + return p; + } + } +} diff --git a/Implab/Parallels/DispatchPool.cs b/Implab/Parallels/DispatchPool.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/DispatchPool.cs @@ -0,0 +1,238 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Diagnostics; + +namespace Implab.Parallels { + public abstract class DispatchPool : IDisposable { + readonly int m_minThreads; + readonly int m_maxThreads; + int m_runningThreads = 0; + int m_maxRunningThreads = 0; + int m_suspended = 0; + int m_exitRequired = 0; + AutoResetEvent m_hasTasks = new AutoResetEvent(false); + + protected DispatchPool(int min, int max) { + if (min < 0) + throw new ArgumentOutOfRangeException("min"); + if (max <= 0) + throw new ArgumentOutOfRangeException("max"); + + if (min > max) + min = max; + m_minThreads = min; + m_maxThreads = max; + } + + protected DispatchPool(int threads) + : this(threads, threads) { + } + + protected DispatchPool() { + int maxThreads, maxCP; + ThreadPool.GetMaxThreads(out maxThreads, out maxCP); + + m_minThreads = 0; + m_maxThreads = maxThreads; + } + + protected void InitPool() { + for (int i = 0; i < m_minThreads; i++) + StartWorker(); + } + + public int ThreadCount { + get { + return m_runningThreads; + } + } + + public int MaxRunningThreads { + get { + return m_maxRunningThreads; + } + } + + protected bool IsDisposed { + get { + return m_exitRequired != 0; + } + } + + protected abstract bool TryDequeue(out TUnit unit); + + protected virtual bool ExtendPool() { + if (m_suspended > 0) { + m_hasTasks.Set(); + return true; + } else + return StartWorker(); + } + + /// + /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока + /// + protected void WakePool() { + m_hasTasks.Set(); // wake sleeping thread; + + if (AllocateThreadSlot(1)) { + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + } + } + + protected virtual void Suspend() { + m_hasTasks.WaitOne(); + } + + #region thread slots traits + + bool AllocateThreadSlot() { + int current; + // use spins to allocate slot for the new thread + do { + current = m_runningThreads; + if (current >= m_maxThreads || m_exitRequired != 0) + // no more slots left or the pool has been disposed + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); + + UpdateMaxThreads(current + 1); + + return true; + } + + bool AllocateThreadSlot(int desired) { + if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) + return false; + + UpdateMaxThreads(desired); + + return true; + } + + bool ReleaseThreadSlot(out bool last) { + last = false; + int current; + // use spins to release slot for the new thread + do { + current = m_runningThreads; + if (current <= m_minThreads && m_exitRequired == 0) + // the thread is reserved + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); + + last = (current == 1); + + return true; + } + + /// + /// releases thread slot unconditionally, used during cleanup + /// + /// true - no more threads left + bool ReleaseThreadSlotAnyway() { + var left = Interlocked.Decrement(ref m_runningThreads); + return left == 0; + } + + void UpdateMaxThreads(int count) { + int max; + do { + max = m_maxRunningThreads; + if (max >= count) + break; + } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); + } + + #endregion + + bool StartWorker() { + if (AllocateThreadSlot()) { + // slot successfully allocated + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + + return true; + } else { + return false; + } + } + + bool FetchTask(out TUnit unit) { + do { + // exit if requested + if (m_exitRequired != 0) { + // release the thread slot + if (ReleaseThreadSlotAnyway()) // it was the last worker + m_hasTasks.Dispose(); + else + m_hasTasks.Set(); // wake next worker + unit = default(TUnit); + return false; + } + + // fetch task + if (TryDequeue(out unit)) { + ExtendPool(); + return true; + } + + //no tasks left, exit if the thread is no longer needed + bool last; + if (ReleaseThreadSlot(out last)) { + if (last && m_hasTasks.WaitOne(0)) { + if (AllocateThreadSlot(1)) + continue; // spin again... + else + // we failed to reallocate slot for this thread + // therefore we need to release the event + m_hasTasks.Set(); + } + + return false; + } + + // entering suspend state + Interlocked.Increment(ref m_suspended); + // keep this thread and wait + Suspend(); + Interlocked.Decrement(ref m_suspended); + } while (true); + } + + protected abstract void InvokeUnit(TUnit unit); + + void Worker() { + TUnit unit; + while (FetchTask(out unit)) + InvokeUnit(unit); + } + + protected virtual void Dispose(bool disposing) { + if (disposing) { + if (m_exitRequired == 0) { + if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) + return; + + // wake sleeping threads + m_hasTasks.Set(); + GC.SuppressFinalize(this); + } + } + } + + public void Dispose() { + Dispose(true); + } + + ~DispatchPool() { + Dispose(false); + } + } +} diff --git a/Implab/Parallels/MTQueue.cs b/Implab/Parallels/MTQueue.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/MTQueue.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Parallels { + public class MTQueue { + class Node { + public Node(T value) { + this.value = value; + } + public readonly T value; + public Node next; + } + + Node m_first; + Node m_last; + + public void Enqueue(T value) { + var last = m_last; + var next = new Node(value); + + while (last != Interlocked.CompareExchange(ref m_last, next, last)) + last = m_last; + + if (last != null) + last.next = next; + else + m_first = next; + } + + public bool TryDequeue(out T value) { + Node first; + Node next = null; + value = default(T); + + do { + first = m_first; + if (first == null) + return false; + next = first.next; + if (next == null) { + // this is the last element, + // then try to update tail + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + // this is inconsistent situation which means that the queue is empty + if (m_last == null) + return false; + // tail has been changed, that means that we need to restart + continue; + } + + // tail succesfully updated and first.next will never be changed + // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null + // but the writer may update the m_first since the m_last is null + + // so we need to fix inconsistency by setting m_first to null, but if it already has been + // updated by a writer then we should just give up + Interlocked.CompareExchange(ref m_first, null, first); + break; + + } else { + if (first == Interlocked.CompareExchange(ref m_first, next, first)) + // head succesfully updated + break; + } + } while (true); + + value = first.value; + return true; + } + } +} diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs new file mode 100644 --- /dev/null +++ b/Implab/Parallels/WorkerPool.cs @@ -0,0 +1,89 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Diagnostics; + +namespace Implab.Parallels { + public class WorkerPool : DispatchPool { + + MTQueue m_queue = new MTQueue(); + int m_queueLength = 0; + readonly int m_threshold = 1; + + public WorkerPool(int minThreads, int maxThreads, int threshold) + : base(minThreads, maxThreads) { + m_threshold = threshold; + InitPool(); + } + + public WorkerPool(int minThreads, int maxThreads) : + base(minThreads, maxThreads) { + InitPool(); + } + + public WorkerPool(int threads) + : base(threads) { + InitPool(); + } + + public WorkerPool() + : base() { + InitPool(); + } + + public Promise Invoke(Func task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new Promise(); + + EnqueueTask(delegate() { + try { + promise.Resolve(task()); + } catch (Exception e) { + promise.Reject(e); + } + }); + + return promise; + } + + protected void EnqueueTask(Action unit) { + Debug.Assert(unit != null); + var len = Interlocked.Increment(ref m_queueLength); + m_queue.Enqueue(unit); + + if(!ExtendPool()) + WakePool(); + } + + protected override bool ExtendPool() { + if (m_queueLength <= m_threshold*ThreadCount) + // in this case we are in active thread and it request for additional workers + // satisfy it only when queue is longer than threshold + return false; + return base.ExtendPool(); + } + + protected override bool TryDequeue(out Action unit) { + if (m_queue.TryDequeue(out unit)) { + Interlocked.Decrement(ref m_queueLength); + return true; + } + return false; + } + + protected override void InvokeUnit(Action unit) { + unit(); + } + + protected override void Suspend() { + if (m_queueLength == 0) + base.Suspend(); + } + } +} diff --git a/Implab/ProgressInitEventArgs.cs b/Implab/ProgressInitEventArgs.cs new file mode 100644 --- /dev/null +++ b/Implab/ProgressInitEventArgs.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + [Serializable] + public class ProgressInitEventArgs: EventArgs + { + public float MaxProgress + { + get; + private set; + } + + public float CurrentProgress + { + get; + private set; + } + + public string Message + { + get; + private set; + } + + public ProgressInitEventArgs(float current, float max, string message) + { + this.MaxProgress = max; + this.CurrentProgress = current; + this.Message = message; + } + } +} diff --git a/Implab/Promise.cs b/Implab/Promise.cs --- a/Implab/Promise.cs +++ b/Implab/Promise.cs @@ -1,18 +1,16 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Reflection; -using System.Text; using System.Diagnostics; using System.Threading; namespace Implab { public delegate void ErrorHandler(Exception e); - - public delegate void ResultHandler(T result); - public delegate TNew ResultMapper(TSrc result); - public delegate Promise ChainedOperation(TSrc result); + public delegate T ErrorHandler(Exception e); + public delegate void ResultHandler(T result); + public delegate TNew ResultMapper(TSrc result); + public delegate Promise ChainedOperation(TSrc result); /// /// Класс для асинхронного получения результатов. Так называемое "обещание". @@ -48,23 +46,23 @@ namespace Implab { /// только инициатор обещания иначе могут возникнуть противоречия. /// /// - public class Promise { + public class Promise : IPromise { struct ResultHandlerInfo { public ResultHandler resultHandler; public ErrorHandler errorHandler; } - enum State { - Unresolved, - Resolving, - Resolved, - Cancelled - } + readonly IPromise m_parent; + + LinkedList m_resultHandlers = new LinkedList(); + LinkedList m_cancelHandlers = new LinkedList(); - LinkedList m_handlersChain = new LinkedList(); - State m_state; - bool m_cancellable; + readonly object m_lock = new Object(); + readonly bool m_cancellable; + int m_childrenCount = 0; + + PromiseState m_state; T m_result; Exception m_error; @@ -72,13 +70,17 @@ namespace Implab { m_cancellable = true; } - /// - /// Событие, возникающее при отмене асинхронной операции. - /// - /// - /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить. - /// - public event EventHandler Cancelled; + public Promise(IPromise parent, bool cancellable) { + m_cancellable = cancellable; + m_parent = parent; + if (parent != null) + parent.HandleCancelled(InternalCancel); + } + + void InternalCancel() { + // don't try to cancel parent :) + Cancel(false); + } /// /// Выполняет обещание, сообщая об успешном выполнении. @@ -86,38 +88,39 @@ namespace Implab { /// Результат выполнения. /// Данное обещание уже выполнено public void Resolve(T result) { - lock (this) { - if (m_state == State.Cancelled) + lock (m_lock) { + if (m_state == PromiseState.Cancelled) return; - if (m_state != State.Unresolved) + if (m_state != PromiseState.Unresolved) throw new InvalidOperationException("The promise is already resolved"); m_result = result; - m_state = State.Resolving; + m_state = PromiseState.Resolved; } - ResultHandlerInfo handler; - while (FetchNextHandler(out handler)) - InvokeHandler(handler); + OnStateChanged(); } /// /// Выполняет обещание, сообщая об ошибке /// + /// + /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков + /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные + /// будут проигнорированы. + /// /// Исключение возникшее при выполнении операции /// Данное обещание уже выполнено public void Reject(Exception error) { - lock (this) { - if (m_state == State.Cancelled) + lock (m_lock) { + if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected) return; - if (m_state != State.Unresolved) + if (m_state != PromiseState.Unresolved) throw new InvalidOperationException("The promise is already resolved"); m_error = error; - m_state = State.Resolving; + m_state = PromiseState.Rejected; } - ResultHandlerInfo handler; - while (FetchNextHandler(out handler)) - InvokeHandler(handler); + OnStateChanged(); } /// @@ -125,47 +128,31 @@ namespace Implab { /// /// true Операция была отменена, обработчики не будут вызваны.false отмена не возможна, поскольку обещание уже выполнено и обработчики отработали. public bool Cancel() { - lock (this) { - if (m_state == State.Unresolved && m_cancellable) { - m_state = State.Cancelled; - EventHandler temp = Cancelled; - - if (temp != null) - temp(this, new EventArgs()); - - return true; - } else - return false; - } + return Cancel(true); } /// - /// Добавляет обработчики событий выполнения обещания. + /// Adds new handlers to this promise. /// - /// Обработчик успешного выполнения обещания. - /// Данному обработчику будет передан результат выполнения операции. - /// Обработчик ошибки. Данный обработчик получит - /// исключение возникшее при выполнении операции. - /// Само обещание + /// The handler of the successfully completed operation. + /// This handler will recieve an operation result as a parameter. + /// Handles an exception that may occur during the operation. + /// The new promise chained to this one. public Promise Then(ResultHandler success, ErrorHandler error) { if (success == null && error == null) return this; - var medium = new Promise(); + var medium = new Promise(this, true); var handlerInfo = new ResultHandlerInfo(); if (success != null) handlerInfo.resultHandler = x => { - try { - success(x); - medium.Resolve(x); - } catch (Exception e) { - medium.Reject(e); - } + success(x); + medium.Resolve(x); }; else - handlerInfo.resultHandler = x => medium.Resolve(x); + handlerInfo.resultHandler = medium.Resolve; if (error != null) handlerInfo.errorHandler = x => { @@ -175,21 +162,106 @@ namespace Implab { medium.Reject(x); }; else - handlerInfo.errorHandler = x => medium.Reject(x); + handlerInfo.errorHandler = medium.Reject; + + AddHandler(handlerInfo); + + return medium; + } + + /// + /// Adds new handlers to this promise. + /// + /// The handler of the successfully completed operation. + /// This handler will recieve an operation result as a parameter. + /// Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation. + /// The new promise chained to this one. + public Promise Then(ResultHandler success, ErrorHandler error) { + if (success == null && error == null) + return this; + + var medium = new Promise(this, true); + + var handlerInfo = new ResultHandlerInfo(); + + if (success != null) + handlerInfo.resultHandler = x => { + success(x); + medium.Resolve(x); + }; + else + handlerInfo.resultHandler = medium.Resolve; + + if (error != null) + handlerInfo.errorHandler = x => { + try { + medium.Resolve(error(x)); + } catch { } + medium.Reject(x); + }; + else + handlerInfo.errorHandler = medium.Reject; AddHandler(handlerInfo); return medium; } + public Promise Then(ResultHandler success) { - return Then(success, null); + if (success == null) + return this; + + var medium = new Promise(this, true); + + var handlerInfo = new ResultHandlerInfo(); + + if (success != null) + handlerInfo.resultHandler = x => { + success(x); + medium.Resolve(x); + }; + else + handlerInfo.resultHandler = medium.Resolve; + + handlerInfo.errorHandler = medium.Reject; + + AddHandler(handlerInfo); + + return medium; } public Promise Error(ErrorHandler error) { return Then(null, error); } + /// + /// Handles error and allows to keep the promise. + /// + /// + /// If the specified handler throws an exception, this exception will be used to reject the promise. + /// + /// The error handler which returns the result of the promise. + /// New promise. + public Promise Error(ErrorHandler handler) { + if (handler == null) + return this; + + var medium = new Promise(this, true); + + AddHandler(new ResultHandlerInfo { + errorHandler = e => { + try { + medium.Resolve(handler(e)); + } catch (Exception e2) { + medium.Reject(e2); + } + } + }); + + return medium; + } + public Promise Anyway(Action handler) { if (handler == null) return this; @@ -198,6 +270,7 @@ namespace Implab { AddHandler(new ResultHandlerInfo { resultHandler = x => { + // to avoid handler being called multiple times we handle exception by ourselfs try { handler(); medium.Resolve(x); @@ -229,20 +302,15 @@ namespace Implab { throw new ArgumentNullException("mapper"); // создаем прицепленное обещание - Promise chained = new Promise(); + var chained = new Promise(); AddHandler(new ResultHandlerInfo() { - resultHandler = delegate(T result) { - try { - // если преобразование выдаст исключение, то сработает reject сцепленного deferred - chained.Resolve(mapper(result)); - } catch (Exception e) { - chained.Reject(e); - } - }, + resultHandler = result => chained.Resolve(mapper(result)), errorHandler = delegate(Exception e) { if (error != null) - error(e); + try { + error(e); + } catch { } // в случае ошибки нужно передать исключение дальше по цепочке chained.Reject(e); } @@ -271,19 +339,21 @@ namespace Implab { // создать посредника, к которому будут подвызяваться следующие обработчики. // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы // передать через него результаты работы. - Promise medium = new Promise(); + var medium = new Promise(this, true); - AddHandler(new ResultHandlerInfo() { + AddHandler(new ResultHandlerInfo { resultHandler = delegate(T result) { - try { - chained(result).Then( - x => medium.Resolve(x), - e => medium.Reject(e) - ); - } catch (Exception e) { - // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке - medium.Reject(e); - } + if (medium.State == PromiseState.Cancelled) + return; + + var promise = chained(result); + + // notify chained operation that it's not needed + medium.Cancelled(() => promise.Cancel()); + promise.Then( + x => medium.Resolve(x), + e => medium.Reject(e) + ); }, errorHandler = delegate(Exception e) { if (error != null) @@ -300,6 +370,22 @@ namespace Implab { return Chain(chained, null); } + public Promise Cancelled(Action handler) { + if (handler == null) + return this; + lock (m_lock) { + if (m_state == PromiseState.Unresolved) + m_cancelHandlers.AddLast(handler); + else if (m_state == PromiseState.Cancelled) + handler(); + } + return this; + } + + public void HandleCancelled(Action handler) { + Cancelled(handler); + } + /// /// Дожидается отложенного обещания и в случае успеха, возвращает /// его, результат, в противном случае бросает исключение. @@ -322,52 +408,37 @@ namespace Implab { /// Время ожидания /// Результат выполнения обещания public T Join(int timeout) { - ManualResetEvent evt = new ManualResetEvent(false); + var evt = new ManualResetEvent(false); Anyway(() => evt.Set()); + Cancelled(() => evt.Set()); if (!evt.WaitOne(timeout, true)) throw new TimeoutException(); - if (m_error != null) - throw new TargetInvocationException(m_error); - else - return m_result; + switch (State) { + case PromiseState.Resolved: + return m_result; + case PromiseState.Cancelled: + throw new OperationCanceledException(); + case PromiseState.Rejected: + throw new TargetInvocationException(m_error); + default: + throw new ApplicationException(String.Format("Invalid promise state {0}", State)); + } } public T Join() { return Join(Timeout.Infinite); } - /// - /// Данный метод последовательно извлекает обработчики обещания и когда - /// их больше не осталось - ставит состояние "разрешено". - /// - /// Информация об обработчике - /// Признак того, что еще остались обработчики в очереди - bool FetchNextHandler(out ResultHandlerInfo handler) { - handler = default(ResultHandlerInfo); - - lock (this) { - Debug.Assert(m_state == State.Resolving); - - if (m_handlersChain.Count > 0) { - handler = m_handlersChain.First.Value; - m_handlersChain.RemoveFirst(); - return true; - } else { - m_state = State.Resolved; - return false; - } - } - } - void AddHandler(ResultHandlerInfo handler) { bool invokeRequired = false; - lock (this) { - if (m_state != State.Resolved) - m_handlersChain.AddLast(handler); - else + lock (m_lock) { + m_childrenCount++; + if (m_state == PromiseState.Unresolved) { + m_resultHandlers.AddLast(handler); + } else invokeRequired = true; } @@ -377,21 +448,102 @@ namespace Implab { } void InvokeHandler(ResultHandlerInfo handler) { - if (m_error == null) { - try { - if (handler.resultHandler != null) - handler.resultHandler(m_result); - } catch { } - } - - if (m_error != null) { - try { - if (handler.errorHandler != null) - handler.errorHandler(m_error); - } catch { } + switch (m_state) { + case PromiseState.Resolved: + try { + if (handler.resultHandler != null) + handler.resultHandler(m_result); + } catch (Exception e) { + try { + if (handler.errorHandler != null) + handler.errorHandler(e); + } catch { } + } + break; + case PromiseState.Rejected: + try { + if (handler.errorHandler != null) + handler.errorHandler(m_error); + } catch { } + break; + default: + // do nothing + return; } } + protected virtual void OnStateChanged() { + switch (m_state) { + case PromiseState.Resolved: + foreach (var resultHandlerInfo in m_resultHandlers) + try { + if (resultHandlerInfo.resultHandler != null) + resultHandlerInfo.resultHandler(m_result); + } catch (Exception e) { + try { + if (resultHandlerInfo.errorHandler != null) + resultHandlerInfo.errorHandler(e); + } catch { } + } + break; + case PromiseState.Cancelled: + foreach (var cancelHandler in m_cancelHandlers) + cancelHandler(); + break; + case PromiseState.Rejected: + foreach (var resultHandlerInfo in m_resultHandlers) + try { + if (resultHandlerInfo.errorHandler != null) + resultHandlerInfo.errorHandler(m_error); + } catch { } + break; + default: + throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state)); + } + + m_resultHandlers = null; + m_cancelHandlers = null; + } + + + + public bool IsExclusive { + get { + lock (m_lock) { + return m_childrenCount <= 1; + } + } + } + + public PromiseState State { + get { + lock (m_lock) { + return m_state; + } + } + } + + protected bool Cancel(bool dependencies) { + bool result; + + lock (m_lock) { + if (m_state == PromiseState.Unresolved) { + m_state = PromiseState.Cancelled; + result = true; + } else { + result = false; + } + } + + if (result) + OnStateChanged(); + + if (dependencies && m_parent != null && m_parent.IsExclusive) { + m_parent.Cancel(); + } + + return result; + } } } diff --git a/Implab/PromiseState.cs b/Implab/PromiseState.cs new file mode 100644 --- /dev/null +++ b/Implab/PromiseState.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + public enum PromiseState + { + Unresolved, + Resolved, + Cancelled, + Rejected + } +} diff --git a/Implab/TaskController.cs b/Implab/TaskController.cs new file mode 100644 --- /dev/null +++ b/Implab/TaskController.cs @@ -0,0 +1,132 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab +{ + /// + /// This class allows to interact with asyncronuos task. + /// + /// + /// Members of this object are thread safe. + /// + class TaskController: IProgressNotifier, ITaskController, ICancellable + { + readonly object m_lock; + string m_message; + + float m_current; + float m_max; + + bool m_cancelled; + + public event EventHandler> MessageUpdated; + public event EventHandler> ProgressUpdated; + public event EventHandler ProgressInit; + + public TaskController() + { + m_lock = new Object(); + } + + public string Message + { + get + { + lock (m_lock) + return m_message; + } + set + { + lock (m_lock) + { + m_message = value; + OnMessageUpdated(); + } + } + } + + public float CurrentProgress + { + get + { + lock (m_lock) + return m_current; + } + set + { + lock (m_lock) + { + var prev = m_current; + m_current = value; + if (m_current >= m_max) + m_current = m_max; + if (m_current != prev) + OnProgressUpdated(); + } + } + } + + public void InitProgress(float current, float max, string message) + { + if (max < 0) + throw new ArgumentOutOfRangeException("max"); + if (current < 0 || current > max) + throw new ArgumentOutOfRangeException("current"); + + lock(m_lock) { + m_current = current; + m_max = max; + m_message = message; + OnProgressInit(); + } + } + + public bool Cancelled { + get { + lock (m_lock) + return m_cancelled; + } + } + + public bool Cancel() { + lock (m_lock) { + if (!m_cancelled) { + m_cancelled = true; + return true; + } else { + return false; + } + } + } + + protected virtual void OnMessageUpdated() + { + var temp = MessageUpdated; + if (temp != null) + { + temp(this, new ValueEventArgs(m_message)); + } + } + + protected virtual void OnProgressUpdated() + { + var temp = ProgressUpdated; + if (temp != null) + { + temp(this,new ValueEventArgs(m_current)); + } + } + + protected virtual void OnProgressInit() + { + var temp = ProgressInit; + if (temp != null) + { + temp(this, new ProgressInitEventArgs(m_current,m_max, m_message)); + } + } + } +} diff --git a/Implab/ValueEventArgs.cs b/Implab/ValueEventArgs.cs new file mode 100644 --- /dev/null +++ b/Implab/ValueEventArgs.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + [Serializable] + public class ValueEventArgs: EventArgs + { + public ValueEventArgs(T value) + { + this.Value = value; + } + public T Value + { + get; + private set; + } + } +}