File: conversation.lua

package info (click to toggle)
wireshark 4.6.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 351,352 kB
  • sloc: ansic: 3,102,875; cpp: 129,717; xml: 100,972; python: 56,513; perl: 24,575; sh: 5,874; lex: 4,383; pascal: 4,304; makefile: 166; ruby: 113; objc: 91; tcl: 35
file content (152 lines) | stat: -rw-r--r-- 6,091 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
----------------------------------------
-- script-name: conversation.lua
-- test the Conversation API
----------------------------------------

local testlib = require("testlib")

-- Names of the test groups whose runs are counted
local FIRSTPASS = "first-pass"
local FRAME = "frame"
local PER_FRAME = "per-frame"
local OTHER = "other"

-- expected number of runs for each group
local n_frames = 4
local taptests = {}
-- checks made only while dissecting the first packet
taptests[FIRSTPASS] = 14
-- one run per dissected frame
taptests[FRAME] = n_frames
-- eight checks per frame, minus the "stored data (end == n-1)" check, which is
-- skipped while the stored table is still empty
taptests[PER_FRAME] = n_frames * 8 - 1
-- the single pinfo.conversation comparison
taptests[OTHER] = 1
testlib.init(taptests)

------------- test script ------------

----------------------------------------
local test_proto = Proto("test", "Test Proto")

-- How many times the init hook below has been invoked so far
local numinits = 0

-- Init hook of the test protocol: counts its own invocations and asks testlib
-- for the results on exactly the second one.
-- NOTE(review): presumably the second call marks the end of the capture run;
-- confirm against how the test harness drives this script.
test_proto.init = function()
    numinits = numinits + 1
    if numinits ~= 2 then
        return
    end
    testlib.getResults()
end

-- Helper functions for testing
-- Registry of conversations seen so far, keyed by a caller-chosen name
local conv_reg = {}

-- Register `conv` under `name`, but only if it is genuinely new.
-- Returns false (and registers nothing) when `conv` is nil, when `name` is
-- already registered, or when `conv` compares equal to a conversation that is
-- already registered under any name. Returns true after a successful
-- registration.
function unique_conv(name, conv)
    if conv == nil then
        return false
    end

    -- The name must not have been used before
    if conv_reg[name] ~= nil then
        return false
    end

    -- The conversation itself must not be registered under another name
    for _, known in pairs(conv_reg) do
        if conv == known then
            return false
        end
    end

    conv_reg[name] = conv
    return true
end

-- Report whether `conv` equals the conversation previously registered under
-- `conv_key`. Returns a boolean.
function existing_conv(conv_key, conv)
    local registered = conv_reg[conv_key]
    return registered == conv
end

-- Check functions, for pcall
-- Call Conversation.find with the given arguments and report whether its
-- result equals `compare`. Kept as a plain function so it can be run via pcall.
function check_find(compare, fnum, ctype, addr_1, addr_2, port_1, port_2, create)
    local found = Conversation.find(fnum, ctype, addr_1, addr_2, port_1, port_2, create)
    return compare == found
end

-- Call Conversation.find_from_pinfo(pinfo, create) and report whether its
-- result equals `compare`.
function check_find_pinfo(compare, pinfo, create)
    local found = Conversation.find_from_pinfo(pinfo, create)
    return compare == found
end

-- Call Conversation.find_by_id(frame_num, conv_type, conv_id, create) and
-- report whether its result equals `compare`.
function check_find_id(compare, frame_num, conv_type, conv_id, create)
    local found = Conversation.find_by_id(frame_num, conv_type, conv_id, create)
    return compare == found
end

-- Test variables, to be maintained throughout test
-- Frame number used for every lookup; filled in from pinfo.number while
-- dissecting the first packet
local frame_num
-- Conversation type and id used by the find / find_by_id lookups
local conv_type = convtypes.TCP
local conv_id   = 62
-- First endpoint of the address/port lookup
local addr1 = Address.ip('127.0.0.1')
local port1 = 65333
-- Second endpoint is deliberately left unset (nil) for the whole test
local addr2
local port2


