1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
|
import filecmp
import glob
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
# Various bool functions to help identify the host environment
def is_redhat_based() -> bool:
return os.path.exists('/etc/redhat-release')
def is_debian_based() -> bool:
return os.path.exists('/etc/debian_version')
def is_arch_linux() -> bool:
return os.path.exists('/etc/arch-release')
def is_ubuntu() -> bool:
if not is_debian_based():
return False
try:
with open('/etc/os-release') as f:
for line in f:
if line.startswith('ID=ubuntu'):
return True
except Exception:
pass
return False
def is_linux() -> bool:
return is_debian_based() or is_redhat_based() or is_arch_linux()
def is_macos() -> bool:
return sys.platform == 'darwin'
def is_windows() -> bool:
return sys.platform == 'win32'
def is_cmake_installed() -> bool:
try:
subprocess.run(['cmake', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except subprocess.CalledProcessError:
return False
return True
def is_rpmbuild_installed() -> bool:
if not is_redhat_based():
return False
try:
subprocess.run(['rpmbuild', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except subprocess.CalledProcessError:
return False
return True
def is_dpkg_installed() -> bool:
if not is_debian_based():
return False
try:
subprocess.run(['dpkg', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except subprocess.CalledProcessError:
return False
return True
def is_dotnet_installed() -> bool:
try:
subprocess.run(['dotnet', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def is_wix_installed() -> bool:
# For installation, see https://cmake.org/cmake/help/latest/cpack_gen/wix.html#wix-net-tools
try:
subprocess.run(['wix', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def is_msbuild_installed() -> bool:
if sys.platform == 'Windows':
try:
result = subprocess.run(['vswhere', '-latest', '-products', '*', '-requires', 'Microsoft.Component.MSBuild', '-find', 'MSBuild\\**\\Bin\\MSBuild.exe'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
msbuild_path = result.stdout.decode().strip()
if msbuild_path:
subprocess.run([msbuild_path, '-version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
return False
def is_brew_installed() -> bool:
try:
subprocess.run(['brew', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
return False
def is_arm64_toolchain_installed() -> bool:
if is_linux():
try:
subprocess.run(['aarch64-linux-gnu-gcc', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
elif is_windows:
if not is_msbuild_installed():
return False
# TODO - Add tool specific detection for Windows/MacOS
return sys.platform.machine().lower() in ['aarch64', 'arm64']
def is_x86_64_toolchain_installed() -> bool:
if is_linux():
try:
subprocess.run(['x86_64-linux-gnu-gcc', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
elif is_windows:
if not is_msbuild_installed():
return False
# TODO - Add more tool specific detection for Windows/MacOS
return sys.platform.machine().lower() in ['amd64', 'x86_64']
def is_i386_toolchain_installed() -> bool:
if is_linux():
try:
subprocess.run(['i686-linux-gnu-gcc', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
# TODO - Add tool specific detection for Windows/MacOS
return sys.platform.machine().lower() in ['i386', 'i686']
# Utility function to list all assets expected for a release.
def get_release_assets(version:str) -> list:
"""
Return the list of assets expected for a release.
This is used for CI.
TA-Lib maintainers should modify this list everytime an assets is added/removed
"""
return [
f'ta-lib-{version}-src.tar.gz',
f'ta-lib-{version}-windows-x86_64.msi',
f'ta-lib-{version}-windows-x86_64.zip',
f'ta-lib-{version}-windows-x86_32.msi',
f'ta-lib-{version}-windows-x86_32.zip',
f'ta-lib_{version}_amd64.deb',
f'ta-lib_{version}_arm64.deb',
f'ta-lib_{version}_i386.deb',
]
# Utility functions to identify the gen_code generated files.
def get_src_generated_files() -> list:
"""
Return the list of generated files and directories.
This is only for what is expected in the src.tar.gz package.
Everything under a directory ('**') and file glob allowed ('*')
See get_all_generated_files() for more...
This is used for CI.
TA-Lib maintainers should update this list everytime a new file is generated (or not).
"""
return [
'include/ta_func.h',
'include/ta_defs.h',
'src/ta_func/*.c',
'src/ta_abstract/*.c',
'src/ta_abstract/frames/*.c',
'src/ta_abstract/frames/*.h',
'src/ta_common/ta_retcode.c',
'src/ta_abstract/ta_java_defs.h',
]
def get_all_generated_files() -> list:
"""
Returns list of all generated files and directories.
Everything under a directory ('**') and file glob allowed ('*')
This is used for CI.
TA-Lib maintainers should update this list everytime a new file is generated (or not).
"""
return [
'swig/src/interface/ta_func.swg',
'dotnet/src/Core/TA-Lib-Core.vcproj',
'dotnet/src/Core/TA-Lib-Core.h',
'ide/msvc/lib_proj/ta_func/ta_func.dsp',
'java/src/**',
] + get_src_generated_files()
def expand_globs(root_dir: str, file_list: list) -> list:
"""
Expand glob patterns in the file list to actual file paths.
"""
expanded_files = []
for file in file_list:
# Use recursive globbing if '**' is in the pattern
if '**' in file:
expanded_files.extend(glob.glob(os.path.join(root_dir, file), recursive=True))
else:
expanded_files.extend(glob.glob(os.path.join(root_dir, file)))
return expanded_files
def run_command(command: list) -> str:
"""Run a shell command and return the output."""
try:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
print(f"stdout for '{' '.join(command)}': {result.stdout}")
print(f"stderr for '{' '.join(command)}': {result.stderr}")
# If result.stderr contains the string "CPack Error: Problem running WiX.", then
# print the content of the log file at the specified path.
if "CPack Error: Problem running WiX." in result.stderr:
log_file_match = re.search(r"'([^']+)'", result.stderr)
if log_file_match:
log_file = log_file_match.group(1)
if os.path.exists(log_file):
print(f"Contents of {log_file}:")
with open(log_file, 'r') as f:
print(f.read())
sys.exit(1)
except subprocess.CalledProcessError as e:
print(f"Error during '{' '.join(command)}': {e}")
sys.exit(1)
return result.stdout.strip()
def run_command_term(command: list):
"""
Similar to run_command, but let it print its output to the terminal instead of capturing.
Also, exit if the command fails.
"""
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
print(f"Error during '{' '.join(command)}': {e}")
sys.exit(1)
def run_command_sudo(command: list, sudo_pwd: str=''):
"""
Run a command with sudo, optionally using a password if provided.
Will exit the script if calling the command fails or exit code != 0.
"""
try:
if sudo_pwd:
# Pipeline the password to sudo
process = subprocess.Popen(['sudo', '-S'] + command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate(input=f'{sudo_pwd}\n'.encode())
if process.returncode != 0:
print(f"Error during 'sudo {' '.join(command)}': {stderr.decode()}")
sys.exit(1)
return stdout.decode().strip()
else:
return run_command(['sudo'] + command)
except subprocess.CalledProcessError as e:
print(f"Error during 'sudo {' '.join(command)}': {e}")
sys.exit(1)
except Exception as e:
print(f"Unexpected error during 'sudo {' '.join(command)}': {e}")
sys.exit(1)
return f"Unexpected error: {' '.join(command)}"
def create_temp_dir(root_dir) -> str:
# Create a temporary directory under root_dir/temp, also purge older ones.
#
# Return the path of the newly created directory.
# Delete oldest directories if more than 10 exists and it is more
# than 1 hour old.
temp_root_dir = os.path.join(root_dir, "temp")
os.makedirs(temp_root_dir, exist_ok=True)
temp_dirs = sorted(os.listdir(temp_root_dir), key=lambda x: os.path.getctime(os.path.join(temp_root_dir, x)))
if len(temp_dirs) > 10:
for i in range(len(temp_dirs) - 10):
temp_dir_path = os.path.join(temp_root_dir, temp_dirs[i])
if os.path.isdir(temp_dir_path) and (time.time() - os.path.getctime(temp_dir_path)) > 3600:
shutil.rmtree(temp_dir_path)
# Create the new temp directory
return tempfile.mkdtemp(dir=temp_root_dir)
def verify_git_repo() -> str:
# Verify that the script is called from within a ta-lib git repos, and if yes
# change the working directory to the root of it.
#
# That root path is returned.
try:
subprocess.run(['git', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except subprocess.CalledProcessError:
print("Git is not installed. Please install Git and try again.")
sys.exit(1)
error = False
try:
result = subprocess.run(['git', 'rev-parse', '--is-inside-work-tree'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.stdout.strip() == b'true':
# Change to the root directory of the Git repository
root_dir = subprocess.run(['git', 'rev-parse', '--show-toplevel'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.strip().decode('utf-8')
os.chdir(root_dir)
return root_dir
else:
error = True
except subprocess.CalledProcessError:
error = True
if error:
print("Must run this script while the current directory is in a TA-Lib Git repository.")
sys.exit(1)
# Sanity check that src/ta_func exists.
if not os.path.isdir('src/ta_func'):
print("Current directory is not a complete TA-Lib Git repository (src/ta_func missing)")
sys.exit(1)
def verify_git_repo_original() -> str:
# Similar to verify_git_repo, but checks additionally if running from
# the original repos (will return false if a fork).
#
# Some operations (e.g. publishing the official TA-Lib on brew) should
# be attempted to be done exclusively with the original repos.
#
root_dir = verify_git_repo()
try:
# Get the remote URL of the repository
result = subprocess.run(
["git", "config", "--get", "remote.origin.url"],
capture_output=True,
text=True,
check=True
)
remote_url = result.stdout.strip()
# Normalize the URL to a common format (account for ssh and https differences).
normalized_url = re.sub(r'^git@github\.com:', 'https://github.com/', remote_url)
normalized_url = re.sub(r'\.git$', '', normalized_url).lower()
# print(f"Remote URL: {normalized_url}")
# Check if the normalized URL matches the original ta-lib repository URL
if normalized_url.endswith("ta-lib/ta-lib"):
return root_dir
except subprocess.CalledProcessError:
print("Error: Could not determine the remote URL of the repository.")
sys.exit(1)
print("This script performs no operation when run from a fork")
sys.exit(0)
def are_generated_files_git_changed(root_dir: str) -> bool:
# Using git, verify if any of the generated files have changed.
#
# root_dir must be the root of the TA-Lib Git repository.
original_dir = os.getcwd()
os.chdir(root_dir)
try:
result = subprocess.run(['git', 'diff', '--exit-code', 'HEAD', '--'] + get_all_generated_files(), check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return result.returncode != 0
except subprocess.CalledProcessError as e:
print(f"Error: {e}")
return False
finally:
os.chdir(original_dir)
return False
def copy_file_list(src_dir: str, dest_dir: str, file_list: list):
# Copy the files and directory to dest_dir.
#
# The file list can include whole directories ('**') and file glob ('*').
#
# 'dest_dir' can then be used later to detect changes
# with compare_dir.
# Delete all contents in dest_dir, but not dest_dir itself.
if os.path.exists(dest_dir):
for root, dirs, files in os.walk(dest_dir):
for file in files:
os.remove(os.path.join(root, file))
for dir in dirs:
shutil.rmtree(os.path.join(root, dir))
os.makedirs(dest_dir, exist_ok=True)
expanded_files = expand_globs(src_dir, file_list)
for src_file in expanded_files:
dest_file = os.path.join(dest_dir, os.path.relpath(src_file, src_dir))
os.makedirs(os.path.dirname(dest_file), exist_ok=True)
if os.path.isdir(src_file):
shutil.copytree(src_file, dest_file, dirs_exist_ok=True)
else:
shutil.copy(src_file, dest_file)
def compare_dir(dir1: str, dir2: str) -> bool:
# Detect any difference in files or directory.
# For files, also does a binary comparison.
# Recursively check subdirectories.
dircmp = filecmp.dircmp(dir1, dir2)
differences_found = False
if dircmp.left_only:
print(f"Files only in {dir1}: {dircmp.left_only}")
differences_found = True
if dircmp.right_only:
print(f"Files only in {dir2}: {dircmp.right_only}")
differences_found = True
if dircmp.diff_files:
print(f"Files that differ: {dircmp.diff_files}")
differences_found = True
if dircmp.funny_files:
print(f"Files that could not be compared: {dircmp.funny_files}")
differences_found = True
for subdir in dircmp.common_dirs:
if not compare_dir(os.path.join(dir1, subdir), os.path.join(dir2, subdir)):
differences_found = True
return not differences_found
|