##
# @namespace ndcctools.chirp
#
# Chirp distributed filesystem client - Python interface.
#
# The Chirp distributed filesystem is a user-level service
# to enable easy deployment of data across distributed systems.
# This interface permits clients to connect to and interact with servers.
# The objects and methods provided by this package correspond to the native
# C API in @ref chirp_reli.h and chirp_swig_wrap.h
#
# The SWIG-based Python bindings provide a higher-level interface that
# revolves around:
#
# - @ref ndcctools.chirp.Client
# - @ref ndcctools.chirp.Stat
import os
import time
import json
import weakref
from .cchirp import *
##
# \class ndcctools.chirp.Client
# Python Client object
#
# This class is used to create a chirp client
class Client(object):
    ##
    # Create a new chirp client
    #
    # @param self              Reference to the current client object.
    # @param hostport          The host:port of the server.
    # @param timeout           The time to wait for a server response on every request.
    # @param authentication    A list of preferred authentications. E.g., ['ticket', 'unix']
    # @param tickets           A list of ticket filenames.
    # @param debug             Generate client debug output.
    def __init__(self, hostport, timeout=60, authentication=None, tickets=None, debug=False):
        self.hostport = hostport
        self.timeout = timeout

        if debug:
            cctools_debug_config('chirp_python_client')
            cctools_debug_flags_set('chirp')

        # Tickets supplied without an explicit authentication list select
        # ticket authentication only.
        if tickets and (authentication is None):
            authentication = ['ticket']

        self.__set_tickets(tickets)

        if authentication is None:
            auth_register_all()
        else:
            for auth in authentication:
                auth_register_byname(auth)

        # An empty identity means no authentication method was accepted.
        self.identity = self.whoami()
        if self.identity == '':
            raise AuthenticationFailure(authentication)

        # Disconnect when the client is garbage collected, or earlier when
        # the finalizer is invoked explicitly by __exit__.
        self._finalizer = weakref.finalize(self, chirp_reli_disconnect, self.hostport)

    ##
    # Enter a 'with' block. Returns the client itself so that it can be bound
    # with 'as'.
    #
    # @param self              Reference to the current client object.
    def __enter__(self):
        return self

    ##
    # Leave a 'with' block, running the disconnect finalizer. Exceptions
    # raised inside the block are not suppressed.
    #
    # @param self              Reference to the current client object.
    # @param exception_type    Type of the exception raised in the block, if any.
    # @param exception_value   The exception raised in the block, if any.
    # @param traceback         Traceback of the exception, if any.
    def __exit__(self, exception_type, exception_value, traceback):
        self._finalizer()

    # Compute the absolute deadline for a request. An explicit
    # absolute_stop_time wins; otherwise the deadline is now + timeout, where
    # timeout defaults to the per-client value.
    def __stoptime(self, absolute_stop_time=None, timeout=None):
        if timeout is None:
            timeout = self.timeout

        if absolute_stop_time is None:
            absolute_stop_time = time.time() + timeout

        return absolute_stop_time

    # Load the authentication tickets. When no list is given, fall back to the
    # CHIRP_CLIENT_TICKETS environment variable; do nothing if that is unset.
    def __set_tickets(self, tickets):
        if tickets is None:
            tickets_str = os.environ.get('CHIRP_CLIENT_TICKETS')
        else:
            tickets_str = ','.join(tickets)

        if tickets_str is not None:
            auth_ticket_load(tickets_str)

    ##
    # Returns a string with identity of the client according to the server.
    #
    # @param self                Reference to the current client object.
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def whoami(self, absolute_stop_time=None, timeout=None):
        return chirp_wrap_whoami(self.hostport, self.__stoptime(absolute_stop_time, timeout))

    ##
    # Returns a list with the ACL entries of the given directory, one entry
    # per line of the server reply.
    # Throws an IOError on error (no such directory).
    #
    # @param self                Reference to the current client object.
    # @param path                Target directory.
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def listacl(self, path='/', absolute_stop_time=None, timeout=None):
        acls = chirp_wrap_listacl(self.hostport, path, self.__stoptime(absolute_stop_time, timeout))

        if acls is None:
            raise IOError(path)

        return acls.split('\n')

    ##
    # Sets the rights of a subject in the ACL of the given directory.
    # Returns the (non-negative) result of the underlying call.
    # Throws a GeneralFailure on error.
    #
    # @param self                Reference to the current client object.
    # @param path                Target directory.
    # @param subject             Target subject.
    # @param rights              Permissions to be granted.
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def setacl(self, path, subject, rights, absolute_stop_time=None, timeout=None):
        result = chirp_reli_setacl(self.hostport, path, subject, rights, self.__stoptime(absolute_stop_time, timeout))

        if result < 0:
            raise GeneralFailure('setacl', result, [path, subject, rights])

        return result

    ##
    # Set the ACL for the given directory to be only for the rights to the calling user.
    # Throws a GeneralFailure on error.
    #
    # @param self                Reference to the current client object.
    # @param path                Target directory.
    # @param rights              Permissions to be granted.
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def resetacl(self, path, rights, absolute_stop_time=None, timeout=None):
        result = chirp_wrap_resetacl(self.hostport, path, rights, self.__stoptime(absolute_stop_time, timeout))

        if result < 0:
            raise GeneralFailure('resetacl', result, [path, rights])

        return result

    ##
    # Returns a list of chirp.Stat objects, one per entry found in path.
    # Throws an IOError on error (no such directory).
    #
    # @param self                Reference to the current client object.
    # @param path                Target file/directory.
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def ls(self, path, absolute_stop_time=None, timeout=None):
        dr = chirp_reli_opendir(self.hostport, path, self.__stoptime(absolute_stop_time, timeout))

        # Check the handle that was just opened. (Testing the builtin `dir`
        # here would never be true and would hide a failed opendir.)
        if dr is None:
            raise IOError(path)

        # NOTE(review): the directory handle is not explicitly closed here.
        files = []
        while True:
            d = chirp_reli_readdir(dr)
            if d is None:
                break
            files.append(Stat(d.name, d.info))

        return files

    ##
    # Returns a chirp.Stat object with information on path.
    # Throws an IOError on error (e.g., no such path or insufficient permissions).
    #
    # @param self                Reference to the current client object.
    # @param path                Target file/directory.
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def stat(self, path, absolute_stop_time=None, timeout=None):
        info = chirp_wrap_stat(self.hostport, path, self.__stoptime(absolute_stop_time, timeout))

        if info is None:
            raise IOError(path)

        return Stat(path, info)

    ##
    # Changes permissions on path.
    # Throws a GeneralFailure on error (e.g., no such path or insufficient permissions).
    #
    # @param self                Reference to the current client object.
    # @param path                Target file/directory.
    # @param mode                Desired permissions (e.g., 0o755)
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def chmod(self, path, mode, absolute_stop_time=None, timeout=None):
        result = chirp_reli_chmod(self.hostport, path, mode, self.__stoptime(absolute_stop_time, timeout))

        if result < 0:
            raise GeneralFailure('chmod', result, [path, mode])

        return result

    ##
    # Copies local file/directory source to the chirp server as file/directory destination.
    # If destination is not given, source name is used.
    # Raises chirp.TransferFailure on error.
    #
    # @param self                Reference to the current client object.
    # @param source              A local file or directory.
    # @param destination         File or directory name to use in the server (defaults to source).
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def put(self, source, destination=None, absolute_stop_time=None, timeout=None):
        if destination is None:
            destination = source

        result = chirp_recursive_put(self.hostport,
                                     source, destination,
                                     self.__stoptime(absolute_stop_time, timeout))

        if result < 0:
            raise TransferFailure('put', result, source, destination)

        return result

    ##
    # Copies server file/directory source to the local file/directory destination.
    # If destination is not given, source name is used.
    # Raises chirp.TransferFailure on error.
    #
    # @param self                Reference to the current client object.
    # @param source              A server file or directory.
    # @param destination         File or directory name to be used locally (defaults to source).
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def get(self, source, destination=None, absolute_stop_time=None, timeout=None):
        if destination is None:
            destination = source

        result = chirp_recursive_get(self.hostport,
                                     source, destination,
                                     self.__stoptime(absolute_stop_time, timeout))

        if result < 0:
            raise TransferFailure('get', result, source, destination)

        return result

    ##
    # Removes the given file or directory from the server.
    # Raises OSError on error.
    #
    # @param self                Reference to the current client object.
    # @param path                Target file/directory.
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def rm(self, path, absolute_stop_time=None, timeout=None):
        status = chirp_reli_rmall(self.hostport, path, self.__stoptime(absolute_stop_time, timeout))

        if status < 0:
            # Carry the offending path, as listacl() and stat() do.
            raise OSError(path)

    ##
    # Recursively create the directories in path.
    # Raises OSError on error.
    #
    # @param self                Reference to the current client object.
    # @param path                Target file/directory.
    # @param mode                Unix permissions for the created directory.
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def mkdir(self, path, mode=0o755, absolute_stop_time=None, timeout=None):
        result = chirp_reli_mkdir_recursive(self.hostport, path, mode, self.__stoptime(absolute_stop_time, timeout))

        if result < 0:
            # Carry the offending path, as listacl() and stat() do.
            raise OSError(path)

        return result

    ##
    # Computes the checksum of path.
    # Raises IOError on error.
    #
    # @param self                Reference to the current client object.
    # @param path                Target file.
    # @param algorithm           One of 'md5' or 'sha1' (default).
    # @param absolute_stop_time  If given, maximum number of seconds since
    #                            epoch to wait for a server response.
    #                            (Overrides any timeout.)
    # @param timeout             If given, maximum number of seconds to
    #                            wait for a server response.
    def hash(self, path, algorithm='sha1', absolute_stop_time=None, timeout=None):
        hash_hex = chirp_wrap_hash(self.hostport, path, algorithm, self.__stoptime(absolute_stop_time, timeout))

        if hash_hex is None:
            # Carry the offending path, as listacl() and stat() do.
            raise IOError(path)

        return hash_hex

    ##
    # Creates a chirp job. See http://ccl.cse.nd.edu/software/manuals/chirp.html for details.
    # Returns the id of the new job. Raises ChirpJobError on error.
    # The request uses the client's default timeout.
    #
    # @param self             Reference to the current client object.
    # @param job_description  A dictionary with a job chirp description.
    #
    # @code
    # job_description = {
    #     'executable': "/bin/tar",
    #     'arguments':  [ 'tar', '-cf', 'archive.tar', 'a', 'b' ],
    #     'files':      [ { 'task_path': 'a',
    #                       'serv_path': '/users/magrat/a.txt',
    #                       'type':      'INPUT' },
    #                     { 'task_path': 'b',
    #                       'serv_path': '/users/magrat/b.txt',
    #                       'type':      'INPUT' },
    #                     { 'task_path': 'archive.tar',
    #                       'serv_path': '/users/magrat/archive.tar',
    #                       'type':      'OUTPUT' } ]
    # }
    # job_id = client.job_create(job_description)
    # @endcode
    def job_create(self, job_description):
        job_json = json.dumps(job_description)
        job_id = chirp_wrap_job_create(self.hostport, job_json, self.__stoptime())

        if job_id < 0:
            raise ChirpJobError('create', job_id, job_json)

        return job_id

    ##
    # Kills the jobs identified with the different job ids.
    # Raises ChirpJobError on error.
    #
    # @param self     Reference to the current client object.
    # @param job_ids  Job ids of the chirp jobs to be killed.
    #
    def job_kill(self, *job_ids):
        ids_str = json.dumps(job_ids)
        result = chirp_wrap_job_kill(self.hostport, ids_str, self.__stoptime())

        if result < 0:
            raise ChirpJobError('kill', result, ids_str)

        return result

    ##
    # Commits (starts running) the jobs identified with the different job ids.
    # Raises ChirpJobError on error.
    #
    # @param self     Reference to the current client object.
    # @param job_ids  Job ids of the chirp jobs to be committed.
    #
    def job_commit(self, *job_ids):
        ids_str = json.dumps(job_ids)
        result = chirp_wrap_job_commit(self.hostport, ids_str, self.__stoptime())

        if result < 0:
            raise ChirpJobError('commit', result, ids_str)

        return result

    ##
    # Reaps the jobs identified with the different job ids.
    # Raises ChirpJobError on error.
    #
    # @param self     Reference to the current client object.
    # @param job_ids  Job ids of the chirp jobs to be reaped.
    #
    def job_reap(self, *job_ids):
        ids_str = json.dumps(job_ids)
        result = chirp_wrap_job_reap(self.hostport, ids_str, self.__stoptime())

        if result < 0:
            raise ChirpJobError('reap', result, ids_str)

        return result

    ##
    # Obtains the current status for each job id. The value returned is the
    # decoded JSON reply of the server (documented as a list which contains
    # a dictionary per job id). Raises ChirpJobError on error.
    #
    # @param self     Reference to the current client object.
    # @param job_ids  Job ids of the chirp jobs to be queried.
    #
    def job_status(self, *job_ids):
        ids_str = json.dumps(job_ids)
        status = chirp_wrap_job_status(self.hostport, ids_str, self.__stoptime())

        if status is None:
            raise ChirpJobError('status', None, ids_str)

        return json.loads(status)

    ##
    # Waits waiting_time seconds for the job_id to terminate. Return value is
    # the same as job_status. If job_id is missing, `<job_wait>` waits for
    # any of the user's job. Raises ChirpJobError on error.
    #
    # NOTE(review): earlier documentation stated that an empty string is
    # returned when the call times out; the reply is always decoded with
    # json.loads here, so confirm what the server sends on timeout.
    #
    # @param self          Reference to the current client object.
    # @param waiting_time  maximum number of seconds to wait for a job to finish.
    # @param job_id        id of the job to wait.
    def job_wait(self, waiting_time, job_id=0):
        status = chirp_wrap_job_wait(self.hostport, job_id, waiting_time, self.__stoptime())

        if status is None:
            # Report the failing action as 'wait' rather than 'status'.
            raise ChirpJobError('wait', None, job_id)

        return json.loads(status)
