File: common.py

package info (click to toggle)
ta-lib 0.6.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 7,684 kB
  • sloc: ansic: 87,070; xml: 6,420; python: 1,806; makefile: 302; sh: 151
file content (443 lines) | stat: -rw-r--r-- 15,999 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
import filecmp
import glob
import os
import platform
import re
import shutil
import subprocess
import sys
import tempfile
import time

# Various bool functions to help identify the host environment
def is_redhat_based() -> bool:
    """Return True when the '/etc/redhat-release' marker file exists."""
    marker = '/etc/redhat-release'
    return os.path.exists(marker)

def is_debian_based() -> bool:
    """Return True when the '/etc/debian_version' marker file exists."""
    marker = '/etc/debian_version'
    return os.path.exists(marker)

def is_arch_linux() -> bool:
    """Return True when the '/etc/arch-release' marker file exists."""
    marker = '/etc/arch-release'
    return os.path.exists(marker)

def is_ubuntu() -> bool:
    """
    Return True when the host is Debian-based and '/etc/os-release'
    contains a line starting with 'ID=ubuntu'.

    Any error while opening or reading the file is treated as "not Ubuntu".
    """
    if is_debian_based():
        try:
            with open('/etc/os-release') as os_release:
                # Stops reading at the first matching line.
                return any(entry.startswith('ID=ubuntu') for entry in os_release)
        except Exception:
            # Best-effort detection: an unreadable file means "unknown".
            return False
    return False

def is_linux() -> bool:
    """
    Return True when one of the supported Linux families is detected.

    Detection relies only on the distribution marker files checked by
    is_debian_based(), is_redhat_based() and is_arch_linux(), in that order.
    """
    if is_debian_based():
        return True
    if is_redhat_based():
        return True
    return is_arch_linux()

def is_macos() -> bool:
    """Return True when sys.platform identifies macOS ('darwin')."""
    current_platform = sys.platform
    return current_platform == 'darwin'

def is_windows() -> bool:
    """Return True when sys.platform identifies Windows ('win32')."""
    current_platform = sys.platform
    return current_platform == 'win32'

