File: header.rb

package info (click to toggle)
ruby-packetfu 2.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,520 kB
  • sloc: ruby: 8,344; makefile: 2
file content (334 lines) | stat: -rw-r--r-- 9,677 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# -*- coding: binary -*-
require 'ipaddr'

module PacketFu
  # Octets implements the addressing scheme for IP: one IPv4 address held as
  # a single 32-bit integer field.
  #
  # ==== Header Definition
  #
  #  Int32 :ip_addr
  class Octets < Struct.new(:ip_addr)
    include StructFu

    # Matches a dotted-quad IPv4 address and captures its four decimal fields.
    #
    # Anchored with \A and \Z rather than ^ and $: the line anchors match at
    # any line boundary, which let multi-line input such as
    # "1.2.3.4\nanything" pass validation. \Z still tolerates one trailing
    # newline, exactly as the previous $ anchor did.
    IPV4_RE = /\A(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})\Z/

    # Builds the struct; args[:ip_addr] is handed to Int32.new unchanged.
    def initialize(args={})
      super(Int32.new(args[:ip_addr]))
    end

    # Returns the object in string form: the numeric address packed with the
    # "N" directive (32-bit unsigned, network byte order).
    def to_s
      [to_i].pack("N")
    end

    # Reads a string to populate the object. Only the first four bytes of
    # +str+ are consumed. A nil +str+ leaves the object untouched.
    # Always returns self so calls can be chained.
    def read(str)
      force_binary(str)
      return self if str.nil?
      self[:ip_addr].read str[0,4]
      self
    end

    # Returns an address in dotted-quad format.
    def to_x
      # Array#join stringifies the integer octets itself.
      octets.join('.')
    end

    # Returns an address in numerical format.
    def to_i
      self[:ip_addr].to_i
    end

    # Set the IP Address by reading a dotted-quad address.
    #
    # Raises ArgumentError when +str+ is not four dot-separated decimal
    # fields, or when any field is greater than 255. Returns self.
    def read_quad(str)
      match = IPV4_RE.match(str)
      raise ArgumentError, "str is not a valid IPV4 address" if match.nil?

      quad = match.captures.map(&:to_i)
      # \d{1,3} admits values up to 999, so the upper bound needs its own
      # check; the lower bound is already guaranteed by the digits-only match.
      if quad.any? { |octet| octet > 255 }
        raise ArgumentError, "str is not a valid IPV4 address"
      end

      # Fold the octets most-significant first: ((a<<8 | b)<<8 | c)<<8 | d.
      self[:ip_addr].value = quad.inject(0) { |acc, octet| (acc << 8) + octet }
      self
    end

    # Returns the IP address as 4 octets, most significant first.
    def octets
      addr = to_i
      [24, 16, 8, 0].map { |shift| (addr >> shift) & 0xff }
    end

    # Returns the value for the first (most significant) octet
    def o1
      (to_i >> 24) & 0xff
    end

    # Returns the value for the second octet
    def o2
      (to_i >> 16) & 0xff
    end

    # Returns the value for the third octet
    def o3
      (to_i >> 8) & 0xff
    end

    # Returns the value for the fourth (least significant) octet
    def o4
      to_i & 0xff
    end

  end

  # IPHeader is a complete IP struct, used in IPPacket. Most traffic on most networks today is IP-based.
  #
  # For more on IP packets, see http://www.networksorcery.com/enp/protocol/ip.htm
  #
  # ==== Header Definition
  #
  #   Integer (4 bits) :ip_v,     Default: 4
  #   Integer (4 bits) :ip_hl,    Default: 5
  #   Int8             :ip_tos,   Default: 0           # TODO: Break out the bits
  #   Int16            :ip_len,   Default: calculated
  #   Int16            :ip_id,    Default: calculated  # IRL, hardly random.
  #   Int16            :ip_frag,  Default: 0           # TODO: Break out the bits
  #   Int8             :ip_ttl,   Default: 64          # https://www.iana.org/assignments/ip-parameters/ip-parameters.xml
  #   Int8             :ip_proto, Default: 0x01        # TCP: 0x06, UDP 0x11, ICMP 0x01
  #   Int16            :ip_sum,   Default: calculated
  #   Octets           :ip_src
  #   Octets           :ip_dst
  #   String           :body
  #
  # Note that IPPackets will always be somewhat incorrect upon initialization,
  # and want an IPHeader#ip_recalc() to become correct before a
  # Packet#to_f or Packet#to_w.
  class IPHeader < Struct.new(:ip_v, :ip_hl, :ip_tos, :ip_len,
                              :ip_id, :ip_frag, :ip_ttl, :ip_proto,
                              :ip_sum, :ip_src, :ip_dst, :body)
    include StructFu

    # Builds a header from an options hash keyed by the member names above.
    # Missing keys fall back to the defaults visible in the call to super.
    def initialize(args={})
      # Must be set before the arguments to super are evaluated, because
      # ip_calc_id reads it.
      @random_id = rand(0xffff)
      super(
        (args[:ip_v] || 4),
        (args[:ip_hl] || 5),
        Int8.new(args[:ip_tos]),
        Int16.new(args[:ip_len] || 20),
        Int16.new(args[:ip_id] || ip_calc_id),
        Int16.new(args[:ip_frag]),
        Int8.new(args[:ip_ttl] || 64),
        Int8.new(args[:ip_proto]),
        # Evaluated before super has populated any member, so this default
        # is not computed from the values passed in args; call ip_recalc
        # once the header is built to get a real checksum.
        Int16.new(args[:ip_sum] || ip_calc_sum),
        Octets.new.read(args[:ip_src] || "\x00\x00\x00\x00"),
        Octets.new.read(args[:ip_dst] || "\x00\x00\x00\x00"),
        StructFu::String.new.read(args[:body])
      )
    end

    # Returns the object in string form: one byte combining version and
    # header length, followed by the string form of the ten members from
    # :ip_tos through :body.
    def to_s
      byte_v_hl = [(ip_v << 4) + ip_hl].pack("C")
      byte_v_hl + to_a[2,10].map(&:to_s).join
    end

    # Reads a string to populate the object. Fields are taken from fixed
    # offsets within the first 20 bytes; anything past byte 20 goes to :body.
    # A nil +str+ leaves the object untouched. Returns self.
    def read(str)
      force_binary(str)
      return self if str.nil?
      # unpack yields nil for an empty string; coerce once with to_i so the
      # version and header-length nibbles are handled alike (previously only
      # the header-length nibble was guarded and the version nibble raised
      # NoMethodError).
      first_byte = str[0,1].unpack("C").first.to_i
      self[:ip_v] = first_byte >> 4
      self[:ip_hl] = first_byte & 0x0f
      self[:ip_tos].read(str[1,1])
      self[:ip_len].read(str[2,2])
      self[:ip_id].read(str[4,2])
      self[:ip_frag].read(str[6,2])
      self[:ip_ttl].read(str[8,1])
      self[:ip_proto].read(str[9,1])
      self[:ip_sum].read(str[10,2])
      self[:ip_src].read(str[12,4])
      self[:ip_dst].read(str[16,4])
      self[:body].read(str[20,str.size]) if str.size > 20
      self
    end

    # NOTE(review): the setters below delegate to typecast, which is defined
    # outside this file. Its calls are kept directly inside each setter body;
    # confirm its requirements before moving them into helpers or blocks.

    # Setter for the version.
    def ip_v=(i)
      self[:ip_v] = i.to_i
    end

    # Getter for the version.
    def ip_v
      self[:ip_v].to_i
    end

    # Setter for the header length, expressed in 32-bit words (ip_hlen
    # multiplies it by 4 to get bytes).
    def ip_hl=(i)
      self[:ip_hl] = i.to_i
    end

    # Getter for the header length, expressed in 32-bit words.
    def ip_hl
      self[:ip_hl].to_i
    end

    # Setter for the differentiated services
    def ip_tos=(i)
      typecast i
    end

    # Getter for the differentiated services
    def ip_tos
      self[:ip_tos].to_i
    end

    # Setter for total length.
    def ip_len=(i)
      typecast i
    end

    # Getter for total length.
    def ip_len
      self[:ip_len].to_i
    end

    # Setter for the identification number.
    def ip_id=(i)
      typecast i
    end

    # Getter for the identification number.
    def ip_id
      self[:ip_id].to_i
    end

    # Setter for the 16-bit fragmentation field.
    def ip_frag=(i)
      typecast i
    end

    # Getter for the 16-bit fragmentation field.
    def ip_frag
      self[:ip_frag].to_i
    end

    # Setter for the time to live.
    def ip_ttl=(i)
      typecast i
    end

    # Getter for the time to live.
    def ip_ttl
      self[:ip_ttl].to_i
    end

    # Setter for the protocol number.
    def ip_proto=(i)
      typecast i
    end

    # Getter for the protocol number.
    def ip_proto
      self[:ip_proto].to_i
    end

    # Setter for the checksum.
    def ip_sum=(i)
      typecast i
    end

    # Getter for the checksum.
    def ip_sum
      self[:ip_sum].to_i
    end

    # Setter for the source IP address. Accepts a Numeric (packed into four
    # network-order bytes), an Octets instance (stored as-is), or anything
    # else, which is passed to typecast.
    def ip_src=(i)
      case i
      when Numeric
        self[:ip_src] = Octets.new.read([i].pack("N"))
      when Octets
        self[:ip_src] = i
      else
        typecast i
      end
    end

    # Getter for the source IP address, as an integer.
    def ip_src
      self[:ip_src].to_i
    end

    # Setter for the destination IP address. Accepts the same argument kinds
    # as ip_src=.
    def ip_dst=(i)
      case i
      when Numeric
        self[:ip_dst] = Octets.new.read([i].pack("N"))
      when Octets
        self[:ip_dst] = i
      else
        typecast i
      end
    end

    # Getter for the destination IP address, as an integer.
    def ip_dst
      self[:ip_dst].to_i
    end

    # Calculate the true length of the packet: claimed header length in
    # bytes plus the length of the body's string form.
    def ip_calc_len
      ip_hlen + body.to_s.length
    end

    # Return the claimed header length, in bytes.
    def ip_hlen
      ip_hl * 4
    end

    # Calculate the true checksum of the packet.
    #
    # The header is summed as 16-bit words with the checksum field left out,
    # reduced modulo 0xffff, and subtracted from 0xffff. Only the fixed
    # fields listed below take part; :body is not included.
    def ip_calc_sum
      words = [
        (((ip_v << 4) + ip_hl) << 8) + ip_tos,
        ip_len,
        ip_id,
        ip_frag,
        (ip_ttl << 8) + ip_proto,
        ip_src >> 16,
        ip_src & 0xffff,
        ip_dst >> 16,
        ip_dst & 0xffff
      ]
      # The remainder is always below 0xffff, so the difference can never be
      # zero and needs no special case.
      0xffff - (words.inject(:+) % 0xffff)
    end

    # Retrieve the IP ID: the random value stored on this instance.
    def ip_calc_id
      @random_id
    end

    # Sets a more readable IP address. If you want to manipulate individual octets,
    # (eg, for host scanning in one network), it would be better to use the o1 through
    # o4 readers on the underlying Octets instead.
    # Raises ArgumentError (from Octets#read_quad) on a malformed address.
    def ip_saddr=(addr)
      self[:ip_src].read_quad(addr)
    end

    # Returns a more readable IP source address.
    def ip_saddr
      self[:ip_src].to_x
    end

    # Sets a more readable IP address.
    # Raises ArgumentError (from Octets#read_quad) on a malformed address.
    def ip_daddr=(addr)
      self[:ip_dst].read_quad(addr)
    end

    # Returns a more readable IP destination address.
    def ip_daddr
      self[:ip_dst].to_x
    end

    # Translate various formats of IPv4 Addresses to an array of integers.
    #
    # Accepts a dotted-quad String, an Integer, or an Array (returned as-is).
    # String and Integer input both yield Integer elements; the Integer
    # branch used to return the octets as Strings.
    # Raises ArgumentError for any other argument type.
    def self.octet_array(addr)
      case addr
      when String
        addr.split('.').map(&:to_i)
      when Integer
        IPAddr.new(addr, Socket::AF_INET).to_s.split('.').map(&:to_i)
      when Array
        addr
      else
        raise ArgumentError, "IP Address should be a dotted quad string, an array of ints, or a bignum"
      end
    end

    # Recalculate the calculated IP fields. Valid arguments are:
    #   :all
    #   :ip_len
    #   :ip_sum
    #   :ip_id
    #
    # :ip_id only draws a new stored random ID; the :ip_id field itself is
    # written from that stored value when :all is used.
    # Raises ArgumentError for any other argument.
    def ip_recalc(arg=:all)
      case arg
      when :ip_len
        self.ip_len = ip_calc_len
      when :ip_sum
        self.ip_sum = ip_calc_sum
      when :ip_id
        @random_id = rand(0xffff)
      when :all
        # Order matters: the checksum covers the ID and length fields.
        self.ip_id = ip_calc_id
        self.ip_len = ip_calc_len
        self.ip_sum = ip_calc_sum
      else
        raise ArgumentError, "No such field `#{arg}'"
      end
    end

    # Readability aliases

    alias :ip_src_readable :ip_saddr
    alias :ip_dst_readable :ip_daddr

    # Returns the identification number as a zero-padded hex string.
    def ip_id_readable
      "0x%04x" % ip_id
    end

    # Returns the checksum as a zero-padded hex string.
    def ip_sum_readable
      "0x%04x" % ip_sum
    end

  end
end