##
# Python Stat object
#
# @class ndcctools.chirp.Stat
#
# This class is used to record stat information for files/directories of a chirp server.
class Stat(object):
    # Wrap the stat record of one path.
    #
    # path   is the name the record refers to.
    # cstat  is the underlying record; its cst_* attributes are exposed
    #        through the read-only properties below.
    def __init__(self, path, cstat):
        self._path = path
        self._info = cstat

    ##
    # Target path.
    #
    # @code
    # >>> print(s.path)
    # @endcode
    @property
    def path(self):
        return self._path

    ##
    # ID of device containing file.
    #
    # @code
    # >>> print(s.device)
    # @endcode
    @property
    def device(self):
        return self._info.cst_dev

    ##
    # inode number
    #
    # @code
    # >>> print(s.inode)
    # @endcode
    @property
    def inode(self):
        return self._info.cst_ino

    ##
    # file mode permissions
    #
    # @code
    # >>> print(s.mode)
    # @endcode
    @property
    def mode(self):
        return self._info.cst_mode

    ##
    # number of hard links
    #
    # @code
    # >>> print(s.nlink)
    # @endcode
    @property
    def nlink(self):
        return self._info.cst_nlink

    ##
    # user ID of owner
    #
    # @code
    # >>> print(s.uid)
    # @endcode
    @property
    def uid(self):
        return self._info.cst_uid

    ##
    # group ID of owner
    #
    # @code
    # >>> print(s.gid)
    # @endcode
    @property
    def gid(self):
        return self._info.cst_gid

    ##
    # device ID if special file
    #
    # @code
    # >>> print(s.rdev)
    # @endcode
    @property
    def rdev(self):
        return self._info.cst_rdev

    ##
    # total size, in bytes
    #
    # @code
    # >>> print(s.size)
    # @endcode
    @property
    def size(self):
        return self._info.cst_size

    ##
    # block size for file system I/O
    #
    # @code
    # >>> print(s.block_size)
    # @endcode
    @property
    def block_size(self):
        return self._info.cst_blksize

    ##
    # number of 512B blocks allocated
    #
    # @code
    # >>> print(s.blocks)
    # @endcode
    @property
    def blocks(self):
        return self._info.cst_blocks

    ##
    # time of last access, in seconds since epoch
    #
    # @code
    # >>> print(s.atime)
    # @endcode
    @property
    def atime(self):
        return self._info.cst_atime

    ##
    # time of last modification, in seconds since epoch
    #
    # @code
    # >>> print(s.mtime)
    # @endcode
    @property
    def mtime(self):
        return self._info.cst_mtime

    ##
    # time of last status change, in seconds since epoch
    #
    # @code
    # >>> print(s.ctime)
    # @endcode
    @property
    def ctime(self):
        return self._info.cst_ctime

    # Short human-readable summary: path, owner uid/gid and size.
    def __repr__(self):
        return "%s uid:%d gid:%d size:%d" % (self.path, self.uid, self.gid, self.size)
