diff --git a/Implab.Test/AsyncTests.cs b/Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs +++ b/Implab.Test/AsyncTests.cs @@ -774,6 +774,79 @@ namespace Implab.Test { Assert.IsTrue(pSurvive.Join()); } } + + [TestMethod] + public void SharedLockTest() { + var l = new SharedLock(); + int shared = 0; + int exclusive = 0; + var s1 = new Signal(); + var log = new AsyncQueue(); + + try { + AsyncPool.RunThread( + () => { + log.Enqueue("Reader #1 started"); + try { + l.LockShared(); + log.Enqueue("Reader #1 lock got"); + if (Interlocked.Increment(ref shared) == 2) + s1.Set(); + s1.Wait(); + log.Enqueue("Reader #1 finished"); + Interlocked.Decrement(ref shared); + } finally { + l.Release(); + log.Enqueue("Reader #1 lock released"); + } + }, + () => { + log.Enqueue("Reader #2 started"); + + try { + l.LockShared(); + log.Enqueue("Reader #2 lock got"); + + if (Interlocked.Increment(ref shared) == 2) + s1.Set(); + s1.Wait(); + log.Enqueue("Reader #2 upgrading to writer"); + Interlocked.Decrement(ref shared); + l.Upgrade(); + log.Enqueue("Reader #2 upgraded"); + + Assert.AreEqual(1, Interlocked.Increment(ref exclusive)); + Assert.AreEqual(0, shared); + log.Enqueue("Reader #2 finished"); + Interlocked.Decrement(ref exclusive); + } finally { + l.Release(); + log.Enqueue("Reader #2 lock released"); + } + }, + () => { + log.Enqueue("Writer #1 started"); + try { + l.LockExclusive(); + log.Enqueue("Writer #1 got the lock"); + Assert.AreEqual(1, Interlocked.Increment(ref exclusive)); + Interlocked.Decrement(ref exclusive); + log.Enqueue("Writer #1 is finished"); + } finally { + l.Release(); + log.Enqueue("Writer #1 lock released"); + } + } + ).Bundle().Join(1000); + log.Enqueue("Done"); + } catch(Exception error) { + log.Enqueue(error.Message); + throw; + } finally { + foreach (var m in log) + Console.WriteLine(m); + } + } } } diff --git a/Implab/Parallels/SharedLock.cs b/Implab/Parallels/SharedLock.cs --- a/Implab/Parallels/SharedLock.cs +++ b/Implab/Parallels/SharedLock.cs @@ -8,23 +8,53 @@ namespace Implab.Parallels { /// public class SharedLock { readonly object m_lock = new object(); + // the count of locks currently acquired by clients int m_locks; + // the count of pending requests for upgrade + int m_upgrades; bool m_exclusive; public bool LockExclusive(int timeout) { lock (m_lock) { - if (m_locks > 0 && !Monitor.Wait(m_lock, timeout)) - return false; + var dt = timeout; + if (m_locks > m_upgrades) { + var t1 = Environment.TickCount; + do { + if (!Monitor.Wait(m_lock, timeout)) + return false; + + if (m_locks == m_upgrades) + break; + + if (timeout > 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + return false; + } + } while(true); + } m_exclusive = true; - m_locks = 1; + m_locks ++; return true; } } public void LockExclusive() { - LockExclusive(-1); + lock (m_lock) { + + while (m_locks > m_upgrades) + Monitor.Wait(m_lock); + + m_exclusive = true; + m_locks ++; + } } + /// + /// Acquires a shared lock. + /// + /// true, if the shared lock was acquired, false if the specified timeout was expired. + /// Timeout. public bool LockShared(int timeout) { lock (m_lock) { if (!m_exclusive) { @@ -32,45 +62,141 @@ namespace Implab.Parallels { return true; } - if (m_locks == 0) { + if (m_locks == m_upgrades) { m_exclusive = false; m_locks = 1; return true; } - - if (Monitor.Wait(m_lock, timeout)) { - Debug.Assert(m_locks == 0); - m_locks = 1; + + var t1 = Environment.TickCount; + var dt = timeout; + do { + if (!Monitor.Wait(m_lock, dt)) + return false; + + if (m_locks == m_upgrades || !m_exclusive) + break; + + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + return false; + } + } while(true); + + m_locks ++; + m_exclusive = false; + return true; + } + } + + /// + /// Acquires the shared lock. + /// + public void LockShared() { + lock (m_lock) { + if (!m_exclusive) { + m_locks++; + } else if (m_locks == m_upgrades) { m_exclusive = false; - return true; + m_locks++; + } else { + while (m_exclusive && m_locks > m_upgrades) + Monitor.Wait(m_lock); + + m_locks++; + m_exclusive = false; } - return false; } } - public void LockShared() { - LockShared(-1); - } - - public void ReleaseShared() { + /// + /// Upgrades the current lock to exclusive level. + /// + /// If the current lock is exclusive already the method does nothing. + public void Upgrade() { lock (m_lock) { - if (m_exclusive || m_locks <= 0) - throw new InvalidOperationException(); - m_locks--; - if (m_locks == 0) - Monitor.PulseAll(m_lock); + if (!m_exclusive) { + + if (m_locks <= m_upgrades) + throw new InvalidOperationException(); + + if (m_locks - m_upgrades == 1) { + m_exclusive = true; + } else { + m_upgrades++; + + while (m_locks > m_upgrades) + Monitor.Wait(m_lock); + + m_upgrades--; + m_exclusive = true; + } + } } } - public void ReleaseExclusive() { + /// + /// Upgrades the current lock to exclusive level. + /// + /// Timeout. + /// true if the current lock was updated, false the specified timeout was expired. + /// If the current lock is exclusive already the method does nothing. + public bool Upgrade(int timeout) { lock (m_lock) { - if (!m_exclusive && m_locks != 1) + if (m_exclusive) + return true; + if (m_locks <= m_upgrades) throw new InvalidOperationException(); - m_locks = 0; - Monitor.PulseAll(m_lock); + + if (m_locks - m_upgrades == 1) { + m_exclusive = true; + } else { + var t1 = Environment.TickCount; + var dt = timeout; + m_upgrades++; + do { + if (!Monitor.Wait(m_lock, dt)) { + m_upgrades--; + return false; + } + + // we may get there but the shared lock already aquired + if (m_locks == m_upgrades) + break; + + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) { + m_upgrades--; + return false; + } + } + } while(true); + m_upgrades--; + m_exclusive = true; + } + return true; } } + /// + /// Downgrades this lock to shared level. + /// + public void Downgrade() { + lock (m_lock) + m_exclusive = false; + } + + /// + /// Releases the current lock. + /// + public void Release() { + lock (m_lock) + // if no more running threads left + if (--m_locks == m_upgrades) + Monitor.PulseAll(m_lock); + } } } diff --git a/Implab/PromiseExtensions.cs b/Implab/PromiseExtensions.cs --- a/Implab/PromiseExtensions.cs +++ b/Implab/PromiseExtensions.cs @@ -101,6 +101,11 @@ namespace Implab { int errors = 0; var medium = new Promise(); + if (count == 0) { + medium.Resolve(); + return medium; + } + medium.On(() => { foreach(var p2 in that) p2.Cancel(); diff --git a/MonoPlay/Program.cs b/MonoPlay/Program.cs --- a/MonoPlay/Program.cs +++ b/MonoPlay/Program.cs @@ -4,6 +4,7 @@ using Implab.Parallels; using Implab; using System.Collections.Generic; using System.Collections.Concurrent; +using System.Threading; namespace MonoPlay { class MainClass { @@ -11,24 +12,82 @@ namespace MonoPlay { if (args == null) throw new ArgumentNullException("args"); - const int count = 10000000; - var t1 = Environment.TickCount; - for (int i = 0; i < count; i++) { - var p = new Promise(); + const int reads = 100000; + const int writes = 1000; + const int readThreads = 8; + const int writeThreads = 0; + + var l = new SharedLock(); + var st = new HashSet(); - p.On(x => {}).On(x => {}); + Action reader1 = () => { + for (int i =0; i < reads; i++) { + try { + l.LockShared(); + st.Contains(i % 1000); + Thread.Sleep(0); + } finally { + l.Release(); + } + } + }; + + Action reader2 = () => { + for(var i = 0; i < reads; i++) + lock(st) { + st.Contains(i % 1000); + Thread.Sleep(0); + } + }; - p.Resolve(i); + Action writer1 = () => { + var rnd = new Random(Environment.TickCount); + for (int i = 0; i < writes; i++) { + try { + l.LockExclusive(); + st.Add(rnd.Next(1000)); + //Thread.Sleep(1); + } finally { + l.Release(); + } + } + }; - } + Action writer2 = () => { + var rnd = new Random(Environment.TickCount); + for (int i = 0; i < writes; i++) { + lock (st) { + st.Add(rnd.Next(1000)); + //Thread.Sleep(1); + } + } + }; + + - + var readers = new IPromise[readThreads]; + for (int i = 0; i < readThreads; i++) + readers[i] = AsyncPool.RunThread(reader1); + + var writers = new IPromise[writeThreads]; + for (int i = 0; i < writeThreads; i++) + writers[i] = AsyncPool.RunThread(writer1); + + + new [] { + readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)), + writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1)) + }.Bundle().Join(); + + var t2 = Environment.TickCount; Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) ); } + + } }