1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669
|
# ------------------------------------------------------------------------------
# Copyright (c) 2013-2025, Nucleic Development Team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ------------------------------------------------------------------------------
import ast
from contextlib import contextmanager
import bytecode as bc
from atom.api import Atom, Bool, Int, List, Str
from ..compat import PY310, PY311, PY312, PY313, PY314
class _ReturnNoneIdentifier(ast.NodeVisitor):
    """Record the lines of top level returns that produce None.

    Both a bare ``return`` and an explicit ``return None`` are recorded.
    Function and class definitions are never descended into.

    """

    def __init__(self) -> None:
        super().__init__()
        # Line numbers holding either a bare return or a return None
        self.lines: set[int] = set()

    def visit_Return(self, node: ast.Return) -> None:
        """Remember the line of the statement if it returns None."""
        returned = node.value
        if returned is None:
            # Bare ``return`` statement
            yields_none = True
        else:
            yields_none = (
                isinstance(returned, ast.Constant) and returned.value is None
            )
        if yields_none:
            self.lines.add(node.lineno)

    # The visitors below deliberately stop the traversal: a return found
    # inside a nested scope is not relevant for the enclosing block.

    def visit_AsyncFunctionDef(self, node):
        """Skip the body of async function definitions."""
        return None

    def visit_FunctionDef(self, node):
        """Skip the body of function definitions."""
        return None

    def visit_ClassDef(self, node):
        """Skip the body of class definitions."""
        return None
