diff --git a/netflow/analyzer.py b/netflow/analyzer.py index eddf41d..23ef6db 100644 --- a/netflow/analyzer.py +++ b/netflow/analyzer.py @@ -232,6 +232,10 @@ if __name__ == "__main__": logger.error("Saved line \"{}\" has no timestamp key!".format(line)) continue + if "header" not in entry[ts]: + logger.error("No header dict in entry {}".format(ts)) + raise ValueError + if entry[ts]["header"]["version"] == 10: logger.warning("Skipped IPFIX entry, because analysis of IPFIX is not yet implemented") continue diff --git a/test_netflow.py b/tests/lib.py old mode 100755 new mode 100644 similarity index 50% rename from test_netflow.py rename to tests/lib.py index cb7041f..f769460 --- a/test_netflow.py +++ b/tests/lib.py @@ -1,29 +1,20 @@ -#!/usr/bin/env python3 - """ -This file contains tests for the NetFlow collector for versions 1, 5 and 9. +This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. + The test packets (defined below as hex streams) were extracted from "real" softflowd exports based on a sample PCAP capture file. Copyright 2016-2020 Dominik Pataky Licensed under MIT License. See LICENSE. """ -import gzip -import ipaddress -import json + +# The flowset with 2 templates (IPv4 and IPv6) and 8 flows with data import queue -import random import socket -import subprocess -import sys import time -import unittest from netflow.collector import ThreadedNetFlowListener -# TODO: tests with 500 packets fail with delay=0. 
Probably a problem with UDP sockets buffer - -# The flowset with 2 templates (IPv4 and IPv6) and 8 flows with data PACKET_V9_TEMPLATE = "0009000a000000035c9f55980000000100000000000000400400000e00080004000c000400150004" \ "001600040001000400020004000a0004000e000400070002000b00020004000100060001003c0001" \ "00050001000000400800000e001b0010001c001000150004001600040001000400020004000a0004" \ @@ -115,7 +106,7 @@ PACKET_V5 = "00050003000379a35e80c58622a55ab00000000000000000ac110002ac110001000 PACKET_INVALID = "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" CONNECTION = ('127.0.0.1', 1337) -NUM_PACKETS = 500 +NUM_PACKETS = 100 def emit_packets(packets, delay=0.0001): @@ -148,189 +139,3 @@ def send_recv_packets(packets, delay=0.0001) -> (list, float, float): listener.stop() listener.join() return pkts, tstart, tend - - -class TestFlowExport(unittest.TestCase): - def _test_recv_all_packets(self, num, template_idx, delay=0.0001): - """Fling packets at the server and test that it receives them all""" - - def gen_pkts(n, idx): - for x in range(n): - if x == idx: - yield PACKET_V9_TEMPLATE - else: - yield random.choice(PACKETS_V9) - - pkts, tstart, tend = send_recv_packets(gen_pkts(num, template_idx), delay=delay) - - # check number of packets - self.assertEqual(len(pkts), num) - - # check timestamps are when packets were sent, not processed - self.assertTrue(all(tstart < p.ts < tend for p in pkts)) - - # check number of "things" in the packets (flows + templates) - # template packet = 10 things - # other packets = 12 things - self.assertEqual(sum(p.export.header.count for p in pkts), (num - 1) * 12 + 10) - - # check number of flows in the packets - # template packet = 8 flows (2 templates) - # other packets = 12 flows - self.assertEqual(sum(len(p.export.flows) for p in pkts), (num - 1) * 12 + 8) - - def test_recv_all_packets_template_first(self): - """Test all packets are received when the template is sent first""" - self._test_recv_all_packets(NUM_PACKETS, 0) - - def 
test_recv_all_packets_template_middle(self): - """Test all packets are received when the template is sent in the middle""" - self._test_recv_all_packets(NUM_PACKETS, NUM_PACKETS // 2) - - def test_recv_all_packets_template_last(self): - """Test all packets are received when the template is sent last""" - self._test_recv_all_packets(NUM_PACKETS, NUM_PACKETS - 1) - - def test_recv_all_packets_slowly(self): - """Test all packets are received when things are sent slooooowwwwwwwwlllllllyyyyyy""" - self._test_recv_all_packets(3, 0, delay=1) - - def test_ignore_invalid_packets(self): - """Test that invalid packets log a warning but are otherwise ignored""" - with self.assertLogs(level='WARNING'): - pkts, _, _ = send_recv_packets([ - PACKET_INVALID, PACKET_V9_TEMPLATE, random.choice(PACKETS_V9), PACKET_INVALID, - random.choice(PACKETS_V9), PACKET_INVALID - ]) - self.assertEqual(len(pkts), 3) - - def test_recv_v1_packet(self): - """Test NetFlow v1 packet parsing""" - pkts, _, _ = send_recv_packets([PACKET_V1]) - self.assertEqual(len(pkts), 1) - - # Take the parsed packet and check meta data - p = pkts[0] - self.assertEqual(p.client[0], "127.0.0.1") # collector listens locally - self.assertEqual(len(p.export.flows), 2) # ping request and reply - self.assertEqual(p.export.header.count, 2) # same value, in header - self.assertEqual(p.export.header.version, 1) - - # Check specific IP address contained in a flow. 
- # Since it might vary which flow of the pair is epxorted first, check both - flow = p.export.flows[0] - self.assertIn( - ipaddress.ip_address(flow.IPV4_SRC_ADDR), # convert to ipaddress obj because value is int - [ipaddress.ip_address("172.17.0.1"), ipaddress.ip_address("172.17.0.2")] - ) - self.assertEqual(flow.PROTO, 1) # ICMP - - def test_recv_v5_packet(self): - """Test NetFlow v5 packet parsing""" - pkts, _, _ = send_recv_packets([PACKET_V5]) - self.assertEqual(len(pkts), 1) - - p = pkts[0] - self.assertEqual(p.client[0], "127.0.0.1") - self.assertEqual(len(p.export.flows), 3) # ping request and reply, one multicast - self.assertEqual(p.export.header.count, 3) - self.assertEqual(p.export.header.version, 5) - - # Check specific IP address contained in a flow. - # Since it might vary which flow of the pair is epxorted first, check both - flow = p.export.flows[0] - self.assertIn( - ipaddress.ip_address(flow.IPV4_SRC_ADDR), # convert to ipaddress obj because value is int - [ipaddress.ip_address("172.17.0.1"), ipaddress.ip_address("172.17.0.2")] # matches multicast packet too - ) - self.assertEqual(flow.PROTO, 1) # ICMP - - def test_recv_v9_packet(self): - """Test NetFlow v9 packet parsing""" - - # send packet without any template, must fail to parse (packets are queued) - pkts, _, _ = send_recv_packets([PACKETS_V9[0]]) - self.assertEqual(len(pkts), 0) # no export is parsed due to missing template - - # send packet with two templates and eight flows, should parse correctly since the templates are known - pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE]) - self.assertEqual(len(pkts), 1) - - # and again, but with the templates at the end in the packet - pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE_MIXED]) - self.assertEqual(len(pkts), 1) - p = pkts[0] - self.assertEqual(p.client[0], "127.0.0.1") - self.assertEqual(len(p.export.flows), 8) # count flows - self.assertEqual(len(p.export.templates), 2) # count new templates - - # Inspect contents of specific 
flows - flow = p.export.flows[0] - self.assertEqual(flow.PROTOCOL, 6) # TCP - self.assertEqual(flow.L4_SRC_PORT, 80) - self.assertEqual(flow.IPV4_SRC_ADDR, "127.0.0.1") - - flow = p.export.flows[-1] # last flow - self.assertEqual(flow.PROTOCOL, 17) # UDP - self.assertEqual(flow.L4_DST_PORT, 53) - - # send template and multiple export packets - pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9]) - self.assertEqual(len(pkts), 4) - self.assertEqual(pkts[0].export.header.version, 9) - - # check amount of flows across all packets - total_flows = 0 - for packet in pkts: - total_flows += len(packet.export.flows) - self.assertEqual(total_flows, 8 + 12 + 12 + 12) - - def test_analyzer(self): - """Test the analyzer by producing some packets, parsing them and then calling the analyzer - in a subprocess, piping in a created gzip JSON collection (as if it is coming from a file). - """ - # First create and parse some packets, which should get exported - pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9]) - - # Now the pkts must be transformed from their data structure to the "gzipped JSON representation", - # which the collector uses for persistant storage. 
- data_dicts = [] # list holding all entries - for p in pkts: # each pkt has its own entry with timestamp as key - data_dicts.append({p.ts: { - "client": p.client, - "flows": [f.data for f in p.export.flows] - }}) - data = "\n".join([json.dumps(dd) for dd in data_dicts]) # join all entries together by newlines - - # Different stdout/stderr arguments for backwards compatibility - pipe_output_param = {"capture_output": True} - if sys.version_info < (3, 7): # capture_output was added in Python 3.7 - pipe_output_param = { - "stdout": subprocess.PIPE, - "stderr": subprocess.PIPE - } - - # Analyzer takes gzipped input either via stdin or from a file (here: stdin) - gzipped_input = gzip.compress(data.encode()) # encode to unicode - - # Run analyzer as CLI script with no packets ignored (parameter) - analyzer = subprocess.run( - [sys.executable, '-m', 'netflow.analyzer', '-p', '0'], - input=gzipped_input, - **pipe_output_param - ) - - # If stderr has content, print it - if analyzer.stderr: - print(analyzer.stderr.decode()) - - # Every 2 flows are written as a single line (any extras are dropped) - num_flows = sum(len(list(item.values())[0]["flows"]) for item in data_dicts) - self.assertEqual(len(analyzer.stdout.splitlines()) - 2, num_flows // 2) # ignore two header lines - - # make sure there are no errors - self.assertEqual(analyzer.stderr, b"") - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_analyzer.py b/tests/test_analyzer.py new file mode 100644 index 0000000..aaf161e --- /dev/null +++ b/tests/test_analyzer.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 + +""" +This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. + +Copyright 2016-2020 Dominik Pataky +Licensed under MIT License. See LICENSE. 
+""" +import gzip +import json +import subprocess +import sys +import unittest + +from tests.lib import * + + +class TestFlowExportAnalyzer(unittest.TestCase): + def test_analyzer(self): + """Test the analyzer by producing some packets, parsing them and then calling the analyzer + in a subprocess, piping in a created gzip JSON collection (as if it is coming from a file). + """ + # First create and parse some packets, which should get exported + pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9]) + + # Now the pkts must be transformed from their data structure to the "gzipped JSON representation", + # which the collector uses for persistent storage. + data_dicts = [] # list holding all entries + for p in pkts: # each pkt has its own entry with timestamp as key + data_dicts.append({p.ts: { + "client": p.client, + "header": p.export.header.to_dict(), + "flows": [f.data for f in p.export.flows] + }}) + data = "\n".join([json.dumps(dd) for dd in data_dicts]) # join all entries together by newlines + + # Different stdout/stderr arguments for backwards compatibility + pipe_output_param = {"capture_output": True} + if sys.version_info < (3, 7): # capture_output was added in Python 3.7 + pipe_output_param = { + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE + } + + # Analyzer takes gzipped input either via stdin or from a file (here: stdin) + gzipped_input = gzip.compress(data.encode()) # encode str to UTF-8 bytes + + # Run analyzer as CLI script with no packets ignored (parameter) + analyzer = subprocess.run( + [sys.executable, '-m', 'netflow.analyzer', '-p', '0'], + input=gzipped_input, + **pipe_output_param + ) + + # Make sure there are no errors: stderr must be empty, + # otherwise its decoded content is shown as the assertion message + self.assertEqual(analyzer.stderr, b"", analyzer.stderr.decode()) + + # Every 2 flows are written as a single line (any extras are dropped) + num_flows = sum(len(list(item.values())[0]["flows"]) for item in data_dicts) + self.assertEqual(len(analyzer.stdout.splitlines()) - 2,
num_flows // 2) # ignore two header lines + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_ipfix.py b/tests/test_ipfix.py new file mode 100644 index 0000000..06afa7e --- /dev/null +++ b/tests/test_ipfix.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + +""" +This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. + +Copyright 2016-2020 Dominik Pataky +Licensed under MIT License. See LICENSE. +""" +# TODO: tests with 500 packets fail with delay=0. Probably a problem with UDP sockets buffer + +import ipaddress +import random +import unittest + +from tests.lib import * + + +class TestFlowExportIPFIX(unittest.TestCase): + pass diff --git a/tests/test_netflow.py b/tests/test_netflow.py new file mode 100755 index 0000000..2673f33 --- /dev/null +++ b/tests/test_netflow.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 + +""" +This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. + +Copyright 2016-2020 Dominik Pataky +Licensed under MIT License. See LICENSE. +""" +# TODO: tests with 500 packets fail with delay=0. 
Probably a problem with UDP sockets buffer + +import ipaddress +import random +import unittest + +from tests.lib import * + + +class TestFlowExportNetflow(unittest.TestCase): + def _test_recv_all_packets(self, num, template_idx, delay=0.0001): + """Fling packets at the server and test that it receives them all""" + + def gen_pkts(n, idx): + for x in range(n): + if x == idx: + yield PACKET_V9_TEMPLATE + else: + yield random.choice(PACKETS_V9) + + pkts, tstart, tend = send_recv_packets(gen_pkts(num, template_idx), delay=delay) + + # check number of packets + self.assertEqual(len(pkts), num) + + # check timestamps are when packets were sent, not processed + self.assertTrue(all(tstart < p.ts < tend for p in pkts)) + + # check number of "things" in the packets (flows + templates) + # template packet = 10 things + # other packets = 12 things + self.assertEqual(sum(p.export.header.count for p in pkts), (num - 1) * 12 + 10) + + # check number of flows in the packets + # template packet = 8 flows (2 templates) + # other packets = 12 flows + self.assertEqual(sum(len(p.export.flows) for p in pkts), (num - 1) * 12 + 8) + + def test_recv_all_packets_template_first(self): + """Test all packets are received when the template is sent first""" + self._test_recv_all_packets(NUM_PACKETS, 0) + + def test_recv_all_packets_template_middle(self): + """Test all packets are received when the template is sent in the middle""" + self._test_recv_all_packets(NUM_PACKETS, NUM_PACKETS // 2) + + def test_recv_all_packets_template_last(self): + """Test all packets are received when the template is sent last""" + self._test_recv_all_packets(NUM_PACKETS, NUM_PACKETS - 1) + + def test_recv_all_packets_slowly(self): + """Test all packets are received when things are sent slooooowwwwwwwwlllllllyyyyyy""" + self._test_recv_all_packets(3, 0, delay=1) + + def test_ignore_invalid_packets(self): + """Test that invalid packets log a warning but are otherwise ignored""" + with self.assertLogs(level='WARNING'): 
+ pkts, _, _ = send_recv_packets([ + PACKET_INVALID, PACKET_V9_TEMPLATE, random.choice(PACKETS_V9), PACKET_INVALID, + random.choice(PACKETS_V9), PACKET_INVALID + ]) + self.assertEqual(len(pkts), 3) + + def test_recv_v1_packet(self): + """Test NetFlow v1 packet parsing""" + pkts, _, _ = send_recv_packets([PACKET_V1]) + self.assertEqual(len(pkts), 1) + + # Take the parsed packet and check meta data + p = pkts[0] + self.assertEqual(p.client[0], "127.0.0.1") # collector listens locally + self.assertEqual(len(p.export.flows), 2) # ping request and reply + self.assertEqual(p.export.header.count, 2) # same value, in header + self.assertEqual(p.export.header.version, 1) + + # Check specific IP address contained in a flow. + # Since it might vary which flow of the pair is exported first, check both + flow = p.export.flows[0] + self.assertIn( + ipaddress.ip_address(flow.IPV4_SRC_ADDR), # convert to ipaddress obj because value is int + [ipaddress.ip_address("172.17.0.1"), ipaddress.ip_address("172.17.0.2")] + ) + self.assertEqual(flow.PROTO, 1) # ICMP + + def test_recv_v5_packet(self): + """Test NetFlow v5 packet parsing""" + pkts, _, _ = send_recv_packets([PACKET_V5]) + self.assertEqual(len(pkts), 1) + + p = pkts[0] + self.assertEqual(p.client[0], "127.0.0.1") + self.assertEqual(len(p.export.flows), 3) # ping request and reply, one multicast + self.assertEqual(p.export.header.count, 3) + self.assertEqual(p.export.header.version, 5) + + # Check specific IP address contained in a flow.
+ # Since it might vary which flow of the pair is exported first, check both + flow = p.export.flows[0] + self.assertIn( + ipaddress.ip_address(flow.IPV4_SRC_ADDR), # convert to ipaddress obj because value is int + [ipaddress.ip_address("172.17.0.1"), ipaddress.ip_address("172.17.0.2")] # matches multicast packet too + ) + self.assertEqual(flow.PROTO, 1) # ICMP + + def test_recv_v9_packet(self): + """Test NetFlow v9 packet parsing""" + + # send packet without any template, must fail to parse (packets are queued) + pkts, _, _ = send_recv_packets([PACKETS_V9[0]]) + self.assertEqual(len(pkts), 0) # no export is parsed due to missing template + + # send packet with two templates and eight flows, should parse correctly since the templates are known + pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE]) + self.assertEqual(len(pkts), 1) + + # and again, but with the templates at the end in the packet + pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE_MIXED]) + self.assertEqual(len(pkts), 1) + p = pkts[0] + self.assertEqual(p.client[0], "127.0.0.1") + self.assertEqual(len(p.export.flows), 8) # count flows + self.assertEqual(len(p.export.templates), 2) # count new templates + + # Inspect contents of specific flows + flow = p.export.flows[0] + self.assertEqual(flow.PROTOCOL, 6) # TCP + self.assertEqual(flow.L4_SRC_PORT, 80) + self.assertEqual(flow.IPV4_SRC_ADDR, "127.0.0.1") + + flow = p.export.flows[-1] # last flow + self.assertEqual(flow.PROTOCOL, 17) # UDP + self.assertEqual(flow.L4_DST_PORT, 53) + + # send template and multiple export packets + pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9]) + self.assertEqual(len(pkts), 4) + self.assertEqual(pkts[0].export.header.version, 9) + + # check amount of flows across all packets + total_flows = 0 + for packet in pkts: + total_flows += len(packet.export.flows) + self.assertEqual(total_flows, 8 + 12 + 12 + 12) + + +if __name__ == '__main__': + unittest.main()