Improve handling of mixed template/data exports; add test
The collector is able to parse templates in an export and then use these templates to parse dataflows inside the same export packet. But the test implementation was based on the assumption, that the templates always arrive first in the packet. Now, a mixed order is also processed successfully. Test included.
This commit is contained in:
parent
d4d6d59713
commit
cd07885d28
|
@ -121,6 +121,7 @@ class ThreadedNetFlowListener(threading.Thread):
|
|||
continue
|
||||
|
||||
try:
|
||||
# templates is passed as reference, updated in V9ExportPacket
|
||||
export = parse_packet(pkt.data, templates)
|
||||
except UnknownNetFlowVersion as e:
|
||||
logger.error("%s, ignoring the packet", e)
|
||||
|
|
|
@ -330,22 +330,40 @@ class V9ExportPacket:
|
|||
self.flows = []
|
||||
|
||||
offset = self.header.length
|
||||
skipped_flowsets_offsets = []
|
||||
while offset != len(data):
|
||||
flowset_id = struct.unpack('!H', data[offset:offset+2])[0]
|
||||
if flowset_id == 0: # TemplateFlowSet always have id 0
|
||||
tfs = V9TemplateFlowSet(data[offset:])
|
||||
|
||||
# Check for any new/changed templates
|
||||
if not self._new_templates:
|
||||
for id_, template in tfs.templates.items():
|
||||
if id_ not in self.templates or self.templates[id_] != template:
|
||||
self._new_templates = True
|
||||
break
|
||||
|
||||
# Update the templates with the provided templates, even if they are the same
|
||||
self.templates.update(tfs.templates)
|
||||
offset += tfs.length
|
||||
else:
|
||||
try:
|
||||
dfs = V9DataFlowSet(data[offset:], self.templates)
|
||||
self.flows += dfs.flows
|
||||
offset += dfs.length
|
||||
except V9TemplateNotRecognized:
|
||||
# Could not be parsed, continue to check for templates
|
||||
length = struct.unpack("!H", data[offset+2:offset+4])[0]
|
||||
skipped_flowsets_offsets.append(offset)
|
||||
offset += length
|
||||
|
||||
if skipped_flowsets_offsets and self._new_templates:
|
||||
# Process flowsets in the data slice which occured before the template sets
|
||||
for offset in skipped_flowsets_offsets:
|
||||
dfs = V9DataFlowSet(data[offset:], self.templates)
|
||||
self.flows += dfs.flows
|
||||
elif skipped_flowsets_offsets:
|
||||
raise V9TemplateNotRecognized
|
||||
|
||||
@property
|
||||
def contains_new_templates(self):
|
||||
|
|
|
@ -21,7 +21,7 @@ import unittest
|
|||
|
||||
from netflow.collector import ThreadedNetFlowListener
|
||||
|
||||
# TODO: tests with 500 packets fail? Probably a problem with UDP sockets
|
||||
# TODO: tests with 500 packets fail with delay=0. Probably a problem with UDP sockets buffer
|
||||
|
||||
# The flowset with 2 templates (IPv4 and IPv6) and 8 flows with data
|
||||
PACKET_V9_TEMPLATE = "0009000a000000035c9f55980000000100000000000000400400000e00080004000c000400150004" \
|
||||
|
@ -37,6 +37,23 @@ PACKET_V9_TEMPLATE = "0009000a000000035c9f55980000000100000000000000400400000e00
|
|||
"fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9110004007f0000017f000001" \
|
||||
"fb3c1900fb3c18fe0000003c000000010000000000000000b3c9003511000400"
|
||||
|
||||
# This packet is special. We take PACKET_V9_TEMPLATE and re-order the templates and flows.
|
||||
# The first line is the header, the smaller lines the templates and the long lines the flows (limited to 80 chars)
|
||||
PACKET_V9_TEMPLATE_MIXED = ("0009000a000000035c9f55980000000100000000" # header
|
||||
"040001447f0000017f000001fb3c1aaafb3c18fd000190100000004b00000000000000000050942c"
|
||||
"061b04007f0000017f000001fb3c1aaafb3c18fd00000f94000000360000000000000000942c0050"
|
||||
"061f04007f0000017f000001fb3c1cfcfb3c1a9b0000d3fc0000002a000000000000000000509434"
|
||||
"061b04007f0000017f000001fb3c1cfcfb3c1a9b00000a490000001e000000000000000094340050"
|
||||
"061f04007f0000017f000001fb3bb82cfb3ba48b000002960000000300000000000000000050942a"
|
||||
"061904007f0000017f000001fb3bb82cfb3ba48b00000068000000020000000000000000942a0050"
|
||||
"061104007f0000017f000001fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9"
|
||||
"110004007f0000017f000001fb3c1900fb3c18fe0000003c000000010000000000000000b3c90035"
|
||||
"11000400" # end of flow segments
|
||||
"000000400400000e00080004000c000400150004001600040001000400020004" # template 1024
|
||||
"000a0004000e000400070002000b00020004000100060001003c000100050001"
|
||||
"000000400800000e001b0010001c001000150004001600040001000400020004" # template 2048
|
||||
"000a0004000e000400070002000b00020004000100060001003c000100050001")
|
||||
|
||||
# Three packets without templates, each with 12 flows, anonymized
|
||||
PACKETS_V9 = [
|
||||
"0009000c000000035c9f55980000000200000000040001e47f0000017f000001fb3c1a17fb3c19fd"
|
||||
|
@ -98,10 +115,10 @@ PACKET_V5 = "00050003000379a35e80c58622a55ab00000000000000000ac110002ac110001000
|
|||
PACKET_INVALID = "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
|
||||
|
||||
CONNECTION = ('127.0.0.1', 1337)
|
||||
NUM_PACKETS = 100
|
||||
NUM_PACKETS = 500
|
||||
|
||||
|
||||
def emit_packets(packets, delay=0):
|
||||
def emit_packets(packets, delay=0.0001):
|
||||
"""Send the provided packets to the listener"""
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
for p in packets:
|
||||
|
@ -110,7 +127,7 @@ def emit_packets(packets, delay=0):
|
|||
sock.close()
|
||||
|
||||
|
||||
def send_recv_packets(packets, delay=0) -> (list, float, float):
|
||||
def send_recv_packets(packets, delay=0.0001) -> (list, float, float):
|
||||
"""Starts a listener, send packets, receives packets
|
||||
|
||||
returns a tuple: ([(ts, export), ...], time_started_sending, time_stopped_sending)
|
||||
|
@ -134,7 +151,7 @@ def send_recv_packets(packets, delay=0) -> (list, float, float):
|
|||
|
||||
|
||||
class TestFlowExport(unittest.TestCase):
|
||||
def _test_recv_all_packets(self, num, template_idx, delay=0):
|
||||
def _test_recv_all_packets(self, num, template_idx, delay=0.0001):
|
||||
"""Fling packets at the server and test that it receives them all"""
|
||||
|
||||
def gen_pkts(n, idx):
|
||||
|
@ -235,9 +252,13 @@ class TestFlowExport(unittest.TestCase):
|
|||
pkts, _, _ = send_recv_packets([PACKETS_V9[0]])
|
||||
self.assertEqual(len(pkts), 0) # no export is parsed due to missing template
|
||||
|
||||
# send packet with two templates and eight flows, should succeed
|
||||
# send packet with two templates and eight flows, should parse correctly since the templates are known
|
||||
pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE])
|
||||
self.assertEqual(len(pkts), 1)
|
||||
|
||||
# and again, but with the templates at the end in the packet
|
||||
pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE_MIXED])
|
||||
self.assertEqual(len(pkts), 1)
|
||||
p = pkts[0]
|
||||
self.assertEqual(p.client[0], "127.0.0.1")
|
||||
self.assertEqual(len(p.export.flows), 8) # count flows
|
||||
|
|
Loading…
Reference in a new issue