File: memcached_manager.rb

package info (click to toggle)
ruby-dalli 3.2.8-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 684 kB
  • sloc: ruby: 6,552; sh: 20; makefile: 4
file content (151 lines) | stat: -rw-r--r-- 4,045 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# frozen_string_literal: true

require 'etc'

##
# Utility module for spinning up memcached instances locally, and generating a corresponding
# Dalli::Client to access the local instance.  Supports access via TCP and UNIX domain socket.
##
module MemcachedManager
  # TODO: This is all UNIX specific.  To support
  # running CI on Windows we'll need to conditionally
  # define a Windows equivalent
  #
  # Directory prefixes tried, in order, when looking for a memcached binary.
  # The empty prefix means "resolve the command through PATH".
  PATH_PREFIXES = [
    '',
    '/usr/local/bin/',
    '/opt/local/bin/',
    '/usr/bin/'
  ].freeze

  MEMCACHED_CMD = 'memcached'
  # Shell pipeline whose first output line is matched against MEMCACHED_VERSION_REGEXP.
  MEMCACHED_VERSION_CMD = "#{MEMCACHED_CMD} -h | head -1"
  # Captures "major.minor.patch".  Major and minor are matched as single digits,
  # which is what makes the string comparisons on the "major.minor" prefix below
  # safe; the patch component may have several digits and must be compared numerically.
  MEMCACHED_VERSION_REGEXP = /^memcached (\d\.\d\.\d+)/.freeze
  MEMCACHED_MIN_MAJOR_VERSION = ::Dalli::MIN_SUPPORTED_MEMCACHED_VERSION

  # pids of the memcached processes spawned by this module, keyed by integer
  # port (TCP) or socket path (UNIX domain socket) -- see running_key.
  @running_pids = {}

  # Starts a server and returns a client for it, making up to three attempts.
  # The cache is only flushed on the first attempt.  After a failed attempt the
  # recorded pid is forgotten so that the next attempt spawns a new process.
  #
  # @param port_or_socket [Integer, String] TCP port or UNIX socket path
  # @param args [String] extra command line arguments for memcached
  # @param client_options [Hash] options handed to Dalli::Client.new
  # @return [Dalli::Client]
  # @raise [StandardError] the error of the third failed attempt
  def self.start_and_flush_with_retry(port_or_socket, args = '', client_options = {})
    retry_count = 0
    loop do
      return start_and_flush(port_or_socket, args, client_options, flush: retry_count.zero?)
    rescue StandardError
      failed_start(port_or_socket)
      retry_count += 1
      raise if retry_count >= 3
    end
  end

  # Starts a server (if none is recorded for this port/socket yet) and returns a
  # client connected to it, optionally calling flush_all first.
  #
  # @param flush [Boolean] whether to call flush_all on the new client
  # @return [Dalli::Client]
  def self.start_and_flush(port_or_socket, args = '', client_options = {}, flush: true)
    start(port_or_socket, args)
    dc = client_for_port_or_socket(port_or_socket, client_options)
    dc.flush_all if flush
    dc
  end

  # Builds a Dalli::Client for a local server.  A value whose to_i is zero is
  # treated as a UNIX socket path; anything else is treated as a TCP port and is
  # addressed both as localhost and as 127.0.0.1.
  #
  # @return [Dalli::Client]
  def self.client_for_port_or_socket(port_or_socket, client_options)
    is_unix = port_or_socket.to_i.zero?
    servers_arg = is_unix ? port_or_socket : ["localhost:#{port_or_socket}", "127.0.0.1:#{port_or_socket}"]
    Dalli::Client.new(servers_arg, client_options)
  end

  # Spawns memcached for the given port/socket unless a pid is already recorded
  # for it, and registers an at_exit hook that terminates the spawned process.
  #
  # @return [Integer] pid of the (new or previously recorded) process
  def self.start(port_or_socket, args)
    command, key = cmd_with_args(port_or_socket, args)

    @running_pids[key] ||= begin
      pid = IO.popen(command).pid
      at_exit do
        kill_and_wait(pid)
      rescue Errno::ECHILD, Errno::ESRCH
        # Ignore errors
      end
      # Brief pause after spawning -- presumably to let memcached start listening.
      sleep 0.1
      pid
    end
  end

  # Terminates the server recorded for the given port/socket, if any.  The
  # lookup uses the same key that start stored the pid under.
  def self.stop(port_or_socket)
    pid = @running_pids.delete(running_key(port_or_socket))
    return unless pid

    begin
      kill_and_wait(pid)
    rescue Errno::ECHILD, Errno::ESRCH => e
      puts e.inspect
    end
  end

  # Sends TERM to the process and waits for it.
  def self.kill_and_wait(pid)
    Process.kill('TERM', pid)
    Process.wait(pid)
  end

  # Forgets the pid recorded for the given port/socket so that the next call to
  # start spawns a new process.  The process itself is not signalled here.
  def self.failed_start(port_or_socket)
    @running_pids[running_key(port_or_socket)] = nil
  end

  # Key under which start records a pid: the integer port for TCP servers, the
  # socket path otherwise.  Same key as parse_port_or_socket returns, but without
  # that method's side effect of deleting the socket file.
  def self.running_key(port_or_socket)
    port = port_or_socket.to_i
    port.zero? ? port_or_socket : port
  end
  private_class_method :running_key

  # Translates a port or socket path into the memcached command line switch and
  # the @running_pids key.  For a socket path, a file already present at that
  # path is deleted first.
  #
  # @return [Array] [switch string, key]
  def self.parse_port_or_socket(port)
    return "-p #{port}", port.to_i unless port.to_i.zero?

    # unix socket
    begin
      File.delete(port)
    rescue Errno::ENOENT
      # Ignore errors
    end
    ["-s #{port}", port]
  end

  # Memoized memcached command line prefix (binary plus -u switch).
  #
  # @raise [Errno::ENOENT] when no suitable memcached is found
  def self.cmd
    @cmd ||= determine_cmd
  end

  # Version string of the memcached binary in use.  It is set as a side effect
  # of determine_cmd, so the command lookup is triggered when it is not known yet.
  def self.version
    cmd if @version.nil?
    @version
  end

  MIN_META_VERSION = '1.6'

  # Protocols usable with the local memcached: the meta protocol is only listed
  # when the version string sorts after MIN_META_VERSION.
  #
  # @return [Array<Symbol>]
  def self.supported_protocols
    return [] unless version

    version > MIN_META_VERSION ? %i[binary meta] : %i[binary]
  end

  META_DELETE_CAS_FIX_PATCH_VERSION = '13'

  # Whether delete-with-CAS can be relied upon for the given protocol.  Always
  # true for protocols other than :meta.  For :meta it requires a "major.minor"
  # newer than MIN_META_VERSION, or the same "major.minor" with a patch level of
  # at least META_DELETE_CAS_FIX_PATCH_VERSION.
  #
  # @param protocol [Symbol]
  # @return [Boolean]
  def self.supports_delete_cas?(protocol)
    return true unless protocol == :meta

    return false unless version > MIN_META_VERSION

    minor_patch_delimiter = version.index('.', 2)
    minor_version = version[0...minor_patch_delimiter]
    return true if minor_version > MIN_META_VERSION

    patch_version = version[(minor_patch_delimiter + 1)..]

    # Compare numerically: as strings '9' >= '13' is true and '100' >= '13' is false.
    patch_version.to_i >= META_DELETE_CAS_FIX_PATCH_VERSION.to_i
  end

  # Full command line for a server on the given port/socket plus the key its
  # pid is tracked under.
  #
  # @return [Array] [command line string, key]
  def self.cmd_with_args(port_or_socket, args)
    socket_arg, key = parse_port_or_socket(port_or_socket)
    ["#{cmd} #{args} #{socket_arg}", key]
  end

  # Probes each PATH_PREFIXES entry for a memcached whose reported version
  # sorts after MEMCACHED_MIN_MAJOR_VERSION.  Records the version of the first
  # match in @version and returns the command to run it with -u set to the
  # name of the user returned by Etc.getpwuid.
  #
  # @return [String]
  # @raise [Errno::ENOENT] when no prefix yields a suitable memcached
  def self.determine_cmd
    PATH_PREFIXES.each do |prefix|
      output = `#{prefix}#{MEMCACHED_VERSION_CMD}`.strip
      match = MEMCACHED_VERSION_REGEXP.match(output)
      next unless match

      version = match[1]
      next unless version > MEMCACHED_MIN_MAJOR_VERSION

      @version = version
      puts "Found #{output} in #{prefix.empty? ? 'PATH' : prefix}"
      return "#{prefix}#{MEMCACHED_CMD} -u #{Etc.getpwuid.name}"
    end

    raise Errno::ENOENT, "Unable to find memcached #{MEMCACHED_MIN_MAJOR_VERSION}+ locally"
  end
end