-- Test conversation handling
-- Dissector that exercises the Conversation API.
--
-- On the first packet it checks that lookups with create == false find
-- nothing, creates three conversations (from pinfo, by id, and by
-- address/port), and checks storing and clearing per-protocol data on a
-- conversation. On every packet it checks that the same conversations are
-- found again and that the table stored on the conversation keeps growing by
-- one entry per frame.
--
-- buf   - unused
-- pinfo - packet info; pinfo.number and pinfo.conversation are read
-- root  - unused
function test_proto.dissector(buf, pinfo, root)
    testlib.countPacket(FRAME)

    -- One-time checks, run only for the first packet seen by this dissector
    if testlib.getPktCount(FRAME) == 1 then
        frame_num = pinfo.number

        -- With create == false, nothing may be found before a conversation exists.
        -- NOTE(review): pcall returns its success flag first and the comparison
        -- result second; whether testlib.test looks at the comparison result
        -- depends on testlib -- confirm.
        local missing_cases = {
            { label = "Conversation.find(!p1,!p2) == nil" },
            { label = "Conversation.find(!p1, p2) == nil", p2 = port2 },
            { label = "Conversation.find( p1,!p2) == nil", p1 = port1 },
            { label = "Conversation.find( p1, p2) == nil", p1 = port1, p2 = port2 },
        }
        for _, case in ipairs(missing_cases) do
            testlib.test(FIRSTPASS, case.label,
                pcall(check_find, nil, frame_num, conv_type, addr1, addr2, case.p1, case.p2, false))
        end

        testlib.test(FIRSTPASS, "Conversation.find_from_pinfo() == nil",
            pcall(check_find_pinfo, nil, pinfo, false))
        testlib.test(FIRSTPASS, "Conversation.find_by_id() == nil",
            pcall(check_find_id, nil, frame_num, conv_type, conv_id, false))

        -- Create the conversations; each one must be new (not registered before)
        local from_pinfo = Conversation.find_from_pinfo(pinfo)
        testlib.test(FIRSTPASS, "Conversation.find_from_pinfo", unique_conv("pinfo", from_pinfo))

        local from_id = Conversation.find_by_id(frame_num, conv_type, conv_id)
        testlib.test(FIRSTPASS, "Conversation.find_by_id", unique_conv("id", from_id))

        local from_tuple = Conversation.find(frame_num, conv_type, addr1, addr2, port1, port2)
        testlib.test(FIRSTPASS, "Conversation.find", unique_conv("full", from_tuple))

        local pkt_conv = Conversation.find_from_pinfo(pinfo)

        -- Nothing has been stored for this protocol yet, so the lookup yields nil
        testlib.test(FIRSTPASS, "conv[proto] == nil", pkt_conv[test_proto] == nil)

        -- A plain (non-table) value survives a store/load round trip
        pkt_conv[test_proto] = 123
        testlib.test(FIRSTPASS, "conv[proto] number", type(pkt_conv[test_proto]) == "number")
        testlib.test(FIRSTPASS, "conv[proto] number", pkt_conv[test_proto] == 123)

        -- Assigning nil clears the stored data again
        pkt_conv[test_proto] = nil
        testlib.test(FIRSTPASS, "conv[proto] nil", pkt_conv[test_proto] == nil)

        -- A table survives a store/load round trip and comes back as the same table
        local stored_table = {}
        pkt_conv[test_proto] = stored_table
        testlib.test(FIRSTPASS, "conv[proto] table", pkt_conv[test_proto] == stored_table)

        -- pinfo.conversation must be the conversation Conversation.find_from_pinfo returns
        testlib.test(OTHER, "pinfo.conversation == Conversation.find_from_pinfo",
            check_find_pinfo(pinfo.conversation, pinfo, true))
    end

    -- On every frame, the lookups must return the conversations registered above
    local again_pinfo = Conversation.find_from_pinfo(pinfo)
    testlib.test(PER_FRAME, "existing conversation (pinfo)", existing_conv("pinfo", again_pinfo))

    local again_id = Conversation.find_by_id(frame_num, conv_type, conv_id)
    testlib.test(PER_FRAME, "existing conversation (id)", existing_conv("id", again_id))

    local again_tuple = Conversation.find(frame_num, conv_type, addr1, addr2, port1, port2)
    testlib.test(PER_FRAME, "existing conversation (full)", existing_conv("full", again_tuple))

    local history = pinfo.conversation[test_proto]

    -- The stored table must survive between frames and hold one entry per earlier frame
    testlib.test(PER_FRAME, "stored data (non-nil)", history ~= nil)
    testlib.test(PER_FRAME, "stored data (is table)", type(history) == "table")
    testlib.test(PER_FRAME, "stored data (len == n-1)", #history == testlib.getPktCount(FRAME) - 1)
    if #history >= 1 then
        testlib.test(PER_FRAME, "stored data (end == n-1)", history[#history] == testlib.getPktCount(FRAME) - 1)
    end

    -- Append the current packet count and check that the table grew by one
    history[#history + 1] = testlib.getPktCount(FRAME)
    testlib.test(PER_FRAME, "stored data", #history == testlib.getPktCount(FRAME))

    -- Note: there is no need to store the table back into the conversation;
    -- tables are passed by reference.

    -- Reached end of frame
    testlib.pass(FRAME)
end

-- Replace default "IP" handler, so that no conversation is created. (The built in IP dissector
-- automatically registers a pinfo conversation).
-- 0x0800 (2048) is the ethertype value this test protocol is registered under
local ethertype_table = DissectorTable.get("ethertype")
ethertype_table:add(0x0800, test_proto)