def is_cmake_installed() -> bool:
    """
    Return True when 'cmake --version' can be executed successfully.

    Returns False when the command exits with a non-zero status or when
    the 'cmake' executable cannot be found.
    """
    try:
        subprocess.run(['cmake', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except (subprocess.CalledProcessError, FileNotFoundError):
        # FileNotFoundError is what subprocess raises when cmake is absent.
        return False
    return True

def is_rpmbuild_installed() -> bool:
    """
    Return True when the host is Red Hat based and 'rpmbuild --version'
    can be executed successfully.

    Returns False on other hosts, when the command exits with a non-zero
    status, or when the 'rpmbuild' executable cannot be found.
    """
    if not is_redhat_based():
        return False
    try:
        subprocess.run(['rpmbuild', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except (subprocess.CalledProcessError, FileNotFoundError):
        # FileNotFoundError is what subprocess raises when rpmbuild is absent.
        return False
    return True

def is_dpkg_installed() -> bool:
    """
    Return True when the host is Debian based and 'dpkg --version'
    can be executed successfully.

    Returns False on other hosts, when the command exits with a non-zero
    status, or when the 'dpkg' executable cannot be found.
    """
    if not is_debian_based():
        return False
    try:
        subprocess.run(['dpkg', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except (subprocess.CalledProcessError, FileNotFoundError):
        # FileNotFoundError is what subprocess raises when dpkg is absent.
        return False
    return True

def is_dotnet_installed() -> bool:
    """
    Return True when 'dotnet --version' runs and exits with status 0.

    A non-zero exit status or a missing executable yields False.
    """
    probe = ['dotnet', '--version']
    try:
        subprocess.run(probe, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False
    else:
        return True

def is_wix_installed() -> bool:
    """
    Return True when 'wix --version' runs and exits with status 0.

    A non-zero exit status or a missing executable yields False.

    For installation, see https://cmake.org/cmake/help/latest/cpack_gen/wix.html#wix-net-tools
    """
    installed = True
    try:
        subprocess.run(['wix', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except (subprocess.CalledProcessError, FileNotFoundError):
        installed = False
    return installed

def is_msbuild_installed() -> bool:
    """
    Return True when MSBuild can be located with 'vswhere' and executed.

    Only attempted on Windows; every other platform returns False.
    Returns False as well when vswhere or MSBuild is missing, fails, or
    when vswhere reports no MSBuild path.
    """
    # sys.platform is 'win32' on Windows (it is never the string 'Windows').
    if sys.platform != 'win32':
        return False

    try:
        result = subprocess.run(['vswhere', '-latest', '-products', '*', '-requires', 'Microsoft.Component.MSBuild', '-find', 'MSBuild\\**\\Bin\\MSBuild.exe'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Keep only the first reported path in case several lines are printed.
        found_paths = result.stdout.decode().strip().splitlines()
        msbuild_path = found_paths[0].strip() if found_paths else ''
        if not msbuild_path:
            return False
        subprocess.run([msbuild_path, '-version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False

def is_brew_installed() -> bool:
    """
    Return True when 'brew --version' runs and exits with status 0.

    A non-zero exit status or a missing executable yields False.
    """
    try:
        subprocess.run(['brew', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False
    return True

def is_arm64_toolchain_installed() -> bool:
    """
    Return True when a toolchain targeting arm64 appears to be available.

    On Linux (as detected by is_linux()) the result is whether
    'aarch64-linux-gnu-gcc --version' runs successfully.
    On Windows, MSBuild must be installed, then the host machine type decides.
    On any other platform only the host machine type is checked.
    """
    if is_linux():
        try:
            subprocess.run(['aarch64-linux-gnu-gcc', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            return True
        except (subprocess.CalledProcessError, FileNotFoundError):
            return False
    elif is_windows():  # must be called; the bare function object is always truthy
        if not is_msbuild_installed():
            return False

    # TODO - Add tool specific detection for Windows/MacOS

    # platform.machine() reports the host machine type (sys.platform is a plain str).
    return platform.machine().lower() in ['aarch64', 'arm64']

def is_x86_64_toolchain_installed() -> bool:
    """
    Return True when a toolchain targeting x86_64 appears to be available.

    On Linux (as detected by is_linux()) the result is whether
    'x86_64-linux-gnu-gcc --version' runs successfully.
    On Windows, MSBuild must be installed, then the host machine type decides.
    On any other platform only the host machine type is checked.
    """
    if is_linux():
        try:
            subprocess.run(['x86_64-linux-gnu-gcc', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            return True
        except (subprocess.CalledProcessError, FileNotFoundError):
            return False
    elif is_windows():  # must be called; the bare function object is always truthy
        if not is_msbuild_installed():
            return False

    # TODO - Add more tool specific detection for Windows/MacOS

    # platform.machine() reports the host machine type (sys.platform is a plain str).
    return platform.machine().lower() in ['amd64', 'x86_64']

def is_i386_toolchain_installed() -> bool:
    """
    Return True when a toolchain targeting 32-bit x86 appears to be available.

    On Linux (as detected by is_linux()) the result is whether
    'i686-linux-gnu-gcc --version' runs successfully.
    On any other platform only the host machine type is checked.
    """
    if is_linux():
        try:
            subprocess.run(['i686-linux-gnu-gcc', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            return True
        except (subprocess.CalledProcessError, FileNotFoundError):
            return False

    # TODO - Add tool specific detection for Windows/MacOS

    # platform.machine() reports the host machine type (sys.platform is a plain str).
    return platform.machine().lower() in ['i386', 'i686']

# Utility function to list all assets expected for a release.
def get_release_assets(version:str) -> list:
    """
    Build the file names of every asset expected for release 'version'.

    This is used for CI.

    TA-Lib maintainers should update the templates below every time an
    asset is added or removed.
    """
    asset_templates = (
        'ta-lib-{version}-src.tar.gz',
        'ta-lib-{version}-windows-x86_64.msi',
        'ta-lib-{version}-windows-x86_64.zip',
        'ta-lib-{version}-windows-x86_32.msi',
        'ta-lib-{version}-windows-x86_32.zip',
        'ta-lib_{version}_amd64.deb',
        'ta-lib_{version}_arm64.deb',
        'ta-lib_{version}_i386.deb',
    )
    return [template.format(version=version) for template in asset_templates]

# Utility functions to identify the gen_code generated files.
def get_src_generated_files() -> list:
    """
    List the generated files/directories expected in the src.tar.gz package.

    Entries may be whole directories ('**') or file globs ('*').

    See get_all_generated_files() for the complete list.

    This is used for CI.

    TA-Lib maintainers should update this list every time a file starts
    (or stops) being generated.
    """
    src_patterns = (
        'include/ta_func.h',
        'include/ta_defs.h',
        'src/ta_func/*.c',
        'src/ta_abstract/*.c',
        'src/ta_abstract/frames/*.c',
        'src/ta_abstract/frames/*.h',
        'src/ta_common/ta_retcode.c',
        'src/ta_abstract/ta_java_defs.h',
    )
    # A new list is returned on every call so callers may modify it freely.
    return list(src_patterns)

def get_all_generated_files() -> list:
    """
    List every generated file and directory.

    Entries may be whole directories ('**') or file globs ('*'). The result
    is the entries listed here followed by those of get_src_generated_files().

    This is used for CI.

    TA-Lib maintainers should update this list every time a file starts
    (or stops) being generated.
    """
    generated = [
        'swig/src/interface/ta_func.swg',
        'dotnet/src/Core/TA-Lib-Core.vcproj',
        'dotnet/src/Core/TA-Lib-Core.h',
        'ide/msvc/lib_proj/ta_func/ta_func.dsp',
        'java/src/**',
    ]
    generated.extend(get_src_generated_files())
    return generated

def expand_globs(root_dir: str, file_list: list) -> list:
    """
    Resolve each pattern of 'file_list', relative to 'root_dir', into the
    paths that currently match it.

    Matches are returned in pattern order; a pattern without any match
    contributes nothing.
    """
    matches = []
    for pattern in file_list:
        # Recursive globbing is enabled only for patterns containing '**'.
        use_recursion = '**' in pattern
        matches += glob.glob(os.path.join(root_dir, pattern), recursive=use_recursion)
    return matches


def run_command(command: list) -> str:
    """
    Run a command (argument list, no shell) and return its stripped stdout.

    The script exits with status 1 when the command cannot be started
    (e.g. executable not found) or when it returns a non-zero exit code.
    On a non-zero exit code the captured stdout/stderr are printed first.
    """
    try:
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except OSError as e:
        # Raised when the executable is missing or cannot be executed.
        print(f"Error during '{' '.join(command)}': {e}")
        sys.exit(1)

    if result.returncode != 0:
        print(f"stdout for '{' '.join(command)}': {result.stdout}")
        print(f"stderr for '{' '.join(command)}': {result.stderr}")
        # If result.stderr contains the string "CPack Error: Problem running WiX.", then
        # print the content of the log file at the specified path (the first
        # single-quoted text found in stderr).
        if "CPack Error: Problem running WiX." in result.stderr:
            log_file_match = re.search(r"'([^']+)'", result.stderr)
            if log_file_match:
                log_file = log_file_match.group(1)
                if os.path.exists(log_file):
                    print(f"Contents of {log_file}:")
                    try:
                        with open(log_file, 'r') as f:
                            print(f.read())
                    except OSError as e:
                        # Showing the log is best-effort; still exit below.
                        print(f"Could not read {log_file}: {e}")
        sys.exit(1)

    return result.stdout.strip()

def run_command_term(command: list):
    """
    Similar to run_command, but let it print its output to the terminal instead of capturing.

    The script exits with status 1 when the command returns a non-zero exit
    code or cannot be started (e.g. executable not found).
    """
    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable program.
        print(f"Error during '{' '.join(command)}': {e}")
        sys.exit(1)

def run_command_sudo(command: list, sudo_pwd: str=''):
    """
    Execute 'command' through sudo and return its stripped stdout.

    When 'sudo_pwd' is non-empty it is fed to 'sudo -S' on stdin; otherwise
    the call is delegated to run_command() with 'sudo' prepended.
    The script exits with status 1 if the command fails or an unexpected
    error occurs.
    """
    try:
        if not sudo_pwd:
            return run_command(['sudo'] + command)

        # 'sudo -S' reads the password from stdin.
        sudo_proc = subprocess.Popen(['sudo', '-S'] + command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        password_line = f'{sudo_pwd}\n'.encode()
        out_bytes, err_bytes = sudo_proc.communicate(input=password_line)
        if sudo_proc.returncode != 0:
            print(f"Error during 'sudo {' '.join(command)}': {err_bytes.decode()}")
            sys.exit(1)
        return out_bytes.decode().strip()
    except subprocess.CalledProcessError as e:
        print(f"Error during 'sudo {' '.join(command)}': {e}")
        sys.exit(1)
    except Exception as e:
        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so the exits above are not intercepted here.
        print(f"Unexpected error during 'sudo {' '.join(command)}': {e}")
        sys.exit(1)

    return f"Unexpected error: {' '.join(command)}"


def create_temp_dir(root_dir) -> str:
    """
    Create a new temporary directory under root_dir/temp and return its path.

    Before creating it, older entries are purged: when more than 10 entries
    exist, the oldest surplus ones (by ctime) are deleted if they are
    directories and more than 1 hour old.
    """
    temp_root_dir = os.path.join(root_dir, "temp")
    os.makedirs(temp_root_dir, exist_ok=True)

    def _ctime_of(name):
        return os.path.getctime(os.path.join(temp_root_dir, name))

    # Oldest entries first.
    entries = sorted(os.listdir(temp_root_dir), key=_ctime_of)
    surplus = len(entries) - 10
    if surplus > 0:
        for name in entries[:surplus]:
            candidate = os.path.join(temp_root_dir, name)
            if not os.path.isdir(candidate):
                continue
            if (time.time() - os.path.getctime(candidate)) > 3600:
                shutil.rmtree(candidate)

    # Create the new temp directory
    return tempfile.mkdtemp(dir=temp_root_dir)

def verify_git_repo() -> str:
    """
    Verify that the script is called from within a TA-Lib git repository
    and, if so, change the working directory to the root of it.

    Returns:
        The root path of the repository.

    The script exits with status 1 when git cannot be run, when the current
    directory is not inside a git work tree, or when the repository root
    has no 'src/ta_func' directory.
    """
    try:
        subprocess.run(['git', '--version'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except (subprocess.CalledProcessError, FileNotFoundError):
        # FileNotFoundError is what subprocess raises when git is absent.
        print("Git is not installed. Please install Git and try again.")
        sys.exit(1)

    root_dir = ''
    try:
        result = subprocess.run(['git', 'rev-parse', '--is-inside-work-tree'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.stdout.strip() == b'true':
            root_dir = subprocess.run(['git', 'rev-parse', '--show-toplevel'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.strip().decode('utf-8')
    except subprocess.CalledProcessError:
        root_dir = ''

    if not root_dir:
        print("Must run this script while the current directory is in a TA-Lib Git repository.")
        sys.exit(1)

    # Sanity check that src/ta_func exists. This must run before returning,
    # otherwise any git repository would be accepted.
    if not os.path.isdir(os.path.join(root_dir, 'src', 'ta_func')):
        print("Current directory is not a complete TA-Lib Git repository (src/ta_func missing)")
        sys.exit(1)

    # Change to the root directory of the Git repository
    os.chdir(root_dir)
    return root_dir

def verify_git_repo_original() -> str:
    """
    Similar to verify_git_repo, but additionally checks that the script runs
    from the original repository (not a fork).

    Some operations (e.g. publishing the official TA-Lib on brew) should
    be attempted exclusively with the original repository.

    Returns:
        The repository root path when 'remote.origin.url' points to
        ta-lib/ta-lib.

    The script exits with status 1 when the remote URL cannot be read, and
    with status 0 (after printing a notice) when the remote is a fork.
    """
    root_dir = verify_git_repo()

    try:
        # Get the remote URL of the repository
        result = subprocess.run(
            ["git", "config", "--get", "remote.origin.url"],
            capture_output=True,
            text=True,
            check=True
        )
    except subprocess.CalledProcessError:
        print("Error: Could not determine the remote URL of the repository.")
        sys.exit(1)

    remote_url = result.stdout.strip()

    # Normalize the URL to a common format (account for ssh and https differences).
    normalized_url = re.sub(r'^git@github\.com:', 'https://github.com/', remote_url)
    normalized_url = normalized_url.rstrip('/')
    normalized_url = re.sub(r'\.git$', '', normalized_url).lower()

    # The owner must be exactly 'ta-lib': requiring the preceding separator
    # rejects forks such as 'my-ta-lib/ta-lib'.
    if normalized_url.endswith(("/ta-lib/ta-lib", ":ta-lib/ta-lib")):
        return root_dir

    print("This script performs no operation when run from a fork")
    sys.exit(0)

def are_generated_files_git_changed(root_dir: str) -> bool:
    """
    Using git, verify if any of the generated files differ from HEAD.

    Args:
        root_dir: root of the TA-Lib Git repository; git is run from there.

    Returns:
        True when 'git diff --exit-code' reports differences (exit code 1),
        False when there are none (exit code 0). When git cannot be run or
        fails with another exit code, the error is printed and False is
        returned.
    """
    git_cmd = ['git', 'diff', '--exit-code', 'HEAD', '--'] + get_all_generated_files()
    try:
        # No check=True: with --exit-code, status 1 means "differences found",
        # not a failure. cwd= avoids changing the process working directory.
        result = subprocess.run(git_cmd, cwd=root_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as e:
        print(f"Error: {e}")
        return False

    if result.returncode == 0:
        return False
    if result.returncode == 1:
        return True

    details = result.stderr.decode(errors='replace').strip()
    print(f"Error: git diff failed with exit code {result.returncode}: {details}")
    return False

def copy_file_list(src_dir: str, dest_dir: str, file_list: list):
    """
    Copy the files and directories selected by 'file_list' from 'src_dir'
    to 'dest_dir', preserving their paths relative to 'src_dir'.

    The file list can include whole directories ('**') and file globs ('*').

    Any existing content of 'dest_dir' is deleted first (but not 'dest_dir'
    itself). 'dest_dir' can then be used later to detect changes
    with compare_dir.
    """
    # Delete all contents in dest_dir, but not dest_dir itself.
    # Only top-level entries are visited: removing a directory removes its
    # whole subtree, so the tree is never modified while being traversed.
    if os.path.isdir(dest_dir):
        for entry in os.listdir(dest_dir):
            entry_path = os.path.join(dest_dir, entry)
            if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                shutil.rmtree(entry_path)
            else:
                # Regular files and symlinks (rmtree refuses symlinks).
                os.remove(entry_path)

    os.makedirs(dest_dir, exist_ok=True)

    for src_file in expand_globs(src_dir, file_list):
        dest_file = os.path.join(dest_dir, os.path.relpath(src_file, src_dir))
        os.makedirs(os.path.dirname(dest_file), exist_ok=True)
        if os.path.isdir(src_file):
            # A recursive glob also lists the files below this directory;
            # dirs_exist_ok makes the overlapping copies harmless.
            shutil.copytree(src_file, dest_file, dirs_exist_ok=True)
        else:
            shutil.copy(src_file, dest_file)

def compare_dir(dir1: str, dir2: str) -> bool:
    """
    Detect any difference in files or directories between dir1 and dir2.

    Common files are compared by content (not only by size/mtime) and
    subdirectories are checked recursively. Every difference is printed.

    Returns:
        True when both trees are identical, False otherwise.
    """
    dircmp = filecmp.dircmp(dir1, dir2)

    differences_found = False

    if dircmp.left_only:
        print(f"Files only in {dir1}: {dircmp.left_only}")
        differences_found = True

    if dircmp.right_only:
        print(f"Files only in {dir2}: {dircmp.right_only}")
        differences_found = True

    # dircmp.diff_files relies on a shallow comparison (file type, size and
    # mtime), so request a real content comparison explicitly.
    _, diff_files, failed_files = filecmp.cmpfiles(dir1, dir2, dircmp.common_files, shallow=False)

    if diff_files:
        print(f"Files that differ: {diff_files}")
        differences_found = True

    # common_funny: names present on both sides that are neither two files
    # nor two directories (e.g. a file on one side, a directory on the other).
    funny_files = dircmp.common_funny + failed_files
    if funny_files:
        print(f"Files that could not be compared: {funny_files}")
        differences_found = True

    for subdir in dircmp.common_dirs:
        if not compare_dir(os.path.join(dir1, subdir), os.path.join(dir2, subdir)):
            differences_found = True

    return not differences_found