""" WorkQueueExecutor utilizes the Work Queue distributed framework developed by the
Cooperative Computing Lab (CCL) at Notre Dame to provide a fault-tolerant,
high-throughput system for delegating Parsl tasks to thousands of remote machines
"""
import hashlib
import inspect
import itertools
import logging
import multiprocessing
import os
import pickle
import queue
import shutil
import socket
import subprocess
import tempfile
import threading
import time
from collections import namedtuple
from concurrent.futures import Future
from ctypes import c_bool
from typing import Dict, List, Optional, Set, Union
import typeguard
import parsl.utils as putils
from parsl.data_provider.files import File
from parsl.data_provider.staging import Staging
from parsl.errors import OptionalModuleMissing
from parsl.executors.errors import ExecutorError, InvalidResourceSpecification
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.executors.workqueue import exec_parsl_function
from parsl.monitoring.radios.base import RadioConfig
from parsl.monitoring.radios.filesystem import FilesystemRadio
from parsl.multiprocessing import SpawnContext, SpawnProcess
from parsl.process_loggers import wrap_with_logs
from parsl.providers import CondorProvider, LocalProvider
from parsl.providers.base import ExecutionProvider
from parsl.serialize import deserialize, pack_apply_message
from parsl.utils import setproctitle
from .errors import WorkQueueFailure, WorkQueueTaskFailure
IMPORT_EXCEPTION = None
try:
from ndcctools import work_queue as wq
from ndcctools.work_queue import (
WORK_QUEUE_ALLOCATION_MODE_MAX_THROUGHPUT,
WORK_QUEUE_DEFAULT_PORT,
WorkQueue,
)
except ImportError as e:
_work_queue_enabled = False
WORK_QUEUE_DEFAULT_PORT = 0
IMPORT_EXCEPTION = e
else:
_work_queue_enabled = True
package_analyze_script = shutil.which("python_package_analyze")
package_create_script = shutil.which("python_package_create")
package_run_script = shutil.which("python_package_run")
logger = logging.getLogger(__name__)
# Support structure to communicate parsl tasks to the work queue submit process.
ParslTaskToWq = namedtuple(
'ParslTaskToWq',
'id '
'category '
'cores memory disk gpus priority running_time_min env_pkg map_file function_file result_file input_files output_files')
# Support structure to communicate final status of work queue tasks to parsl
# if result_received is True:
# result_file is the path to the file containing the result.
# if result_received is False:
# result_file is None, and reason and status describe the failure.
WqTaskToParsl = namedtuple('WqTaskToParsl', 'id result_received result_file reason status')
# Support structure to report parsl filenames to work queue.
# parsl_name is the local_name or filepath attribute of a parsl file object.
# stage tells whether the file should be copied by work queue to the workers.
# cache tells whether the file should be cached at workers. Only valid if stage == True
ParslFileToWq = namedtuple('ParslFileToWq', 'parsl_name stage cache')
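# Illustrative example (hypothetical values): a relative-path input file that
# should be transferred but not cached at the worker would be represented as
# ParslFileToWq(parsl_name='inputs/data.txt', stage=True, cache=False).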
class WorkQueueExecutor(BlockProviderExecutor, putils.RepresentationMixin):
"""Executor to use Work Queue batch system
The WorkQueueExecutor system utilizes the Work Queue framework to
efficiently delegate Parsl apps to remote machines in clusters and
grids using a fault-tolerant system. Users can run the
work_queue_worker program on remote machines to connect to the
WorkQueueExecutor, and Parsl apps will then be sent out to these
machines for execution and retrieval.
Parameters
----------
label: str
A human readable label for the executor, unique
with respect to other Work Queue master programs.
Default is "WorkQueueExecutor".
working_dir: str
Location for Parsl to perform app delegation to the Work
Queue system. Defaults to current directory.
project_name: str
If a project_name is given, then Work Queue will periodically
report its status and performance back to the global WQ catalog,
which can be viewed here: http://ccl.cse.nd.edu/software/workqueue/status
Default is None. Overrides address.
project_password_file: str
Optional password file for the work queue project. Default is None.
address: str
The IP address used to contact this Work Queue master process.
If not given, uses the address of the current machine as returned
by socket.gethostname().
Ignored if project_name is specified.
port: int
TCP port on Parsl submission machine for Work Queue workers
to connect to. Workers will connect to Parsl using this port.
If 0, Work Queue will allocate a port number automatically.
In this case, environment variables can be used to influence the
choice of port, documented here:
https://ccl.cse.nd.edu/software/manuals/api/html/work__queue_8h.html#a21714a10bcdfcf5c3bd44a96f5dcbda6
Default: WORK_QUEUE_DEFAULT_PORT.
env: dict{str}
Dictionary of environment variables that
need to be set on the Work Queue worker machine.
shared_fs: bool
Define if working in a shared file system or not. If Parsl
and the Work Queue workers are on a shared file system, Work
Queue does not need to transfer and rename files for execution.
Default is False.
use_cache: bool
Whether workers should cache files that are common to tasks.
Warning: Two files are considered the same if they have the same
filepath name. Use with care when reusing the executor instance
across multiple parsl workflows. Default is False.
source: bool
Choose whether to transfer parsl app information as
source code. (Note: this increases throughput for
@python_apps, but the implementation does not include
functionality for @bash_apps, and thus source=False
must be used for programs utilizing @bash_apps.)
Default is False; it is automatically set to True when pack is True.
pack: bool
Use conda-pack to prepare a self-contained Python environment for
each task. This greatly increases task latency, but does not
require a common environment or shared FS on execution nodes.
Implies source=True.
extra_pkgs: list
List of extra pip/conda package names to include when packing
the environment. This may be useful if the app executes other
(possibly non-Python) programs provided via pip or conda.
Scanning the app source for imports would not detect these
dependencies, so they need to be manually specified.
autolabel: bool
Use the Resource Monitor to automatically determine resource
labels based on observed task behavior.
autolabel_window: int
Set the number of tasks considered for autolabeling. Work Queue
will wait for a series of N tasks with steady resource
requirements before making a decision on labels. Increasing
this parameter will reduce the number of failed tasks due to
resource exhaustion when autolabeling, at the cost of increased
resources spent collecting stats.
autocategory: bool
Place each app in its own category by default. If all
invocations of an app have similar performance characteristics,
this will provide a reasonable set of categories automatically.
max_retries: int
Set the number of retries that Work Queue will make when a task
fails. This is distinct from Parsl level retries configured in
parsl.config.Config. Set to None to allow Work Queue to retry
tasks forever. By default, this is set to 1, so that all retries
will be managed by Parsl.
init_command: str
Command line to run before executing a task in a worker.
Default is ''.
worker_options: str
Extra options passed to work_queue_worker. Default is ''.
worker_executable: str
The command used to invoke work_queue_worker. This can be used
when the worker needs to be wrapped inside some other command
(for example, to run the worker inside a container). Default is
'work_queue_worker'.
function_dir: str
The directory where serialized function invocations are placed
to be sent to workers. If undefined, this defaults to a directory
under runinfo/. If shared_fs=True, then this directory
must be visible from both the submitting side and workers.
coprocess: bool
Use Work Queue's coprocess facility to avoid launching a new Python
process for each task. Experimental.
This requires a version of Work Queue / cctools after commit
874df524516441da531b694afc9d591e8b134b73 (release 7.5.0 is too early).
Default is False.
scaling_cores_per_worker: int
When using Parsl scaling, this specifies the number of cores that a
worker is expected to have available for computation. Default 1. This
parameter can be ignored when using a fixed number of blocks, or when
using one task per worker (by omitting a ``cores`` resource
specification for each task).
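Examples
--------
A minimal configuration sketch, assuming the default LocalProvider and the
standard Work Queue port; all values here are illustrative and site-specific::

    from parsl.config import Config
    from parsl.executors import WorkQueueExecutor

    config = Config(executors=[WorkQueueExecutor(label="wq", port=9123)])

Workers are started separately, for example by running
``work_queue_worker <submit-host> 9123`` on each remote machine.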
"""
@typeguard.typechecked
def __init__(self,
label: str = "WorkQueueExecutor",
provider: ExecutionProvider = LocalProvider(),
working_dir: str = ".",
project_name: Optional[str] = None,
project_password_file: Optional[str] = None,
address: Optional[str] = None,
port: int = WORK_QUEUE_DEFAULT_PORT,
env: Optional[Dict] = None,
shared_fs: bool = False,
storage_access: Optional[List[Staging]] = None,
use_cache: bool = False,
source: bool = False,
pack: bool = False,
extra_pkgs: Optional[List[str]] = None,
autolabel: bool = False,
autolabel_window: int = 1,
autocategory: bool = True,
max_retries: int = 1,
init_command: str = "",
worker_options: str = "",
full_debug: bool = True,
worker_executable: str = 'work_queue_worker',
function_dir: Optional[str] = None,
coprocess: bool = False,
scaling_cores_per_worker: int = 1,
remote_monitoring_radio: Optional[RadioConfig] = None):
BlockProviderExecutor.__init__(self, provider=provider,
block_error_handler=True)
if not _work_queue_enabled:
raise OptionalModuleMissing(['work_queue'], f"WorkQueueExecutor requires the work_queue module. More info: {IMPORT_EXCEPTION}")
self.scaling_cores_per_worker = scaling_cores_per_worker
self.label = label
self.task_queue: multiprocessing.Queue = SpawnContext.Queue()
self.collector_queue: multiprocessing.Queue = SpawnContext.Queue()
self.address = address
self.port = port
self.executor_task_counter = -1
self.project_name = project_name
self.project_password_file = project_password_file
self.env = env
self.init_command = init_command
self.shared_fs = shared_fs
self.storage_access = storage_access
self.use_cache = use_cache
self.working_dir = working_dir
self.registered_files: Set[str] = set()
self.full_debug = full_debug
self.source = True if pack else source
self.pack = pack
self.extra_pkgs = extra_pkgs or []
self.autolabel = autolabel
self.autolabel_window = autolabel_window
self.autocategory = autocategory
self.max_retries = max_retries
self.should_stop = SpawnContext.Value(c_bool, False)
self.cached_envs = {} # type: Dict[int, str]
self.worker_options = worker_options
self.worker_executable = worker_executable
self.function_dir = function_dir
self.coprocess = coprocess
if not self.address:
self.address = socket.gethostname()
if self.project_password_file is not None and not os.path.exists(self.project_password_file):
raise WorkQueueFailure('Could not find password file: {}'.format(self.project_password_file))
if self.project_password_file is not None:
if os.path.exists(self.project_password_file) is False:
logger.debug("Password File does not exist, no file used")
self.project_password_file = None
# Build foundations of the launch command
self.launch_cmd = ("{package_prefix}python3 exec_parsl_function.py {mapping} {function} {result}")
if self.init_command != "":
self.launch_cmd = self.init_command + "; " + self.launch_cmd
if remote_monitoring_radio is not None:
self.remote_monitoring_radio = remote_monitoring_radio
else:
self.remote_monitoring_radio = FilesystemRadio()
def _get_launch_command(self, block_id):
# this executor uses different terminology for worker/launch
# commands than in htex
return f"PARSL_WORKER_BLOCK_ID={block_id} {self.worker_command}"
def start(self):
"""Create submit process and collector thread to create, send, and
retrieve Parsl tasks within the Work Queue system.
"""
super().start()
self.tasks_lock = threading.Lock()
# Create directories for data and results
if not self.function_dir:
self.function_data_dir = os.path.join(self.run_dir, self.label, "function_data")
else:
tp = str(time.time())
tx = os.path.join(self.function_dir, tp)
os.makedirs(tx)
self.function_data_dir = os.path.join(self.function_dir, tp, self.label, "function_data")
self.package_dir = os.path.join(self.run_dir, self.label, "package_data")
self.wq_log_dir = os.path.join(self.run_dir, self.label)
logger.debug("function data directory: {}\nlog directory: {}".format(self.function_data_dir, self.wq_log_dir))
os.makedirs(self.wq_log_dir)
os.makedirs(self.function_data_dir)
os.makedirs(self.package_dir)
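# Illustrative resulting layout under <run_dir>/<label>/ when function_dir is
# not set: function_data/ holds the per-task function, map and result files,
# package_data/ is created alongside it, and the Work Queue debug, master and
# transaction logs are written at the top level of this directory.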
logger.debug("Starting WorkQueueExecutor")
port_mailbox = SpawnContext.Queue()
# Create a Process to perform WorkQueue submissions
submit_process_kwargs = {"task_queue": self.task_queue,
"launch_cmd": self.launch_cmd,
"data_dir": self.function_data_dir,
"collector_queue": self.collector_queue,
"full": self.full_debug,
"shared_fs": self.shared_fs,
"autolabel": self.autolabel,
"autolabel_window": self.autolabel_window,
"autocategory": self.autocategory,
"max_retries": self.max_retries,
"should_stop": self.should_stop,
"port": self.port,
"wq_log_dir": self.wq_log_dir,
"project_password_file": self.project_password_file,
"project_name": self.project_name,
"port_mailbox": port_mailbox,
"coprocess": self.coprocess
}
self.submit_process = SpawnProcess(target=_work_queue_submit_wait,
name="WorkQueue-Submit-Process",
kwargs=submit_process_kwargs)
self.collector_thread = threading.Thread(target=self._collect_work_queue_results,
name="WorkQueue-collector-thread")
self.collector_thread.daemon = True
# Begin both processes
self.submit_process.start()
self.collector_thread.start()
self._chosen_port = port_mailbox.get(timeout=60)
logger.debug(f"Chosen listening port is {self._chosen_port}")
# Initialize scaling for the provider
self.initialize_scaling()
def _path_in_task(self, executor_task_id, *path_components):
"""Returns a filename specific to a task.
It is used for the following filenames:
(not given): The subdirectory per task that contains function, result, etc.
'function': Pickled file that contains the function to be executed.
'result': Pickled file that (will) contain the result of the function.
'map': Pickled file with a dict mapping local parsl names to remote work queue names.
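For example, _path_in_task(7, 'function') resolves to
<function_data_dir>/0007/function.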
"""
task_dir = "{:04d}".format(executor_task_id)
return os.path.join(self.function_data_dir, task_dir, *path_components)
def submit(self, func, resource_specification, *args, **kwargs):
"""Processes the Parsl app by its arguments and submits the function
information to the task queue, to be executed using the Work Queue
system. The args and kwargs are processed for input and output files to
the Parsl app, so that the files are appropriately specified for the Work
Queue task.
Parameters
----------
func : function
Parsl app to be submitted to the Work Queue system
args : list
Arguments to the Parsl app
kwargs : dict
Keyword arguments to the Parsl app
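resource_specification : dict
    Optional per-invocation resource hints. A minimal sketch with
    illustrative values only (see the validation below for the accepted
    keys and their interaction with autolabel)::

        {'cores': 1, 'memory': 1000, 'disk': 1000}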
"""
cores = None
memory = None
disk = None
gpus = None
priority = None
category = None
running_time_min = None
if resource_specification and isinstance(resource_specification, dict):
logger.debug("Got resource_specification: {}".format(resource_specification))
required_resource_types = set(['cores', 'memory', 'disk'])
acceptable_fields = set(['cores', 'memory', 'disk', 'gpus', 'priority', 'running_time_min'])
keys = set(resource_specification.keys())
if not keys.issubset(acceptable_fields):
message = "Task resource specification only accepts these types of resources: {}".format(
', '.join(acceptable_fields))
logger.error(message)
raise InvalidResourceSpecification(keys, message)
# this checks that either all of the required resource types are specified, or
# that none of them are: the `required_resource_types` are not actually required,
# but if one is specified, then they all must be.
key_check = required_resource_types.intersection(keys)
required_keys_ok = len(key_check) == 0 or key_check == required_resource_types
if not self.autolabel and not required_keys_ok:
logger.error("Running with `autolabel=False`. In this mode, "
"task resource specification requires "
"three resources to be specified simultaneously: cores, memory, and disk")
raise InvalidResourceSpecification(keys,
"Task resource specification requires "
"three resources to be specified simultaneously: cores, memory, and disk. "
"Try setting autolabel=True if you are unsure of the resource usage")
for k in keys:
if k == 'cores':
cores = resource_specification[k]
elif k == 'memory':
memory = resource_specification[k]
elif k == 'disk':
disk = resource_specification[k]
elif k == 'gpus':
gpus = resource_specification[k]
elif k == 'priority':
priority = resource_specification[k]
elif k == 'category':
category = resource_specification[k]
elif k == 'running_time_min':
running_time_min = resource_specification[k]
self.executor_task_counter += 1
executor_task_id = self.executor_task_counter
# Create a per task directory for the function, result, map, and result files
os.mkdir(self._path_in_task(executor_task_id))
input_files = []
output_files = []
# Determine the input and output files that will exist at the workers:
input_files += [self._register_file(f) for f in kwargs.get("inputs", []) if isinstance(f, File)]
output_files += [self._register_file(f) for f in kwargs.get("outputs", []) if isinstance(f, File)]
# Also consider any *arg that looks like a file as an input:
input_files += [self._register_file(f) for f in args if isinstance(f, File)]
for kwarg, maybe_file in kwargs.items():
# Add appropriate input and output files from "stdout" and "stderr" keyword arguments
if kwarg == "stdout" or kwarg == "stderr":
if maybe_file:
output_files.append(self._std_output_to_wq(kwarg, maybe_file))
# For any other keyword that looks like a file, assume it is an input file
elif isinstance(maybe_file, File):
input_files.append(self._register_file(maybe_file))
# Create a Future object and have it be mapped from the task ID in the tasks dictionary
fu = Future()
fu.parsl_executor_task_id = executor_task_id
assert isinstance(resource_specification, dict)
fu.resource_specification = resource_specification
logger.debug("Getting tasks_lock to set WQ-level task entry")
with self.tasks_lock:
logger.debug("Got tasks_lock to set WQ-level task entry")
self.tasks[str(executor_task_id)] = fu
logger.debug("Creating executor task {} for function {} with args {}".format(executor_task_id, func, args))
function_file = self._path_in_task(executor_task_id, "function")
result_file = self._path_in_task(executor_task_id, "result")
map_file = self._path_in_task(executor_task_id, "map")
logger.debug("Creating executor task {} with function at: {}".format(executor_task_id, function_file))
logger.debug("Creating executor task {} with result to be found at: {}".format(executor_task_id, result_file))
self._serialize_function(function_file, func, args, kwargs)
if self.pack:
env_pkg = self._prepare_package(func, self.extra_pkgs)
else:
env_pkg = None
logger.debug("Constructing map for local filenames at worker for executor task {}".format(executor_task_id))
self._construct_map_file(map_file, input_files, output_files)
if not self.submit_process.is_alive():
raise ExecutorError(self, "Workqueue Submit Process is not alive")
# Create message to put into the message queue
logger.debug("Placing executor task {} on message queue".format(executor_task_id))
if category is None:
category = func.__name__ if self.autocategory else 'parsl-default'
self.task_queue.put_nowait(ParslTaskToWq(executor_task_id,
category,
cores,
memory,
disk,
gpus,
priority,
running_time_min,
env_pkg,
map_file,
function_file,
result_file,
input_files,
output_files))
return fu
def _construct_worker_command(self):
worker_command = self.worker_executable
if self.coprocess:
worker_command += " --coprocess parsl_coprocess.py"
if self.project_password_file:
worker_command += ' --password {}'.format(self.project_password_file)
if self.worker_options:
worker_command += ' {}'.format(self.worker_options)
if self.project_name:
worker_command += ' -M {}'.format(self.project_name)
else:
worker_command += ' {} {}'.format(self.address, self._chosen_port)
logger.debug("Using worker command: {}".format(worker_command))
return worker_command
def _patch_providers(self):
# Add the worker and password file to files that the provider needs to stage.
# (Currently only for the CondorProvider)
if isinstance(self.provider, CondorProvider):
path_to_worker = shutil.which('work_queue_worker')
self.worker_command = './' + self.worker_command
self.provider.transfer_input_files.append(path_to_worker)
if self.project_password_file:
self.provider.transfer_input_files.append(self.project_password_file)
def _serialize_function(self, fn_path, parsl_fn, parsl_fn_args, parsl_fn_kwargs):
"""Takes the function application parsl_fn(*parsl_fn_args, **parsl_fn_kwargs)
and serializes it to the file fn_path."""
# Either build a dictionary with the source of the function, or pickle
# the function directly:
if self.source:
function_info = {"source code": inspect.getsource(parsl_fn),
"name": parsl_fn.__name__,
"args": parsl_fn_args,
"kwargs": parsl_fn_kwargs}
else:
function_info = {"byte code": pack_apply_message(parsl_fn, parsl_fn_args, parsl_fn_kwargs,
buffer_threshold=1024 * 1024)}
with open(fn_path, "wb") as f_out:
pickle.dump(function_info, f_out)
def _construct_map_file(self, map_file, input_files, output_files):
""" Map local filepath of parsl files to the filenames at the execution worker.
If using a shared filesystem, the filepath is mapped to its absolute filename.
Otherwise, to its original relative filename. In this later case, work queue
recreates any directory hierarchy needed."""
file_translation_map = {}
for spec in itertools.chain(input_files, output_files):
local_name = spec[0]
if self.shared_fs:
remote_name = os.path.abspath(local_name)
else:
remote_name = local_name
file_translation_map[local_name] = remote_name
with open(map_file, "wb") as f_out:
pickle.dump(file_translation_map, f_out)
def _register_file(self, parsl_file):
"""Generates a tuple (parsl_file.filepath, stage, cache) to give to
work queue. cache is always False if self.use_cache is False.
Otherwise, it is set to True if parsl_file is used more than once.
stage is True if the file needs to be copied by work queue. (i.e., not
a URL or an absolute path)
It has the side-effect of adding parsl_file to a list of registered
files.
Note: The first time a file is used cache is set to False. Since
tasks are generated dynamically, without other information this is
the best we can do."""
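# Illustrative example: with use_cache=True, the first registration of a given
# File yields cache=False; registering the same File again yields cache=True.
# stage is True for a local, relative filepath such as 'data.txt' (hypothetical).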
to_cache = False
if self.use_cache:
to_cache = parsl_file in self.registered_files
to_stage = False
if parsl_file.scheme == 'file' or (parsl_file.local_path and os.path.exists(parsl_file.local_path)):
to_stage = not os.path.isabs(parsl_file.filepath)
self.registered_files.add(parsl_file)
return ParslFileToWq(parsl_file.filepath, to_stage, to_cache)
def _std_output_to_wq(self, fdname, stdfspec):
"""Find the name of the file that will contain stdout or stderr and
return a ParslFileToWq with it. These files are never cached"""
fname, mode = putils.get_std_fname_mode(fdname, stdfspec)
to_stage = not os.path.isabs(fname)
return ParslFileToWq(fname, stage=to_stage, cache=False)
def _prepare_package(self, fn, extra_pkgs):
fn_id = id(fn)
fn_name = fn.__name__
if fn_id in self.cached_envs:
logger.debug("Skipping analysis of %s, previously got %s", fn_name, self.cached_envs[fn_id])
return self.cached_envs[fn_id]
source_code = inspect.getsource(fn).encode()
pkg_dir = os.path.join(tempfile.gettempdir(), "python_package-{}".format(os.geteuid()))
os.makedirs(pkg_dir, exist_ok=True)
with tempfile.NamedTemporaryFile(suffix='.yaml') as spec:
logger.info("Analyzing dependencies of %s", fn_name)
analyze_cmdline = [package_analyze_script, exec_parsl_function.__file__, '-', spec.name]
for p in extra_pkgs:
analyze_cmdline += ["--extra-pkg", p]
subprocess.run(analyze_cmdline, input=source_code, check=True)
with open(spec.name, mode='rb') as f:
spec_hash = hashlib.sha256(f.read()).hexdigest()
logger.debug("Spec hash for %s is %s", fn_name, spec_hash)
pkg = os.path.join(pkg_dir, "pack-{}.tar.gz".format(spec_hash))
if os.access(pkg, os.R_OK):
self.cached_envs[fn_id] = pkg
logger.debug("Cached package for %s found at %s", fn_name, pkg)
return pkg
(fd, tarball) = tempfile.mkstemp(dir=pkg_dir, prefix='.tmp', suffix='.tar.gz')
os.close(fd)
logger.info("Creating dependency package for %s", fn_name)
logger.debug("Writing deps for %s to %s", fn_name, tarball)
subprocess.run([package_create_script, spec.name, tarball], stdout=subprocess.DEVNULL, check=True)
logger.debug("Done with conda-pack; moving %s to %s", tarball, pkg)
os.rename(tarball, pkg)
self.cached_envs[fn_id] = pkg
return pkg
def initialize_scaling(self):
""" Compose the launch command and call scale out
Scales the workers to the appropriate nodes with provider
"""
# Start scaling in/out
logger.debug("Starting WorkQueueExecutor with provider: %s", self.provider)
self.worker_command = self._construct_worker_command()
self._patch_providers()
def outstanding(self) -> int:
"""Count the number of outstanding slots required. This is inefficiently
implemented and probably could be replaced with a counter.
"""
logger.debug("Calculating outstanding task slot load")
outstanding = 0
tasks = 0 # only for log message...
with self.tasks_lock:
for fut in self.tasks.values():
if not fut.done():
# if a task does not specify a core count, Work Queue will allocate an entire
# worker node to that task. That's approximated here by saying that it uses
# scaling_cores_per_worker.
resource_spec = getattr(fut, 'resource_specification', {})
cores = resource_spec.get('cores', self.scaling_cores_per_worker)
outstanding += cores
tasks += 1
logger.debug(f"Counted {tasks} outstanding tasks with {outstanding} outstanding slots")
return outstanding
@property
def workers_per_node(self) -> Union[int, float]:
return self.scaling_cores_per_worker
def shutdown(self, *args, **kwargs):
"""Shutdown the executor. Sets flag to cancel the submit process and
collector thread, which shuts down the Work Queue system submission.
"""
logger.debug("Work Queue shutdown started")
self.should_stop.value = True
logger.debug("Joining on submit process")
self.submit_process.join()
self.submit_process.close()
logger.debug("Joining on collector thread")
self.collector_thread.join()
logger.debug("Closing multiprocessing queues")
self.task_queue.close()
self.task_queue.join_thread()
self.collector_queue.close()
self.collector_queue.join_thread()
super().shutdown()
logger.debug("Work Queue shutdown completed")
@wrap_with_logs
def _collect_work_queue_results(self):
"""Sets the values of tasks' futures of tasks completed by work queue.
"""
logger.debug("Starting Collector Thread")
try:
while not self.should_stop.value:
if not self.submit_process.is_alive():
raise ExecutorError(self, "Workqueue Submit Process is not alive")
# Get the result message from the collector_queue
try:
task_report = self.collector_queue.get(timeout=1)
except queue.Empty:
continue
# Obtain the future from the tasks dictionary
with self.tasks_lock:
future = self.tasks.pop(task_report.id)
logger.debug("Updating Future for executor task {}".format(task_report.id))
# If result_received, then there's a result file. The object inside the file
# may be a valid result or an exception caused within the function invocation.
# Otherwise there's no result file, implying errors from WorkQueue.
if task_report.result_received:
try:
with open(task_report.result_file, 'rb') as f_in:
result = deserialize(f_in.read())
except Exception as e:
logger.error(f'Cannot load result from result file {task_report.result_file}. Exception: {e}')
ex = WorkQueueTaskFailure('Cannot load result from result file', None)
ex.__cause__ = e
future.set_exception(ex)
else:
if isinstance(result, Exception):
ex = WorkQueueTaskFailure('Task execution raised an exception', result)
ex.__cause__ = result
future.set_exception(ex)
else:
future.set_result(result)
else:
# If there is no result, then the task failed according to one of
# the work queue failure modes, such as resource exhaustion.
ex = WorkQueueTaskFailure(task_report.reason, None)
future.set_exception(ex)
finally:
logger.debug("Marking all outstanding tasks as failed")
logger.debug("Acquiring tasks_lock")
with self.tasks_lock:
logger.debug("Acquired tasks_lock")
# set exception for tasks waiting for results that work queue did not execute
for fu in self.tasks.values():
if not fu.done():
fu.set_exception(WorkQueueFailure("work queue executor failed to execute the task."))
logger.debug("Exiting Collector Thread")
@wrap_with_logs
def _work_queue_submit_wait(*,
port_mailbox: multiprocessing.Queue,
task_queue: multiprocessing.Queue,
launch_cmd: str,
collector_queue: multiprocessing.Queue,
data_dir: str,
full: bool,
shared_fs: bool,
autolabel: bool,
autolabel_window: int,
autocategory: bool,
max_retries: Optional[int],
should_stop, # multiprocessing.Value is an awkward type alias from inside multiprocessing
port: int,
wq_log_dir: str,
project_password_file: Optional[str],
project_name: Optional[str],
coprocess: bool) -> int:
"""Thread to handle Parsl app submissions to the Work Queue objects.
Takes in Parsl functions submitted using submit(), and creates a
Work Queue task with the appropriate specifications, which is then
submitted to Work Queue. After tasks are completed, processes the
exit status and exit code of the task, and sends results to the
Work Queue collector thread.
To avoid python's global interpreter lock with work queue's wait, this
function should be launched as a process, not as a lightweight thread. This
means that any communication should be done using the multiprocessing
module capabilities, rather than shared memory.
"""
logger.debug("Starting WorkQueue Submit/Wait Process")
setproctitle("parsl: Work Queue submit/wait")
# Enable debugging flags and create logging file
wq_debug_log = None
if wq_log_dir is not None:
logger.debug("Setting debugging flags and creating logging file")
wq_debug_log = os.path.join(wq_log_dir, "debug_log")
# Create WorkQueue queue object
logger.debug("Creating WorkQueue Object")
try:
logger.debug("Requested port {}".format(port))
q = WorkQueue(port, debug_log=wq_debug_log)
port_mailbox.put(q.port)
logger.debug("Listening on port {}".format(q.port))
except Exception as e:
logger.error("Unable to create WorkQueue object: {}".format(e))
port_mailbox.put(None)
raise e
# Specify WorkQueue queue attributes
if project_name:
q.specify_name(project_name)
if project_password_file:
q.specify_password_file(project_password_file)
if autolabel:
q.enable_monitoring()
if autolabel_window is not None:
q.tune('category-steady-n-tasks', autolabel_window)
# Only write logs when the wq_log_dir is specified, which it most likely will be
if wq_log_dir is not None:
wq_master_log = os.path.join(wq_log_dir, "master_log")
wq_trans_log = os.path.join(wq_log_dir, "transaction_log")
if full and autolabel:
wq_resource_log = os.path.join(wq_log_dir, "resource_logs")
q.enable_monitoring_full(dirname=wq_resource_log)
q.specify_log(wq_master_log)
q.specify_transactions_log(wq_trans_log)
orig_ppid = os.getppid()
result_file_of_task_id = {} # Mapping executor task id -> result file for active tasks.
while not should_stop.value:
# Monitor the task queue
ppid = os.getppid()
if ppid != orig_ppid:
logger.debug("new Process")
break
# Submit tasks
while task_queue.qsize() > 0 or q.empty() and not should_stop.value:
# Obtain task from task_queue
try:
task = task_queue.get(timeout=1)
logger.debug("Removing task from queue")
except queue.Empty:
continue
try:
pkg_pfx = ""
if task.env_pkg is not None:
if package_run_script is None:
raise ValueError("package_run_script must be specified")
pkg_pfx = "./{} -e {} ".format(os.path.basename(package_run_script),
os.path.basename(task.env_pkg))
if not coprocess:
# Create command string
logger.debug(launch_cmd)
command_str = launch_cmd.format(package_prefix=pkg_pfx,
mapping=os.path.basename(task.map_file),
function=os.path.basename(task.function_file),
result=os.path.basename(task.result_file))
logger.debug(command_str)
# Create WorkQueue task for the command
logger.debug("Sending executor task {} with command: {}".format(task.id, command_str))
t = wq.Task(command_str)
else:
t = wq.RemoteTask("run_parsl_task",
"parsl_coprocess",
os.path.basename(task.map_file),
os.path.basename(task.function_file),
os.path.basename(task.result_file))
t.specify_exec_method("direct")
logger.debug("Sending executor task {} to coprocess".format(task.id))
except Exception as e:
logger.error("Unable to create task: {}".format(e))
collector_queue.put_nowait(WqTaskToParsl(id=task.id,
result_received=False,
result_file=None,
reason="task could not be created by work queue",
status=-1))
continue
t.specify_category(task.category)
if autolabel:
q.specify_category_mode(task.category, WORK_QUEUE_ALLOCATION_MODE_MAX_THROUGHPUT)
if task.cores is not None:
t.specify_cores(task.cores)
if task.memory is not None:
t.specify_memory(task.memory)
if task.disk is not None:
t.specify_disk(task.disk)
if task.gpus is not None:
t.specify_gpus(task.gpus)
if task.priority is not None:
t.specify_priority(task.priority)
if task.running_time_min is not None:
t.specify_running_time_min(task.running_time_min)
if max_retries is not None:
logger.debug(f"Specifying max_retries {max_retries}")
t.specify_max_retries(max_retries)
else:
logger.debug("Not specifying max_retries")
if task.env_pkg is not None:
t.specify_input_file(package_run_script, cache=True)
t.specify_input_file(task.env_pkg, cache=True)
# Specify script, and data/result files for task
t.specify_input_file(exec_parsl_function.__file__, cache=True)
t.specify_input_file(task.function_file, cache=False)
t.specify_input_file(task.map_file, cache=False)
t.specify_output_file(task.result_file, cache=False)
t.specify_tag(str(task.id))
result_file_of_task_id[str(task.id)] = task.result_file
logger.debug("Executor task id: {}".format(task.id))
# Specify input/output files that need to be staged.
# Absolute paths are assumed to be in shared filesystem, and thus
# not staged by work queue.
if not shared_fs:
for spec in task.input_files:
if spec.stage:
t.specify_input_file(spec.parsl_name, spec.parsl_name, cache=spec.cache)
for spec in task.output_files:
if spec.stage:
t.specify_output_file(spec.parsl_name, spec.parsl_name, cache=spec.cache)
# Submit the task to the WorkQueue object
logger.debug("Submitting executor task {} to WorkQueue".format(task.id))
try:
wq_id = q.submit(t)
except Exception as e:
logger.error("Unable to submit task to work queue: {}".format(e))
collector_queue.put_nowait(WqTaskToParsl(id=task.id,
result_received=False,
result_file=None,
reason="task could not be submited to work queue",
status=-1))
continue
logger.info("Executor task {} submitted as Work Queue task {}".format(task.id, wq_id))
# If the queue is not empty wait on the WorkQueue queue for a task
task_found = True
while not q.empty() and task_found and not should_stop.value:
# Obtain the task from the queue
t = q.wait(1)
if t is None:
task_found = False
continue
# When a task is found:
executor_task_id = t.tag
logger.info("Completed Work Queue task {}, executor task {}".format(t.id, t.tag))
result_file = result_file_of_task_id.pop(t.tag)
# A task completes 'successfully' if it has a result file.
# Whether this file contains a loadable serialized Python object
# is checked later, in the collector thread of the executor process.
logger.debug("Looking for result in {}".format(result_file))
if os.path.exists(result_file):
logger.debug("Found result in {}".format(result_file))
collector_queue.put_nowait(WqTaskToParsl(id=executor_task_id,
result_received=True,
result_file=result_file,
reason=None,
status=t.return_status))
# If a result file could not be generated, explain the
# failure according to work queue error codes.
else:
reason = _explain_work_queue_result(t)
logger.debug("Did not find result in {}".format(result_file))
logger.debug("Wrapper Script status: {}\nWorkQueue Status: {}"
.format(t.return_status, t.result))
logger.debug("Task with executor id {} / Work Queue id {} failed because:\n{}"
.format(executor_task_id, t.id, reason))
collector_queue.put_nowait(WqTaskToParsl(id=executor_task_id,
result_received=False,
result_file=None,
reason=reason,
status=t.return_status))
logger.debug("Exiting WorkQueue Monitoring Process")
return 0
def _explain_work_queue_result(wq_task):
"""Returns a string with the reason why a task failed according to work queue."""
wq_result = wq_task.result
reason = "work queue result: "
if wq_result == wq.WORK_QUEUE_RESULT_SUCCESS:
reason += "succesful execution with exit code {}".format(wq_task.return_status)
elif wq_result == wq.WORK_QUEUE_RESULT_OUTPUT_MISSING:
reason += "The result file was not transfered from the worker.\n"
reason += "This usually means that there is a problem with the python setup,\n"
reason += "or the wrapper that executes the function."
reason += "\nTrace:\n" + str(wq_task.output)
elif wq_result == wq.WORK_QUEUE_RESULT_INPUT_MISSING:
reason += "missing input file"
elif wq_result == wq.WORK_QUEUE_RESULT_STDOUT_MISSING:
reason += "stdout has been truncated"
elif wq_result == wq.WORK_QUEUE_RESULT_SIGNAL:
reason += "task terminated with a signal"
elif wq_result == wq.WORK_QUEUE_RESULT_RESOURCE_EXHAUSTION:
reason += "task used more resources than requested"
elif wq_result == wq.WORK_QUEUE_RESULT_TASK_TIMEOUT:
reason += "task ran past the specified end time"
elif wq_result == wq.WORK_QUEUE_RESULT_UNKNOWN:
reason += "result could not be classified"
elif wq_result == wq.WORK_QUEUE_RESULT_FORSAKEN:
reason += "task failed, but not a task error"
elif wq_result == wq.WORK_QUEUE_RESULT_MAX_RETRIES:
reason += "unable to complete after specified number of retries"
elif wq_result == wq.WORK_QUEUE_RESULT_TASK_MAX_RUN_TIME:
reason += "task ran for more than the specified time"
elif wq_result == wq.WORK_QUEUE_RESULT_DISK_ALLOC_FULL:
reason += "task needed more space to complete task"
elif wq_result == wq.WORK_QUEUE_RESULT_RMONITOR_ERROR:
reason += "task failed because the monitor did not produce an output"
else:
reason += "unable to process Work Queue system failure"
return reason