258b7c1e0b
The static packets in the tests are back in lib.py to avoid circular imports. A new packet generator function was added.
63 lines
2.7 KiB
Python
63 lines
2.7 KiB
Python
#!/usr/bin/env python3
|
|
|
|
"""
|
|
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
|
|
|
|
Copyright 2016-2020 Dominik Pataky <dev@bitkeks.eu>
|
|
Licensed under MIT License. See LICENSE.
|
|
"""
|
|
# TODO: tests with 500 packets fail with delay=0. Probably a problem with UDP sockets buffer
|
|
|
|
import ipaddress
|
|
import unittest
|
|
|
|
from tests.lib import send_recv_packets, PACKET_IPFIX_TEMPLATE, PACKET_IPFIX
|
|
|
|
|
|
class TestFlowExportIPFIX(unittest.TestCase):
|
|
"""Test IPFIX packet parsing
|
|
"""
|
|
def test_recv_ipfix_packet(self):
|
|
# send packet without any template, must fail to parse (packets are queued)
|
|
pkts, _, _ = send_recv_packets([PACKET_IPFIX])
|
|
self.assertEqual(len(pkts), 0) # no export is parsed due to missing template
|
|
|
|
# send packet with 5 templates and 20 flows, should parse correctly since the templates are known
|
|
pkts, _, _ = send_recv_packets([PACKET_IPFIX_TEMPLATE])
|
|
self.assertEqual(len(pkts), 1)
|
|
p = pkts[0]
|
|
self.assertEqual(p.client[0], "127.0.0.1")
|
|
self.assertEqual(len(p.export.flows), 1 + 2 + 2 + 9 + 1 + 2 + 1 + 2) # count flows
|
|
self.assertEqual(len(p.export.templates), 4 + 1) # count new templates
|
|
|
|
# Inspect contents of specific flows
|
|
flow = p.export.flows[0]
|
|
self.assertEqual(flow.meteringProcessId, 2649)
|
|
self.assertEqual(flow.selectorAlgorithm, 1)
|
|
self.assertEqual(flow.systemInitTimeMilliseconds, 1585735165729)
|
|
|
|
flow = p.export.flows[1] # HTTPS flow from web server to client
|
|
self.assertEqual(flow.destinationIPv4Address, 2886795266)
|
|
self.assertEqual(ipaddress.ip_address(flow.destinationIPv4Address),
|
|
ipaddress.ip_address("172.17.0.2"))
|
|
self.assertEqual(flow.protocolIdentifier, 6) # TCP
|
|
self.assertEqual(flow.sourceTransportPort, 443)
|
|
self.assertEqual(flow.destinationTransportPort, 57766)
|
|
|
|
flow = p.export.flows[17] # IPv6 flow
|
|
self.assertEqual(flow.protocolIdentifier, 17) # UDP
|
|
self.assertEqual(flow.sourceIPv6Address, 337491164212692683663430561043420610562)
|
|
self.assertEqual(ipaddress.ip_address(flow.sourceIPv6Address), # Docker ULA
|
|
ipaddress.ip_address("fde6:6f14:e0f1:9609:0:242:ac11:2"))
|
|
|
|
# send template and multiple export packets
|
|
pkts, _, _ = send_recv_packets([PACKET_IPFIX, PACKET_IPFIX_TEMPLATE, PACKET_IPFIX])
|
|
self.assertEqual(len(pkts), 3)
|
|
self.assertEqual(pkts[0].export.header.version, 10)
|
|
|
|
# check amount of flows across all packets
|
|
total_flows = 0
|
|
for packet in pkts:
|
|
total_flows += len(packet.export.flows)
|
|
self.assertEqual(total_flows, 2 + 1 + (1 + 2 + 2 + 9 + 1 + 2 + 1 + 2) + 2 + 1)
|