1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
|
# (C) Copyright 2005-2025 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!
"""
Code to support recording to a readable and executable Python script.
FIXME:
- Support for dictionaries?
"""
import builtins
import warnings
from traits.api import (
HasTraits,
List,
Str,
Dict,
Bool,
Property,
Int,
Instance,
)
from traits.util.camel_case import camel_case_to_python
###############################################################################
# `_RegistryData` class.
###############################################################################
class _RegistryData(HasTraits):
# Object's script ID
script_id = Property(Str)
# Path to object in object hierarchy.
path = Property(Str)
# Parent data for this object if any.
parent_data = Instance("_RegistryData", allow_none=True)
# The name of the trait on the parent which is this object.
trait_name_on_parent = Str("")
# List of traits we are listening for on this object.
names = List(Str)
# Nested recordable instances on the object.
sub_recordables = List(Str)
# List of traits that are lists.
list_names = List(Str)
_script_id = Str("")
###########################################################################
# Non-public interface.
###########################################################################
def _get_path(self):
pdata = self.parent_data
path = ""
if pdata is not None:
pid = pdata.script_id
ppath = pdata.path
tnop = self.trait_name_on_parent
if "[" in tnop:
# If the object is a nested object through an iterator,
# we instantiate it and don't refer to it through the
# path, this makes scripting convenient.
if len(ppath) == 0:
path = pid + "." + tnop
else:
path = ppath + "." + tnop
else:
path = ppath + "." + tnop
return path
def _get_script_id(self):
sid = self._script_id
if len(sid) == 0:
pdata = self.parent_data
sid = pdata.script_id + "." + self.trait_name_on_parent
return sid
def _set_script_id(self, id):
self._script_id = id
###############################################################################
# `RecorderError` class.
###############################################################################
class RecorderError(Exception):
pass
###############################################################################
# `Recorder` class.
###############################################################################
class Recorder(HasTraits):
# The lines of code recorded.
lines = List(Str)
# Are we recording or not?
recording = Bool(False, desc="if script recording is enabled or not")
# The Python script we have recorded so far. This is just a
# convenience trait for the `get_code()` method.
script = Property(Str)
########################################
# Private traits.
# Dict used to store information on objects registered. It stores a
# unique name for the object and its path in the object hierarchy
# traversed.
_registry = Dict
# Reverse registry with keys as script_id and object as value.
_reverse_registry = Dict
# A mapping to generate unique names for objects. The key is the
# name used (which is something derived from the class name of the
# object) and the value is an integer describing the number of times
# that variable name has been used earlier.
_name_map = Dict(Str, Int)
# A list of special reserved script IDs. This is handy when you
# want a particular object to have an easy to read script ID and not
# the default one based on its class name. This leads to slightly
# easier to read scripts.
_special_ids = List
# What are the known names in the script? By known names we mean
# names which are actually bound to objects.
_known_ids = List(Str)
# The known types in the namespace.
_known_types = List(Str)
# A guard to check if we are currently in a recorded function call,
# in which case we don't want to do any recording.
_in_function = Bool(False)
###########################################################################
# `Recorder` interface.
###########################################################################
def record(self, code):
"""Record a string to be stored to the output file.
Parameters
----------
code : str
A string of text.
"""
if self.recording and not self._in_function:
lines = self.lines
# Analyze the code and add extra code if needed.
self._analyze_code(code)
# Add the code.
lines.append(code)
def register(
self,
object,
parent=None,
trait_name_on_parent="",
ignore=None,
known=False,
script_id=None,
):
"""Register an object with the recorder. This sets up the
object for recording.
By default all traits (except those starting and ending with
'_') are recorded. For attributes that are themselves
recordable, one may mark traits with a 'record' metadata as
follows:
- If metadata `record=False` is set, the nested object will not be
recorded.
- If `record=True`, then that object is also recorded if it is
not `None`.
If the object is a list or dict that is marked with
`record=True`, the list is itself not listened to for changes
but all its contents are registered.
If the `object` has a trait named `recorder` then this recorder
instance will be set to it if possible.
Parameters
----------
object : Instance(HasTraits)
The object to register in the registry.
parent : Instance(HasTraits)
An optional parent object in which `object` is contained
trait_name_on_parent : str
An optional trait name of the `object` in the `parent`.
ignore : list(str)
An optional list of trait names on the `object` to be
ignored.
known : bool
Optional specification if the `object` id is known on the
interpreter. This is needed if you are manually injecting
code to define/create an object.
script_id : str
Optionally specify a script_id to use for this object. It
is not guaranteed that this ID will be used since it may
already be in use.
"""
registry = self._registry
# Do nothing if the object is already registered.
if object in registry:
return
# When parent is specified the trait_name_on_parent must also be.
if parent is not None:
assert len(trait_name_on_parent) > 0
if ignore is None:
ignore = []
if isinstance(object, HasTraits):
# Always ignore these.
ignore.extend(["trait_added", "trait_modified"])
sub_recordables = list(object.traits(record=True).keys())
# Find all the trait names we must ignore.
ignore.extend(object.traits(record=False).keys())
# The traits to listen for.
tnames = [
t
for t in object.trait_names()
if not t.startswith("_")
and not t.endswith("_")
and t not in ignore
]
# Find all list traits.
trts = object.traits()
list_names = []
for t in tnames:
tt = trts[t].trait_type
if (
hasattr(tt, "default_value_type")
and tt.default_value_type == 5
):
list_names.append(t)
else:
# No traits, so we can't do much.
sub_recordables = []
tnames = []
list_names = []
# Setup the registry data.
# If a script id is supplied try and use it.
sid = ""
if script_id is not None:
r_registry = self._reverse_registry
while script_id in r_registry:
script_id = "%s1" % script_id
sid = script_id
# Add the chosen id to special_id list.
self._special_ids.append(sid)
if parent is None:
pdata = None
if len(sid) == 0:
sid = self._get_unique_name(object)
else:
pdata = self._get_registry_data(parent)
tnop = trait_name_on_parent
if "[" in tnop:
# If the object is a nested object through an iterator,
# we instantiate it and don't refer to it through the
# path, this makes scripting convenient.
sid = self._get_unique_name(object)
# Register the object with the data.
data = _RegistryData(
script_id=sid,
parent_data=pdata,
trait_name_on_parent=trait_name_on_parent,
names=tnames,
sub_recordables=sub_recordables,
list_names=list_names,
)
registry[object] = data
# Now get the script id of the object -- note that if sid is ''
# above then the script_id is computed from that of the parent.
sid = data.script_id
# Setup reverse registry so we can get the object from the
# script_id.
self._reverse_registry[sid] = object
# Record the script_id if the known argument is explicitly set to
# True.
if known:
self._known_ids.append(sid)
# Try and set the recorder attribute if necessary.
if hasattr(object, "recorder"):
try:
object.recorder = self
except Exception as e:
msg = "Cannot set 'recorder' trait of object %r: " "%s" % (
object,
e,
)
warnings.warn(msg, warnings.RuntimeWarning)
if isinstance(object, HasTraits):
# Add handler for lists.
for name in list_names:
object.on_trait_change(
self._list_items_listner, "%s_items" % name
)
# Register all sub-recordables.
for name in sub_recordables:
obj = getattr(object, name)
if isinstance(obj, list):
# Don't register the object itself but register its
# children.
for i, child in enumerate(obj):
attr = "%s[%d]" % (name, i)
self.register(
child, parent=object, trait_name_on_parent=attr
)
elif obj is not None:
self.register(
obj, parent=object, trait_name_on_parent=name
)
# Listen for changes to the trait itself so the newly
# assigned object can also be listened to.
object.on_trait_change(self._object_changed_handler, name)
# Now add listner for the object itself.
object.on_trait_change(self._listner, tnames)
def unregister(self, object):
"""Unregister the given object from the recorder. This inverts
the logic of the `register(...)` method.
"""
registry = self._registry
# Do nothing if the object isn't registered.
if object not in registry:
return
data = registry[object]
# Try and unset the recorder attribute if necessary.
if hasattr(object, "recorder"):
try:
object.recorder = None
except Exception as e:
msg = "Cannot unset 'recorder' trait of object %r:" "%s" % (
object,
e,
)
warnings.warn(msg, warnings.RuntimeWarning)
if isinstance(object, HasTraits):
# Remove all list_items handlers.
for name in data.list_names:
object.on_trait_change(
self._list_items_listner, "%s_items" % name, remove=True
)
# Unregister all sub-recordables.
for name in data.sub_recordables:
obj = getattr(object, name)
if isinstance(obj, list):
# Unregister the children.
for i, child in enumerate(obj):
self.unregister(child)
elif obj is not None:
self.unregister(obj)
# Remove the trait handler for trait assignments.
object.on_trait_change(
self._object_changed_handler, name, remove=True
)
# Now remove listner for the object itself.
object.on_trait_change(self._listner, data.names, remove=True)
# Remove the object data from the registry etc.
if data.script_id in self._known_ids:
self._known_ids.remove(data.script_id)
del self._reverse_registry[data.script_id]
del registry[object]
def save(self, file):
"""Save the recorded lines to the given file. It does not close
the file.
"""
file.write(self.get_code())
file.flush()
def record_function(self, func, args, kw):
"""Record a function call given the function and its
arguments."""
if self.recording and not self._in_function:
# Record the function name and arguments.
call_str = self._function_as_string(func, args, kw)
# Call the function.
try:
self._in_function = True
result = func(*args, **kw)
finally:
self._in_function = False
# Register the result if it is not None.
if func.__name__ == "__init__":
f_self = args[0]
code = self._import_class_string(f_self.__class__)
self.lines.append(code)
return_str = self._registry.get(f_self).script_id
else:
return_str = self._return_as_string(result)
if len(return_str) > 0:
self.lines.append("%s = %s" % (return_str, call_str))
else:
self.lines.append("%s" % (call_str))
else:
result = func(*args, **kw)
return result
def ui_save(self):
"""Save recording to file, pop up a UI dialog to find out where
and close the file when done.
"""
from pyface.api import FileDialog, OK
wildcard = "Python files (*.py)|*.py|" + FileDialog.WILDCARD_ALL
dialog = FileDialog(
title="Save Script", action="save as", wildcard=wildcard
)
if dialog.open() == OK:
fname = dialog.path
f = open(fname, "w")
self.save(f)
f.close()
def clear(self):
"""Clears all previous recorded state and unregisters all
registered objects."""
# First unregister any registered objects.
registry = self._registry
while len(registry) > 0:
self.unregister(list(registry.keys())[0])
# Clear the various lists.
self.lines[:] = []
self._registry.clear()
self._known_ids[:] = []
self._name_map.clear()
self._reverse_registry.clear()
self._known_types[:] = []
self._special_ids[:] = []
def get_code(self):
"""Returns the recorded lines as a string of printable code."""
return "\n".join(self.lines) + "\n"
def is_registered(self, object):
"""Returns True if the given object is registered with the
recorder."""
return object in self._registry
def get_script_id(self, object):
"""Returns the script_id of a registered object. Useful when
you want to manually add a record statement."""
return self._get_registry_data(object).script_id
def get_object_path(self, object):
"""Returns the path in the object hierarchy of a registered
object. Useful for debugging."""
return self._get_registry_data(object).path
def write_script_id_in_namespace(self, script_id):
"""If a script_id is not known in the current script's namespace,
this sets it using the path of the object or actually
instantiating it. If this is not possible (since the script_id
matches no existing object), nothing is recorded but the
framework is notified that the particular script_id is available
in the namespace. This is useful when you want to inject code
in the namespace to create a particular object.
"""
if not self.recording:
return
known_ids = self._known_ids
if script_id not in known_ids:
obj = self._reverse_registry.get(script_id)
# Add the ID to the known_ids.
known_ids.append(script_id)
if obj is not None:
data = self._registry.get(obj)
result = ""
if len(data.path) > 0:
# Record code for instantiation of object.
result = "%s = %s" % (script_id, data.path)
else:
# This is not the best thing to do but better than
# nothing.
result = self._import_class_string(obj.__class__)
cls = obj.__class__.__name__
result += "\n%s = %s()" % (script_id, cls)
if len(result) > 0:
self.lines.extend(result.split("\n"))
###########################################################################
# Non-public interface.
###########################################################################
def _get_unique_name(self, obj):
"""Return a unique object name (a string). Note that this does
not cache the object, so if called with the same object 3 times
you'll get three different names.
"""
cname = obj.__class__.__name__
nm = self._name_map
result = ""
builtin = False
if cname in builtins.__dict__:
builtin = True
if hasattr(obj, "__name__"):
cname = obj.__name__
else:
cname = camel_case_to_python(cname)
special_ids = self._special_ids
while len(result) == 0 or result in special_ids:
if cname in nm:
id = nm[cname] + 1
nm[cname] = id
result = "%s%d" % (cname, id)
else:
nm[cname] = 0
# The first id doesn't need a number if it isn't builtin.
if builtin:
result = "%s0" % (cname)
else:
result = cname
return result
def _get_registry_data(self, object):
"""Get the data for an object from registry."""
data = self._registry.get(object)
if data is None:
msg = (
"Recorder: Can't get script_id since object %s not registered"
)
raise RecorderError(msg % (object))
return data
def _listner(self, object, name, old, new):
"""The listner for trait changes on an object.
This is called by child listners or when any of the recordable
object's traits change when recording to a script is enabled.
Parameters:
-----------
object : Object which has changed.
name : extended name of attribute that changed.
old : Old value.
new : New value.
"""
if self.recording and not self._in_function:
new_repr = repr(new)
sid = self._get_registry_data(object).script_id
if len(sid) == 0:
msg = "%s = %r" % (name, new)
else:
msg = "%s.%s = %r" % (sid, name, new)
if new_repr.startswith("<") and new_repr.endswith(">"):
self.record("# " + msg)
else:
self.record(msg)
def _list_items_listner(self, object, name, old, event):
"""The listner for *_items on list traits of the object."""
# Set the path of registered objects in the modified list and
# all their children. This is done by unregistering the object
# and re-registering them. This is slow but.
registry = self._registry
sid = registry.get(object).script_id
trait_name = name[:-6]
items = getattr(object, trait_name)
for (i, item) in enumerate(items):
if item in registry:
data = registry.get(item)
tnop = data.trait_name_on_parent
if len(tnop) > 0:
data.trait_name_on_parent = "%s[%d]" % (trait_name, i)
# Record the change.
if self.recording and not self._in_function:
index = event.index
removed = event.removed
added = event.added
nr = len(removed)
slice = "[%d:%d]" % (index, index + nr)
rhs = [self._object_as_string(item) for item in added]
rhs = ", ".join(rhs)
obj = "%s.%s" % (sid, name[:-6])
msg = "%s%s = [%s]" % (obj, slice, rhs)
self.record(msg)
def _object_changed_handler(self, object, name, old, new):
"""Called when a child recordable object has been reassigned."""
registry = self._registry
if old is not None:
if old in registry:
self.unregister(old)
if new is not None:
if new not in registry:
self.register(new, parent=object, trait_name_on_parent=name)
def _get_script(self):
return self.get_code()
def _analyze_code(self, code):
"""Analyze the code and return extra code if needed."""
lhs = ""
try:
lhs = code.split()[0]
except IndexError:
pass
if "." in lhs:
ob_name = lhs.split(".")[0]
self.write_script_id_in_namespace(ob_name)
def _function_as_string(self, func, args, kw):
"""Return a string representing the function call."""
func_name = func.__name__
func_code = func.__code__
# Even if func is really a decorated method it never shows up as
# a bound or unbound method here, so we have to inspect the
# argument names to figure out if this is a method or function.
if func_code.co_argcount > 0 and func_code.co_varnames[0] == "self":
# This is a method, the first argument is bound to self.
f_self = args[0]
# Convert the remaining arguments to strings.
argl = [self._object_as_string(arg) for arg in args[1:]]
# If this is __init__ we special case it.
if func_name == "__init__":
# Register the object.
self.register(f_self, known=True)
func_name = f_self.__class__.__name__
else:
sid = self._object_as_string(f_self)
func_name = "%s.%s" % (sid, func_name)
else:
argl = [self._object_as_string(arg) for arg in args]
# Convert the keyword args.
kwl = [
"%s=%s" % (key, self._object_as_string(value))
for key, value in kw.items()
]
argl.extend(kwl)
# Make a string representation of the args, kw.
argstr = ", ".join(argl)
return "%s(%s)" % (func_name, argstr)
def _is_arbitrary_object(self, object):
"""Return True if the object is an arbitrary non-primitive object.
We assume that if the hex id of the object is in its string
representation then it is an arbitrary object.
"""
ob_id = id(object)
orepr = repr(object)
hex_id = "%x" % ob_id
return hex_id.upper() in orepr.upper()
def _object_as_string(self, object):
"""Return a string representing the object."""
registry = self._registry
if object in registry:
# Return script id if the object is known; create the script
# id on the namespace if needed before that.
sid = registry.get(object).script_id
base_id = sid.split(".")[0]
self.write_script_id_in_namespace(base_id)
return sid
else:
if not self._is_arbitrary_object(object):
return repr(object)
# If we get here, we just register the object and call ourselves
# again to do the needful.
self.register(object)
return self._object_as_string(object)
def _return_as_string(self, object):
"""Return a string given a returned object from a function."""
result = ""
ignore = (float, complex, bool, int, str)
if object is not None and type(object) not in ignore:
# If object is not know, register it.
registry = self._registry
if object not in registry:
self.register(object)
result = registry.get(object).script_id
# Since this is returned it is known on the namespace.
known_ids = self._known_ids
if result not in known_ids:
known_ids.append(result)
return result
def _import_class_string(self, cls):
"""Import a class if needed."""
cname = cls.__name__
result = ""
if cname not in builtins.__dict__:
mod = cls.__module__
typename = "%s.%s" % (mod, cname)
if typename not in self._known_types:
result = "from %s import %s" % (mod, cname)
self._known_types.append(typename)
return result
|