1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
|
module MessagePack
class Factory
# see ext for other methods
# Registers an extension type on this factory.
#
# +type+ and +klass+ are forwarded unchanged to +register_type_internal+.
# +options+ may carry +:packer+ and +:unpacker+ entries; each of them is
# normalized to a Proc before being forwarded:
#
# * +nil+ or a Proc is kept as is;
# * a String or Symbol packer becomes <tt>packer.to_sym.to_proc</tt>, while a
#   String or Symbol unpacker is resolved to the method of that name on +klass+;
# * a Method is converted with +to_proc+;
# * any other object responding to +call+ is wrapped through its +call+ method.
#
# When +options+ is nil (or false) it is forwarded untouched.
#
# Raises FrozenError when the factory is frozen, and TypeError when a
# +:packer+ or +:unpacker+ value is not one of the accepted callables.
def register_type(type, klass, options = { packer: :to_msgpack_ext, unpacker: :from_msgpack_ext })
  raise FrozenError, "can't modify frozen MessagePack::Factory" if frozen?

  if options
    # Work on a copy so the hash passed by the caller is never mutated.
    options = options.dup

    case packer = options[:packer]
    when nil, Proc
      # all good
    when String, Symbol
      options[:packer] = packer.to_sym.to_proc
    when Method
      options[:packer] = packer.to_proc
    else
      # Duck-typed callables have to be detected here: a
      # `when packer.respond_to?(:call)` clause would compare the boolean
      # result with === against the packer and therefore never match.
      unless packer.respond_to?(:call)
        raise ::TypeError, "expected :packer argument to be a callable object, got: #{packer.inspect}"
      end
      options[:packer] = packer.method(:call).to_proc
    end

    case unpacker = options[:unpacker]
    when nil, Proc
      # all good
    when String, Symbol
      options[:unpacker] = klass.method(unpacker).to_proc
    when Method
      options[:unpacker] = unpacker.to_proc
    else
      # The unpacker is judged on its own, independently of the packer.
      unless unpacker.respond_to?(:call)
        raise ::TypeError, "expected :unpacker argument to be a callable object, got: #{unpacker.inspect}"
      end
      options[:unpacker] = unpacker.method(:call).to_proc
    end
  end

  register_type_internal(type, klass, options)
end
# Lists the registered extension types, sorted by type id, as
# [ {type: id, class: Class(or nil), packer: arg, unpacker: arg}, ... ]
#
# +selector+ chooses what is reported:
# * +:both+     - every packer entry, joined with the unpacker registered under
#                 the same type id, followed by the unpacker-only entries;
# * +:packer+   - packer entries that have a proc (no +:unpacker+ key);
# * +:unpacker+ - unpacker entries that have a proc (no +:packer+ key).
#
# Raises ArgumentError for any other selector.
def registered_types(selector=:both)
  packers_by_class, unpackers_by_type = registered_types_internal
  # packers_by_class:  Class -> [tid, proc, _flags]
  # unpackers_by_type: tid -> [klass, proc, _flags]

  entries =
    case selector
    when :both
      paired = packers_by_class.map do |klass, spec|
        tid = spec[0]
        # Claim the matching unpacker so it is not listed a second time below.
        unpacker_proc = unpackers_by_type.has_key?(tid) ? unpackers_by_type.delete(tid)[1] : nil
        {type: tid, class: klass, packer: spec[1], unpacker: unpacker_proc}
      end

      # whatever is left has an unpacker definition only
      unpacker_only = unpackers_by_type.map do |tid, spec|
        {type: tid, class: spec[0], packer: nil, unpacker: spec[1]}
      end

      paired + unpacker_only
    when :packer
      packers_by_class
        .select { |_klass, spec| spec[1] }
        .map { |klass, spec| {type: spec[0], class: klass, packer: spec[1]} }
    when :unpacker
      unpackers_by_type
        .select { |_tid, spec| spec[1] }
        .map { |tid, spec| {type: tid, class: spec[0], unpacker: spec[1]} }
    else
      raise ArgumentError, "invalid selector #{selector}"
    end

  entries.sort { |left, right| left[:type] <=> right[:type] }
end
# Tells whether a class or a type id appears in registered_types(selector).
#
# A Class matches when it is the registered class or one of its descendants
# (compared with <=); an Integer matches when it equals a registered type id.
# Anything else raises ArgumentError before the registry is consulted.
def type_registered?(klass_or_type, selector=:both)
  matches =
    case klass_or_type
    when Class then ->(entry) { klass_or_type <= entry[:class] }
    when Integer then ->(entry) { klass_or_type == entry[:type] }
    else raise ArgumentError, "class or type id"
    end

  registered_types(selector).any?(&matches)