class CodeGenerator(Atom):
    """A class for generating bytecode operations.

    This class matches enaml needs and does not support freevars or cellvars.

    The helper methods append ``bytecode`` instructions (or pseudo
    instructions such as labels) to ``code_ops``; ``to_code`` then assembles
    the accumulated operations into a code object. The trailing comments on
    the instructions describe the expected state of the stack before and
    after the operation.

    """

    #: The arguments for the code.
    args = List()

    #: Number of positional only arguments
    posonlyargs = Int()

    #: Number of keyword only arguments
    kwonlyargs = Int()

    #: Whether the code takes variadic args.
    varargs = Bool(False)

    #: Whether the code takes variadic kwargs.
    varkwargs = Bool(False)

    #: Whether the code object should get new locals.
    newlocals = Bool(False)

    #: The name for the code object.
    name = Str()

    #: The full name of the file which is being compiled.
    filename = Str()

    #: The first line number for the code object.
    firstlineno = Int(1)

    #: The docstring for the code object.
    docstring = Str()

    #: The list of generated code operations.
    code_ops = List()

    def to_code(self):
        """Create a Python code object from the current code ops.

        Returns
        -------
        result : types.CodeType
            The code object assembled from ``code_ops`` and the metadata
            stored on the generator.

        """
        bc_code = bc.Bytecode(self.code_ops)
        bc_code.argnames = self.args
        # The argument count follows the semantics of ``co_argcount``: it is
        # the number of positional parameters, positional-only ones included.
        # It hence corresponds to all args minus:
        # - the keywords only
        # - the variadic positional
        # - the variadic keyword
        bc_code.argcount = (
            len(self.args) - self.kwonlyargs - self.varargs - self.varkwargs
        )
        bc_code.posonlyargcount = self.posonlyargs
        bc_code.kwonlyargcount = self.kwonlyargs

        for name in ("name", "filename", "firstlineno", "docstring"):
            setattr(bc_code, name, getattr(self, name))

        # Set flags appropriately and update flags based on the instructions
        for setting, flag in zip(
            (self.varargs, self.varkwargs, self.newlocals),
            (
                bc.CompilerFlags.VARARGS,
                bc.CompilerFlags.VARKEYWORDS,
                bc.CompilerFlags.NEWLOCALS,
            ),
        ):
            # Set the flag
            if setting:
                bc_code.flags |= flag
            # Unset the flag if it was set
            else:
                bc_code.flags ^= bc_code.flags & flag
        bc_code.update_flags()

        # Ensure all code objects starts with a RESUME to get the right frame
        if PY311:
            for i, instr in enumerate(bc_code):
                # Only the first real instruction matters, pseudo-instructions
                # (labels, line markers, ...) are skipped.
                if isinstance(instr, bc.Instr):
                    if instr.name != "RESUME":
                        bc_code.insert(i, bc.Instr("RESUME", 0))
                    break

        return bc_code.to_code()

    def set_lineno(self, lineno):
        """Set the current line number in the code."""
        self.code_ops.append(  # TOS
            bc.SetLineno(lineno),  # TOS
        )

    def load_global(self, name, push_null=False):
        """Load a global variable onto the TOS.

        Parameters
        ----------
        name : str
            The name of the global to load.
        push_null : bool, optional
            Forwarded as the first item of the instruction argument when
            ``PY311`` is true, ignored otherwise.

        """
        if PY311:
            args = (push_null, name)
        else:
            args = name
        self.code_ops.append(  # TOS
            bc.Instr("LOAD_GLOBAL", args),  # TOS -> value
        )

    def load_name(self, name):
        """Load a named variable onto the TOS."""
        self.code_ops.append(  # TOS
            bc.Instr("LOAD_NAME", name),  # TOS -> value
        )

    def load_fast(self, name):
        """Load a fast local variable onto the TOS."""
        self.code_ops.append(  # TOS
            bc.Instr("LOAD_FAST", name),  # TOS -> value
        )

    def load_const(self, const):
        """Load a const value onto the TOS."""
        self.code_ops.append(  # TOS
            bc.Instr("LOAD_CONST", const),  # TOS -> value
        )

    def load_attr(self, name):
        """Load an attribute from the object on TOS."""
        if PY312:
            # On 3.12 the argument also carries a flag, False requests a
            # plain attribute load (see load_method for the other case).
            args = (False, name)
        else:
            args = name
        self.code_ops.append(  # TOS -> obj
            bc.Instr("LOAD_ATTR", args),  # TOS -> value
        )

    def load_method(self, name):
        """Load a method from an object on TOS."""
        if PY312:
            self.code_ops.append(  # TOS -> obj
                # on 3.12 the order is reversed
                bc.Instr("LOAD_ATTR", (True, name)),  # TOS -> method -> self
            )
        else:
            # On 3.10 one has to use call_method next
            self.code_ops.append(bc.Instr("LOAD_METHOD", name))

    def store_global(self, name):
        """Store the TOS as a global."""
        self.code_ops.append(  # TOS -> value
            bc.Instr("STORE_GLOBAL", name),  # TOS
        )

    def store_name(self, name):
        """Store the TOS under name."""
        self.code_ops.append(  # TOS -> value
            bc.Instr("STORE_NAME", name),  # TOS
        )

    def store_fast(self, name):
        """Store the TOS as a fast local."""
        self.code_ops.append(  # TOS -> value
            bc.Instr("STORE_FAST", name),  # TOS
        )

    def store_attr(self, name):
        """Store the value at 2nd as an attr on 1st."""
        self.code_ops.append(  # TOS -> val -> obj
            bc.Instr("STORE_ATTR", name),  # TOS
        )

    def delete_global(self, name):
        """Delete a named global variable."""
        self.code_ops.append(  # TOS
            bc.Instr("DELETE_GLOBAL", name),  # TOS
        )

    def delete_fast(self, name):
        """Delete a named fast local variable."""
        self.code_ops.append(  # TOS
            bc.Instr("DELETE_FAST", name),  # TOS
        )

    def return_value(self):
        """Return the value from the TOS.

        When ``PY312`` is true and ``PY314`` is false, a trailing LOAD_CONST
        is folded into a single RETURN_CONST instead of appending a
        RETURN_VALUE.

        """
        last = self.code_ops[-1] if self.code_ops else None
        if (
            PY312
            and not PY314
            # The last op may be a label or a line marker which have no name
            and isinstance(last, bc.Instr)
            and last.name == "LOAD_CONST"
        ):
            self.code_ops[-1] = bc.Instr("RETURN_CONST", last.arg)
        else:
            self.code_ops.append(  # TOS -> value
                bc.Instr("RETURN_VALUE"),  # TOS
            )

    def binary_subscr(self):
        """Subscript the #2 item with the TOS."""
        if PY314:
            # In Python 3.14 BINARY_SUBSCR was replaced with BINARY_OP 6
            self.code_ops.append(bc.Instr("BINARY_OP", bc.BinaryOp.SUBSCR))
        else:
            self.code_ops.append(  # TOS -> obj -> idx
                bc.Instr("BINARY_SUBSCR"),  # TOS -> value
            )

    def binary_multiply(self):
        """Multiply the 2 items on the TOS."""
        if PY311:
            instr = bc.Instr("BINARY_OP", 5)
        else:
            instr = bc.Instr("BINARY_MULTIPLY")
        self.code_ops.append(  # TOS -> val_1 -> val_2
            instr,  # TOS -> retval
        )

    def binary_add(self):
        """Add the 2 items on the TOS."""
        if PY311:
            instr = bc.Instr("BINARY_OP", 0)
        else:
            instr = bc.Instr("BINARY_ADD")
        self.code_ops.append(  # TOS -> val_1 -> val_2
            instr,  # TOS -> retval
        )

    def dup_top(self):
        """Duplicate the value on the TOS."""
        if PY311:
            instr = bc.Instr("COPY", 1)
        else:
            instr = bc.Instr("DUP_TOP")
        self.code_ops.append(  # TOS -> value
            instr,  # TOS -> value -> value
        )

    def build_map(self, n=0):
        """Build a map and store it onto the TOS."""
        self.code_ops.append(  # TOS
            bc.Instr("BUILD_MAP", n),  # TOS -> map
        )

    def build_tuple(self, n=0):
        """Build a tuple from items on the TOS."""
        if n == 0:
            # An empty tuple is a constant, no need to build it at runtime
            self.code_ops.append(  # TOS
                bc.Instr("LOAD_CONST", ()),  # TOS -> tuple
            )
        else:
            self.code_ops.append(  # TOS
                bc.Instr("BUILD_TUPLE", n),  # TOS -> tuple
            )

    def build_list(self, n=0):
        """Build a list from items on the TOS."""
        self.code_ops.append(  # TOS
            bc.Instr("BUILD_LIST", n),  # TOS -> list
        )

    def add_map(self):
        """Store the key/value pair on the TOS into the map at 3rd pos."""
        # WARNING in Python 3.8 the order is # TOS -> map -> key -> value
        self.code_ops.append(  # TOS -> map -> value -> key
            bc.Instr("MAP_ADD", 1),
        )

    def store_subscr(self):
        """Store the index/value pair on the TOS into the 3rd item."""
        self.code_ops.append(  # TOS -> value -> obj -> index
            bc.Instr("STORE_SUBSCR"),  # TOS
        )

    def load_build_class(self):
        """Build a class from the top 3 stack items."""
        self.code_ops.append(  # TOS
            bc.Instr("LOAD_BUILD_CLASS"),  # TOS -> builtins.__build_class__
        )

    def make_function(self, flags=0, name=None):
        """Make a function from a code object on the TOS.

        Parameters
        ----------
        flags : int, optional
            Flags describing the extra function attributes available on the
            stack. The default is 0.
        name : str, optional
            The name of the function. It is only loaded onto the stack when
            ``PY311`` is false.

        """
        if not PY311:
            self.load_const(name)
        if PY313:
            if flags:
                self.code_ops.extend(
                    (
                        bc.Instr("MAKE_FUNCTION"),  # TOS -> qual_name -> code
                        bc.Instr(
                            "SET_FUNCTION_ATTRIBUTE", flags
                        ),  # TOS -> func -> attrs
                    )
                )
            else:
                self.code_ops.append(bc.Instr("MAKE_FUNCTION"))
        else:
            self.code_ops.append(  # TOS -> qual_name -> code -> defaults
                bc.Instr("MAKE_FUNCTION", flags),  # TOS -> func
            )

    def push_null(self):
        """Push NULL on the TOS."""
        self.code_ops.append(  # TOS
            bc.Instr("PUSH_NULL"),  # TOS -> NULL
        )

    def call_function(self, n_args=0, n_kwds=0, is_method: bool = False):
        """Call a function on the TOS with the given args and kwargs.

        Parameters
        ----------
        n_args : int, optional
            The number of positional arguments on the stack.
        n_kwds : int, optional
            The number of keyword arguments on the stack.
        is_method : bool, optional
            Whether the method calling convention should be used. This is
            only consulted when ``PY311`` is false.

        Raises
        ------
        ValueError
            When ``PY311`` is false and both keywords and the method calling
            convention are requested.

        """
        if PY313:
            # NOTE: In Python 3.13 the caller must push null
            # onto the stack before calling this
            # TOS -> func -> null -> args -> kwargs_names -> kwargs (tuple)
            arg = n_args + n_kwds
            self.code_ops.append(bc.Instr("CALL_KW" if n_kwds else "CALL", arg))
        elif PY311:
            # NOTE: In Python 3.11 the caller must push null
            # onto the stack before calling this
            # TOS -> null -> func -> args -> kwargs -> kwargs_names
            # In Python 3.12 PRECALL was removed
            arg = n_args + n_kwds
            ops = [bc.Instr("CALL", arg)]
            if not PY312:
                ops = [bc.Instr("PRECALL", arg)] + ops
            if n_kwds:
                # NOTE(review): the KW_NAMES argument is hard-coded to 3 and
                # does not depend on the keywords; confirm this is intended.
                ops.insert(0, bc.Instr("KW_NAMES", 3))
            self.code_ops.extend(ops)
        else:
            if n_kwds:
                if is_method:
                    raise ValueError(
                        "Method calling convention cannot be used with keywords"
                    )
                # kwargs_name should be a tuple listing the keyword
                # arguments names
                # TOS -> func -> args -> kwargs -> kwargs_names
                op, arg = "CALL_FUNCTION_KW", n_args + n_kwds
            elif is_method:
                op, arg = "CALL_METHOD", n_args
            else:
                op, arg = "CALL_FUNCTION", n_args
            self.code_ops.append(bc.Instr(op, arg))  # TOS -> retval

    def call_function_var(self, kwds: bool = False):
        """Call a variadic function on the TOS with the given args and kwargs.

        Parameters
        ----------
        kwds : bool, optional
            Whether a mapping of keyword arguments is present on the stack.

        """
        # Under Python 3.6+ positional arguments should always be stored
        # in a tuple and keywords in a mapping.
        if PY314:
            if kwds is False:
                # Fill the keywords slot with NULL when there are none
                self.code_ops.append(bc.Instr("PUSH_NULL"))
            self.code_ops.append(  # TOS -> func -> NULL -> args -> kwargs
                bc.Instr("CALL_FUNCTION_EX"),  # TOS -> retval
            )
        else:
            argspec = int(kwds)
            self.code_ops.append(  # TOS -> func -> args -> kwargs -> varargs
                bc.Instr("CALL_FUNCTION_EX", argspec),  # TOS -> retval
            )

    def pop_top(self):
        """Pop the value from the TOS."""
        self.code_ops.append(  # TOS -> value
            bc.Instr("POP_TOP"),  # TOS
        )

    def rot_two(self):
        """Rotate the two values on the TOS."""
        if PY311:
            instr = bc.Instr("SWAP", 2)
        else:
            instr = bc.Instr("ROT_TWO")
        self.code_ops.append(  # TOS -> val_1 -> val_2
            instr,  # TOS -> val_2 -> val_1
        )

    def rot_three(self):
        """Rotate the three values on the TOS."""
        if PY311:
            self.code_ops.extend(
                (  # TOS -> val_1 -> val_2 -> val_3
                    bc.Instr("SWAP", 3),  # TOS -> val_3 -> val_2 -> val_1
                    bc.Instr("SWAP", 2),  # TOS -> val_3 -> val_1 -> val_2
                )
            )
        else:
            self.code_ops.append(  # TOS -> val_1 -> val_2 -> val_3
                bc.Instr("ROT_THREE")  # TOS -> val_3 -> val_1 -> val_2
            )

    def unpack_sequence(self, n):
        """Unpack the sequence on the TOS."""
        self.code_ops.append(  # TOS -> obj
            bc.Instr("UNPACK_SEQUENCE", n),  # TOS -> val_n -> val_2 -> val_1
        )

    @contextmanager
    def try_squash_raise(self):
        """A context manager for squashing tracebacks.

        The code written during this context will be wrapped so that
        any exception raised will appear to have been generated from
        the code, rather than any function called by the code.

        Under Python 3.11 this is safe to use only if the inner code does not
        contain TryBegin pseudo-instructions.

        Raises
        ------
        ValueError
            When ``PY311`` is true and the wrapped block contains a TryBegin
            pseudo-instruction.

        """
        exc_label = bc.Label()
        end_label = bc.Label()
        if PY311:
            self.code_ops.append(tb := bc.TryBegin(exc_label, False))
            # Index of the first operation generated by the wrapped block
            first_new = len(self.code_ops)
            yield
            for i in self.code_ops[first_new:]:
                if isinstance(i, bc.TryBegin):
                    raise ValueError(
                        "try_squash_raise cannot wrap a block containing "
                        "exception handling logic. Wrapped block is:\n"
                        f"{self.code_ops[first_new:]}"
                    )

            ops = [
                bc.TryEnd(tb),
                bc.Instr("JUMP_FORWARD", end_label),
                # Under Python 3.11 only the actual exception is pushed
                exc_label,  # TOS -> val
                bc.Instr("LOAD_CONST", None),  # TOS -> val -> None
                bc.Instr("COPY", 2),  # TOS -> val -> None -> val
                bc.Instr("STORE_ATTR", "__traceback__"),  # TOS -> val
                bc.Instr("RAISE_VARARGS", 1),
                end_label,
            ]
            self.code_ops.extend(ops)

        else:
            op_code = "SETUP_FINALLY"
            self.code_ops.append(
                bc.Instr(op_code, exc_label),  # TOS
            )
            yield
            # exc is only the exception type which can be used for matching
            # val is the exception value, raising it directly preserve the
            # traceback
            # tb is the traceback and is of little interest
            # We reset the traceback to None to make it appear as if the code
            # raised the exception instead of a function called by it
            ops = [  # TOS
                bc.Instr("POP_BLOCK"),  # TOS
                bc.Instr("JUMP_FORWARD", end_label),  # TOS
                exc_label,  # TOS -> tb -> val -> exc
                bc.Instr("POP_TOP"),  # TOS -> tb -> val
                bc.Instr("ROT_TWO"),  # TOS -> val -> tb
                bc.Instr("POP_TOP"),  # TOS -> val
                bc.Instr("DUP_TOP"),  # TOS -> val -> val
                bc.Instr("LOAD_CONST", None),  # TOS -> val -> val -> None
                bc.Instr("ROT_TWO"),  # TOS -> val -> None -> val
                bc.Instr("STORE_ATTR", "__traceback__"),  # TOS -> val
                bc.Instr("RAISE_VARARGS", 1),  # TOS
                bc.Instr("POP_EXCEPT"),
                bc.Instr("JUMP_FORWARD", end_label),  # TOS
                end_label,  # TOS
            ]
            self.code_ops.extend(ops)

    @contextmanager
    def for_loop(self, iter_var, fast_var=True):
        """A context manager for creating for-loops.

        The operations generated inside the context form the body of the
        loop.

        Parameters
        ----------
        iter_var : str
            The name of the loop iter variable.

        fast_var : bool, optional
            Whether the iter_var lives in fast locals. The default is
            True. If False, the iter_var is loaded from globals.

        """
        start_label = bc.Label()
        jump_label = bc.Label()
        # No SETUP_LOOP / POP_BLOCK pair is emitted: those opcodes no longer
        # exist since Python 3.8, the interpreter handles the loop block
        # without them.
        load_op = "LOAD_FAST" if fast_var else "LOAD_GLOBAL"
        if PY311 and not fast_var:
            # LOAD_GLOBAL expects a tuple on 3.11
            iter_var = (False, iter_var)
        self.code_ops.extend(
            [
                bc.Instr(load_op, iter_var),
                bc.Instr("GET_ITER"),
                start_label,
                bc.Instr("FOR_ITER", jump_label),
            ]
        )
        yield
        # JUMP_ABSOLUTE was replaced by JUMP_BACKWARD in Python 3.11
        back_jump = "JUMP_BACKWARD" if PY311 else "JUMP_ABSOLUTE"
        self.code_ops.extend(
            [
                bc.Instr(back_jump, start_label),
                jump_label,
            ]
        )
        if PY312:
            self.code_ops.append(bc.Instr("END_FOR"))

    def insert_python_block(self, pydata, trim=True):
        """Insert the compiled code for a Python Module ast or string.

        Parameters
        ----------
        pydata : ast.AST or str
            The module to compile in "exec" mode and insert.
        trim : bool, optional
            Whether to strip the trailing LOAD_CONST/RETURN_VALUE pair. This
            is only consulted when ``PY310`` is false; otherwise the return
            None instructions absent from the source are always removed.

        """
        if PY310:
            # The visitor can only walk an AST, so source text is parsed
            # first to support both kinds of input.
            tree = (
                pydata
                if isinstance(pydata, ast.AST)
                else ast.parse(pydata, self.filename)
            )
            _inspector = _ReturnNoneIdentifier()
            _inspector.visit(tree)
        code = compile(pydata, self.filename, mode="exec")
        bc_code = bc.Bytecode.from_code(code)
        if PY311:  # Trim irrelevant RESUME opcode
            bc_code = bc_code[1:]
        # On python 3.10 with a with or try statement the implicit return None
        # can be duplicated. We remove return None from all basic blocks when
        # it was not present in the AST
        if PY310:
            cfg = bc.ControlFlowGraph.from_bytecode(bc_code)
            new_end = None
            last_block = cfg[-1]
            # Iterate over a copy since a block may be added to the graph
            for block in list(cfg):
                if not block or not isinstance(block[-1], bc.Instr):
                    continue
                last = block[-1]
                # Single instruction form of return None
                rc = last.name == "RETURN_CONST" and last.arg is None
                # Two instructions form, only removed when the source has no
                # return None on that line
                implicit_pair = (
                    not rc
                    and last.name == "RETURN_VALUE"
                    and len(block) >= 2
                    and isinstance(block[-2], bc.Instr)
                    and block[-2].name == "LOAD_CONST"
                    and block[-2].arg is None
                    and last.lineno not in _inspector.lines
                )
                if not (rc or implicit_pair):
                    continue

                if rc:
                    del block[-1]
                else:
                    del block[-2:]

                # If as a result of the trimming the block is empty, we add
                # a NOP to make sure it is valid still
                if not any(isinstance(i, bc.Instr) for i in block):
                    block.append(bc.Instr("NOP"))

                # If we have multiple block jump to the end of the last block
                # to execute the code that may be appended to this block
                if block is not last_block:
                    # We use a NOP to be sure to always have a valid jump
                    # target
                    new_end = new_end or cfg.add_block([bc.Instr("NOP")])
                    block.append(bc.Instr("JUMP_FORWARD", new_end))

                elif new_end is not None:
                    last_block.next_block = new_end

            bc_code = cfg.to_bytecode()
        # Skip the LOAD_CONST RETURN_VALUE pair if it exists
        elif trim and bc_code[-1].name == "RETURN_VALUE":
            bc_code = bc_code[:-2]

        self.code_ops.extend(bc_code)

    def insert_python_expr(self, pydata, trim=True):
        """Insert the compiled code for a Python Expression ast or string.

        Parameters
        ----------
        pydata : ast.AST or str
            The expression to compile in "eval" mode and insert.
        trim : bool, optional
            Whether to strip the trailing RETURN_VALUE so that the value of
            the expression is left on the TOS. The default is True.

        """
        code = compile(pydata, self.filename, mode="eval")
        bc_code = bc.Bytecode.from_code(code)
        if PY311:  # Trim irrelevant RESUME opcode
            bc_code = bc_code[1:]
        if bc_code[-1].name == "RETURN_CONST":
            # Turn the return into a plain load of the constant, keeping
            # the location of the original instruction.
            # NOTE(review): this is done even when trim is False, in which
            # case no return instruction is left; confirm this is intended.
            bc_code[-1] = bc.Instr(
                "LOAD_CONST", bc_code[-1].arg, location=bc_code[-1].location
            )
            trim = False
        if trim:  # skip RETURN_VALUE opcode
            bc_code = bc_code[:-1]
        self.code_ops.extend(bc_code)

    def rewrite_to_fast_locals(self, local_names):
        """Rewrite the locals to be loaded from fast locals.

        Given a set of available local names, this method will rewrite
        the current code ops, replaces every instance of a *_NAME opcode
        with a *_FAST or *_GLOBAL depending on whether or not the name
        exists in local_names or was written via STORE_NAME. This method
        is useful to convert the code so it can be used as a function.

        As a side effect ``args`` is set to the returned list and
        ``newlocals`` is set to True.

        Parameters
        ----------
        local_names : set
            The set of available locals for the code.

        Returns
        -------
        result : list
            The list of names which must be provided as arguments. Each name
            appears once, in the order in which it is first loaded.

        """
        arg_names = []
        # Names already listed in arg_names, a name loaded several times
        # must still map to a single argument.
        seen_args = set()
        stored_names = set()
        code_ops = self.code_ops

        # First pass: every name stored by the code becomes a fast local
        for instr in code_ops:
            if not isinstance(instr, bc.Instr):
                continue
            if instr.name == "STORE_NAME":
                stored_names.add(instr.arg)
                instr.name = "STORE_FAST"

        # Second pass: redirect the loads and deletions accordingly
        for idx, instr in enumerate(code_ops):
            if not isinstance(instr, bc.Instr):
                continue
            i_name = instr.name
            if i_name == "LOAD_NAME":
                i_arg = instr.arg
                if i_arg in local_names:
                    op = "LOAD_FAST"
                    if i_arg not in seen_args:
                        seen_args.add(i_arg)
                        arg_names.append(i_arg)
                elif i_arg in stored_names:
                    op = "LOAD_FAST"
                elif PY311:
                    # LOAD_GLOBAL takes a tuple argument on 3.11, hence the
                    # instruction is replaced rather than renamed.
                    # TODO: Is there a better way to do this?
                    code_ops[idx] = bc.Instr("LOAD_GLOBAL", (False, instr.arg))
                    continue
                else:
                    op = "LOAD_GLOBAL"
                instr.name = op
            elif i_name == "DELETE_NAME":
                if instr.arg in stored_names:
                    op = "DELETE_FAST"
                else:
                    op = "DELETE_GLOBAL"
                instr.name = op
        self.args = arg_names
        self.newlocals = True
        return arg_names
|