1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
|
#!/usr/bin/env python3
import os
srcdir = os.getenv('srcdir')
buf = ["type=LOGIN msg=audit(1143146623.787:142): login pid=2027 uid=0 old auid=4294967295 new auid=848\ntype=SYSCALL msg=audit(1143146623.875:143): arch=c000003e syscall=188 success=yes exit=0 a0=7fffffa9a9f0 a1=3958d11333 a2=5131f0 a3=20 items=1 pid=2027 auid=848 uid=0 gid=0 euid=0 suid=0 fsuid=0 egid=0 sgid=0 fsgid=0 tty=tty3 comm=\"login\" exe=\"/bin/login\" subj=system_u:system_r:local_login_t:s0-s0:c0.c255\n",
"type=USER_LOGIN msg=audit(1143146623.879:146): user pid=2027 uid=0 auid=848 msg=\'uid=848: exe=\"/bin/login\" (hostname=?, addr=?, terminal=tty3 res=success)\'\n",
]
files = ["%s%s" % (srcdir,"/test2.log"), "%s%s" % (srcdir,"/test.log")]
import sys
import time
load_path = '../../bindings/python/python3'
if False:
sys.path.insert(0, load_path)
import auparse
def none_to_null(s):
'used so output matches C version'
if s is None:
return '(null)'
else:
return s
walked_fields = 0
FIELDS_EXPECTED = 403
def walk_test(au, interpret=False):
global walked_fields
event_cnt = 1
au.reset()
if not au.first_record():
print("Error getting first record")
sys.exit(1)
while True:
print("event %d has %d records" % (event_cnt, au.get_num_records()))
record_cnt = 1
while True:
print(" record %d of type %d(%s) has %d fields" % \
(record_cnt,
au.get_type(), au.get_type_name(),
au.get_num_fields()))
print(" line=%d file=%s" % (au.get_line_number(), au.get_filename()))
event = au.get_timestamp()
if event is None:
print("Error getting timestamp - aborting")
sys.exit(1)
print(" event time: %d.%d:%d, host=%s" % (event.sec, event.milli, event.serial, none_to_null(event.host)))
au.first_field()
while True:
if interpret:
print(" %s=%s (%s)" % (au.get_field_name(), au.get_field_str(), au.interpret_field()))
else:
print(" %s=%s" % (au.get_field_name(), au.get_field_str()))
walked_fields += 1
if not au.next_field(): break
print("")
record_cnt += 1
if not au.next_record(): break
event_cnt += 1
if not au.parse_next_event(): break
def light_test(au):
while True:
if not au.first_record():
print("Error getting first record")
sys.exit(1)
print("event has %d records" % (au.get_num_records()))
record_cnt = 1
while True:
print(" record %d of type %d(%s) has %d fields" % \
(record_cnt,
au.get_type(), au.get_type_name(),
au.get_num_fields()))
print(" line=%d file=%s" % (au.get_line_number(), au.get_filename()))
event = au.get_timestamp()
if event is None:
print("Error getting timestamp - aborting")
sys.exit(1)
print(" event time: %d.%d:%d, host=%s" % (event.sec, event.milli, event.serial, none_to_null(event.host)))
print("")
record_cnt += 1
if not au.next_record(): break
if not au.parse_next_event(): break
def simple_search(au, source, where):
if source == auparse.AUSOURCE_FILE:
au = auparse.AuParser(auparse.AUSOURCE_FILE, srcdir + "/test.log");
val = "4294967295"
else:
au = auparse.AuParser(auparse.AUSOURCE_BUFFER_ARRAY, buf)
val = "848"
au.search_add_item("auid", "=", val, auparse.AUSEARCH_RULE_CLEAR)
au.search_set_stop(where)
if not au.search_next_event():
print("Error searching for auid")
else:
print("Found %s = %s" % (au.get_field_name(), au.get_field_str()))
def compound_search(au, how):
au = auparse.AuParser(auparse.AUSOURCE_FILE, srcdir + "/test.log");
if how == auparse.AUSEARCH_RULE_AND:
au.search_add_item("uid", "=", "0", auparse.AUSEARCH_RULE_CLEAR)
au.search_add_item("pid", "=", "13015", how)
au.search_add_item("type", "=", "USER_START", how)
else:
au.search_add_item("auid", "=", "42", auparse.AUSEARCH_RULE_CLEAR)
# should stop on this one
au.search_add_item("auid", "=", "0", how)
au.search_add_item("auid", "=", "500", how)
au.search_set_stop(auparse.AUSEARCH_STOP_FIELD)
if not au.search_next_event():
print("Error searching for auid")
else:
print("Found %s = %s" % (au.get_field_name(), au.get_field_str()))
def feed_callback(au, cb_event_type, event_cnt, interpret=False):
if cb_event_type == auparse.AUPARSE_CB_EVENT_READY:
if not au.first_record():
print("Error getting first record")
sys.exit(1)
print("event %d has %d records" % (event_cnt[0], au.get_num_records()))
record_cnt = 1
while True:
print(" record %d of type %d(%s) has %d fields" % \
(record_cnt,
au.get_type(), au.get_type_name(),
au.get_num_fields()))
print(" line=%d file=%s" % (au.get_line_number(), au.get_filename()))
event = au.get_timestamp()
if event is None:
print("Error getting timestamp - aborting")
sys.exit(1)
print(" event time: %d.%d:%d, host=%s" % (event.sec, event.milli, event.serial, none_to_null(event.host)))
au.first_field()
while True:
if interpret:
print(" %s=%s (%s)" % (au.get_field_name(), au.get_field_str(), au.interpret_field()))
else:
print(" %s=%s" % (au.get_field_name(), au.get_field_str()))
if not au.next_field(): break
print("")
record_cnt += 1
if not au.next_record(): break
event_cnt[0] += 1
au = auparse.AuParser(auparse.AUSOURCE_BUFFER_ARRAY, buf)
print("Starting Test 1, iterate...")
while au.parse_next_event():
if au.find_field("auid"):
print("%s=%s" % (au.get_field_name(), au.get_field_str()))
print("interp auid=%s" % (au.interpret_field()))
else:
print("Error iterating to auid")
print("Test 1 Done\n")
# Reset, now lets go to beginning and walk the list manually */
print("Starting Test 2, walk events, records, and fields...")
walk_test(au, interpret=True)
print("Test 2 Done\n")
# Reset, now lets go to beginning and walk the list manually */
print("Starting Test 3, walk events, records of 1 buffer...")
au = auparse.AuParser(auparse.AUSOURCE_BUFFER, buf[1])
au.reset()
light_test(au);
print("Test 3 Done\n")
print("Starting Test 4, walk events, records of 1 file...")
file1 = "%s%s" % (srcdir,"/test.log")
au = auparse.AuParser(auparse.AUSOURCE_FILE, file1);
walk_test(au);
print("Test 4 Done\n")
print("Starting Test 5, walk events, records of 2 files...")
au = auparse.AuParser(auparse.AUSOURCE_FILE_ARRAY, files);
walk_test(au);
print("Test 5 Done\n")
print("Starting Test 6, search...")
au = auparse.AuParser(auparse.AUSOURCE_BUFFER_ARRAY, buf)
au.search_add_item("auid", "=", "500", auparse.AUSEARCH_RULE_CLEAR)
au.search_set_stop(auparse.AUSEARCH_STOP_EVENT)
if au.search_next_event():
print("Error search found something it shouldn't have")
else:
print("auid = 500 not found...which is correct")
au.search_clear()
au = auparse.AuParser(auparse.AUSOURCE_BUFFER_ARRAY, buf)
#au.search_add_item("auid", "exists", None, auparse.AUSEARCH_RULE_CLEAR)
au.search_add_item("auid", "exists", "", auparse.AUSEARCH_RULE_CLEAR)
au.search_set_stop(auparse.AUSEARCH_STOP_EVENT)
if not au.search_next_event():
print("Error searching for existence of auid")
print("auid exists...which is correct")
print("Testing BUFFER_ARRAY, stop on field")
simple_search(au, auparse.AUSOURCE_BUFFER_ARRAY, auparse.AUSEARCH_STOP_FIELD)
print("Testing BUFFER_ARRAY, stop on record")
simple_search(au, auparse.AUSOURCE_BUFFER_ARRAY, auparse.AUSEARCH_STOP_RECORD)
print("Testing BUFFER_ARRAY, stop on event")
simple_search(au, auparse.AUSOURCE_BUFFER_ARRAY, auparse.AUSEARCH_STOP_EVENT)
print("Testing test.log, stop on field")
simple_search(au, auparse.AUSOURCE_FILE, auparse.AUSEARCH_STOP_FIELD)
print("Testing test.log, stop on record")
simple_search(au, auparse.AUSOURCE_FILE, auparse.AUSEARCH_STOP_RECORD)
print("Testing test.log, stop on event")
simple_search(au, auparse.AUSOURCE_FILE, auparse.AUSEARCH_STOP_EVENT)
print("Test 6 Done\n")
print("Starting Test 7, compound search...")
au = auparse.AuParser(auparse.AUSOURCE_BUFFER_ARRAY, buf)
compound_search(au, auparse.AUSEARCH_RULE_AND)
compound_search(au, auparse.AUSEARCH_RULE_OR)
print("Test 7 Done\n")
print("Starting Test 8, regex search...")
au = auparse.AuParser(auparse.AUSOURCE_BUFFER_ARRAY, buf)
print("Doing regex match...\n")
au = auparse.AuParser(auparse.AUSOURCE_BUFFER_ARRAY, buf)
print("Test 8 Done\n")
# Note: this should match Test 2 exactly
# Note: this should match Test 2 exactly
print("Starting Test 9, buffer feed...")
au = auparse.AuParser(auparse.AUSOURCE_FEED);
event_cnt = 1
au.add_callback(lambda au, cb_event_type, event_cnt: feed_callback(au, cb_event_type, event_cnt, interpret=False), [event_cnt])
chunk_len = 3
for s in buf:
s_len = len(s)
beg = 0
while beg < s_len:
end = min(s_len, beg + chunk_len)
data = s[beg:end]
beg += chunk_len
au.feed(data)
au.flush_feed()
print("Test 9 Done\n")
# Note: this should match Test 4 exactly
print("Starting Test 10, file feed...")
au = auparse.AuParser(auparse.AUSOURCE_FEED);
event_cnt = 1
au.add_callback(lambda au, cb_event_type, event_cnt: feed_callback(au, cb_event_type, event_cnt, interpret=False), [event_cnt])
f = open(srcdir + "/test.log");
while True:
data = f.read(4)
if not data: break
au.feed(data)
au.flush_feed()
print("Test 10 Done\n")
print("Starting Test 11, walk LONG event records from a file...")
au = auparse.AuParser(auparse.AUSOURCE_FILE, "test4.log");
walked_fields = 0
walk_test(au)
if walked_fields != FIELDS_EXPECTED:
print("Error: %i fields expected, but %i read!\n" % \
(FIELDS_EXPECTED, walked_fields))
print("Test 11 Done\n")
print("Finished non-admin tests\n")
au = None
sys.exit(0)
|