# HG changeset patch # User cin # Date 2013-11-13 10:03:00 # Node ID 5a35900264f5d7a3e7a5adf452791da58790804d # Parent 6a56df4ec59e8a1fa1f783feb68fe3527d129070 implemented nonblocking wake singnals processing diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -103,15 +103,15 @@ namespace Implab.Test { Assert.AreEqual(5, pool.PoolSize); - pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); - pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); - pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); Assert.AreEqual(5, pool.PoolSize); for (int i = 0; i < 100; i++) - pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); - Thread.Sleep(100); + pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); + Thread.Sleep(200); Assert.AreEqual(10, pool.PoolSize); pool.Dispose(); @@ -244,7 +244,7 @@ namespace Implab.Test { [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(0,100,1)) { + using (var pool = new WorkerPool(4,4,0)) { int count = 10000; double[] args = new double[count]; diff --git a/Implab.suo b/Implab.suo index aed3f9f75a5a4aef0daf4fc2d1945603eacc7f9a..2e658bab65aab728cc1f2d27570c91f2140bb05f GIT binary patch literal 75264 zc%1D$4R}=5l{Y{jAPQ1M#J`AQ1S1pR$6QQx3W~Nc3ISJ*{!YHTDM*Ox?PukRzBBOv*+CR?#!E)H}l?| zpD+XS@}0b&dGDTc?>+b2bI(2Z+&5l7^_}niXx1Ny>i7Z~OFkQ(NX9FkjRB}&H&dni zu>hY94-d1)R{{)|kCHh)1sDf)b|ye2zytuS^N9de04D=X0yqg^vQ+Kl6o4rJGXYKo zm0Q%mDKM>Hw?&Hh{AM+yHg}2Y?fx0bo7Ae1HW2mjEoJyiLd=fW-hS0h$0j z04o5z06u^v080Uy0WJkt2G9<$9AFJV3qUKtDuC4hZ2)TlIsh&MSO*{gd=X#+KqtUP zfXe~=0Gj~53eW{0%H?vSeC~mB0H7Bj2yg|!W`GbtA3#4q7$5==1;9Ft!8r~v05Ax! z1z;<{5WqHos{pnGTn%syz_kEhqO^zMFAjC#g>>VP9)4l`m;qeFc&SF`-FF_|vb^J+ zLlr197CWV9t$;BMIcx^(J#givRP*2;lOk`7em7M-z?XH2F+gtG9%e-FSQaJYqXzxFW~@MF9Eyi9`;{3y5C zi^y*XSSKh4DizNdDBC~E1J?er3|aff_ogep#rBW#fyn@rFWCOEonrexml$jR3*
~T*8Y|G&}#oKBDH^%9Vi1>`^R>|+CP4a?Y~8l-b(p;m3+qb-!5OH z3|K2)e^EYPCZACjpbXe3-=mD^lCQf-vi;*(+5YjqPd;NG8r$eovW z#5`ixK%g}$#>D=(5D$j?TLu4SAt2UPcZks#+%L3R%=E9e+C3184@AX<{o+78DuimQ zTL-#ALH{yws6D({>|fYrsqeA(ID4$tZo5UO7ba9N%y^6&SRV98!?AEr+}sh24G5vO z_&|3sY^Edx#AxE8Jsb|L4fc13x5UgIAuhBBBQbMpu)kmE3W@Z_yEQKM$AVqKP%u7Z zZbeFC@Wd}(+(IwJD1O%#4@X7J=SfL@Du?2$Cdj|@p^sS$d=Uky)d_st2XZ9@Qml*i zJ$Ou!-&f*98Pf&a6M#RIL2KcQI7pvfxYLt;gyQCxzo=hKPlLY+Km-~#1NC9xe(aMn z(QzTjKc*uX#g95pZ5sU8|Df!R0Nx-_(63Vpa{&JZa+*f*SCORrSL655Ja>aW5r(Hx zN{b$T#9PIIA6x8-G1B!oBOe$zFr6@L@I0Rfuv_HPqNvkjxIy>^ec4 z3PKL?-55X*)k8XHFE{{ShTyIZo(({GU|m$xzR(QN32(XKKYkmBbBvZ0^7+?Cod#?# zA;8c>DPWYa`{JQUdRhz=1*v`$qVJ0d>6HC3CeP1!j$Gw8>QBp|Btlf)U?~cerw6E3 zDCx$iwTz{2FQ8+ctDv6j@MnRu6`%pMq6Qf8*r2vd@ZSYe#R6B2aBqcs{1xB#!L=Kp z3EuU=8+Oog(#xMAsIM--AIeDooGj$O(q5p(za0Me(Ozr_`XY&zFpOr>N#R<+kK;Ym zOO4{kv1)r7{M!AGl9JTJ2J#ojyt6p)d>fEPP>*Hr*F%Oj=f@WG~ zz<&xSer5jKD2ERu@+g&a58MT4|0^VZ(LQRk;D5A_%#;~mC2-#r!1XAD)pguKM=+U> z*^qzaXB>kW#gFn$p|vyy5*OzI2|FQA&EwrrBP6W^_(LDE#io2|faW~lk5ga+IMZ*~)<^j(y9dXn;omlmlfaDxPCfK(8y zlolJQ2MjKHWdkg3xM~7g9HfpubHf?Gasq`e`0oH}9F!J4PDo;P01h8~k11Iw2Fz;{ zJVOi(^r;C_M@pQu1U&GD6Q0)tIOyA0N=C7|@{Tnhe2`Cia^(XHq=t2Bq4n87X=#8z ztIQEz_|imab;7j?-tj^D$i?-54>`aEZ@4*Y1TDMxm6h@sri5+44tTwgBOm2CqzQS` zt!U9mjR&s1WK}iVUGvGul1`TIG?}#YQ@wdhVw@dJ8D|Goqi&?c!}U#s%GC(%_ps+d zO@L|0O`ItSwEqfGJ`X9?+V^%19%aub*Z!aXMPdKay<2Vz{OlFs4I<}MozBPO-y45j?|-{*zh?O5vHPxg zUhBOY$Q4?kHv{bg%~gz!EiCY7rDF&y#mVwzh8o!ll(W8a-RqO+@0i1gY(>RdLR|&z z3>`X_k*4Lo_A_bCu7G-Pht#F9keDdtT1bOgXA$Q@G6>~|lrdY17t%0Je><(yIIRyC zd0?6}f*Ae`|D$EJh03HjZF8szv{Q{ZLa9Lvs2>i&uRZV=qS}&}(2&?@9<3`Z$7|?l zD=b_0!!kc~%ip*uT`{U{1Zl3*X`0#x+P~4Ri#km<`WMq9P!Gm33RC?%D$Ad^j`f4$ zr#9hg4*X~o8$7s}V_`L8z6F~YR(@zatq z(Ehpc<6JNs|N5vFU;&BhmGy{vDl<`svQqu1o@`8T$;i@~p9OxLd#byZ2S4@;%i(tc zWK?R2II_Tx8rxF&Iurk)Otk8xh4L5Y)UdBJiXU^p>=?;D184ZWwEbh>(nfm=k@kgn z)SY+U?n4zyYgd!I&3#JZF5CXA(rUJt;h@cMA zs|UTI>FyORBLzAHyi2bqz>Mwqj!8Gf_!4lM~&o@o?L6h zMXB*p3|@K63N9(-6yLY(uY*@Q^=YeT-LK)RF_4q+OzW77@v-{jn zy>aSev&f#;rVRfqoxD?jryjqk@o3SbJe2RTXUS2P8R{%bW!(_fX9Q|N#piC|4MEn} zyP-e9-ekQ|LDxm6Yeo*~*q5j-wo+vxBoBZrOU4au`{3=pYkv9XM;|`;Y5)cNu<9U- z`S4>m+(%UfQO2_K-5Wzs^zR+qckVY9+_YfIvsNuE%5e{ldSXyEI4Wr;oA~jo2gfct zd=cq%1$He=b(H9vbdqJ~<4aBrKl1Xr_0yd9hR0~rr`4+2fdk?mI!6>sa1dx*5>ihntj+bXCWk%EMK|H7&p_J7ptSae6|=%u%Q z7?_}iTSE70g+u_GYrrH2s{ofhpzCYc)`?GyNzT<*N#yz)4 znQfH9T`BiW9{(c8{4;GNMLz&}v81WZU1MMzg5@Xah)(#=UxM>__@@odJqamy^1OND zmpwgs)tyHUe{kTPeGh5!i9Bw8a6#|;PamS5QmF_%Joq(aeXbn9(f=^huZxNb*(}7- z{?w+|bQ0i)yZ2n(wBvUN?%40R_N~8>Cnjl?YvMq&)KU%e^TE3&fAYYyci&gv_KACU ztK*1fAGDc2|4J#gk_yWx{e39$T*CjOdo3T&JGjO2)cw!-CcZjjuO@edNC&*yE>|Xc zdu#)BuG zbkeGk{qK14kG}ZKiqtOKSbPcVYtq`8I`6>j^3fEZ9Q6NCD)vV3qrc3<^0iU|KU%qa zc5_=**IPR_?0xo=BY(*xftmchG{^Q!GbN!vGV!gN53Yas)21I?zx?T)&KIkQX1hy8 zxFWm$e~qkeuqRfThQB;?xNG{+ldHpfpMSINh09OYY&FUwlSKpthl}!ckTz5|2=4%q zd?nCdDZM>mflN+BZXFNJa0PDoy_Wnwlw<$b2KRos6%Nw52>cQ)F9EU){_Cv*J?ZM;`vi^*UcRh~K7t9*T%eBJ@)2LQeaa67 z0_+CZ1Moe7`vCR=+%LcTpnQHgm(K^%q^Ugq z8QwVv@GpvYapL<;fM4o-{~P%Iw*YS&z*9cT$7mi_I$L3d9!I)j`VOkA5qXqL`!$uL zGIAl2 z#@yNXo0vyX=K>sYI;Yk}`w=lQ{6I=gS(VF3^-ODcgXz!7Q~JpiFV3MRnORzkT93D- z8Po3~yel7&dwXbFY*l8psumdMj9eqpsZqO`YQ8g3)1B1+URn#<2T#_)|4^cxDA#cD z*K;QTY8q1Bn%r%z>iLmLla1Ry($|eK{WaSZ&50KG*n-pIKizw%g3U*#*Q(i($(}FG zgEMKUmW#u~Bjlp$$Rrmf8zz&BNjlZ}%uO23<1FK3OArlNVi5;_Ju*L&)KP4R)?l_0 zi*G`LU%ud-x^&QHhjOcjwtFS*)~w2%zyGo1p^P#c?MthdJ1?tpPo2cXbS~sf2j_3m z@);mWzrm5I@^YNfDK7s0SDo|ZI5U~zIh-k`8lFy1Te9HADge4Y{!0&Asa)N@gEv z!cDoXkJhh~<`u{FhctQ_1uu zWlg3u^OatcDWq({-c$LUs7kXeAGGBI_A;Y~RjorDFKrjs1!})hp~`V-w`!x>e0CP{ z$wX-(U%;U~Ygo%Uv}Y-%j=r_LtT~r8=k%I$xraGXHBes5JJW0Q=H&`GUOHZS-s4|u zE);i_HRiI$Jeut`*Yzv>>pwEjJ^yOf(4)Iv5Ei(4_Wk_jx9{nZBG5?(mbTId9aHTn zjB54dy<-o*`Kxa|_0ChjoO|OF@6FWm13$Ctn(2Ev{NME^V5g1youg~=Y5(@`hwuN> zSG?lTSPh&bH0-9_Cjvh{DKP8s8~aY5@sr2T9V7nwsFvS-3JppPn{)jupR37lH?OGt z?fj>^)|@e6_M`P@-=ejKCPy8ZWBq`U>Cb_&Z#3q;^w%F)NTvhVADl^~ zJsVPlLxuxK;$it(@@&L@0;5n#-Zaum8Y9uyrS%u+4>6qsKiUXb{2~4PXI_DV`NzF0 zY#jI>0k}!N?f`tPfN3@GK`X7DW$=FqJZYoQ2Hdie?o6Y7%*`VH5bj~a@;_q#g>iWC z*NV2GSX}HgyThT7=*R6BV&)~deL>J~_5|qzAv(0F?y}3|7u({|V1J;tx-aGrM?=A` z2aYOB88dNM5V!d(PZy~?9Hehc%K~AU29qx+=Q88MD z*xG{IL^76C-JYpXtZL)%q8Q0U;!e_((%SofY)!Kx zhqpg2MkCQ+OvJ4?VrF+ph{Zf&EEs@ng~=arbN<=JG$?DlxKM#9m! zCn$u%fu``*HZdNDnn6u;MkFc@2Ezlf(2%=V^luisn}OcgRQ9+f?5A6%oW&kA_Xz>f z7moG`aeTA6zgyfoJv=Bzqrq;`yEPcYlzqXFh^>E~2LH50L_ZJ$EumZHkmRpeWK|Vd zZ4;5W@icxYtApICV-Sm|a$^jT zR&5-VY8FVXs+u@KAseT%o0E)cmAvDhOrCA`d!7+)UT!zP{Ls|xz3-gVn`s5EWS!^I zU-jJ!BRhBWnO}Ny!>PB6fBwUFZZc~|oRJP`HsM(N&vV328o7QR%RvhJxrVx^r>+Hl zX37h@#NerSR`zy3Vs3FfsSGoD7V< z$B?P}l9*}ihA5qvIB{lI0wt3p?Ui&bv<|O4;ry3+OD11h)_gaOPmD4%a@^G%v+||O zS5L|23*X%$TUXNMZlGjy$7+pTVf>_cVqYh;U@=S9i8DKTEPV=(KM5!85tsR0wSB%+ zrC|0xd?%6wIcG$v@T79Q@QS*}9LX!}jT2|~>iE+XrfAfv@;!C#kKa+{X(LJ^{Wt1T zNf-HK5~->t}R!DR8;Tqc{(?JzYoI9w)+!&cW&Z?W4PPTSSrWu!UkeT{W)i`(R|Hu+4p z2DioJs&_g~4IYQnUGH`{tR4#^&D<`=;+=}?j6|B___h!m>i6SSjDxyc3RiH^a9=Pc za?tYaQK&@xQH*j{;a#IZn1JT~cvJ{-Ro|VXK^gKDXNi4l6zJ-UUpb&Z|D8DkVmA9f z4U%O1L1xfpd+fT2v>$69Tr-bu9sCu`;Ea}0qxhBaC)N0u5NRK*ND3wCj|+3$e@l(O zjY{yiLF~Ejzbp>$2$;fgWe|P~M8#OFKO7JC1jQ%^Rks(8s<;r_><{lj1!Zu~3M*i*DL1xYe=sf%iv4jR z8sOLTEtDhNPW^7froFe|NF)x6DnkxDe-IsokTcOUv~2ilGQsi6xYMi^J=* zIH1HF4GmV4t=`*YYOHg5O+KI7ZmDK-Ez$HQd1 z3Qt{1!ST@c14dh&uiomlS?nfnox^FeSsU#pm&NWe*_}2^W4*i1T4!lg(3VoW@~s|~ zQXR(xxEDLok0>Mz^@O8+9OQogSdrT&1p7H@-BrkD4AmSDia1J$iG5w6A*7CjwhB_g z-2Z}$i+B`&I`I_KP+6@L&!olJmN1*l+kZs`+C>QQ6z||wI}H{s=(OApSai`+EiY*0XJQjo`n5 zNPCdsQS7s#sXHIw+5)ZL)w7+)n|JEeH2F@xnQjGZsl|N4eZ=%*K9zTkDO{YU@~$x_ z#;!5Pub)hL^+oaID7)1c<=9=LRVM{-T~T+ok_jUbaDbe@pP4fXXwS>VpFleU%HtH4 zfg+sd*QdTp2G5QPkKXvhsZLIH)MPRK#FjE3U+eoHWgY?I_y>0}K`WI}{Eu_suQw3? z1o1B7z`u?p{h##5f4cc6Um(($+9>|#NOJsNT<#ZYN=Nc8jxJ^Zc#3kqU!Q)oK;!tS zfRcmDeqLTNSgJjDepU?XbMNOBPjJ_2x^sEa`;q9k#HsSxQ*=%``m|#mpuNUZRQ_uE zbkAgEUc9bt`kBBk2;-s@P=sgg1*cCvdIO-4@JLQArM`b9SNt3OEgOH{eEXOU|9RUZ z5BKekPH4UF4$T;xN=s>bA+{5Jde2M4mrOZ(+da?zeeA5K#n&~dkjA=P@kcc_Th!M- zGmijJ|3!xYFEOrvzCcLA>Z^~xE>lto{oBHU|3$(x|D1USXigUMPyYn4#UTEA{rq!} zzc8BMwd^Lj+2*rvOuFwIFI7G`@W;Qc@gBJI=bBunK33;{-~#g$wT~O^bu+trRe49qH{g4ZRg}$FZ%S=u@B4|zUIkwdoo4tQ|b@7h7UiG zWBKX#Kl=U$y#3EF0RARl>*Lq=|5{9W)Oi2@I=~Ub_;ve#?*8{JfKLqL*SCLq;JlO= z&wnSt2lDl3w;wH8@bo9oUA181$K-#WdFJK2Jg0>+#r#+2rM!Q6Zu^%Pr(LTYF>16T zM&-JHeLnUtuv0I`h9cN~WcA5!Hoa+ge(nAEnhVxeZhkVeB*GCmZ-8Aa)!A9L*W7jf zksDa#X@!(pEN4(_i+hEic1Dsu@JC)mk2%bYrB4=442}}E1$*bGrz!i($_^cYq^w|Q)@r9rDnT^j-S^Uw#FQ>Wn(uOjfnQi zRVhAVhd&FPtpE*l1&R~6+d)^QxabNLT&d!qaXIl_}QxB;5rT_3z*SFB1oK6>)f z_1I0-xte4?NSCkj?(V52Szf!isbq3$bnRm4v|YZPmFlzTos9Mq%JuHa-ogUqT+Lvz8k1O7L`lTu&9c%tu}l#6}pI{5Z_ zBE`gEpge9NQoIoCnJ{k9Z2(^f*g>Q?J{VxGn~ZeOEFYzD9DDnu)@;zR+&^lx5L z8|h|wN-ZYb*~EBExbiGYc^0KSi&Ct!D91++Q@R$-@@zzTHX@hVh-_rN5ntWWvTtuH znVK>>n^O2liTR(CE**KvHv?3v`6|zzlw$6RZ!N`W^bTdKdD&`ytmS^$YF@UQmsak| zR`aseylgek&1#NoBfRdm*21NHEJ{%?{*2AQ{%1?zd?_&=e+Jjv9F?#2@f(Z(#||U- zQ8W42Fn-7&|6 zwg*ohqGU+18alEVm+0>gBZYx&%`pN;9NKY|?zYBK#SpEp=3(~7NL(lFd|_{JC=V1A zPh=Nq56zaG`}0V;K3eD1SG-N4>?G zNUJh0?Wi5vJZeV*jb@ojEw079JZB78qxcN%Ab3XWN3wn|0C>qzI$AOCq~3KkSBWO) zWRvwYGLFxLjB>9w%XX9+;Yzz5;dUkL{rHpm@xOfp z_mMTO|Du1bBRBCV+~g5PFv{~;dyxQhpiKh6^? zig6U$;NK{@cCDu$UQBBye>)RsW2qs0eced^8F15`_a^to8?)9^m-E7;&?QR&J0=#dj0Wfa{60DNdAcJIhL?i#7#oZB7}M@= 1) { + if (signals > 1 && hasLock) + m_hasTasks.Set(); + return true; + } + + if (timeout != -1) + timeout = Math.Max(0, timeout - (Environment.TickCount - start)); + + // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие + // и уйдет на пустой цикл, после чего заблокируется + + hasLock = true; + } while (m_hasTasks.WaitOne(timeout)); + + return false; + } + bool Sleep(int timeout) { Interlocked.Increment(ref m_sleepingThreads); - if (m_hasTasks.WaitOne(timeout)) { - // this is autoreset event, only one thread can run this block simultaneously - var sleeping = Interlocked.Decrement(ref m_sleepingThreads); - if (Interlocked.Decrement(ref m_wakeEvents) > 0) - m_hasTasks.Set(); // wake next worker - + if (FetchSignalOrWait(timeout)) { + Interlocked.Decrement(ref m_sleepingThreads); return true; } else { Interlocked.Decrement(ref m_sleepingThreads); @@ -106,6 +134,8 @@ namespace Implab.Parallels { if (m_exitRequired != 0) return; if (m_sleepingThreads > m_wakeEvents) { + //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); + // all sleeping threads may gone SignalThread(); // wake a sleeping thread; @@ -130,8 +160,6 @@ namespace Implab.Parallels { bool last; bool requestExit; - - // if threads have a timeout before releasing if (m_releaseTimeout > 0) requestExit = !Sleep(m_releaseTimeout); @@ -242,8 +270,8 @@ namespace Implab.Parallels { void Worker() { TUnit unit; + //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); Interlocked.Increment(ref m_activeThreads); - Sleep(0); // remove wake request if the new thread is started do { // exit if requested if (m_exitRequired != 0) { @@ -269,10 +297,10 @@ namespace Implab.Parallels { // keep this thread and wait if (!Suspend()) break; - + //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); Interlocked.Increment(ref m_activeThreads); } while (true); - + //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); } protected virtual void Dispose(bool disposing) {