##// END OF EJS Templates
shared locks + tests
cin -
r136:e9e7940c7d98 v2
parent child
Show More
@@ -774,6 +774,79 namespace Implab.Test {
774 774 Assert.IsTrue(pSurvive.Join());
775 775 }
776 776 }
777
778 [TestMethod]
779 public void SharedLockTest() {
780 var l = new SharedLock();
781 int shared = 0;
782 int exclusive = 0;
783 var s1 = new Signal();
784 var log = new AsyncQueue<string>();
785
786 try {
787 AsyncPool.RunThread(
788 () => {
789 log.Enqueue("Reader #1 started");
790 try {
791 l.LockShared();
792 log.Enqueue("Reader #1 lock got");
793 if (Interlocked.Increment(ref shared) == 2)
794 s1.Set();
795 s1.Wait();
796 log.Enqueue("Reader #1 finished");
797 Interlocked.Decrement(ref shared);
798 } finally {
799 l.Release();
800 log.Enqueue("Reader #1 lock released");
801 }
802 },
803 () => {
804 log.Enqueue("Reader #2 started");
805
806 try {
807 l.LockShared();
808 log.Enqueue("Reader #2 lock got");
809
810 if (Interlocked.Increment(ref shared) == 2)
811 s1.Set();
812 s1.Wait();
813 log.Enqueue("Reader #2 upgrading to writer");
814 Interlocked.Decrement(ref shared);
815 l.Upgrade();
816 log.Enqueue("Reader #2 upgraded");
817
818 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
819 Assert.AreEqual(0, shared);
820 log.Enqueue("Reader #2 finished");
821 Interlocked.Decrement(ref exclusive);
822 } finally {
823 l.Release();
824 log.Enqueue("Reader #2 lock released");
825 }
826 },
827 () => {
828 log.Enqueue("Writer #1 started");
829 try {
830 l.LockExclusive();
831 log.Enqueue("Writer #1 got the lock");
832 Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
833 Interlocked.Decrement(ref exclusive);
834 log.Enqueue("Writer #1 is finished");
835 } finally {
836 l.Release();
837 log.Enqueue("Writer #1 lock released");
838 }
839 }
840 ).Bundle().Join(1000);
841 log.Enqueue("Done");
842 } catch(Exception error) {
843 log.Enqueue(error.Message);
844 throw;
845 } finally {
846 foreach (var m in log)
847 Console.WriteLine(m);
848 }
849 }
777 850 }
778 851 }
779 852
@@ -8,23 +8,53 namespace Implab.Parallels {
8 8 /// </summary>
9 9 public class SharedLock {
10 10 readonly object m_lock = new object();
11 // the count of locks currently acquired by clients
11 12 int m_locks;
13 // the count of pending requests for upgrade
14 int m_upgrades;
12 15 bool m_exclusive;
13 16
14 17 public bool LockExclusive(int timeout) {
15 18 lock (m_lock) {
16 if (m_locks > 0 && !Monitor.Wait(m_lock, timeout))
17 return false;
19 var dt = timeout;
20 if (m_locks > m_upgrades) {
21 var t1 = Environment.TickCount;
22 do {
23 if (!Monitor.Wait(m_lock, timeout))
24 return false;
25
26 if (m_locks == m_upgrades)
27 break;
28
29 if (timeout > 0) {
30 dt = timeout - Environment.TickCount + t1;
31 if (dt < 0)
32 return false;
33 }
34 } while(true);
35 }
18 36 m_exclusive = true;
19 m_locks = 1;
37 m_locks ++;
20 38 return true;
21 39 }
22 40 }
23 41
24 42 public void LockExclusive() {
25 LockExclusive(-1);
43 lock (m_lock) {
44
45 while (m_locks > m_upgrades)
46 Monitor.Wait(m_lock);
47
48 m_exclusive = true;
49 m_locks ++;
50 }
26 51 }
27 52
53 /// <summary>
54 /// Acquires a shared lock.
55 /// </summary>
56 /// <returns><c>true</c>, if the shared lock was acquired, <c>false</c> if the specified timeout was expired.</returns>
57 /// <param name="timeout">Timeout.</param>
28 58 public bool LockShared(int timeout) {
29 59 lock (m_lock) {
30 60 if (!m_exclusive) {
@@ -32,45 +62,141 namespace Implab.Parallels {
32 62 return true;
33 63 }
34 64
35 if (m_locks == 0) {
65 if (m_locks == m_upgrades) {
36 66 m_exclusive = false;
37 67 m_locks = 1;
38 68 return true;
39 69 }
40
41 if (Monitor.Wait(m_lock, timeout)) {
42 Debug.Assert(m_locks == 0);
43 m_locks = 1;
70
71 var t1 = Environment.TickCount;
72 var dt = timeout;
73 do {
74 if (!Monitor.Wait(m_lock, dt))
75 return false;
76
77 if (m_locks == m_upgrades || !m_exclusive)
78 break;
79
80 if (timeout >= 0) {
81 dt = timeout - Environment.TickCount + t1;
82 if (dt < 0)
83 return false;
84 }
85 } while(true);
86
87 m_locks ++;
88 m_exclusive = false;
89 return true;
90 }
91 }
92
93 /// <summary>
94 /// Acquires the shared lock.
95 /// </summary>
96 public void LockShared() {
97 lock (m_lock) {
98 if (!m_exclusive) {
99 m_locks++;
100 } else if (m_locks == m_upgrades) {
44 101 m_exclusive = false;
45 return true;
102 m_locks++;
103 } else {
104 while (m_exclusive && m_locks > m_upgrades)
105 Monitor.Wait(m_lock);
106
107 m_locks++;
108 m_exclusive = false;
46 109 }
47 return false;
48 110 }
49 111 }
50 112
51 public void LockShared() {
52 LockShared(-1);
53 }
54
55 public void ReleaseShared() {
113 /// <summary>
114 /// Upgrades the current lock to exclusive level.
115 /// </summary>
116 /// <remarks>If the current lock is exclusive already the method does nothing.</remarks>
117 public void Upgrade() {
56 118 lock (m_lock) {
57 if (m_exclusive || m_locks <= 0)
58 throw new InvalidOperationException();
59 m_locks--;
60 if (m_locks == 0)
61 Monitor.PulseAll(m_lock);
119 if (!m_exclusive) {
120
121 if (m_locks <= m_upgrades)
122 throw new InvalidOperationException();
123
124 if (m_locks - m_upgrades == 1) {
125 m_exclusive = true;
126 } else {
127 m_upgrades++;
128
129 while (m_locks > m_upgrades)
130 Monitor.Wait(m_lock);
131
132 m_upgrades--;
133 m_exclusive = true;
134 }
135 }
62 136 }
63 137 }
64 138
65 public void ReleaseExclusive() {
139 /// <summary>
140 /// Upgrades the current lock to exclusive level.
141 /// </summary>
142 /// <param name="timeout">Timeout.</param>
143 /// <returns><c>true</c> if the current lock was updated, <c>false</c> the specified timeout was expired.</returns>
144 /// <remarks>If the current lock is exclusive already the method does nothing.</remarks>
145 public bool Upgrade(int timeout) {
66 146 lock (m_lock) {
67 if (!m_exclusive && m_locks != 1)
147 if (m_exclusive)
148 return true;
149 if (m_locks <= m_upgrades)
68 150 throw new InvalidOperationException();
69 m_locks = 0;
70 Monitor.PulseAll(m_lock);
151
152 if (m_locks - m_upgrades == 1) {
153 m_exclusive = true;
154 } else {
155 var t1 = Environment.TickCount;
156 var dt = timeout;
157 m_upgrades++;
158 do {
159 if (!Monitor.Wait(m_lock, dt)) {
160 m_upgrades--;
161 return false;
162 }
163
164 // we may get there but the shared lock already aquired
165 if (m_locks == m_upgrades)
166 break;
167
168 if (timeout >= 0) {
169 dt = timeout - Environment.TickCount + t1;
170 if (dt < 0) {
171 m_upgrades--;
172 return false;
173 }
174 }
175 } while(true);
176 m_upgrades--;
177 m_exclusive = true;
178 }
179 return true;
71 180 }
72 181 }
73 182
183 /// <summary>
184 /// Downgrades this lock to shared level.
185 /// </summary>
186 public void Downgrade() {
187 lock (m_lock)
188 m_exclusive = false;
189 }
190
191 /// <summary>
192 /// Releases the current lock.
193 /// </summary>
194 public void Release() {
195 lock (m_lock)
196 // if no more running threads left
197 if (--m_locks == m_upgrades)
198 Monitor.PulseAll(m_lock);
199 }
74 200 }
75 201 }
76 202
@@ -101,6 +101,11 namespace Implab {
101 101 int errors = 0;
102 102 var medium = new Promise();
103 103
104 if (count == 0) {
105 medium.Resolve();
106 return medium;
107 }
108
104 109 medium.On(() => {
105 110 foreach(var p2 in that)
106 111 p2.Cancel();
@@ -4,6 +4,7 using Implab.Parallels;
4 4 using Implab;
5 5 using System.Collections.Generic;
6 6 using System.Collections.Concurrent;
7 using System.Threading;
7 8
8 9 namespace MonoPlay {
9 10 class MainClass {
@@ -11,24 +12,82 namespace MonoPlay {
11 12 if (args == null)
12 13 throw new ArgumentNullException("args");
13 14
14 const int count = 10000000;
15
16 15 var t1 = Environment.TickCount;
17 16
18 for (int i = 0; i < count; i++) {
19 var p = new Promise<int>();
17 const int reads = 100000;
18 const int writes = 1000;
19 const int readThreads = 8;
20 const int writeThreads = 0;
21
22 var l = new SharedLock();
23 var st = new HashSet<int>();
20 24
21 p.On(x => {}).On(x => {});
25 Action reader1 = () => {
26 for (int i =0; i < reads; i++) {
27 try {
28 l.LockShared();
29 st.Contains(i % 1000);
30 Thread.Sleep(0);
31 } finally {
32 l.Release();
33 }
34 }
35 };
36
37 Action reader2 = () => {
38 for(var i = 0; i < reads; i++)
39 lock(st) {
40 st.Contains(i % 1000);
41 Thread.Sleep(0);
42 }
43 };
22 44
23 p.Resolve(i);
45 Action writer1 = () => {
46 var rnd = new Random(Environment.TickCount);
47 for (int i = 0; i < writes; i++) {
48 try {
49 l.LockExclusive();
50 st.Add(rnd.Next(1000));
51 //Thread.Sleep(1);
52 } finally {
53 l.Release();
54 }
55 }
56 };
24 57
25 }
58 Action writer2 = () => {
59 var rnd = new Random(Environment.TickCount);
60 for (int i = 0; i < writes; i++) {
61 lock (st) {
62 st.Add(rnd.Next(1000));
63 //Thread.Sleep(1);
64 }
65 }
66 };
67
68
26 69
27
70 var readers = new IPromise[readThreads];
71 for (int i = 0; i < readThreads; i++)
72 readers[i] = AsyncPool.RunThread(reader1);
73
74 var writers = new IPromise[writeThreads];
75 for (int i = 0; i < writeThreads; i++)
76 writers[i] = AsyncPool.RunThread(writer1);
77
78
79 new [] {
80 readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)),
81 writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1))
82 }.Bundle().Join();
83
84
28 85
29 86 var t2 = Environment.TickCount;
30 87 Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
31 88
32 89 }
90
91
33 92 }
34 93 }
General Comments 0
You need to be logged in to leave comments. Login now