1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
import base64
import json
from pcs_test.tools.command_env.mock_node_communicator import (
place_multinode_call,
)
class FilesShortcuts:
def __init__(self, calls):
self.__calls = calls
def put_files(
self,
*,
node_labels=None,
pcmk_authkey=None,
corosync_authkey=None,
corosync_conf=None,
pcs_disaster_recovery_conf=None,
pcs_settings_conf=None,
communication_list=None,
name="http.files.put_files",
):
# pylint: disable=too-many-arguments
"""
Create a call for the files distribution to the nodes.
node_labels list -- create success responses from these nodes
pcmk_authkey bytes -- content of pacemaker authkey file
corosync_authkey bytes -- content of corosync authkey file
corosync_conf string -- content of corosync.conf
pcs_disaster_recovery_conf string -- content of pcs DR config
pcs_settings_conf string -- content of pcs_settings.conf
communication_list list -- create custom responses
name string -- the key of this call
"""
input_data = {}
output_data = {}
written_output_dict = dict(
code="written",
message="",
)
if pcmk_authkey:
file_id = "pacemaker_remote authkey"
input_data[file_id] = dict(
data=base64.b64encode(pcmk_authkey).decode("utf-8"),
type="pcmk_remote_authkey",
rewrite_existing=True,
)
output_data[file_id] = written_output_dict
if corosync_authkey:
file_id = "corosync authkey"
input_data[file_id] = dict(
data=base64.b64encode(corosync_authkey).decode("utf-8"),
type="corosync_authkey",
rewrite_existing=True,
)
output_data[file_id] = written_output_dict
if corosync_conf:
file_id = "corosync.conf"
input_data[file_id] = dict(
data=corosync_conf,
type="corosync_conf",
)
output_data[file_id] = written_output_dict
if pcs_disaster_recovery_conf:
file_id = "disaster-recovery config"
input_data[file_id] = dict(
data=base64.b64encode(pcs_disaster_recovery_conf).decode(
"utf-8"
),
type="pcs_disaster_recovery_conf",
rewrite_existing=True,
)
output_data[file_id] = written_output_dict
if pcs_settings_conf:
file_id = "pcs_settings.conf"
input_data[file_id] = dict(
data=pcs_settings_conf,
type="pcs_settings_conf",
rewrite_existing=True,
)
output_data[file_id] = written_output_dict
place_multinode_call(
self.__calls,
name,
node_labels,
communication_list,
action="remote/put_file",
param_list=[("data_json", json.dumps(input_data))],
output=json.dumps(dict(files=output_data)),
)
def remove_files(
self,
node_labels=None,
pcsd_settings=False,
pcs_disaster_recovery_conf=False,
communication_list=None,
name="http.files.remove_files",
):
"""
Create a call for removing the files on the nodes.
node_labels list -- create success responses from these nodes
pcsd_settings bool -- if True, remove file pcsd_settings
pcs_disaster_recovery_conf bool -- if True, remove pcs DR config
communication_list list -- create custom responses
name string -- the key of this call
"""
input_data = {}
output_data = {}
if pcsd_settings:
file_id = "pcsd settings"
input_data[file_id] = dict(type="pcsd_settings")
output_data[file_id] = dict(
code="deleted",
message="",
)
if pcs_disaster_recovery_conf:
file_id = "pcs disaster-recovery config"
input_data[file_id] = dict(type="pcs_disaster_recovery_conf")
output_data[file_id] = dict(
code="deleted",
message="",
)
place_multinode_call(
self.__calls,
name,
node_labels,
communication_list,
action="remote/remove_file",
param_list=[("data_json", json.dumps(input_data))],
output=json.dumps(dict(files=output_data)),
)
|