##
# Raised by Client when the server reports an empty identity for the client,
# i.e. none of the requested authentication methods succeeded. The list of
# authentication methods that were requested is passed as the exception
# argument.
class AuthenticationFailure(Exception):
    pass
##
# Raised when a generic chirp operation (e.g. setacl, resetacl, chmod) fails.
#
# The fields action, status and value record the name of the operation, the
# status it returned, and the arguments it was called with.
class GeneralFailure(Exception):
    def __init__(self, action, status, value):
        message = "Error with %s(%s) %s" % (action, status, value)
        super(GeneralFailure, self).__init__(message)

        self.action = action
        self.status = status
        self.value = value
##
# Raised when a put or get transfer fails.
#
# The fields action, status, source and dest record the name of the
# operation, the status it returned, and the two paths involved.
class TransferFailure(Exception):
    def __init__(self, action, status, source, dest):
        message = "Error with %s(%s) %s %s" % (action, status, source, dest)
        super(TransferFailure, self).__init__(message)

        self.action = action
        self.status = status
        self.source = source
        self.dest = dest
##
# Raised when a chirp job operation (create, kill, commit, reap, status or
# wait) fails.
#
# The fields action, status and value record the name of the operation, the
# status it returned (None when no status is available), and the job
# description or job ids it was called with.
class ChirpJobError(Exception):
    def __init__(self, action, status, value):
        message = "Error with %s(%s) %s" % (action, status, value)
        super(ChirpJobError, self).__init__(message)

        self.action = action
        self.status = status
        self.value = value
# @endcode