# HG changeset patch # User cin # Date 2013-11-07 21:25:42 # Node ID 7cd4a843b4e4084855ab73d2a036bffaa0705f4f # Parent 5a4b735ba66981d684eb477f0f5e697d2200bf82 Improved worker pool diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -99,7 +99,7 @@ namespace Implab.Test { [TestMethod] public void WorkerPoolSizeTest() { - var pool = new WorkerPool(5, 10); + var pool = new WorkerPool(5, 10, 0); Assert.AreEqual(5, pool.ThreadCount); @@ -119,7 +119,7 @@ namespace Implab.Test { [TestMethod] public void WorkerPoolCorrectTest() { - var pool = new WorkerPool(); + var pool = new WorkerPool(0,1000,100); int iterations = 1000; int pending = iterations; @@ -244,7 +244,7 @@ namespace Implab.Test { [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(1,10)) { + using (var pool = new WorkerPool(8,100,0)) { int count = 10000; double[] args = new double[count]; diff --git a/Implab.v11.suo b/Implab.v11.suo index d05880eb554a9716b818e81f9e8e949a3388219a..0324d71b1911ff2672c0c8ae7c7d19bceabeae2e GIT binary patch literal 121344 zc%1D$4R{;Hl_T<>ID`bkPs)#hlm@UB$v?84ABZelNx(^rD`5q#tT_&C4_4`ma&NEH5LQcc=P1g>#1tZxc=!_j0)s=hx~AnsL%FBuwn`FY1D z@V-TP)(IG+WHVU}*!$tGoous+|1byQK1OYKX1@6&q0N_uZW1I3vW0XI5w0>>5B~+A zi9JsgNNXYBKbEB1ycztj158ih7fB3C0dXc&8U!ed5d3cg*h<{QMb6Bb))~+%we@KR z|3OW>lL0=0-&55*%vY}mbF>%j^HbEg3)L$thvVUT62OT7*e3pqU_B9X9Kala$p9w+ ztXJb>-7O#rf2mUG!Eh_wpHA~gXbIE+)B-pG76Z5d+yHd|^#Bb3jQ}2iCV*xDY-URU zmI5pTSWb|)5pp)b3IH!aD}WE64WJ#s53mwo6~JnMa{$f-=m1!&(!7q=T{f@lfoF38 z&Ih;v;37?&Ubx?8@Xc5T?9Er`a9*wXjrZ56ZQ|2#y;i+P_^f*WdAME=@Xr7{0KNcl z1Hg>{Uj*2x`Q|3LzZu}m0Ji{q1>jcA``h9E4uCrWb_3i6@HK!v0AC0A2Ee}n+^v4| zEx7&%zuRl|-KUc3WsMiz?({| zX+_{io?Gis?pgf{K!psY5eFKu3|M=JlR-*_2=@_#v|LdBC{N5J=Er|+{?`FMnMja+ zgu(y`&^*hu{6)Z@KP)UF|0pxdhb#B3wEu0Pxs1a#jon;0z6ks{&Ms2#wciHR-#Gqa z8KF$@PXz0jkS>53L@7HQqvvVj&W8J=0FG7PYXkCx!_~OQk?gUr)8H9~={cU;Ao21o*I1sDRjOoQ`sxZe)&N%h&4aJ|aLWz56)1TOzgZ22z$oF}A? z!drpcgsCLb1^)+N&Lb3*22uXO`PN~i^y4qeg7qfw<2B@IDA*Zf1#iiolOyvU)9C z|M>&9Kh&k5zQrtl%)yX)ugISgU<(5F09<7%Z6hUx>K;Qi;6Fyqk6HXEv&>ZQ&x5j6 zC`iCvQk@McbCMv?AP{G29%96g8r1f`*aH3o7+1YNpXN0T6bwNPV;Lw|-VDNhR{c9LFH9yhgZ~wx=(#X-(-h?lIuNr2*oJAlz_+GA&=hYw_^-jv=m0ryGuz85%k4F1?S&`U>#5Zp;{k0aM0yzM1*z=b`e8R}v+eS%|G zAB5E3>cvO;vxU*9V4ohM}ZVSpmWaDQpTb=Mq5=Kt2 zcL6~XLb4D(Ys-cWt-;V1F*4v61u0y!CYq4Dgpd%CrQ!7nA?z0=p%mrc06ZG&pqTRK z^S_Bb|3lpme)lQ3GX2kI0ImZ-{m-? z5Y+!5FUR(TJoO=BPXB{?qQ?RD0{jTzIe;etnEvN!0M!3H1AzLUp8{z0KhFcaQABxI z4Sxn4M~*R`LhY~4#Q1{};u2!M|6zG2)%(Ku3(FrEquBy}tUt!r3gh<_#IG$+Gx+zB zwESC`|J4=3Ph}+ZlQ4__UrAd2DBbugeX!&Dvwwfa?!z}fQhnq*2hcE*G5+>zey7L3 z1dgO_Q4OI*Ygag-1vz{QEys<>8kg#nzgp%{%R+NXk zqfsg0>=tA2FDb@4A?LvXp{BY=h$qBoWVzezbm3o3wJ#~j$+)mQA|&N_P^zi!O!i4) z=v)EP*dj!h_tA!?RN%bH2RwaM^MCd0{a^p%XW9kYL-gz-OIBH;h%c;JWHurFuK-;8 zQ}|{tUOe^OhpW!J`K|qbed5*o?>59&NHSV$1%D51%t_j{wo)^@oK^ml@#bwuU^pQQ zgCP1zD9k~jCY&pUh!7V;&Nh)g2*!su)?IXw8ZjWpq4CvJ4<Unzp3Bt4mY}j^}#8a`{j8=V&kUg z3?!r2L9u^g3MVhaxB)?yp->X|M&CnO|9i_fqu8!TLzk8MU#$wdsO(Z^0IR6VFH9L+ zTy2;AJyrYfdwtRAw@z!m{n^HWR}9-F3kP30^`80nUAyjq=eNx&s{bG2FcJ0tsQcpj zeW zS3CdL<6lFy8d2(ps4bIRZLvm`znPDb8T>f6?aqO}l@1L-XnirjD^d|5kwHmjXec88 zIKO865Bs=GF-NFyg5@vKHj!x=@lpGStO_mZ(paMApZQjq!H;wOThx0dYg~-t@ibMA zm=5A~EEpH!(^1+8h?fau5}Cd;pHQS;+b>llRF=R-_n1mZ7t>;#!4{@W#v415iNxg) z4ZKjz19WPIvd2F@+W41uJ8E}d@~LMw-{X|fTir+|PU`_Lv?BjxPw09(&42GqF)I|m z6LjhrN4s2dH`N{_BNG7kc5AVNzQ+4w3_qMN?eAs9H zFGmeJ$y(_9-K2YT^6Ogaye-14O4QJ>XUYjHKYPQZc}I@S`PS;SyLK7Ukf5Blu>4Wj z{(;FgSl~bCpQ!ggVLaAH3p5FoMN&54Lilk#N38|?+VMw^f2C>%k7ZDjJ1heHQ;1^I zEd~C7Dt(*7KKk)Dk3#3JM;u4G-`aQj&bM#5yH;pL4PSOEYAvSyX7<_l-ucs|&%Y77 z|CK$zx%#ACSNxA5o?P;OuQmRc*?U&gMlWl$0h!$Rp$mWZr8|Dm{n4j?e%6-v7oS#R zNZIJ*|DNLbZ)Q2JqTD>D;Q;gEKki$3$KgwUwDzO;-xv69H>8~6r*gMa?I`VSqjP{b zwFQMtcp;nu`JY2FHGPG`?=<<(lycf=21TfX2&JNM!1)K(%MoO3>`(c%uf^Ga&=;FY zaNShe90a+KdEcbHU3yZhLZT{XOb!|{`UhO!_cwL^vmSCOss2uBp@Ox?!ZYAgM#+Cy zQJZU+mUaLriO`lHsdm`0$KOUJ$t0cGMs)K~91aqcK`JfKh#FTB?H~Q3+vgR=&osaJ z{Lq|$A2opZ0+fEw24tnAYvE}l zaCI!|$<5;b z62K30=s&SOQ0|T89re-y`ycLk!q)eh!H@HCR{y#CUzCnNg=BCx|7PnY&EQ9$p0y_V zpW6ig!F8giTELHE2bG42g#b(e*U=H%IGh3aaes{27Vx9LD$BoiO&ZS0 zP?v%AIZcg&vK3q3P^IEQT|3VCaL$E(w^doV%Gl-s_AOi|87&Cp+A+Z zp>0ULM?KqR>iu^0ife$``d{wwj@f!*P|Z|JAQy#W5e{e$+&L z%OpM)&|eLC<_i|T8|#36eA@NDxK0_@|Kj=p#veBk(~Ec zo#6Uk+?(W2B-8KnFL3{+L7A2!aQ)xJ*8jDy@pt3c%Pl0qKxfK-MGmr( zwxNL)B!+H~642=|nKF4O@=!++bbDz#)vk$TDPlczSxOPxUaK$A`TpCT@A|LXch1d6 zUAv*<+$*QN3WDFt)_@P?F>B@>ZR$u>Ozqv5ynMnO|I5!i9GaR3%s!MP(=(v$Gi*{%=>**eCO3z;kt)z*z6?Rt#cXhNCDLS-~5tL|Ss@Wt) zWu?r3(k}9WK3ap?eXy`sp$J)kKBg;aFkD zG>vhOp*XTzY1*KlX#~V?=F{}enVvEJ$5Q!FdX!9KFf5AQxeqW5r!2hbZx`gCuZ^0z zVkzxJHkDne);gNCRK@T?XI8$vRBsxmb)TeTiMGCGrOGGM5Q^_`E>+mgfz^sxeuMFk z$ikW)>C&;V&1Sl3vu{^Z|3_tKGD&UmQbw#}b5NAq#Ef4ZW!Som9CKOum}}`(nsklJ z*~lF{yLTOzv&Y2gvthbwJxF^g`>w6MV>N!eG1Vs1`NLLKt;Xx%c5pJ&QK!5e*lxbCHFSqOP-7v(MjiQuKToQc=bx-}$8?Wv5>z6=l+%yDUa0Bca5Y z+xZY<%q9A9F55k(yyS&q@! za^6y`wG^}wjUQH{k*req7S6MbW9a3u%IJkwB+cSy(z4l)#F4zgUd+^{Ivo!!uKwKTDbQ?u3F%(0j}7?YM~sx z@D5i7`{60Z^uk{w#B8NswWzCFDv7&usgz5lqnAp@vNvm=W6+Mtyp?i^m`lWVl8Bk^ z_5G=DOuc&3o4%c&9C>Qe{hJ?su13+lo?(eoFspd>vrL`q_rrf3suZ28Rx;18b5+V= zbdvdKY`Ntli=z!P+q1&RD+FWDHjJHVTg*LI(c1|7%C!G0p4(<*oLWnLIz9U5Z{6c7 z&r4-mOPOnEoYc=OPG@9hYHnlM`qxV%+@tVQqq~Z5vTlTLuT|LV=WOC z(uq_ar$qZep}0n~?{ej$qtiJ|ByMwJR>@dNrIOl=e&o$=Gc3l$(^FNRv@yq^cD3=0 zN}zIQx%0D1BlkB(B~uw%nps(;pu9KAaiz_UkI0etbj-^2=D?L1g}phrzL@Ka?VvBN z+);?@TJ4{^(d$~d?~*;{#9UWg?lsDF#avfB(RIa#(s!=%xFpvTb3HNF6LUQ=*AsI+ zG1n6(`2PQ7V(g6XlFXHXb;$cCRc4%SI{A z|6pe?n&m%FkdmJNF)IF7g7ZJ<`6E{L-?r2FAC+?co0aoV`T1{J{0f)k=fCjtU-3Zl`jBhbMT5){rf9^u~Y5PYmZ5IF2MA?Jg4F0A9X`%iB&o5bS0sk`qNt5{NOv+zY z%KxV%z5l!&XXKS+&(72ywyOMrdqGW1FR1cqx5azb)D!F4EwkgO)!Doz5Zenee45d;viFbS=f73P`EORv zzvbt@@$=u<`ETR1%UQXOd))tU;`-ek>NVQqBX{ugzxerI{QNHr{QNI|{ue+0i=Y3+ z&;R1utaKhHhOArux#Z`6@$6n@%)omZ5We=p?c ze{rdfOLb!{)$u)a?7`i+RLZ5&^4`OQw^A+;iI@_k+TzOH;< zSM%q8ak~Y#TNEcllxwc}D5xC;`M$pV{6B6l;`Smt*^4Sy3bmiDCA$anSy1J+zwB8M zx3!ht*2aBT?J=L>w!(6+QEn?7Yg-}Lg5^KmrhIx%{+(uV7HS;B-qss#E9U2aayv1% z6LUK;*N}5NG1riAJF(p)q4}7%XqlNX+f^%B?v+h<QNwO&f+Bm?V;9Aj5d3& zE+KewuisMIxz6-etlFKH(kgr=t+LC&PfK9AuA|($nJhHaj4jkytLrJ>b^||ONXxB_ z<~K+w3uU#K>~`tQb2hDV22N`cEk<81lP#Ba;S+8N@Cx+98(mv7I%SuTQvueFy2oRS ziKWs+>;HwL>fiaS|Ae52ik0)!dkdUrOz3)mWtLLRJU5w;871lu4Im3(pE%V0;o}y$ zTXr4lrHM3m#`)f+QKfC(^MlY;K))+N&9FN+*#M=bl$i*0VQG2^?%;uIiWv4TZ#4j-Q2F$gwCO?*;_Jg zV*y%XGD%xYPA1kYz{XameD({J58H~enCUHgxc|3xONhPAxx-w%mJr3XkOXZNZ zFy*d2mil~)b!W~thLc_^w$ohYoy!{l=d4yN$!V$+nl>$?cWgUU}%( ze{uhBGydP)|9@iFKllIV{{P(npZouF|Nn{U|Ih6|-2TJuKivMq?LQOK{$s`Kk&sC= z*S?*f9C>Qe{hJ?!@`4=BB>Hk=Q*=Bk;@BNKp1;i3+xFeQj+!@a-TIz(`d(G@LFn<| z-c@^scF^*7s70nv57T*n>Kjw9-t=Y)cfPp#gEt&%C(@^3^sTnd&i9+vrYMToYHv=&CpXbYLYVg|fp zo{Cv4Zk*BP@Ii`83ltfgoN zrKD%H?Vxl%?N(}}lvqnqBc-Hgv^7!(>7g#H9e!hPuZ6ohTK`B3>d5dJ6YsQQTh%n> zoHm?;7Gm#Snsytc)JS=d3(~EpecVlTb}bfb)Ew33u$H1m3(+&h?Q}WO5s4I6eiw{V zMb%Z@v^C$f?SVUYU(tHQfhTU->$&o!cgRCCjYi~D$fY8C?v|>*f8+aK`C5J8@4h=b zJ#S4jN@a`n{F_yYKX`}h{l(7>xxV-HANXfHbM&6cM))O5?a8P|o>K}HqeAgpGhSNw z*K5A{QR{=(ta*4>)8o^P%RLj}+R`dp{`>CzeMh}FyE?k((HH9;yJWUuscC~1j^w4i zshyT^JLQS3^uC^IdEKhq;-&Ar>IA||N46F!?I_%`-lD9~2w(eYiYS$|K)N^@x>G#4 zneyajTJv7Df4ixq)I>SAn^NF4jTO6#mrA7~ZJs_Tn*miqolWwsBVJ0*f*3PtI#_M> zbS%fJk?C)xik7KUfC!D9zxrAAcG0UcOG+5%#}r!?i(O0GygOAdSQAY^C5}=X=#vc) zqY>!YO3z>mHOCbc>=bJ!_w*SjK0Q+Z=66T<~|f zr{3RE=X3dLJ?>V2Z9}uqRqL&9YN~B+^ECPDeIAdy&2`20)q^o9*yrpP5^}HR-Wf{7 z;?d39LrZ#xqVX-gB7LP^oHE7^CWa#+yh}jj?b|Q+dVEa{4c_KjS9^oEw!!c7)HXMJ zytOV*LtS&dtFgh;)NsXD7-?U5~M_LSWLu%a%fX58kH=N_T_OP4POdzi-g@W4ul2c@!+r=4~mv* z^ZGHPOoVrXVnVP$%ooRun89EqI3R?rll1wqt2xWGjA1K^M+f3UA|XaZS=cH>A`m2o)Op_G3VV;*vq_tc2X4JAbS!azh;&=}!QO||n>PCnrN)`~ znC3|@%Y;4ol5qwmX`Y~0CHfw<*;djr&clhCCqAP19< zLhs97`Yoz+6L1YtLFWtOP6KLvCBV>6DPWYa-}a$Md8((#sIyt2me+rI*Z-lQNTg;r ziN+UF)01?F2uYX*Nj24-$v#O8ohuA?N4E%(<$bRD{>J{Mez!Z^=nB>ct0txQ;v8X9 z;wo&>!WuCYk0zr1va?4_B!f~wPKL!O<)7z^k#KaVrn*OnC&VazR!s`oDHRW}IVA0& zrVs2n($iHiyYNs%#gpIKA^5U^fc50^As%GY38+A9`hFeO&c7VzC!d4se`~ zlK1=0JNxNrznptxfb3nj=diojb1knh8yvd(OnxN^M!DW1Mh5(%Acap^)|HH4oqPLb zA>Nu4rSMr>Hf(4OhEh>zqG;7Cvi(GsyU_v;e>K&YclcVdeh6-2a#R|8oCd?*GgEf4ToJ_y6Vozuf;Jnr{a`PiR*Ak--2a#R|60$(Cp1r%5!GGO-W}FBBX4TD#C!11Nx1(n_y6Vozc!h` z6sDW||8oD|vGxDup*#ck|KbSvO3L1i4cR%Kg#!p#FW$nrK4q5<)PA zl7`nOgfJqdRGbN93ftE8cLWo1yCe)kjODo_FB^;}ebGUf8-%-sZE}a0kh7WsG|Z_E zhqrLiv~QDzNJ8urB~c!RUqUD;L#41hV6{$H8tRmPK+_t1idbdSwg#o7uwet4uGNjq zz{;45|M2PTNwd_fOm$ESXse!qujb-E6yIjgoYf68>V&gfj3u0hvi)OAv|ztCJQn4` zu{$#8kvWguDu{D$p0X)?kF&$`-71py1=tUz`R7UL-#G}*L?5EoV#V*(sh%}KGB>D? z?Yd#m`QsNap1V`{%WGe~4izoN{s+vGZo&V;@CXLp|9SuC{eR;2|7RSPa$eViA55wK z@E4cvyzSnH7u>LD*SEDrhxY^8-{}M0S}aLI&>@E<;2p?QutS{*Y|tx77&*b-1q4Y5 z$wC&ZGh}6@D1UpL4<&uDtL0!1LQ?6;=O-IQ%~u*9_DE diff --git a/Implab/Parallels/DispatchPool.cs b/Implab/Parallels/DispatchPool.cs --- a/Implab/Parallels/DispatchPool.cs +++ b/Implab/Parallels/DispatchPool.cs @@ -62,7 +62,36 @@ namespace Implab.Parallels { } } - bool StartWorker() { + protected abstract bool TryDequeue(out TUnit unit); + + protected virtual bool ExtendPool() { + if (m_suspended > 0) { + m_hasTasks.Set(); + return true; + } else + return StartWorker(); + } + + /// + /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока + /// + protected void WakePool() { + m_hasTasks.Set(); // wake sleeping thread; + + if (AllocateThreadSlot(1)) { + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + } + } + + protected virtual void Suspend() { + m_hasTasks.WaitOne(); + } + + #region thread slots traits + + bool AllocateThreadSlot() { int current; // use spins to allocate slot for the new thread do { @@ -72,38 +101,67 @@ namespace Implab.Parallels { return false; } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); - m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); + UpdateMaxThreads(current + 1); + + return true; + } - // slot successfully allocated + bool AllocateThreadSlot(int desired) { + if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) + return false; + + UpdateMaxThreads(desired); - var worker = new Thread(this.Worker); - worker.IsBackground = true; - worker.Start(); + return true; + } + + bool ReleaseThreadSlot(out bool last) { + last = false; + int current; + // use spins to release slot for the new thread + do { + current = m_runningThreads; + if (current <= m_minThreads && m_exitRequired == 0) + // the thread is reserved + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); + + last = (current == 1); return true; } - protected abstract bool TryDequeue(out TUnit unit); - - protected virtual void WakeNewWorker(bool extend) { - if (m_suspended > 0) - m_hasTasks.Set(); - else - StartWorker(); + /// + /// releases thread slot unconditionally, used during cleanup + /// + /// true - no more threads left + bool ReleaseThreadSlotAnyway() { + var left = Interlocked.Decrement(ref m_runningThreads); + return left == 0; } - /// - /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока - /// - protected void StartIfIdle() { - int threads; + void UpdateMaxThreads(int count) { + int max; do { - - } + max = m_maxRunningThreads; + if (max >= count) + break; + } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); } - protected virtual void Suspend() { - m_hasTasks.WaitOne(); + #endregion + + bool StartWorker() { + if (AllocateThreadSlot()) { + // slot successfully allocated + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + + return true; + } else { + return false; + } } bool FetchTask(out TUnit unit) { @@ -111,35 +169,32 @@ namespace Implab.Parallels { // exit if requested if (m_exitRequired != 0) { // release the thread slot - var running = Interlocked.Decrement(ref m_runningThreads); - if (running == 0) // it was the last worker + if (ReleaseThreadSlotAnyway()) // it was the last worker m_hasTasks.Dispose(); else - m_hasTasks.Set(); // release next worker + m_hasTasks.Set(); // wake next worker unit = default(TUnit); return false; } // fetch task if (TryDequeue(out unit)) { - WakeNewWorker(true); + ExtendPool(); return true; } //no tasks left, exit if the thread is no longer needed - int runningThreads; - bool exit = true; - do { - runningThreads = m_runningThreads; - if (runningThreads <= m_minThreads) { - // check wheather this is the last thread and we have tasks + bool last; + if (ReleaseThreadSlot(out last)) { + if (last && m_hasTasks.WaitOne(0)) { + if (AllocateThreadSlot(1)) + continue; // spin again... + else + // we failed to reallocate slot for this thread + // therefore we need to release the event + m_hasTasks.Set(); + } - exit = false; - break; - } - } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); - - if (exit) { return false; } diff --git a/Implab/Parallels/WorkerPool.cs b/Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs +++ b/Implab/Parallels/WorkerPool.cs @@ -57,17 +57,16 @@ namespace Implab.Parallels { var len = Interlocked.Increment(ref m_queueLength); m_queue.Enqueue(unit); - if (ThreadCount == 0) - // force to start - WakeNewWorker(false); + if(!ExtendPool()) + WakePool(); } - protected override void WakeNewWorker(bool extend) { - if (extend && m_queueLength <= m_threshold) + protected override bool ExtendPool() { + if (m_queueLength <= m_threshold*ThreadCount) // in this case we are in active thread and it request for additional workers // satisfy it only when queue is longer than threshold - return; - base.WakeNewWorker(extend); + return false; + return base.ExtendPool(); } protected override bool TryDequeue(out Action unit) {