1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
|
require "thread"
module Stud
# Public: A thread-safe, generic resource pool.
#
# This class is agnostic as to the resources in the pool. You can put
# database connections, sockets, threads, etc. It's up to you!
#
# Examples:
#
# pool = Pool.new
# pool.add(Sequel.connect("postgres://pg-readonly-1/prod"))
# pool.add(Sequel.connect("postgres://pg-readonly-2/prod"))
# pool.add(Sequel.connect("postgres://pg-readonly-3/prod"))
#
# pool.fetch # => Returns one of the Sequel::Database values from the pool
class Pool
class Error < StandardError; end
# An error indicating a given resource is busy.
class ResourceBusy < Error; end
# An error indicating a given resource is not found.
class NotFound < Error; end
# You performed an invalid action.
class InvalidAction < Error; end
# Default all methods to private. See the bottom of the class definition
# for public method declarations.
private
# Public: initialize a new pool.
#
# max_size - if specified, limits the number of resources allowed in the pool.
def initialize(max_size=nil)
# Available resources
@available = Hash.new
# Busy resources
@busy = Hash.new
# The pool lock
@lock = Mutex.new
# Locks for blocking {#fetch} calls if the pool is full.
@full_lock = Mutex.new
@full_cv = ConditionVariable.new
# Maximum size of this pool.
@max_size = max_size
end # def initialize
# Private: Is this pool size-limited?
#
# Returns true if this pool was created with a max_size. False, otherwise.
def sized?
return !@max_size.nil?
end # def sized?
# Private: Is this pool full?
#
# Returns true if the pool is sized and the count of resources is at maximum.
def full?
return sized? && (count == @max_size)
end # def full?
# Public: the count of resources in the pool
#
# Returns the count of resources in the pool.
def count
return (@busy.size + @available.size)
end # def count
# Public: Add a new resource to this pool.
#
# The resource, once added, is assumed to be available for use.
# That means once you add it, you must not use it unless you receive it from
# {Pool#fetch}
#
# resource - the object resource to add to the pool.
#
# Returns nothing
def add(resource)
@lock.synchronize do
@available[resource.object_id] = resource
end
return nil
end # def add
# Public: Fetch an available resource.
#
# If no resource is available, and the pool is not full, the
# default_value_block will be called and the return value of it used as the
# resource.
#
# If no resource is availabe, and the pool is full, this call will block
# until a resource is available.
#
# Returns a resource ready to be used.
def fetch(&default_value_block)
@lock.synchronize do
object_id, resource = @available.shift
if !resource.nil?
@busy[resource.object_id] = resource
return resource
end
end
@full_lock.synchronize do
if full?
# This should really use a logger.
puts "=> Pool is full and nothing available. Waiting for a release..."
@full_cv.wait(@full_lock)
return fetch(&default_value_block)
end
end
# TODO(sissel): If no block is given, we should block until a resource is
# available.
# If we get here, no resource is available and the pool is not full.
resource = default_value_block.call
# Only add the resource if the default_value_block returned one.
if !resource.nil?
add(resource)
return fetch
end
end # def fetch
# Public: Remove a resource from the pool.
#
# This is useful if the resource is no longer useful. For example, if it is
# a database connection and that connection has failed.
#
# This resource *MUST* be available and not busy.
#
# Raises Pool::NotFound if no such resource is found.
# Raises Pool::ResourceBusy if the resource is found but in use.
def remove(resource)
# Find the object by object_id
#p [:internal, :busy => @busy, :available => @available]
@lock.synchronize do
if available?(resource)
raise InvalidAction, "This resource must be busy for you to remove " \
"it (ie; it must be fetched from the pool)"
end
@busy.delete(resource.object_id)
end
end # def remove
# Private: Verify this resource is in the pool.
#
# You *MUST* call this method only when you are holding @lock.
#
# Returns :available if it is available, :busy if busy, false if not in the pool.
def include?(resource)
if @available.include?(resource.object_id)
return :available
elsif @busy.include?(resource.object_id)
return :busy
else
return false
end
end # def include?
# Private: Is this resource available?
# You *MUST* call this method only when you are holding @lock.
#
# Returns true if this resource is available in the pool.
# Raises NotFound if the resource given is not in the pool at all.
def available?(resource)
case include?(resource)
when :available; return true
when :busy; return false
else; raise NotFound, "No resource, #{resource.inspect}, found in pool"
end
end # def avilable?
# Private: Is this resource busy?
#
# You *MUST* call this method only when you are holding @lock.
#
# Returns true if this resource is busy.
# Raises NotFound if the resource given is not in the pool at all.
def busy?(resource)
return !available?(resource)
end # def busy?
# Public: Release this resource back to the pool.
#
# After you finish using a resource you received with {#fetch}, you must
# release it back to the pool using this method.
#
# Alternately, you can {#remove} it if you want to remove it from the pool
# instead of releasing it.
def release(resource)
@lock.synchronize do
if !include?(resource)
raise NotFound, "No resource, #{resource.inspect}, found in pool"
end
# Release is a no-op if this resource is already available.
#return if available?(resource)
@busy.delete(resource.object_id)
@available[resource.object_id] = resource
# Notify any threads waiting on a resource from the pool.
@full_lock.synchronize { @full_cv.signal }
end
end # def release
public(:add, :remove, :fetch, :release, :sized?, :count, :initialize)
end # class Pool
end # module Stud
|