File: mutation_chain.py

package info (click to toggle)
aflplusplus 4.33c-0.2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 14,740 kB
  • sloc: ansic: 111,574; cpp: 16,019; sh: 4,766; python: 4,546; makefile: 1,000; javascript: 521; java: 43; sql: 3; xml: 1
file content (154 lines) | stat: -rw-r--r-- 5,334 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python3
#
# MUTATION CHAIN COMPUTATION TOOL
#
# Ever wondered what the complete history of your AFL crash file looks like?
# Now you can!
#
# This tool supports the directory layout of parallel fuzzing runs, using the
# naming of main/secondary nodes as stated in the AFL docs (fuzzer01, fuzzer02, etc.).
# If you want to use it for a single-node run, just recreate the directory structure
# that parallel fuzzing produces (dump your results in a dir called fuzzer01).
#
# author: Maarten Dekker

# import required modules
import os, re, json
import argparse

# Result table: crash file name -> mutation chain computed for it.
# Filled in by main() and dumped as JSON at the end of the run.
crashes = {}
# Queue index: fuzzer node directory name (e.g. 'fuzzer01') -> dict whose keys
# are the file names found in that node's queue directory. Filled in by main()
# and read by compute_mutation_chain().
queues = {}

def fillDictWithFilenameKeys(dir):
    """Index the AFL-style entries of a directory by file name.

    Args:
        dir: path of the directory to list.

    Returns:
        A dict with one key per directory entry whose name starts with
        ``id:`` followed by at least one digit. Every value is ``None``;
        only the keys carry information. Entries appear in the order
        ``os.listdir`` reports them.
    """
    id_prefix = re.compile("^id:\\d+")
    return {
        entry: None
        for entry in os.listdir(dir)
        if id_prefix.match(entry) is not None
    }

# recursively compute the chain of queue items that led to the AFL crash file
def compute_mutation_chain(filename, current_fuzzer, n):
    """Recursively resolve the queue entries that ``filename`` was derived from.

    The parent id(s) are taken from the ``src:`` field of the file name and
    looked up, by their ``id:<digits>,`` prefix, among the keys of the
    module-level ``queues`` mapping.

    Args:
        filename: name of a crash or queue file.
        current_fuzzer: key into ``queues`` naming the node that owns
            ``filename``.
        n: current recursion depth. It is passed on (incremented) to the
            recursive calls but not otherwise used.

    Returns:
        * ``"seed"`` if the name carries no ``src:`` field.
        * For a single source: a one-entry dict mapping the parent's file
          name to the parent's own chain, or ``None`` when the parent cannot
          be resolved (the node is not in ``queues`` or no queue entry has
          that id).
        * For a splice (``src:A+B``): a dict with an entry for every parent
          that was found; it may be empty.
    """
    # A single source id is followed by a comma, or by the end of the name.
    # NOTE(review): AFL++ appears to name synced entries without the '+cov'
    # suffix as '...,src:NNNNNN' with nothing after it - confirm. Accepting
    # the end-of-name form keeps such entries from being reported as seeds.
    single_source = re.match(r".*src:(\d+)(?:,|$)", filename)

    if single_source:
        wanted_prefix = "id:" + single_source.group(1) + ","

        # determine if we need to look in the queue of another fuzzer instance
        synced_from = re.match(r".*sync:(fuzzer\d+),", filename)
        fuzzer_queue = synced_from.group(1) if synced_from else current_fuzzer

        # .get(): a sync may reference a node whose directory was not loaded;
        # treat that as "parent not found" instead of aborting with KeyError.
        for candidate in queues.get(fuzzer_queue, {}):
            if candidate.startswith(wanted_prefix):
                return {
                    candidate: compute_mutation_chain(candidate, fuzzer_queue, n + 1)
                }

        # the parent could not be located
        return None

    # if the mutation result is a splice it has 2 sources
    spliced_sources = re.match(r".*src:(\d+)\+(\d+)", filename)

    if spliced_sources:
        wanted_prefixes = (
            "id:" + spliced_sources.group(1) + ",",
            "id:" + spliced_sources.group(2) + ",",
        )

        # for a mutation with two sources, the sources are never synced from
        # other queues, so only the current node's queue is searched
        return {
            candidate: compute_mutation_chain(candidate, current_fuzzer, n + 1)
            for candidate in queues.get(current_fuzzer, {})
            if candidate.startswith(wanted_prefixes)
        }

    return "seed"


