"""Example ``StampingVisitor`` subclasses for Celery canvas stamping."""
from uuid import uuid4
from celery.canvas import Signature, StampingVisitor
from celery.utils.log import get_task_logger
# Module-level logger obtained from Celery's task-logger helper; every visitor
# hook in this module reports through it.
logger = get_task_logger(__name__)
class MyStampingVisitor(StampingVisitor):
    """Visitor whose ``on_signature`` hook returns a constant ``mystamp`` entry."""

    def on_signature(self, sig: Signature, **headers) -> dict:
        """Log the signature being handled and return the fixed stamp.

        Args:
            sig: Signature passed to the hook; only used in the log message.
            **headers: Accepted for interface compatibility; not consulted.

        Returns:
            A new dict with the single key ``"mystamp"``.
        """
        message = f"Visitor: Sig '{sig}' is stamped with: mystamp"
        logger.critical(message)
        return dict(mystamp="I am a stamp!")
class MonitoringIdStampingVisitor(StampingVisitor):
    """Visitor that produces a fresh random ``mtask_id`` per ``on_signature`` call."""

    def on_signature(self, sig: Signature, **headers) -> dict:
        """Generate a UUID4 string, log it, and return it as the stamp.

        Args:
            sig: Signature passed to the hook; only used in the log message.
            **headers: Accepted for interface compatibility; not consulted.

        Returns:
            A new dict mapping ``"mtask_id"`` to the generated UUID string.
        """
        # The id is created before logging so the same value is both
        # reported and returned.
        monitoring_id = str(uuid4())
        message = f"Visitor: Sig '{sig}' is stamped with: {monitoring_id}"
        logger.critical(message)
        return dict(mtask_id=monitoring_id)
class FullVisitor(StampingVisitor):
    """Visitor that overrides several hooks, each returning its own marker.

    Every hook logs which hook ran for which signature and returns a dict
    keyed by the hook's name with the value ``"FullVisitor.<hook>()"``.
    """

    def on_signature(self, sig: Signature, **headers) -> dict:
        """Log the call and return the ``on_signature`` marker."""
        message = f"Visitor: Sig '{sig}' is stamped with: on_signature"
        logger.critical(message)
        return dict(on_signature="FullVisitor.on_signature()")

    def on_callback(self, sig, **headers) -> dict:
        """Log the call and return the ``on_callback`` marker."""
        message = f"Visitor: Sig '{sig}' is stamped with: on_callback"
        logger.critical(message)
        return dict(on_callback="FullVisitor.on_callback()")

    def on_errback(self, sig, **headers) -> dict:
        """Log the call and return the ``on_errback`` marker."""
        message = f"Visitor: Sig '{sig}' is stamped with: on_errback"
        logger.critical(message)
        return dict(on_errback="FullVisitor.on_errback()")

    def on_chain_start(self, sig: Signature, **headers) -> dict:
        """Log the call and return the ``on_chain_start`` marker."""
        message = f"Visitor: Sig '{sig}' is stamped with: on_chain_start"
        logger.critical(message)
        return dict(on_chain_start="FullVisitor.on_chain_start()")

    def on_group_start(self, sig: Signature, **headers) -> dict:
        """Log the call and return the ``on_group_start`` marker."""
        message = f"Visitor: Sig '{sig}' is stamped with: on_group_start"
        logger.critical(message)
        return dict(on_group_start="FullVisitor.on_group_start()")

    def on_chord_header_start(self, sig: Signature, **headers) -> dict:
        """Log, delegate to the parent hook, then add this hook's marker.

        Unlike the other hooks, the returned dict is whatever the parent
        implementation produced, extended in place with the
        ``on_chord_header_start`` entry.
        """
        message = f"Visitor: Sig '{sig}' is stamped with: on_chord_header_start"
        logger.critical(message)
        # Logging happens before delegation, so this hook's message precedes
        # anything the parent call emits.
        stamps = super().on_chord_header_start(sig, **headers)
        stamps.update(on_chord_header_start="FullVisitor.on_chord_header_start()")
        return stamps

    def on_chord_body(self, sig: Signature, **headers) -> dict:
        """Log the call and return the ``on_chord_body`` marker."""
        message = f"Visitor: Sig '{sig}' is stamped with: on_chord_body"
        logger.critical(message)
        return dict(on_chord_body="FullVisitor.on_chord_body()")
|