File: send_replication_progress.py

package info (click to toggle)
swift-tools 0.0.24
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 184 kB
  • sloc: python: 1,105; sh: 168; makefile: 14
file content (183 lines) | stat: -rw-r--r-- 5,969 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""
This script sends replication progress metrics to graphite.
"""
from oslo_config import cfg
import os
from os.path import join
import sys
import socket
import datetime
import time
import random
from subprocess import Popen, PIPE
from swift.common.ring import Ring
# from swift.obj.replicator import Stats
# from collections import defaultdict
from six.moves.configparser import ConfigParser

# Multipliers converting the replicator log's h/m/s duration suffix to seconds.
units = {'h': 3600, 'm': 60, 's': 1}
# Default cluster name: first dash-separated token of the short hostname
# (e.g. "cluster1-node03.example.com" -> "cluster1").
default_cluster = socket.gethostname().split('.')[0].split('-')[0]
# Initialize script common oslo_config
CLI_OPTS = [
    cfg.StrOpt('server', short='s', required=True,
               help='Graphite server to send data'),
    cfg.IntOpt('port', default=2003, short='p',
               help='Graphite server port to send data'),
    cfg.StrOpt('cluster', default=default_cluster, short='c',
               help='Cluster name'),
]


def _prepare_config():
    """
    Build the oslo_config object used to parse command-line arguments.

    :returns: an unparsed ``cfg.ConfigOpts`` instance with the script's
        CLI options registered; the caller invokes it with ``sys.argv``.
    """
    conf = cfg.ConfigOpts()
    conf.register_cli_opts(CLI_OPTS)
    return conf


def netcat(server, port, content):
    """
    Send *content* to the graphite server over a plain TCP connection
    (graphite's line protocol).

    Uses the stdlib ``socket`` module instead of shelling out to
    ``/usr/bin/ncat``, so the send no longer fails silently when the
    ncat binary is not installed, and the connection is always closed.

    :param server: graphite host name or IP address
    :param port: graphite TCP port (int or numeric string)
    :param content: newline-terminated metric line(s) to send
    """
    with socket.create_connection((server, int(port))) as sock:
        sock.sendall(content.encode())


def build_replication_jobs(ring, ips, data_dir="/srv/node"):
    """
    Helper function for collect_jobs to build jobs for replication.

    Scans the object partitions of every ring device whose ``ip``
    matches *ips* and records, for each valid partition directory, the
    remote nodes the partition would be rsynced to.

    :param ring: object ring (anything exposing ``devs`` and
        ``get_part_nodes``)
    :param ips: bind IP of the local object server; devices are matched
        on their ``ip`` field
    :param data_dir: root of the swift data directories
        (default ``/srv/node``)
    :returns: list of dicts with keys ``path``, ``device``, ``nodes``
        and ``partition``
    """
    jobs = []
    # ring.devs may contain None placeholders for removed devices --
    # guard before subscripting so a sparse ring does not crash.
    local_devs = [dev for dev in ring.devs if dev and dev['ip'] == ips]
    for local_dev in local_devs:
        obj_path = join(data_dir, local_dev['device'], "objects")
        # A device that has never stored an object has no objects/
        # directory yet; nothing to report for it.
        if not os.path.isdir(obj_path):
            continue
        for partition in os.listdir(obj_path):
            # ignore auditor status files
            if (partition.startswith('auditor_status_') and
                    partition.endswith('.json')):
                continue
            try:
                # Non-numeric entries are not partition directories.
                part_nodes = ring.get_part_nodes(int(partition))
            except ValueError:
                continue
            # Remote copies only: drop the local device itself.
            nodes = [node for node in part_nodes
                     if node['id'] != local_dev['id']]
            jobs.append(
                dict(path=join(obj_path, partition),
                     device=local_dev['device'],
                     nodes=nodes,
                     partition=partition))
    return jobs


def collect_jobs(ring, conf_path="/etc/swift/object-server.conf"):
    """
    Return the list of replication jobs (dictionaries) that specify the
    partitions, nodes, etc to be rsynced for this host's devices.

    Reads the object server's ``bind_ip`` from *conf_path* and builds
    the jobs for the ring devices bound to that IP.

    :param ring: the object ring
    :param conf_path: path of the object-server configuration file
        (default ``/etc/swift/object-server.conf``)
    :returns: list of job dicts (see :func:`build_replication_jobs`)
    """
    swift_conf = ConfigParser()
    # latin1 decodes any byte sequence, so stray non-UTF-8 bytes in the
    # config file cannot abort the script.
    with open(conf_path, encoding='latin1') as swift_conf_file:
        swift_conf.read_file(swift_conf_file)
    ips = swift_conf.get('DEFAULT', 'bind_ip')
    return build_replication_jobs(ring, ips)


def main():
    """
    Entry point for the script.

    Sends three families of metrics to graphite, each wrapped in a
    broad try/except so that a failure in one collection step makes the
    script exit quietly instead of crashing (best-effort monitoring):
      1. count of handoff partitions held locally,
      2. total number of async pendings under /srv/node,
      3. progress figures parsed from the object-replicator log.
    """
    args = _prepare_config()
    try:
        args(sys.argv[1:])
    except cfg.RequiredOptError as E:
        # --server is required; print usage and exit non-zero.
        print(E)
        args.print_usage()
        sys.exit(1)
    # Short hostname and current timestamp, used in every metric line.
    name = socket.gethostname().split('.')[0]
    date = datetime.datetime.now().timestamp()
    try:
        ring = Ring("/etc/swift/object.ring.gz")
        jobs = collect_jobs(ring)
        repl_jobs = []
        for job in jobs:
            nodes_list = []
            nodes_list.append(job['partition'])
            nodes_list.append(job['path'])
            for node in job['nodes']:
                nodes_list.append(node['replication_ip'])
                nodes_list.append(node['device'])
                # 8 entries = partition + path + 3 remote (ip, device)
                # pairs, i.e. the local device is not one of the
                # partition's primaries -- a handoff.  NOTE(review):
                # presumably assumes a 3-replica ring; confirm for
                # other replica counts.
                if len(nodes_list) == 8:
                    repl_jobs.append(nodes_list)
        netcat(args.server, args.port,
               f"{args.cluster}.{name}.replication.handoff "
               f"{len(repl_jobs)} {date}\n")
    except Exception:
        # Best-effort: missing ring/config files silently skip the run.
        return

    try:
        # Count every file below each device's async_pending directory.
        n_async = 0
        for dir in os.listdir('/srv/node'):
            for root, dirs, files in os.walk(os.path.join('/srv/node',
                                                          dir,
                                                          'async_pending')):
                n_async += len(files)
        netcat(args.server, args.port,
               f"{args.cluster}.{name}.async_pending_total {n_async} {date}\n")
    except Exception:
        return
    line_log = ''
    try:
        # Pick whichever replicator log file exists on this deployment.
        if os.path.isfile("/var/log/swift/swift-object-replicator.log"):
            log_path = "/var/log/swift/swift-object-replicator.log"
        elif os.path.isfile("/var/log/swift/swift-object.log"):
            log_path = "/var/log/swift/swift-object.log"
        else:
            log_path = "/var/log/swift/object.log"

        # Keep only the LAST progress line ("... remaining)") in the log.
        with open(log_path, 'r') as file:
            for line in file:
                if line.endswith('remaining)\n'):
                    line_log = line
    except Exception:
        return
    data = {}
    elems = line_log.split()
    try:
        # Fixed word positions of the replicator progress line:
        # [0] ISO timestamp, [1] host, [3] done/total partitions,
        # [8] elapsed duration, [10] time remaining (e.g. "2h)").
        # NOTE(review): indices depend on the exact swift log format;
        # verify against the deployed swift version.
        date = datetime.datetime.strptime(
                        elems[0], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
        name = elems[1]
        done, total = elems[3].split('/')
        data['parts_remaining'] = int(total) - int(done)
        data['duration'] = elems[8].split('.')[0]
        # Convert the h/m/s-suffixed remaining time to seconds.
        data['remaining'] = int(elems[10][:-1]) * units[elems[10][-1]]
        # Random jitter so a fleet of hosts does not hit graphite at once.
        time.sleep(random.randint(0, 59))
        for k, v in data.items():
            netcat(args.server, args.port,
                   f"{args.cluster}.{name}.replication.{k} {v} {date}\n")
    except Exception:
        return


# main() always returns None, so the exit status is 0 even when a
# collection step failed -- failures are deliberately silent.
if __name__ == "__main__":
    sys.exit(main())