Rename test file; fix analyzer test

All tests are now running; the analyzer test is no longer skipped.
The test was adapted to the new CLI calling method for the analyzer subprocess.
Dominik Pataky 2020-03-29 22:33:26 +02:00
parent abce1f57dd
commit 5765fa31cf


@@ -19,7 +19,7 @@ import sys
 import time
 import unittest
-from main import NetFlowListener
+from netflow.collector import ThreadedNetFlowListener
 # TODO: tests with 500 packets fail? Probably a problem with UDP sockets
@@ -115,7 +115,7 @@ def send_recv_packets(packets, delay=0) -> (list, float, float):
     returns a tuple: ([(ts, export), ...], time_started_sending, time_stopped_sending)
     """
-    listener = NetFlowListener(*CONNECTION)
+    listener = ThreadedNetFlowListener(*CONNECTION)
     tstart = time.time()
     emit_packets(packets, delay=delay)
     time.sleep(0.5)  # Allow packets to be sent and received
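
For context, the ThreadedNetFlowListener used by this helper receives and parses packets in background threads. Below is a minimal sketch of driving it directly, assuming the start()/get()/stop()/join() interface that send_recv_packets relies on (host, port and the timeout value are illustrative):

    from netflow.collector import ThreadedNetFlowListener

    listener = ThreadedNetFlowListener("127.0.0.1", 2055)
    listener.start()  # receive and parse UDP packets in the background
    try:
        # get() is assumed to behave like queue.Queue.get(): it blocks until a
        # parsed packet is available and raises queue.Empty on timeout
        packet = listener.get(timeout=5)
        print(packet.ts, packet.client, len(packet.export.flows))
    finally:
        listener.stop()
        listener.join()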
@@ -251,11 +251,22 @@ class TestFlowExport(unittest.TestCase):
             total_flows += len(packet.export.flows)
         self.assertEqual(total_flows, 8 + 12 + 12 + 12)
-    @unittest.skip("Test is not adapted to current analyzer script")
     def test_analyzer(self):
-        """Test that the analyzer doesn't break and outputs the correct number of lines"""
+        """Test the analyzer by producing some packets, parsing them and then calling the analyzer
+        in a subprocess, piping in a created gzip JSON collection (as if it were coming from a file).
+        """
         # First create and parse some packets, which should get exported
         pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9])
-        data = {p.ts: [f.data for f in p.export.flows] for p in pkts}
+        # Now the packets must be transformed from their data structure to the "gzipped JSON representation",
+        # which the collector uses for persistent storage.
+        data_dicts = []  # list holding all entries
+        for p in pkts:  # each packet gets its own entry, with its timestamp as the key
+            data_dicts.append({p.ts: {
+                "client": p.client,
+                "flows": [f.data for f in p.export.flows]
+            }})
+        data = "\n".join([json.dumps(dd) for dd in data_dicts])  # join all entries together with newlines
         # Different stdout/stderr arguments for backwards compatibility
         pipe_output_param = {"capture_output": True}
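
The "gzipped JSON representation" built in the hunk above is line-delimited: each line is one JSON object keyed by the export timestamp, holding the client address and the flow records. A self-contained sketch of writing and reading that format (all timestamps, addresses and flow fields are made-up sample values):

    import gzip
    import json

    # Two hypothetical entries in the storage format assembled above:
    # one JSON object per line, keyed by timestamp, with "client" and "flows".
    entries = [
        {1585512806: {"client": ["127.0.0.1", 52001], "flows": [{"IN_BYTES": 1024}]}},
        {1585512807: {"client": ["127.0.0.1", 52001], "flows": [{"IN_BYTES": 2048}]}},
    ]

    blob = gzip.compress("\n".join(json.dumps(e) for e in entries).encode())

    # Reading it back line by line, as the analyzer would from stdin:
    for line in gzip.decompress(blob).decode().splitlines():
        for ts, payload in json.loads(line).items():  # ts comes back as a str key
            print(ts, payload["client"], len(payload["flows"]))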
@ -266,23 +277,25 @@ class TestFlowExport(unittest.TestCase):
}
# Analyzer takes gzipped input either via stdin or from a file (here: stdin)
gzipped_input = gzip.compress(json.dumps(data).encode())
gzipped_input = gzip.compress(data.encode()) # encode to unicode
# Run analyzer as CLI script with no packets ignored (parameter)
analyzer = subprocess.run(
[sys.executable, 'analyzer.py'],
[sys.executable, '-m', 'netflow.analyzer', '-p', '0'],
input=gzipped_input,
**pipe_output_param
)
# If stderr has content, print it
if analyzer.stderr:
print(analyzer.stderr.decode())
# every 2 flows are written as a single line (any extras are dropped)
num_flows = sum(len(f) for f in data.values())
self.assertEqual(len(analyzer.stdout.splitlines()), num_flows // 2)
# Every 2 flows are written as a single line (any extras are dropped)
num_flows = sum(len(list(item.values())[0]["flows"]) for item in data_dicts)
self.assertEqual(len(analyzer.stdout.splitlines()) - 2, num_flows // 2) # ignore two header lines
# make sure there are no errors
self.assertEqual(analyzer.stderr, "")
self.assertEqual(analyzer.stderr, b"")
if __name__ == '__main__':
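
A note on the pipe_output_param indirection above: subprocess.run() only gained the capture_output keyword in Python 3.7, so on older interpreters the pipes must be passed explicitly. A sketch of the compatibility switch this presumably implements (the version check and the dummy command are illustrative, not taken from the diff):

    import subprocess
    import sys

    if sys.version_info >= (3, 7):
        pipe_output_param = {"capture_output": True}  # shorthand since Python 3.7
    else:
        pipe_output_param = {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE}

    result = subprocess.run(
        [sys.executable, "-c", "print('hello')"],  # stand-in for the analyzer call
        **pipe_output_param
    )
    print(result.stdout)  # b'hello\n' on both code paths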