Compare commits


No commits in common. "debian/latest" and "pristine-tar" have entirely different histories.

31 changed files with 1 addition and 3622 deletions

@@ -1,28 +0,0 @@
name: Run Python unit tests

on:
  push:
    branches: [ master, release ]
  pull_request:
  workflow_dispatch:

jobs:
  test-netflow:
    runs-on: ubuntu-20.04
    strategy:
      matrix:
        python:
          - "3.5.3"  # Debian Stretch
          - "3.7.3"  # Debian Buster
          - "3.9.2"  # Debian Bullseye
          - "3.11"   # Debian Bookworm uses 3.11.1, but it's in a newer pyenv release
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python with pyenv
        uses: gabrielfalcao/pyenv-action@v11
        with:
          default: "${{ matrix.python }}"
      - name: Run Python unittests
        run: python3 -m unittest

.gitignore
@@ -1,8 +0,0 @@
.*egg-info.*
build*
dist*
.*python_netflow_v9_softflowd.egg-info/
*.swp
*.swo
__pycache__
*.json

LICENSE
@@ -1,21 +0,0 @@
MIT License
Copyright (c) 2016-2020 Dominik Pataky <dev@bitkeks.eu>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

README.md
@@ -1,89 +0,0 @@
# Python NetFlow/IPFIX library
This package contains libraries and tools for **NetFlow versions 1, 5 and 9, and IPFIX**. It is available [on PyPI as "netflow"](https://pypi.org/project/netflow/).
Version 9 is the first NetFlow version using templates. Templates make dynamically sized and configured NetFlow data flowsets possible, which makes the collector's job harder. The library provides the `netflow.parse_packet()` function as the main API point (see below). By importing `netflow.v1`, `netflow.v5` or `netflow.v9` you get direct access to the respective parsing objects, but at the beginning you will probably have more success running the reference collector (example below) and looking into its code. IPFIX (IP Flow Information Export) is based on NetFlow v9 and standardized by the IETF. All related classes are contained in `netflow.ipfix`.
![Data flow diagram](nf-workflow.png)
Copyright 2016-2023 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
## Using the library
If you choose to use the classes provided by this library directly, here's an example for a NetFlow v5 export packet:
1. Create a collector which listens for exported packets on some UDP port. It should then receive UDP packets from exporters.
2. The UDP packets contain the NetFlow payload; for NetFlow v5 it begins with the bytes `0005`, for example.
3. Call the `netflow.parse_packet()` function with the payload as the first argument (it accepts strings, byte strings and hex-encoded bytes).
Example UDP collector server (receiving exports on port 2055):
```python
import netflow
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", 2055))
payload, client = sock.recvfrom(4096) # experimental, tested with 1464 bytes
p = netflow.parse_packet(payload) # Test result: <ExportPacket v5 with 30 records>
print(p.header.version) # Test result: 5
```
Or from hex dump:
```python
import netflow
p = netflow.parse_packet("00050003000379a35e80c58622a...") # see test_netflow.py
assert p.header.version == 5 # NetFlow v5 packet
assert p.flows[0].PROTO == 1 # ICMP flow
```
In NetFlow v9 and IPFIX, templates are used instead of a fixed set of fields (like `PROTO`). See `collector.py` for how to handle these. You **must** store received templates between exports and pass them to the parser when new packets arrive. Not storing the templates will always result in parsing failures.
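A minimal sketch of this pattern, based on the reference collector below (the `templates` dict layout and the exception class are taken from `netflow/collector.py`):

```python
import netflow
from netflow.v9 import V9TemplateNotRecognized

# Template state must outlive a single packet; the parser updates it in place.
templates = {"netflow": {}, "ipfix": {}}

def parse_with_templates(payload: bytes):
    try:
        return netflow.parse_packet(payload, templates)
    except V9TemplateNotRecognized:
        # A data flowset arrived before its template export - retry this
        # payload after a packet containing the template has been parsed.
        return None
```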
## Using the collector and analyzer
Since v0.9.0 the `netflow` library also includes reference implementations of a collector and an analyzer as CLI tools.
These can be used on the CLI with `python3 -m netflow.collector` and `python3 -m netflow.analyzer`. Use the `-h` flag to receive the respective help output with all provided CLI flags.
Example: to start the collector run `python3 -m netflow.collector -p 9000 -D`. This will start a collector instance at port 9000 in debug mode. Point your flow exporter to this port on your host and after some time the first ExportPackets should appear (the flows need to expire first). After you have collected some data, the collector exports it into GZIP files, simply named `<timestamp>.gz` (or the filename you specified with `--file`/`-o`).
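Each line in such a file is one self-contained JSON object (see the comments in `netflow/collector.py` below). A minimal sketch for reading the output back, assuming a hypothetical file name:

```python
import gzip
import json

# Each line maps one timestamp to {"client": ..., "header": ..., "flows": [...]}
with gzip.open("1612345678.gz", "rt") as fh:  # hypothetical file name
    for line in fh:
        entry = json.loads(line)
        ts, data = next(iter(entry.items()))
        print(ts, data["header"]["version"], len(data["flows"]))
```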
To analyze the saved traffic, run `python3 -m netflow.analyzer -f <gzip file>`. The output will look similar to the following snippet, with resolved hostnames and services, transferred bytes and connection duration:
```
2017-10-28 23:17.01: SSH   |  4.25M | 15:27 min     | local-2 (<IPv4>) to local-1 (<IPv4>)
2017-10-28 23:17.01: SSH   |  4.29M | 16:22 min     | remote-1 (<IPv4>) to local-2 (<IPv4>)
2017-10-28 23:19.01: HTTP  | 22.79M | 47:32 min     | uwstream3.somafm.com (173...) to local-1 (<IPv4>)
2017-10-28 23:22.01: HTTPS |  1.21M | 3 sec         | fra16s12-in-x0e.1e100.net (2a00:..) to local-1 (<IPv6>)
2017-10-28 23:23.01: SSH   | 93.79M | 21 sec        | remote-1 (<IPv4>) to local-2 (<IPv4>)
2017-10-28 23:51.01: SSH   | 14.08M | 1:23.09 hours | remote-1 (<IPv4>) to local-2 (<IPv4>)
```
**Please note that the collector and analyzer are experimental reference implementations. Do not rely on them in production monitoring use cases!** In any case I recommend looking into the `netflow/collector.py` and `netflow/analyzer.py` scripts for customization. Feel free to use the code and extend it in your own tool set - that's what the MIT license is for!
## Resources
* [Cisco NetFlow v9 paper](http://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html)
* [RFC 3954 "Cisco Systems NetFlow Services Export Version 9"](https://tools.ietf.org/html/rfc3954)
* [RFC 7011 "IPFIX Protocol Specification"](https://tools.ietf.org/html/rfc7011)
## Development environment
The library was specifically written in combination with NetFlow exports from Hitoshi Irino's fork of [softflowd](https://github.com/irino/softflowd) (v1.0.0) - it should work with every correct NetFlow/IPFIX implementation though. If you stumble upon new custom template fields please let me know, they will make a fine addition to the `netflow.v9.V9_FIELD_TYPES` collection.
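As a quick orientation, field type IDs can be looked up in that collection. A small sketch, assuming `V9_FIELD_TYPES` is a plain mapping from numeric field ID to field name:

```python
from netflow.v9 import V9_FIELD_TYPES

# Assumption: V9_FIELD_TYPES maps NetFlow v9 field IDs to their names
print(V9_FIELD_TYPES.get(1))  # expected to print a name like "IN_BYTES"
```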
### Running and creating tests
The test files contain tests for all use cases in the library, based on real softflowd export packets. Whenever `softflowd` is referenced, a compiled version of softflowd 1.0.0 is meant, which is probably NOT the one in your distribution's package. During the development of this library, two ways of gathering these hex dumps were used. First, the tcpdump/Wireshark export way:
1. Run tcpdump/Wireshark on your public-facing interface (with tcpdump, save the pcap to disk).
2. Produce some sample flows, e.g. surf the web and refresh your mail client. With Wireshark, save the captured packets to disk.
3. Run tcpdump/Wireshark again on a local interface.
4. Run `softflowd` with the `-r <pcap_file>` flag. softflowd reads the captured traffic, produces the flows and exports them. Use the interface you are capturing packets on to send the exports to. E.g. capture on the localhost interface (with `-i lo` or on loopback) and then let softflowd export to `127.0.0.1:1337`.
5. Examine the captured traffic. Use Wireshark and set the `CFLOW` "decode as" dissector on the export packets (e.g. based on the port). The `data` fields should then be shown correctly as NetFlow payload.
6. Extract this payload as hex stream. Anonymize the IP addresses with a hex editor if necessary. A recommended hex editor is [bless](https://github.com/afrantzis/bless).
Second, a Docker way:
1. Run a softflowd daemon in the background inside a Docker container, listening on `eth0` and exporting to e.g. `172.17.0.1:1337`.
2. On your host start Wireshark to listen on the Docker bridge.
3. Create some traffic from inside the container.
4. Check the softflowd daemon with `softflowctl dump-flows`.
5. If some flows are shown to you, export them with `softflowctl expire-all`.
6. Your Wireshark should have picked up the export packets (it does not matter if there's a port unreachable error).
7. Set the decoder for the packets to `CFLOW` and copy the hex value from the NetFlow packet.
Your exported hex string should begin with `0001`, `0005`, `0009` or `000a`, depending on the version.
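A quick sanity check of the version field in such a dump (a sketch; the hex string is a made-up, truncated sample):

```python
payload = bytes.fromhex("000500030003")  # made-up, truncated sample
version = int.from_bytes(payload[:2], "big")
assert version in (1, 5, 9, 10)  # 0x000a == 10, i.e. IPFIX
```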
The collector is run in a background thread. Differences in the transmission speed of the exporting client can lead to differing test results, possibly caused by race conditions in the usage of the GZIP output file.

debian/changelog
@@ -1,11 +0,0 @@
netflow (0.12.2-1~deb11u1) bullseye; urgency=medium
* Backport to bullseye
-- David Prévot <dprevot@evolix.fr> Tue, 16 Apr 2024 15:06:38 +0200
netflow (0.12.2-1~deb12u1) bookworm; urgency=low
* Initial release, autogenerated by py2dsp/3.20230219
-- David Prévot <dprevot@evolix.fr> Tue, 16 Apr 2024 12:28:56 +0000

debian/control
@@ -1,19 +0,0 @@
Source: netflow
Section: python
Priority: optional
Maintainer: David Prévot <dprevot@evolix.fr>
Build-Depends: debhelper-compat (= 13),
dh-python,
python3-all,
python3-setuptools,
Standards-Version: 4.7.0
Testsuite: autopkgtest-pkg-pybuild
Homepage: https://github.com/bitkeks/python-netflow-v9-softflowd
Rules-Requires-Root: no
Package: python3-netflow
Architecture: all
Depends: ${misc:Depends}, ${python3:Depends},
Description: v1, v5, v9 and IPFIX tool suite implemented in Python 3
This package contains libraries and tools for NetFlow versions 1, 5 and
9, and IPFIX.

debian/copyright
@@ -1,25 +0,0 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: netflow
Upstream-Contact: Dominik Pataky <software+pynetflow@dpataky.eu>
Source: https://github.com/bitkeks/python-netflow-v9-softflowd
Files: *
Copyright: 2016-2020 Dominik Pataky <dev@bitkeks.eu>
License: Expat
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
.
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

debian/docs
@@ -1 +0,0 @@
README.md

debian/gbp.conf
@@ -1,6 +0,0 @@
[DEFAULT]
debian-branch = debian/latest
filter = [ '.gitattributes' ]
pristine-tar = True
upstream-branch = upstream/latest
upstream-vcs-tag = v%(version%~%-)s

debian/rules
@@ -1,5 +0,0 @@
#! /usr/bin/make -f
export PYBUILD_NAME=netflow
%:
	dh $@ --with python3 --buildsystem=pybuild

debian/source/format
@@ -1 +0,0 @@
3.0 (quilt)

debian/source/options
@@ -1 +0,0 @@
extend-diff-ignore="^[^/]+\.(egg-info|dist-info)/"

debian/watch
@@ -1,3 +0,0 @@
version=4
opts="pgpmode=none, filenamemangle=s%(?:.*?)?v?(\d[\d.]*)\.tar\.gz%@PACKAGE@-$1.tar.gz%" \
https://github.com/bitkeks/python-netflow-v9-softflowd/tags (?:.*?/)?v?(\d[\d.]*)\.tar\.gz

netflow/__init__.py
@@ -1,10 +0,0 @@
#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
from .utils import parse_packet

netflow/analyzer.py
@@ -1,349 +0,0 @@
#!/usr/bin/env python3
"""
Reference analyzer script for NetFlow Python package.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import argparse
import contextlib
import functools
import gzip
import ipaddress
import json
import logging
import os.path
import socket
import sys
from collections import namedtuple
from datetime import datetime
IP_PROTOCOLS = {
1: "ICMP",
6: "TCP",
17: "UDP",
58: "ICMPv6"
}
Pair = namedtuple('Pair', ['src', 'dest'])
logger = logging.getLogger(__name__)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
def printv(message, *args_, **kwargs):
if args.verbose:
print(message.format(*args_, **kwargs))
@functools.lru_cache(maxsize=None)
def resolve_hostname(ip: str) -> str:
if args.no_dns:
# If no DNS resolution is requested, simply return the IP string
return ip
# else resolve the IP address to a hostname and return the hostname
return socket.getfqdn(ip)
def fallback(d, keys):
for k in keys:
if k in d:
return d[k]
raise KeyError(", ".join(keys))
def human_size(size_bytes):
# Calculate a human readable size of the flow
if size_bytes < 1024:
return "%dB" % size_bytes
elif size_bytes / 1024. < 1024:
return "%.2fK" % (size_bytes / 1024.)
elif size_bytes / 1024. ** 2 < 1024:
return "%.2fM" % (size_bytes / 1024. ** 2)
else:
return "%.2fG" % (size_bytes / 1024. ** 3)
def human_duration(seconds):
# Calculate human readable duration times
if seconds < 60:
# seconds
return "%d sec" % seconds
if seconds / 60 > 60:
# hours
return "%d:%02d.%02d hours" % (seconds / 60 ** 2, seconds % 60 ** 2 / 60, seconds % 60)
# minutes
return "%02d:%02d min" % (seconds / 60, seconds % 60)
class Connection:
"""Connection model for two flows.
The direction of the data flow can be seen by looking at the size.
'src' describes the peer which sends more data towards the other. This
does NOT have to mean that 'src' was the initiator of the connection.
"""
def __init__(self, flow1, flow2):
if not flow1 or not flow2:
raise Exception("A connection requires two flows")
# Assume the size that sent the most data is the source
# TODO: this might not always be right, maybe use earlier timestamp?
size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS'])
size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS'])
if size1 >= size2:
src = flow1
dest = flow2
else:
src = flow2
dest = flow1
# TODO: this next approach uses the lower port as the service identifier
# port1 = fallback(flow1, ['L4_SRC_PORT', 'SRC_PORT'])
# port2 = fallback(flow2, ['L4_SRC_PORT', 'SRC_PORT'])
#
# src = flow1
# dest = flow2
# if port1 > port2:
# src = flow2
# dest = flow1
self.src_flow = src
self.dest_flow = dest
ips = self.get_ips(src)
self.src = ips.src
self.dest = ips.dest
self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT'])
self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT'])
self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS'])
# Duration is given in milliseconds
self.duration = src['LAST_SWITCHED'] - src['FIRST_SWITCHED']
if self.duration < 0:
# 32 bit int has its limits. Handling overflow here
# TODO: Should be handled in the collection phase
self.duration = (2 ** 32 - src['FIRST_SWITCHED']) + src['LAST_SWITCHED']
def __repr__(self):
return "<Connection from {} to {}, size {}>".format(
self.src, self.dest, self.human_size)
@staticmethod
def get_ips(flow):
# IPv4
if flow.get('IP_PROTOCOL_VERSION') == 4 or \
'IPV4_SRC_ADDR' in flow or 'IPV4_DST_ADDR' in flow:
return Pair(
ipaddress.ip_address(flow['IPV4_SRC_ADDR']),
ipaddress.ip_address(flow['IPV4_DST_ADDR'])
)
# IPv6
return Pair(
ipaddress.ip_address(flow['IPV6_SRC_ADDR']),
ipaddress.ip_address(flow['IPV6_DST_ADDR'])
)
@property
def human_size(self):
return human_size(self.size)
@property
def human_duration(self):
duration = self.duration // 1000 # uptime in milliseconds, floor it
return human_duration(duration)
@property
def hostnames(self):
# Resolve the IPs of this flow to their hostnames
src_hostname = resolve_hostname(self.src.compressed)
dest_hostname = resolve_hostname(self.dest.compressed)
return Pair(src_hostname, dest_hostname)
@property
def service(self):
# Resolve ports to their services, if known
default = "({} {})".format(self.src_port, self.dest_port)
with contextlib.suppress(OSError):
return socket.getservbyport(self.src_port)
with contextlib.suppress(OSError):
return socket.getservbyport(self.dest_port)
return default
@property
def total_packets(self):
return self.src_flow["IN_PKTS"] + self.dest_flow["IN_PKTS"]
if __name__ == "netflow.analyzer":
logger.error("The analyzer is currently meant to be used as a CLI tool only.")
logger.error("Use 'python3 -m netflow.analyzer -h' in your console for additional help.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
parser.add_argument("-f", "--file", dest="file", type=str, default=sys.stdin,
help="The file to analyze (defaults to stdin if not provided)")
parser.add_argument("-p", "--packets", dest="packets_threshold", type=int, default=10,
help="Number of packets representing the lower bound in connections to be processed")
parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
help="Enable verbose output.")
parser.add_argument("--match-host", dest="match_host", type=str, default=None,
help="Filter output by matching on the given host (matches source or destination)")
parser.add_argument("-n", "--no-dns", dest="no_dns", action="store_true",
help="Disable DNS resolving of IP addresses")
args = parser.parse_args()
# Sanity check for IP address
if args.match_host:
try:
match_host = ipaddress.ip_address(args.match_host)
except ValueError:
exit("IP address '{}' is neither IPv4 nor IPv6".format(args.match_host))
# Using a file and using stdin differ in their further usage for gzip.open
file = args.file
mode = "rb" # reading files
if file != sys.stdin and not os.path.exists(file):
exit("File {} does not exist!".format(file))
if file == sys.stdin:
file = sys.stdin.buffer
mode = "rt" # reading from stdin
data = {}
with gzip.open(file, mode) as gzipped:
# "for line in" lazy-loads all lines in the file
for line in gzipped:
entry = json.loads(line)
if len(entry.keys()) != 1:
logger.warning("The line does not have exactly one timestamp key: \"{}\"".format(line.keys()))
try:
ts = list(entry)[0] # timestamp from key
except IndexError:
logger.error("Saved line \"{}\" has no timestamp key!".format(line))
continue
if "header" not in entry[ts]:
logger.error("No header dict in entry {}".format(ts))
raise ValueError
if entry[ts]["header"]["version"] == 10:
logger.warning("Skipped IPFIX entry, because analysis of IPFIX is not yet implemented")
continue
data[ts] = entry[ts]
# Go through data and dissect every flow saved inside the dump
# The following dict holds flows which are looking for a peer, to analyze a duplex 'Connection'.
# For each flow, the destination address is looked up. If the peer is not in the list of pending peers,
# insert this flow, waiting for its peer. If found, take the waiting peer and create a Connection object.
pending = {}
skipped = 0
skipped_threshold = args.packets_threshold
first_line = True # print header line before first line
for key in sorted(data):
timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")
client = data[key]["client"]
flows = data[key]["flows"]
for flow in sorted(flows, key=lambda x: x["FIRST_SWITCHED"]):
first_switched = flow["FIRST_SWITCHED"]
if first_switched - 1 in pending:
# TODO: handle fitting, yet mismatching (here: 1 second) pairs
pass
# Find the peer for this connection
if "IPV4_SRC_ADDR" in flow or flow.get("IP_PROTOCOL_VERSION") == 4:
local_peer = flow["IPV4_SRC_ADDR"]
remote_peer = flow["IPV4_DST_ADDR"]
else:
local_peer = flow["IPV6_SRC_ADDR"]
remote_peer = flow["IPV6_DST_ADDR"]
# Match on host filter passed in as argument
if args.match_host and not any([local_peer == args.match_host, remote_peer == args.match_host]):
# If a match_host is given but neither local_peer nor remote_peer match
continue
if first_switched not in pending:
pending[first_switched] = {}
# Match peers
if remote_peer in pending[first_switched]:
# The destination peer put itself into the pending dict, getting and removing entry
peer_flow = pending[first_switched].pop(remote_peer)
if len(pending[first_switched]) == 0:
del pending[first_switched]
else:
# Flow did not find a matching, pending peer - inserting itself
pending[first_switched][local_peer] = flow
continue
con = Connection(flow, peer_flow)
if con.total_packets < skipped_threshold:
skipped += 1
continue
if first_line:
print("{:19} | {:14} | {:8} | {:9} | {:7} | Involved hosts".format("Timestamp", "Service", "Size",
"Duration", "Packets"))
print("-" * 100)
first_line = False
print("{timestamp} | {service:<14} | {size:8} | {duration:9} | {packets:7} | "
"Between {src_host} ({src}) and {dest_host} ({dest})"
.format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src,
dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration,
packets=con.total_packets))
if skipped > 0:
print("{skipped} connections skipped, because they had less than {skipped_threshold} packets "
"(this value can be set with the -p flag).".format(skipped=skipped, skipped_threshold=skipped_threshold))
if not args.verbose:
# Exit here if no debugging session was wanted
exit(0)
if len(pending) > 0:
print("\nThere are {pending} first_switched entries left in the pending dict!".format(pending=len(pending)))
all_noise = True
for first_switched, flows in sorted(pending.items(), key=lambda x: x[0]):
for peer, flow in flows.items():
# Ignore all pings, SYN scans and other noise to find only those peers left over which need a fix
if flow["IN_PKTS"] < skipped_threshold:
continue
all_noise = False
src = flow.get("IPV4_SRC_ADDR") or flow.get("IPV6_SRC_ADDR")
src_host = resolve_hostname(src)
src_text = "{}".format(src) if src == src_host else "{} ({})".format(src_host, src)
dst = flow.get("IPV4_DST_ADDR") or flow.get("IPV6_DST_ADDR")
dst_host = resolve_hostname(dst)
dst_text = "{}".format(dst) if dst == dst_host else "{} ({})".format(dst_host, dst)
proto = flow["PROTOCOL"]
size = flow["IN_BYTES"]
packets = flow["IN_PKTS"]
src_port = flow.get("L4_SRC_PORT", 0)
dst_port = flow.get("L4_DST_PORT", 0)
print("From {src_text}:{src_port} to {dst_text}:{dst_port} with "
"proto {proto} and size {size}"
" ({packets} packets)".format(src_text=src_text, src_port=src_port, dst_text=dst_text,
dst_port=dst_port, proto=IP_PROTOCOLS.get(proto, 'UNKNOWN'),
size=human_size(size), packets=packets))
if all_noise:
print("They were all noise!")

netflow/collector.py
@@ -1,242 +0,0 @@
#!/usr/bin/env python3
"""
Reference collector script for NetFlow v1, v5, and v9 Python package.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import argparse
import gzip
import json
import logging
import queue
import signal
import socket
import socketserver
import threading
import time
from collections import namedtuple
from netflow.ipfix import IPFIXTemplateNotRecognized
from netflow.utils import UnknownExportVersion, parse_packet
from netflow.v9 import V9TemplateNotRecognized
RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data'])
ParsedPacket = namedtuple('ParsedPacket', ['ts', 'client', 'export'])
# Amount of time to wait before dropping an undecodable ExportPacket
PACKET_TIMEOUT = 60 * 60
logger = logging.getLogger("netflow-collector")
ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
class QueuingRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0] # get content, [1] would be the socket
self.server.queue.put(RawPacket(time.time(), self.client_address, data))
logger.debug(
"Received %d bytes of data from %s", len(data), self.client_address
)
class QueuingUDPListener(socketserver.ThreadingUDPServer):
"""A threaded UDP server that adds a (time, data) tuple to a queue for
every request it sees
"""
def __init__(self, interface, queue):
self.queue = queue
# If IPv6 interface addresses are used, override the default AF_INET family
if ":" in interface[0]:
self.address_family = socket.AF_INET6
super().__init__(interface, QueuingRequestHandler)
class ThreadedNetFlowListener(threading.Thread):
"""A thread that listens for incoming NetFlow packets, processes them, and
makes them available to consumers.
- When initialized, will start listening for NetFlow packets on the provided
host and port and queuing them for processing.
- When started, will start processing and parsing queued packets.
- When stopped, will shut down the listener and stop processing.
- When joined, will wait for the listener to exit
For example, a simple script that outputs data until killed with CTRL+C:
>>> listener = ThreadedNetFlowListener('0.0.0.0', 2055)
>>> print("Listening for NetFlow packets")
>>> listener.start() # start processing packets
>>> try:
... while True:
... ts, export = listener.get()
... print("Time: {}".format(ts))
... for f in export.flows:
... print(" - {IPV4_SRC_ADDR} sent data to {IPV4_DST_ADDR}"
... "".format(**f))
... finally:
... print("Stopping...")
... listener.stop()
... listener.join()
... print("Stopped!")
"""
def __init__(self, host: str, port: int):
logger.info("Starting the NetFlow listener on {}:{}".format(host, port))
self.output = queue.Queue()
self.input = queue.Queue()
self.server = QueuingUDPListener((host, port), self.input)
self.thread = threading.Thread(target=self.server.serve_forever)
self.thread.start()
self._shutdown = threading.Event()
super().__init__()
def get(self, block=True, timeout=None) -> ParsedPacket:
"""Get a processed flow.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a flow is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the queue.Empty exception if no flow was available within that time.
Otherwise ('block' is false), return a flow if one is immediately
available, else raise the queue.Empty exception ('timeout' is ignored
in that case).
"""
return self.output.get(block, timeout)
def run(self):
# Process packets from the queue
try:
# TODO: use per-client templates
templates = {"netflow": {}, "ipfix": {}}
to_retry = []
while not self._shutdown.is_set():
try:
# 0.5s delay to limit CPU usage while waiting for new packets
pkt = self.input.get(block=True, timeout=0.5) # type: RawPacket
except queue.Empty:
continue
try:
# templates is passed as reference, updated in V9ExportPacket
export = parse_packet(pkt.data, templates)
except UnknownExportVersion as e:
logger.error("%s, ignoring the packet", e)
continue
except (V9TemplateNotRecognized, IPFIXTemplateNotRecognized):
# TODO: differentiate between v9 and IPFIX, use separate to_retry lists
if time.time() - pkt.ts > PACKET_TIMEOUT:
logger.warning("Dropping an old and undecodable v9/IPFIX ExportPacket")
else:
to_retry.append(pkt)
logger.debug("Failed to decode a v9/IPFIX ExportPacket - will "
"re-attempt when a new template is discovered")
continue
if export.header.version == 10:
logger.debug("Processed an IPFIX ExportPacket with length %d.", export.header.length)
else:
logger.debug("Processed a v%d ExportPacket with %d flows.",
export.header.version, export.header.count)
# If any new templates were discovered, dump the unprocessable
# data back into the queue and try to decode them again
if export.header.version in [9, 10] and export.contains_new_templates and to_retry:
logger.debug("Received new template(s)")
logger.debug("Will re-attempt to decode %d old v9/IPFIX ExportPackets", len(to_retry))
for p in to_retry:
self.input.put(p)
to_retry.clear()
self.output.put(ParsedPacket(pkt.ts, pkt.client, export))
finally:
# Only reached when while loop ends
self.server.shutdown()
self.server.server_close()
def stop(self):
logger.info("Shutting down the NetFlow listener")
self._shutdown.set()
def join(self, timeout=None):
self.thread.join(timeout=timeout)
super().join(timeout=timeout)
def get_export_packets(host: str, port: int) -> ParsedPacket:
"""A threaded generator that will yield ExportPacket objects until it is killed
"""
def handle_signal(s, f):
logger.debug("Received signal {}, raising StopIteration".format(s))
raise StopIteration
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
listener = ThreadedNetFlowListener(host, port)
listener.start()
try:
while True:
yield listener.get()
except StopIteration:
pass
finally:
listener.stop()
listener.join()
if __name__ == "netflow.collector":
logger.error("The collector is currently meant to be used as a CLI tool only.")
logger.error("Use 'python3 -m netflow.collector -h' in your console for additional help.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A sample netflow collector.")
parser.add_argument("--host", type=str, default="0.0.0.0",
help="collector listening address")
parser.add_argument("--port", "-p", type=int, default=2055,
help="collector listener port")
parser.add_argument("--file", "-o", type=str, dest="output_file",
default="{}.gz".format(int(time.time())),
help="collector export multiline JSON file")
parser.add_argument("--debug", "-D", action="store_true",
help="Enable debug output")
args = parser.parse_args()
if args.debug:
logger.setLevel(logging.DEBUG)
ch.setLevel(logging.DEBUG)
try:
# With every parsed flow a new line is appended to the output file. In previous versions, this was implemented
# by storing the whole data dict in memory and dumping it regularly onto disk. This was extremely fragile, as
# it a) consumed a lot of memory and CPU (dropping packets since storing one flow took longer than the arrival
# of the next flow) and b) broke the exported JSON file, if the collector crashed during the write process,
# rendering all collected flows during the runtime of the collector useless (the file contained one large JSON
# dict which represented the 'data' dict).
# In this new approach, each received flow is parsed as usual, but it gets appended to a gzipped file each time.
# All in all, this improves in three aspects:
# 1. collected flow data is not stored in memory any more
# 2. received and parsed flows are persisted reliably
# 3. the disk usage of files with JSON and its full strings as keys is reduced by using gzipped files
# This also means that the files have to be handled differently, because they are gzipped and not formatted as
# one single big JSON dump, but rather many little JSON dumps, separated by line breaks.
for ts, client, export in get_export_packets(args.host, args.port):
entry = {ts: {
"client": client,
"header": export.header.to_dict(),
"flows": [flow.data for flow in export.flows]}
}
line = json.dumps(entry).encode() + b"\n" # byte encoded line
with gzip.open(args.output_file, "ab") as fh: # open as append, not reading the whole file
fh.write(line)
except KeyboardInterrupt:
logger.info("Received KeyboardInterrupt, passing through")
pass

netflow/ipfix.py
@@ -1,1028 +0,0 @@
#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Reference: https://tools.ietf.org/html/rfc7011
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import functools
import struct
from collections import namedtuple
from typing import Optional, Union, List, Dict
FieldType = namedtuple("FieldType", ["id", "name", "type"])
DataType = namedtuple("DataType", ["type", "unpack_format"])
TemplateField = namedtuple("TemplateField", ["id", "length"])
TemplateFieldEnterprise = namedtuple("TemplateFieldEnterprise", ["id", "length", "enterprise_number"])
class IPFIXFieldTypes:
# Source: https://www.iana.org/assignments/ipfix/ipfix-information-elements.csv
iana_field_types = [
(0, "Reserved", ""),
(1, "octetDeltaCount", "unsigned64"),
(2, "packetDeltaCount", "unsigned64"),
(3, "deltaFlowCount", "unsigned64"),
(4, "protocolIdentifier", "unsigned8"),
(5, "ipClassOfService", "unsigned8"),
(6, "tcpControlBits", "unsigned16"),
(7, "sourceTransportPort", "unsigned16"),
(8, "sourceIPv4Address", "ipv4Address"),
(9, "sourceIPv4PrefixLength", "unsigned8"),
(10, "ingressInterface", "unsigned32"),
(11, "destinationTransportPort", "unsigned16"),
(12, "destinationIPv4Address", "ipv4Address"),
(13, "destinationIPv4PrefixLength", "unsigned8"),
(14, "egressInterface", "unsigned32"),
(15, "ipNextHopIPv4Address", "ipv4Address"),
(16, "bgpSourceAsNumber", "unsigned32"),
(17, "bgpDestinationAsNumber", "unsigned32"),
(18, "bgpNextHopIPv4Address", "ipv4Address"),
(19, "postMCastPacketDeltaCount", "unsigned64"),
(20, "postMCastOctetDeltaCount", "unsigned64"),
(21, "flowEndSysUpTime", "unsigned32"),
(22, "flowStartSysUpTime", "unsigned32"),
(23, "postOctetDeltaCount", "unsigned64"),
(24, "postPacketDeltaCount", "unsigned64"),
(25, "minimumIpTotalLength", "unsigned64"),
(26, "maximumIpTotalLength", "unsigned64"),
(27, "sourceIPv6Address", "ipv6Address"),
(28, "destinationIPv6Address", "ipv6Address"),
(29, "sourceIPv6PrefixLength", "unsigned8"),
(30, "destinationIPv6PrefixLength", "unsigned8"),
(31, "flowLabelIPv6", "unsigned32"),
(32, "icmpTypeCodeIPv4", "unsigned16"),
(33, "igmpType", "unsigned8"),
(34, "samplingInterval", "unsigned32"),
(35, "samplingAlgorithm", "unsigned8"),
(36, "flowActiveTimeout", "unsigned16"),
(37, "flowIdleTimeout", "unsigned16"),
(38, "engineType", "unsigned8"),
(39, "engineId", "unsigned8"),
(40, "exportedOctetTotalCount", "unsigned64"),
(41, "exportedMessageTotalCount", "unsigned64"),
(42, "exportedFlowRecordTotalCount", "unsigned64"),
(43, "ipv4RouterSc", "ipv4Address"),
(44, "sourceIPv4Prefix", "ipv4Address"),
(45, "destinationIPv4Prefix", "ipv4Address"),
(46, "mplsTopLabelType", "unsigned8"),
(47, "mplsTopLabelIPv4Address", "ipv4Address"),
(48, "samplerId", "unsigned8"),
(49, "samplerMode", "unsigned8"),
(50, "samplerRandomInterval", "unsigned32"),
(51, "classId", "unsigned8"),
(52, "minimumTTL", "unsigned8"),
(53, "maximumTTL", "unsigned8"),
(54, "fragmentIdentification", "unsigned32"),
(55, "postIpClassOfService", "unsigned8"),
(56, "sourceMacAddress", "macAddress"),
(57, "postDestinationMacAddress", "macAddress"),
(58, "vlanId", "unsigned16"),
(59, "postVlanId", "unsigned16"),
(60, "ipVersion", "unsigned8"),
(61, "flowDirection", "unsigned8"),
(62, "ipNextHopIPv6Address", "ipv6Address"),
(63, "bgpNextHopIPv6Address", "ipv6Address"),
(64, "ipv6ExtensionHeaders", "unsigned32"),
(70, "mplsTopLabelStackSection", "octetArray"),
(71, "mplsLabelStackSection2", "octetArray"),
(72, "mplsLabelStackSection3", "octetArray"),
(73, "mplsLabelStackSection4", "octetArray"),
(74, "mplsLabelStackSection5", "octetArray"),
(75, "mplsLabelStackSection6", "octetArray"),
(76, "mplsLabelStackSection7", "octetArray"),
(77, "mplsLabelStackSection8", "octetArray"),
(78, "mplsLabelStackSection9", "octetArray"),
(79, "mplsLabelStackSection10", "octetArray"),
(80, "destinationMacAddress", "macAddress"),
(81, "postSourceMacAddress", "macAddress"),
(82, "interfaceName", "string"),
(83, "interfaceDescription", "string"),
(84, "samplerName", "string"),
(85, "octetTotalCount", "unsigned64"),
(86, "packetTotalCount", "unsigned64"),
(87, "flagsAndSamplerId", "unsigned32"),
(88, "fragmentOffset", "unsigned16"),
(89, "forwardingStatus", "unsigned8"),
(90, "mplsVpnRouteDistinguisher", "octetArray"),
(91, "mplsTopLabelPrefixLength", "unsigned8"),
(92, "srcTrafficIndex", "unsigned32"),
(93, "dstTrafficIndex", "unsigned32"),
(94, "applicationDescription", "string"),
(95, "applicationId", "octetArray"),
(96, "applicationName", "string"),
(97, "Assigned for NetFlow v9 compatibility", ""),
(98, "postIpDiffServCodePoint", "unsigned8"),
(99, "multicastReplicationFactor", "unsigned32"),
(100, "className", "string"),
(101, "classificationEngineId", "unsigned8"),
(102, "layer2packetSectionOffset", "unsigned16"),
(103, "layer2packetSectionSize", "unsigned16"),
(104, "layer2packetSectionData", "octetArray"),
(128, "bgpNextAdjacentAsNumber", "unsigned32"),
(129, "bgpPrevAdjacentAsNumber", "unsigned32"),
(130, "exporterIPv4Address", "ipv4Address"),
(131, "exporterIPv6Address", "ipv6Address"),
(132, "droppedOctetDeltaCount", "unsigned64"),
(133, "droppedPacketDeltaCount", "unsigned64"),
(134, "droppedOctetTotalCount", "unsigned64"),
(135, "droppedPacketTotalCount", "unsigned64"),
(136, "flowEndReason", "unsigned8"),
(137, "commonPropertiesId", "unsigned64"),
(138, "observationPointId", "unsigned64"),
(139, "icmpTypeCodeIPv6", "unsigned16"),
(140, "mplsTopLabelIPv6Address", "ipv6Address"),
(141, "lineCardId", "unsigned32"),
(142, "portId", "unsigned32"),
(143, "meteringProcessId", "unsigned32"),
(144, "exportingProcessId", "unsigned32"),
(145, "templateId", "unsigned16"),
(146, "wlanChannelId", "unsigned8"),
(147, "wlanSSID", "string"),
(148, "flowId", "unsigned64"),
(149, "observationDomainId", "unsigned32"),
(150, "flowStartSeconds", "dateTimeSeconds"),
(151, "flowEndSeconds", "dateTimeSeconds"),
(152, "flowStartMilliseconds", "dateTimeMilliseconds"),
(153, "flowEndMilliseconds", "dateTimeMilliseconds"),
(154, "flowStartMicroseconds", "dateTimeMicroseconds"),
(155, "flowEndMicroseconds", "dateTimeMicroseconds"),
(156, "flowStartNanoseconds", "dateTimeNanoseconds"),
(157, "flowEndNanoseconds", "dateTimeNanoseconds"),
(158, "flowStartDeltaMicroseconds", "unsigned32"),
(159, "flowEndDeltaMicroseconds", "unsigned32"),
(160, "systemInitTimeMilliseconds", "dateTimeMilliseconds"),
(161, "flowDurationMilliseconds", "unsigned32"),
(162, "flowDurationMicroseconds", "unsigned32"),
(163, "observedFlowTotalCount", "unsigned64"),
(164, "ignoredPacketTotalCount", "unsigned64"),
(165, "ignoredOctetTotalCount", "unsigned64"),
(166, "notSentFlowTotalCount", "unsigned64"),
(167, "notSentPacketTotalCount", "unsigned64"),
(168, "notSentOctetTotalCount", "unsigned64"),
(169, "destinationIPv6Prefix", "ipv6Address"),
(170, "sourceIPv6Prefix", "ipv6Address"),
(171, "postOctetTotalCount", "unsigned64"),
(172, "postPacketTotalCount", "unsigned64"),
(173, "flowKeyIndicator", "unsigned64"),
(174, "postMCastPacketTotalCount", "unsigned64"),
(175, "postMCastOctetTotalCount", "unsigned64"),
(176, "icmpTypeIPv4", "unsigned8"),
(177, "icmpCodeIPv4", "unsigned8"),
(178, "icmpTypeIPv6", "unsigned8"),
(179, "icmpCodeIPv6", "unsigned8"),
(180, "udpSourcePort", "unsigned16"),
(181, "udpDestinationPort", "unsigned16"),
(182, "tcpSourcePort", "unsigned16"),
(183, "tcpDestinationPort", "unsigned16"),
(184, "tcpSequenceNumber", "unsigned32"),
(185, "tcpAcknowledgementNumber", "unsigned32"),
(186, "tcpWindowSize", "unsigned16"),
(187, "tcpUrgentPointer", "unsigned16"),
(188, "tcpHeaderLength", "unsigned8"),
(189, "ipHeaderLength", "unsigned8"),
(190, "totalLengthIPv4", "unsigned16"),
(191, "payloadLengthIPv6", "unsigned16"),
(192, "ipTTL", "unsigned8"),
(193, "nextHeaderIPv6", "unsigned8"),
(194, "mplsPayloadLength", "unsigned32"),
(195, "ipDiffServCodePoint", "unsigned8"),
(196, "ipPrecedence", "unsigned8"),
(197, "fragmentFlags", "unsigned8"),
(198, "octetDeltaSumOfSquares", "unsigned64"),
(199, "octetTotalSumOfSquares", "unsigned64"),
(200, "mplsTopLabelTTL", "unsigned8"),
(201, "mplsLabelStackLength", "unsigned32"),
(202, "mplsLabelStackDepth", "unsigned32"),
(203, "mplsTopLabelExp", "unsigned8"),
(204, "ipPayloadLength", "unsigned32"),
(205, "udpMessageLength", "unsigned16"),
(206, "isMulticast", "unsigned8"),
(207, "ipv4IHL", "unsigned8"),
(208, "ipv4Options", "unsigned32"),
(209, "tcpOptions", "unsigned64"),
(210, "paddingOctets", "octetArray"),
(211, "collectorIPv4Address", "ipv4Address"),
(212, "collectorIPv6Address", "ipv6Address"),
(213, "exportInterface", "unsigned32"),
(214, "exportProtocolVersion", "unsigned8"),
(215, "exportTransportProtocol", "unsigned8"),
(216, "collectorTransportPort", "unsigned16"),
(217, "exporterTransportPort", "unsigned16"),
(218, "tcpSynTotalCount", "unsigned64"),
(219, "tcpFinTotalCount", "unsigned64"),
(220, "tcpRstTotalCount", "unsigned64"),
(221, "tcpPshTotalCount", "unsigned64"),
(222, "tcpAckTotalCount", "unsigned64"),
(223, "tcpUrgTotalCount", "unsigned64"),
(224, "ipTotalLength", "unsigned64"),
(225, "postNATSourceIPv4Address", "ipv4Address"),
(226, "postNATDestinationIPv4Address", "ipv4Address"),
(227, "postNAPTSourceTransportPort", "unsigned16"),
(228, "postNAPTDestinationTransportPort", "unsigned16"),
(229, "natOriginatingAddressRealm", "unsigned8"),
(230, "natEvent", "unsigned8"),
(231, "initiatorOctets", "unsigned64"),
(232, "responderOctets", "unsigned64"),
(233, "firewallEvent", "unsigned8"),
(234, "ingressVRFID", "unsigned32"),
(235, "egressVRFID", "unsigned32"),
(236, "VRFname", "string"),
(237, "postMplsTopLabelExp", "unsigned8"),
(238, "tcpWindowScale", "unsigned16"),
(239, "biflowDirection", "unsigned8"),
(240, "ethernetHeaderLength", "unsigned8"),
(241, "ethernetPayloadLength", "unsigned16"),
(242, "ethernetTotalLength", "unsigned16"),
(243, "dot1qVlanId", "unsigned16"),
(244, "dot1qPriority", "unsigned8"),
(245, "dot1qCustomerVlanId", "unsigned16"),
(246, "dot1qCustomerPriority", "unsigned8"),
(247, "metroEvcId", "string"),
(248, "metroEvcType", "unsigned8"),
(249, "pseudoWireId", "unsigned32"),
(250, "pseudoWireType", "unsigned16"),
(251, "pseudoWireControlWord", "unsigned32"),
(252, "ingressPhysicalInterface", "unsigned32"),
(253, "egressPhysicalInterface", "unsigned32"),
(254, "postDot1qVlanId", "unsigned16"),
(255, "postDot1qCustomerVlanId", "unsigned16"),
(256, "ethernetType", "unsigned16"),
(257, "postIpPrecedence", "unsigned8"),
(258, "collectionTimeMilliseconds", "dateTimeMilliseconds"),
(259, "exportSctpStreamId", "unsigned16"),
(260, "maxExportSeconds", "dateTimeSeconds"),
(261, "maxFlowEndSeconds", "dateTimeSeconds"),
(262, "messageMD5Checksum", "octetArray"),
(263, "messageScope", "unsigned8"),
(264, "minExportSeconds", "dateTimeSeconds"),
(265, "minFlowStartSeconds", "dateTimeSeconds"),
(266, "opaqueOctets", "octetArray"),
(267, "sessionScope", "unsigned8"),
(268, "maxFlowEndMicroseconds", "dateTimeMicroseconds"),
(269, "maxFlowEndMilliseconds", "dateTimeMilliseconds"),
(270, "maxFlowEndNanoseconds", "dateTimeNanoseconds"),
(271, "minFlowStartMicroseconds", "dateTimeMicroseconds"),
(272, "minFlowStartMilliseconds", "dateTimeMilliseconds"),
(273, "minFlowStartNanoseconds", "dateTimeNanoseconds"),
(274, "collectorCertificate", "octetArray"),
(275, "exporterCertificate", "octetArray"),
(276, "dataRecordsReliability", "boolean"),
(277, "observationPointType", "unsigned8"),
(278, "newConnectionDeltaCount", "unsigned32"),
(279, "connectionSumDurationSeconds", "unsigned64"),
(280, "connectionTransactionId", "unsigned64"),
(281, "postNATSourceIPv6Address", "ipv6Address"),
(282, "postNATDestinationIPv6Address", "ipv6Address"),
(283, "natPoolId", "unsigned32"),
(284, "natPoolName", "string"),
(285, "anonymizationFlags", "unsigned16"),
(286, "anonymizationTechnique", "unsigned16"),
(287, "informationElementIndex", "unsigned16"),
(288, "p2pTechnology", "string"),
(289, "tunnelTechnology", "string"),
(290, "encryptedTechnology", "string"),
(291, "basicList", "basicList"),
(292, "subTemplateList", "subTemplateList"),
(293, "subTemplateMultiList", "subTemplateMultiList"),
(294, "bgpValidityState", "unsigned8"),
(295, "IPSecSPI", "unsigned32"),
(296, "greKey", "unsigned32"),
(297, "natType", "unsigned8"),
(298, "initiatorPackets", "unsigned64"),
(299, "responderPackets", "unsigned64"),
(300, "observationDomainName", "string"),
(301, "selectionSequenceId", "unsigned64"),
(302, "selectorId", "unsigned64"),
(303, "informationElementId", "unsigned16"),
(304, "selectorAlgorithm", "unsigned16"),
(305, "samplingPacketInterval", "unsigned32"),
(306, "samplingPacketSpace", "unsigned32"),
(307, "samplingTimeInterval", "unsigned32"),
(308, "samplingTimeSpace", "unsigned32"),
(309, "samplingSize", "unsigned32"),
(310, "samplingPopulation", "unsigned32"),
(311, "samplingProbability", "float64"),
(312, "dataLinkFrameSize", "unsigned16"),
(313, "ipHeaderPacketSection", "octetArray"),
(314, "ipPayloadPacketSection", "octetArray"),
(315, "dataLinkFrameSection", "octetArray"),
(316, "mplsLabelStackSection", "octetArray"),
(317, "mplsPayloadPacketSection", "octetArray"),
(318, "selectorIdTotalPktsObserved", "unsigned64"),
(319, "selectorIdTotalPktsSelected", "unsigned64"),
(320, "absoluteError", "float64"),
(321, "relativeError", "float64"),
(322, "observationTimeSeconds", "dateTimeSeconds"),
(323, "observationTimeMilliseconds", "dateTimeMilliseconds"),
(324, "observationTimeMicroseconds", "dateTimeMicroseconds"),
(325, "observationTimeNanoseconds", "dateTimeNanoseconds"),
(326, "digestHashValue", "unsigned64"),
(327, "hashIPPayloadOffset", "unsigned64"),
(328, "hashIPPayloadSize", "unsigned64"),
(329, "hashOutputRangeMin", "unsigned64"),
(330, "hashOutputRangeMax", "unsigned64"),
(331, "hashSelectedRangeMin", "unsigned64"),
(332, "hashSelectedRangeMax", "unsigned64"),
(333, "hashDigestOutput", "boolean"),
(334, "hashInitialiserValue", "unsigned64"),
(335, "selectorName", "string"),
(336, "upperCILimit", "float64"),
(337, "lowerCILimit", "float64"),
(338, "confidenceLevel", "float64"),
(339, "informationElementDataType", "unsigned8"),
(340, "informationElementDescription", "string"),
(341, "informationElementName", "string"),
(342, "informationElementRangeBegin", "unsigned64"),
(343, "informationElementRangeEnd", "unsigned64"),
(344, "informationElementSemantics", "unsigned8"),
(345, "informationElementUnits", "unsigned16"),
(346, "privateEnterpriseNumber", "unsigned32"),
(347, "virtualStationInterfaceId", "octetArray"),
(348, "virtualStationInterfaceName", "string"),
(349, "virtualStationUUID", "octetArray"),
(350, "virtualStationName", "string"),
(351, "layer2SegmentId", "unsigned64"),
(352, "layer2OctetDeltaCount", "unsigned64"),
(353, "layer2OctetTotalCount", "unsigned64"),
(354, "ingressUnicastPacketTotalCount", "unsigned64"),
(355, "ingressMulticastPacketTotalCount", "unsigned64"),
(356, "ingressBroadcastPacketTotalCount", "unsigned64"),
(357, "egressUnicastPacketTotalCount", "unsigned64"),
(358, "egressBroadcastPacketTotalCount", "unsigned64"),
(359, "monitoringIntervalStartMilliSeconds", "dateTimeMilliseconds"),
(360, "monitoringIntervalEndMilliSeconds", "dateTimeMilliseconds"),
(361, "portRangeStart", "unsigned16"),
(362, "portRangeEnd", "unsigned16"),
(363, "portRangeStepSize", "unsigned16"),
(364, "portRangeNumPorts", "unsigned16"),
(365, "staMacAddress", "macAddress"),
(366, "staIPv4Address", "ipv4Address"),
(367, "wtpMacAddress", "macAddress"),
(368, "ingressInterfaceType", "unsigned32"),
(369, "egressInterfaceType", "unsigned32"),
(370, "rtpSequenceNumber", "unsigned16"),
(371, "userName", "string"),
(372, "applicationCategoryName", "string"),
(373, "applicationSubCategoryName", "string"),
(374, "applicationGroupName", "string"),
(375, "originalFlowsPresent", "unsigned64"),
(376, "originalFlowsInitiated", "unsigned64"),
(377, "originalFlowsCompleted", "unsigned64"),
(378, "distinctCountOfSourceIPAddress", "unsigned64"),
(379, "distinctCountOfDestinationIPAddress", "unsigned64"),
(380, "distinctCountOfSourceIPv4Address", "unsigned32"),
(381, "distinctCountOfDestinationIPv4Address", "unsigned32"),
(382, "distinctCountOfSourceIPv6Address", "unsigned64"),
(383, "distinctCountOfDestinationIPv6Address", "unsigned64"),
(384, "valueDistributionMethod", "unsigned8"),
(385, "rfc3550JitterMilliseconds", "unsigned32"),
(386, "rfc3550JitterMicroseconds", "unsigned32"),
(387, "rfc3550JitterNanoseconds", "unsigned32"),
(388, "dot1qDEI", "boolean"),
(389, "dot1qCustomerDEI", "boolean"),
(390, "flowSelectorAlgorithm", "unsigned16"),
(391, "flowSelectedOctetDeltaCount", "unsigned64"),
(392, "flowSelectedPacketDeltaCount", "unsigned64"),
(393, "flowSelectedFlowDeltaCount", "unsigned64"),
(394, "selectorIDTotalFlowsObserved", "unsigned64"),
(395, "selectorIDTotalFlowsSelected", "unsigned64"),
(396, "samplingFlowInterval", "unsigned64"),
(397, "samplingFlowSpacing", "unsigned64"),
(398, "flowSamplingTimeInterval", "unsigned64"),
(399, "flowSamplingTimeSpacing", "unsigned64"),
(400, "hashFlowDomain", "unsigned16"),
(401, "transportOctetDeltaCount", "unsigned64"),
(402, "transportPacketDeltaCount", "unsigned64"),
(403, "originalExporterIPv4Address", "ipv4Address"),
(404, "originalExporterIPv6Address", "ipv6Address"),
(405, "originalObservationDomainId", "unsigned32"),
(406, "intermediateProcessId", "unsigned32"),
(407, "ignoredDataRecordTotalCount", "unsigned64"),
(408, "dataLinkFrameType", "unsigned16"),
(409, "sectionOffset", "unsigned16"),
(410, "sectionExportedOctets", "unsigned16"),
(411, "dot1qServiceInstanceTag", "octetArray"),
(412, "dot1qServiceInstanceId", "unsigned32"),
(413, "dot1qServiceInstancePriority", "unsigned8"),
(414, "dot1qCustomerSourceMacAddress", "macAddress"),
(415, "dot1qCustomerDestinationMacAddress", "macAddress"),
(416, "", ""),
(417, "postLayer2OctetDeltaCount", "unsigned64"),
(418, "postMCastLayer2OctetDeltaCount", "unsigned64"),
(419, "", ""),
(420, "postLayer2OctetTotalCount", "unsigned64"),
(421, "postMCastLayer2OctetTotalCount", "unsigned64"),
(422, "minimumLayer2TotalLength", "unsigned64"),
(423, "maximumLayer2TotalLength", "unsigned64"),
(424, "droppedLayer2OctetDeltaCount", "unsigned64"),
(425, "droppedLayer2OctetTotalCount", "unsigned64"),
(426, "ignoredLayer2OctetTotalCount", "unsigned64"),
(427, "notSentLayer2OctetTotalCount", "unsigned64"),
(428, "layer2OctetDeltaSumOfSquares", "unsigned64"),
(429, "layer2OctetTotalSumOfSquares", "unsigned64"),
(430, "layer2FrameDeltaCount", "unsigned64"),
(431, "layer2FrameTotalCount", "unsigned64"),
(432, "pseudoWireDestinationIPv4Address", "ipv4Address"),
(433, "ignoredLayer2FrameTotalCount", "unsigned64"),
(434, "mibObjectValueInteger", "signed32"),
(435, "mibObjectValueOctetString", "octetArray"),
(436, "mibObjectValueOID", "octetArray"),
(437, "mibObjectValueBits", "octetArray"),
(438, "mibObjectValueIPAddress", "ipv4Address"),
(439, "mibObjectValueCounter", "unsigned64"),
(440, "mibObjectValueGauge", "unsigned32"),
(441, "mibObjectValueTimeTicks", "unsigned32"),
(442, "mibObjectValueUnsigned", "unsigned32"),
(443, "mibObjectValueTable", "subTemplateList"),
(444, "mibObjectValueRow", "subTemplateList"),
(445, "mibObjectIdentifier", "octetArray"),
(446, "mibSubIdentifier", "unsigned32"),
(447, "mibIndexIndicator", "unsigned64"),
(448, "mibCaptureTimeSemantics", "unsigned8"),
(449, "mibContextEngineID", "octetArray"),
(450, "mibContextName", "string"),
(451, "mibObjectName", "string"),
(452, "mibObjectDescription", "string"),
(453, "mibObjectSyntax", "string"),
(454, "mibModuleName", "string"),
(455, "mobileIMSI", "string"),
(456, "mobileMSISDN", "string"),
(457, "httpStatusCode", "unsigned16"),
(458, "sourceTransportPortsLimit", "unsigned16"),
(459, "httpRequestMethod", "string"),
(460, "httpRequestHost", "string"),
(461, "httpRequestTarget", "string"),
(462, "httpMessageVersion", "string"),
(463, "natInstanceID", "unsigned32"),
(464, "internalAddressRealm", "octetArray"),
(465, "externalAddressRealm", "octetArray"),
(466, "natQuotaExceededEvent", "unsigned32"),
(467, "natThresholdEvent", "unsigned32"),
(468, "httpUserAgent", "string"),
(469, "httpContentType", "string"),
(470, "httpReasonPhrase", "string"),
(471, "maxSessionEntries", "unsigned32"),
(472, "maxBIBEntries", "unsigned32"),
(473, "maxEntriesPerUser", "unsigned32"),
(474, "maxSubscribers", "unsigned32"),
(475, "maxFragmentsPendingReassembly", "unsigned32"),
(476, "addressPoolHighThreshold", "unsigned32"),
(477, "addressPoolLowThreshold", "unsigned32"),
(478, "addressPortMappingHighThreshold", "unsigned32"),
(479, "addressPortMappingLowThreshold", "unsigned32"),
(480, "addressPortMappingPerUserHighThreshold", "unsigned32"),
(481, "globalAddressMappingHighThreshold", "unsigned32"),
(482, "vpnIdentifier", "octetArray"),
(483, "bgpCommunity", "unsigned32"),
(484, "bgpSourceCommunityList", "basicList"),
(485, "bgpDestinationCommunityList", "basicList"),
(486, "bgpExtendedCommunity", "octetArray"),
(487, "bgpSourceExtendedCommunityList", "basicList"),
(488, "bgpDestinationExtendedCommunityList", "basicList"),
(489, "bgpLargeCommunity", "octetArray"),
(490, "bgpSourceLargeCommunityList", "basicList"),
(491, "bgpDestinationLargeCommunityList", "basicList"),
]
@classmethod
@functools.lru_cache(maxsize=128)
def by_id(cls, id_: int) -> Optional[FieldType]:
for item in cls.iana_field_types:
if item[0] == id_:
return FieldType(*item)
return None
@classmethod
@functools.lru_cache(maxsize=128)
def by_name(cls, key: str) -> Optional[FieldType]:
for item in cls.iana_field_types:
if item[1] == key:
return FieldType(*item)
return None
@classmethod
@functools.lru_cache(maxsize=128)
def get_type_unpack(cls, key: Union[int, str]) -> Optional[DataType]:
"""
This method covers the mapping from a field type to a struct.unpack format string.
BLOCKED: due to Reduced-Size Encoding, fields may be exported with a smaller length than defined in
the standard. Because of this mismatch, the parser in `IPFIXDataRecord.__init__` cannot use this method.
:param key:
:return:
"""
item = None
if type(key) is int:
item = cls.by_id(key)
elif type(key) is str:
item = cls.by_name(key)
if not item:
return None
return IPFIXDataTypes.by_name(item.type)
class IPFIXDataTypes:
# Source: https://www.iana.org/assignments/ipfix/ipfix-information-element-data-types.csv
# Reference: https://tools.ietf.org/html/rfc7011
iana_data_types = [
("octetArray", None), # has no encoding rules; it represents a raw array of zero or more octets
("unsigned8", "B"),
("unsigned16", "H"),
("unsigned32", "I"),
("unsigned64", "Q"),
("signed8", "b"),
("signed16", "h"),
("signed32", "i"),
("signed64", "q"),
("float32", "f"),
("float64", "d"),
("boolean", "?"), # encoded as a single-octet integer [..], with the value 1 for true and value 2 for false.
("macAddress", "6s"),
("string", None), # represents a finite-length string of valid characters of the Unicode encoding set
("dateTimeSeconds", "I"),
("dateTimeMilliseconds", "Q"),
("dateTimeMicroseconds", "8s"), # This field is made up of two unsigned 32-bit integers
("dateTimeNanoseconds", "8s"), # same as above
("ipv4Address", "4s"),
("ipv6Address", "16s"),
# To be implemented
# ("basicList", "x"),
# ("subTemplateList", "x"),
# ("subTemplateMultiList", "x"),
]
@classmethod
@functools.lru_cache(maxsize=128)
def by_name(cls, key: str) -> Optional[DataType]:
"""
Get DataType by name if found, else None.
:param key:
:return:
"""
for t in cls.iana_data_types:
if t[0] == key:
return DataType(*t)
return None
@classmethod
def is_signed(cls, dt: Union[DataType, str]) -> bool:
"""
Check if a data type is meant to be a signed integer.
:param dt:
:return:
"""
fields = ["signed8", "signed16", "signed32", "signed64"]
if type(dt) is DataType:
return dt.type in fields
return dt in fields
@classmethod
def is_float(cls, dt: Union[DataType, str]) -> bool:
"""
Check if data type is meant to be a float.
:param dt:
:return:
"""
fields = ["float32", "float64"]
if type(dt) is DataType:
return dt.type in fields
return dt in fields
@classmethod
def is_bytes(cls, dt: Union[DataType, str]) -> bool:
"""
Check if a data type is meant to be parsed as bytes.
:param dt:
:return:
"""
fields = ["octetArray", "string",
"macAddress", "ipv4Address", "ipv6Address",
"dateTimeMicroseconds", "dateTimeNanoseconds"]
if type(dt) is DataType:
return dt.type in fields
return dt in fields
@classmethod
def to_fitting_object(cls, field):
"""
Could implement conversion to IPv4Address etc.
:param field:
:return:
"""
pass
class IPFIXMalformedRecord(Exception):
pass
class IPFIXRFCError(Exception):
pass
class IPFIXMalformedPacket(Exception):
pass
class IPFIXTemplateError(Exception):
pass
class IPFIXTemplateNotRecognized(KeyError):
pass
class PaddingCalculationError(Exception):
pass
class IPFIXHeader:
"""The header of the IPFIX export packet
"""
size = 16
def __init__(self, data):
pack = struct.unpack('!HHIII', data)
self.version = pack[0]
self.length = pack[1]
self.export_uptime = pack[2]
self.sequence_number = pack[3]
        self.observation_domain_id = pack[4]
def to_dict(self):
return self.__dict__
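# Illustrative sketch: the 16-byte message header is packed as '!HHIII', i.e.
# version, length, export time, sequence number and observation domain ID.
def _example_header_roundtrip():
    raw = struct.pack("!HHIII", 10, 16, 1600000000, 1, 42)
    header = IPFIXHeader(raw)
    assert header.version == 10 and header.length == 16
    assert header.sequence_number == 1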
class IPFIXTemplateRecord:
def __init__(self, data):
pack = struct.unpack("!HH", data[:4])
self.template_id = pack[0] # range 256 to 65535
self.field_count = pack[1] # Number of fields in this Template Record
offset = 4
self.fields, offset_add = parse_fields(data[offset:], self.field_count)
offset += offset_add
if len(self.fields) != self.field_count:
raise IPFIXMalformedRecord
self._length = offset
def get_length(self):
return self._length
def __repr__(self):
return "<IPFIXTemplateRecord with {} fields>".format(len(self.fields))
class IPFIXOptionsTemplateRecord:
def __init__(self, data):
pack = struct.unpack("!HHH", data[:6])
self.template_id = pack[0] # range 256 to 65535
self.field_count = pack[1] # includes count of scope fields
# A scope field count of N specifies that the first N Field Specifiers in
# the Template Record are Scope Fields. The Scope Field Count MUST NOT be zero.
self.scope_field_count = pack[2]
offset = 6
self.scope_fields, offset_add = parse_fields(data[offset:], self.scope_field_count)
if len(self.scope_fields) != self.scope_field_count:
raise IPFIXMalformedRecord
offset += offset_add
self.fields, offset_add = parse_fields(data[offset:], self.field_count - self.scope_field_count)
if len(self.fields) + len(self.scope_fields) != self.field_count:
raise IPFIXMalformedRecord
offset += offset_add
self._length = offset
def get_length(self):
return self._length
def __repr__(self):
return "<IPFIXOptionsTemplateRecord with {} scope fields and {} fields>".format(
len(self.scope_fields), len(self.fields)
)
class IPFIXDataRecord:
"""The IPFIX data record with fields and their value.
The field types are identified by the corresponding template.
In contrast to the NetFlow v9 implementation, this one does not use an extra class for the fields.
"""
def __init__(self, data, template: List[Union[TemplateField, TemplateFieldEnterprise]]):
self.fields = set()
offset = 0
unpacker = "!"
discovered_fields = []
# Iterate through all fields of this template and build the unpack format string
# See https://www.iana.org/assignments/ipfix/ipfix.xhtml
for index, field in enumerate(template):
field_type_id = field.id
field_length = field.length
offset += field_length
# Here, reduced-size encoding of fields blocks the usage of IPFIXFieldTypes.get_type_unpack.
# See comment in IPFIXFieldTypes.get_type_unpack for more information.
field_type = IPFIXFieldTypes.by_id(field_type_id) # type: Optional[FieldType]
if not field_type and type(field) is not TemplateFieldEnterprise:
# This should break, since the exporter seems to use a field identifier
# which is not standardized by IANA.
raise NotImplementedError("Field type with ID {} is not implemented".format(field_type_id))
datatype = field_type.type # type: str
discovered_fields.append((datatype, field_type_id))
# Catch fields which are meant to be raw bytes and skip the rest
if IPFIXDataTypes.is_bytes(datatype):
unpacker += "{}s".format(field_length)
continue
# Go into int, uint, float types
issigned = IPFIXDataTypes.is_signed(datatype)
isfloat = IPFIXDataTypes.is_float(datatype)
assert not (all([issigned, isfloat])) # signed int and float are exclusive
if field_length == 1:
unpacker += "b" if issigned else "B"
elif field_length == 2:
unpacker += "h" if issigned else "H"
elif field_length == 4:
unpacker += "i" if issigned else "f" if isfloat else "I"
elif field_length == 8:
unpacker += "q" if issigned else "d" if isfloat else "Q"
else:
raise IPFIXTemplateError("Template field_length {} not handled in unpacker".format(field_length))
# Finally, unpack the data byte stream according to format defined in iteration above
pack = struct.unpack(unpacker, data[0:offset])
# Iterate through template again, but taking the unpacked values this time
for index, ((field_datatype, field_type_id), value) in enumerate(zip(discovered_fields, pack)):
if type(value) is bytes:
# Check if value is raw bytes, so no conversion happened in struct.unpack
if field_datatype in ["string"]:
try:
value = value.decode()
except UnicodeDecodeError:
value = str(value)
# TODO: handle octetArray (= does not have to be unicode encoded)
elif field_datatype in ["boolean"]:
value = True if value == 1 else False # 2 = false per RFC
elif field_datatype in ["dateTimeMicroseconds", "dateTimeNanoseconds"]:
seconds = value[:4]
fraction = value[4:]
value = (int.from_bytes(seconds, "big"), int.from_bytes(fraction, "big"))
else:
value = int.from_bytes(value, "big")
# If not bytes, struct.unpack already did necessary conversions (int, float...),
# value can be used as-is.
self.fields.add((field_type_id, value))
self._length = offset
self.__dict__.update(self.data)
def get_length(self):
return self._length
@property
def data(self):
return {
IPFIXFieldTypes.by_id(key)[1]: value for (key, value) in self.fields
}
def __repr__(self):
return "<IPFIXDataRecord with {} entries>".format(len(self.fields))
class IPFIXSet:
"""A set containing the set header and a collection of records (one of templates, options, data)
"""
def __init__(self, data: bytes, templates):
self.header = IPFIXSetHeader(data[0:IPFIXSetHeader.size])
self.records = []
self._templates = {}
offset = IPFIXSetHeader.size # fixed size
if self.header.set_id == 2: # template set
while offset < self.header.length: # length of whole set
template_record = IPFIXTemplateRecord(data[offset:])
self.records.append(template_record)
if template_record.field_count == 0:
# Should not happen, since RFC says "one or more"
self._templates[template_record.template_id] = None
else:
self._templates[template_record.template_id] = template_record.fields
offset += template_record.get_length()
# If the rest of the data is deemed to be too small for another
# template record, check existence of padding
if (
offset != self.header.length
and self.header.length - offset <= 16 # 16 is chosen as a guess
and rest_is_padding_zeroes(data[:self.header.length], offset)
):
# Rest should be padding zeroes
break
elif self.header.set_id == 3: # options template
while offset < self.header.length:
optionstemplate_record = IPFIXOptionsTemplateRecord(data[offset:])
self.records.append(optionstemplate_record)
if optionstemplate_record.field_count == 0:
self._templates[optionstemplate_record.template_id] = None
else:
self._templates[optionstemplate_record.template_id] = \
optionstemplate_record.scope_fields + optionstemplate_record.fields
offset += optionstemplate_record.get_length()
# If the rest of the data is deemed to be too small for another
# options template record, check existence of padding
if (
offset != self.header.length
and self.header.length - offset <= 16 # 16 is chosen as a guess
and rest_is_padding_zeroes(data[:self.header.length], offset)
):
# Rest should be padding zeroes
break
elif self.header.set_id >= 256: # data set, set_id is template id
# First, get the template behind the ID. Returns a list of fields or raises an exception
template_fields = templates.get(
self.header.set_id) # type: List[Union[TemplateField, TemplateFieldEnterprise]]
if not template_fields:
raise IPFIXTemplateNotRecognized
# All template fields have a known length. Add them all together to get the length of the data set.
dataset_length = functools.reduce(lambda a, x: a + x.length, template_fields, 0)
# This is the last possible offset value possible if there's no padding.
# If there is padding, this value marks the beginning of the padding.
# Two cases possible:
# 1. No padding: then (4 + x * dataset_length) == self.header.length
# 2. Padding: then (4 + x * dataset_length + p) == self.header.length,
# where p is the remaining length of padding zeroes. The modulo calculates p
no_padding_last_offset = self.header.length - ((self.header.length - IPFIXSetHeader.size) % dataset_length)
while offset < no_padding_last_offset:
data_record = IPFIXDataRecord(data[offset:], template_fields)
self.records.append(data_record)
offset += data_record.get_length()
# Safety check
if (
offset != self.header.length
and not rest_is_padding_zeroes(data[:self.header.length], offset)
):
raise PaddingCalculationError
self._length = self.header.length
def get_length(self):
return self._length
@property
def is_template(self):
return self.header.set_id in [2, 3]
@property
def is_data(self):
return self.header.set_id >= 256
@property
def templates(self):
return self._templates
def __repr__(self):
return "<IPFIXSet with set_id {} and {} records>".format(self.header.set_id, len(self.records))
class IPFIXSetHeader:
"""Header of a set (collection of records)
"""
size = 4
def __init__(self, data):
pack = struct.unpack("!HH", data)
# A value of 2 is reserved for Template Sets.
# A value of 3 is reserved for Options Template Sets. Values from 4
# to 255 are reserved for future use. Values 256 and above are used
# for Data Sets. The Set ID values of 0 and 1 are not used, for
# historical reasons [RFC3954].
self.set_id = pack[0]
        if self.set_id in (0, 1) or 4 <= self.set_id <= 255:
raise IPFIXRFCError("IPFIX set has forbidden ID {}".format(self.set_id))
self.length = pack[1] # Total length of the Set, in octets, including the Set Header
def to_dict(self):
return self.__dict__
def __repr__(self):
return "<IPFIXSetHeader with set_id {} and length {}>".format(self.set_id, self.length)
class IPFIXExportPacket:
"""IPFIX export packet with header, templates, options and data flowsets
"""
def __init__(self, data: bytes, templates: Dict[int, list]):
self.header = IPFIXHeader(data[:IPFIXHeader.size])
self.sets = []
self._contains_new_templates = False
self._flows = []
self._templates = templates
offset = IPFIXHeader.size
while offset < self.header.length:
try:
new_set = IPFIXSet(data[offset:], templates)
except IPFIXTemplateNotRecognized:
raise
if new_set.is_template:
self._contains_new_templates = True
self._templates.update(new_set.templates)
                # Check for template withdrawals; iterate over a copy, since
                # deleting entries while iterating the dict would raise a RuntimeError
                for template_id, template_fields in list(self._templates.items()):
                    if template_fields is None:
                        # Template withdrawal
                        del self._templates[template_id]
elif new_set.is_data:
self._flows += new_set.records
self.sets.append(new_set)
offset += new_set.get_length()
# Here all data should be processed and offset set to the length
if offset != self.header.length:
raise IPFIXMalformedPacket
@property
def contains_new_templates(self) -> bool:
return self._contains_new_templates
@property
def flows(self):
return self._flows
@property
def templates(self):
return self._templates
def __repr__(self):
return "<IPFIXExportPacket with {} sets, exported at {}>".format(
len(self.sets), self.header.export_uptime
)
def parse_fields(data: bytes, count: int) -> (list, int):
"""
Parse fields from a bytes stream, based on the count of fields.
    Whether a field is an enterprise field is determined inside this function.
:param data:
:param count:
:return: List of fields and the new offset.
"""
offset = 0
fields = [] # type: List[Union[TemplateField, TemplateFieldEnterprise]]
for ctr in range(count):
if (data[offset] & (1 << 7)) != 0: # enterprise flag set. Bitwise AND checks bit only in the first byte/octet
pack = struct.unpack("!HHI", data[offset:offset + 8])
fields.append(
TemplateFieldEnterprise(
id=(pack[0] & ~(1 << 15)), # clear enterprise flag bit. Bitwise AND and INVERT work on two bytes
length=pack[1], # field length
enterprise_number=pack[2] # enterprise number
)
)
offset += 8
else:
pack = struct.unpack("!HH", data[offset:offset + 4])
fields.append(
TemplateField(
id=pack[0],
length=pack[1]
)
)
offset += 4
return fields, offset
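# Illustrative sketch: a Field Specifier with the Enterprise bit (MSB of the
# first octet) set, carrying field ID 100, length 8 and enterprise number 29305.
# All values are made up for demonstration.
def _example_enterprise_field():
    raw = struct.pack("!HHI", (1 << 15) | 100, 8, 29305)
    fields, offset = parse_fields(raw, 1)
    assert offset == 8
    assert fields[0].id == 100 and fields[0].enterprise_number == 29305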
def rest_is_padding_zeroes(data: bytes, offset: int) -> bool:
if offset <= len(data):
# padding zeros, so rest of bytes must be summed to 0
if sum(data[offset:]) != 0:
return False
return True
# If offset > len(data) there is an error
raise ValueError("netflow.ipfix.rest_is_padding_zeroes received a greater offset value than there is data")
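# Illustrative: two payload bytes followed by two zero bytes pass the padding
# check, while a non-zero trailing byte fails it.
def _example_padding_check():
    assert rest_is_padding_zeroes(b"\x01\x02\x00\x00", 2)
    assert not rest_is_padding_zeroes(b"\x01\x02\x00\x01", 2)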

View file

@ -1,101 +0,0 @@
#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import struct
from typing import Union, Dict
from .ipfix import IPFIXExportPacket
from .v1 import V1ExportPacket
from .v5 import V5ExportPacket
from .v9 import V9ExportPacket
class UnknownExportVersion(Exception):
def __init__(self, data, version):
self.data = data
self.version = version
r = repr(data)
data_str = ("{:.25}..." if len(r) >= 28 else "{}").format(r)
super().__init__(
"Unknown NetFlow version {} for data {}".format(version, data_str)
)
def get_export_version(data):
return struct.unpack('!H', data[:2])[0]
def parse_packet(data: Union[str, bytes], templates: Dict = None) \
-> Union[V1ExportPacket, V5ExportPacket, V9ExportPacket, IPFIXExportPacket]:
"""
Parse an exported packet, either from string (hex) or from bytes.
NetFlow version 9 and IPFIX use dynamic templates, which are sent by the exporter in regular intervals.
These templates must be cached in between exports and are re-used for incoming new export packets.
    The following pseudo-code might help to understand the use case better. First the collector is started, a new
    templates dict is created with default keys, and an empty list for buffered packets is added. Then the receiver
    loop is started. Each arriving packet is parsed; if parsing fails due to unknown templates, the packet is
    queued for later re-parsing (this functionality is not handled in this code snippet).
```
collector = netflow.collector
coll = collector.start('0.0.0.0', 2055)
templates = {"netflow": [], "ipfix": []}
packets_with_unrecognized_templates = []
while coll.receive_export():
packet = coll.get_received_export_packet()
try:
parsed_packet = parse_packet(packet, templates)
except (V9TemplateNotRecognized, IPFIXTemplateNotRecognized):
packets_with_unrecognized_templates.append(packet)
```
See the reference implementation of the collector for more information on how to use this function with templates.
:raises ValueError: When the templates parameter was not passed, but templates must be used (v9, IPFIX).
:raises UnknownExportVersion: When the exported version is not recognized.
:param data: The export packet as string or bytes.
:param templates: The templates dictionary with keys 'netflow' and 'ipfix' (created if not existing).
    :return: The parsed packet object; failures raise one of the exceptions listed above.
"""
if type(data) is str:
# hex dump as string
data = bytes.fromhex(data)
    elif type(data) is bytes:
        # check representation based on utf-8 decoding result
        try:
            # if the bytes decode cleanly as text, they contain a hex dump string
            dec = data.decode()
            data = bytes.fromhex(dec)
        except UnicodeDecodeError:
            # decoding failed, so treat the data as raw packet bytes and use it as given
            pass
version = get_export_version(data)
if version in [9, 10] and templates is None:
raise ValueError("{} packet detected, but no templates dict was passed! For correct parsing of packets with "
"templates, create a 'templates' dict and pass it into the 'parse_packet' function."
.format("NetFlow v9" if version == 9 else "IPFIX"))
if version == 1:
return V1ExportPacket(data)
elif version == 5:
return V5ExportPacket(data)
elif version == 9:
if "netflow" not in templates:
templates["netflow"] = []
return V9ExportPacket(data, templates["netflow"])
elif version == 10:
if "ipfix" not in templates:
templates["ipfix"] = []
return IPFIXExportPacket(data, templates["ipfix"])
raise UnknownExportVersion(data, version)
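# Minimal usage sketch (illustrative; `payload` stands in for one received UDP
# datagram). The same templates dict must be re-passed for every v9/IPFIX
# packet so that templates survive between exports:
#
#     templates = {"netflow": {}, "ipfix": {}}
#     export = parse_packet(payload, templates)
#     for flow in export.flows:
#         print(flow.data)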

View file

@ -1,86 +0,0 @@
#!/usr/bin/env python3
"""
Netflow V1 collector and parser implementation in Python 3.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Created purely for fun. Not battle tested, nor will it be.
Reference https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html
This script is specifically implemented in combination with softflowd. See https://github.com/djmdjm/softflowd
"""
import struct
__all__ = ["V1DataFlow", "V1ExportPacket", "V1Header"]
class V1DataFlow:
"""Holds one v1 DataRecord
"""
length = 48
def __init__(self, data):
pack = struct.unpack('!IIIHHIIIIHHxxBBBxxxxxxx', data)
fields = [
'IPV4_SRC_ADDR',
'IPV4_DST_ADDR',
'NEXT_HOP',
'INPUT',
'OUTPUT',
'IN_PACKETS',
'IN_OCTETS',
'FIRST_SWITCHED',
'LAST_SWITCHED',
'SRC_PORT',
'DST_PORT',
# Word at 36-37 is used for padding
'PROTO',
'TOS',
'TCP_FLAGS',
# Data at 41-47 is padding/reserved
]
self.data = {}
for idx, field in enumerate(fields):
self.data[field] = pack[idx]
self.__dict__.update(self.data) # Make data dict entries accessible as object attributes
def __repr__(self):
return "<DataRecord with data {}>".format(self.data)
class V1Header:
"""The header of the V1ExportPacket
"""
length = 16
def __init__(self, data):
pack = struct.unpack('!HHIII', data[:self.length])
self.version = pack[0]
self.count = pack[1]
self.uptime = pack[2]
self.timestamp = pack[3]
self.timestamp_nano = pack[4]
def to_dict(self):
return self.__dict__
class V1ExportPacket:
"""The flow record holds the header and data flowsets.
"""
def __init__(self, data):
self.flows = []
self.header = V1Header(data)
offset = self.header.length
for flow_count in range(0, self.header.count):
end = offset + V1DataFlow.length
flow = V1DataFlow(data[offset:end])
self.flows.append(flow)
offset += flow.length
def __repr__(self):
return "<ExportPacket v{} with {} records>".format(
self.header.version, self.header.count)
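# Size check (illustrative): each v1 record is exactly 48 bytes, so an export
# with count=2 occupies 16 + 2 * 48 = 112 bytes in total.
def _example_v1_sizes():
    assert struct.calcsize('!HHIII') == V1Header.length == 16
    assert struct.calcsize('!IIIHHIIIIHHxxBBBxxxxxxx') == V1DataFlow.length == 48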

View file

@ -1,94 +0,0 @@
#!/usr/bin/env python3
"""
Netflow V5 collector and parser implementation in Python 3.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Created purely for fun. Not battle tested, nor will it be.
Reference: https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html
This script is specifically implemented in combination with softflowd. See https://github.com/djmdjm/softflowd
"""
import struct
__all__ = ["V5DataFlow", "V5ExportPacket", "V5Header"]
class V5DataFlow:
"""Holds one v5 DataRecord
"""
length = 48
def __init__(self, data):
pack = struct.unpack("!IIIHHIIIIHHxBBBHHBBxx", data)
fields = [
'IPV4_SRC_ADDR',
'IPV4_DST_ADDR',
'NEXT_HOP',
'INPUT',
'OUTPUT',
'IN_PACKETS',
'IN_OCTETS',
'FIRST_SWITCHED',
'LAST_SWITCHED',
'SRC_PORT',
'DST_PORT',
# Byte 36 is used for padding
'TCP_FLAGS',
'PROTO',
'TOS',
'SRC_AS',
'DST_AS',
'SRC_MASK',
'DST_MASK',
# Word 46 is used for padding
]
self.data = {}
for idx, field in enumerate(fields):
self.data[field] = pack[idx]
self.__dict__.update(self.data) # Make data dict entries accessible as object attributes
def __repr__(self):
return "<DataRecord with data {}>".format(self.data)
class V5Header:
"""The header of the V5ExportPacket
"""
length = 24
def __init__(self, data):
pack = struct.unpack('!HHIIIIBBH', data[:self.length])
self.version = pack[0]
self.count = pack[1]
self.uptime = pack[2]
self.timestamp = pack[3]
self.timestamp_nano = pack[4]
self.sequence = pack[5]
self.engine_type = pack[6]
self.engine_id = pack[7]
self.sampling_interval = pack[8]
def to_dict(self):
return self.__dict__
class V5ExportPacket:
"""The flow record holds the header and data flowsets.
"""
def __init__(self, data):
self.flows = []
self.header = V5Header(data)
offset = self.header.length
for flow_count in range(0, self.header.count):
end = offset + V5DataFlow.length
flow = V5DataFlow(data[offset:end])
self.flows.append(flow)
offset += flow.length
def __repr__(self):
return "<ExportPacket v{} with {} records>".format(
self.header.version, self.header.count)
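# Size check (illustrative): the v5 header is 24 bytes and each record 48, so
# an export with count=3 occupies 24 + 3 * 48 = 168 bytes in total.
def _example_v5_sizes():
    assert struct.calcsize('!HHIIIIBBH') == V5Header.length == 24
    assert struct.calcsize('!IIIHHIIIIHHxBBBHHBBxx') == V5DataFlow.length == 48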

View file

@ -1,615 +0,0 @@
#!/usr/bin/env python3
"""
Netflow V9 collector and parser implementation in Python 3.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Created for learning purposes and unsatisfying alternatives.
Reference: https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html
This script is specifically implemented in combination with softflowd. See https://github.com/djmdjm/softflowd
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import ipaddress
import struct
from .ipfix import IPFIXFieldTypes, IPFIXDataTypes
__all__ = ["V9DataFlowSet", "V9DataRecord", "V9ExportPacket", "V9Header", "V9TemplateField",
"V9TemplateFlowSet", "V9TemplateNotRecognized", "V9TemplateRecord",
"V9OptionsTemplateFlowSet", "V9OptionsTemplateRecord", "V9OptionsDataRecord"]
V9_FIELD_TYPES_CONTAINING_IP = [8, 12, 15, 18, 27, 28, 62, 63]
V9_FIELD_TYPES = {
0: 'UNKNOWN_FIELD_TYPE', # fallback for unknown field types
# Cisco specs for NetFlow v9
# https://tools.ietf.org/html/rfc3954
# https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html
1: 'IN_BYTES',
2: 'IN_PKTS',
3: 'FLOWS',
4: 'PROTOCOL',
5: 'SRC_TOS',
6: 'TCP_FLAGS',
7: 'L4_SRC_PORT',
8: 'IPV4_SRC_ADDR',
9: 'SRC_MASK',
10: 'INPUT_SNMP',
11: 'L4_DST_PORT',
12: 'IPV4_DST_ADDR',
13: 'DST_MASK',
14: 'OUTPUT_SNMP',
15: 'IPV4_NEXT_HOP',
16: 'SRC_AS',
17: 'DST_AS',
18: 'BGP_IPV4_NEXT_HOP',
19: 'MUL_DST_PKTS',
20: 'MUL_DST_BYTES',
21: 'LAST_SWITCHED',
22: 'FIRST_SWITCHED',
23: 'OUT_BYTES',
24: 'OUT_PKTS',
25: 'MIN_PKT_LNGTH',
26: 'MAX_PKT_LNGTH',
27: 'IPV6_SRC_ADDR',
28: 'IPV6_DST_ADDR',
29: 'IPV6_SRC_MASK',
30: 'IPV6_DST_MASK',
31: 'IPV6_FLOW_LABEL',
32: 'ICMP_TYPE',
33: 'MUL_IGMP_TYPE',
34: 'SAMPLING_INTERVAL',
35: 'SAMPLING_ALGORITHM',
36: 'FLOW_ACTIVE_TIMEOUT',
37: 'FLOW_INACTIVE_TIMEOUT',
38: 'ENGINE_TYPE',
39: 'ENGINE_ID',
40: 'TOTAL_BYTES_EXP',
41: 'TOTAL_PKTS_EXP',
42: 'TOTAL_FLOWS_EXP',
# 43 vendor proprietary
44: 'IPV4_SRC_PREFIX',
45: 'IPV4_DST_PREFIX',
46: 'MPLS_TOP_LABEL_TYPE',
47: 'MPLS_TOP_LABEL_IP_ADDR',
48: 'FLOW_SAMPLER_ID',
49: 'FLOW_SAMPLER_MODE',
    50: 'FLOW_SAMPLER_RANDOM_INTERVAL',
# 51 vendor proprietary
52: 'MIN_TTL',
53: 'MAX_TTL',
54: 'IPV4_IDENT',
55: 'DST_TOS',
56: 'IN_SRC_MAC',
57: 'OUT_DST_MAC',
58: 'SRC_VLAN',
59: 'DST_VLAN',
60: 'IP_PROTOCOL_VERSION',
61: 'DIRECTION',
62: 'IPV6_NEXT_HOP',
    63: 'BGP_IPV6_NEXT_HOP',
64: 'IPV6_OPTION_HEADERS',
# 65-69 vendor proprietary
70: 'MPLS_LABEL_1',
71: 'MPLS_LABEL_2',
72: 'MPLS_LABEL_3',
73: 'MPLS_LABEL_4',
74: 'MPLS_LABEL_5',
75: 'MPLS_LABEL_6',
76: 'MPLS_LABEL_7',
77: 'MPLS_LABEL_8',
78: 'MPLS_LABEL_9',
79: 'MPLS_LABEL_10',
80: 'IN_DST_MAC',
81: 'OUT_SRC_MAC',
82: 'IF_NAME',
83: 'IF_DESC',
84: 'SAMPLER_NAME',
85: 'IN_PERMANENT_BYTES',
86: 'IN_PERMANENT_PKTS',
    # 87 vendor proprietary
88: 'FRAGMENT_OFFSET',
89: 'FORWARDING_STATUS',
90: 'MPLS_PAL_RD',
91: 'MPLS_PREFIX_LEN', # Number of consecutive bits in the MPLS prefix length.
92: 'SRC_TRAFFIC_INDEX', # BGP Policy Accounting Source Traffic Index
93: 'DST_TRAFFIC_INDEX', # BGP Policy Accounting Destination Traffic Index
94: 'APPLICATION_DESCRIPTION', # Application description
95: 'APPLICATION_TAG', # 8 bits of engine ID, followed by n bits of classification
96: 'APPLICATION_NAME', # Name associated with a classification
98: 'postipDiffServCodePoint', # The value of a Differentiated Services Code Point (DSCP)
# encoded in the Differentiated Services Field, after modification
99: 'replication_factor', # Multicast replication factor
100: 'DEPRECATED', # DEPRECATED
102: 'layer2packetSectionOffset', # Layer 2 packet section offset. Potentially a generic offset
103: 'layer2packetSectionSize', # Layer 2 packet section size. Potentially a generic size
104: 'layer2packetSectionData', # Layer 2 packet section data
# 105-127 reserved for future use by Cisco
# ASA extensions
# https://www.cisco.com/c/en/us/td/docs/security/asa/special/netflow/guide/asa_netflow.html
148: 'NF_F_CONN_ID', # An identifier of a unique flow for the device
176: 'NF_F_ICMP_TYPE', # ICMP type value
177: 'NF_F_ICMP_CODE', # ICMP code value
178: 'NF_F_ICMP_TYPE_IPV6', # ICMP IPv6 type value
179: 'NF_F_ICMP_CODE_IPV6', # ICMP IPv6 code value
225: 'NF_F_XLATE_SRC_ADDR_IPV4', # Post NAT Source IPv4 Address
226: 'NF_F_XLATE_DST_ADDR_IPV4', # Post NAT Destination IPv4 Address
    227: 'NF_F_XLATE_SRC_PORT', # Post NAT Source Transport Port
    228: 'NF_F_XLATE_DST_PORT', # Post NAT Destination Transport Port
281: 'NF_F_XLATE_SRC_ADDR_IPV6', # Post NAT Source IPv6 Address
282: 'NF_F_XLATE_DST_ADDR_IPV6', # Post NAT Destination IPv6 Address
233: 'NF_F_FW_EVENT', # High-level event code
33002: 'NF_F_FW_EXT_EVENT', # Extended event code
323: 'NF_F_EVENT_TIME_MSEC', # The time that the event occurred, which comes from IPFIX
152: 'NF_F_FLOW_CREATE_TIME_MSEC',
231: 'NF_F_FWD_FLOW_DELTA_BYTES', # The delta number of bytes from source to destination
232: 'NF_F_REV_FLOW_DELTA_BYTES', # The delta number of bytes from destination to source
33000: 'NF_F_INGRESS_ACL_ID', # The input ACL that permitted or denied the flow
33001: 'NF_F_EGRESS_ACL_ID', # The output ACL that permitted or denied a flow
40000: 'NF_F_USERNAME', # AAA username
# PaloAlto PAN-OS 8.0
# https://www.paloaltonetworks.com/documentation/80/pan-os/pan-os/monitoring/netflow-monitoring/netflow-templates
346: 'PANOS_privateEnterpriseNumber',
56701: 'PANOS_APPID',
56702: 'PANOS_USERID'
}
V9_SCOPE_TYPES = {
1: "System",
2: "Interface",
3: "Line Card",
4: "Cache",
5: "Template"
}
class V9TemplateNotRecognized(KeyError):
pass
class V9DataRecord:
"""This is a 'flow' as we want it from our source. What it contains is
variable in NetFlow V9, so to work with the data you have to analyze the
data dict keys (which are integers and can be mapped with the FIELD_TYPES
dict).
Should hold a 'data' dict with keys=field_type (integer) and value (in bytes).
"""
def __init__(self):
self.data = {}
def __repr__(self):
return "<DataRecord with data: {}>".format(self.data)
class V9DataFlowSet:
"""Holds one or multiple DataRecord which are all defined after the same
template. This template is referenced in the field 'flowset_id' of this
DataFlowSet and must not be zero.
"""
def __init__(self, data, template):
pack = struct.unpack('!HH', data[:4])
self.template_id = pack[0] # flowset_id is reference to a template_id
self.length = pack[1]
self.flows = []
offset = 4
        # As the field lengths are variable, v9 pads each flowset to the next 32-bit boundary
padding_size = 4 - (self.length % 4) # 4 Byte
# For performance reasons, we use struct.unpack to get individual values. Here
# we prepare the format string for parsing it. The format string is based on the template fields and their
# lengths. The string can then be re-used for every data record in the data stream
struct_format = '!'
struct_len = 0
for field in template.fields:
# The length of the value byte slice is defined in the template
flen = field.field_length
if flen == 4:
struct_format += 'L'
elif flen == 2:
struct_format += 'H'
elif flen == 1:
struct_format += 'B'
else:
struct_format += '%ds' % flen
struct_len += flen
while offset <= (self.length - padding_size):
# Here we actually unpack the values, the struct format string is used in every data record
# iteration, until the final offset reaches the end of the whole data stream
unpacked_values = struct.unpack(struct_format, data[offset:offset + struct_len])
new_record = V9DataRecord()
for field, value in zip(template.fields, unpacked_values):
flen = field.field_length
fkey = V9_FIELD_TYPES[field.field_type]
# Special handling of IP addresses to convert integers to strings to not lose precision in dump
# TODO: might only be needed for IPv6
if field.field_type in V9_FIELD_TYPES_CONTAINING_IP:
try:
ip = ipaddress.ip_address(value)
except ValueError:
print("IP address could not be parsed: {}".format(repr(value)))
continue
new_record.data[fkey] = ip.compressed
elif flen in (1, 2, 4):
# These values are already converted to numbers by struct.unpack:
new_record.data[fkey] = value
                else:
                    # Interpret the byte slice as a big-endian (network byte order)
                    # integer; the shift loop is endianness-independent and
                    # equivalent to int.from_bytes(value, "big")
                    fdata = 0
                    for idx, byte in enumerate(reversed(bytearray(value))):
                        fdata += byte << (idx * 8)
                    new_record.data[fkey] = fdata
offset += flen
new_record.__dict__.update(new_record.data)
self.flows.append(new_record)
def __repr__(self):
return "<DataFlowSet with template {} of length {} holding {} flows>" \
.format(self.template_id, self.length, len(self.flows))
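# Worked example (illustrative): a template with field lengths 4, 4, 2, 2 and 1
# yields struct_format '!LLHHB' and struct_len 13, so every record in this
# flowset consumes exactly 13 bytes before the trailing 32-bit padding.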
class V9TemplateField:
"""A field with type identifier and length.
"""
def __init__(self, field_type, field_length):
self.field_type = field_type # integer
self.field_length = field_length # bytes
def __repr__(self):
return "<TemplateField type {}:{}, length {}>".format(
self.field_type, V9_FIELD_TYPES[self.field_type], self.field_length)
class V9TemplateRecord:
"""A template record contained in a TemplateFlowSet.
"""
def __init__(self, template_id, field_count, fields: list):
self.template_id = template_id
self.field_count = field_count
self.fields = fields
def __repr__(self):
return "<TemplateRecord {} with {} fields: {}>".format(
self.template_id, self.field_count,
' '.join([V9_FIELD_TYPES[field.field_type] for field in self.fields]))
class V9OptionsDataRecord:
def __init__(self):
self.scopes = {}
self.data = {}
def __repr__(self):
return "<V9OptionsDataRecord with scopes {} and data {}>".format(self.scopes.keys(), self.data.keys())
class V9OptionsTemplateRecord:
"""An options template record contained in an options template flowset.
"""
def __init__(self, template_id, scope_fields: dict, option_fields: dict):
self.template_id = template_id
self.scope_fields = scope_fields
self.option_fields = option_fields
def __repr__(self):
return "<V9OptionsTemplateRecord with scope fields {} and option fields {}>".format(
self.scope_fields.keys(), self.option_fields.keys())
class V9OptionsTemplateFlowSet:
"""An options template flowset.
> Each Options Template FlowSet MAY contain multiple Options Template Records.
Scope field types range from 1 to 5:
1 System
2 Interface
3 Line Card
4 Cache
5 Template
"""
def __init__(self, data: bytes):
pack = struct.unpack('!HH', data[:4])
self.flowset_id = pack[0] # always 1
self.flowset_length = pack[1] # length of this flowset
self.templates = {}
offset = 4
while offset < self.flowset_length:
pack = struct.unpack("!HHH", data[offset:offset + 6]) # options template header
template_id = pack[0] # value above 255
option_scope_length = pack[1]
options_length = pack[2]
offset += 6
# Fetch all scope fields (most probably only one field)
scopes = {} # Holds "type: length" key-value pairs
if option_scope_length % 4 != 0 or options_length % 4 != 0:
raise ValueError(option_scope_length, options_length)
for scope_counter in range(option_scope_length // 4): # example: option_scope_length = 4 means one scope
pack = struct.unpack("!HH", data[offset:offset + 4])
scope_field_type = pack[0] # values range from 1 to 5
scope_field_length = pack[1]
scopes[scope_field_type] = scope_field_length
offset += 4
# Fetch all option fields
options = {} # same
for option_counter in range(options_length // 4): # now counting the options
pack = struct.unpack("!HH", data[offset:offset + 4])
option_field_type = pack[0]
option_field_length = pack[1]
options[option_field_type] = option_field_length
offset += 4
optionstemplate = V9OptionsTemplateRecord(template_id, scopes, options)
self.templates[template_id] = optionstemplate
# handle padding and add offset if needed
if offset % 4 == 2:
offset += 2
def __repr__(self):
return "<V9OptionsTemplateFlowSet with {} templates: {}>".format(len(self.templates), self.templates.keys())
class V9OptionsDataFlowset:
"""An options data flowset with option data records
"""
def __init__(self, data: bytes, template: V9OptionsTemplateRecord):
pack = struct.unpack('!HH', data[:4])
self.template_id = pack[0]
self.length = pack[1]
self.option_data_records = []
offset = 4
while offset < self.length:
new_options_record = V9OptionsDataRecord()
for scope_type, length in template.scope_fields.items():
type_name = V9_SCOPE_TYPES.get(scope_type, scope_type) # Either name, or unknown int
value = int.from_bytes(data[offset:offset + length], 'big') # TODO: is this always integer?
new_options_record.scopes[type_name] = value
offset += length
for field_type, length in template.option_fields.items():
type_name = V9_FIELD_TYPES.get(field_type, None)
is_bytes = False
if not type_name: # Cisco refers to the IANA IPFIX table for types >256...
iana_type = IPFIXFieldTypes.by_id(field_type) # try to get from IPFIX types
if iana_type:
type_name = iana_type.name
is_bytes = IPFIXDataTypes.is_bytes(iana_type)
if not type_name:
raise ValueError
value = None
if is_bytes:
value = data[offset:offset + length]
else:
value = int.from_bytes(data[offset:offset + length], 'big')
new_options_record.data[type_name] = value
offset += length
self.option_data_records.append(new_options_record)
if offset % 4 == 2:
offset += 2
class V9TemplateFlowSet:
"""A template flowset, which holds an id that is used by data flowsets to
reference back to the template. The template then has fields which hold
identifiers of data types (eg "IP_SRC_ADDR", "PKTS"..). This way the flow
sender can dynamically put together data flowsets.
"""
def __init__(self, data):
pack = struct.unpack('!HH', data[:4])
self.flowset_id = pack[0] # always 0
self.length = pack[1] # total length including this header in bytes
self.templates = {}
offset = 4 # Skip header
# Iterate through all template records in this template flowset
while offset < self.length:
pack = struct.unpack('!HH', data[offset:offset + 4])
template_id = pack[0]
field_count = pack[1]
fields = []
for field in range(field_count):
# Get all fields of this template
offset += 4
field_type, field_length = struct.unpack('!HH', data[offset:offset + 4])
if field_type not in V9_FIELD_TYPES:
field_type = 0 # Set field_type to UNKNOWN_FIELD_TYPE as fallback
field = V9TemplateField(field_type, field_length)
fields.append(field)
# Create a template object with all collected data
template = V9TemplateRecord(template_id, field_count, fields)
# Append the new template to the global templates list
self.templates[template.template_id] = template
# Set offset to next template_id field
offset += 4
def __repr__(self):
return "<TemplateFlowSet with id {} of length {} containing templates: {}>" \
.format(self.flowset_id, self.length, self.templates.keys())
class V9Header:
"""The header of the V9ExportPacket
"""
length = 20
def __init__(self, data):
pack = struct.unpack('!HHIIII', data[:self.length])
self.version = pack[0]
        self.count = pack[1] # total number of records in this packet, per RFC 3954 (softflowd: number of flows)
self.uptime = pack[2]
self.timestamp = pack[3]
self.sequence = pack[4]
self.source_id = pack[5]
def to_dict(self):
return self.__dict__
class V9ExportPacket:
"""The flow record holds the header and all template and data flowsets.
TODO: refactor into two loops: first get all contained flowsets and examine template
flowsets first. Then data flowsets.
"""
def __init__(self, data: bytes, templates: dict):
self.header = V9Header(data)
self._templates = templates
self._new_templates = False
self._flows = []
self._options = []
offset = self.header.length
skipped_flowsets_offsets = []
while offset != len(data):
pack = struct.unpack('!HH', data[offset:offset + 4])
flowset_id = pack[0] # = template id
flowset_length = pack[1]
# Data template flowsets
if flowset_id == 0: # TemplateFlowSet always have id 0
tfs = V9TemplateFlowSet(data[offset:])
# Update the templates with the provided templates, even if they are the same
for id_, template in tfs.templates.items():
if id_ not in self._templates:
self._new_templates = True
self._templates[id_] = template
if tfs.length == 0:
break
offset += tfs.length
continue
# Option template flowsets
elif flowset_id == 1: # Option templates always use ID 1
otfs = V9OptionsTemplateFlowSet(data[offset:])
for id_, template in otfs.templates.items():
if id_ not in self._templates:
self._new_templates = True
self._templates[id_] = template
offset += otfs.flowset_length
if otfs.flowset_length == 0:
break
continue
# Data / option flowsets
# First, check if template is known
if flowset_id not in self._templates:
# Could not be parsed, continue to check for templates
skipped_flowsets_offsets.append(offset)
offset += flowset_length
if flowset_length == 0:
break
continue
matched_template = self._templates[flowset_id]
if isinstance(matched_template, V9TemplateRecord):
dfs = V9DataFlowSet(data[offset:], matched_template)
self._flows += dfs.flows
if dfs.length == 0:
break
offset += dfs.length
elif isinstance(matched_template, V9OptionsTemplateRecord):
odfs = V9OptionsDataFlowset(data[offset:], matched_template)
self._options += odfs.option_data_records
if odfs.length == 0:
break
offset += odfs.length
else:
raise NotImplementedError
# In the same export packet, re-try flowsets with previously unknown templates.
        # This can happen if data flowsets appear before their template flowsets in the packet
if skipped_flowsets_offsets and self._new_templates:
            # Process flowsets in the data slice which occurred before the template sets
# Handling of offset increases is not needed here
for offset in skipped_flowsets_offsets:
pack = struct.unpack('!H', data[offset:offset + 2])
flowset_id = pack[0]
if flowset_id not in self._templates:
raise V9TemplateNotRecognized
matched_template = self._templates[flowset_id]
if isinstance(matched_template, V9TemplateRecord):
dfs = V9DataFlowSet(data[offset:], matched_template)
self._flows += dfs.flows
elif isinstance(matched_template, V9OptionsTemplateRecord):
odfs = V9OptionsDataFlowset(data[offset:], matched_template)
self._options += odfs.option_data_records
elif skipped_flowsets_offsets:
raise V9TemplateNotRecognized
@property
def contains_new_templates(self):
return self._new_templates
@property
def flows(self):
return self._flows
@property
def templates(self):
return self._templates
@property
def options(self):
return self._options
def __repr__(self):
s = " and new template(s)" if self.contains_new_templates else ""
return "<V9ExportPacket with {} records{}>".format(self.header.count, s)

Binary file not shown.

View file

@ -0,0 +1 @@
647710280e4ea6e71e5fe4e639f91d73170e6557

Binary file not shown.


View file

@ -1,27 +0,0 @@
#!/usr/bin/env python3
from setuptools import setup
with open("README.md", "r") as fh:
long_description = fh.read()
setup(
name='netflow',
version='0.12.2',
description='NetFlow v1, v5, v9 and IPFIX tool suite implemented in Python 3',
long_description=long_description,
long_description_content_type='text/markdown',
author='Dominik Pataky',
author_email='software+pynetflow@dpataky.eu',
url='https://github.com/bitkeks/python-netflow-v9-softflowd',
packages=["netflow"],
license='MIT',
python_requires='>=3.5.3',
keywords='netflow ipfix collector parser',
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Intended Audience :: Developers",
"Intended Audience :: System Administrators"
],
)

View file

View file

@ -1,325 +0,0 @@
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
The test packets (defined below as hex streams) were extracted from "real"
softflowd exports based on a sample PCAP capture file.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
# The flowset with 2 templates (IPv4 and IPv6) and 8 flows with data
import queue
import random
import socket
import time
from netflow.collector import ThreadedNetFlowListener
# Invalid export hex stream
PACKET_INVALID = "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
CONNECTION = ('127.0.0.1', 1337)
NUM_PACKETS = 1000
def emit_packets(packets, delay=0.0001):
"""Send the provided packets to the listener"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for p in packets:
sock.sendto(bytes.fromhex(p), CONNECTION)
time.sleep(delay)
sock.close()
def send_recv_packets(packets, delay=0.0001, store_packets=-1) -> (list, float, float):
"""Starts a listener, send packets, receives packets
returns a tuple: ([(ts, export), ...], time_started_sending, time_stopped_sending)
"""
listener = ThreadedNetFlowListener(*CONNECTION)
tstart = time.time()
emit_packets(packets, delay=delay)
    time.sleep(0.5) # Allow packets to be sent and received
tend = time.time()
listener.start()
pkts = []
to_pad = 0
while True:
try:
packet = listener.get(timeout=0.5)
            if store_packets == -1 or store_packets > 0:
                # Case where a program yields from the queue and stores all packets.
pkts.append(packet)
if store_packets != -1 and len(pkts) > store_packets:
to_pad += len(pkts) # Hack for testing
pkts.clear()
else:
                # Performance measurements for cases where yielded objects are processed
                # immediately instead of stored. Add an empty tuple to retain the counting ability.
pkts.append(())
except queue.Empty:
break
listener.stop()
listener.join()
if to_pad > 0:
pkts = [()] * to_pad + pkts
return pkts, tstart, tend
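# Illustrative usage (assuming the listener yields the (ts, client, export)
# records used by the tests in this repository):
#
#     pkts, t_start, t_end = send_recv_packets([PACKET_V5])
#     assert len(pkts) == 1 and pkts[0].export.header.version == 5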
def generate_packets(amount, version, template_every_x=100):
packets = [PACKET_IPFIX]
template = PACKET_IPFIX_TEMPLATE
if version == 1:
packets = [PACKET_V1]
elif version == 5:
packets = [PACKET_V5]
elif version == 9:
packets = [*PACKETS_V9]
template = PACKET_V9_TEMPLATE
if amount < template_every_x:
template_every_x = 10
# If the list of test packets is only one item big (the same packet is used over and over),
# do not use random.choice - it costs performance and results in the same packet every time.
def single_packet(pkts):
return pkts[0]
packet_func = single_packet
if len(packets) > 1:
packet_func = random.choice
for x in range(amount):
if x % template_every_x == 0 and version in [9, 10]:
# First packet always a template, then periodically
# Note: this was once based on random.random, but it costs performance
yield template
else:
yield packet_func(packets)
# Example export for v1 which contains two flows from one ICMP ping request/reply session
PACKET_V1 = "000100020001189b5e80c32c2fd41848ac110002ac11000100000000000000000000000a00000348" \
"000027c700004af100000800000001000000000000000000ac110001ac1100020000000000000000" \
"0000000a00000348000027c700004af100000000000001000000000000000000"
# Example export for v5 which contains three flows, two for ICMP ping and one multicast on interface (224.0.0.251)
PACKET_V5 = "00050003000379a35e80c58622a55ab00000000000000000ac110002ac1100010000000000000000" \
"0000000a0000034800002f4c0000527600000800000001000000000000000000ac110001ac110002" \
"00000000000000000000000a0000034800002f4c0000527600000000000001000000000000000000" \
"ac110001e00000fb000000000000000000000001000000a90000e01c0000e01c14e914e900001100" \
"0000000000000000"
PACKET_V9_TEMPLATE = "0009000a000000035c9f55980000000100000000000000400400000e00080004000c000400150004" \
"001600040001000400020004000a0004000e000400070002000b00020004000100060001003c0001" \
"00050001000000400800000e001b0010001c001000150004001600040001000400020004000a0004" \
"000e000400070002000b00020004000100060001003c000100050001040001447f0000017f000001" \
"fb3c1aaafb3c18fd000190100000004b00000000000000000050942c061b04007f0000017f000001" \
"fb3c1aaafb3c18fd00000f94000000360000000000000000942c0050061f04007f0000017f000001" \
"fb3c1cfcfb3c1a9b0000d3fc0000002a000000000000000000509434061b04007f0000017f000001" \
"fb3c1cfcfb3c1a9b00000a490000001e000000000000000094340050061f04007f0000017f000001" \
"fb3bb82cfb3ba48b000002960000000300000000000000000050942a061904007f0000017f000001" \
"fb3bb82cfb3ba48b00000068000000020000000000000000942a0050061104007f0000017f000001" \
"fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9110004007f0000017f000001" \
"fb3c1900fb3c18fe0000003c000000010000000000000000b3c9003511000400"
# This packet is special. We take PACKET_V9_TEMPLATE and re-order the templates and flows.
# The first line is the header, the smaller lines the templates and the long lines the flows (limited to 80 chars)
PACKET_V9_TEMPLATE_MIXED = ("0009000a000000035c9f55980000000100000000" # header
"040001447f0000017f000001fb3c1aaafb3c18fd000190100000004b00000000000000000050942c"
"061b04007f0000017f000001fb3c1aaafb3c18fd00000f94000000360000000000000000942c0050"
"061f04007f0000017f000001fb3c1cfcfb3c1a9b0000d3fc0000002a000000000000000000509434"
"061b04007f0000017f000001fb3c1cfcfb3c1a9b00000a490000001e000000000000000094340050"
"061f04007f0000017f000001fb3bb82cfb3ba48b000002960000000300000000000000000050942a"
"061904007f0000017f000001fb3bb82cfb3ba48b00000068000000020000000000000000942a0050"
"061104007f0000017f000001fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9"
"110004007f0000017f000001fb3c1900fb3c18fe0000003c000000010000000000000000b3c90035"
"11000400" # end of flow segments
"000000400400000e00080004000c000400150004001600040001000400020004" # template 1024
"000a0004000e000400070002000b00020004000100060001003c000100050001"
"000000400800000e001b0010001c001000150004001600040001000400020004" # template 2048
"000a0004000e000400070002000b00020004000100060001003c000100050001")
# Three packets without templates, each with 12 flows
PACKETS_V9 = [
"0009000c000000035c9f55980000000200000000040001e47f0000017f000001fb3c1a17fb3c19fd"
"000001480000000200000000000000000035ea82110004007f0000017f000001fb3c1a17fb3c19fd"
"0000007a000000020000000000000000ea820035110004007f0000017f000001fb3c1a17fb3c19fd"
"000000f80000000200000000000000000035c6e2110004007f0000017f000001fb3c1a17fb3c19fd"
"0000007a000000020000000000000000c6e20035110004007f0000017f000001fb3c1a9efb3c1a9c"
"0000004c0000000100000000000000000035adc1110004007f0000017f000001fb3c1a9efb3c1a9c"
"0000003c000000010000000000000000adc10035110004007f0000017f000001fb3c1b74fb3c1b72"
"0000004c0000000100000000000000000035d0b3110004007f0000017f000001fb3c1b74fb3c1b72"
"0000003c000000010000000000000000d0b30035110004007f0000017f000001fb3c2f59fb3c1b71"
"00001a350000000a000000000000000000509436061b04007f0000017f000001fb3c2f59fb3c1b71"
"0000038a0000000a000000000000000094360050061b04007f0000017f000001fb3c913bfb3c9138"
"0000004c0000000100000000000000000035e262110004007f0000017f000001fb3c913bfb3c9138"
"0000003c000000010000000000000000e262003511000400",
"0009000c000000035c9f55980000000300000000040001e47f0000017f000001fb3ca523fb3c913b"
"0000030700000005000000000000000000509438061b04007f0000017f000001fb3ca523fb3c913b"
"000002a200000005000000000000000094380050061b04007f0000017f000001fb3f7fe1fb3dbc97"
"0002d52800000097000000000000000001bb8730061b04007f0000017f000001fb3f7fe1fb3dbc97"
"0000146c000000520000000000000000873001bb061f04007f0000017f000001fb3d066ffb3d066c"
"0000004c0000000100000000000000000035e5bd110004007f0000017f000001fb3d066ffb3d066c"
"0000003c000000010000000000000000e5bd0035110004007f0000017f000001fb3d1a61fb3d066b"
"000003060000000500000000000000000050943a061b04007f0000017f000001fb3d1a61fb3d066b"
"000002a2000000050000000000000000943a0050061b04007f0000017f000001fb3fed00fb3f002c"
"0000344000000016000000000000000001bbae50061f04007f0000017f000001fb3fed00fb3f002c"
"00000a47000000120000000000000000ae5001bb061b04007f0000017f000001fb402f17fb402a75"
"0003524c000000a5000000000000000001bbc48c061b04007f0000017f000001fb402f17fb402a75"
"000020a60000007e0000000000000000c48c01bb061f0400",
"0009000c000000035c9f55980000000400000000040001e47f0000017f000001fb3d7ba2fb3d7ba0"
"0000004c0000000100000000000000000035a399110004007f0000017f000001fb3d7ba2fb3d7ba0"
"0000003c000000010000000000000000a3990035110004007f0000017f000001fb3d8f85fb3d7b9f"
"000003070000000500000000000000000050943c061b04007f0000017f000001fb3d8f85fb3d7b9f"
"000002a2000000050000000000000000943c0050061b04007f0000017f000001fb3d9165fb3d7f6d"
"0000c97b0000002a000000000000000001bbae48061b04007f0000017f000001fb3d9165fb3d7f6d"
"000007f40000001a0000000000000000ae4801bb061b04007f0000017f000001fb3dbc96fb3dbc7e"
"0000011e0000000200000000000000000035bd4f110004007f0000017f000001fb3dbc96fb3dbc7e"
"0000008e000000020000000000000000bd4f0035110004007f0000017f000001fb3ddbb3fb3c1a18"
"0000bfee0000002f00000000000000000050ae56061b04007f0000017f000001fb3ddbb3fb3c1a18"
"00000982000000270000000000000000ae560050061b04007f0000017f000001fb3ddbb3fb3c1a18"
"0000130e0000001200000000000000000050e820061b04007f0000017f000001fb3ddbb3fb3c1a18"
"0000059c000000140000000000000000e8200050061b0400",
]
PACKET_V9_WITH_ZEROS = (
"000900057b72e830620b717d78cf34e30102000001040048000000000000006e0000000101000000"
"000a20076a06065c0800000d6b15c80000000b7b72e4487b72e448080000000000000438bf6401c7"
"65ad1e0d6b15c8000000000001040048000000000000006700000001110000c951ac180b0306065c"
"080035010000010000000b7b72e4487b72e448000000000000000443177b01c765ada501000001c3"
"9c00350001040048000000000000004a000000010100000000ac19bc3206065c080000287048cd00"
"00000b7b72e8307b72e83008000000000000048f071a01c765ae42287048cd000000000001040048"
"000000000000004600000001060002cbef0a30681f06065c0801bb142a49180000000b7b72e8307b"
"72e8300000000000000004801c7801c765ae1d142a49185a2b01bb00010400480000000000000046"
"00000001060002fe800a2f601206065c0801bb142a49180000000b7b72e8307b72e8300000000000"
"0000040806b001c765ae28142a4918d4b501bb000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
)
# Example export for IPFIX (v10) with 4 templates, 1 option template and 8 data flow sets
PACKET_IPFIX_TEMPLATE = "000a05202d45a4700000001300000000000200400400000e00080004000c00040016000400150004" \
"0001000400020004000a0004000e000400070002000b00020004000100060001003c000100050001" \
"000200340401000b00080004000c000400160004001500040001000400020004000a0004000e0004" \
"00200002003c000100050001000200400800000e001b0010001c0010001600040015000400010004" \
"00020004000a0004000e000400070002000b00020004000100060001003c00010005000100020034" \
"0801000b001b0010001c001000160004001500040001000400020004000a0004000e0004008b0002" \
"003c0001000500010003001e010000050001008f000400a000080131000401320004013000020100" \
"001a00000a5900000171352e672100000001000000000001040000547f000001ac110002ff7ed688" \
"ff7ed73a000015c70000000d000000000000000001bbe1a6061b0400ac1100027f000001ff7ed688" \
"ff7ed73a0000074f000000130000000000000000e1a601bb061f04000401004cac110002ac110001" \
"ff7db9e0ff7dc1d0000000fc00000003000000000000000008000400ac110001ac110002ff7db9e0" \
"ff7dc1d0000000fc0000000300000000000000000000040008010220fde66f14e0f1960900000242" \
"ac110002ff0200000000000000000001ff110001ff7dfad6ff7e0e95000001b00000000600000000" \
"0000000087000600fde66f14e0f196090000affeaffeaffefdabcdef123456789000000000000001" \
"ff7e567fff7e664a0000020800000005000000000000000080000600fde66f14e0f1960900000000" \
"00000001fde66f14e0f196090000affeaffeaffeff7e567fff7e664a000002080000000500000000" \
"0000000081000600fe800000000000000042aafffe73bbfafde66f14e0f196090000affeaffeaffe" \
"ff7e6aaaff7e6aaa0000004800000001000000000000000087000600fde66f14e0f1960900000242" \
"ac110002fe800000000000000042aafffe73bbfaff7e6aaaff7e6aaa000000400000000100000000" \
"0000000088000600fe800000000000000042acfffe110002fe800000000000000042aafffe73bbfa" \
"ff7e7eaaff7e7eaa0000004800000001000000000000000087000600fe800000000000000042aaff" \
"fe73bbfafe800000000000000042acfffe110002ff7e7eaaff7e7eaa000000400000000100000000" \
"0000000088000600fe800000000000000042aafffe73bbfafe800000000000000042acfffe110002" \
"ff7e92aaff7e92aa0000004800000001000000000000000087000600fe800000000000000042acff" \
"fe110002fe800000000000000042aafffe73bbfaff7e92aaff7e92aa000000400000000100000000" \
"000000008800060008000044fde66f14e0f196090000affeaffeaffefd41b7143f86000000000000" \
"00000001ff7ec2a0ff7ec2a00000004a000000010000000000000000d20100351100060004000054" \
"ac1100027f000001ff7ed62eff7ed68700000036000000010000000000000000c496003511000400" \
"7f000001ac110002ff7ed62eff7ed687000000760000000100000000000000000035c49611000400" \
"08000044fde66f14e0f196090000affeaffeaffefd41b7143f8600000000000000000001ff7ef359" \
"ff7ef3590000004a000000010000000000000000b1e700351100060004000054ac1100027f000001" \
"ff7f06e4ff7f06e800000036000000010000000000000000a8f90035110004007f000001ac110002" \
"ff7f06e4ff7f06e8000000a60000000100000000000000000035a8f911000400"
# Example export for IPFIX with two data sets
PACKET_IPFIX = "000a00d02d45a47000000016000000000801007cfe800000000000000042acfffe110002fde66f14" \
"e0f196090000000000000001ff7f0755ff7f07550000004800000001000000000000000087000600" \
"fdabcdef123456789000000000000001fe800000000000000042acfffe110002ff7f0755ff7f0755" \
"000000400000000100000000000000008800060008000044fde66f14e0f196090000affeaffeaffe" \
"2a044e42020000000000000000000223ff7f06e9ff7f22d500000140000000040000000000000000" \
"e54c01bb06020600"
PACKET_IPFIX_TEMPLATE_ETHER = "000a05002d45a4700000000d00000000" \
"000200500400001200080004000c000400160004001500040001000400020004000a0004000e0004" \
"00070002000b00020004000100060001003c000100050001003a0002003b00020038000600390006" \
"000200440401000f00080004000c000400160004001500040001000400020004000a0004000e0004" \
"00200002003c000100050001003a0002003b000200380006003900060002005008000012001b0010" \
"001c001000160004001500040001000400020004000a0004000e000400070002000b000200040001" \
"00060001003c000100050001003a0002003b00020038000600390006000200440801000f001b0010" \
"001c001000160004001500040001000400020004000a0004000e0004008b0002003c000100050001" \
"003a0002003b000200380006003900060003001e010000050001008f000400a00008013100040132" \
"0004013000020100001a00000009000000b0d80a558000000001000000000001040000747f000001" \
"ac110002e58b988be58b993e000015c70000000d000000000000000001bbe1a6061b040000000000" \
"123456affefeaffeaffeaffeac1100027f000001e58b988be58b993e0000074f0000001300000000" \
"00000000e1a601bb061f040000000000affeaffeaffe123456affefe0401006cac110002ac110001" \
"e58a7be3e58a83d3000000fc0000000300000000000000000800040000000000affeaffeaffe0242" \
"aa73bbfaac110001ac110002e58a7be3e58a83d3000000fc00000003000000000000000000000400" \
"00000000123456affefeaffeaffeaffe080102b0fde66f14e0f196090000affeaffeaffeff020000" \
"0000000000000001ff110001e58abcd9e58ad098000001b000000006000000000000000087000600" \
"00000000affeaffeaffe3333ff110001fde66f14e0f196090000affeaffeaffefde66f14e0f19609" \
"0000000000000001e58b1883e58b284e000002080000000500000000000000008000060000000000" \
"affeaffeaffe123456affefefdabcdef123456789000000000000001fde66f14e0f1960900000242" \
"ac110002e58b1883e58b284e0000020800000005000000000000000081000600000000000242aa73" \
"bbfaaffeaffeaffefe800000000000000042aafffe73bbfafde66f14e0f196090000affeaffeaffe" \
"e58b2caee58b2cae000000480000000100000000000000008700060000000000123456affefe0242" \
"ac110002fde66f14e0f196090000affeaffeaffefe800000000000000042aafffe73bbfae58b2cae" \
"e58b2cae000000400000000100000000000000008800060000000000affeaffeaffe123456affefe" \
"fe800000000000000042acfffe110002fe800000000000000042aafffe73bbfae58b40aee58b40ae" \
"000000480000000100000000000000008700060000000000affeaffeaffe123456affefefe800000" \
"000000000042aafffe73bbfafe800000000000000042acfffe110002e58b40aee58b40ae00000040" \
"0000000100000000000000008800060000000000123456affefeaffeaffeaffefe80000000000000" \
"0042aafffe73bbfafe800000000000000042acfffe110002e58b54aee58b54ae0000004800000001" \
"00000000000000008700060000000000123456affefeaffeaffeaffefe800000000000000042acff" \
"fe110002fe800000000000000042aafffe73bbfae58b54aee58b54ae000000400000000100000000" \
"000000008800060000000000affeaffeaffe123456affefe"
PACKET_IPFIX_ETHER = "000a02905e8b0aa90000001600000000" \
"08000054fde66f14e0f196090000affeaffeaffefd40abcdabcd00000000000000011111e58b84a4" \
"e58b84a40000004a000000010000000000000000d20100351100060000000000affeaffeaffe0242" \
"aa73bbfa04000074ac1100027f000001e58b9831e58b988a00000036000000010000000000000000" \
"c49600351100040000000000affeaffeaffe123456affefe7f000001ac110002e58b9831e58b988a" \
"000000760000000100000000000000000035c4961100040000000000123456affefeaffeaffeaffe" \
"08000054fde66f14e0f196090000affeaffeaffefd40abcdabcd00000000000000011111e58bb55c" \
"e58bb55c0000004a000000010000000000000000b1e700351100060000000000affeaffeaffe0242" \
"aa73bbfa04000074ac1100027f000001e58bc8e8e58bc8ec00000036000000010000000000000000" \
"a8f900351100040000000000affeaffeaffe123456affefe7f000001ac110002e58bc8e8e58bc8ec" \
"000000a60000000100000000000000000035a8f91100040000000000123456affefeaffeaffeaffe" \
"0801009cfe800000000000000042acfffe110002fdabcdef123456789000000000000001e58bc958" \
"e58bc958000000480000000100000000000000008700060000000000affeaffeaffe123456affefe" \
"fdabcdef123456789000000000000001fe800000000000000042acfffe110002e58bc958e58bc958" \
"000000400000000100000000000000008800060000000000123456affefeaffeaffeaffe08000054" \
"fde66f14e0f196090000affeaffeaffe2a044e42020000000000000000000223e58bc8ede58be4d8" \
"00000140000000040000000000000000e54c01bb0602060000000000affeaffeaffe123456affefe"
PACKET_IPFIX_PADDING = "000a01c064e0b1900000000200000000000200480400001000080004000c00040016000400150004" \
"0001000400020004000a0004000e0004003d00010088000100070002000b00020004000100060001" \
"003c000100050001000200400401000e00080004000c000400160004001500040001000400020004" \
"000a0004000e0004003d0001008800010020000200040001003c0001000500010002004808000010" \
"001b0010001c001000160004001500040001000400020004000a0004000e0004003d000100880001" \
"00070002000b00020004000100060001003c000100050001000200400801000e001b0010001c0010" \
"00160004001500040001000400020004000a0004000e0004003d000100880001008b000200040001" \
"003c00010005000100030022010000060001008f000400a000080131000401320004013000020052" \
"0010040100547f0000017f000001ffff07d0ffff0ff7000000fc0000000300000000000000000001" \
"08000104007f0000017f000001ffff07d0ffff0ff7000000fc000000030000000000000000000100" \
"0001040000000100002a0000b2da0000018a0db59d2e000000010000000000017465737463617074" \
"7572655f73696e67"

View file

@ -1,66 +0,0 @@
#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import gzip
import json
import subprocess
import sys
import unittest
from tests.lib import *
from tests.lib import PACKET_V9_TEMPLATE, PACKETS_V9


class TestFlowExportAnalyzer(unittest.TestCase):
    def test_analyzer(self):
        """Test the analyzer by producing some packets, parsing them and then calling the analyzer
        in a subprocess, piping in a created gzip JSON collection (as if it is coming from a file).
        """
        # First create and parse some packets, which should get exported
        pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9])

        # Now the pkts must be transformed from their data structure to the "gzipped JSON representation",
        # which the collector uses for persistent storage.
        data_dicts = []  # list holding all entries
        for p in pkts:  # each pkt has its own entry with timestamp as key
            data_dicts.append({p.ts: {
                "client": p.client,
                "header": p.export.header.to_dict(),
                "flows": [f.data for f in p.export.flows]
            }})
        data = "\n".join([json.dumps(dd) for dd in data_dicts])  # join all entries together by newlines

        # Different stdout/stderr arguments for backwards compatibility
        pipe_output_param = {"capture_output": True}
        if sys.version_info < (3, 7):  # capture_output was added in Python 3.7
            pipe_output_param = {
                "stdout": subprocess.PIPE,
                "stderr": subprocess.PIPE
            }

        # Analyzer takes gzipped input either via stdin or from a file (here: stdin)
        gzipped_input = gzip.compress(data.encode())  # encode str to bytes

        # Run analyzer as CLI script with no packets ignored (parameter)
        analyzer = subprocess.run(
            [sys.executable, '-m', 'netflow.analyzer', '-p', '0'],
            input=gzipped_input,
            **pipe_output_param
        )

        # Make sure there are no errors; if stderr has content, it is shown in the assertion message
        self.assertEqual(analyzer.stderr, b"", analyzer.stderr.decode())

        # Every 2 flows are written as a single line (any extras are dropped)
        num_flows = sum(len(list(item.values())[0]["flows"]) for item in data_dicts)
        self.assertEqual(len(analyzer.stdout.splitlines()) - 2, num_flows // 2)  # ignore two header lines


if __name__ == '__main__':
    unittest.main()

View file

@ -1,116 +0,0 @@
#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
# TODO: tests with 500 packets fail with delay=0. Probably a problem with UDP sockets buffer
# TODO: add test for template withdrawal
import ipaddress
import unittest

from tests.lib import send_recv_packets, PACKET_IPFIX_TEMPLATE, PACKET_IPFIX, PACKET_IPFIX_ETHER, \
    PACKET_IPFIX_TEMPLATE_ETHER, PACKET_IPFIX_PADDING


class TestFlowExportIPFIX(unittest.TestCase):
    def test_recv_ipfix_packet(self):
        """
        Test the sending of raw packets, and the receiving and parsing of these packets.
        If this test runs successfully, the sender thread has sent a raw bytes packet towards a locally
        listening collector thread, and the collector has successfully received and parsed the packets.
        :return:
        """
        # send packet without any template, must fail to parse (packets are queued)
        pkts, _, _ = send_recv_packets([PACKET_IPFIX])
        self.assertEqual(len(pkts), 0)  # no export is parsed due to missing template

        # send packet with 5 templates and 20 flows, should parse correctly since the templates are known
        pkts, _, _ = send_recv_packets([PACKET_IPFIX_TEMPLATE])
        self.assertEqual(len(pkts), 1)

        p = pkts[0]
        self.assertEqual(p.client[0], "127.0.0.1")
        self.assertEqual(len(p.export.flows), 1 + 2 + 2 + 9 + 1 + 2 + 1 + 2)  # count flows
        self.assertEqual(len(p.export.templates), 4 + 1)  # count new templates

        # send template and multiple export packets
        pkts, _, _ = send_recv_packets([PACKET_IPFIX, PACKET_IPFIX_TEMPLATE, PACKET_IPFIX])
        self.assertEqual(len(pkts), 3)
        self.assertEqual(pkts[0].export.header.version, 10)

        # check amount of flows across all packets
        total_flows = 0
        for packet in pkts:
            total_flows += len(packet.export.flows)
        self.assertEqual(total_flows, 2 + 1 + (1 + 2 + 2 + 9 + 1 + 2 + 1 + 2) + 2 + 1)
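
        # Sketch (assumption, not part of the original test): the same
        # template-before-data behaviour applies when calling the library API
        # directly; a caller-owned templates dict carries state between calls:
        #   templates = {}
        #   netflow.parse_packet(template_payload, templates)  # learns the templates
        #   netflow.parse_packet(data_payload, templates)      # can now parse the flows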

    def test_ipfix_contents(self):
        """
        Inspect the content of exported flows, e.g. test the value of an option flow and the correct
        parsing of IPv4 and IPv6 addresses.
        :return:
        """
        p = send_recv_packets([PACKET_IPFIX_TEMPLATE])[0][0]

        flow = p.export.flows[0]
        self.assertEqual(flow.meteringProcessId, 2649)
        self.assertEqual(flow.selectorAlgorithm, 1)
        self.assertEqual(flow.systemInitTimeMilliseconds, 1585735165729)

        flow = p.export.flows[1]  # HTTPS flow from web server to client
        self.assertEqual(flow.destinationIPv4Address, 2886795266)
        self.assertEqual(ipaddress.ip_address(flow.destinationIPv4Address),
                         ipaddress.ip_address("172.17.0.2"))
        self.assertEqual(flow.protocolIdentifier, 6)  # TCP
        self.assertEqual(flow.sourceTransportPort, 443)
        self.assertEqual(flow.destinationTransportPort, 57766)
        self.assertEqual(flow.tcpControlBits, 0x1b)

        flow = p.export.flows[17]  # IPv6 flow
        self.assertEqual(flow.protocolIdentifier, 17)  # UDP
        self.assertEqual(flow.sourceIPv6Address, 0xfde66f14e0f196090000affeaffeaffe)
        self.assertEqual(ipaddress.ip_address(flow.sourceIPv6Address),  # Docker ULA
                         ipaddress.ip_address("fde6:6f14:e0f1:9609:0:affe:affe:affe"))

    def test_ipfix_contents_ether(self):
        """
        IPFIX content tests based on exports with the softflowd "-T ether" flag, meaning that layer 2
        is included in the export, like MAC addresses.
        :return:
        """
        pkts, _, _ = send_recv_packets([PACKET_IPFIX_TEMPLATE_ETHER, PACKET_IPFIX_ETHER])
        self.assertEqual(len(pkts), 2)
        p = pkts[0]

        # Inspect contents of specific flows
        flow = p.export.flows[0]
        self.assertEqual(flow.meteringProcessId, 9)
        self.assertEqual(flow.selectorAlgorithm, 1)
        self.assertEqual(flow.systemInitTimeMilliseconds, 759538800000)

        flow = p.export.flows[1]
        self.assertEqual(flow.destinationIPv4Address, 2886795266)
        self.assertTrue(hasattr(flow, "sourceMacAddress"))
        self.assertTrue(hasattr(flow, "postDestinationMacAddress"))
        self.assertEqual(flow.sourceMacAddress, 0x123456affefe)
        self.assertEqual(flow.postDestinationMacAddress, 0xaffeaffeaffe)
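
        # Sketch (assumption, requires Python 3.8+ for the hex() separator): the
        # integer MAC value can be rendered in the usual colon notation via
        #   flow.sourceMacAddress.to_bytes(6, "big").hex(":")  # -> '12:34:56:af:fe:fe'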

    def test_ipfix_padding(self):
        """
        Checks successful parsing of export packets that contain padding zeroes in an IPFIX set.
        The padding in the example data is in between the last two data sets, so the successful parsing of the last
        data set indicates correct handling of padding zero bytes.
        """
        pkts, _, _ = send_recv_packets([PACKET_IPFIX_PADDING])
        self.assertEqual(len(pkts), 1)
        p = pkts[0]

        # Check the length of the whole export
        self.assertEqual(p.export.header.length, 448)

        # Check a specific value of the last flow in the export. Success means correct handling of padding in the set
        self.assertEqual(p.export.flows[-1].meteringProcessId, 45786)

View file

@ -1,157 +0,0 @@
#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
# TODO: tests with 500 packets fail with delay=0. Probably a problem with UDP sockets buffer
import ipaddress
import random
import unittest

from tests.lib import send_recv_packets, NUM_PACKETS, \
    PACKET_INVALID, PACKET_V1, PACKET_V5, PACKET_V9_WITH_ZEROS, \
    PACKET_V9_TEMPLATE, PACKET_V9_TEMPLATE_MIXED, PACKETS_V9


class TestFlowExportNetflow(unittest.TestCase):
    def _test_recv_all_packets(self, num, template_idx, delay=0.0001):
        """Fling packets at the server and test that it receives them all"""

        def gen_pkts(n, idx):
            for x in range(n):
                if x == idx:
                    yield PACKET_V9_TEMPLATE
                else:
                    yield random.choice(PACKETS_V9)

        pkts, tstart, tend = send_recv_packets(gen_pkts(num, template_idx), delay=delay)

        # check number of packets
        self.assertEqual(len(pkts), num)

        # check timestamps are when packets were sent, not processed
        self.assertTrue(all(tstart < p.ts < tend for p in pkts))

        # check number of "things" in the packets (flows + templates)
        # template packet = 10 things
        # other packets = 12 things
        self.assertEqual(sum(p.export.header.count for p in pkts), (num - 1) * 12 + 10)

        # check number of flows in the packets
        # template packet = 8 flows (2 templates)
        # other packets = 12 flows
        self.assertEqual(sum(len(p.export.flows) for p in pkts), (num - 1) * 12 + 8)
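
        # Worked example (hypothetical num=50): the template packet contributes
        # 10 records (2 templates + 8 flows), each other packet 12 flows, so the
        # header counts sum to 49 * 12 + 10 = 598 and the flows to 49 * 12 + 8 = 596.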

    def test_recv_all_packets_template_first(self):
        """Test all packets are received when the template is sent first"""
        self._test_recv_all_packets(NUM_PACKETS, 0)

    def test_recv_all_packets_template_middle(self):
        """Test all packets are received when the template is sent in the middle"""
        self._test_recv_all_packets(NUM_PACKETS, NUM_PACKETS // 2)

    def test_recv_all_packets_template_last(self):
        """Test all packets are received when the template is sent last"""
        self._test_recv_all_packets(NUM_PACKETS, NUM_PACKETS - 1)

    def test_recv_all_packets_slowly(self):
        """Test all packets are received when they are sent very slowly"""
        self._test_recv_all_packets(3, 0, delay=1)

    def test_ignore_invalid_packets(self):
        """Test that invalid packets log a warning but are otherwise ignored"""
        with self.assertLogs(level='WARNING'):
            pkts, _, _ = send_recv_packets([
                PACKET_INVALID, PACKET_V9_TEMPLATE, random.choice(PACKETS_V9), PACKET_INVALID,
                random.choice(PACKETS_V9), PACKET_INVALID
            ])
        self.assertEqual(len(pkts), 3)

    def test_recv_v1_packet(self):
        """Test NetFlow v1 packet parsing"""
        pkts, _, _ = send_recv_packets([PACKET_V1])
        self.assertEqual(len(pkts), 1)

        # Take the parsed packet and check meta data
        p = pkts[0]
        self.assertEqual(p.client[0], "127.0.0.1")  # collector listens locally
        self.assertEqual(len(p.export.flows), 2)  # ping request and reply
        self.assertEqual(p.export.header.count, 2)  # same value, in header
        self.assertEqual(p.export.header.version, 1)

        # Check specific IP address contained in a flow.
        # Since it might vary which flow of the pair is exported first, check both
        flow = p.export.flows[0]
        self.assertIn(
            ipaddress.ip_address(flow.IPV4_SRC_ADDR),  # convert to ipaddress obj because value is int
            [ipaddress.ip_address("172.17.0.1"), ipaddress.ip_address("172.17.0.2")]
        )
        self.assertEqual(flow.PROTO, 1)  # ICMP

    def test_recv_v5_packet(self):
        """Test NetFlow v5 packet parsing"""
        pkts, _, _ = send_recv_packets([PACKET_V5])
        self.assertEqual(len(pkts), 1)

        p = pkts[0]
        self.assertEqual(p.client[0], "127.0.0.1")
        self.assertEqual(len(p.export.flows), 3)  # ping request and reply, one multicast
        self.assertEqual(p.export.header.count, 3)
        self.assertEqual(p.export.header.version, 5)

        # Check specific IP address contained in a flow.
        # Since it might vary which flow of the pair is exported first, check both
        flow = p.export.flows[0]
        self.assertIn(
            ipaddress.ip_address(flow.IPV4_SRC_ADDR),  # convert to ipaddress obj because value is int
            [ipaddress.ip_address("172.17.0.1"), ipaddress.ip_address("172.17.0.2")]  # matches multicast packet too
        )
        self.assertEqual(flow.PROTO, 1)  # ICMP

    def test_recv_v9_packet(self):
        """Test NetFlow v9 packet parsing"""
        # send packet without any template, must fail to parse (packets are queued)
        pkts, _, _ = send_recv_packets([PACKETS_V9[0]])
        self.assertEqual(len(pkts), 0)  # no export is parsed due to missing template

        # send an invalid packet with zero bytes, must fail to parse
        pkts, _, _ = send_recv_packets([PACKET_V9_WITH_ZEROS])
        self.assertEqual(len(pkts), 0)  # no export is parsed due to missing template

        # send packet with two templates and eight flows, should parse correctly since the templates are known
        pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE])
        self.assertEqual(len(pkts), 1)

        # and again, but with the templates at the end of the packet
        pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE_MIXED])
        self.assertEqual(len(pkts), 1)
        p = pkts[0]
        self.assertEqual(p.client[0], "127.0.0.1")
        self.assertEqual(len(p.export.flows), 8)  # count flows
        self.assertEqual(len(p.export.templates), 2)  # count new templates

        # Inspect contents of specific flows
        flow = p.export.flows[0]
        self.assertEqual(flow.PROTOCOL, 6)  # TCP
        self.assertEqual(flow.L4_SRC_PORT, 80)
        self.assertEqual(flow.IPV4_SRC_ADDR, "127.0.0.1")

        flow = p.export.flows[-1]  # last flow
        self.assertEqual(flow.PROTOCOL, 17)  # UDP
        self.assertEqual(flow.L4_DST_PORT, 53)

        # send template and multiple export packets
        pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9])
        self.assertEqual(len(pkts), 4)
        self.assertEqual(pkts[0].export.header.version, 9)

        # check amount of flows across all packets
        total_flows = 0
        for packet in pkts:
            total_flows += len(packet.export.flows)
        self.assertEqual(total_flows, 8 + 12 + 12 + 12)

View file

@ -1,188 +0,0 @@
#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import cProfile
import io
import linecache
import pstats
import tracemalloc
import unittest
from tests.lib import send_recv_packets, generate_packets

NUM_PACKETS_PERFORMANCE = 2000


@unittest.skip("Not necessary in functional tests, used as analysis tool")
class TestNetflowIPFIXPerformance(unittest.TestCase):
    def setUp(self) -> None:
        """
        Before each test run, start tracemalloc profiling.
        :return:
        """
        tracemalloc.start()
        print("\n\n")

    def tearDown(self) -> None:
        """
        After each test run, stop tracemalloc.
        :return:
        """
        tracemalloc.stop()

    def _memory_of_version(self, version, store_packets=500) -> tracemalloc.Snapshot:
        """
        Create a memory snapshot of a collector run with packets of version :version:
        :param version:
        :return:
        """
        if not tracemalloc.is_tracing():
            raise RuntimeError
        pkts, t1, t2 = send_recv_packets(generate_packets(NUM_PACKETS_PERFORMANCE, version),
                                         store_packets=store_packets)
        self.assertEqual(len(pkts), NUM_PACKETS_PERFORMANCE)
        snapshot = tracemalloc.take_snapshot()
        del pkts
        return snapshot

    @staticmethod
    def _print_memory_statistics(snapshot: tracemalloc.Snapshot, key: str, topx: int = 10):
        """
        Print memory statistics from a tracemalloc.Snapshot in certain formats.
        :param snapshot:
        :param key:
        :param topx:
        :return:
        """
        if key not in ["filename", "lineno", "traceback"]:
            raise KeyError

        stats = snapshot.statistics(key)
        if key == "lineno":
            for idx, stat in enumerate(stats[:topx]):
                frame = stat.traceback[0]
                print("\n{idx:02d}: {filename}:{lineno} {size:.1f} KiB, count {count}".format(
                    idx=idx + 1, filename=frame.filename, lineno=frame.lineno, size=stat.size / 1024, count=stat.count
                ))

                lines = []
                lines_whitespaces = []
                for lineshift in range(-3, 2):
                    stat = linecache.getline(frame.filename, frame.lineno + lineshift)
                    lines_whitespaces.append(len(stat) - len(stat.lstrip(" ")))  # count leading whitespace
                    lines.append(stat.strip())
                lines_whitespaces = [x - min([y for y in lines_whitespaces if y > 0]) for x in lines_whitespaces]
                for lidx, stat in enumerate(lines):
                    print(" {}{}".format("> " if lidx == 3 else "| ", " " * lines_whitespaces.pop(0) + stat))
        elif key == "filename":
            for idx, stat in enumerate(stats[:topx]):
                frame = stat.traceback[0]
                print("{idx:02d}: {filename:80s} {size:6.1f} KiB, count {count:<5d}".format(
                    idx=idx + 1, filename=frame.filename, size=stat.size / 1024, count=stat.count
                ))
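
        # For illustration only (values made up, padding condensed), one line of
        # the "filename" output then looks like:
        #   01: /usr/lib/python3.9/tracemalloc.py    120.5 KiB, count 842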

    def test_compare_memory(self):
        """
        Test memory usage of two collector runs, with IPFIX and NetFlow v9 packets respectively.
        Then compare the two memory snapshots to make sure the two parser modules do not interfere with each other.
        TODO: more features could be tested, e.g. too big of a difference if one version is optimized better
        :return:
        """
        pkts, t1, t2 = send_recv_packets(generate_packets(NUM_PACKETS_PERFORMANCE, 10))
        self.assertEqual(len(pkts), NUM_PACKETS_PERFORMANCE)
        snapshot_ipfix = tracemalloc.take_snapshot()
        del pkts
        tracemalloc.clear_traces()

        pkts, t1, t2 = send_recv_packets(generate_packets(NUM_PACKETS_PERFORMANCE, 9))
        self.assertEqual(len(pkts), NUM_PACKETS_PERFORMANCE)
        snapshot_v9 = tracemalloc.take_snapshot()
        del pkts

        # A NetFlow v9 run must not allocate anything inside the IPFIX module, and vice versa
        stats = snapshot_v9.compare_to(snapshot_ipfix, "lineno")
        for stat in stats:
            if stat.traceback[0].filename.endswith("netflow/ipfix.py"):
                self.assertEqual(stat.count, 0)
                self.assertEqual(stat.size, 0)

        stats = snapshot_ipfix.compare_to(snapshot_v9, "lineno")
        for stat in stats:
            if stat.traceback[0].filename.endswith("netflow/v9.py"):
                self.assertEqual(stat.count, 0)
                self.assertEqual(stat.size, 0)

    def test_memory_ipfix(self):
        """
        Test memory usage of the collector with IPFIX packets.
        Three iterations are done with different amounts of packets to be stored.
        With this approach, increased usage of memory can be captured when the ExportPacket objects are not deleted.
        :return:
        """
        for store_pkts in [0, 500, -1]:  # -1 is the compatibility value for "store all"
            snapshot_ipfix = self._memory_of_version(10, store_packets=store_pkts)

            # TODO: this seems misleading, maybe reads the memory of the whole testing process?
            # system_memory = pathlib.Path("/proc/self/statm").read_text()
            # pagesize = resource.getpagesize()
            # print("Total RSS memory used: {:.1f} KiB".format(int(system_memory.split()[1]) * pagesize // 1024.))

            print("\nIPFIX memory usage with {} packets being stored".format(store_pkts))
            self._print_memory_statistics(snapshot_ipfix, "filename")

            if store_pkts == -1:
                # very verbose and most interesting in the iteration with all ExportPackets being stored in memory
                print("\nTraceback for run with all packets being stored in memory")
                self._print_memory_statistics(snapshot_ipfix, "lineno")

    def test_memory_v1(self):
        """
        Test memory usage with NetFlow v1 packets.
        :return:
        """
        snapshot_v1 = self._memory_of_version(1)
        print("\nNetFlow v1 memory usage by file")
        self._print_memory_statistics(snapshot_v1, "filename")

    def test_memory_v5(self):
        """
        Test memory usage with NetFlow v5 packets.
        :return:
        """
        snapshot_v5 = self._memory_of_version(5)
        print("\nNetFlow v5 memory usage by file")
        self._print_memory_statistics(snapshot_v5, "filename")

    def test_memory_v9(self):
        """
        Test memory usage of the collector with NetFlow v9 packets.
        :return:
        """
        snapshot_v9 = self._memory_of_version(9)
        print("\nNetFlow v9 memory usage by file")
        self._print_memory_statistics(snapshot_v9, "filename")
        print("\nNetFlow v9 memory usage by line")
        self._print_memory_statistics(snapshot_v9, "lineno")
@unittest.skip("Does not work as expected due to threading")
def test_time_ipfix(self):
"""
Profile function calls and CPU time.
TODO: this does not work with threading in the collector, yet
:return:
"""
profile = cProfile.Profile()
profile.enable(subcalls=True, builtins=True)
pkts, t1, t2 = send_recv_packets(generate_packets(NUM_PACKETS_PERFORMANCE, 10), delay=0, store_packets=500)
self.assertEqual(len(pkts), NUM_PACKETS_PERFORMANCE)
profile.disable()
for sort_by in ['cumulative', 'calls']:
s = io.StringIO()
ps = pstats.Stats(profile, stream=s)
ps.sort_stats(sort_by).print_stats("netflow")
ps.sort_stats(sort_by).print_callees(.5)
print(s.getvalue())