From 5765fa31cfca66c73ed7b1f2f15282aa1a8ea3a2 Mon Sep 17 00:00:00 2001
From: Dominik Pataky
Date: Sun, 29 Mar 2020 22:33:26 +0200
Subject: [PATCH] Rename test file; fix analyzer test

All tests are now running instead of skipping the analyzer test, which
was adapted to the new CLI calling method for the subprocess.
---
 tests.py => test_netflow.py | 35 ++++++++++++++++++++++++-----------
 1 file changed, 24 insertions(+), 11 deletions(-)
 rename tests.py => test_netflow.py (89%)

diff --git a/tests.py b/test_netflow.py
similarity index 89%
rename from tests.py
rename to test_netflow.py
index ea55464..883b053 100755
--- a/tests.py
+++ b/test_netflow.py
@@ -19,7 +19,7 @@ import sys
 import time
 import unittest
 
-from main import NetFlowListener
+from netflow.collector import ThreadedNetFlowListener
 
 
 # TODO: tests with 500 packets fail? Probably a problem with UDP sockets
@@ -115,7 +115,7 @@ def send_recv_packets(packets, delay=0) -> (list, float, float):
 
     returns a tuple: ([(ts, export), ...], time_started_sending, time_stopped_sending)
    """
-    listener = NetFlowListener(*CONNECTION)
+    listener = ThreadedNetFlowListener(*CONNECTION)
    tstart = time.time()
    emit_packets(packets, delay=delay)
    time.sleep(0.5)  # Allow packets to be sent and received
@@ -251,11 +251,22 @@ class TestFlowExport(unittest.TestCase):
             total_flows += len(packet.export.flows)
        self.assertEqual(total_flows, 8 + 12 + 12 + 12)
 
-    @unittest.skip("Test is not adapted to current analyzer script")
    def test_analyzer(self):
-        """Test that the analyzer doesn't break and outputs the correct number of lines"""
+        """Test the analyzer by producing some packets, parsing them and then calling the analyzer
+        in a subprocess, piping in a gzipped JSON collection (as if it came from a file).
+        """
+        # First create and parse some packets, which should get exported
        pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9])
-        data = {p.ts: [f.data for f in p.export.flows] for p in pkts}
+
+        # Now the pkts must be transformed from their data structure to the "gzipped JSON representation",
+        # which the collector uses for persistent storage.
+        data_dicts = []  # list holding all entries
+        for p in pkts:  # each pkt has its own entry, with its timestamp as key
+            data_dicts.append({p.ts: {
+                "client": p.client,
+                "flows": [f.data for f in p.export.flows]
+            }})
+        data = "\n".join([json.dumps(dd) for dd in data_dicts])  # join all entries together by newlines
 
        # Different stdout/stderr arguments for backwards compatibility
        pipe_output_param = {"capture_output": True}
@@ -266,23 +277,25 @@
        }
 
        # Analyzer takes gzipped input either via stdin or from a file (here: stdin)
-        gzipped_input = gzip.compress(json.dumps(data).encode())
+        gzipped_input = gzip.compress(data.encode())  # gzip needs bytes, so encode the str first
+
+        # Run the analyzer as a CLI script, with no packets ignored ("-p 0")
        analyzer = subprocess.run(
-            [sys.executable, 'analyzer.py'],
+            [sys.executable, '-m', 'netflow.analyzer', '-p', '0'],
            input=gzipped_input,
            **pipe_output_param
        )
+        # If stderr has content, print it
        if analyzer.stderr:
            print(analyzer.stderr.decode())
 
-        # every 2 flows are written as a single line (any extras are dropped)
-        num_flows = sum(len(f) for f in data.values())
-        self.assertEqual(len(analyzer.stdout.splitlines()), num_flows // 2)
+        # Every 2 flows are written as a single line (any extras are dropped)
+        num_flows = sum(len(list(item.values())[0]["flows"]) for item in data_dicts)
+        self.assertEqual(len(analyzer.stdout.splitlines()) - 2, num_flows // 2)  # ignore two header lines
 
        # make sure there are no errors
-        self.assertEqual(analyzer.stderr, "")
+        self.assertEqual(analyzer.stderr, b"")
 
 
 if __name__ == '__main__':
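
For reviewers: a minimal, self-contained sketch of the line-delimited, gzipped JSON
format the test builds and of the analyzer invocation it performs. All timestamps,
client tuples and flow fields below are made-up illustration values (not real export
data), and running it assumes the netflow package is installed:

    import gzip
    import json
    import subprocess
    import sys

    # One entry per received packet, keyed by its timestamp:
    # {ts: {"client": ..., "flows": [...]}} -- all values fabricated for illustration
    entries = [
        {1585512806.0: {"client": ["127.0.0.1", 2055], "flows": [{"IN_BYTES": 500}, {"IN_BYTES": 120}]}},
        {1585512807.0: {"client": ["127.0.0.1", 2055], "flows": [{"IN_BYTES": 1337}, {"IN_BYTES": 42}]}},
    ]
    payload = "\n".join(json.dumps(e) for e in entries)  # one JSON object per line; float keys become strings
    gzipped = gzip.compress(payload.encode())  # gzip operates on bytes

    # Pipe the gzipped collection into the analyzer via stdin, ignoring no packets (-p 0),
    # mirroring the subprocess call in the test above
    result = subprocess.run(
        [sys.executable, "-m", "netflow.analyzer", "-p", "0"],
        input=gzipped, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    print(result.stdout.decode())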