File: sharded_single.rb

package info (click to toggle)
ruby-sequel 5.63.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 10,408 kB
  • sloc: ruby: 113,747; makefile: 3
file content (113 lines) | stat: -rw-r--r-- 3,619 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# frozen-string-literal: true

# A ShardedSingleConnectionPool is a single threaded connection pool that
# works with multiple shards/servers.
class Sequel::ShardedSingleConnectionPool < Sequel::ConnectionPool
  # Create the pool.  In addition to whatever the superclass accepts, the
  # following options are used:
  #
  # :servers :: A hash whose keys (symbols) name the servers/shards to
  #             register.  When absent, only the :default server is registered.
  # :servers_hash :: The hash used to map a requested server to a registered
  #                  server.  Defaults to Hash.new(:default), so unknown servers
  #                  resolve to :default.  Supply a hash with a default proc that
  #                  raises if you want to catch use of nonexistent servers.
  def initialize(db, opts=OPTS)
    super
    @conns = {}
    @servers = opts.fetch(:servers_hash){Hash.new(:default)}
    add_servers([:default])

    configured = opts[:servers]
    add_servers(configured.keys) if configured
  end

  # Register additional servers with the pool, so shards/replicas can be
  # added at runtime.  +servers+ should be an array of symbols; each one is
  # stored in the servers hash mapped to itself.
  def add_servers(servers)
    servers.each do |name|
      @servers[name] = name
    end
  end

  # Yield each currently established connection.  Iterates over a snapshot
  # of the connections taken when the method is called.
  def all_connections
    established = @conns.values
    established.each do |connection|
      yield connection
    end
  end

  # The connection currently held for the given server, or nil if none
  # has been established.
  def conn(server=:default)
    key = @servers[server]
    @conns[key]
  end

  # Disconnect from the database.  A connection is made again the next
  # time one is requested through #hold.  Options:
  # :server :: A symbol specifying the server to disconnect from, or an
  #            array of symbols for multiple servers.  When not given,
  #            every registered server is disconnected.
  #
  # Raises Sequel::Error if a requested server is not registered.
  def disconnect(opts=OPTS)
    requested = opts[:server]
    targets = requested ? Array(requested) : servers

    targets.each do |name|
      unless @servers.has_key?(name)
        raise Sequel::Error, "invalid server: #{name}"
      end

      disconnect_server(name)
    end
  end

  # Freeze the servers hash before freezing the pool itself.
  def freeze
    @servers.freeze
    super
  end

  # Yield the connection for the given server to the block, creating the
  # connection first if one does not exist yet.  Mirrors the
  # ConnectionPool#hold API.
  #
  # If the block raises Sequel::DatabaseDisconnectError or one of the
  # pool's error classes, and the error is a disconnect error, the
  # server's connection is dropped before the error is re-raised.
  def hold(server=:default)
    server = pick_server(server)
    connection = (@conns[server] ||= make_new(server))
    yield(connection)
  rescue Sequel::DatabaseDisconnectError, *@error_classes => error
    if disconnect_error?(error)
      disconnect_server(server)
    end
    raise
  end

  # A ShardedSingleConnectionPool always reports a maximum size of 1.
  def max_size
    1
  end

  # Unregister the given servers.  Each one is disconnected and then
  # removed from the servers hash, so later requests for it go through
  # the hash's default handling (the :default server with the default
  # servers hash).
  #
  # Raises Sequel::Error if asked to remove the :default server.
  def remove_servers(servers)
    if servers.include?(:default)
      raise(Sequel::Error, "cannot remove default server")
    end

    servers.each do |name|
      disconnect_server(name)
      @servers.delete(name)
    end
  end

  # An array of the symbols for the servers registered in this pool.
  def servers
    @servers.keys
  end

  # How many servers/shards currently have an established connection.
  def size
    @conns.size
  end

  # Identifier for this kind of pool.
  def pool_type
    :sharded_single
  end

  private

  # Drop and disconnect the connection stored for the given server.
  # Does nothing when no connection is stored for it.
  def disconnect_server(server)
    connection = @conns.delete(server)
    disconnect_connection(connection) if connection
  end

  # Resolve the requested server through the servers hash.  With the
  # default servers hash, an unregistered server resolves to :default.
  def pick_server(server)
    @servers[server]
  end

  # Establish a connection for every registered server by holding each
  # one with an empty block.  The +concurrent+ argument is accepted but
  # not used.
  def preconnect(concurrent = nil)
    servers.each do |name|
      hold(name){}
    end
  end
end