diff --git a/netflow/collector.py b/netflow/collector.py index c8433e3..1c16908 100644 --- a/netflow/collector.py +++ b/netflow/collector.py @@ -121,6 +121,7 @@ class ThreadedNetFlowListener(threading.Thread): continue try: + # templates is passed as reference, updated in V9ExportPacket export = parse_packet(pkt.data, templates) except UnknownNetFlowVersion as e: logger.error("%s, ignoring the packet", e) diff --git a/netflow/v9.py b/netflow/v9.py index 179fcc2..613bdd0 100644 --- a/netflow/v9.py +++ b/netflow/v9.py @@ -330,22 +330,40 @@ class V9ExportPacket: self.flows = [] offset = self.header.length + skipped_flowsets_offsets = [] while offset != len(data): flowset_id = struct.unpack('!H', data[offset:offset+2])[0] if flowset_id == 0: # TemplateFlowSet always have id 0 tfs = V9TemplateFlowSet(data[offset:]) + # Check for any new/changed templates if not self._new_templates: for id_, template in tfs.templates.items(): if id_ not in self.templates or self.templates[id_] != template: self._new_templates = True break + + # Update the templates with the provided templates, even if they are the same self.templates.update(tfs.templates) offset += tfs.length else: + try: + dfs = V9DataFlowSet(data[offset:], self.templates) + self.flows += dfs.flows + offset += dfs.length + except V9TemplateNotRecognized: + # Could not be parsed, continue to check for templates + length = struct.unpack("!H", data[offset+2:offset+4])[0] + skipped_flowsets_offsets.append(offset) + offset += length + + if skipped_flowsets_offsets and self._new_templates: + # Process flowsets in the data slice which occurred before the template sets + for offset in skipped_flowsets_offsets: dfs = V9DataFlowSet(data[offset:], self.templates) self.flows += dfs.flows - offset += dfs.length + elif skipped_flowsets_offsets: + raise V9TemplateNotRecognized @property def contains_new_templates(self): diff --git a/test_netflow.py b/test_netflow.py index 4f987dc..12667cd 100755 --- a/test_netflow.py +++ 
b/test_netflow.py @@ -21,7 +21,7 @@ import unittest from netflow.collector import ThreadedNetFlowListener -# TODO: tests with 500 packets fail? Probably a problem with UDP sockets +# TODO: tests with 500 packets fail with delay=0. Probably a problem with UDP sockets buffer # The flowset with 2 templates (IPv4 and IPv6) and 8 flows with data PACKET_V9_TEMPLATE = "0009000a000000035c9f55980000000100000000000000400400000e00080004000c000400150004" \ @@ -37,6 +37,23 @@ PACKET_V9_TEMPLATE = "0009000a000000035c9f55980000000100000000000000400400000e00 "fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9110004007f0000017f000001" \ "fb3c1900fb3c18fe0000003c000000010000000000000000b3c9003511000400" +# This packet is special. We take PACKET_V9_TEMPLATE and re-order the templates and flows. +# The first line is the header, the smaller lines the templates and the long lines the flows (limited to 80 chars) +PACKET_V9_TEMPLATE_MIXED = ("0009000a000000035c9f55980000000100000000" # header + "040001447f0000017f000001fb3c1aaafb3c18fd000190100000004b00000000000000000050942c" + "061b04007f0000017f000001fb3c1aaafb3c18fd00000f94000000360000000000000000942c0050" + "061f04007f0000017f000001fb3c1cfcfb3c1a9b0000d3fc0000002a000000000000000000509434" + "061b04007f0000017f000001fb3c1cfcfb3c1a9b00000a490000001e000000000000000094340050" + "061f04007f0000017f000001fb3bb82cfb3ba48b000002960000000300000000000000000050942a" + "061904007f0000017f000001fb3bb82cfb3ba48b00000068000000020000000000000000942a0050" + "061104007f0000017f000001fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9" + "110004007f0000017f000001fb3c1900fb3c18fe0000003c000000010000000000000000b3c90035" + "11000400" # end of flow segments + "000000400400000e00080004000c000400150004001600040001000400020004" # template 1024 + "000a0004000e000400070002000b00020004000100060001003c000100050001" + "000000400800000e001b0010001c001000150004001600040001000400020004" # template 2048 + 
"000a0004000e000400070002000b00020004000100060001003c000100050001") + # Three packets without templates, each with 12 flows, anonymized PACKETS_V9 = [ "0009000c000000035c9f55980000000200000000040001e47f0000017f000001fb3c1a17fb3c19fd" @@ -98,10 +115,10 @@ PACKET_V5 = "00050003000379a35e80c58622a55ab00000000000000000ac110002ac110001000 PACKET_INVALID = "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" CONNECTION = ('127.0.0.1', 1337) -NUM_PACKETS = 100 +NUM_PACKETS = 500 -def emit_packets(packets, delay=0): +def emit_packets(packets, delay=0.0001): """Send the provided packets to the listener""" sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for p in packets: @@ -110,7 +127,7 @@ def emit_packets(packets, delay=0): sock.close() -def send_recv_packets(packets, delay=0) -> (list, float, float): +def send_recv_packets(packets, delay=0.0001) -> (list, float, float): """Starts a listener, send packets, receives packets returns a tuple: ([(ts, export), ...], time_started_sending, time_stopped_sending) @@ -134,7 +151,7 @@ def send_recv_packets(packets, delay=0) -> (list, float, float): class TestFlowExport(unittest.TestCase): - def _test_recv_all_packets(self, num, template_idx, delay=0): + def _test_recv_all_packets(self, num, template_idx, delay=0.0001): """Fling packets at the server and test that it receives them all""" def gen_pkts(n, idx): @@ -235,9 +252,13 @@ class TestFlowExport(unittest.TestCase): pkts, _, _ = send_recv_packets([PACKETS_V9[0]]) self.assertEqual(len(pkts), 0) # no export is parsed due to missing template - # send packet with two templates and eight flows, should succeed + # send packet with two templates and eight flows, should parse correctly since the templates are known pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE]) self.assertEqual(len(pkts), 1) + + # and again, but with the templates at the end in the packet + pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE_MIXED]) + self.assertEqual(len(pkts), 1) p = pkts[0] 
self.assertEqual(p.client[0], "127.0.0.1") self.assertEqual(len(p.export.flows), 8) # count flows