# HG changeset patch # User cin # Date 2013-11-12 15:52:10 # Node ID 6a56df4ec59e8a1fa1f783feb68fe3527d129070 # Parent 1c3b3d518480cc457fb114ffac1ff53565d383a4 DispatchPool works again, but performance is poor in some cases diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -151,7 +151,7 @@ namespace Implab.Test { var pool = new WorkerPool(5, 20); Assert.AreEqual(5, pool.PoolSize); pool.Dispose(); - Thread.Sleep(200); + Thread.Sleep(500); Assert.AreEqual(0, pool.PoolSize); pool.Dispose(); } @@ -244,7 +244,7 @@ namespace Implab.Test { [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(0,100,0)) { + using (var pool = new WorkerPool(0,100,1)) { int count = 10000; double[] args = new double[count]; diff --git a/Implab.suo b/Implab.suo index 453df4ad2d602dd29c42a1de303bdb9f56e12f58..aed3f9f75a5a4aef0daf4fc2d1945603eacc7f9a GIT binary patch literal 74752 zc%1D$33MFAl_lG1p*$ zcg-Qs*i(96eN1)zUsb<;{rXk)>$O*B{^8p{ocBkj31492*=Hk@*+kX5aR4>S!wl(p zJiuooBO}V|O94izp<=?P0283jjt8g(m;``zJ{h16U>d*_fTIAWO4ZJe2AB>o7hop9 zEP&4e%mz3HU=F~s0LKB$1F!)!1Dqi9N)ud92B-%(1>jVG(*PO(P6s#x;7owC0L})O z5Ab<_MgR+d6<`6ti2!y02S6);7r+VN0&oMg0IUaC2(Sp?9Dv2d+l-wHumoTwKpTJ$ z-~s?YKmcGVz%qd40OtXm56}g$0$?>jJ3t4(g#fDnIsw)KtO2+PU>$$}@I`al2rvY& z6<`~{Fu-Mz~MIHe*G{X@MF6@Q>MWTew5ov zMC3OFtP_+2wW@atsM|lv1Eu|A87l1` zYvubd%GZnJE6M_t0UPCKlo35_bos^7QD_ZYUz_d8VY zlyK*F=YP1*61?Dv`LB3yez5%=GyJ0t2fdSdKgkR=at0K|Vk&ay#cL8iv1c&U5f>BU zKvGDCqXQj6aElNU8|&AI@dP|Cw%e@q*I4fzj3fu+;^F~uFc}vjjrARaJ&|zmd~vud zx-f+}PB!t9x z>ZU6ijjRn1^hUQPtUe(rbcJIHYe#rsKr6)DBIfg`v_4fp@zpcr--Xb}tOW@g2j1-lzU>FO5&2iEEPLwe{ z&})R?4`t9=_#z3?XEQt*$v#H7`Q1dM-(WOD9`0X zk@U74C<>E)6QTHGN;>5q#ufP;PbgG=qyDr4N+Lq?21`*Oo*pEvP|}T2YZ*_m7tvVf zGN>mf{Mq1Y2WSDUs0Btm4yY{){P%!VvB6y{Jlo+Jf5rF#xc36ILEHeuaKdOHyZjl3 z`sxAvk(~6Ok%#cxhkFOq19!e}O&6mA6kINn3O)GU4+t9E65k4b^bev!v|A& zl*+jeoCNORPeO%iuv+&UV!6;n1XB2Fo0idMD?D2`AIN8Z{2Uk`w! zsfU;yz;9Nl`L>h>+y*gwnOgduTXcHT@`slesVDG)1Zja(5bQ*YgEa#N58XKcix=+N zfEE{PqIX`n;#Y2<&;$QnK#hxNG2(c1ky=Mhyp zsI@350fkdO{%gmFf$Lv8?~d7D-q?Qrm9cx{jxsy#VUH z3sRTHLSm|v8zBwFI*T|Lvmq!yq)f4;_#qAR^mkF6CaFF=?15R*2x8>W9PdYajeYNLvz98j>=aN9&4`u zm~x#>)6@@W|3NNT2Uy2@qdN7tzl=SboEPs*))(^r@HsN{>{Aep$C@Vl%I*3)E zv1qSGuQ9fOom$}dXBm`wFX0}5JV)Svfa<7M=DAYi$DSn!ToR&u50dWPE|*6xy;}+S zjQ_CLaL5Da%<}Gnr$OR=Wj-iQ?JBzt4&x7+m&iB&D@pg4+7UuX#d$t>6E{e!A&>lD z53p>ebgz|uX8B$TC**-2=U>l~@3r$^ouFexsqYC=$(U&W-1u=WSQ-BYNDHumMD@#h zL^H`u)S>L8A2qX$DJ~gZIt%l_k5abjN*?^!FRXyy1&~phCF05hKWc2t8Ijpy~P+26jKW?bVSeu=`{mpKRn}T(*k!*H2y$Z>_CN+ zMkn~2kF<6-#cZX~NfX3qq9?u)K{2rwqehBJPoXv9A!_`D!7q=Q+{71(tdYix_#2K< zP>1x;$g-8J4HCCBXVi%CNC0WumKu%V?6>(*g!HYIq)~!qT9i~;xKbJYJ(Iu0aYF6e z_64E0Zujiouy)DLeY0pn8PzlG_(40q_+#?3`@K)THuI5rZ1>C4M}C$~-f6$nj$gER z^ypCsbv)%=dX!~_I*XI68zy~5APXwK_X2MSvc}#E{R#Fa>&*(f9-6KhJ)~2-|kuc#-G3c&?_&6P{5C90$aj|AG_gxQWZoQ%g(p1jXXZEXXw$B zzqaW5Mbn?Q>tRukdvMf~fU?0+Nf+D1k6$}D_R#P}r0ELmS`^h$qHoGkww;eHof&=j z=j+zba^Dvnr%#_=t5yOIi2G=cCPMwJkL37jT7S}AX#Hhl7NmVv>-#sn+narg3MjV=p>jk53HRQ`b=U0uaI@>&+an*Hchmc!NqV>?bT8FN1SoS2nB<^*kC}_X zpE3^Wrjb#Q2G*Lg_kU=iWXe6{DWiPOjZrMz|P6N9D((+N!3Dl&5n(SJ2 zvTM!J++R$*JZ{a4=id8r>#q0zsej^!6Ld1IgmnA6xX|+Vku-><;tvXA?nUMiUA=GW1Gl z!H?EIKJKI12R>ZC{&RX1rv>4vLd&m{rkbS5KZ$~&m$&Z^JpI^X-#ai8*_7OMk6uNl z5Eg@f-DItH9Af_c?>W}~Ki$3Svqyh)`{@r)c;;MnwowZAWIQYKIG09>mO&5M6~e?- zUid1+q`9hTOq%;uw2=(`0OZBewoY$@iE)U2&#}m$Supknlh#u%5%P&VzG2p=*AxCqK?Bu>De|BxDCCzuEBK)en8z_QR`JJhjvPyt1TG zQ@oi7FUYU|UoEQ}${V{(!+$+^pl9|+)9Rypo_)RPrx#DtZ8hqUTVQ4{QERG~eq_-x z$>;X$uTku2*{uvp$Tc33?0AbHLvFHTDt8hekBp!gipknbjO@KZ66g44h~^@YCbYbS z*!l3^Xg7$$|8rSs+F!FVezH39@IUU?`Lap;4rBZ^sMK1S`TXO)Aw{wp+kbOgfX~Q7 z{-9h{=Csw{UdyDqxlX=*MZVq$*PG<~TUh2Eg0I8#Z2&s~z5#GMz#Ra00^9{~w;cbQ zaQzm*y#V(C>;l*g@NIzm0rmiVN0atLn&nQ3xfFntBYCY7Ch^gTRQfe_#E`^TY3e_JgU4PPsTNV0vk;9{RJa+3yv0M_aJ0;O@H)w*^ z;Kt>iTbRLvK5mu~ztNqZN@p+Ovf#RlT&s4cAHrs9oSLHV^IXve1r3AFWaJ z&|a!G`l?m7EhF7GaZAbkJq>t?%lfH)-IP}x*CXaZS-J*I%Uf#(PBlg=Y2pHXoh0o< z(k78dw3AAs`7m4mxT#`#ld2|Dnfc1D$&^yIVDG7Z&orf3l@I#z0ehLT!>ZLG4wtrz z>#Ou*)M#>C*{#~FHlLq`d@560$QN;F&pOs}4((a;sbXv`uWHU!%{jZ~T+UmXP}eNy{#U-GM!(r|LG5oAKGn1OxJjpezxl))_4d#dI0oX_KVWA1`~N=O^yd%T zExRwi=A}y>wni4$>eZoU2P0>p6nFiVne7>R+ne~;dKhj0;Mb}$S90n#X6=fBo(k=? z*1`WsR_(eF3$F4016vkZEd|_va6FUFY{(D}Il?fL&M(0rHI=lJ(~S3Z zY5xVThndZRANwZ7|B!M1bMHXG{Nvdb4i5YeGsy=OhBbh%12C-ue(0dy`F!}l6y9_a zbVA*&q?2j%!~8t_58)X$O8!Tkzc2xB{#w&HoJflOR&O*C5rcUALc+Qfk1q%Zt-dh5 z5aPp|nl8FXj@X%uhX+E9_5F!pG#&}}q>pKEm_DV!?G~Itr_0{j>=11&ts5FI(xGCR zkl38W^A?JWYXkPSljL;jywU!c5EtWhh^;fcU1Z}~-EFxFMZWqk9Tf2B2PJ_`_L7#+ z2CBx%idA(2-jpMmNZe7n@=pK!k8N2_yXs9i^ty4@Up=MAMofC_TL*eLPA~NjV zEC#oTy~}~##0=$ido)N#rktR>Sl%y$#6UFOFC;PM@_}A)+wAC&7>|d0MgO*N0#goz zBOoG854s*2(*M=nM2aQVv*I=V6{y~;wF;+P^!{eeCACKi)=!#kQBzBp!20C z9v6o3EEU8$8Amk}@SkDSi5tUMM70|efV6JokW{lkYF*vrNh;YmLwPvLoK|_L*RONw zug1$uV>@r{x4!WDhMBjCfBxe)uea)YoRNYwn{=qx&vW=s8ohrW%RzGcxtdhu%)P)r znf}vz#PG~JYB%>jY;AWvp>{KQ0C?)%@v3d7`R-pi(%1iU_-`?{jpS4J-yXebbN%4+ zAMIE&Vcd1^>+axABP?`YOY*g_G<0GA{#|o-ITtuDJa*`Nx}!e*+0{e6f8kKB4vVnL z9F1k3OJXJ84RM;6I&x;$LKTxE{gHG#)#0T_-1@c66_YQ$*&HwVC&vCPcG%UN(wAn- zS6{{E3*&B;tt;7bH(0T`Q)*4QL;p$1)VWRrDGeig?@-xuY2~)2^zf5#(iw5N<7($; zD^&{SKO=o4NsyC9l?qQPhYPQ0d(6?iqQp3IRxccWn!=Kk;b0a zee)yr3eQ>HY%76y9Dg}-HPN+g9O4@>PDD_vTqZ?hPQag|s~*|)j; zZkr2A%+=Cjw>X;pZI;$1x8D*7c%8OpPr&7A^QH(4Z0k62?3tPZd^wc5r+Gs@qP|+zjvs}?H9rWoV4Ck%4Q7JoD7RNN=S(PJ&|Fgj)S%u zR--Kc3oj|-QT(yYS58A^txi0bit&b&*<``q`)Z^UT72a@c-7CuS5nW7d(k~RE{*n; zF=y!KLLG496E1u5jn$}_Z&ak80rjzg4f54j-nK0LjQl0_eCA#}10D?mzH%IM88Z?t z6~*;dzOuEH>F1#(^?cgi^A%nyis{Pu{2Rml58U<#JbOd2>KNm98sR7VkDHmd|KQ$> zkZJtJ=WpP7MvFP{-vDrxY5a!!Z@A0%2-^* z8l{>LY2~L>FZ!V|kMmdVL6GxrT>q||Y2{P@53+Rqx5+DiMOqD-nPa29R~Yd8tz((A zqt`6{g#c6Jd!zif8l;ua`CDjF-DnbjORj`u{1`_$R@Myu7(m!0{w8DldkyXdR*u~@ zgZ~;Pok517+-F5IcRu2tb-mxUvz>?A>eLxo@|}*E&4WRu`TW9j#Ed-*R8Ng5ou4kg zwR_cFxa{ZE9fOtHa~EgFpfUHJsd|H_R!>9jE&FvO#w~G%eD#&xA{}Ge zu@2B)<14H6YQ}WWRm!}4m$n&a0=ppei&8=np7$y^W9o4=01648w5X-h&#&b2e`9=P z=P#DuHg3be-}3N7{k!6mI_|$+*9WKCQrcCD<7A)O{ldsO(@)%f@6&%9Kkq5=Rb48i zu`ZYYQJup!jrGsmBLLKY(POKhnb$vm%2>+kYmC1sS5hhc+s1+adB!vUoO=f7j6CL_ z{vKefN&L;m`R5*gp*O)Rm520Ut53f+<^Hd|Q2X7%w?A0z-*?9^bh%C&Ru%u?)V?)S z4?ObLS!drfspa06ouPlz^`@hQ>iM;WonKp?Ygx5iCUs*oQ_lyVN1vlOj`mg9vRD0_ zjZZz#*n1rO$6nVrCM#qQ_4&mw))YP~hHHdc;eUg!&9^}{6Nc8DX0j z0l`r=vUmE|wypb!OShrj4b`0yaNA;}MAM$tGE!EGy0?W_1 z|1n;Fz}x@)65s>*-Wb2}`mZI#Pv-moR{;*1#&6jFbN9b*0(@c`zp?$(2iIlHeEz!u z-jnagy8Y;l1)loknadVU{+RvcCqMc5UA|)?YQ+O-?9n`DhU)p{h1i|S)2>zZ7&TTN zqY9muex(Tgx06;mwv37V$?f@c)wI{!UU#~0emA-L^tH8Hp2#hUd}I%;Y)f*Hc1UT3 zat^h&ym$ENXC%D_{^0YtVh%IoE9YHd`q@wiT;g|1>M6X0F4f{JYH^k?WvNgu9s>{6 zd3uN6>}J(lVAWe-)mvcITVT~&VAWe-vAqSBd#O}+3l+9oNICDc_HFxu&|9~Ac5hg_ zWaqwFlP4^Y&O6O#&y4!Om0Z7Oo{v)Z1;?13fvUfgNm}TU?@p$X5!?~0>}1OA*IHW+ zh3#uvTwon_692`B-fp&%?4)suJ)Dvl)=@i#D{-e2--$y;CAEUAyaTE-Qt&J+F`n#o zkS*baX-pOn~q2BMF?k#LkK7P_zU2<=MvuG{Q#<90( zrv3XCn%Q^5J?>!)0EJGPL-WAB3;wskn@Zoqc%+}5RET}*D){zlCi%qqOtZ)m9m(Cu zB!3~;Gojz0TL8Waa4VDi_-MbWhROrnfD1- zokgk6qEu&5%5@gy@aSPG*P>aSji}B>6fzr;kE}PdR(GuI+nXw;rp(T!l-^Tf@#mx~ zM_%&H099(fsMcA9_8<)XOxHbR9dyf z--UW|>SCKo>I&pCHj`T1;od#IR%+}~kg?A(^aaVo zGuqiKy>_dlK9{rAs#DjFk<+}16X5I1p#J9PDep4tLtoxan^+i|7_r~}Q&!4VY1Xm9S{`Jhf|MBJ$oWHw?NqZ{H;CGlje~1S^daYCZk7sYU zTJFTvQe6eMO+7!ujGe&vWH~kDrNMh{o5jNjdEva8v(aepJB)` z;`WdphJL=3d|7C&3QKc4#EKEUX1mmJoduJy}9H?eUao})qYg9A64x~l~wjt`%(GWj|wmEZ0+bIPA$l)vV7#W chu|z&Psw)-Nay?U;K^@gyW-@!QtSW!0S}i8ZU6uP diff --git a/Implab/Parallels/DispatchPool.cs b/Implab/Parallels/DispatchPool.cs --- a/Implab/Parallels/DispatchPool.cs +++ b/Implab/Parallels/DispatchPool.cs @@ -9,12 +9,15 @@ namespace Implab.Parallels { public abstract class DispatchPool : IDisposable { readonly int m_minThreads; readonly int m_maxThreads; - int m_createdThreads = 0; - int m_activeThreads = 0; - int m_sleepingThreads = 0; - int m_maxRunningThreads = 0; - int m_exitRequired = 0; - int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit + + int m_createdThreads = 0; // the current size of the pool + int m_activeThreads = 0; // the count of threads which are active + int m_sleepingThreads = 0; // the count of currently inactive threads + int m_maxRunningThreads = 0; // the meximum reached size of the pool + int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released + int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit + int m_wakeEvents = 0; // the count of wake events + AutoResetEvent m_hasTasks = new AutoResetEvent(false); protected DispatchPool(int min, int max) { @@ -72,69 +75,89 @@ namespace Implab.Parallels { protected abstract bool TryDequeue(out TUnit unit); - protected virtual bool ExtendPool() { - if (m_sleepingThreads == 0) - // no sleeping workers are available - // try create one - return StartWorker(); - else { - // we can get here a race condition when several threads asks to extend pool - // and some sleaping threads are exited due timeout but they are still counted as sleeping - // in that case all of this threads could exit except one - WakePool(); + #region thread execution traits + int SignalThread() { + var signals = Interlocked.Increment(ref m_wakeEvents); + if(signals == 1) + m_hasTasks.Set(); + return signals; + } + + bool Sleep(int timeout) { + Interlocked.Increment(ref m_sleepingThreads); + if (m_hasTasks.WaitOne(timeout)) { + // this is autoreset event, only one thread can run this block simultaneously + var sleeping = Interlocked.Decrement(ref m_sleepingThreads); + if (Interlocked.Decrement(ref m_wakeEvents) > 0) + m_hasTasks.Set(); // wake next worker + return true; + } else { + Interlocked.Decrement(ref m_sleepingThreads); + return false; } - } + #endregion /// /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока /// - protected void WakePool() { - m_hasTasks.Set(); // wake sleeping thread; + protected void GrowPool() { + if (m_exitRequired != 0) + return; + if (m_sleepingThreads > m_wakeEvents) { + // all sleeping threads may gone + SignalThread(); // wake a sleeping thread; - if (AllocateThreadSlot(1)) { - // if there were no threads in the pool - var worker = new Thread(this.Worker); - worker.IsBackground = true; - worker.Start(); + // we can't check whether signal has been processed + // anyway it may take some time for the thread to start + // we will ensure that at least one thread is running + + if (AllocateThreadSlot(1)) { + // if there were no threads in the pool + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + } + } else { + // if there is no sleeping threads in the pool + StartWorker(); } } - bool Sleep(int timeout) { - Interlocked.Increment(ref m_sleepingThreads); - var result = m_hasTasks.WaitOne(timeout); - Interlocked.Decrement(ref m_sleepingThreads); - return result; - } - - protected virtual bool Suspend() { + private bool Suspend() { //no tasks left, exit if the thread is no longer needed bool last; bool requestExit; + + + // if threads have a timeout before releasing if (m_releaseTimeout > 0) requestExit = !Sleep(m_releaseTimeout); else requestExit = true; - + if (!requestExit) + return true; + + // release unsused thread if (requestExit && ReleaseThreadSlot(out last)) { // in case at the moment the last thread was being released // a new task was added to the queue, we need to try // to revoke the thread to avoid the situation when the task is left unprocessed - if (last && m_hasTasks.WaitOne(0)) { + if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false if (AllocateThreadSlot(1)) return true; // spin again... else - // we failed to reallocate the first slot for this thread - // therefore we need to release the event - m_hasTasks.Set(); + SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it + } return false; } + // wait till infinity Sleep(-1); return true; @@ -215,7 +238,12 @@ namespace Implab.Parallels { } } - bool FetchTask(out TUnit unit) { + protected abstract void InvokeUnit(TUnit unit); + + void Worker() { + TUnit unit; + Interlocked.Increment(ref m_activeThreads); + Sleep(0); // remove wake request if the new thread is started do { // exit if requested if (m_exitRequired != 0) { @@ -224,15 +252,15 @@ namespace Implab.Parallels { if (ReleaseThreadSlotAnyway()) // it was the last worker m_hasTasks.Dispose(); else - m_hasTasks.Set(); // wake next worker + SignalThread(); // wake next worker unit = default(TUnit); - return false; + break; } // fetch task if (TryDequeue(out unit)) { - ExtendPool(); - return true; + InvokeUnit(unit); + continue; } Interlocked.Decrement(ref m_activeThreads); @@ -240,19 +268,11 @@ namespace Implab.Parallels { // entering suspend state // keep this thread and wait if (!Suspend()) - return false; + break; Interlocked.Increment(ref m_activeThreads); } while (true); - } - - protected abstract void InvokeUnit(TUnit unit); - - void Worker() { - TUnit unit; - Interlocked.Increment(ref m_activeThreads); - while (FetchTask(out unit)) - InvokeUnit(unit); + } protected virtual void Dispose(bool disposing) { @@ -262,7 +282,10 @@ namespace Implab.Parallels { return; // wake sleeping threads - m_hasTasks.Set(); + if (m_createdThreads > 0) + SignalThread(); + else + m_hasTasks.Dispose(); GC.SuppressFinalize(this); } } diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs +++ b/Implab/Parallels/WorkerPool.cs @@ -57,15 +57,8 @@ namespace Implab.Parallels { var len = Interlocked.Increment(ref m_queueLength); m_queue.Enqueue(unit); - ExtendPool(); - } - - protected override bool ExtendPool() { - if (m_queueLength <= m_threshold*ActiveThreads) - // in this case we are in active thread and it request for additional workers - // satisfy it only when queue is longer than threshold - return false; - return base.ExtendPool(); + if (len > m_threshold*ActiveThreads) + GrowPool(); } protected override bool TryDequeue(out Action unit) { @@ -80,11 +73,5 @@ namespace Implab.Parallels { unit(); } - protected override bool Suspend() { - if (m_queueLength == 0) - return base.Suspend(); - else - return true; // spin again without locks... - } } }