# HG changeset patch # User cin # Date 2013-11-07 16:20:26 # Node ID 5a4b735ba66981d684eb477f0f5e697d2200bf82 # Parent 0f982f9b7d4d0bf83e1700783d6b0eb8fe01aec5 sync diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -242,6 +242,38 @@ namespace Implab.Test { } [TestMethod] + public void ChainedMapTest() { + + using (var pool = new WorkerPool(1,10)) { + int count = 10000; + + double[] args = new double[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = rand.NextDouble(); + + var t = Environment.TickCount; + var res = args + .ChainedMap( + x => pool.Invoke( + () => Math.Sin(x * x) + ), + 4 + ) + .Join(); + + Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); + + t = Environment.TickCount; + for (int i = 0; i < count; i++) + Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads); + } + } + + [TestMethod] public void ParallelForEachTest() { int count = 100000; diff --git a/Implab.suo b/Implab.suo index 34b35ad721068c074ea216725753a1e954db220f..453df4ad2d602dd29c42a1de303bdb9f56e12f58 GIT binary patch literal 75776 zc%1D$4R{;Xk?SN*oRC0B1BBliYEnooMY1i)PDqKYpAc{o6FWZzhh#0UZ4p@#lAPEf zKq>SGD5WKZ0xcv2C|qgF(Q7GfY00HPq2&U!&9yYW{vAE|dUq6#pK@?CH#56WTCG>w zeJjaWW}m;YcBS1n^Zw?|o0&K7*=LUa!<`SF@JB(DJ}OKV{(g9-FkSWC6o49bGgrQ! z3h?*ChYz!_F9A4gr>aTs0!)K8I}xB3UP*a>hMz~ul}0DPQiPrzRW+9Cw`W*|R&!+bgxdJXGK4F=EdyK`jW z!9xdXkY_CS$=`JX#srjbC1CG|s}NDmgMUgMzcKsXT;;PHS}6^++W|G5hVrk2{}Rx| zzE2zE(;!T#!7;#tA9-k97Jn4lv6FD76&fU{iwOJ=0&IgG?xNnWpB4dr9Je1w_9jg4+DA(sJpIw5?AMhG^ zzy{^|W6I}+$|v#yN^-i7qzx35b zZ|rwORzI}p>A+|2?Re2l|52w`0yBAE$xk&(0hnMh8++$P>(fE0cX*&PC8ec!M$AML z@lG+aRUD8S>(@)EG~6$9yPWjbSRWXUWrkDIvbZ#yNr|z>`p)6rSTu5;G}@imD#e%e zx|;hv{l0#;yU*hio5dM53^S%_!>gi^R3e?|&p6jd)5Bt{D>K{|O*n~!0V$Qe=uRYJ z8=~>P#P+l^C}za&Xfo~WjK_ke_@J&SeNC#a=DSWRh zlSoNe&LeX2R0-8rFJS*&0&~m;;GiksT0PKjhk&odfQ$9g+=J;T<#{zurn@I#!FiqUZf@INLan8lAWPGbT5 zIR7B;O#gz#^%VJf znwb|&B+TdI0_%tJtUdt$Md*k1RH_*DsCsCz5#Y>TknDOunuX%rePCPE(_H8T=z*sK@E_mJz-O9j3j6uz#+?QnFEPN-PZTgp*!{$zNd9&r zP!uKkCPvRoS?**#O)2wtJgHRvjq=kfsEHWy8>~f<`t&eKg|cjnQp;3&dMT}SE`@gT zz@G~~-2km16}7^Orv+Ne0ssBLRa|h@2KR2b$6xXJFkA-!+TqzSJmG=WKwn7 zY61MX<~@M}KR(a+Kh8H9!W^fOlpBGp^(7c98ECfz(IRPP#3g_q<@4h?@aNZm0`eb* zI+t@+m$Z|{d$j0G=u+k%p7uwX`^b7@{A?{2B>o}l=H&$ZIG@e7SpF{R;ltT7%Jtk2 zcLOy4irHWEQ=^6ckNT0lGvwEIdgOhYk}ezjd1K+JK^m(aGH8} zvJ?876I#AKD*?B|lYN4k`<`8PdUE`Smlk;@2ml9Zgf@$)dE-oaMcd9 zc!ehVE&!kSmJcZO!+$SO<0V>*dO{AX7jT5(eay*47_hAE@Eu}krEeXOJ5u7K8VJG@ zKKQ;Fz)MeKEt$pYFFV$9^n-HJQ))kOL2lTlE^5zKqNNr7+)9rK!J7`E)d$xOcqR<_ zV=rz7eAom0@I-*KMNqSgZ@H*&sR_b$NI1V9^{(aY z$FfXT?mQW{jF8;CJ-g11=B=}%npHPa66AU(NqjX)^B>C;-$_4Al2 zACy{HPC((Tw_fdhBmC)S&%Js6CpUMTcSS9GYQgo_AJ`G-2|m=@(fHzR+umf~&u;wR z$fM%EbMD%H{lJf(5T6y4l4|pLYx;}RuN?f(z>Sw5ethZ!7eAu+TmzH}HPBmuc9BYz zrnQ9&{@k>Na1%~eE+@3e2vE-E%8k#=qTjKENkxl_t%R})>KWD>Xhzysg}YCrmR$|) z-VM3SYauCH%Z-o+)6OE!Wx_V7KctN5QbLf2dHTDlO*7OUe&L>Z@(SYc-|;_cMmvZn zWoXQyB+yL~agwM(3@9It!ms`C7b9s&%1TIVHILdAR^oNE+Da(e{e;pV`jmG9L|2-m zjVP76kff=fQ2&j3U6g68$X`s3KsgxeC_(afO5r~l1MLUlC!KIT2Y%EQEl~s@tR2KE z(pt1nBi9&fz)mZ1{c{e~dLQAALz!dnKTK`Zr}Vi>VKR14~1+(>Un4|y~aMX|@ zM>G@9L>bCW@=>#}IonId*3J?O_|XP+^|d_sab8#jzl*@5@@vFv0Y6G?=P1|3`XBN{ zw?SUWf3Z9`*O|qSC184t+?;_Hz7UOnoLjnRwvcEp#ME@^sk;x(y+>sizj}@P_b9d#FddsmQD5n%&>WZKV@@odnA-KoYrUR~;X#Ih-xPb}} ztxoW_AW7{$da{jHCr$7~6W#Hx2+E1I94%5#daSny%BX&t;i4ctO7K=cd3d1K1ZS_p zIWIN9Z@dyf+D5Xg5ww1rUq#68I*A*l$7zg0sm5W*J>CnQO)_>DZenE_RYI5mOdGZQ*_Q^)xNK^4z`GEPeKptuLs>r0jIH zoDV-v!$Tx0ND7wU{_U^Ze--c!dEZ!d^gD9~_b$@IqF(plswWL~gR7EmVGBQg?c&%^ z%NL2v6*#pp(NU&v))B5<-&%2W;{L}sUNFyhM`DUTeR^Y$CG2SZWTT1EJR2lFzK;A) zdP;4-LY)O^pV#)8GZwf8y-hpK9Cv+AoHtzcEdx&@AP~ z-$kX?e~`FAGCTR@6V-U)npeMbXxE`LPrUO!@gIKKa-4n-PSc>pt{<`{c;m$Sf2 zR1fFCFLCNQ*upGl)UdR3<*ZL3_K8leeAxPsX@zncq#Si<`@Dc7S6PRrZjLq7>M8nt zH2Xcv|42@rhIav~#|U*(`P-PP9#C6#7UjsKkvzEp_{EC$u0Vr{b%XP7B8ND-ovxczxkDeFYo`u1NZ6;c(!i->e9iN9zH;xQuzpjJot5Zz4bO< zM0-X%A+(t8!u@(yTk4Fi7`nNVA=XaK`{|izP(DoiDDvVRTFr@~4e&-6- z^v_26mnMU}8Z7ufpf5htB6TY|mT*@4n$vdX+a2;bFiYtFafHBE|L8AspK_hab{lt= zH=!@ZphN~(7(ciE7xxV@&T6dxEv^AxXo3GAUu8CJ^}AOKa@%}L`TUIXiK_DJ zl++QG1b@~}R^EAM}sGdc`_j3S0H+cUy z@cSVFN4YT`JK1SGQ+uGRQ>!=aInMn0r|@JOA+>-k-5rpgPVGFTxBjr2qcb}{esk$D znIGMCpa$(xC9K@z)KcKLKAIWX6_Wr_fKMu`YT5RYpHN&uKHjU?xpFfn z)-x(T66EXIqoGh+%Dz0B^l~k3TKY_>L@ET z71z^Yj?OaFU}2p(Vpvo%SJ2bhTIo~fxa4UT=`Fp_ucvw)rj<$I*@1rs;xmc`Lmev- z&z3A6v%A=E-tDK_WIOEZHQa{r;j#POdUUv0F%H+0dEtp(LyRK5subl#l`Q@G9(`!cs8{d2zY= z@ZnPAYS}8tN zFC8oE-1&I3+UBhPeZJPb;lkLK|9F1vJ`VGG^E#5&`^Z-$PO=2b``z%ipLWh-;?$#S zLa-FqqoXIBIX!x=21=9u`6P35PKKCoqvd?_=N`55Q8h}N-V*D9Pu^$ThId`mOV)XF zwi_H>7{BmRp$-1pX^%;ecERJ`p8&-Sz}+Wd8W$KBz)!p79TX!1pY+2Y?oMx~x7rkI zJ<@$0H?yD31uY>3*|M`guzJz^eKxNF2Tzjfx}RAY3KBrek18In$9X%hR0)=g(@ zKP)sqZmF8t#Fk{Lvs{HGnM(2&oITaw3!2)r`GY=xz*%Peuxe$9$uf4)GgSYH8cm6- zdsJ6eLq1zjYsi;!NY6Ujat`TP-l<`%Ew?4-_mAZ46m|$vqS&0zuuP}h)bq^48TSY9 z?+?;*6d$|MVtr*g!b5jVJ^0)&zxdrheD~*zuKxCm3-tWJPwc&X{#_jY??&q4dDDL9 z?VWwhzy9jr-GBa6NE)50gL4c&XfIuD`PLBwCmekCf#Z(-&bK}|Mf&xddVcqLG^qIj z=l)kd!`Pv%t7{J}dAN7o@iR{Qdh^NG>g}N^al~@$A26e={l9m&{N;@f$DWI>dh+7? zov~%LdTppim{vIcfimzMEBjFSSI)MCIhj_V#`_Ok1!j0T;r@dY1^LW|Jmyd&jSBMl zC73n>RWY(xU=%9Z_9w>{F%x}V-v5F2%K04lQKDh-hm6Z#dLe-fJovu?zUd<9fA?)1w@_YOn{c+Bq2yB+NDL*#l$5GNY+cbEk}y@M zyRlfMD4(HAhDAL3fn~78UC|oaM9nyptpTRt#Y7|%i914<2kD>xF;d_}j!--!rIM*= zTEe3^(#}9kOs9iVIywNwD#ytnv#}ejl&hi)C9lO&t+G-WXCN3FNhVU6U{s7H2HF!N zT~a0kErXKivB{LQEt(ik$3_E#Qe>;tw-V@0&t+eCBqDTV%1P{tl|$ly6i%du#0);U zGTtYR%uj5SQmJU46dH-9G3RhJCgJE`tkZwGl2QZ+fsxRs^pM=ISY>rJ*laVAxaky^ zhnln&?|HMMC}4eJMx1(*&YS*JN*u+rR1oV-v^`D3e}<1v-yFp%s@-IHO#AUW{m*$HKXmOavkOyzi6ef}*loI44|3Sgb>vW&zZdv> zNB!_NDSGtHwS#^4J3G7&sVO-Ked?vDs%_U+_b-`U9Lgu^^;FPrM~U-g)4|o5;Bhkir4dw*)@#(aw};soh+L7$!|5QRPVAaX12x5wwO&6 ziy8BO)5g;Lb))S6#^*WAECqUEo?6;k;%uY%SJmtHrncM__Y;syUz+8v#ZoI+-G5O% z7B@c64jwy}6<>d&fJ*$cYJN-Vna8vqA?d1Gy#r%a+8X&M{W-H1hL?Q<7?A&>&-y86?wD?;cu275L(Gm`L9j&cizr*Ei zX=-hDd0M=_mdn004y56aQi_wXuQ0;A&EdADfGgney4%B!mezpF;cxc&9IZjGFVGzD zdfh=6Bh1+?r87OM>!Kt&Gx)Ta9*sxvD$PONwUw*3RAMNamN;m+WgMDNM0I~AB}Tbw z>E`jE44aR$Z+&qb=o-pgGHh)BE*=5V%K1;*gk1bVrqE?)>;?q+Jk}w&W-A2){1pEX zIatX&N(!elN<4o{i@%HbcE%+3($8O(hIz(u<$7flej7+h>2y4i ziS|dO6bDr|R*tHSnBE#m#4{s0MD#hxvsQLj15axNZM6-)-))_`zA{NR(ex9 zBW8HI)b8;TSrz&QrC5@y#gNpr8(5@XlwF?9N}=l<7)PYz5ez< zmcZ~xPxh)fas4AbVmvxT?lCfPJe};e%2StDaXhqrgVEL$Zgz)STpmZL$?J2pxZ6Aq zzsnPJczi9cw&p;SyUEq2qAjm=m0Ke!a4+?sA5lyk?N6kJILQ6ldqwV$7>#q% zdRrxjF*I`~D&Z<2Ee-X?Mv*!W+G>OvW=M!GpTMj5ud*8SZ!?UAnbZ`Rq4E<8%4*&SjJ`70Tg{2s;RWaYHNWTE;;{+R& z&)~#X8R-|~ui5*B`%v?VM$_O#TyyD@uTE&Q#oYqI39D1oFN0V0%~C=31q_x&b!B}1 zjp6aPP%{Y5d0LZ{T@G%Q)~~18}8j{D%8)xa5Y53p4knQIAihqQNk#95e%a_*kf4^3$QwXz7yZ;|q5f|}P|9!I{@$W_ zS!w@!g8zi5g|%m;mKvP^cC%lIkvB`C*D8a>yuAXI7(I83b?Ll#Z6g-kqtN?d}!Z7Hn~kq ze**misMC?G029#6Z%loi0>K+gm(6uyt@kU^dPhEXlrG1?0hlqy&qP6gQr%{!QMOJ{YZ>Q z;#}o3IN_XhjA_RlFZ0)Kyuex;gUmVfvuKfH=*R8X3V`^;Wi+^K$z|>Dx-Z*8`zh8g z<;Q4(SFoGh(O}+ z|A{vEUnmyS#Pns=_A+@IJ34#+%A*)_6xY$gDsy|)FWJ=W{oH*Rs|j~A1;@ohRxc*7 z)mT`CrgA|*4N+Kkj66|O`93}#WxQv{uG!a}`RXV{k{A2#ev&jV*MGEifLBc8 zFaP}iJiyzg@f+*^f`ZIJ%;N6>cv-m~@A0E|^6JBH|KQT4Gv5;a`+MJe{FdM`F}2`< zH1-OG`2=R`r$ zhhp^iwVY8pjl-Axtv23XPI|zuvr(06nRZBN*>VxFc4F`F(=SNQ2mX~u zaeQN8f)jaHn0_(T2^rF<*}Ld+`mh`NebUg`WJe9OqXybh1IrUNaPr#8?dW?Gr$pAG z@7b~2?TCSu^QX6W3{CWM**k{bZ##zGtMe2lA9u5N3)#Dc?A=24ZXtWOknJu0{`MBP zt%bbSLU!Iw?F;V31An^7zh~2i<-7LJn>lTb4tH5c3cBkfcg>9vhG$8I z@f045R%|(H%=x1~pHo}4q4Dz?hOMncY*uV{;}Ov$rLXxZ9mTN_=9HrfiWH{+{2Ui!@~Rnt;twds}LDN}x%WcBb%J_|s#mdn9Fm>q1bxve#Sulc^MHMh0q)m1uMYi?`JZLN7}T65eR5ejs5+U@_|mDV`^ zq;dT3uz=^tnvXw;){rZeYh(Q7KY#iH!F>Gj<_es@yG4-qTa+vD;#Vg;J88XzHAa5q zI=xsw@>w@LdF#i~)QdVON3`OJx~JB+=VJJ>?{YMwywXv|XiD-)$6HFSh2n7w@fA}sR`cq&Of*33JyJ>G5Vc~-)sEcH*b5&UFNj{{|>!HE+`7v&<} zF_+JAa#9$}@i@*Ij`z3^0i&?Z3!KR@NYK0m`q zN{8-fe Wo_uO04<2jYWh|n%-Tsrk{r?{w1?ye_ diff --git a/Implab/Parallels/ArrayTraits.cs b/Implab/Parallels/ArrayTraits.cs --- a/Implab/Parallels/ArrayTraits.cs +++ b/Implab/Parallels/ArrayTraits.cs @@ -39,26 +39,14 @@ namespace Implab.Parallels { } protected override bool TryDequeue(out int unit) { - int index; - unit = -1; - do { - index = m_next; - if (index >= m_source.Length) - return false; - } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); - - unit = index; - return true; + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; } protected override void InvokeUnit(int unit) { try { m_action(m_source[unit]); - int pending; - do { - pending = m_pending; - } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); - pending--; + var pending = Interlocked.Decrement(ref m_pending); if (pending == 0) m_promise.Resolve(m_source.Length); } catch (Exception e) { @@ -101,26 +89,14 @@ namespace Implab.Parallels { } protected override bool TryDequeue(out int unit) { - int index; - unit = -1; - do { - index = m_next; - if (index >= m_source.Length) - return false; - } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); - - unit = index; - return true; + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; } protected override void InvokeUnit(int unit) { try { m_dest[unit] = m_transform(m_source[unit]); - int pending; - do { - pending = m_pending; - } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); - pending --; + var pending = Interlocked.Decrement(ref m_pending); if (pending == 0) m_promise.Resolve(m_dest); } catch (Exception e) { @@ -148,5 +124,48 @@ namespace Implab.Parallels { var iter = new ArrayIterator(source, action, threads); return iter.Promise; } + + public static Promise ChainedMap(this TSrc[] source, ChainedOperation transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + if (threads <= 0) + throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); + + var promise = new Promise(); + var res = new TDst[source.Length]; + var pending = source.Length; + var semaphore = new Semaphore(threads, threads); + + AsyncPool.InvokeNewThread(() => { + for (int i = 0; i < source.Length; i++) { + if(promise.State != PromiseState.Unresolved) + break; // stop processing in case of error or cancellation + var idx = i; + semaphore.WaitOne(); + try { + var p1 = transform(source[i]); + p1.Anyway(() => semaphore.Release()); + p1.Cancelled(() => semaphore.Release()); + p1.Then( + x => { + res[idx] = x; + var left = Interlocked.Decrement(ref pending); + if (left == 0) + promise.Resolve(res); + }, + e => promise.Reject(e) + ); + + } catch (Exception e) { + promise.Reject(e); + } + } + return 0; + }); + + return promise.Anyway(() => semaphore.Dispose()); + } } } diff --git a/Implab/Parallels/AsyncPool.cs b/Implab/Parallels/AsyncPool.cs --- a/Implab/Parallels/AsyncPool.cs +++ b/Implab/Parallels/AsyncPool.cs @@ -36,7 +36,6 @@ namespace Implab.Parallels { } }); worker.IsBackground = true; - worker.Start(); return p; diff --git a/Implab/Parallels/DispatchPool.cs b/Implab/Parallels/DispatchPool.cs --- a/Implab/Parallels/DispatchPool.cs +++ b/Implab/Parallels/DispatchPool.cs @@ -63,9 +63,10 @@ namespace Implab.Parallels { } bool StartWorker() { - var current = m_runningThreads; + int current; // use spins to allocate slot for the new thread do { + current = m_runningThreads; if (current >= m_maxThreads || m_exitRequired != 0) // no more slots left or the pool has been disposed return false; @@ -84,24 +85,33 @@ namespace Implab.Parallels { protected abstract bool TryDequeue(out TUnit unit); - protected virtual void WakeNewWorker() { + protected virtual void WakeNewWorker(bool extend) { if (m_suspended > 0) m_hasTasks.Set(); else StartWorker(); } + /// + /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока + /// + protected void StartIfIdle() { + int threads; + do { + + } + } + + protected virtual void Suspend() { + m_hasTasks.WaitOne(); + } + bool FetchTask(out TUnit unit) { do { // exit if requested if (m_exitRequired != 0) { // release the thread slot - int running; - do { - running = m_runningThreads; - } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); - running--; - + var running = Interlocked.Decrement(ref m_runningThreads); if (running == 0) // it was the last worker m_hasTasks.Dispose(); else @@ -112,7 +122,7 @@ namespace Implab.Parallels { // fetch task if (TryDequeue(out unit)) { - WakeNewWorker(); + WakeNewWorker(true); return true; } @@ -122,19 +132,21 @@ namespace Implab.Parallels { do { runningThreads = m_runningThreads; if (runningThreads <= m_minThreads) { + // check wheather this is the last thread and we have tasks + exit = false; break; } } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); if (exit) { - Interlocked.Decrement(ref m_runningThreads); return false; } - // keep this thread and wait + // entering suspend state Interlocked.Increment(ref m_suspended); - m_hasTasks.WaitOne(); + // keep this thread and wait + Suspend(); Interlocked.Decrement(ref m_suspended); } while (true); } diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs +++ b/Implab/Parallels/WorkerPool.cs @@ -10,20 +10,27 @@ namespace Implab.Parallels { MTQueue m_queue = new MTQueue(); int m_queueLength = 0; + readonly int m_threshold = 1; - public WorkerPool(int minThreads, int maxThreads) + public WorkerPool(int minThreads, int maxThreads, int threshold) : base(minThreads, maxThreads) { - InitPool(); + m_threshold = threshold; + InitPool(); + } + + public WorkerPool(int minThreads, int maxThreads) : + base(minThreads, maxThreads) { + InitPool(); } public WorkerPool(int threads) : base(threads) { - InitPool(); + InitPool(); } public WorkerPool() : base() { - InitPool(); + InitPool(); } public Promise Invoke(Func task) { @@ -47,11 +54,20 @@ namespace Implab.Parallels { protected void EnqueueTask(Action unit) { Debug.Assert(unit != null); - Interlocked.Increment(ref m_queueLength); + var len = Interlocked.Increment(ref m_queueLength); m_queue.Enqueue(unit); - // if there are sleeping threads in the pool wake one - // probably this will lead a dry run - WakeNewWorker(); + + if (ThreadCount == 0) + // force to start + WakeNewWorker(false); + } + + protected override void WakeNewWorker(bool extend) { + if (extend && m_queueLength <= m_threshold) + // in this case we are in active thread and it request for additional workers + // satisfy it only when queue is longer than threshold + return; + base.WakeNewWorker(extend); } protected override bool TryDequeue(out Action unit) { @@ -65,5 +81,10 @@ namespace Implab.Parallels { protected override void InvokeUnit(Action unit) { unit(); } + + protected override void Suspend() { + if (m_queueLength == 0) + base.Suspend(); + } } } diff --git a/Implab/Promise.cs b/Implab/Promise.cs --- a/Implab/Promise.cs +++ b/Implab/Promise.cs @@ -103,11 +103,16 @@ namespace Implab { /// /// Выполняет обещание, сообщая об ошибке /// + /// + /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков + /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные + /// будут проигнорированы. + /// /// Исключение возникшее при выполнении операции /// Данное обещание уже выполнено public void Reject(Exception error) { lock (m_lock) { - if (m_state == PromiseState.Cancelled) + if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected) return; if (m_state != PromiseState.Unresolved) throw new InvalidOperationException("The promise is already resolved");