From cd07885d289f5e6b043463de9bb4aa954ed4d278 Mon Sep 17 00:00:00 2001 From: Dominik Pataky Date: Mon, 30 Mar 2020 16:42:48 +0200 Subject: [PATCH] Improve handling of mixed template/data exports; add test The collector is able to parse templates in an export and then use these templates to parse dataflows inside the same export packet. But the test implementation was based on the assumption, that the templates always arrive first in the packet. Now, a mixed order is also processed successfully. Test included. --- netflow/collector.py | 1 + netflow/v9.py | 20 +++++++++++++++++++- test_netflow.py | 33 +++++++++++++++++++++++++++------ 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/netflow/collector.py b/netflow/collector.py index c8433e3..1c16908 100644 --- a/netflow/collector.py +++ b/netflow/collector.py @@ -121,6 +121,7 @@ class ThreadedNetFlowListener(threading.Thread): continue try: + # templates is passed as reference, updated in V9ExportPacket export = parse_packet(pkt.data, templates) except UnknownNetFlowVersion as e: logger.error("%s, ignoring the packet", e) diff --git a/netflow/v9.py b/netflow/v9.py index 179fcc2..613bdd0 100644 --- a/netflow/v9.py +++ b/netflow/v9.py @@ -330,22 +330,40 @@ class V9ExportPacket: self.flows = [] offset = self.header.length + skipped_flowsets_offsets = [] while offset != len(data): flowset_id = struct.unpack('!H', data[offset:offset+2])[0] if flowset_id == 0: # TemplateFlowSet always have id 0 tfs = V9TemplateFlowSet(data[offset:]) + # Check for any new/changed templates if not self._new_templates: for id_, template in tfs.templates.items(): if id_ not in self.templates or self.templates[id_] != template: self._new_templates = True break + + # Update the templates with the provided templates, even if they are the same self.templates.update(tfs.templates) offset += tfs.length else: + try: + dfs = V9DataFlowSet(data[offset:], self.templates) + self.flows += dfs.flows + offset += dfs.length + except V9TemplateNotRecognized: + # Could not be parsed, continue to check for templates + length = struct.unpack("!H", data[offset+2:offset+4])[0] + skipped_flowsets_offsets.append(offset) + offset += length + + if skipped_flowsets_offsets and self._new_templates: + # Process flowsets in the data slice which occured before the template sets + for offset in skipped_flowsets_offsets: dfs = V9DataFlowSet(data[offset:], self.templates) self.flows += dfs.flows - offset += dfs.length + elif skipped_flowsets_offsets: + raise V9TemplateNotRecognized @property def contains_new_templates(self): diff --git a/test_netflow.py b/test_netflow.py index 4f987dc..12667cd 100755 --- a/test_netflow.py +++ b/test_netflow.py @@ -21,7 +21,7 @@ import unittest from netflow.collector import ThreadedNetFlowListener -# TODO: tests with 500 packets fail? Probably a problem with UDP sockets +# TODO: tests with 500 packets fail with delay=0. Probably a problem with UDP sockets buffer # The flowset with 2 templates (IPv4 and IPv6) and 8 flows with data PACKET_V9_TEMPLATE = "0009000a000000035c9f55980000000100000000000000400400000e00080004000c000400150004" \ @@ -37,6 +37,23 @@ PACKET_V9_TEMPLATE = "0009000a000000035c9f55980000000100000000000000400400000e00 "fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9110004007f0000017f000001" \ "fb3c1900fb3c18fe0000003c000000010000000000000000b3c9003511000400" +# This packet is special. We take PACKET_V9_TEMPLATE and re-order the templates and flows. +# The first line is the header, the smaller lines the templates and the long lines the flows (limited to 80 chars) +PACKET_V9_TEMPLATE_MIXED = ("0009000a000000035c9f55980000000100000000" # header + "040001447f0000017f000001fb3c1aaafb3c18fd000190100000004b00000000000000000050942c" + "061b04007f0000017f000001fb3c1aaafb3c18fd00000f94000000360000000000000000942c0050" + "061f04007f0000017f000001fb3c1cfcfb3c1a9b0000d3fc0000002a000000000000000000509434" + "061b04007f0000017f000001fb3c1cfcfb3c1a9b00000a490000001e000000000000000094340050" + "061f04007f0000017f000001fb3bb82cfb3ba48b000002960000000300000000000000000050942a" + "061904007f0000017f000001fb3bb82cfb3ba48b00000068000000020000000000000000942a0050" + "061104007f0000017f000001fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9" + "110004007f0000017f000001fb3c1900fb3c18fe0000003c000000010000000000000000b3c90035" + "11000400" # end of flow segments + "000000400400000e00080004000c000400150004001600040001000400020004" # template 1024 + "000a0004000e000400070002000b00020004000100060001003c000100050001" + "000000400800000e001b0010001c001000150004001600040001000400020004" # template 2048 + "000a0004000e000400070002000b00020004000100060001003c000100050001") + # Three packets without templates, each with 12 flows, anonymized PACKETS_V9 = [ "0009000c000000035c9f55980000000200000000040001e47f0000017f000001fb3c1a17fb3c19fd" @@ -98,10 +115,10 @@ PACKET_V5 = "00050003000379a35e80c58622a55ab00000000000000000ac110002ac110001000 PACKET_INVALID = "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" CONNECTION = ('127.0.0.1', 1337) -NUM_PACKETS = 100 +NUM_PACKETS = 500 -def emit_packets(packets, delay=0): +def emit_packets(packets, delay=0.0001): """Send the provided packets to the listener""" sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for p in packets: @@ -110,7 +127,7 @@ def emit_packets(packets, delay=0): sock.close() -def send_recv_packets(packets, delay=0) -> (list, float, float): +def send_recv_packets(packets, delay=0.0001) -> (list, float, float): """Starts a listener, send packets, receives packets returns a tuple: ([(ts, export), ...], time_started_sending, time_stopped_sending) @@ -134,7 +151,7 @@ def send_recv_packets(packets, delay=0) -> (list, float, float): class TestFlowExport(unittest.TestCase): - def _test_recv_all_packets(self, num, template_idx, delay=0): + def _test_recv_all_packets(self, num, template_idx, delay=0.0001): """Fling packets at the server and test that it receives them all""" def gen_pkts(n, idx): @@ -235,9 +252,13 @@ class TestFlowExport(unittest.TestCase): pkts, _, _ = send_recv_packets([PACKETS_V9[0]]) self.assertEqual(len(pkts), 0) # no export is parsed due to missing template - # send packet with two templates and eight flows, should succeed + # send packet with two templates and eight flows, should parse correctly since the templates are known pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE]) self.assertEqual(len(pkts), 1) + + # and again, but with the templates at the end in the packet + pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE_MIXED]) + self.assertEqual(len(pkts), 1) p = pkts[0] self.assertEqual(p.client[0], "127.0.0.1") self.assertEqual(len(p.export.flows), 8) # count flows