1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
|
"""Ruckus AbcSession which connects to Ruckus Unleashed or ZoneDirector backups"""
import configparser
import io
import struct
import tarfile
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from os import SEEK_CUR
from typing import Any, Mapping, TYPE_CHECKING
from .abcsession import AbcSession, ConfigItem
if TYPE_CHECKING:
from .ruckusbackupapi import RuckusBackupApi
class BackupSession(AbcSession):
"""Connect to Ruckus Unleashed or ZoneDirector Backup"""
def __init__(
self,
backup_path: str
) -> None:
super().__init__()
self.backup_file = self.open_backup(backup_path)
self.backup_tarfile = tarfile.open(fileobj = self.backup_file)
def __enter__(self) -> "BackupSession":
return self
def __exit__(self, *exc: Any) -> None:
if self.backup_tarfile:
self.backup_tarfile.close()
if self.backup_file:
self.backup_file.close()
def open_backup(self, backup_path: str) -> io.BytesIO:
"""Return the decrypted backup bytes"""
with open(backup_path, "rb") as backup_file:
magic = backup_file.read(4)
if magic == b'RKSF':
return self.__open_commscope_backup(backup_file)
else:
backup_file.seek(0)
return self.__open_tac_backup(backup_file)
@classmethod
def __decrypt_key(cls, cipher_bytes: bytes) -> bytes:
padded_key = pow(int.from_bytes(cipher_bytes, 'big'), 65537, 23559046888044776627569879690471525499427612616504460325607886880157810091042540109382540840072568820382270758180649018860535002041926018790203547085546162549326945200443019963900872654422143820799219291504478283808912964667353808795633808052022964371726410677357834881346022671448243831605466569511830964339444687659616502868745663064525218488470606514409811838671765944249166136071060850237167429125523755638111097424494275181385870987411479009552515816450089719197508371290305110717762578033949377936003949760003095430389967102852124783026450284389704957901428442687247403657819155956894836033683283023293306459081).to_bytes(256, 'big')
return padded_key[padded_key.index(b'\x00', 2) + 1:]
def __open_tac_backup(self, backup_file: io.BufferedReader) -> io.BytesIO:
"""Return the decrypted TAC backup file"""
(xor_int, xor_flip) = struct.unpack('QQ', b')\x1aB\x05\xbd,\xd6\xf25\xad\xb8\xe0?T\xc58')
struct_int8 = struct.Struct('Q')
output_file = io.BytesIO()
previous_input_int = 0
input_data = backup_file.read()
for input_int in struct.unpack_from(str(len(input_data) // 8) + 'Q', input_data):
output_bytes = struct_int8.pack(previous_input_int ^ xor_int ^ input_int)
xor_int ^= xor_flip
previous_input_int = input_int
output_file.write(output_bytes)
output_file.seek(0)
return output_file
@classmethod
def __skip_block(cls, backup_file: io.BufferedReader) -> None:
backup_file.seek(1, SEEK_CUR)
block_length = int.from_bytes(backup_file.read(4), byteorder='big', signed=False)
backup_file.seek(block_length, SEEK_CUR)
@classmethod
def __get_block_length(cls, backup_file: io.BufferedReader) -> bytes:
backup_file.seek(1, SEEK_CUR)
return int.from_bytes(backup_file.read(4), byteorder='big', signed=False)
def __open_commscope_backup(self, backup_file: io.BufferedReader) -> io.BytesIO:
"""Return the decrypted CommScope Content Manager backup file"""
backup_file.seek(4, SEEK_CUR)
encrypted_key = backup_file.read(self.__get_block_length(backup_file))
key = self.__decrypt_key(encrypted_key)
self.__skip_block(backup_file) # digest
self.__skip_block(backup_file) # signature
decrypted_length = self.__get_block_length(backup_file)
encrypted_bytes = backup_file.read()
cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend())
decryptor = cipher.decryptor()
decrypted_bytes = decryptor.update(encrypted_bytes) + decryptor.finalize()
output_file = io.BytesIO()
output_file.write(decrypted_bytes[:decrypted_length])
output_file.seek(0)
return output_file
@classmethod
def create(cls, backup_path: str) -> "BackupSession":
"""Create a default ClientSession & use this to create a BackupSession instance"""
return BackupSession(backup_path)
@property
def api(self) -> "RuckusBackupApi":
"""Return a RuckusBackupApi instance."""
if not self._api:
# pylint: disable=import-outside-toplevel
from .ruckusbackupapi import RuckusBackupApi
self._api = RuckusBackupApi(self)
return self._api
async def get_conf_str(self, item: ConfigItem, timeout: int | None = None) -> str:
xml = self._get_backup_file(f"etc/airespider/{item.value}.xml")
return "<ajax-response><response>" + xml + "</response></ajax-response>"
def get_metadata(self) -> Mapping[str, str]:
"""Return the backup metadata"""
xml = "[metadata]\n" + self._get_backup_file("metadata")
config = configparser.ConfigParser()
config.read_string(xml)
return config["metadata"]
def _get_backup_file(self, member: str) -> str:
"""Extract a file from the backup and return its contents"""
return self.backup_tarfile.extractfile(member).read().decode("utf-8")
|