| @@ -774,6 +774,79 namespace Implab.Test { | |||||
| 774 | Assert.IsTrue(pSurvive.Join()); |  | 774 | Assert.IsTrue(pSurvive.Join()); | |
| 775 | } |  | 775 | } | |
| 776 | } |  | 776 | } | |
|  | 777 | ||||
|  | 778 | [TestMethod] | |||
|  | 779 | public void SharedLockTest() { | |||
|  | 780 | var l = new SharedLock(); | |||
|  | 781 | int shared = 0; | |||
|  | 782 | int exclusive = 0; | |||
|  | 783 | var s1 = new Signal(); | |||
|  | 784 | var log = new AsyncQueue<string>(); | |||
|  | 785 | ||||
|  | 786 | try { | |||
|  | 787 | AsyncPool.RunThread( | |||
|  | 788 | () => { | |||
|  | 789 | log.Enqueue("Reader #1 started"); | |||
|  | 790 | try { | |||
|  | 791 | l.LockShared(); | |||
|  | 792 | log.Enqueue("Reader #1 lock got"); | |||
|  | 793 | if (Interlocked.Increment(ref shared) == 2) | |||
|  | 794 | s1.Set(); | |||
|  | 795 | s1.Wait(); | |||
|  | 796 | log.Enqueue("Reader #1 finished"); | |||
|  | 797 | Interlocked.Decrement(ref shared); | |||
|  | 798 | } finally { | |||
|  | 799 | l.Release(); | |||
|  | 800 | log.Enqueue("Reader #1 lock released"); | |||
|  | 801 | } | |||
|  | 802 | }, | |||
|  | 803 | () => { | |||
|  | 804 | log.Enqueue("Reader #2 started"); | |||
|  | 805 | ||||
|  | 806 | try { | |||
|  | 807 | l.LockShared(); | |||
|  | 808 | log.Enqueue("Reader #2 lock got"); | |||
|  | 809 | ||||
|  | 810 | if (Interlocked.Increment(ref shared) == 2) | |||
|  | 811 | s1.Set(); | |||
|  | 812 | s1.Wait(); | |||
|  | 813 | log.Enqueue("Reader #2 upgrading to writer"); | |||
|  | 814 | Interlocked.Decrement(ref shared); | |||
|  | 815 | l.Upgrade(); | |||
|  | 816 | log.Enqueue("Reader #2 upgraded"); | |||
|  | 817 | ||||
|  | 818 | Assert.AreEqual(1, Interlocked.Increment(ref exclusive)); | |||
|  | 819 | Assert.AreEqual(0, shared); | |||
|  | 820 | log.Enqueue("Reader #2 finished"); | |||
|  | 821 | Interlocked.Decrement(ref exclusive); | |||
|  | 822 | } finally { | |||
|  | 823 | l.Release(); | |||
|  | 824 | log.Enqueue("Reader #2 lock released"); | |||
|  | 825 | } | |||
|  | 826 | }, | |||
|  | 827 | () => { | |||
|  | 828 | log.Enqueue("Writer #1 started"); | |||
|  | 829 | try { | |||
|  | 830 | l.LockExclusive(); | |||
|  | 831 | log.Enqueue("Writer #1 got the lock"); | |||
|  | 832 | Assert.AreEqual(1, Interlocked.Increment(ref exclusive)); | |||
|  | 833 | Interlocked.Decrement(ref exclusive); | |||
|  | 834 | log.Enqueue("Writer #1 is finished"); | |||
|  | 835 | } finally { | |||
|  | 836 | l.Release(); | |||
|  | 837 | log.Enqueue("Writer #1 lock released"); | |||
|  | 838 | } | |||
|  | 839 | } | |||
|  | 840 | ).Bundle().Join(1000); | |||
|  | 841 | log.Enqueue("Done"); | |||
|  | 842 | } catch(Exception error) { | |||
|  | 843 | log.Enqueue(error.Message); | |||
|  | 844 | throw; | |||
|  | 845 | } finally { | |||
|  | 846 | foreach (var m in log) | |||
|  | 847 | Console.WriteLine(m); | |||
|  | 848 | } | |||
|  | 849 | } | |||
| 777 | } |  | 850 | } | |
| 778 | } |  | 851 | } | |
| 779 |  | 852 | |||
| @@ -8,23 +8,53 namespace Implab.Parallels { | |||||
| 8 | /// </summary> |  | 8 | /// </summary> | |
| 9 | public class SharedLock { |  | 9 | public class SharedLock { | |
| 10 | readonly object m_lock = new object(); |  | 10 | readonly object m_lock = new object(); | |
|  | 11 | // the count of locks currently acquired by clients | |||
| 11 | int m_locks; |  | 12 | int m_locks; | |
|  | 13 | // the count of pending requests for upgrade | |||
|  | 14 | int m_upgrades; | |||
| 12 | bool m_exclusive; |  | 15 | bool m_exclusive; | |
| 13 |  | 16 | |||
| 14 | public bool LockExclusive(int timeout) { |  | 17 | public bool LockExclusive(int timeout) { | |
| 15 | lock (m_lock) { |  | 18 | lock (m_lock) { | |
| 16 | if (m_locks > 0 && !Monitor.Wait(m_lock, timeout)) |  | 19 | var dt = timeout; | |
| 17 | return false; |  | 20 | if (m_locks > m_upgrades) { | |
|  | 21 | var t1 = Environment.TickCount; | |||
|  | 22 | do { | |||
|  | 23 | if (!Monitor.Wait(m_lock, timeout)) | |||
|  | 24 | return false; | |||
|  | 25 | ||||
|  | 26 | if (m_locks == m_upgrades) | |||
|  | 27 | break; | |||
|  | 28 | ||||
|  | 29 | if (timeout > 0) { | |||
|  | 30 | dt = timeout - Environment.TickCount + t1; | |||
|  | 31 | if (dt < 0) | |||
|  | 32 | return false; | |||
|  | 33 | } | |||
|  | 34 | } while(true); | |||
|  | 35 | } | |||
| 18 | m_exclusive = true; |  | 36 | m_exclusive = true; | |
| 19 | m_locks |  | 37 | m_locks ++; | |
| 20 | return true; |  | 38 | return true; | |
| 21 | } |  | 39 | } | |
| 22 | } |  | 40 | } | |
| 23 |  | 41 | |||
| 24 | public void LockExclusive() { |  | 42 | public void LockExclusive() { | |
| 25 | LockExclusive(-1); |  | 43 | lock (m_lock) { | |
|  | 44 | ||||
|  | 45 | while (m_locks > m_upgrades) | |||
|  | 46 | Monitor.Wait(m_lock); | |||
|  | 47 | ||||
|  | 48 | m_exclusive = true; | |||
|  | 49 | m_locks ++; | |||
|  | 50 | } | |||
| 26 | } |  | 51 | } | |
| 27 |  | 52 | |||
|  | 53 | /// <summary> | |||
|  | 54 | /// Acquires a shared lock. | |||
|  | 55 | /// </summary> | |||
|  | 56 | /// <returns><c>true</c>, if the shared lock was acquired, <c>false</c> if the specified timeout was expired.</returns> | |||
|  | 57 | /// <param name="timeout">Timeout.</param> | |||
| 28 | public bool LockShared(int timeout) { |  | 58 | public bool LockShared(int timeout) { | |
| 29 | lock (m_lock) { |  | 59 | lock (m_lock) { | |
| 30 | if (!m_exclusive) { |  | 60 | if (!m_exclusive) { | |
| @@ -32,45 +62,141 namespace Implab.Parallels { | |||||
| 32 | return true; |  | 62 | return true; | |
| 33 | } |  | 63 | } | |
| 34 |  | 64 | |||
| 35 | if (m_locks == |  | 65 | if (m_locks == m_upgrades) { | |
| 36 | m_exclusive = false; |  | 66 | m_exclusive = false; | |
| 37 | m_locks = 1; |  | 67 | m_locks = 1; | |
| 38 | return true; |  | 68 | return true; | |
| 39 | } |  | 69 | } | |
| 40 |  | 70 | |||
| 41 | if (Monitor.Wait(m_lock, timeout)) { |  | 71 | var t1 = Environment.TickCount; | |
| 42 | Debug.Assert(m_locks == 0); |  | 72 | var dt = timeout; | |
| 43 |  |  | 73 | do { | |
|  | 74 | if (!Monitor.Wait(m_lock, dt)) | |||
|  | 75 | return false; | |||
|  | 76 | ||||
|  | 77 | if (m_locks == m_upgrades || !m_exclusive) | |||
|  | 78 | break; | |||
|  | 79 | ||||
|  | 80 | if (timeout >= 0) { | |||
|  | 81 | dt = timeout - Environment.TickCount + t1; | |||
|  | 82 | if (dt < 0) | |||
|  | 83 | return false; | |||
|  | 84 | } | |||
|  | 85 | } while(true); | |||
|  | 86 | ||||
|  | 87 | m_locks ++; | |||
|  | 88 | m_exclusive = false; | |||
|  | 89 | return true; | |||
|  | 90 | } | |||
|  | 91 | } | |||
|  | 92 | ||||
|  | 93 | /// <summary> | |||
|  | 94 | /// Acquires the shared lock. | |||
|  | 95 | /// </summary> | |||
|  | 96 | public void LockShared() { | |||
|  | 97 | lock (m_lock) { | |||
|  | 98 | if (!m_exclusive) { | |||
|  | 99 | m_locks++; | |||
|  | 100 | } else if (m_locks == m_upgrades) { | |||
| 44 | m_exclusive = false; |  | 101 | m_exclusive = false; | |
| 45 |  |  | 102 | m_locks++; | |
|  | 103 | } else { | |||
|  | 104 | while (m_exclusive && m_locks > m_upgrades) | |||
|  | 105 | Monitor.Wait(m_lock); | |||
|  | 106 | ||||
|  | 107 | m_locks++; | |||
|  | 108 | m_exclusive = false; | |||
| 46 | } |  | 109 | } | |
| 47 | return false; |  | |||
| 48 | } |  | 110 | } | |
| 49 | } |  | 111 | } | |
| 50 |  | 112 | |||
| 51 | public void LockShared() { |  | 113 | /// <summary> | |
| 52 | LockShared(-1); |  | 114 | /// Upgrades the current lock to exclusive level. | |
| 53 | } |  | 115 | /// </summary> | |
| 54 |  | 116 | /// <remarks>If the current lock is exclusive already the method does nothing.</remarks> | ||
| 55 | public void |  | 117 | public void Upgrade() { | |
| 56 | lock (m_lock) { |  | 118 | lock (m_lock) { | |
| 57 | if (m_exclusive |  | 119 | if (!m_exclusive) { | |
| 58 | throw new InvalidOperationException(); |  | 120 | ||
| 59 | m_locks |  | 121 | if (m_locks <= m_upgrades) | |
| 60 | if (m_locks == 0) |  | 122 | throw new InvalidOperationException(); | |
| 61 | Monitor.PulseAll(m_lock); |  | 123 | ||
|  | 124 | if (m_locks - m_upgrades == 1) { | |||
|  | 125 | m_exclusive = true; | |||
|  | 126 | } else { | |||
|  | 127 | m_upgrades++; | |||
|  | 128 | ||||
|  | 129 | while (m_locks > m_upgrades) | |||
|  | 130 | Monitor.Wait(m_lock); | |||
|  | 131 | ||||
|  | 132 | m_upgrades--; | |||
|  | 133 | m_exclusive = true; | |||
|  | 134 | } | |||
|  | 135 | } | |||
| 62 | } |  | 136 | } | |
| 63 | } |  | 137 | } | |
| 64 |  | 138 | |||
| 65 | public void ReleaseExclusive() { |  | 139 | /// <summary> | |
|  | 140 | /// Upgrades the current lock to exclusive level. | |||
|  | 141 | /// </summary> | |||
|  | 142 | /// <param name="timeout">Timeout.</param> | |||
|  | 143 | /// <returns><c>true</c> if the current lock was updated, <c>false</c> the specified timeout was expired.</returns> | |||
|  | 144 | /// <remarks>If the current lock is exclusive already the method does nothing.</remarks> | |||
|  | 145 | public bool Upgrade(int timeout) { | |||
| 66 | lock (m_lock) { |  | 146 | lock (m_lock) { | |
| 67 | if ( |  | 147 | if (m_exclusive) | |
|  | 148 | return true; | |||
|  | 149 | if (m_locks <= m_upgrades) | |||
| 68 | throw new InvalidOperationException(); |  | 150 | throw new InvalidOperationException(); | |
| 69 | m_locks = 0; |  | 151 | ||
| 70 | Monitor.PulseAll(m_lock); |  | 152 | if (m_locks - m_upgrades == 1) { | |
|  | 153 | m_exclusive = true; | |||
|  | 154 | } else { | |||
|  | 155 | var t1 = Environment.TickCount; | |||
|  | 156 | var dt = timeout; | |||
|  | 157 | m_upgrades++; | |||
|  | 158 | do { | |||
|  | 159 | if (!Monitor.Wait(m_lock, dt)) { | |||
|  | 160 | m_upgrades--; | |||
|  | 161 | return false; | |||
|  | 162 | } | |||
|  | 163 | ||||
|  | 164 | // we may get there but the shared lock already aquired | |||
|  | 165 | if (m_locks == m_upgrades) | |||
|  | 166 | break; | |||
|  | 167 | ||||
|  | 168 | if (timeout >= 0) { | |||
|  | 169 | dt = timeout - Environment.TickCount + t1; | |||
|  | 170 | if (dt < 0) { | |||
|  | 171 | m_upgrades--; | |||
|  | 172 | return false; | |||
|  | 173 | } | |||
|  | 174 | } | |||
|  | 175 | } while(true); | |||
|  | 176 | m_upgrades--; | |||
|  | 177 | m_exclusive = true; | |||
|  | 178 | } | |||
|  | 179 | return true; | |||
| 71 | } |  | 180 | } | |
| 72 | } |  | 181 | } | |
| 73 |  | 182 | |||
|  | 183 | /// <summary> | |||
|  | 184 | /// Downgrades this lock to shared level. | |||
|  | 185 | /// </summary> | |||
|  | 186 | public void Downgrade() { | |||
|  | 187 | lock (m_lock) | |||
|  | 188 | m_exclusive = false; | |||
|  | 189 | } | |||
|  | 190 | ||||
|  | 191 | /// <summary> | |||
|  | 192 | /// Releases the current lock. | |||
|  | 193 | /// </summary> | |||
|  | 194 | public void Release() { | |||
|  | 195 | lock (m_lock) | |||
|  | 196 | // if no more running threads left | |||
|  | 197 | if (--m_locks == m_upgrades) | |||
|  | 198 | Monitor.PulseAll(m_lock); | |||
|  | 199 | } | |||
| 74 | } |  | 200 | } | |
| 75 | } |  | 201 | } | |
| 76 |  | 202 | |||
| @@ -101,6 +101,11 namespace Implab { | |||||
| 101 | int errors = 0; |  | 101 | int errors = 0; | |
| 102 | var medium = new Promise(); |  | 102 | var medium = new Promise(); | |
| 103 |  | 103 | |||
|  | 104 | if (count == 0) { | |||
|  | 105 | medium.Resolve(); | |||
|  | 106 | return medium; | |||
|  | 107 | } | |||
|  | 108 | ||||
| 104 | medium.On(() => { |  | 109 | medium.On(() => { | |
| 105 | foreach(var p2 in that) |  | 110 | foreach(var p2 in that) | |
| 106 | p2.Cancel(); |  | 111 | p2.Cancel(); | |
| @@ -4,6 +4,7 using Implab.Parallels; | |||||
| 4 | using Implab; |  | 4 | using Implab; | |
| 5 | using System.Collections.Generic; |  | 5 | using System.Collections.Generic; | |
| 6 | using System.Collections.Concurrent; |  | 6 | using System.Collections.Concurrent; | |
|  | 7 | using System.Threading; | |||
| 7 |  | 8 | |||
| 8 | namespace MonoPlay { |  | 9 | namespace MonoPlay { | |
| 9 | class MainClass { |  | 10 | class MainClass { | |
| @@ -11,24 +12,82 namespace MonoPlay { | |||||
| 11 | if (args == null) |  | 12 | if (args == null) | |
| 12 | throw new ArgumentNullException("args"); |  | 13 | throw new ArgumentNullException("args"); | |
| 13 |  | 14 | |||
| 14 | const int count = 10000000; |  | |||
| 15 |  | ||||
| 16 | var t1 = Environment.TickCount; |  | 15 | var t1 = Environment.TickCount; | |
| 17 |  | 16 | |||
| 18 | for (int i = 0; i < count; i++) { |  | 17 | const int reads = 100000; | |
| 19 | var p = new Promise<int>(); |  | 18 | const int writes = 1000; | |
|  | 19 | const int readThreads = 8; | |||
|  | 20 | const int writeThreads = 0; | |||
|  | 21 | ||||
|  | 22 | var l = new SharedLock(); | |||
|  | 23 | var st = new HashSet<int>(); | |||
| 20 |  | 24 | |||
| 21 | p.On(x => {}).On(x => {}); |  | 25 | Action reader1 = () => { | |
|  | 26 | for (int i =0; i < reads; i++) { | |||
|  | 27 | try { | |||
|  | 28 | l.LockShared(); | |||
|  | 29 | st.Contains(i % 1000); | |||
|  | 30 | Thread.Sleep(0); | |||
|  | 31 | } finally { | |||
|  | 32 | l.Release(); | |||
|  | 33 | } | |||
|  | 34 | } | |||
|  | 35 | }; | |||
|  | 36 | ||||
|  | 37 | Action reader2 = () => { | |||
|  | 38 | for(var i = 0; i < reads; i++) | |||
|  | 39 | lock(st) { | |||
|  | 40 | st.Contains(i % 1000); | |||
|  | 41 | Thread.Sleep(0); | |||
|  | 42 | } | |||
|  | 43 | }; | |||
| 22 |  | 44 | |||
| 23 | p.Resolve(i); |  | 45 | Action writer1 = () => { | |
|  | 46 | var rnd = new Random(Environment.TickCount); | |||
|  | 47 | for (int i = 0; i < writes; i++) { | |||
|  | 48 | try { | |||
|  | 49 | l.LockExclusive(); | |||
|  | 50 | st.Add(rnd.Next(1000)); | |||
|  | 51 | //Thread.Sleep(1); | |||
|  | 52 | } finally { | |||
|  | 53 | l.Release(); | |||
|  | 54 | } | |||
|  | 55 | } | |||
|  | 56 | }; | |||
| 24 |  | 57 | |||
| 25 | } |  | 58 | Action writer2 = () => { | |
|  | 59 | var rnd = new Random(Environment.TickCount); | |||
|  | 60 | for (int i = 0; i < writes; i++) { | |||
|  | 61 | lock (st) { | |||
|  | 62 | st.Add(rnd.Next(1000)); | |||
|  | 63 | //Thread.Sleep(1); | |||
|  | 64 | } | |||
|  | 65 | } | |||
|  | 66 | }; | |||
|  | 67 | ||||
|  | 68 | ||||
| 26 |  | 69 | |||
| 27 |  | 70 | var readers = new IPromise[readThreads]; | ||
|  | 71 | for (int i = 0; i < readThreads; i++) | |||
|  | 72 | readers[i] = AsyncPool.RunThread(reader1); | |||
|  | 73 | ||||
|  | 74 | var writers = new IPromise[writeThreads]; | |||
|  | 75 | for (int i = 0; i < writeThreads; i++) | |||
|  | 76 | writers[i] = AsyncPool.RunThread(writer1); | |||
|  | 77 | ||||
|  | 78 | ||||
|  | 79 | new [] { | |||
|  | 80 | readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)), | |||
|  | 81 | writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1)) | |||
|  | 82 | }.Bundle().Join(); | |||
|  | 83 | ||||
|  | 84 | ||||
| 28 |  | 85 | |||
| 29 | var t2 = Environment.TickCount; |  | 86 | var t2 = Environment.TickCount; | |
| 30 | Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) ); |  | 87 | Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) ); | |
| 31 |  | 88 | |||
| 32 | } |  | 89 | } | |
|  | 90 | ||||
|  | 91 | ||||
| 33 | } |  | 92 | } | |
| 34 | } |  | 93 | } | |
        
        General Comments 0
    
    
  
  
                      You need to be logged in to leave comments.
                      Login now
                    
                