File: mongo_config.rb

package info (click to toggle)
ruby-mongo 1.10.0-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 1,188 kB
  • ctags: 1,483
  • sloc: ruby: 16,188; makefile: 5
file content (608 lines) | stat: -rwxr-xr-x 18,344 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
#!/usr/bin/env ruby

# Copyright (C) 2009-2013 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'socket'
require 'fileutils'
require 'mongo'
#require 'sfl'

# Verbosity threshold consulted by the top-level debug helper: messages whose
# level is greater than this value are not printed.
$debug_level = 2
# Write output immediately instead of buffering it.
STDOUT.sync = true

# Prints a debug line tagged with the caller's file, line number and method.
#
# level - Integer verbosity of the message; it is printed only when it does
#         not exceed the global $debug_level.
# arg   - the message; a String is printed verbatim, anything else via #inspect.
#
# Returns nil.
def debug(level, arg)
  return unless level <= $debug_level

  frame = caller[0].to_s
  file_line = frame[/(.*:\d+):/, 1]
  # Older rubies quote the frame label as `name', Ruby 3.4+ as 'Class#name';
  # accept either opening quote so the label is not lost.
  calling_method = frame[/[`']([^']*)'/, 1]
  puts "#{file_line}:#{calling_method}:#{arg.class == String ? arg : arg.inspect}"
end

#
# Design Notes
#     Configuration and Cluster Management are modularized with the concept that the Cluster Manager
#     can be supplied with any configuration to run.
#     A configuration can be edited, modified, copied into a test file, and supplied to a cluster manager
#     as a parameter.
#
module Mongo
  class Config
    # Option presets accepted by Config.cluster. Each preset extends the base
    # options (:host and :dbpath are required by Config.cluster) with the
    # number of nodes of each kind to create.
    DEFAULT_BASE_OPTS = { :host => 'localhost', :dbpath => 'data', :logpath => 'data/log' }
    DEFAULT_REPLICA_SET = DEFAULT_BASE_OPTS.merge( :replicas => 3, :arbiters => 0 )
    DEFAULT_SHARDED_SIMPLE = DEFAULT_BASE_OPTS.merge( :shards => 2, :configs => 1, :routers => 2 )
    DEFAULT_SHARDED_REPLICA = DEFAULT_SHARDED_SIMPLE.merge( :replicas => 3, :arbiters => 0)

    # Node-configuration keys that DbServer does not turn into command-line arguments.
    IGNORE_KEYS = [:host, :command, :_id]
    # Option keys naming the kinds of nodes a cluster can contain. The order of
    # CLUSTER_OPT_KEYS is the order in which Config.cluster builds the kinds.
    SHARDING_OPT_KEYS = [:shards, :configs, :routers]
    REPLICA_OPT_KEYS = [:replicas, :arbiters]
    MONGODS_OPT_KEYS = [:mongods]
    CLUSTER_OPT_KEYS = SHARDING_OPT_KEYS + REPLICA_OPT_KEYS + MONGODS_OPT_KEYS

    # Options that DbServer passes as bare switches (no value) when set to true.
    FLAGS = [:noprealloc, :smallfiles, :logappend, :configsvr, :shardsvr, :quiet, :fastsync, :auth, :ipv6]

    # Default number of one-second startup probes.
    DEFAULT_VERIFIES = 60
    # First port tried by get_available_port; @@port is the next candidate and
    # only ever increases.
    BASE_PORT = 3000
    @@port = BASE_PORT

    # Builds the value handed to mongos through its --configdb option.
    #
    # config - cluster configuration Hash whose :configs entry is an Array of
    #          node Hashes carrying :host and :port.
    #
    # Returns a String of "host:port" entries separated by commas (the list
    # separator mongos expects), so configurations with several config servers
    # produce a valid argument.
    def self.configdb(config)
      config[:configs].collect { |c| "#{c[:host]}:#{c[:port]}" }.join(',')
    end

    # Expands cluster options into a full configuration Hash.
    #
    # opts - Hash of options. :host and :dbpath are required. Every key listed
    #        in CLUSTER_OPT_KEYS that is present gives the number of nodes of
    #        that kind to create; all other keys are copied to the result.
    #
    # Returns a Hash holding the non-cluster options plus, for each requested
    # kind, an Array of node configurations. When both :shards and :replicas
    # are requested, each shard entry is itself a nested cluster configuration
    # (without the sharding keys) rooted in its own "shard-<index>"
    # subdirectory of :dbpath.
    #
    # Raises RuntimeError ("missing required option") when :host or :dbpath is absent.
    def self.cluster(opts = DEFAULT_SHARDED_SIMPLE)
      raise "missing required option" if [:host, :dbpath].any?{|k| !opts[k]}

      config = opts.reject {|k,v| CLUSTER_OPT_KEYS.include?(k)}

      kinds = CLUSTER_OPT_KEYS.select{|key| opts.has_key?(key)} # order is significant

      # Replicas and arbiters share one id sequence, used as the member _id.
      replica_count = 0

      kinds.each do |kind|
        config[kind] = opts.fetch(kind,1).times.collect do |i| #default to 1 of whatever
          if kind == :shards && opts[:replicas]
            # Give every shard its own data directory below the cluster's dbpath.
            shard_path = File.join(opts[:dbpath], "shard-#{i}")
            self.cluster(opts.reject{|k,v| SHARDING_OPT_KEYS.include?(k)}.merge(:dbpath => shard_path))
          else
            node = case kind
              when :replicas
                make_replica(opts, replica_count)
              when :arbiters
                make_replica(opts, replica_count)
              when :configs
                make_config(opts)
              when :routers
                # Routers need the config servers built earlier in this loop.
                make_router(config, opts)
              else
                make_mongod(kind, opts)
            end

            replica_count += 1 if [:replicas, :arbiters].member?(kind)
            node
          end
        end
      end
      config
    end

    # Builds the settings shared by every kind of node.
    #
    # kind - label used for the node's directory and log file names.
    # opts - Hash supplying :host and :dbpath.
    #
    # Returns a Hash with :host, a freshly reserved :port, a :logpath of the
    # form "<dbpath>/<kind>-<port>/<kind>.log" and :logappend set to true.
    def self.make_mongo(kind, opts)
      port     = get_available_port
      log_file = "#{opts[:dbpath]}/#{kind}-#{port}/#{kind}.log"

      {
        :host      => opts[:host],
        :port      => port,
        :logpath   => log_file,
        :logappend => true
      }
    end

    # Builds the configuration of a single mongod node.
    #
    # kind - node kind requested by the caller.
    #        NOTE(review): it is not used here; every node is laid out under a
    #        "mongods-<port>" directory regardless of kind - confirm intent.
    # opts - Hash of options. :noprealloc, :smallfiles, :quiet, :auth and :ipv6
    #        default to true and :fastsync to false; an explicit true/false in
    #        opts overrides the default.
    #
    # The executable comes from ENV['MONGOD'] (falling back to 'mongod') and
    # :dbpath is the directory that contains the node's log file.
    #
    # Returns the node configuration Hash.
    def self.make_mongod(kind, opts)
      params = make_mongo('mongods', opts)

      mongod = ENV['MONGOD'] || 'mongod'
      path   = File.dirname(params[:logpath])

      # `opts[key] || true` could never yield false, so only fall back to the
      # default when the option was not supplied at all.
      setting = lambda { |key, default| opts[key].nil? ? default : opts[key] }

      params.merge(:command    => mongod,
                   :dbpath     => path,
                   :smallfiles => setting.call(:smallfiles, true),
                   :noprealloc => setting.call(:noprealloc, true),
                   :quiet      => setting.call(:quiet, true),
                   :fastsync   => setting.call(:fastsync, false),
                   :auth       => setting.call(:auth, true),
                   :ipv6       => setting.call(:ipv6, true))
    end

    # Builds the configuration of one replica-set member.
    #
    # opts - Hash of options; :replSet (default 'ruby-driver-test'),
    #        :oplog_size (default 5) and :key_file (default
    #        '/test/fixtures/auth/keyfile') are read here, the rest is passed
    #        on to make_mongod.
    # id   - value stored as the member's :_id.
    #
    # The key file location is resolved below the current working directory
    # and, when the file exists, its mode is set to 0600.
    #
    # Returns the node configuration Hash.
    def self.make_replica(opts, id)
      params     = make_mongod('replicas', opts)

      replSet    = opts[:replSet]    || 'ruby-driver-test'
      oplogSize  = opts[:oplog_size] || 5
      keyFile    = opts[:key_file]   || '/test/fixtures/auth/keyfile'

      # File.join inserts exactly one separator, so a :key_file given without
      # a leading slash still lands inside the working directory.
      keyFile    = File.join(Dir.pwd, keyFile)
      # Change the mode directly instead of interpolating the path into a shell
      # command; a missing key file is skipped rather than treated as an error.
      File.chmod(0600, keyFile) if File.exist?(keyFile)

      params.merge(:_id       => id,
                   :replSet   => replSet,
                   :oplogSize => oplogSize,
                   :keyFile   => keyFile)
    end

    # Builds the configuration of a config-server node: a regular mongod
    # configuration with a :configsvr entry set to nil.
    #
    # NOTE(review): DbServer#initialize only emits a flag whose value is true,
    # so the nil value here results in no --configsvr switch - confirm intent.
    def self.make_config(opts)
      make_mongod('configs', opts).merge(:configsvr => nil)
    end

    # Builds the configuration of a router node.
    #
    # config - cluster configuration built so far; its config servers are
    #          turned into the router's :configdb value.
    # opts   - Hash of options passed on to make_mongo.
    #
    # The executable comes from ENV['MONGOS'], falling back to 'mongos'.
    #
    # Returns the node configuration Hash.
    def self.make_router(config, opts)
      router     = make_mongo('routers', opts)
      executable = ENV['MONGOS'] || 'mongos'

      router.merge(:command => executable, :configdb => configdb(config))
    end

    # Checks whether a TCP port can currently be bound on all IPv4 interfaces.
    #
    # port - Integer port number to probe.
    #
    # Returns true when a throw-away socket could be bound to the port and
    # false when the bind failed with a system error (for example because the
    # address is already in use). The probe socket is always closed.
    def self.port_available?(port)
      socket = Socket.new(Socket::Constants::AF_INET, Socket::Constants::SOCK_STREAM, 0)
      begin
        socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1)
        sockaddr = Socket.sockaddr_in(port, '0.0.0.0')
        begin
          socket.bind(sockaddr)
          true
        rescue SystemCallError
          # Only a failed bind means "unavailable"; signals and exit requests
          # are no longer swallowed as they were by rescuing Exception.
          false
        end
      ensure
        socket.close
      end
    end

    # Reserves the next usable port.
    #
    # Walks upwards from the class-wide port counter, advancing the counter
    # past every candidate it examines, and returns the first candidate that
    # port_available? accepts.
    def self.get_available_port
      loop do
        candidate = @@port
        @@port += 1
        return candidate if port_available?(candidate)
      end
    end

    # Wrapper around a spawned operating-system process.
    class SysProc
      attr_reader :pid, :cmd

      # cmd - the command to spawn; it is splatted into Process.spawn, so an
      #       Array of program name and arguments is expected.
      def initialize(cmd = nil)
        @pid = nil
        @cmd = cmd
      end

      # Forgets the stored pid when the process has already exited, reaping
      # its exit status without blocking.
      def clear_zombie
        return unless @pid

        begin
          reaped = Process.waitpid(@pid, Process::WNOHANG)
        rescue Errno::ECHILD
          # JVM might have already reaped the exit status
          reaped = nil
        end
        @pid = nil if reaped && reaped > 0
      end

      # Spawns the command unless it is already running.
      #
      # verifies - number of verification attempts to make after spawning;
      #            0 skips verification.
      #
      # Returns the pid.
      def start(verifies = 0)
        clear_zombie
        return @pid if running?

        @pid = if defined?(RUBY_ENGINE) && RUBY_ENGINE == 'jruby'
                 # redirection not supported in jruby
                 Process.spawn(*@cmd)
               else
                 Process.spawn(*[@cmd, { :out => '/dev/null' }].flatten)
               end
        verify(verifies) if verifies > 0
        @pid
      end

      # Signals the process and waits for it to exit.
      def stop
        kill
        wait
      end

      # Sends a signal (SIGINT by default) to the process and then removes a
      # mongod.lock left in the configured :dbpath, if a configuration with a
      # :dbpath is present.
      #
      # Returns true when the signal was delivered, false when there is no
      # process to signal.
      def kill(signal_no = 2)
        signaled = begin
          if @pid
            Process.kill(signal_no, @pid)
            true
          else
            false
          end
        rescue Errno::ESRCH
          false
        end
        remove_stale_lock
        signaled
      end

      # Blocks until the process has exited, then forgets its pid.
      def wait
        begin
          Process.waitpid(@pid) if @pid
        rescue Errno::ECHILD
          # JVM might have already reaped the exit status
        end
        @pid = nil
      end

      # Returns true when the process accepts signal 0, false when no such
      # process exists and nil when nothing has been started.
      def running?
        begin
          @pid && Process.kill(0, @pid) && true
        rescue Errno::ESRCH
          false
        end
      end

      # Polls once per second, up to `verifies` times, until the process is
      # running. Returns the pid, or nil when it never came up.
      def verify(verifies = DEFAULT_VERIFIES)
        verifies.times do |i|
          return @pid if running?
          sleep 1
        end
        nil
      end

      private

      # cleanup lock if unclean shutdown. @config is only assigned by
      # subclasses, so a plain SysProc has nothing to clean up.
      def remove_stale_lock
        config = defined?(@config) ? @config : nil
        dbpath = config && config[:dbpath]
        return unless dbpath

        File.delete(File.join(dbpath, 'mongod.lock'))
      rescue Errno::ENOENT
        nil
      end
    end

    # A system process that is reachable at a host and port.
    class Server < SysProc
      attr_reader :host, :port

      # cmd  - command handed to SysProc.
      # host - host name the server is reached at.
      # port - port the server is reached at.
      def initialize(cmd = nil, host = nil, port = nil)
        super(cmd)
        @host, @port = host, port
      end

      # Returns the address as a "host:port" String.
      def host_port
        "#{@host}:#{@port}"
      end

      # Returns the address as a [host, port] pair (for old format).
      def host_port_a
        [@host, @port]
      end
    end

    # A database server process built from a node configuration Hash.
    class DbServer < Server
      attr_accessor :config

      # config - node configuration Hash. The data and log directories are
      #          created if missing, and every entry whose key is not in
      #          IGNORE_KEYS becomes a command-line argument of the executable
      #          named by :command (default 'mongod').
      def initialize(config)
        @config = config
        dbpath = @config[:dbpath]
        [dbpath, File.dirname(@config[:logpath])].compact.each{|dir| FileUtils.mkdir_p(dir) unless File.directory?(dir) }
        command = @config[:command] || 'mongod'
        params = @config.reject{|k,v| IGNORE_KEYS.include?(k)}
        arguments = params.sort{|a, b| a[0].to_s <=> b[0].to_s}.collect do |arg, value| # sort block is needed for 1.8.7 which lacks Symbol#<=>
          argument = '--' + arg.to_s
          if FLAGS.member?(arg) && value == true
            # Flags are bare switches and are emitted only when set to true.
            [argument]
          elsif !FLAGS.member?(arg)
            [argument, value.to_s]
          end
          # A flag with any other value yields nil and is dropped by compact below.
        end
        cmd = [command, arguments].flatten.compact
        super(cmd, @config[:host], @config[:port])
      end

      # Spawns the server if needed and waits until it accepts connections.
      #
      # verifies - maximum number of connection probes.
      #
      # Returns the pid; raises Mongo::ConnectionFailure when the server never
      # becomes reachable.
      def start(verifies = DEFAULT_VERIFIES)
        # Spawn without verification: the inherited start would dispatch to the
        # verify below, and running it a second time only repeats the probe.
        super(0)
        verify(verifies)
      end

      # Probes the server by opening and closing a client connection, once per
      # second, up to `verifies` times.
      #
      # Returns the pid on the first successful probe. After the last failed
      # probe the process listing and the server log are printed and
      # Mongo::ConnectionFailure is raised.
      def verify(verifies = 600)
        verifies.times do |i|
          begin
            raise Mongo::ConnectionFailure unless running?
            Mongo::MongoClient.new(@host, @port).close
            return @pid
          rescue Mongo::ConnectionFailure
            sleep 1
          end
        end
        system "ps -fp #{@pid}; cat #{@config[:logpath]}"
        # The message reports running? only: signalling the pid directly here
        # raises when the process is gone and would mask the connection failure.
        raise Mongo::ConnectionFailure, "DbServer.start verify via connection probe failed - port:#{@port.inspect} @pid:#{@pid.inspect} running?:#{running?.inspect} cmd:#{cmd.inspect}"
      end

    end

    # Starts, stops and queries the servers described by a cluster
    # configuration (as produced by Config.cluster).
    class ClusterManager
      # Numeric replica-set member states as used throughout this class.
      REMOVED_STATE = 10

      attr_reader :config

      # config - cluster configuration Hash; a DbServer is created for every
      #          node listed under each of the CLUSTER_OPT_KEYS present.
      def initialize(config)
        @config = config
        @servers = {}
        Mongo::Config::CLUSTER_OPT_KEYS.each do |key|
          @servers[key] = @config[key].collect{|conf| DbServer.new(conf)} if @config[key]
        end
      end

      # Returns the servers of the given kind, or all servers when key is nil.
      def servers(key = nil)
        @servers.collect{|k,v| (!key || key == k) ? v : nil}.flatten.compact
      end

      # Runs one or more database commands against one or more servers.
      #
      # cmd_servers - a node configuration Hash, a DbServer, or an Array of them.
      # db_name     - database to run the commands against.
      # cmd         - a command document or an Array of command documents.
      # opts        - options passed to the driver; :check_response => false
      #               disables the check of the response's "ok" field.
      #
      # Returns the single response when exactly one was collected, otherwise
      # the Array of responses. Raises Mongo::OperationFailure when a checked
      # response is not ok.
      def command( cmd_servers, db_name, cmd, opts = {} )
        ret = []
        cmd = cmd.class == Array ? cmd : [ cmd ]
        debug 3, "ClusterManager.command cmd:#{cmd.inspect}"
        cmd_servers = cmd_servers.class == Array ? cmd_servers : [cmd_servers]
        cmd_servers.each do |cmd_server|
          debug 3, cmd_server.inspect
          cmd_server = cmd_server.config if cmd_server.is_a?(DbServer)
          client = Mongo::MongoClient.new(cmd_server[:host], cmd_server[:port])
          begin
            cmd.each do |c|
              debug 3,  "ClusterManager.command c:#{c.inspect}"
              response = client[db_name].command( c, opts )
              debug 3,  "ClusterManager.command response:#{response.inspect}"
              raise Mongo::OperationFailure, "c:#{c.inspect} opts:#{opts.inspect} failed" unless response["ok"] == 1.0 || opts.fetch(:check_response, true) == false
              ret << response
            end
          ensure
            # Close the connection even when a command fails.
            client.close
          end
        end
        debug 3, "command ret:#{ret.inspect}"
        ret.size == 1 ? ret.first : ret
      end

      # Asks every configured replica for its replSetGetStatus (unchecked).
      def repl_set_get_status
        command( @config[:replicas], 'admin', { :replSetGetStatus => 1 }, {:check_response => false } )
      end

      # Reads the replica-set configuration document from the primary.
      def repl_set_get_config
        host, port = primary_name.split(":")
        client = Mongo::MongoClient.new(host, port)
        begin
          client['local']['system.replset'].find_one
        ensure
          client.close
        end
      end

      # Builds a replica-set configuration document from the configured
      # replicas and, when present, arbiters.
      def repl_set_config
        members = []
        @config[:replicas].each{|s| members << { :_id => s[:_id], :host => "#{s[:host]}:#{s[:port]}", :tags => { :node => s[:_id].to_s } } }
        # :arbiters is optional in a configuration; treat a missing entry as empty.
        Array(@config[:arbiters]).each{|s| members << { :_id => s[:_id], :host => "#{s[:host]}:#{s[:port]}", :arbiterOnly => true } }
        {
          :_id => @config[:replicas].first[:replSet],
          :members => members
        }
      end

      # Initiates the replica set on the first replica, using cfg or the
      # document built by repl_set_config.
      def repl_set_initiate( cfg = nil )
        command( @config[:replicas].first, 'admin', { :replSetInitiate => cfg || repl_set_config } )
      end

      # Waits (up to 60 one-second rounds) until every replica reports a
      # consistent, healthy replica set.
      #
      # Returns true once healthy; raises Mongo::OperationFailure otherwise.
      def repl_set_startup
        states     = nil
        healthy    = false

        60.times do
          # enter the thunderdome...
          states  = as_list(repl_set_get_status).zip(as_list(repl_set_is_master))
          healthy = states.all? do |status, is_master|
            # check replica set status for member list
            next unless status['ok'] == 1.0 && (members = status['members'])

            # ensure all replica set members are in a valid state
            # (1, 2 and 7 are the states used for primary, secondary and arbiter below)
            next unless members.all? { |m| [1,2,7].include?(m['state']) }

            # check for primary replica set member
            next unless (primary = members.find { |m| m['state'] == 1 })

            # check replica set member optimes
            primary_optime = primary['optime'].seconds
            next unless primary_optime && members.all? { |m|
              m['state'] == 7 || primary_optime - m['optime'].seconds < 5
            }

            # check replica set state
            case status['myState']
              when 1
                is_master['ismaster']  == true &&
                is_master['secondary'] == false
              when 2
                is_master['ismaster']  == false &&
                is_master['secondary'] == true
              when 7
                is_master['ismaster']  == false &&
                is_master['secondary'] == false
            end
          end

          return healthy if healthy
          sleep(1)
        end

        raise Mongo::OperationFailure,
          "replSet startup failed - status: #{states.inspect}"
      end

      # Returns the replicas as "host:port" Strings.
      def repl_set_seeds
        @config[:replicas].collect{|node| "#{node[:host]}:#{node[:port]}"}
      end

      # Returns the replicas as [host, port] pairs.
      def repl_set_seeds_old
        @config[:replicas].collect{|node| [node[:host], node[:port]]}
      end

      # Returns the replicas as one comma-separated String.
      def repl_set_seeds_uri
        repl_set_seeds.join(',')
      end

      # Returns the replica-set name of the first replica.
      def repl_set_name
        @config[:replicas].first[:replSet]
      end

      # Returns the names of the members whose numeric state is one of `state`
      # (a single state or an Array of states), taken from the first status
      # report that has a member list without a removed member. Returns an
      # empty Array when no such report exists.
      def member_names_by_state(state)
        states = Array(state)
        # Any status with a REMOVED node won't have the full cluster state
        status = as_list(repl_set_get_status).find do |report|
          members = report['members']
          members && members.none? { |m| removed_member?(m) }
        end
        return [] unless status

        status['members'].find_all{|member| states.index(member['state']) }.collect{|member| member['name']}
      end

      def primary_name
        member_names_by_state(1).first
      end

      def secondary_names
        member_names_by_state(2)
      end

      def replica_names
        member_names_by_state([1,2])
      end

      def arbiter_names
        member_names_by_state(7)
      end

      # Maps "host:port" names to the managed servers, skipping unknown names.
      def members_by_name(names)
        names.collect do |name|
          member_by_name(name)
        end.compact
      end

      def member_by_name(name)
        servers.find{|server| server.host_port == name}
      end

      def primary
        members_by_name([primary_name]).first
      end

      def secondaries
        members_by_name(secondary_names)
      end

      def stop_primary
        primary.stop
      end

      # Stops one randomly chosen secondary.
      def stop_secondary
        # Query the cluster once so the random index and the list agree.
        candidates = secondaries
        candidates[rand(candidates.length)].stop
      end

      def replicas
        members_by_name(replica_names)
      end

      def arbiters
        members_by_name(arbiter_names)
      end

      # Returns the "host:port" names configured for the given kind.
      def config_names_by_kind(kind)
        @config[kind].collect{|conf| "#{conf[:host]}:#{conf[:port]}"}
      end

      def shards
        members_by_name(config_names_by_kind(:shards))
      end

      # Applies new_config with a version one above the current one, then
      # waits for the replica set to become healthy again.
      def repl_set_reconfig(new_config)
        new_config['version'] = repl_set_get_config['version'] + 1
        command( primary, 'admin', { :replSetReconfig => new_config } )
        repl_set_startup
      end

      # Removes one randomly chosen member in the given state(s) from the
      # configuration and reconfigures the replica set.
      def repl_set_remove_node(state = [1,2])
        names = member_names_by_state(state)
        name = names[rand(names.length)]

        @config[:replicas].delete_if{|node| "#{node[:host]}:#{node[:port]}" == name}
        repl_set_reconfig(repl_set_config)
      end

      # Not implemented.
      def repl_set_add_node
      end

      def configs
        members_by_name(config_names_by_kind(:configs))
      end

      def routers
        members_by_name(config_names_by_kind(:routers))
      end

      def mongos_seeds
        config_names_by_kind(:routers)
      end

      # Runs the ismaster command against the given servers.
      def ismaster(servers)
        command( servers, 'admin', { :ismaster => 1 } )
      end

      def sharded_cluster_is_master
        ismaster(@config[:routers])
      end

      def repl_set_is_master
        ismaster(@config[:replicas])
      end

      # Registers the given shards (default: all configured) via the first router.
      def addshards(shards = @config[:shards])
        command( @config[:routers].first, 'admin', Array(shards).collect{|s| { :addshard => "#{s[:host]}:#{s[:port]}" } } )
      end

      def listshards
        command( @config[:routers].first, 'admin', { :listshards => 1 } )
      end

      def enablesharding( dbname )
        command( @config[:routers].first, 'admin', { :enablesharding => dbname } )
      end

      def shardcollection( namespace, key, unique = false )
        command( @config[:routers].first, 'admin', { :shardcollection => namespace, :key => key, :unique => unique } )
      end

      # Reads the config.mongos collection from every config server; returns
      # one Array of documents per config server.
      def mongos_discover # can also do @config[:routers] find but only want mongos for connections
        (@config[:configs]).collect do |cmd_server|
          client = Mongo::MongoClient.new(cmd_server[:host], cmd_server[:port])
          begin
            client['config']['mongos'].find.to_a
          ensure
            client.close
          end
        end
      end

      # Starts all servers, initiates the replica set when needed and adds the
      # shards when none are registered yet. Returns self.
      def start
        # Must start configs before mongos -- hash order not guaranteed on 1.8.X
        servers(:configs).each{|server| server.start}
        servers.each{|server| server.start}
        # TODO - sharded replica sets - pending
        if @config[:replicas]
          repl_set_initiate if as_list(repl_set_get_status).first['startupStatus'] == 3
          repl_set_startup
        end
        if @config[:routers]
          addshards if listshards['shards'].size == 0
        end
        self
      end
      alias :restart :start

      # Stops all servers. Returns self.
      def stop
        servers.each{|server| server.stop}
        self
      end

      # Deletes the cluster's data directory. Returns self.
      def clobber
        FileUtils.rm_rf @config[:dbpath]
        self
      end

      private

      # `command` returns a bare response instead of an Array when only one
      # response was collected (e.g. a single replica); normalise to an Array.
      def as_list(value)
        value.is_a?(Array) ? value : [value]
      end

      # True when a status member entry describes a removed node. The numeric
      # state is compared because 'state' is a number in the rest of this
      # class; 'stateStr' is checked as well.
      # NOTE(review): the value 10 for REMOVED and the 'stateStr' field come
      # from the MongoDB replSetGetStatus documentation - confirm for the
      # server versions in use.
      def removed_member?(member)
        member['state'] == REMOVED_STATE || member['stateStr'] == 'REMOVED'
      end
    end

  end
end