1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
import os
import pickle
import pytest
from deepdiff import Delta
from deepdiff.helper import Opcode
from deepdiff.serialization import ForbiddenModule
class TestDeltaClassPollution:
def test_builtins_int(self):
pollute_int = pickle.dumps(
{
"values_changed": {"root['tmp']": {"new_value": Opcode("", 0, 0, 0, 0)}},
"dictionary_item_added": {
(
("root", "GETATTR"),
("tmp", "GET"),
("__repr__", "GETATTR"),
("__globals__", "GETATTR"),
("__builtins__", "GET"),
("int", "GET"),
): "no longer a class"
},
}
)
assert isinstance(pollute_int, bytes)
# ------------[ Exploit ]------------
# This could be some example, vulnerable, application.
# The inputs above could be sent via HTTP, for example.
# Existing dictionary; it is assumed that it contains
# at least one entry, otherwise a different Delta needs to be
# applied first, adding an entry to the dictionary.
mydict = {"tmp": "foobar"}
# Before pollution
assert 42 == int("41") + 1
# Apply Delta to mydict
result = mydict + Delta(pollute_int)
assert 1337 == int("1337")
def test_remote_code_execution(self):
if os.path.exists('/tmp/pwned'):
os.remove('/tmp/pwned')
pollute_safe_to_import = pickle.dumps(
{
"values_changed": {"root['tmp']": {"new_value": Opcode("", 0, 0, 0, 0)}},
"set_item_added": {
(
("root", "GETATTR"),
("tmp", "GET"),
("__repr__", "GETATTR"),
("__globals__", "GETATTR"),
("sys", "GET"),
("modules", "GETATTR"),
("deepdiff.serialization", "GET"),
("SAFE_TO_IMPORT", "GETATTR"),
): set(["posix.system"])
},
}
)
# From https://davidhamann.de/2020/04/05/exploiting-python-pickle/
class RCE:
def __reduce__(self):
cmd = "id > /tmp/pwned"
return os.system, (cmd,)
# Wrap object with dictionary so that Delta does not crash
rce_pickle = pickle.dumps({"_": RCE()})
assert isinstance(pollute_safe_to_import, bytes)
assert isinstance(rce_pickle, bytes)
# ------------[ Exploit ]------------
# This could be some example, vulnerable, application.
# The inputs above could be sent via HTTP, for example.
# Existing dictionary; it is assumed that it contains
# at least one entry, otherwise a different Delta needs to be
# applied first, adding an entry to the dictionary.
mydict = {"tmp": "foobar"}
# Apply Delta to mydict
with pytest.raises(ValueError) as exc_info:
mydict + Delta(pollute_safe_to_import)
assert "traversing dunder attributes is not allowed" == str(exc_info.value)
with pytest.raises(ForbiddenModule) as exc_info:
Delta(rce_pickle) # no need to apply this Delta
assert "Module 'posix.system' is forbidden. You need to explicitly pass it by passing a safe_to_import parameter" == str(exc_info.value)
assert not os.path.exists('/tmp/pwned'), "We should not have created this file"
def test_delta_should_not_access_globals(self):
pollute_global = pickle.dumps(
{
"dictionary_item_added": {
(
("root", "GETATTR"),
("myfunc", "GETATTR"),
("__globals__", "GETATTR"),
("PWNED", "GET"),
): 1337
}
}
)
# demo application
class Foo:
def __init__(self):
pass
def myfunc(self):
pass
PWNED = False
delta = Delta(pollute_global)
assert PWNED is False
b = Foo() + delta
assert PWNED is False
|