netflow/tests/test_analyzer.py
Dominik Pataky 5cdb514ffc Ensure compatibility with Python 3.5.3
This commit replaces multiple occurences of new features which were not
yet implemented with Python 3.5.3, which is the reference backwards
compatibility version for this package. The version is based on the
current Python version in Debian Stretch (oldstable). According to
pkgs.org, all other distros use 3.6+, so 3.5.3 is the lower boundary.

Changes:
  * Add maxsize argument to functools.lru_cache decorator
  * Replace f"" with .format()
  * Replace variable type hints "var: type = val" with "# type:" comments
  * Replace pstats.SortKey enum with strings in performance tests

Additionally, various styling fixes were applied.
The version compatibility was tested with tox, pyenv and Python 3.5.3,
but there is no tox.ini yet which automates this test.

Bump patch version number to 0.10.3
Update author's email address.

Resolves #27
2020-04-24 16:52:25 +02:00

67 lines
2.6 KiB
Python

#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import gzip
import json
import subprocess
import sys
import unittest
from tests.lib import *
from tests.lib import PACKET_V9_TEMPLATE, PACKETS_V9
class TestFlowExportAnalyzer(unittest.TestCase):
def test_analyzer(self):
"""Test the analyzer by producing some packets, parsing them and then calling the analyzer
in a subprocess, piping in a created gzip JSON collection (as if it is coming from a file).
"""
# First create and parse some packets, which should get exported
pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9])
# Now the pkts must be transformed from their data structure to the "gzipped JSON representation",
# which the collector uses for persistant storage.
data_dicts = [] # list holding all entries
for p in pkts: # each pkt has its own entry with timestamp as key
data_dicts.append({p.ts: {
"client": p.client,
"header": p.export.header.to_dict(),
"flows": [f.data for f in p.export.flows]
}})
data = "\n".join([json.dumps(dd) for dd in data_dicts]) # join all entries together by newlines
# Different stdout/stderr arguments for backwards compatibility
pipe_output_param = {"capture_output": True}
if sys.version_info < (3, 7): # capture_output was added in Python 3.7
pipe_output_param = {
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE
}
# Analyzer takes gzipped input either via stdin or from a file (here: stdin)
gzipped_input = gzip.compress(data.encode()) # encode to unicode
# Run analyzer as CLI script with no packets ignored (parameter)
analyzer = subprocess.run(
[sys.executable, '-m', 'netflow.analyzer', '-p', '0'],
input=gzipped_input,
**pipe_output_param
)
# If stderr has content, print it
# make sure there are no errors
self.assertEqual(analyzer.stderr, b"", analyzer.stderr.decode())
# Every 2 flows are written as a single line (any extras are dropped)
num_flows = sum(len(list(item.values())[0]["flows"]) for item in data_dicts)
self.assertEqual(len(analyzer.stdout.splitlines()) - 2, num_flows // 2) # ignore two header lines
if __name__ == '__main__':
unittest.main()