netflow/tests/test_ipfix.py
Dominik Pataky 5cdb514ffc Ensure compatibility with Python 3.5.3
This commit replaces multiple occurrences of newer language features which are
not yet available in Python 3.5.3, which is the reference backwards
compatibility version for this package. The version is based on the
current Python version in Debian Stretch (oldstable). According to
pkgs.org, all other distros use 3.6+, so 3.5.3 is the lower boundary.

Changes:
  * Add maxsize argument to functools.lru_cache decorator
  * Replace f"" with .format()
  * Replace variable type hints "var: type = val" with "# type:" comments
  * Replace pstats.SortKey enum with strings in performance tests

Additionally, various styling fixes were applied.
The version compatibility was tested with tox, pyenv and Python 3.5.3,
but there is no tox.ini yet which automates this test.

Bump patch version number to 0.10.3
Update author's email address.

Resolves #27
2020-04-24 16:52:25 +02:00

101 lines
4.4 KiB
Python

#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
# TODO: tests with 500 packets fail with delay=0. Probably a problem with the UDP socket buffer
# TODO: add test for template withdrawal
import ipaddress
import unittest
from tests.lib import send_recv_packets, PACKET_IPFIX_TEMPLATE, PACKET_IPFIX, PACKET_IPFIX_ETHER, \
PACKET_IPFIX_TEMPLATE_ETHER
class TestFlowExportIPFIX(unittest.TestCase):
    """
    Round-trip tests for IPFIX (export version 10) packets.

    Every test passes raw packet fixtures to send_recv_packets and inspects the parsed packets
    that come back.
    """

    def test_recv_ipfix_packet(self):
        """
        Send raw IPFIX packets and check that they are received and parsed.

        A passing run means the sender thread delivered the raw bytes to a locally listening
        collector thread, and that the collector received and parsed them.

        Three scenarios are covered: a data packet without its template, the template packet on
        its own, and the template packet placed between two data packets.
        """
        # Number of flows carried by PACKET_IPFIX_TEMPLATE (20 in total)
        template_packet_flows = 1 + 2 + 2 + 9 + 1 + 2 + 1 + 2

        # Without any template the export must fail to parse (packets are queued)
        received, _, _ = send_recv_packets([PACKET_IPFIX])
        self.assertEqual(len(received), 0)  # nothing parsed because the template is missing

        # The packet with 5 templates and 20 flows parses correctly since the templates are known
        received, _, _ = send_recv_packets([PACKET_IPFIX_TEMPLATE])
        self.assertEqual(len(received), 1)
        parsed = received[0]
        self.assertEqual(parsed.client[0], "127.0.0.1")
        self.assertEqual(len(parsed.export.flows), template_packet_flows)
        self.assertEqual(len(parsed.export.templates), 4 + 1)  # newly announced templates

        # Template together with multiple export packets
        received, _, _ = send_recv_packets([PACKET_IPFIX, PACKET_IPFIX_TEMPLATE, PACKET_IPFIX])
        self.assertEqual(len(received), 3)
        self.assertEqual(received[0].export.header.version, 10)

        # Amount of flows summed over all returned packets
        flow_total = sum(len(item.export.flows) for item in received)
        self.assertEqual(flow_total, 2 + 1 + template_packet_flows + 2 + 1)

    def test_ipfix_contents(self):
        """
        Inspect field values of individual flows exported by PACKET_IPFIX_TEMPLATE.

        Covers the values of an option flow as well as the correct parsing of IPv4 and IPv6
        addresses.
        """
        received = send_recv_packets([PACKET_IPFIX_TEMPLATE])[0]
        parsed = received[0]

        # First entry is the option flow
        option_flow = parsed.export.flows[0]
        self.assertEqual(option_flow.meteringProcessId, 2649)
        self.assertEqual(option_flow.selectorAlgorithm, 1)
        self.assertEqual(option_flow.systemInitTimeMilliseconds, 1585735165729)

        # HTTPS flow from web server to client
        https_flow = parsed.export.flows[1]
        self.assertEqual(https_flow.destinationIPv4Address, 2886795266)
        self.assertEqual(
            ipaddress.ip_address(https_flow.destinationIPv4Address),
            ipaddress.ip_address("172.17.0.2"),
        )
        self.assertEqual(https_flow.protocolIdentifier, 6)  # TCP
        self.assertEqual(https_flow.sourceTransportPort, 443)
        self.assertEqual(https_flow.destinationTransportPort, 57766)
        self.assertEqual(https_flow.tcpControlBits, 0x1b)

        # IPv6 flow
        ipv6_flow = parsed.export.flows[17]
        self.assertEqual(ipv6_flow.protocolIdentifier, 17)  # UDP
        self.assertEqual(ipv6_flow.sourceIPv6Address, 0xfde66f14e0f196090000affeaffeaffe)
        self.assertEqual(
            ipaddress.ip_address(ipv6_flow.sourceIPv6Address),  # Docker ULA
            ipaddress.ip_address("fde6:6f14:e0f1:9609:0:affe:affe:affe"),
        )

    def test_ipfix_contents_ether(self):
        """
        Inspect flows from exports created with the softflowd "-T ether" flag.

        With that flag layer 2 is included in the export, so the flows also carry MAC addresses.
        """
        received, _, _ = send_recv_packets([PACKET_IPFIX_TEMPLATE_ETHER, PACKET_IPFIX_ETHER])
        self.assertEqual(len(received), 2)
        parsed = received[0]

        # Contents of the first flow of the first packet
        option_flow = parsed.export.flows[0]
        self.assertEqual(option_flow.meteringProcessId, 9)
        self.assertEqual(option_flow.selectorAlgorithm, 1)
        self.assertEqual(option_flow.systemInitTimeMilliseconds, 759538800000)

        # The second flow must expose the layer 2 fields next to the IPv4 address
        ether_flow = parsed.export.flows[1]
        self.assertEqual(ether_flow.destinationIPv4Address, 2886795266)
        for mac_field in ("sourceMacAddress", "postDestinationMacAddress"):
            self.assertTrue(hasattr(ether_flow, mac_field))
        self.assertEqual(ether_flow.sourceMacAddress, 0x123456affefe)
        self.assertEqual(ether_flow.postDestinationMacAddress, 0xaffeaffeaffe)