@@ -774,6 +774,79 namespace Implab.Test { | |||||
774 | Assert.IsTrue(pSurvive.Join()); |
|
774 | Assert.IsTrue(pSurvive.Join()); | |
775 | } |
|
775 | } | |
776 | } |
|
776 | } | |
|
777 | ||||
|
778 | [TestMethod] | |||
|
779 | public void SharedLockTest() { | |||
|
780 | var l = new SharedLock(); | |||
|
781 | int shared = 0; | |||
|
782 | int exclusive = 0; | |||
|
783 | var s1 = new Signal(); | |||
|
784 | var log = new AsyncQueue<string>(); | |||
|
785 | ||||
|
786 | try { | |||
|
787 | AsyncPool.RunThread( | |||
|
788 | () => { | |||
|
789 | log.Enqueue("Reader #1 started"); | |||
|
790 | try { | |||
|
791 | l.LockShared(); | |||
|
792 | log.Enqueue("Reader #1 lock got"); | |||
|
793 | if (Interlocked.Increment(ref shared) == 2) | |||
|
794 | s1.Set(); | |||
|
795 | s1.Wait(); | |||
|
796 | log.Enqueue("Reader #1 finished"); | |||
|
797 | Interlocked.Decrement(ref shared); | |||
|
798 | } finally { | |||
|
799 | l.Release(); | |||
|
800 | log.Enqueue("Reader #1 lock released"); | |||
|
801 | } | |||
|
802 | }, | |||
|
803 | () => { | |||
|
804 | log.Enqueue("Reader #2 started"); | |||
|
805 | ||||
|
806 | try { | |||
|
807 | l.LockShared(); | |||
|
808 | log.Enqueue("Reader #2 lock got"); | |||
|
809 | ||||
|
810 | if (Interlocked.Increment(ref shared) == 2) | |||
|
811 | s1.Set(); | |||
|
812 | s1.Wait(); | |||
|
813 | log.Enqueue("Reader #2 upgrading to writer"); | |||
|
814 | Interlocked.Decrement(ref shared); | |||
|
815 | l.Upgrade(); | |||
|
816 | log.Enqueue("Reader #2 upgraded"); | |||
|
817 | ||||
|
818 | Assert.AreEqual(1, Interlocked.Increment(ref exclusive)); | |||
|
819 | Assert.AreEqual(0, shared); | |||
|
820 | log.Enqueue("Reader #2 finished"); | |||
|
821 | Interlocked.Decrement(ref exclusive); | |||
|
822 | } finally { | |||
|
823 | l.Release(); | |||
|
824 | log.Enqueue("Reader #2 lock released"); | |||
|
825 | } | |||
|
826 | }, | |||
|
827 | () => { | |||
|
828 | log.Enqueue("Writer #1 started"); | |||
|
829 | try { | |||
|
830 | l.LockExclusive(); | |||
|
831 | log.Enqueue("Writer #1 got the lock"); | |||
|
832 | Assert.AreEqual(1, Interlocked.Increment(ref exclusive)); | |||
|
833 | Interlocked.Decrement(ref exclusive); | |||
|
834 | log.Enqueue("Writer #1 is finished"); | |||
|
835 | } finally { | |||
|
836 | l.Release(); | |||
|
837 | log.Enqueue("Writer #1 lock released"); | |||
|
838 | } | |||
|
839 | } | |||
|
840 | ).Bundle().Join(1000); | |||
|
841 | log.Enqueue("Done"); | |||
|
842 | } catch(Exception error) { | |||
|
843 | log.Enqueue(error.Message); | |||
|
844 | throw; | |||
|
845 | } finally { | |||
|
846 | foreach (var m in log) | |||
|
847 | Console.WriteLine(m); | |||
|
848 | } | |||
|
849 | } | |||
777 | } |
|
850 | } | |
778 | } |
|
851 | } | |
779 |
|
852 |
@@ -8,23 +8,53 namespace Implab.Parallels { | |||||
8 | /// </summary> |
|
8 | /// </summary> | |
9 | public class SharedLock { |
|
9 | public class SharedLock { | |
10 | readonly object m_lock = new object(); |
|
10 | readonly object m_lock = new object(); | |
|
11 | // the count of locks currently acquired by clients | |||
11 | int m_locks; |
|
12 | int m_locks; | |
|
13 | // the count of pending requests for upgrade | |||
|
14 | int m_upgrades; | |||
12 | bool m_exclusive; |
|
15 | bool m_exclusive; | |
13 |
|
16 | |||
14 | public bool LockExclusive(int timeout) { |
|
17 | public bool LockExclusive(int timeout) { | |
15 | lock (m_lock) { |
|
18 | lock (m_lock) { | |
16 | if (m_locks > 0 && !Monitor.Wait(m_lock, timeout)) |
|
19 | var dt = timeout; | |
17 | return false; |
|
20 | if (m_locks > m_upgrades) { | |
|
21 | var t1 = Environment.TickCount; | |||
|
22 | do { | |||
|
23 | if (!Monitor.Wait(m_lock, timeout)) | |||
|
24 | return false; | |||
|
25 | ||||
|
26 | if (m_locks == m_upgrades) | |||
|
27 | break; | |||
|
28 | ||||
|
29 | if (timeout > 0) { | |||
|
30 | dt = timeout - Environment.TickCount + t1; | |||
|
31 | if (dt < 0) | |||
|
32 | return false; | |||
|
33 | } | |||
|
34 | } while(true); | |||
|
35 | } | |||
18 | m_exclusive = true; |
|
36 | m_exclusive = true; | |
19 |
m_locks |
|
37 | m_locks ++; | |
20 | return true; |
|
38 | return true; | |
21 | } |
|
39 | } | |
22 | } |
|
40 | } | |
23 |
|
41 | |||
24 | public void LockExclusive() { |
|
42 | public void LockExclusive() { | |
25 | LockExclusive(-1); |
|
43 | lock (m_lock) { | |
|
44 | ||||
|
45 | while (m_locks > m_upgrades) | |||
|
46 | Monitor.Wait(m_lock); | |||
|
47 | ||||
|
48 | m_exclusive = true; | |||
|
49 | m_locks ++; | |||
|
50 | } | |||
26 | } |
|
51 | } | |
27 |
|
52 | |||
|
53 | /// <summary> | |||
|
54 | /// Acquires a shared lock. | |||
|
55 | /// </summary> | |||
|
56 | /// <returns><c>true</c>, if the shared lock was acquired, <c>false</c> if the specified timeout was expired.</returns> | |||
|
57 | /// <param name="timeout">Timeout.</param> | |||
28 | public bool LockShared(int timeout) { |
|
58 | public bool LockShared(int timeout) { | |
29 | lock (m_lock) { |
|
59 | lock (m_lock) { | |
30 | if (!m_exclusive) { |
|
60 | if (!m_exclusive) { | |
@@ -32,45 +62,141 namespace Implab.Parallels { | |||||
32 | return true; |
|
62 | return true; | |
33 | } |
|
63 | } | |
34 |
|
64 | |||
35 |
if (m_locks == |
|
65 | if (m_locks == m_upgrades) { | |
36 | m_exclusive = false; |
|
66 | m_exclusive = false; | |
37 | m_locks = 1; |
|
67 | m_locks = 1; | |
38 | return true; |
|
68 | return true; | |
39 | } |
|
69 | } | |
40 |
|
70 | |||
41 | if (Monitor.Wait(m_lock, timeout)) { |
|
71 | var t1 = Environment.TickCount; | |
42 | Debug.Assert(m_locks == 0); |
|
72 | var dt = timeout; | |
43 |
|
|
73 | do { | |
|
74 | if (!Monitor.Wait(m_lock, dt)) | |||
|
75 | return false; | |||
|
76 | ||||
|
77 | if (m_locks == m_upgrades || !m_exclusive) | |||
|
78 | break; | |||
|
79 | ||||
|
80 | if (timeout >= 0) { | |||
|
81 | dt = timeout - Environment.TickCount + t1; | |||
|
82 | if (dt < 0) | |||
|
83 | return false; | |||
|
84 | } | |||
|
85 | } while(true); | |||
|
86 | ||||
|
87 | m_locks ++; | |||
|
88 | m_exclusive = false; | |||
|
89 | return true; | |||
|
90 | } | |||
|
91 | } | |||
|
92 | ||||
|
93 | /// <summary> | |||
|
94 | /// Acquires the shared lock. | |||
|
95 | /// </summary> | |||
|
96 | public void LockShared() { | |||
|
97 | lock (m_lock) { | |||
|
98 | if (!m_exclusive) { | |||
|
99 | m_locks++; | |||
|
100 | } else if (m_locks == m_upgrades) { | |||
44 | m_exclusive = false; |
|
101 | m_exclusive = false; | |
45 |
|
|
102 | m_locks++; | |
|
103 | } else { | |||
|
104 | while (m_exclusive && m_locks > m_upgrades) | |||
|
105 | Monitor.Wait(m_lock); | |||
|
106 | ||||
|
107 | m_locks++; | |||
|
108 | m_exclusive = false; | |||
46 | } |
|
109 | } | |
47 | return false; |
|
|||
48 | } |
|
110 | } | |
49 | } |
|
111 | } | |
50 |
|
112 | |||
51 | public void LockShared() { |
|
113 | /// <summary> | |
52 | LockShared(-1); |
|
114 | /// Upgrades the current lock to exclusive level. | |
53 | } |
|
115 | /// </summary> | |
54 |
|
116 | /// <remarks>If the current lock is exclusive already the method does nothing.</remarks> | ||
55 |
public void |
|
117 | public void Upgrade() { | |
56 | lock (m_lock) { |
|
118 | lock (m_lock) { | |
57 |
if (m_exclusive |
|
119 | if (!m_exclusive) { | |
58 | throw new InvalidOperationException(); |
|
120 | ||
59 |
m_locks |
|
121 | if (m_locks <= m_upgrades) | |
60 | if (m_locks == 0) |
|
122 | throw new InvalidOperationException(); | |
61 | Monitor.PulseAll(m_lock); |
|
123 | ||
|
124 | if (m_locks - m_upgrades == 1) { | |||
|
125 | m_exclusive = true; | |||
|
126 | } else { | |||
|
127 | m_upgrades++; | |||
|
128 | ||||
|
129 | while (m_locks > m_upgrades) | |||
|
130 | Monitor.Wait(m_lock); | |||
|
131 | ||||
|
132 | m_upgrades--; | |||
|
133 | m_exclusive = true; | |||
|
134 | } | |||
|
135 | } | |||
62 | } |
|
136 | } | |
63 | } |
|
137 | } | |
64 |
|
138 | |||
65 | public void ReleaseExclusive() { |
|
139 | /// <summary> | |
|
140 | /// Upgrades the current lock to exclusive level. | |||
|
141 | /// </summary> | |||
|
142 | /// <param name="timeout">Timeout.</param> | |||
|
143 | /// <returns><c>true</c> if the current lock was updated, <c>false</c> the specified timeout was expired.</returns> | |||
|
144 | /// <remarks>If the current lock is exclusive already the method does nothing.</remarks> | |||
|
145 | public bool Upgrade(int timeout) { | |||
66 | lock (m_lock) { |
|
146 | lock (m_lock) { | |
67 |
if ( |
|
147 | if (m_exclusive) | |
|
148 | return true; | |||
|
149 | if (m_locks <= m_upgrades) | |||
68 | throw new InvalidOperationException(); |
|
150 | throw new InvalidOperationException(); | |
69 | m_locks = 0; |
|
151 | ||
70 | Monitor.PulseAll(m_lock); |
|
152 | if (m_locks - m_upgrades == 1) { | |
|
153 | m_exclusive = true; | |||
|
154 | } else { | |||
|
155 | var t1 = Environment.TickCount; | |||
|
156 | var dt = timeout; | |||
|
157 | m_upgrades++; | |||
|
158 | do { | |||
|
159 | if (!Monitor.Wait(m_lock, dt)) { | |||
|
160 | m_upgrades--; | |||
|
161 | return false; | |||
|
162 | } | |||
|
163 | ||||
|
164 | // we may get there but the shared lock already aquired | |||
|
165 | if (m_locks == m_upgrades) | |||
|
166 | break; | |||
|
167 | ||||
|
168 | if (timeout >= 0) { | |||
|
169 | dt = timeout - Environment.TickCount + t1; | |||
|
170 | if (dt < 0) { | |||
|
171 | m_upgrades--; | |||
|
172 | return false; | |||
|
173 | } | |||
|
174 | } | |||
|
175 | } while(true); | |||
|
176 | m_upgrades--; | |||
|
177 | m_exclusive = true; | |||
|
178 | } | |||
|
179 | return true; | |||
71 | } |
|
180 | } | |
72 | } |
|
181 | } | |
73 |
|
182 | |||
|
183 | /// <summary> | |||
|
184 | /// Downgrades this lock to shared level. | |||
|
185 | /// </summary> | |||
|
186 | public void Downgrade() { | |||
|
187 | lock (m_lock) | |||
|
188 | m_exclusive = false; | |||
|
189 | } | |||
|
190 | ||||
|
191 | /// <summary> | |||
|
192 | /// Releases the current lock. | |||
|
193 | /// </summary> | |||
|
194 | public void Release() { | |||
|
195 | lock (m_lock) | |||
|
196 | // if no more running threads left | |||
|
197 | if (--m_locks == m_upgrades) | |||
|
198 | Monitor.PulseAll(m_lock); | |||
|
199 | } | |||
74 | } |
|
200 | } | |
75 | } |
|
201 | } | |
76 |
|
202 |
@@ -101,6 +101,11 namespace Implab { | |||||
101 | int errors = 0; |
|
101 | int errors = 0; | |
102 | var medium = new Promise(); |
|
102 | var medium = new Promise(); | |
103 |
|
103 | |||
|
104 | if (count == 0) { | |||
|
105 | medium.Resolve(); | |||
|
106 | return medium; | |||
|
107 | } | |||
|
108 | ||||
104 | medium.On(() => { |
|
109 | medium.On(() => { | |
105 | foreach(var p2 in that) |
|
110 | foreach(var p2 in that) | |
106 | p2.Cancel(); |
|
111 | p2.Cancel(); |
@@ -4,6 +4,7 using Implab.Parallels; | |||||
4 | using Implab; |
|
4 | using Implab; | |
5 | using System.Collections.Generic; |
|
5 | using System.Collections.Generic; | |
6 | using System.Collections.Concurrent; |
|
6 | using System.Collections.Concurrent; | |
|
7 | using System.Threading; | |||
7 |
|
8 | |||
8 | namespace MonoPlay { |
|
9 | namespace MonoPlay { | |
9 | class MainClass { |
|
10 | class MainClass { | |
@@ -11,24 +12,82 namespace MonoPlay { | |||||
11 | if (args == null) |
|
12 | if (args == null) | |
12 | throw new ArgumentNullException("args"); |
|
13 | throw new ArgumentNullException("args"); | |
13 |
|
14 | |||
14 | const int count = 10000000; |
|
|||
15 |
|
||||
16 | var t1 = Environment.TickCount; |
|
15 | var t1 = Environment.TickCount; | |
17 |
|
16 | |||
18 | for (int i = 0; i < count; i++) { |
|
17 | const int reads = 100000; | |
19 | var p = new Promise<int>(); |
|
18 | const int writes = 1000; | |
|
19 | const int readThreads = 8; | |||
|
20 | const int writeThreads = 0; | |||
|
21 | ||||
|
22 | var l = new SharedLock(); | |||
|
23 | var st = new HashSet<int>(); | |||
20 |
|
24 | |||
21 | p.On(x => {}).On(x => {}); |
|
25 | Action reader1 = () => { | |
|
26 | for (int i =0; i < reads; i++) { | |||
|
27 | try { | |||
|
28 | l.LockShared(); | |||
|
29 | st.Contains(i % 1000); | |||
|
30 | Thread.Sleep(0); | |||
|
31 | } finally { | |||
|
32 | l.Release(); | |||
|
33 | } | |||
|
34 | } | |||
|
35 | }; | |||
|
36 | ||||
|
37 | Action reader2 = () => { | |||
|
38 | for(var i = 0; i < reads; i++) | |||
|
39 | lock(st) { | |||
|
40 | st.Contains(i % 1000); | |||
|
41 | Thread.Sleep(0); | |||
|
42 | } | |||
|
43 | }; | |||
22 |
|
44 | |||
23 | p.Resolve(i); |
|
45 | Action writer1 = () => { | |
|
46 | var rnd = new Random(Environment.TickCount); | |||
|
47 | for (int i = 0; i < writes; i++) { | |||
|
48 | try { | |||
|
49 | l.LockExclusive(); | |||
|
50 | st.Add(rnd.Next(1000)); | |||
|
51 | //Thread.Sleep(1); | |||
|
52 | } finally { | |||
|
53 | l.Release(); | |||
|
54 | } | |||
|
55 | } | |||
|
56 | }; | |||
24 |
|
57 | |||
25 | } |
|
58 | Action writer2 = () => { | |
|
59 | var rnd = new Random(Environment.TickCount); | |||
|
60 | for (int i = 0; i < writes; i++) { | |||
|
61 | lock (st) { | |||
|
62 | st.Add(rnd.Next(1000)); | |||
|
63 | //Thread.Sleep(1); | |||
|
64 | } | |||
|
65 | } | |||
|
66 | }; | |||
|
67 | ||||
|
68 | ||||
26 |
|
69 | |||
27 |
|
70 | var readers = new IPromise[readThreads]; | ||
|
71 | for (int i = 0; i < readThreads; i++) | |||
|
72 | readers[i] = AsyncPool.RunThread(reader1); | |||
|
73 | ||||
|
74 | var writers = new IPromise[writeThreads]; | |||
|
75 | for (int i = 0; i < writeThreads; i++) | |||
|
76 | writers[i] = AsyncPool.RunThread(writer1); | |||
|
77 | ||||
|
78 | ||||
|
79 | new [] { | |||
|
80 | readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)), | |||
|
81 | writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1)) | |||
|
82 | }.Bundle().Join(); | |||
|
83 | ||||
|
84 | ||||
28 |
|
85 | |||
29 | var t2 = Environment.TickCount; |
|
86 | var t2 = Environment.TickCount; | |
30 | Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) ); |
|
87 | Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) ); | |
31 |
|
88 | |||
32 | } |
|
89 | } | |
|
90 | ||||
|
91 | ||||
33 | } |
|
92 | } | |
34 | } |
|
93 | } |
General Comments 0
You need to be logged in to leave comments.
Login now