File: synchronize.rb

package info (click to toggle)
ruby-moneta 1.6.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,776 kB
  • sloc: ruby: 13,201; sh: 178; makefile: 7
file content (120 lines) | stat: -rw-r--r-- 3,025 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
module Moneta
  # Base class for {Mutex} and {Semaphore}
  # @abstract
  class SynchronizePrimitive
    # Synchronize block
    #
    # @api public
    # @yieldparam Synchronized block
    # @return [Object] result of block
    def synchronize
      enter
      yield
    ensure
      leave
    end

    # Try to enter critical section (nonblocking)
    #
    # @return [Boolean] true if the lock was acquired
    def try_enter
      raise 'Already locked' if @locked
      enter_primitive ? @locked = true : false
    end
    alias try_lock try_enter

    # Enter critical section (blocking)
    #
    # @param [Number] timeout Maximum time to wait
    # @param [Number] wait Sleep time between tries to acquire lock
    # @return [Boolean] true if the lock was aquired
    def enter(timeout = nil, wait = 0.01)
      time_at_timeout = Time.now + timeout if timeout
      while !timeout || Time.now < time_at_timeout
        return true if try_enter
        sleep(wait)
      end
      false
    end
    alias lock enter

    # Leave critical section
    def leave
      raise 'Not locked' unless @locked
      leave_primitive
      @locked = false
      nil
    end
    alias unlock leave

    # Is the lock acquired?
    def locked?
      @locked
    end
  end

  # Distributed/shared store-wide mutex
  #
  # @example Use `Moneta::Mutex`
  #     mutex = Moneta::Mutex.new(store, 'mutex')
  #     mutex.synchronize do
  #       # Synchronized access
  #       store['counter'] += 1
  #     end
  #
  # @api public
  class Mutex < SynchronizePrimitive
    # @param [Moneta store] store The store we want to lock
    # @param [Object] lock Key of the lock entry
    def initialize(store, lock)
      raise 'Store must support feature :create' unless store.supports?(:create)
      @store, @lock = store, lock
    end

    protected

    def enter_primitive
      @store.create(@lock, '', expires: false)
    end

    def leave_primitive
      @store.delete(@lock)
    end
  end

  # Distributed/shared store-wide semaphore
  #
  # @example Use `Moneta::Semaphore`
  #     semaphore = Moneta::Semaphore.new(store, 'semaphore', 2)
  #     semaphore.synchronize do
  #       # Synchronized access
  #       # ...
  #     end
  #
  # @api public
  class Semaphore < SynchronizePrimitive
    # @param [Moneta store] store The store we want to lock
    # @param [Object] counter Key of the counter entry
    # @param [Integer] max Maximum number of threads which are allowed to enter the critical section
    def initialize(store, counter, max = 1)
      raise 'Store must support feature :increment' unless store.supports?(:increment)
      @store, @counter, @max = store, counter, max
      @store.increment(@counter, 0, expires: false) # Ensure that counter exists
    end

    protected

    def enter_primitive
      if @store.increment(@counter, 1) <= @max
        true
      else
        @store.decrement(@counter)
        false
      end
    end

    def leave_primitive
      @store.decrement(@counter)
    end
  end
end