1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
"""
This script sends replication progress to Graphite.
"""
from oslo_config import cfg
import os
from os.path import join
import sys
import socket
import datetime
import time
import random
from subprocess import Popen, PIPE
from swift.common.ring import Ring
# from swift.obj.replicator import Stats
# from collections import defaultdict
from six.moves.configparser import ConfigParser
# Number of seconds for each duration suffix ('h', 'm', 's'); used to convert
# the "remaining" figure of the replicator log line.
units = dict(h=3600, m=60, s=1)

# Default cluster name: the host name cut at its first '.', then at its
# first '-'.
default_cluster = socket.gethostname().partition('.')[0].partition('-')[0]

# Initialize script common oslo_config
CLI_OPTS = [
    cfg.StrOpt(
        'server',
        short='s',
        required=True,
        help='Graphite server to send data',
    ),
    cfg.IntOpt(
        'port',
        short='p',
        default=2003,
        help='Graphite server port to send data',
    ),
    cfg.StrOpt(
        'cluster',
        short='c',
        default=default_cluster,
        help='Cluster name',
    ),
]
def _prepare_config():
    """
    Build the oslo_config object used by the script.

    The command-line options listed in CLI_OPTS are registered on a new
    ConfigOpts instance; the arguments themselves are not parsed here.

    return: the oslo_config object
    """
    conf = cfg.ConfigOpts()
    conf.register_cli_opts(CLI_OPTS)
    return conf
def netcat(server, port, content):
    """
    Send data to graphite server

    The text in content is encoded to bytes and written to the standard
    input of /usr/bin/ncat, started with server and port as arguments.
    The call waits for the ncat process to finish.
    """
    command = ['/usr/bin/ncat', server, str(port)]
    process = Popen(command, stdin=PIPE)
    payload = content.encode()
    process.communicate(input=payload)
def build_replication_jobs(ring, ips, data_dir="/srv/node"):
    """
    Helper function for collect_jobs to build jobs for replication

    Lists the "objects" directory of every ring device whose 'ip' equals
    ips and creates one job per entry whose name is an integer partition
    number.

    :param ring: object exposing ``devs`` (an iterable of device dicts;
                 falsy entries are ignored) and ``get_part_nodes(part)``
    :param ips: address compared with the 'ip' entry of each device
    :param data_dir: directory holding one sub-directory per device
    :returns: list of dicts with the keys 'path', 'device', 'nodes' and
              'partition'; 'nodes' contains the nodes of the partition
              whose 'id' differs from the local device's
    """
    jobs = []
    # Falsy entries (holes in the device list) are skipped so that the
    # lookup of 'ip' cannot fail on them.
    local_devs = [dev for dev in ring.devs if dev and dev['ip'] == ips]
    for local_dev in local_devs:
        obj_path = join(data_dir, local_dev['device'], "objects")
        try:
            partitions = os.listdir(obj_path)
        except OSError:
            # The objects directory of this device cannot be listed (for
            # instance it does not exist): the other devices are still
            # processed.
            continue
        for partition in partitions:
            if (partition.startswith('auditor_status_') and
                    partition.endswith('.json')):
                # ignore auditor status files
                continue
            try:
                part_nodes = ring.get_part_nodes(int(partition))
            except ValueError:
                # The entry name is not a partition number.
                continue
            jobs.append({
                'path': join(obj_path, partition),
                'device': local_dev['device'],
                'nodes': [node for node in part_nodes
                          if node['id'] != local_dev['id']],
                'partition': partition,
            })
    return jobs
def collect_jobs(ring):
    """
    Returns the list of jobs (dictionaries) that specify the
    partitions, nodes, etc to be rsynced.

    The address handed to build_replication_jobs is the 'bind_ip' option
    read from the DEFAULT section of /etc/swift/object-server.conf.
    """
    parser = ConfigParser()
    with open("/etc/swift/object-server.conf",
              encoding='latin1') as conf_file:
        parser.read_file(conf_file)
    bind_ip = parser.get('DEFAULT', 'bind_ip')
    # Copy into a fresh list, as the accumulating version did.
    return list(build_replication_jobs(ring, bind_ip))
def main():
    """
    Entry point for the script

    After parsing the command line, sends to graphite, in this order:
    the number of jobs having exactly three nodes besides the local device
    (metric "replication.handoff"), the number of files found under the
    async_pending directories of /srv/node, and the figures parsed from
    the last replicator log line ending with "remaining)".
    Any exception raised in one of these steps ends the function silently.
    """
    args = _prepare_config()
    try:
        args(sys.argv[1:])
    except cfg.RequiredOptError as err:
        print(err)
        args.print_usage()
        sys.exit(1)

    name = socket.gethostname().split('.')[0]
    date = datetime.datetime.now().timestamp()

    # Step 1: count the jobs described by exactly 8 fields, i.e. partition,
    # path and three (replication_ip, device) pairs.
    try:
        ring = Ring("/etc/swift/object.ring.gz")
        repl_jobs = []
        for job in collect_jobs(ring):
            fields = [job['partition'], job['path']]
            for node in job['nodes']:
                fields += [node['replication_ip'], node['device']]
            if len(fields) == 8:
                repl_jobs.append(fields)
        netcat(args.server, args.port,
               f"{args.cluster}.{name}.replication.handoff "
               f"{len(repl_jobs)} {date}\n")
    except Exception:
        return

    # Step 2: count every file below <device>/async_pending.
    try:
        n_async = 0
        for device_dir in os.listdir('/srv/node'):
            pending_dir = os.path.join('/srv/node',
                                       device_dir,
                                       'async_pending')
            n_async += sum(len(files)
                           for _, _, files in os.walk(pending_dir))
        netcat(args.server, args.port,
               f"{args.cluster}.{name}.async_pending_total {n_async} {date}\n")
    except Exception:
        return

    # Step 3: keep the last log line that ends with "remaining)".
    line_log = ''
    try:
        # Fallback used when neither of the preferred files exists.
        log_path = "/var/log/swift/object.log"
        for candidate in ("/var/log/swift/swift-object-replicator.log",
                          "/var/log/swift/swift-object.log"):
            if os.path.isfile(candidate):
                log_path = candidate
                break
        with open(log_path, 'r') as log_file:
            for entry in log_file:
                if entry.endswith('remaining)\n'):
                    line_log = entry
    except Exception:
        return

    # Step 4: parse that line and send one metric per extracted figure.
    # The timestamp and host name now come from the log line itself.
    tokens = line_log.split()
    try:
        date = datetime.datetime.strptime(
            tokens[0], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
        name = tokens[1]
        done, total = tokens[3].split('/')
        data = {
            'parts_remaining': int(total) - int(done),
            'duration': tokens[8].split('.')[0],
            # <number><unit suffix> converted with the units table
            'remaining': int(tokens[10][:-1]) * units[tokens[10][-1]],
        }
        # Wait a random number of seconds (0 to 59) before sending.
        time.sleep(random.randint(0, 59))
        for k, v in data.items():
            netcat(args.server, args.port,
                   f"{args.cluster}.{name}.replication.{k} {v} {date}\n")
    except Exception:
        return
if __name__ == "__main__":
    # Run the script; the value returned by main() is used as exit status.
    raise SystemExit(main())
|