end
# Deserializes +src+ with an unpacker obtained from this factory and returns
# the result of its +full_unpack+.
#
# A String +src+ is fed to an unpacker built from +param+ alone; any other
# +src+ is handed to +unpacker+ together with +param+ (presumably an IO-like
# source - confirm against the ext implementation).
def load(src, param = nil)
  reader =
    if src.is_a?(String)
      unpacker(param).tap { |fresh| fresh.feed(src) }
    else
      unpacker(src, param)
    end

  reader.full_unpack
end

alias :unpack :load
# Serializes +v+ with a packer obtained from this factory and returns the
# result of its +full_pack+. Extra arguments are passed on to +packer+.
def dump(v, *rest)
  packer(*rest).tap { |writer| writer.write(v) }.full_pack
end

alias :pack :dump
# Builds a Pool of +size+ backed by a frozen factory.
#
# A factory that is already frozen is used directly; otherwise a frozen
# duplicate is handed to the pool. Keyword options are passed along, or nil
# when none were given.
def pool(size = 1, **options)
  frozen_factory = frozen? ? self : dup.freeze
  pool_options = options.empty? ? nil : options

  Pool.new(frozen_factory, size, pool_options)
end
# Hands out reusable packers and unpackers created by one factory.
#
# Members are created lazily by calling +factory.packer+ / +factory.unpacker+
# and are frozen right after creation. At most +size+ idle members of each
# kind are retained between uses.
class Pool
  if RUBY_ENGINE == "ruby"
    # MemberPool used when RUBY_ENGINE is "ruby". It takes no lock
    # (presumably relying on MRI's GVL - confirm).
    class MemberPool
      # +size+ caps the number of idle members kept; the block builds a new member.
      def initialize(size, &block)
        @size = size
        @new_member = block
        @members = []
      end

      # Yields an idle member (or a newly built one) and returns the block's value.
      def with
        member = checkout
        begin
          yield member
        ensure
          checkin(member)
        end
      end

      private

      def checkout
        @members.pop || @new_member.call
      end

      # When the pool is already full the extra member is simply dropped:
      # unlike with a connection pool, creating one more instance is extremely
      # unlikely to exhaust any resource.
      #
      # Cycling members (keeping the newer one) would be possible, but it is
      # more work, and the older member may have been created pre-fork and so
      # might live at least partially in shared memory.
      def checkin(member)
        return unless member && @members.size < @size

        member.reset
        @members << member
      end
    end
  else
    # MemberPool used on every other engine; the idle list is guarded by a Mutex.
    class MemberPool
      # +size+ caps the number of idle members kept; the block builds a new member.
      def initialize(size, &block)
        @size = size
        @new_member = block
        @members = []
        @mutex = Mutex.new
      end

      # Yields an idle member (or a newly built one) and returns the block's value.
      def with
        member = checkout
        begin
          yield member
        ensure
          # The member is reset outside the lock, whether or not it is kept.
          member.reset
          checkin(member)
        end
      end

      private

      def checkout
        @mutex.synchronize { @members.pop } || @new_member.call
      end

      def checkin(member)
        @mutex.synchronize do
          @members << member if member && @members.size < @size
        end
      end
    end
  end

  # +factory+ builds the members, +size+ is the capacity of each member pool,
  # and +options+ (nil or empty meaning "none") is passed to the factory.
  def initialize(factory, size, options = nil)
    member_options = (options && !options.empty?) ? options : nil
    @factory = factory
    @packers = MemberPool.new(size) { factory.packer(member_options).freeze }
    @unpackers = MemberPool.new(size) { factory.unpacker(member_options).freeze }
  end

  # Feeds +data+ to a pooled unpacker and returns its +full_unpack+ result.
  def load(data)
    @unpackers.with do |pooled|
      pooled.feed(data)
      pooled.full_unpack
    end
  end

  # Writes +object+ to a pooled packer and returns its +full_pack+ result.
  def dump(object)
    @packers.with do |pooled|
      pooled.write(object)
      pooled.full_pack
    end
  end

  # Yields a pooled unpacker to the block and returns the block's value.
  def unpacker(&block)
    @unpackers.with(&block)
  end

  # Yields a pooled packer to the block and returns the block's value.
  def packer(&block)
    @packers.with(&block)
  end
end
end
end
|