1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
|
from logging import error
from ptrace.disasm import HAS_DISASSEMBLER
from signal import SIGFPE, SIGSEGV, SIGABRT
try:
from signal import SIGCHLD
except ImportError:
SIGCHLD = None
try:
from signal import SIGBUS
except ImportError:
SIGBUS = None
from ptrace.os_tools import RUNNING_LINUX
from ptrace.cpu_info import CPU_64BITS
from ptrace.debugger import ProcessEvent
from ptrace.error import PtraceError
from ptrace import signalName
from ptrace.debugger.signal_reason import (
DivisionByZero, Abort, StackOverflow,
InvalidMemoryAcces, InvalidRead, InvalidWrite,
InstructionError, ChildExit)
from ptrace.debugger.parse_expr import parseExpression
import re
# Match a pointer dereference (eg. "DWORD [EDX+0x8]")
DEREF_REGEX = r'(?P<deref_size>(BYTE|WORD|DWORD|DQWORD) )?\[(?P<deref>[^]]+)\]'
NAMED_WORD_SIZE = {
'BYTE': 1,
'WORD': 2,
'DWORD': 4,
'DQWORD': 8,
}
# Match any Intel instruction (eg. "ADD")
INSTR_REGEX = '(?:[A-Z]{3,10})'
def findDerefSize(match):
name = match.group("deref_size")
try:
return NAMED_WORD_SIZE[name]
except KeyError:
return None
def evalFaultAddress(process, match):
expr = match.group('deref')
if not expr:
return None
try:
return parseExpression(process, expr)
except ValueError as err:
print("err: %s" % err)
return None
class ProcessSignal(ProcessEvent):
def __init__(self, signum, process):
# Initialize attributes
self.name = signalName(signum)
ProcessEvent.__init__(self, process, "Signal %s" % self.name)
self.signum = signum
self.reason = None
def _analyze(self):
if self.signum in (SIGSEGV, SIGBUS):
self.memoryFault()
elif self.signum == SIGFPE:
self.mathError()
elif self.signum == SIGCHLD:
self.childExit()
elif self.signum == SIGABRT:
self.reason = Abort()
return self.reason
def getInstruction(self):
if not HAS_DISASSEMBLER:
return None
try:
return self.process.disassembleOne()
except PtraceError:
return None
def memoryFaultInstr(self, instr, fault_address):
asm = instr.text
# Invalid write (eg. "MOV [...], value")
match = re.search(r"^(?:MOV|TEST)[A-Z]* %s," % DEREF_REGEX, asm)
if match:
if fault_address is None:
fault_address = evalFaultAddress(self.process, match)
self.reason = InvalidWrite(fault_address, size=findDerefSize(match),
instr=instr, process=self.process)
return
# Invalid read (eg. "CMP BYTE [EAX+EDX-0x1], 0x0")
match = re.search(r"^%s %s," % (INSTR_REGEX, DEREF_REGEX), asm)
if match:
if fault_address is None:
fault_address = evalFaultAddress(self.process, match)
self.reason = InvalidRead(fault_address, size=findDerefSize(match),
instr=instr, process=self.process)
return
# Invalid read (eg. "MOV reg, [...]")
match = re.match(r"%s [^,]+, %s" % (INSTR_REGEX, DEREF_REGEX), asm)
if match:
if fault_address is None:
fault_address = evalFaultAddress(self.process, match)
self.reason = InvalidRead(fault_address, size=findDerefSize(match),
instr=instr, process=self.process)
return
# MOVS* and SCAS* instructions (eg. "MOVSB" or "REP SCASD")
match = re.search(r"^(?:REP(?:NZ)? )?(?P<operator>MOVS|SCAS)(?P<suffix>[BWD])?", asm)
if match:
self.reason = self.movsInstr(fault_address, instr, match)
return
def movsInstr(self, fault_address, instr, match):
operator = match.group("operator")
suffix = match.group("suffix")
size = {'B': 1, 'W': 2, 'D': 4}.get(suffix)
error_cls = InvalidMemoryAcces
try:
process = self.process
if CPU_64BITS:
source_reg = 'rsi'
dest_reg = 'rdi'
else:
source_reg = 'esi'
dest_reg = 'edi'
source_addr = process.getreg(source_reg)
registers = {source_reg: source_addr}
write = (operator == 'MOVS')
if write:
dest_addr = process.getreg(dest_reg)
registers[dest_reg] = dest_addr
if fault_address is not None:
if fault_address == source_addr:
error_cls = InvalidRead
if write and fault_address == dest_addr:
error_cls = InvalidWrite
else:
if write:
fault_address = (source_addr, dest_addr)
else:
fault_address = (source_addr,)
except PtraceError:
registers = {}
return error_cls(fault_address, size=size, instr=instr,
registers=registers, process=self.process)
def getSignalInfo(self):
if RUNNING_LINUX:
return self.process.getsiginfo()
else:
return None
def memoryFault(self):
# Get fault
siginfo = self.getSignalInfo()
if siginfo:
fault_address = siginfo._sigfault._addr
if not fault_address:
fault_address = 0
else:
fault_address = None
# Get current instruction
instr = self.getInstruction()
# Call to invalid address?
if fault_address is not None:
try:
ip = self.process.getInstrPointer()
if ip == fault_address:
self.reason = InstructionError(ip, process=self.process)
return
except PtraceError:
pass
# Stack overflow?
stack = self.process.findStack()
if stack:
sp = self.process.getStackPointer()
if not (stack.start <= sp <= stack.end):
self.reason = StackOverflow(sp, stack, instr=instr, process=self.process)
return
# Guess error type using the assembler instruction
if instr:
self.memoryFaultInstr(instr, fault_address)
if self.reason:
return
# Last chance: use generic invalid memory access error
self.reason = InvalidMemoryAcces(fault_address, instr=instr, process=self.process)
def mathError(self):
instr = self.getInstruction()
if not instr:
return
match = re.match(r"I?DIV (.*)", instr.text)
if not match:
return
self.reason = DivisionByZero(instr=instr, process=self.process)
def childExit(self):
siginfo = self.getSignalInfo()
if siginfo:
child = siginfo._sigchld
self.reason = ChildExit(child.pid, child.status, child.uid)
else:
self.reason = ChildExit()
def display(self, log=None):
self._analyze()
if not log:
log = error
log("-" * 60)
log("PID: %s" % self.process.pid)
log("Signal: %s" % self.name)
if self.reason:
self.reason.display(log)
log("-" * 60)
|