def main():
    """Command-line entry point: index the queues, compute chains, print JSON.

    Steps:
      1. Parse the command line (``--mode``, ``--input``, ``--node``, ``--file``).
      2. In ``single`` mode, check that the requested crash file exists.
      3. Fill the module-level ``queues`` mapping with the queue file names of
         every ``fuzzer<digits>`` directory found under ``--input``.
      4. Compute the mutation chain of the requested crash file (``single``)
         or of every ``id:<digits>`` file in each node's ``crashes`` directory
         (``all``), storing the results in the module-level ``crashes`` mapping.
      5. Print ``crashes`` as an indented, key-sorted JSON document on stdout.

    Progress and error messages are written to stderr so that stdout carries
    nothing but the JSON document and can be redirected straight into a file.
    """
    # Imported here so the function does not depend on a file-level import.
    import sys

    parser = argparse.ArgumentParser(
                    prog='mutation_chain.py',
                    # the separator between the two sentences was missing
                    description='Compute the mutation chain of AFL crash files to visulise the mutation history from seed files to crash. ' +
                    'This tool just dump json data to the CLI, it is advised to echo them into a file for further analysis (i.e. [command] >> your_file.json)',
                    epilog='Greetings from old zealand'
    )

    parser.add_argument(
        "-m", "--mode",
        choices = ['single', 'all'],
        help = 'compute chain for one file or all crash files in supplied directory. In single mode the -f argument is required',
        required = True
    )

    parser.add_argument(
        "-i", "--input",
        action = 'store',
        help = 'Input directory for the mutation chain tool (the fuzzer\'s output directory)',
        required = True
    )

    parser.add_argument(
        "-n", "--node",
        action = 'store',
        help = '[Only used in single mode; optional] name of the fuzzer node that contains the crash file supplied in the --file argument (e.g. \'fuzzer03\'). Defaults to \'fuzzer01\' if not supplied',
        required = False
    )

    parser.add_argument(
        "-f", "--file",
        action = 'store',
        help = '[Only used in single mode; required] filename of specific crash file (e.g. \'id:000008,sig:06,src:000005,op:havoc,rep:8\')',
        required = False
    )

    args = parser.parse_args()

    if args.mode == "single":

        if args.node is None:
            args.node = "fuzzer01"

        if args.file is None:
            parser.error("'--mode single' requires the '--file' argument.")

        crash_file_path = os.path.join(args.input, args.node, 'crashes', args.file)
        if not os.path.isfile(crash_file_path):
            print("Error: \'" + crash_file_path + "\' does not exist.\nPlease verify whether the node and filename are correct.", file=sys.stderr)
            return

    if not os.path.isdir(args.input):
        parser.error("'--input' must be an existing directory.")

    # Collect the fuzzer node directories once. Plain files whose name merely
    # starts like a node name (e.g. 'fuzzer01.log') are skipped, because
    # listing '<file>/queue' would raise.
    nodes = [
        entry for entry in os.listdir(args.input)
        if re.match("^fuzzer\\d+", entry)
        and os.path.isdir(os.path.join(args.input, entry))
    ]

    # Create the internal representation of the various queues of parallel fuzzing nodes
    for node in nodes:
        queue_dir = os.path.join(args.input, node, 'queue')
        # a node without a queue directory is indexed as an empty queue
        queues[node] = fillDictWithFilenameKeys(queue_dir) if os.path.isdir(queue_dir) else {}

    if args.mode == "all":

        for node in nodes:
            crashes_dir = os.path.join(args.input, node, 'crashes')
            if not os.path.isdir(crashes_dir):
                continue

            for filename in os.listdir(crashes_dir):
                if re.match("^id:\\d+", filename):
                    # progress output; kept off stdout so the JSON stays valid
                    print(filename, file=sys.stderr)
                    # NOTE: results are keyed by file name only, so identically
                    # named crash files from different nodes overwrite each other.
                    crashes[filename] = compute_mutation_chain(filename, node, 0)

    elif args.mode == "single":

        crashes[args.file] = compute_mutation_chain(args.file, args.node, 0)

    print(json.dumps(crashes, sort_keys=True, indent=4))

# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()