Compare commits
No commits in common. "debian/latest" and "pristine-tar" have entirely different histories.
28  .github/workflows/run_tests.yml  (vendored)

@@ -1,28 +0,0 @@
name: Run Python unit tests

on:
  push:
    branches: [ master, release ]
  pull_request:
  workflow_dispatch:

jobs:
  test-netflow:
    runs-on: ubuntu-20.04
    strategy:
      matrix:
        python:
          - "3.5.3"  # Debian Stretch
          - "3.7.3"  # Debian Buster
          - "3.9.2"  # Debian Bullseye
          - "3.11"   # Debian Bookworm uses 3.11.1, but it's in a newer pyenv release
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python with pyenv
        uses: gabrielfalcao/pyenv-action@v11
        with:
          default: "${{ matrix.python }}"

      - name: Run Python unittests
        run: python3 -m unittest
8  .gitignore  (vendored)

@@ -1,8 +0,0 @@
.*egg-info.*
build*
dist*
.*python_netflow_v9_softflowd.egg-info/
*.swp
*.swo
__pycache__
*.json
21  LICENSE

@@ -1,21 +0,0 @@
MIT License

Copyright (c) 2016-2020 Dominik Pataky <dev@bitkeks.eu>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
89  README.md

@@ -1,89 +0,0 @@
# Python NetFlow/IPFIX library

This package contains libraries and tools for **NetFlow versions 1, 5 and 9, and IPFIX**. It is available [on PyPI as "netflow"](https://pypi.org/project/netflow/).

Version 9 is the first NetFlow version to use templates. Templates allow dynamically sized and configured NetFlow data flowsets, which makes the collector's job harder. The library provides the `netflow.parse_packet()` function as the main API entry point (see below). By importing `netflow.v1`, `netflow.v5` or `netflow.v9` you get direct access to the respective parsing objects, but to start out you will probably have more success running the reference collector (example below) and reading its code. IPFIX (IP Flow Information Export) is based on NetFlow v9 and standardized by the IETF. All related classes are contained in `netflow.ipfix`.

![Data flow diagram](nf-workflow.png)

Copyright 2016-2023 Dominik Pataky <software+pynetflow@dpataky.eu>

Licensed under MIT License. See LICENSE.


## Using the library
If you choose to use the classes provided by this library directly, here's an example for a NetFlow v5 export packet:

1. Create a collector which listens for exported packets on some UDP port. It then receives UDP packets from exporters.
2. The NetFlow payload is contained inside those UDP packets. For NetFlow v5, for example, it begins with the bytes `0005`.
3. Call the `netflow.parse_packet()` function with the payload as first argument (it accepts raw bytes as well as a hex-encoded string).
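The version check from step 2 can be sketched without the library. `export_version` below is a hypothetical helper (not part of the `netflow` package); it reads the big-endian 16-bit version field that opens every NetFlow/IPFIX export:

```python
import struct

def export_version(payload: bytes) -> int:
    # Every NetFlow/IPFIX export starts with a big-endian 16-bit version field
    return struct.unpack("!H", payload[:2])[0]

print(export_version(bytes.fromhex("0005") + b"\x00" * 22))  # prints 5 (NetFlow v5)
```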

Example UDP collector server (receiving exports on port 2055):

```python
import netflow
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", 2055))
payload, client = sock.recvfrom(4096)  # experimental, tested with 1464 bytes
p = netflow.parse_packet(payload)  # Test result: <ExportPacket v5 with 30 records>
print(p.header.version)  # Test result: 5
```

Or from a hex dump:

```python
import netflow

p = netflow.parse_packet("00050003000379a35e80c58622a...")  # see test_netflow.py
assert p.header.version == 5  # NetFlow v5 packet
assert p.flows[0].PROTO == 1  # ICMP flow
```

In NetFlow v9 and IPFIX, templates are used instead of a fixed set of fields (like `PROTO`). See `collector.py` for how to handle these. You **must** store received templates between exports and pass them to the parser when new packets arrive. Not storing the templates will always result in parsing failures.
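To illustrate why the stored templates matter, here is a toy model of a template cache (illustrative names only, not the library's actual API): the first "packet" carries a template, the second carries only data, and the data can only be decoded because the `templates` dict survived between the two calls:

```python
# Toy model of NetFlow v9 template handling; parse_flowset and the dict
# shapes are invented for illustration, not the netflow library's API.
def parse_flowset(flowset: dict, templates: dict) -> list:
    if "fields" in flowset:
        # A template flowset: remember it for later data flowsets
        templates[flowset["template_id"]] = flowset["fields"]
        return []
    # A data flowset: raises KeyError when its template was never stored
    fields = templates[flowset["template_id"]]
    return [dict(zip(fields, record)) for record in flowset["records"]]

templates = {}  # must survive between exports
parse_flowset({"template_id": 256, "fields": ["PROTO", "IN_BYTES"]}, templates)
flows = parse_flowset({"template_id": 256, "records": [(6, 1500)]}, templates)
print(flows)  # [{'PROTO': 6, 'IN_BYTES': 1500}]
```

In the real library the same role is played by the templates dict that `collector.py` keeps across `parse_packet()` calls; a data packet whose template has not been seen yet raises a template-not-recognized error (e.g. `V9TemplateNotRecognized`) instead of the `KeyError` used here.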

## Using the collector and analyzer
Since v0.9.0 the `netflow` library also includes reference implementations of a collector and an analyzer as CLI tools.
These can be used on the CLI with `python3 -m netflow.collector` and `python3 -m netflow.analyzer`. Use the `-h` flag to get the respective help output with all available CLI flags.

Example: to start the collector, run `python3 -m netflow.collector -p 9000 -D`. This starts a collector instance at port 9000 in debug mode. Point your flow exporter to this port on your host and, after some time, the first ExportPackets should appear (the flows need to expire first). After you have collected some data, the collector exports it into GZIP files, simply named `<timestamp>.gz` (or the filename you specified with `--file`/`-o`).

To analyze the saved traffic, run `python3 -m netflow.analyzer -f <gzip file>`. The output will look similar to the following snippet, with resolved hostnames and services, transferred bytes and connection duration:

```
2017-10-28 23:17.01: SSH     | 4.25M    | 15:27 min     | local-2 (<IPv4>) to local-1 (<IPv4>)
2017-10-28 23:17.01: SSH     | 4.29M    | 16:22 min     | remote-1 (<IPv4>) to local-2 (<IPv4>)
2017-10-28 23:19.01: HTTP    | 22.79M   | 47:32 min     | uwstream3.somafm.com (173...) to local-1 (<IPv4>)
2017-10-28 23:22.01: HTTPS   | 1.21M    | 3 sec         | fra16s12-in-x0e.1e100.net (2a00:..) to local-1 (<IPv6>)
2017-10-28 23:23.01: SSH     | 93.79M   | 21 sec        | remote-1 (<IPv4>) to local-2 (<IPv4>)
2017-10-28 23:51.01: SSH     | 14.08M   | 1:23.09 hours | remote-1 (<IPv4>) to local-2 (<IPv4>)
```

**Please note that the collector and analyzer are experimental reference implementations. Do not rely on them in production monitoring use cases!** In any case, I recommend looking into the `netflow/collector.py` and `netflow/analyzer.py` scripts for customization. Feel free to use the code and extend it in your own toolset - that's what the MIT license is for!


## Resources
* [Cisco NetFlow v9 paper](http://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html)
* [RFC 3954 "Cisco Systems NetFlow Services Export Version 9"](https://tools.ietf.org/html/rfc3954)
* [RFC 7011 "IPFIX Protocol Specification"](https://tools.ietf.org/html/rfc7011)

## Development environment
The library was written specifically against NetFlow exports from Hitoshi Irino's fork of [softflowd](https://github.com/irino/softflowd) (v1.0.0) - it should work with every conforming NetFlow/IPFIX implementation, though. If you stumble upon new custom template fields, please let me know - they will make a fine addition to the `netflow.v9.V9_FIELD_TYPES` collection.

### Running and creating tests
The test files contain tests for all use cases in the library, based on real softflowd export packets. Whenever `softflowd` is referenced, a compiled version of softflowd 1.0.0 is meant, which is probably NOT the one in your distribution's packages. During the development of this library, two ways of gathering these hex dumps were used.

First, the tcpdump/Wireshark export way:

1. Run tcpdump/Wireshark on your public-facing interface (with tcpdump, save the pcap to disk).
2. Produce some sample flows, e.g. surf the web and refresh your mail client. With Wireshark, save the captured packets to disk.
3. Run tcpdump/Wireshark again on a local interface.
4. Run `softflowd` with the `-r <pcap_file>` flag. softflowd reads the captured traffic, produces the flows and exports them. Let it export to the interface you are capturing packets on, e.g. capture on the loopback interface (`-i lo`) and let softflowd export to `127.0.0.1:1337`.
5. Examine the captured traffic. In Wireshark, set the `CFLOW` "decode as" dissector on the export packets (e.g. based on the port). The `data` fields should then be shown correctly as NetFlow payload.
6. Extract this payload as a hex stream. Anonymize the IP addresses with a hex editor if necessary. A recommended hex editor is [bless](https://github.com/afrantzis/bless).

Second, a Docker way:

1. Run a softflowd daemon in the background inside a Docker container, listening on `eth0` and exporting to e.g. `172.17.0.1:1337`.
2. On your host, start Wireshark listening on the Docker bridge.
3. Create some traffic from inside the container.
4. Check the softflowd daemon with `softflowctl dump-flows`.
5. If flows are shown, export them with `softflowctl expire-all`.
6. Wireshark should have picked up the export packets (it does not matter if there's a port-unreachable error).
7. Set the decoder for the packets to `CFLOW` and copy the hex value from the NetFlow packet.

Your exported hex string should begin with `0001`, `0005`, `0009` or `000a`, depending on the version.

The collector runs in a background thread. Differences in transmission speed from the exporting client can lead to different results, possibly caused by race conditions during use of the GZIP output file.
11  debian/changelog  (vendored)

@@ -1,11 +0,0 @@
netflow (0.12.2-1~deb11u1) bullseye; urgency=medium

  * Backport to bullseye

 -- David Prévot <dprevot@evolix.fr>  Tue, 16 Apr 2024 15:06:38 +0200

netflow (0.12.2-1~deb12u1) bookworm; urgency=low

  * Initial release, autogenerated by py2dsp/3.20230219

 -- David Prévot <dprevot@evolix.fr>  Tue, 16 Apr 2024 12:28:56 +0000
19  debian/control  (vendored)

@@ -1,19 +0,0 @@
Source: netflow
Section: python
Priority: optional
Maintainer: David Prévot <dprevot@evolix.fr>
Build-Depends: debhelper-compat (= 13),
               dh-python,
               python3-all,
               python3-setuptools,
Standards-Version: 4.7.0
Testsuite: autopkgtest-pkg-pybuild
Homepage: https://github.com/bitkeks/python-netflow-v9-softflowd
Rules-Requires-Root: no

Package: python3-netflow
Architecture: all
Depends: ${misc:Depends}, ${python3:Depends},
Description: v1, v5, v9 and IPFIX tool suite implemented in Python 3
 This package contains libraries and tools for NetFlow versions 1, 5 and
 9, and IPFIX.
25  debian/copyright  (vendored)

@@ -1,25 +0,0 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: netflow
Upstream-Contact: Dominik Pataky <software+pynetflow@dpataky.eu>
Source: https://github.com/bitkeks/python-netflow-v9-softflowd

Files: *
Copyright: 2016-2020 Dominik Pataky <dev@bitkeks.eu>
License: Expat
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
 .
 The above copyright notice and this permission notice shall be included in all
 copies or substantial portions of the Software.
 .
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 SOFTWARE.
1  debian/docs  (vendored)

@@ -1 +0,0 @@
README.md
6  debian/gbp.conf  (vendored)

@@ -1,6 +0,0 @@
[DEFAULT]
debian-branch = debian/latest
filter = [ '.gitattributes' ]
pristine-tar = True
upstream-branch = upstream/latest
upstream-vcs-tag = v%(version%~%-)s
5  debian/rules  (vendored)

@@ -1,5 +0,0 @@
#! /usr/bin/make -f

export PYBUILD_NAME=netflow
%:
	dh $@ --with python3 --buildsystem=pybuild
1  debian/source/format  (vendored)

@@ -1 +0,0 @@
3.0 (quilt)
1  debian/source/options  (vendored)

@@ -1 +0,0 @@
extend-diff-ignore="^[^/]+.(egg-info|dist-info)/"
3  debian/watch  (vendored)

@@ -1,3 +0,0 @@
version=4
opts="pgpmode=none, filenamemangle=s%(?:.*?)?v?(\d[\d.]*)\.tar\.gz%@PACKAGE@-$1.tar.gz%" \
  https://github.com/bitkeks/python-netflow-v9-softflowd/tags (?:.*?/)?v?(\d[\d.]*)\.tar\.gz
@@ -1,10 +0,0 @@
#!/usr/bin/env python3

"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""

from .utils import parse_packet
@@ -1,349 +0,0 @@
#!/usr/bin/env python3

"""
Reference analyzer script for NetFlow Python package.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""

import argparse
import contextlib
import functools
import gzip
import ipaddress
import json
import logging
import os.path
import socket
import sys
from collections import namedtuple
from datetime import datetime

IP_PROTOCOLS = {
    1: "ICMP",
    6: "TCP",
    17: "UDP",
    58: "ICMPv6"
}

Pair = namedtuple('Pair', ['src', 'dest'])

logger = logging.getLogger(__name__)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)


def printv(message, *args_, **kwargs):
    if args.verbose:
        print(message.format(*args_, **kwargs))


@functools.lru_cache(maxsize=None)
def resolve_hostname(ip: str) -> str:
    if args.no_dns:
        # If no DNS resolution is requested, simply return the IP string
        return ip
    # else resolve the IP address to a hostname and return the hostname
    return socket.getfqdn(ip)


def fallback(d, keys):
    for k in keys:
        if k in d:
            return d[k]
    raise KeyError(", ".join(keys))


def human_size(size_bytes):
    # Calculate a human readable size of the flow
    if size_bytes < 1024:
        return "%dB" % size_bytes
    elif size_bytes / 1024. < 1024:
        return "%.2fK" % (size_bytes / 1024.)
    elif size_bytes / 1024. ** 2 < 1024:
        return "%.2fM" % (size_bytes / 1024. ** 2)
    else:
        return "%.2fG" % (size_bytes / 1024. ** 3)


def human_duration(seconds):
    # Calculate human readable duration times
    if seconds < 60:
        # seconds
        return "%d sec" % seconds
    if seconds / 60 > 60:
        # hours
        return "%d:%02d.%02d hours" % (seconds / 60 ** 2, seconds % 60 ** 2 / 60, seconds % 60)
    # minutes
    return "%02d:%02d min" % (seconds / 60, seconds % 60)


class Connection:
    """Connection model for two flows.
    The direction of the data flow can be seen by looking at the size.

    'src' describes the peer which sends more data towards the other. This
    does NOT have to mean that 'src' was the initiator of the connection.
    """

    def __init__(self, flow1, flow2):
        if not flow1 or not flow2:
            raise Exception("A connection requires two flows")

        # Assume the size that sent the most data is the source
        # TODO: this might not always be right, maybe use earlier timestamp?
        size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS'])
        size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS'])
        if size1 >= size2:
            src = flow1
            dest = flow2
        else:
            src = flow2
            dest = flow1

        # TODO: this next approach uses the lower port as the service identifier
        # port1 = fallback(flow1, ['L4_SRC_PORT', 'SRC_PORT'])
        # port2 = fallback(flow2, ['L4_SRC_PORT', 'SRC_PORT'])
        #
        # src = flow1
        # dest = flow2
        # if port1 > port2:
        #     src = flow2
        #     dest = flow1

        self.src_flow = src
        self.dest_flow = dest
        ips = self.get_ips(src)
        self.src = ips.src
        self.dest = ips.dest
        self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT'])
        self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT'])
        self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS'])

        # Duration is given in milliseconds
        self.duration = src['LAST_SWITCHED'] - src['FIRST_SWITCHED']
        if self.duration < 0:
            # 32 bit int has its limits. Handling overflow here
            # TODO: Should be handled in the collection phase
            self.duration = (2 ** 32 - src['FIRST_SWITCHED']) + src['LAST_SWITCHED']

    def __repr__(self):
        return "<Connection from {} to {}, size {}>".format(
            self.src, self.dest, self.human_size)

    @staticmethod
    def get_ips(flow):
        # IPv4
        if flow.get('IP_PROTOCOL_VERSION') == 4 or \
                'IPV4_SRC_ADDR' in flow or 'IPV4_DST_ADDR' in flow:
            return Pair(
                ipaddress.ip_address(flow['IPV4_SRC_ADDR']),
                ipaddress.ip_address(flow['IPV4_DST_ADDR'])
            )

        # IPv6
        return Pair(
            ipaddress.ip_address(flow['IPV6_SRC_ADDR']),
            ipaddress.ip_address(flow['IPV6_DST_ADDR'])
        )

    @property
    def human_size(self):
        return human_size(self.size)

    @property
    def human_duration(self):
        duration = self.duration // 1000  # uptime in milliseconds, floor it
        return human_duration(duration)

    @property
    def hostnames(self):
        # Resolve the IPs of this flows to their hostname
        src_hostname = resolve_hostname(self.src.compressed)
        dest_hostname = resolve_hostname(self.dest.compressed)
        return Pair(src_hostname, dest_hostname)

    @property
    def service(self):
        # Resolve ports to their services, if known
        default = "({} {})".format(self.src_port, self.dest_port)
        with contextlib.suppress(OSError):
            return socket.getservbyport(self.src_port)
        with contextlib.suppress(OSError):
            return socket.getservbyport(self.dest_port)
        return default

    @property
    def total_packets(self):
        return self.src_flow["IN_PKTS"] + self.dest_flow["IN_PKTS"]


if __name__ == "netflow.analyzer":
    logger.error("The analyzer is currently meant to be used as a CLI tool only.")
    logger.error("Use 'python3 -m netflow.analyzer -h' in your console for additional help.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
    parser.add_argument("-f", "--file", dest="file", type=str, default=sys.stdin,
                        help="The file to analyze (defaults to stdin if not provided)")
    parser.add_argument("-p", "--packets", dest="packets_threshold", type=int, default=10,
                        help="Number of packets representing the lower bound in connections to be processed")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
                        help="Enable verbose output.")
    parser.add_argument("--match-host", dest="match_host", type=str, default=None,
                        help="Filter output by matching on the given host (matches source or destination)")
    parser.add_argument("-n", "--no-dns", dest="no_dns", action="store_true",
                        help="Disable DNS resolving of IP addresses")
    args = parser.parse_args()

    # Sanity check for IP address
    if args.match_host:
        try:
            match_host = ipaddress.ip_address(args.match_host)
        except ValueError:
            exit("IP address '{}' is neither IPv4 nor IPv6".format(args.match_host))

    # Using a file and using stdin differ in their further usage for gzip.open
    file = args.file
    mode = "rb"  # reading files
    if file != sys.stdin and not os.path.exists(file):
        exit("File {} does not exist!".format(file))

    if file == sys.stdin:
        file = sys.stdin.buffer
        mode = "rt"  # reading from stdin

    data = {}

    with gzip.open(file, mode) as gzipped:
        # "for line in" lazy-loads all lines in the file
        for line in gzipped:
            entry = json.loads(line)
            if len(entry.keys()) != 1:
                logger.warning("The line does not have exactly one timestamp key: \"{}\"".format(line.keys()))

            try:
                ts = list(entry)[0]  # timestamp from key
            except KeyError:
                logger.error("Saved line \"{}\" has no timestamp key!".format(line))
                continue

            if "header" not in entry[ts]:
                logger.error("No header dict in entry {}".format(ts))
                raise ValueError

            if entry[ts]["header"]["version"] == 10:
                logger.warning("Skipped IPFIX entry, because analysis of IPFIX is not yet implemented")
                continue

            data[ts] = entry[ts]

    # Go through data and dissect every flow saved inside the dump

    # The following dict holds flows which are looking for a peer, to analyze a duplex 'Connection'.
    # For each flow, the destination address is looked up. If the peer is not in the list of pending peers,
    # insert this flow, waiting for its peer. If found, take the waiting peer and create a Connection object.
    pending = {}
    skipped = 0
    skipped_threshold = args.packets_threshold

    first_line = True  # print header line before first line

    for key in sorted(data):
        timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")
        client = data[key]["client"]
        flows = data[key]["flows"]

        for flow in sorted(flows, key=lambda x: x["FIRST_SWITCHED"]):
            first_switched = flow["FIRST_SWITCHED"]

            if first_switched - 1 in pending:
                # TODO: handle fitting, yet mismatching (here: 1 second) pairs
                pass

            # Find the peer for this connection
            if "IPV4_SRC_ADDR" in flow or flow.get("IP_PROTOCOL_VERSION") == 4:
                local_peer = flow["IPV4_SRC_ADDR"]
                remote_peer = flow["IPV4_DST_ADDR"]
            else:
                local_peer = flow["IPV6_SRC_ADDR"]
                remote_peer = flow["IPV6_DST_ADDR"]

            # Match on host filter passed in as argument
            if args.match_host and not any([local_peer == args.match_host, remote_peer == args.match_host]):
                # If a match_host is given but neither local_peer nor remote_peer match
                continue

            if first_switched not in pending:
                pending[first_switched] = {}

            # Match peers
            if remote_peer in pending[first_switched]:
                # The destination peer put itself into the pending dict, getting and removing entry
                peer_flow = pending[first_switched].pop(remote_peer)
                if len(pending[first_switched]) == 0:
                    del pending[first_switched]
            else:
                # Flow did not find a matching, pending peer - inserting itself
                pending[first_switched][local_peer] = flow
                continue

            con = Connection(flow, peer_flow)
            if con.total_packets < skipped_threshold:
                skipped += 1
                continue

            if first_line:
                print("{:19} | {:14} | {:8} | {:9} | {:7} | Involved hosts".format("Timestamp", "Service", "Size",
                                                                                   "Duration", "Packets"))
                print("-" * 100)
                first_line = False

            print("{timestamp} | {service:<14} | {size:8} | {duration:9} | {packets:7} | "
                  "Between {src_host} ({src}) and {dest_host} ({dest})"
                  .format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src,
                          dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration,
                          packets=con.total_packets))

    if skipped > 0:
        print("{skipped} connections skipped, because they had less than {skipped_threshold} packets "
              "(this value can be set with the -p flag).".format(skipped=skipped, skipped_threshold=skipped_threshold))

    if not args.verbose:
        # Exit here if no debugging session was wanted
        exit(0)

    if len(pending) > 0:
        print("\nThere are {pending} first_switched entries left in the pending dict!".format(pending=len(pending)))
        all_noise = True
        for first_switched, flows in sorted(pending.items(), key=lambda x: x[0]):
            for peer, flow in flows.items():
                # Ignore all pings, SYN scans and other noise to find only those peers left over which need a fix
                if flow["IN_PKTS"] < skipped_threshold:
                    continue
                all_noise = False

                src = flow.get("IPV4_SRC_ADDR") or flow.get("IPV6_SRC_ADDR")
                src_host = resolve_hostname(src)
                src_text = "{}".format(src) if src == src_host else "{} ({})".format(src_host, src)
                dst = flow.get("IPV4_DST_ADDR") or flow.get("IPV6_DST_ADDR")
                dst_host = resolve_hostname(dst)
                dst_text = "{}".format(dst) if dst == dst_host else "{} ({})".format(dst_host, dst)
                proto = flow["PROTOCOL"]
                size = flow["IN_BYTES"]
                packets = flow["IN_PKTS"]
                src_port = flow.get("L4_SRC_PORT", 0)
                dst_port = flow.get("L4_DST_PORT", 0)

                print("From {src_text}:{src_port} to {dst_text}:{dst_port} with "
                      "proto {proto} and size {size}"
                      " ({packets} packets)".format(src_text=src_text, src_port=src_port, dst_text=dst_text,
                                                    dst_port=dst_port, proto=IP_PROTOCOLS.get(proto, 'UNKNOWN'),
                                                    size=human_size(size), packets=packets))

        if all_noise:
            print("They were all noise!")
@@ -1,242 +0,0 @@
#!/usr/bin/env python3

"""
Reference collector script for NetFlow v1, v5, and v9 Python package.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import argparse
import gzip
import json
import logging
import queue
import signal
import socket
import socketserver
import threading
import time
from collections import namedtuple

from netflow.ipfix import IPFIXTemplateNotRecognized
from netflow.utils import UnknownExportVersion, parse_packet
from netflow.v9 import V9TemplateNotRecognized

RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data'])
ParsedPacket = namedtuple('ParsedPacket', ['ts', 'client', 'export'])

# Amount of time to wait before dropping an undecodable ExportPacket
PACKET_TIMEOUT = 60 * 60

logger = logging.getLogger("netflow-collector")
ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)


class QueuingRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = self.request[0]  # get content, [1] would be the socket
        self.server.queue.put(RawPacket(time.time(), self.client_address, data))
        logger.debug(
            "Received %d bytes of data from %s", len(data), self.client_address
        )


class QueuingUDPListener(socketserver.ThreadingUDPServer):
    """A threaded UDP server that adds a (time, data) tuple to a queue for
    every request it sees
    """

    def __init__(self, interface, queue):
        self.queue = queue

        # If IPv6 interface addresses are used, override the default AF_INET family
        if ":" in interface[0]:
            self.address_family = socket.AF_INET6

        super().__init__(interface, QueuingRequestHandler)


class ThreadedNetFlowListener(threading.Thread):
    """A thread that listens for incoming NetFlow packets, processes them, and
    makes them available to consumers.

    - When initialized, will start listening for NetFlow packets on the provided
      host and port and queuing them for processing.
    - When started, will start processing and parsing queued packets.
    - When stopped, will shut down the listener and stop processing.
    - When joined, will wait for the listener to exit

    For example, a simple script that outputs data until killed with CTRL+C:
    >>> listener = ThreadedNetFlowListener('0.0.0.0', 2055)
    >>> print("Listening for NetFlow packets")
    >>> listener.start()  # start processing packets
    >>> try:
    ...     while True:
    ...         ts, client, export = listener.get()
    ...         print("Time: {}".format(ts))
    ...         for f in export.flows:
    ...             print(" - {IPV4_SRC_ADDR} sent data to {IPV4_DST_ADDR}"
    ...                   "".format(**f))
    ... finally:
    ...     print("Stopping...")
    ...     listener.stop()
    ...     listener.join()
    ...     print("Stopped!")
    """

    def __init__(self, host: str, port: int):
        logger.info("Starting the NetFlow listener on {}:{}".format(host, port))
        self.output = queue.Queue()
        self.input = queue.Queue()
        self.server = QueuingUDPListener((host, port), self.input)
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.start()
        self._shutdown = threading.Event()
        super().__init__()

    def get(self, block=True, timeout=None) -> ParsedPacket:
        """Get a processed flow.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a flow is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the queue.Empty exception if no flow was available within that time.
        Otherwise ('block' is false), return a flow if one is immediately
        available, else raise the queue.Empty exception ('timeout' is ignored
        in that case).
        """
        return self.output.get(block, timeout)

    def run(self):
        # Process packets from the queue
        try:
            # TODO: use per-client templates
            templates = {"netflow": {}, "ipfix": {}}
            to_retry = []
            while not self._shutdown.is_set():
                try:
                    # 0.5s delay to limit CPU usage while waiting for new packets
                    pkt = self.input.get(block=True, timeout=0.5)  # type: RawPacket
                except queue.Empty:
                    continue

                try:
                    # templates is passed as reference, updated in V9ExportPacket
                    export = parse_packet(pkt.data, templates)
                except UnknownExportVersion as e:
                    logger.error("%s, ignoring the packet", e)
                    continue
                except (V9TemplateNotRecognized, IPFIXTemplateNotRecognized):
                    # TODO: differentiate between v9 and IPFIX, use separate to_retry lists
                    if time.time() - pkt.ts > PACKET_TIMEOUT:
                        logger.warning("Dropping an old and undecodable v9/IPFIX ExportPacket")
                    else:
                        to_retry.append(pkt)
                        logger.debug("Failed to decode a v9/IPFIX ExportPacket - will "
                                     "re-attempt when a new template is discovered")
                    continue

                if export.header.version == 10:
                    logger.debug("Processed an IPFIX ExportPacket with length %d.", export.header.length)
                else:
                    logger.debug("Processed a v%d ExportPacket with %d flows.",
                                 export.header.version, export.header.count)

                # If any new templates were discovered, dump the unprocessable
                # data back into the queue and try to decode them again
                if export.header.version in [9, 10] and export.contains_new_templates and to_retry:
                    logger.debug("Received new template(s)")
                    logger.debug("Will re-attempt to decode %d old v9/IPFIX ExportPackets", len(to_retry))
                    for p in to_retry:
                        self.input.put(p)
                    to_retry.clear()

                self.output.put(ParsedPacket(pkt.ts, pkt.client, export))
        finally:
            # Only reached when while loop ends
            self.server.shutdown()
            self.server.server_close()

    def stop(self):
        logger.info("Shutting down the NetFlow listener")
        self._shutdown.set()

    def join(self, timeout=None):
        self.thread.join(timeout=timeout)
        super().join(timeout=timeout)


def get_export_packets(host: str, port: int) -> ParsedPacket:
    """A threaded generator that will yield ExportPacket objects until it is killed
    """

    def handle_signal(s, f):
        logger.debug("Received signal {}, raising StopIteration".format(s))
        raise StopIteration

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    listener = ThreadedNetFlowListener(host, port)
    listener.start()

    try:
        while True:
            yield listener.get()
    except StopIteration:
        pass
    finally:
        listener.stop()
        listener.join()


if __name__ == "netflow.collector":
    logger.error("The collector is currently meant to be used as a CLI tool only.")
    logger.error("Use 'python3 -m netflow.collector -h' in your console for additional help.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="A sample netflow collector.")
    parser.add_argument("--host", type=str, default="0.0.0.0",
                        help="collector listening address")
    parser.add_argument("--port", "-p", type=int, default=2055,
                        help="collector listener port")
    parser.add_argument("--file", "-o", type=str, dest="output_file",
                        default="{}.gz".format(int(time.time())),
                        help="collector export multiline JSON file")
    parser.add_argument("--debug", "-D", action="store_true",
                        help="Enable debug output")
    args = parser.parse_args()

    if args.debug:
        logger.setLevel(logging.DEBUG)
        ch.setLevel(logging.DEBUG)

    try:
        # With every parsed flow a new line is appended to the output file. In previous versions, this was implemented
        # by storing the whole data dict in memory and dumping it regularly onto disk. This was extremely fragile, as
        # it a) consumed a lot of memory and CPU (dropping packets since storing one flow took longer than the arrival
        # of the next flow) and b) broke the exported JSON file if the collector crashed during the write process,
        # rendering all flows collected during the runtime of the collector useless (the file contained one large JSON
        # dict which represented the 'data' dict).

        # In this new approach, each received flow is parsed as usual, but it gets appended to a gzipped file each time.
        # All in all, this improves in three aspects:
        # 1. collected flow data is not stored in memory any more
        # 2. received and parsed flows are persisted reliably
        # 3. the disk usage of files with JSON and its full strings as keys is reduced by using gzipped files
        # This also means that the files have to be handled differently, because they are gzipped and not formatted as
        # one single big JSON dump, but rather many little JSON dumps, separated by line breaks.
        for ts, client, export in get_export_packets(args.host, args.port):
            entry = {ts: {
                "client": client,
                "header": export.header.to_dict(),
                "flows": [flow.data for flow in export.flows]}
            }
            line = json.dumps(entry).encode() + b"\n"  # byte encoded line
            with gzip.open(args.output_file, "ab") as fh:  # open as append, not reading the whole file
                fh.write(line)
    except KeyboardInterrupt:
        logger.info("Received KeyboardInterrupt, passing through")
        pass
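The comment block in `__main__` above describes the output format: many small JSON dumps appended to a gzip file, one per line, instead of one large JSON document. A minimal sketch of a reader for such a file (the helper name `read_flows` is an assumption for illustration, not part of the package):

```python
import gzip
import json


def read_flows(path):
    """Yield (timestamp, payload) pairs from a gzipped, line-delimited JSON dump.

    Each line holds one dict of the form {ts: {"client": ..., "header": ...,
    "flows": [...]}}, so the file never has to fit in memory at once.
    """
    with gzip.open(path, "rb") as fh:
        for line in fh:
            # JSON object keys are always strings, even though the collector
            # used a float timestamp as the key when dumping
            for ts, payload in json.loads(line).items():
                yield float(ts), payload
```

Note that `gzip.open(..., "ab")` on the writer side produces one gzip member per append; Python's `gzip` module transparently reads such multi-member files back as one stream.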
1028 netflow/ipfix.py
@@ -1,1028 +0,0 @@
#!/usr/bin/env python3

"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Reference: https://tools.ietf.org/html/rfc7011

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import functools
import struct
from collections import namedtuple
from typing import Optional, Union, List, Dict

FieldType = namedtuple("FieldType", ["id", "name", "type"])
DataType = namedtuple("DataType", ["type", "unpack_format"])
TemplateField = namedtuple("TemplateField", ["id", "length"])
TemplateFieldEnterprise = namedtuple("TemplateFieldEnterprise", ["id", "length", "enterprise_number"])


class IPFIXFieldTypes:
    # Source: https://www.iana.org/assignments/ipfix/ipfix-information-elements.csv
    iana_field_types = [
        (0, "Reserved", ""),
        (1, "octetDeltaCount", "unsigned64"),
        (2, "packetDeltaCount", "unsigned64"),
        (3, "deltaFlowCount", "unsigned64"),
        (4, "protocolIdentifier", "unsigned8"),
        (5, "ipClassOfService", "unsigned8"),
        (6, "tcpControlBits", "unsigned16"),
        (7, "sourceTransportPort", "unsigned16"),
        (8, "sourceIPv4Address", "ipv4Address"),
        (9, "sourceIPv4PrefixLength", "unsigned8"),
        (10, "ingressInterface", "unsigned32"),
        (11, "destinationTransportPort", "unsigned16"),
        (12, "destinationIPv4Address", "ipv4Address"),
        (13, "destinationIPv4PrefixLength", "unsigned8"),
        (14, "egressInterface", "unsigned32"),
        (15, "ipNextHopIPv4Address", "ipv4Address"),
        (16, "bgpSourceAsNumber", "unsigned32"),
        (17, "bgpDestinationAsNumber", "unsigned32"),
        (18, "bgpNextHopIPv4Address", "ipv4Address"),
        (19, "postMCastPacketDeltaCount", "unsigned64"),
        (20, "postMCastOctetDeltaCount", "unsigned64"),
        (21, "flowEndSysUpTime", "unsigned32"),
        (22, "flowStartSysUpTime", "unsigned32"),
        (23, "postOctetDeltaCount", "unsigned64"),
        (24, "postPacketDeltaCount", "unsigned64"),
        (25, "minimumIpTotalLength", "unsigned64"),
        (26, "maximumIpTotalLength", "unsigned64"),
        (27, "sourceIPv6Address", "ipv6Address"),
        (28, "destinationIPv6Address", "ipv6Address"),
        (29, "sourceIPv6PrefixLength", "unsigned8"),
        (30, "destinationIPv6PrefixLength", "unsigned8"),
        (31, "flowLabelIPv6", "unsigned32"),
        (32, "icmpTypeCodeIPv4", "unsigned16"),
        (33, "igmpType", "unsigned8"),
        (34, "samplingInterval", "unsigned32"),
        (35, "samplingAlgorithm", "unsigned8"),
        (36, "flowActiveTimeout", "unsigned16"),
        (37, "flowIdleTimeout", "unsigned16"),
        (38, "engineType", "unsigned8"),
        (39, "engineId", "unsigned8"),
        (40, "exportedOctetTotalCount", "unsigned64"),
        (41, "exportedMessageTotalCount", "unsigned64"),
        (42, "exportedFlowRecordTotalCount", "unsigned64"),
        (43, "ipv4RouterSc", "ipv4Address"),
        (44, "sourceIPv4Prefix", "ipv4Address"),
        (45, "destinationIPv4Prefix", "ipv4Address"),
        (46, "mplsTopLabelType", "unsigned8"),
        (47, "mplsTopLabelIPv4Address", "ipv4Address"),
        (48, "samplerId", "unsigned8"),
        (49, "samplerMode", "unsigned8"),
        (50, "samplerRandomInterval", "unsigned32"),
        (51, "classId", "unsigned8"),
        (52, "minimumTTL", "unsigned8"),
        (53, "maximumTTL", "unsigned8"),
        (54, "fragmentIdentification", "unsigned32"),
        (55, "postIpClassOfService", "unsigned8"),
        (56, "sourceMacAddress", "macAddress"),
        (57, "postDestinationMacAddress", "macAddress"),
        (58, "vlanId", "unsigned16"),
        (59, "postVlanId", "unsigned16"),
        (60, "ipVersion", "unsigned8"),
        (61, "flowDirection", "unsigned8"),
        (62, "ipNextHopIPv6Address", "ipv6Address"),
        (63, "bgpNextHopIPv6Address", "ipv6Address"),
        (64, "ipv6ExtensionHeaders", "unsigned32"),
        (70, "mplsTopLabelStackSection", "octetArray"),
        (71, "mplsLabelStackSection2", "octetArray"),
        (72, "mplsLabelStackSection3", "octetArray"),
        (73, "mplsLabelStackSection4", "octetArray"),
        (74, "mplsLabelStackSection5", "octetArray"),
        (75, "mplsLabelStackSection6", "octetArray"),
        (76, "mplsLabelStackSection7", "octetArray"),
        (77, "mplsLabelStackSection8", "octetArray"),
        (78, "mplsLabelStackSection9", "octetArray"),
        (79, "mplsLabelStackSection10", "octetArray"),
        (80, "destinationMacAddress", "macAddress"),
        (81, "postSourceMacAddress", "macAddress"),
        (82, "interfaceName", "string"),
        (83, "interfaceDescription", "string"),
        (84, "samplerName", "string"),
        (85, "octetTotalCount", "unsigned64"),
        (86, "packetTotalCount", "unsigned64"),
        (87, "flagsAndSamplerId", "unsigned32"),
        (88, "fragmentOffset", "unsigned16"),
        (89, "forwardingStatus", "unsigned8"),
        (90, "mplsVpnRouteDistinguisher", "octetArray"),
        (91, "mplsTopLabelPrefixLength", "unsigned8"),
        (92, "srcTrafficIndex", "unsigned32"),
        (93, "dstTrafficIndex", "unsigned32"),
        (94, "applicationDescription", "string"),
        (95, "applicationId", "octetArray"),
        (96, "applicationName", "string"),
        (97, "Assigned for NetFlow v9 compatibility", ""),
        (98, "postIpDiffServCodePoint", "unsigned8"),
        (99, "multicastReplicationFactor", "unsigned32"),
        (100, "className", "string"),
        (101, "classificationEngineId", "unsigned8"),
        (102, "layer2packetSectionOffset", "unsigned16"),
        (103, "layer2packetSectionSize", "unsigned16"),
        (104, "layer2packetSectionData", "octetArray"),
        (128, "bgpNextAdjacentAsNumber", "unsigned32"),
        (129, "bgpPrevAdjacentAsNumber", "unsigned32"),
        (130, "exporterIPv4Address", "ipv4Address"),
        (131, "exporterIPv6Address", "ipv6Address"),
        (132, "droppedOctetDeltaCount", "unsigned64"),
        (133, "droppedPacketDeltaCount", "unsigned64"),
        (134, "droppedOctetTotalCount", "unsigned64"),
        (135, "droppedPacketTotalCount", "unsigned64"),
        (136, "flowEndReason", "unsigned8"),
        (137, "commonPropertiesId", "unsigned64"),
        (138, "observationPointId", "unsigned64"),
        (139, "icmpTypeCodeIPv6", "unsigned16"),
        (140, "mplsTopLabelIPv6Address", "ipv6Address"),
        (141, "lineCardId", "unsigned32"),
        (142, "portId", "unsigned32"),
        (143, "meteringProcessId", "unsigned32"),
        (144, "exportingProcessId", "unsigned32"),
        (145, "templateId", "unsigned16"),
        (146, "wlanChannelId", "unsigned8"),
        (147, "wlanSSID", "string"),
        (148, "flowId", "unsigned64"),
        (149, "observationDomainId", "unsigned32"),
        (150, "flowStartSeconds", "dateTimeSeconds"),
        (151, "flowEndSeconds", "dateTimeSeconds"),
        (152, "flowStartMilliseconds", "dateTimeMilliseconds"),
        (153, "flowEndMilliseconds", "dateTimeMilliseconds"),
        (154, "flowStartMicroseconds", "dateTimeMicroseconds"),
        (155, "flowEndMicroseconds", "dateTimeMicroseconds"),
        (156, "flowStartNanoseconds", "dateTimeNanoseconds"),
        (157, "flowEndNanoseconds", "dateTimeNanoseconds"),
        (158, "flowStartDeltaMicroseconds", "unsigned32"),
        (159, "flowEndDeltaMicroseconds", "unsigned32"),
        (160, "systemInitTimeMilliseconds", "dateTimeMilliseconds"),
        (161, "flowDurationMilliseconds", "unsigned32"),
        (162, "flowDurationMicroseconds", "unsigned32"),
        (163, "observedFlowTotalCount", "unsigned64"),
        (164, "ignoredPacketTotalCount", "unsigned64"),
        (165, "ignoredOctetTotalCount", "unsigned64"),
        (166, "notSentFlowTotalCount", "unsigned64"),
        (167, "notSentPacketTotalCount", "unsigned64"),
        (168, "notSentOctetTotalCount", "unsigned64"),
        (169, "destinationIPv6Prefix", "ipv6Address"),
        (170, "sourceIPv6Prefix", "ipv6Address"),
        (171, "postOctetTotalCount", "unsigned64"),
        (172, "postPacketTotalCount", "unsigned64"),
        (173, "flowKeyIndicator", "unsigned64"),
        (174, "postMCastPacketTotalCount", "unsigned64"),
        (175, "postMCastOctetTotalCount", "unsigned64"),
        (176, "icmpTypeIPv4", "unsigned8"),
        (177, "icmpCodeIPv4", "unsigned8"),
        (178, "icmpTypeIPv6", "unsigned8"),
        (179, "icmpCodeIPv6", "unsigned8"),
        (180, "udpSourcePort", "unsigned16"),
        (181, "udpDestinationPort", "unsigned16"),
        (182, "tcpSourcePort", "unsigned16"),
        (183, "tcpDestinationPort", "unsigned16"),
        (184, "tcpSequenceNumber", "unsigned32"),
        (185, "tcpAcknowledgementNumber", "unsigned32"),
        (186, "tcpWindowSize", "unsigned16"),
        (187, "tcpUrgentPointer", "unsigned16"),
        (188, "tcpHeaderLength", "unsigned8"),
        (189, "ipHeaderLength", "unsigned8"),
        (190, "totalLengthIPv4", "unsigned16"),
        (191, "payloadLengthIPv6", "unsigned16"),
        (192, "ipTTL", "unsigned8"),
        (193, "nextHeaderIPv6", "unsigned8"),
        (194, "mplsPayloadLength", "unsigned32"),
        (195, "ipDiffServCodePoint", "unsigned8"),
        (196, "ipPrecedence", "unsigned8"),
        (197, "fragmentFlags", "unsigned8"),
        (198, "octetDeltaSumOfSquares", "unsigned64"),
        (199, "octetTotalSumOfSquares", "unsigned64"),
        (200, "mplsTopLabelTTL", "unsigned8"),
        (201, "mplsLabelStackLength", "unsigned32"),
        (202, "mplsLabelStackDepth", "unsigned32"),
        (203, "mplsTopLabelExp", "unsigned8"),
        (204, "ipPayloadLength", "unsigned32"),
        (205, "udpMessageLength", "unsigned16"),
        (206, "isMulticast", "unsigned8"),
        (207, "ipv4IHL", "unsigned8"),
        (208, "ipv4Options", "unsigned32"),
        (209, "tcpOptions", "unsigned64"),
        (210, "paddingOctets", "octetArray"),
        (211, "collectorIPv4Address", "ipv4Address"),
        (212, "collectorIPv6Address", "ipv6Address"),
        (213, "exportInterface", "unsigned32"),
        (214, "exportProtocolVersion", "unsigned8"),
        (215, "exportTransportProtocol", "unsigned8"),
        (216, "collectorTransportPort", "unsigned16"),
        (217, "exporterTransportPort", "unsigned16"),
        (218, "tcpSynTotalCount", "unsigned64"),
        (219, "tcpFinTotalCount", "unsigned64"),
        (220, "tcpRstTotalCount", "unsigned64"),
        (221, "tcpPshTotalCount", "unsigned64"),
        (222, "tcpAckTotalCount", "unsigned64"),
        (223, "tcpUrgTotalCount", "unsigned64"),
        (224, "ipTotalLength", "unsigned64"),
        (225, "postNATSourceIPv4Address", "ipv4Address"),
        (226, "postNATDestinationIPv4Address", "ipv4Address"),
        (227, "postNAPTSourceTransportPort", "unsigned16"),
        (228, "postNAPTDestinationTransportPort", "unsigned16"),
        (229, "natOriginatingAddressRealm", "unsigned8"),
        (230, "natEvent", "unsigned8"),
        (231, "initiatorOctets", "unsigned64"),
        (232, "responderOctets", "unsigned64"),
        (233, "firewallEvent", "unsigned8"),
        (234, "ingressVRFID", "unsigned32"),
        (235, "egressVRFID", "unsigned32"),
        (236, "VRFname", "string"),
        (237, "postMplsTopLabelExp", "unsigned8"),
        (238, "tcpWindowScale", "unsigned16"),
        (239, "biflowDirection", "unsigned8"),
        (240, "ethernetHeaderLength", "unsigned8"),
        (241, "ethernetPayloadLength", "unsigned16"),
        (242, "ethernetTotalLength", "unsigned16"),
        (243, "dot1qVlanId", "unsigned16"),
        (244, "dot1qPriority", "unsigned8"),
        (245, "dot1qCustomerVlanId", "unsigned16"),
        (246, "dot1qCustomerPriority", "unsigned8"),
        (247, "metroEvcId", "string"),
        (248, "metroEvcType", "unsigned8"),
        (249, "pseudoWireId", "unsigned32"),
        (250, "pseudoWireType", "unsigned16"),
        (251, "pseudoWireControlWord", "unsigned32"),
        (252, "ingressPhysicalInterface", "unsigned32"),
        (253, "egressPhysicalInterface", "unsigned32"),
        (254, "postDot1qVlanId", "unsigned16"),
        (255, "postDot1qCustomerVlanId", "unsigned16"),
        (256, "ethernetType", "unsigned16"),
        (257, "postIpPrecedence", "unsigned8"),
        (258, "collectionTimeMilliseconds", "dateTimeMilliseconds"),
        (259, "exportSctpStreamId", "unsigned16"),
        (260, "maxExportSeconds", "dateTimeSeconds"),
        (261, "maxFlowEndSeconds", "dateTimeSeconds"),
        (262, "messageMD5Checksum", "octetArray"),
        (263, "messageScope", "unsigned8"),
        (264, "minExportSeconds", "dateTimeSeconds"),
        (265, "minFlowStartSeconds", "dateTimeSeconds"),
        (266, "opaqueOctets", "octetArray"),
        (267, "sessionScope", "unsigned8"),
        (268, "maxFlowEndMicroseconds", "dateTimeMicroseconds"),
        (269, "maxFlowEndMilliseconds", "dateTimeMilliseconds"),
        (270, "maxFlowEndNanoseconds", "dateTimeNanoseconds"),
        (271, "minFlowStartMicroseconds", "dateTimeMicroseconds"),
        (272, "minFlowStartMilliseconds", "dateTimeMilliseconds"),
        (273, "minFlowStartNanoseconds", "dateTimeNanoseconds"),
        (274, "collectorCertificate", "octetArray"),
        (275, "exporterCertificate", "octetArray"),
        (276, "dataRecordsReliability", "boolean"),
        (277, "observationPointType", "unsigned8"),
        (278, "newConnectionDeltaCount", "unsigned32"),
        (279, "connectionSumDurationSeconds", "unsigned64"),
        (280, "connectionTransactionId", "unsigned64"),
        (281, "postNATSourceIPv6Address", "ipv6Address"),
        (282, "postNATDestinationIPv6Address", "ipv6Address"),
        (283, "natPoolId", "unsigned32"),
        (284, "natPoolName", "string"),
        (285, "anonymizationFlags", "unsigned16"),
        (286, "anonymizationTechnique", "unsigned16"),
        (287, "informationElementIndex", "unsigned16"),
        (288, "p2pTechnology", "string"),
        (289, "tunnelTechnology", "string"),
        (290, "encryptedTechnology", "string"),
        (291, "basicList", "basicList"),
        (292, "subTemplateList", "subTemplateList"),
        (293, "subTemplateMultiList", "subTemplateMultiList"),
        (294, "bgpValidityState", "unsigned8"),
        (295, "IPSecSPI", "unsigned32"),
        (296, "greKey", "unsigned32"),
        (297, "natType", "unsigned8"),
        (298, "initiatorPackets", "unsigned64"),
        (299, "responderPackets", "unsigned64"),
        (300, "observationDomainName", "string"),
        (301, "selectionSequenceId", "unsigned64"),
        (302, "selectorId", "unsigned64"),
        (303, "informationElementId", "unsigned16"),
        (304, "selectorAlgorithm", "unsigned16"),
        (305, "samplingPacketInterval", "unsigned32"),
        (306, "samplingPacketSpace", "unsigned32"),
        (307, "samplingTimeInterval", "unsigned32"),
        (308, "samplingTimeSpace", "unsigned32"),
        (309, "samplingSize", "unsigned32"),
        (310, "samplingPopulation", "unsigned32"),
        (311, "samplingProbability", "float64"),
        (312, "dataLinkFrameSize", "unsigned16"),
        (313, "ipHeaderPacketSection", "octetArray"),
        (314, "ipPayloadPacketSection", "octetArray"),
        (315, "dataLinkFrameSection", "octetArray"),
        (316, "mplsLabelStackSection", "octetArray"),
        (317, "mplsPayloadPacketSection", "octetArray"),
        (318, "selectorIdTotalPktsObserved", "unsigned64"),
        (319, "selectorIdTotalPktsSelected", "unsigned64"),
        (320, "absoluteError", "float64"),
        (321, "relativeError", "float64"),
        (322, "observationTimeSeconds", "dateTimeSeconds"),
        (323, "observationTimeMilliseconds", "dateTimeMilliseconds"),
        (324, "observationTimeMicroseconds", "dateTimeMicroseconds"),
        (325, "observationTimeNanoseconds", "dateTimeNanoseconds"),
        (326, "digestHashValue", "unsigned64"),
        (327, "hashIPPayloadOffset", "unsigned64"),
        (328, "hashIPPayloadSize", "unsigned64"),
        (329, "hashOutputRangeMin", "unsigned64"),
        (330, "hashOutputRangeMax", "unsigned64"),
        (331, "hashSelectedRangeMin", "unsigned64"),
        (332, "hashSelectedRangeMax", "unsigned64"),
        (333, "hashDigestOutput", "boolean"),
        (334, "hashInitialiserValue", "unsigned64"),
        (335, "selectorName", "string"),
        (336, "upperCILimit", "float64"),
        (337, "lowerCILimit", "float64"),
        (338, "confidenceLevel", "float64"),
        (339, "informationElementDataType", "unsigned8"),
        (340, "informationElementDescription", "string"),
        (341, "informationElementName", "string"),
        (342, "informationElementRangeBegin", "unsigned64"),
        (343, "informationElementRangeEnd", "unsigned64"),
        (344, "informationElementSemantics", "unsigned8"),
        (345, "informationElementUnits", "unsigned16"),
        (346, "privateEnterpriseNumber", "unsigned32"),
        (347, "virtualStationInterfaceId", "octetArray"),
        (348, "virtualStationInterfaceName", "string"),
        (349, "virtualStationUUID", "octetArray"),
        (350, "virtualStationName", "string"),
        (351, "layer2SegmentId", "unsigned64"),
        (352, "layer2OctetDeltaCount", "unsigned64"),
        (353, "layer2OctetTotalCount", "unsigned64"),
        (354, "ingressUnicastPacketTotalCount", "unsigned64"),
        (355, "ingressMulticastPacketTotalCount", "unsigned64"),
        (356, "ingressBroadcastPacketTotalCount", "unsigned64"),
        (357, "egressUnicastPacketTotalCount", "unsigned64"),
        (358, "egressBroadcastPacketTotalCount", "unsigned64"),
        (359, "monitoringIntervalStartMilliSeconds", "dateTimeMilliseconds"),
        (360, "monitoringIntervalEndMilliSeconds", "dateTimeMilliseconds"),
        (361, "portRangeStart", "unsigned16"),
        (362, "portRangeEnd", "unsigned16"),
        (363, "portRangeStepSize", "unsigned16"),
        (364, "portRangeNumPorts", "unsigned16"),
        (365, "staMacAddress", "macAddress"),
        (366, "staIPv4Address", "ipv4Address"),
        (367, "wtpMacAddress", "macAddress"),
        (368, "ingressInterfaceType", "unsigned32"),
        (369, "egressInterfaceType", "unsigned32"),
        (370, "rtpSequenceNumber", "unsigned16"),
        (371, "userName", "string"),
        (372, "applicationCategoryName", "string"),
        (373, "applicationSubCategoryName", "string"),
        (374, "applicationGroupName", "string"),
        (375, "originalFlowsPresent", "unsigned64"),
        (376, "originalFlowsInitiated", "unsigned64"),
        (377, "originalFlowsCompleted", "unsigned64"),
        (378, "distinctCountOfSourceIPAddress", "unsigned64"),
        (379, "distinctCountOfDestinationIPAddress", "unsigned64"),
        (380, "distinctCountOfSourceIPv4Address", "unsigned32"),
        (381, "distinctCountOfDestinationIPv4Address", "unsigned32"),
        (382, "distinctCountOfSourceIPv6Address", "unsigned64"),
        (383, "distinctCountOfDestinationIPv6Address", "unsigned64"),
        (384, "valueDistributionMethod", "unsigned8"),
        (385, "rfc3550JitterMilliseconds", "unsigned32"),
        (386, "rfc3550JitterMicroseconds", "unsigned32"),
        (387, "rfc3550JitterNanoseconds", "unsigned32"),
        (388, "dot1qDEI", "boolean"),
        (389, "dot1qCustomerDEI", "boolean"),
        (390, "flowSelectorAlgorithm", "unsigned16"),
        (391, "flowSelectedOctetDeltaCount", "unsigned64"),
        (392, "flowSelectedPacketDeltaCount", "unsigned64"),
        (393, "flowSelectedFlowDeltaCount", "unsigned64"),
        (394, "selectorIDTotalFlowsObserved", "unsigned64"),
        (395, "selectorIDTotalFlowsSelected", "unsigned64"),
        (396, "samplingFlowInterval", "unsigned64"),
        (397, "samplingFlowSpacing", "unsigned64"),
        (398, "flowSamplingTimeInterval", "unsigned64"),
        (399, "flowSamplingTimeSpacing", "unsigned64"),
        (400, "hashFlowDomain", "unsigned16"),
        (401, "transportOctetDeltaCount", "unsigned64"),
        (402, "transportPacketDeltaCount", "unsigned64"),
        (403, "originalExporterIPv4Address", "ipv4Address"),
        (404, "originalExporterIPv6Address", "ipv6Address"),
        (405, "originalObservationDomainId", "unsigned32"),
        (406, "intermediateProcessId", "unsigned32"),
        (407, "ignoredDataRecordTotalCount", "unsigned64"),
        (408, "dataLinkFrameType", "unsigned16"),
        (409, "sectionOffset", "unsigned16"),
        (410, "sectionExportedOctets", "unsigned16"),
        (411, "dot1qServiceInstanceTag", "octetArray"),
        (412, "dot1qServiceInstanceId", "unsigned32"),
        (413, "dot1qServiceInstancePriority", "unsigned8"),
        (414, "dot1qCustomerSourceMacAddress", "macAddress"),
        (415, "dot1qCustomerDestinationMacAddress", "macAddress"),
        (416, "", ""),
        (417, "postLayer2OctetDeltaCount", "unsigned64"),
        (418, "postMCastLayer2OctetDeltaCount", "unsigned64"),
        (419, "", ""),
        (420, "postLayer2OctetTotalCount", "unsigned64"),
        (421, "postMCastLayer2OctetTotalCount", "unsigned64"),
        (422, "minimumLayer2TotalLength", "unsigned64"),
        (423, "maximumLayer2TotalLength", "unsigned64"),
        (424, "droppedLayer2OctetDeltaCount", "unsigned64"),
        (425, "droppedLayer2OctetTotalCount", "unsigned64"),
        (426, "ignoredLayer2OctetTotalCount", "unsigned64"),
        (427, "notSentLayer2OctetTotalCount", "unsigned64"),
        (428, "layer2OctetDeltaSumOfSquares", "unsigned64"),
        (429, "layer2OctetTotalSumOfSquares", "unsigned64"),
        (430, "layer2FrameDeltaCount", "unsigned64"),
        (431, "layer2FrameTotalCount", "unsigned64"),
        (432, "pseudoWireDestinationIPv4Address", "ipv4Address"),
        (433, "ignoredLayer2FrameTotalCount", "unsigned64"),
        (434, "mibObjectValueInteger", "signed32"),
        (435, "mibObjectValueOctetString", "octetArray"),
        (436, "mibObjectValueOID", "octetArray"),
        (437, "mibObjectValueBits", "octetArray"),
        (438, "mibObjectValueIPAddress", "ipv4Address"),
        (439, "mibObjectValueCounter", "unsigned64"),
        (440, "mibObjectValueGauge", "unsigned32"),
        (441, "mibObjectValueTimeTicks", "unsigned32"),
        (442, "mibObjectValueUnsigned", "unsigned32"),
        (443, "mibObjectValueTable", "subTemplateList"),
        (444, "mibObjectValueRow", "subTemplateList"),
        (445, "mibObjectIdentifier", "octetArray"),
        (446, "mibSubIdentifier", "unsigned32"),
        (447, "mibIndexIndicator", "unsigned64"),
        (448, "mibCaptureTimeSemantics", "unsigned8"),
        (449, "mibContextEngineID", "octetArray"),
        (450, "mibContextName", "string"),
        (451, "mibObjectName", "string"),
        (452, "mibObjectDescription", "string"),
|
|
||||||
(453, "mibObjectSyntax", "string"),
|
|
||||||
(454, "mibModuleName", "string"),
|
|
||||||
(455, "mobileIMSI", "string"),
|
|
||||||
(456, "mobileMSISDN", "string"),
|
|
||||||
(457, "httpStatusCode", "unsigned16"),
|
|
||||||
(458, "sourceTransportPortsLimit", "unsigned16"),
|
|
||||||
(459, "httpRequestMethod", "string"),
|
|
||||||
(460, "httpRequestHost", "string"),
|
|
||||||
(461, "httpRequestTarget", "string"),
|
|
||||||
(462, "httpMessageVersion", "string"),
|
|
||||||
(463, "natInstanceID", "unsigned32"),
|
|
||||||
(464, "internalAddressRealm", "octetArray"),
|
|
||||||
(465, "externalAddressRealm", "octetArray"),
|
|
||||||
(466, "natQuotaExceededEvent", "unsigned32"),
|
|
||||||
(467, "natThresholdEvent", "unsigned32"),
|
|
||||||
(468, "httpUserAgent", "string"),
|
|
||||||
(469, "httpContentType", "string"),
|
|
||||||
(470, "httpReasonPhrase", "string"),
|
|
||||||
(471, "maxSessionEntries", "unsigned32"),
|
|
||||||
(472, "maxBIBEntries", "unsigned32"),
|
|
||||||
(473, "maxEntriesPerUser", "unsigned32"),
|
|
||||||
(474, "maxSubscribers", "unsigned32"),
|
|
||||||
(475, "maxFragmentsPendingReassembly", "unsigned32"),
|
|
||||||
(476, "addressPoolHighThreshold", "unsigned32"),
|
|
||||||
(477, "addressPoolLowThreshold", "unsigned32"),
|
|
||||||
(478, "addressPortMappingHighThreshold", "unsigned32"),
|
|
||||||
(479, "addressPortMappingLowThreshold", "unsigned32"),
|
|
||||||
(480, "addressPortMappingPerUserHighThreshold", "unsigned32"),
|
|
||||||
(481, "globalAddressMappingHighThreshold", "unsigned32"),
|
|
||||||
(482, "vpnIdentifier", "octetArray"),
|
|
||||||
(483, "bgpCommunity", "unsigned32"),
|
|
||||||
(484, "bgpSourceCommunityList", "basicList"),
|
|
||||||
(485, "bgpDestinationCommunityList", "basicList"),
|
|
||||||
(486, "bgpExtendedCommunity", "octetArray"),
|
|
||||||
(487, "bgpSourceExtendedCommunityList", "basicList"),
|
|
||||||
(488, "bgpDestinationExtendedCommunityList", "basicList"),
|
|
||||||
(489, "bgpLargeCommunity", "octetArray"),
|
|
||||||
(490, "bgpSourceLargeCommunityList", "basicList"),
|
|
||||||
(491, "bgpDestinationLargeCommunityList", "basicList"),
|
|
||||||
]
|
|
||||||
|
|
||||||
    @classmethod
    @functools.lru_cache(maxsize=128)
    def by_id(cls, id_: int) -> Optional[FieldType]:
        for item in cls.iana_field_types:
            if item[0] == id_:
                return FieldType(*item)
        return None

    @classmethod
    @functools.lru_cache(maxsize=128)
    def by_name(cls, key: str) -> Optional[FieldType]:
        for item in cls.iana_field_types:
            if item[1] == key:
                return FieldType(*item)
        return None

    @classmethod
    @functools.lru_cache(maxsize=128)
    def get_type_unpack(cls, key: Union[int, str]) -> Optional[DataType]:
        """
        This method covers the mapping from a field type to a struct.unpack format string.
        BLOCKED: due to Reduced-Size Encoding, fields may be exported with a smaller length than defined in
        the standard. Because of this mismatch, the parser in `IPFIXDataRecord.__init__` cannot use this method.
        :param key:
        :return:
        """
        item = None
        if type(key) is int:
            item = cls.by_id(key)
        elif type(key) is str:
            item = cls.by_name(key)
        if not item:
            return None
        return IPFIXDataTypes.by_name(item.type)


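The limitation noted in `get_type_unpack`'s docstring can be sketched concretely: under Reduced-Size Encoding (RFC 7011, section 6.2) an exporter may send an integer Information Element in fewer octets than its IANA type prescribes, so the unpack width must come from the template's announced field length, not from the type table. A minimal stdlib-only sketch (the helper name `unpack_reduced` is hypothetical, not part of this library):

```python
import struct


def unpack_reduced(value_bytes: bytes, signed: bool = False) -> int:
    # The width is taken from len(value_bytes), i.e. the length the
    # template announced, not from the IANA data type table.
    return int.from_bytes(value_bytes, "big", signed=signed)


# octetDeltaCount is defined as unsigned64, but may arrive in only 4 octets
assert unpack_reduced(b"\x00\x00\x30\x39") == 12345
# a fixed-width "!Q" unpack of the same value would require all 8 octets
assert struct.unpack("!Q", b"\x00" * 4 + b"\x00\x00\x30\x39")[0] == 12345
```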
class IPFIXDataTypes:
    # Source: https://www.iana.org/assignments/ipfix/ipfix-information-element-data-types.csv
    # Reference: https://tools.ietf.org/html/rfc7011
    iana_data_types = [
        ("octetArray", None),  # has no encoding rules; it represents a raw array of zero or more octets
        ("unsigned8", "B"),
        ("unsigned16", "H"),
        ("unsigned32", "I"),
        ("unsigned64", "Q"),
        ("signed8", "b"),
        ("signed16", "h"),
        ("signed32", "i"),
        ("signed64", "q"),
        ("float32", "f"),
        ("float64", "d"),
        ("boolean", "?"),  # encoded as a single-octet integer [..], with the value 1 for true and value 2 for false.
        ("macAddress", "6s"),
        ("string", None),  # represents a finite-length string of valid characters of the Unicode encoding set
        ("dateTimeSeconds", "I"),
        ("dateTimeMilliseconds", "Q"),
        ("dateTimeMicroseconds", "8s"),  # This field is made up of two unsigned 32-bit integers
        ("dateTimeNanoseconds", "8s"),  # same as above
        ("ipv4Address", "4s"),
        ("ipv6Address", "16s"),

        # To be implemented
        # ("basicList", "x"),
        # ("subTemplateList", "x"),
        # ("subTemplateMultiList", "x"),
    ]

    @classmethod
    @functools.lru_cache(maxsize=128)
    def by_name(cls, key: str) -> Optional[DataType]:
        """
        Get DataType by name if found, else None.
        :param key:
        :return:
        """
        for t in cls.iana_data_types:
            if t[0] == key:
                return DataType(*t)
        return None

    @classmethod
    def is_signed(cls, dt: Union[DataType, str]) -> bool:
        """
        Check if a data type is meant to be a signed integer.
        :param dt:
        :return:
        """
        fields = ["signed8", "signed16", "signed32", "signed64"]
        if type(dt) is DataType:
            return dt.type in fields
        return dt in fields

    @classmethod
    def is_float(cls, dt: Union[DataType, str]) -> bool:
        """
        Check if data type is meant to be a float.
        :param dt:
        :return:
        """
        fields = ["float32", "float64"]
        if type(dt) is DataType:
            return dt.type in fields
        return dt in fields

    @classmethod
    def is_bytes(cls, dt: Union[DataType, str]) -> bool:
        """
        Check if a data type is meant to be parsed as bytes.
        :param dt:
        :return:
        """
        fields = ["octetArray", "string",
                  "macAddress", "ipv4Address", "ipv6Address",
                  "dateTimeMicroseconds", "dateTimeNanoseconds"]
        if type(dt) is DataType:
            return dt.type in fields
        return dt in fields

    @classmethod
    def to_fitting_object(cls, field):
        """
        Could implement conversion to IPv4Address etc.
        :param field:
        :return:
        """
        pass


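As a quick illustration of the table above (a hedged stdlib-only sketch, not library code): an `unsigned32` travels as four network-order octets matching struct code `"I"`, and a `boolean` is a single octet where, per RFC 7011, 1 means true and 2 means false.

```python
import struct

# unsigned32 -> "!I": four network-order octets
assert struct.unpack("!I", struct.pack("!I", 4096))[0] == 4096

# boolean: single octet, 1 = true, 2 = false (RFC 7011)
assert {1: True, 2: False}[struct.unpack("!B", b"\x01")[0]] is True
assert {1: True, 2: False}[struct.unpack("!B", b"\x02")[0]] is False
```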
class IPFIXMalformedRecord(Exception):
    pass


class IPFIXRFCError(Exception):
    pass


class IPFIXMalformedPacket(Exception):
    pass


class IPFIXTemplateError(Exception):
    pass


class IPFIXTemplateNotRecognized(KeyError):
    pass


class PaddingCalculationError(Exception):
    pass


class IPFIXHeader:
    """The header of the IPFIX export packet
    """
    size = 16

    def __init__(self, data):
        pack = struct.unpack('!HHIII', data)
        self.version = pack[0]
        self.length = pack[1]
        self.export_uptime = pack[2]
        self.sequence_number = pack[3]
        self.observation_domain_id = pack[4]

    def to_dict(self):
        return self.__dict__


class IPFIXTemplateRecord:
    def __init__(self, data):
        pack = struct.unpack("!HH", data[:4])
        self.template_id = pack[0]  # range 256 to 65535
        self.field_count = pack[1]  # Number of fields in this Template Record

        offset = 4
        self.fields, offset_add = parse_fields(data[offset:], self.field_count)
        offset += offset_add
        if len(self.fields) != self.field_count:
            raise IPFIXMalformedRecord
        self._length = offset

    def get_length(self):
        return self._length

    def __repr__(self):
        return "<IPFIXTemplateRecord with {} fields>".format(len(self.fields))


class IPFIXOptionsTemplateRecord:
    def __init__(self, data):
        pack = struct.unpack("!HHH", data[:6])
        self.template_id = pack[0]  # range 256 to 65535
        self.field_count = pack[1]  # includes count of scope fields

        # A scope field count of N specifies that the first N Field Specifiers in
        # the Template Record are Scope Fields. The Scope Field Count MUST NOT be zero.
        self.scope_field_count = pack[2]

        offset = 6

        self.scope_fields, offset_add = parse_fields(data[offset:], self.scope_field_count)
        if len(self.scope_fields) != self.scope_field_count:
            raise IPFIXMalformedRecord
        offset += offset_add

        self.fields, offset_add = parse_fields(data[offset:], self.field_count - self.scope_field_count)
        if len(self.fields) + len(self.scope_fields) != self.field_count:
            raise IPFIXMalformedRecord
        offset += offset_add

        self._length = offset

    def get_length(self):
        return self._length

    def __repr__(self):
        return "<IPFIXOptionsTemplateRecord with {} scope fields and {} fields>".format(
            len(self.scope_fields), len(self.fields)
        )


class IPFIXDataRecord:
    """The IPFIX data record with fields and their value.
    The field types are identified by the corresponding template.
    In contrast to the NetFlow v9 implementation, this one does not use an extra class for the fields.
    """

    def __init__(self, data, template: List[Union[TemplateField, TemplateFieldEnterprise]]):
        self.fields = set()
        offset = 0
        unpacker = "!"
        discovered_fields = []

        # Iterate through all fields of this template and build the unpack format string
        # See https://www.iana.org/assignments/ipfix/ipfix.xhtml
        for index, field in enumerate(template):
            field_type_id = field.id
            field_length = field.length
            offset += field_length

            # Here, reduced-size encoding of fields blocks the usage of IPFIXFieldTypes.get_type_unpack.
            # See comment in IPFIXFieldTypes.get_type_unpack for more information.

            field_type = IPFIXFieldTypes.by_id(field_type_id)  # type: Optional[FieldType]
            if not field_type and type(field) is not TemplateFieldEnterprise:
                # This should break, since the exporter seems to use a field identifier
                # which is not standardized by IANA.
                raise NotImplementedError("Field type with ID {} is not implemented".format(field_type_id))

            datatype = field_type.type  # type: str
            discovered_fields.append((datatype, field_type_id))

            # Catch fields which are meant to be raw bytes and skip the rest
            if IPFIXDataTypes.is_bytes(datatype):
                unpacker += "{}s".format(field_length)
                continue

            # Go into int, uint, float types
            issigned = IPFIXDataTypes.is_signed(datatype)
            isfloat = IPFIXDataTypes.is_float(datatype)
            assert not (all([issigned, isfloat]))  # signed int and float are exclusive

            if field_length == 1:
                unpacker += "b" if issigned else "B"
            elif field_length == 2:
                unpacker += "h" if issigned else "H"
            elif field_length == 4:
                unpacker += "i" if issigned else "f" if isfloat else "I"
            elif field_length == 8:
                unpacker += "q" if issigned else "d" if isfloat else "Q"
            else:
                raise IPFIXTemplateError("Template field_length {} not handled in unpacker".format(field_length))

        # Finally, unpack the data byte stream according to format defined in iteration above
        pack = struct.unpack(unpacker, data[0:offset])

        # Iterate through template again, but taking the unpacked values this time
        for index, ((field_datatype, field_type_id), value) in enumerate(zip(discovered_fields, pack)):
            if type(value) is bytes:
                # Check if value is raw bytes, so no conversion happened in struct.unpack
                if field_datatype in ["string"]:
                    try:
                        value = value.decode()
                    except UnicodeDecodeError:
                        value = str(value)
                # TODO: handle octetArray (= does not have to be unicode encoded)
                elif field_datatype in ["boolean"]:
                    value = True if value == 1 else False  # 2 = false per RFC
                elif field_datatype in ["dateTimeMicroseconds", "dateTimeNanoseconds"]:
                    seconds = value[:4]
                    fraction = value[4:]
                    value = (int.from_bytes(seconds, "big"), int.from_bytes(fraction, "big"))
                else:
                    value = int.from_bytes(value, "big")
            # If not bytes, struct.unpack already did necessary conversions (int, float...),
            # value can be used as-is.
            self.fields.add((field_type_id, value))

        self._length = offset
        self.__dict__.update(self.data)

    def get_length(self):
        return self._length

    @property
    def data(self):
        return {
            IPFIXFieldTypes.by_id(key)[1]: value for (key, value) in self.fields
        }

    def __repr__(self):
        return "<IPFIXDataRecord with {} entries>".format(len(self.fields))


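The length-to-format mapping built inside `IPFIXDataRecord.__init__` can be sketched in isolation (hedged sketch; the helper `int_code` is hypothetical and stands in for the chained conditionals above):

```python
import struct


def int_code(length: int, signed: bool = False) -> str:
    # integer fields map to a struct code purely by their exported length;
    # lowercase codes are signed, uppercase unsigned (as in the parser above)
    codes = {1: "b", 2: "h", 4: "i", 8: "q"}
    c = codes[length]
    return c if signed else c.upper()


# three unsigned fields of length 1, 2 and 4 -> "!BHI", 7 octets total
fmt = "!" + int_code(1) + int_code(2) + int_code(4)
assert struct.unpack(fmt, b"\x01\x00\x02\x00\x00\x00\x03") == (1, 2, 3)
```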
class IPFIXSet:
    """A set containing the set header and a collection of records (one of templates, options, data)
    """

    def __init__(self, data: bytes, templates):
        self.header = IPFIXSetHeader(data[0:IPFIXSetHeader.size])
        self.records = []
        self._templates = {}

        offset = IPFIXSetHeader.size  # fixed size

        if self.header.set_id == 2:  # template set
            while offset < self.header.length:  # length of whole set
                template_record = IPFIXTemplateRecord(data[offset:])
                self.records.append(template_record)
                if template_record.field_count == 0:
                    # Should not happen, since RFC says "one or more"
                    self._templates[template_record.template_id] = None
                else:
                    self._templates[template_record.template_id] = template_record.fields
                offset += template_record.get_length()

                # If the rest of the data is deemed to be too small for another
                # template record, check existence of padding
                if (
                        offset != self.header.length
                        and self.header.length - offset <= 16  # 16 is chosen as a guess
                        and rest_is_padding_zeroes(data[:self.header.length], offset)
                ):
                    # Rest should be padding zeroes
                    break

        elif self.header.set_id == 3:  # options template
            while offset < self.header.length:
                optionstemplate_record = IPFIXOptionsTemplateRecord(data[offset:])
                self.records.append(optionstemplate_record)
                if optionstemplate_record.field_count == 0:
                    self._templates[optionstemplate_record.template_id] = None
                else:
                    self._templates[optionstemplate_record.template_id] = \
                        optionstemplate_record.scope_fields + optionstemplate_record.fields
                offset += optionstemplate_record.get_length()

                # If the rest of the data is deemed to be too small for another
                # options template record, check existence of padding
                if (
                        offset != self.header.length
                        and self.header.length - offset <= 16  # 16 is chosen as a guess
                        and rest_is_padding_zeroes(data[:self.header.length], offset)
                ):
                    # Rest should be padding zeroes
                    break

        elif self.header.set_id >= 256:  # data set, set_id is template id
            # First, get the template behind the ID. Returns a list of fields or raises an exception
            template_fields = templates.get(
                self.header.set_id)  # type: List[Union[TemplateField, TemplateFieldEnterprise]]
            if not template_fields:
                raise IPFIXTemplateNotRecognized

            # All template fields have a known length. Add them all together to get the length of the data set.
            dataset_length = functools.reduce(lambda a, x: a + x.length, template_fields, 0)

            # This is the last possible offset if there is no padding.
            # If there is padding, this value marks the beginning of the padding.
            # Two cases are possible:
            # 1. No padding: then (4 + x * dataset_length) == self.header.length
            # 2. Padding: then (4 + x * dataset_length + p) == self.header.length,
            #    where p is the remaining length of padding zeroes. The modulo calculates p
            no_padding_last_offset = self.header.length - ((self.header.length - IPFIXSetHeader.size) % dataset_length)

            while offset < no_padding_last_offset:
                data_record = IPFIXDataRecord(data[offset:], template_fields)
                self.records.append(data_record)
                offset += data_record.get_length()

            # Safety check
            if (
                    offset != self.header.length
                    and not rest_is_padding_zeroes(data[:self.header.length], offset)
            ):
                raise PaddingCalculationError

        self._length = self.header.length

    def get_length(self):
        return self._length

    @property
    def is_template(self):
        return self.header.set_id in [2, 3]

    @property
    def is_data(self):
        return self.header.set_id >= 256

    @property
    def templates(self):
        return self._templates

    def __repr__(self):
        return "<IPFIXSet with set_id {} and {} records>".format(self.header.set_id, len(self.records))


class IPFIXSetHeader:
    """Header of a set (collection of records)
    """
    size = 4

    def __init__(self, data):
        pack = struct.unpack("!HH", data)

        # A value of 2 is reserved for Template Sets.
        # A value of 3 is reserved for Options Template Sets. Values from 4
        # to 255 are reserved for future use. Values 256 and above are used
        # for Data Sets. The Set ID values of 0 and 1 are not used, for
        # historical reasons [RFC3954].
        self.set_id = pack[0]
        if self.set_id in [0, 1] + [i for i in range(4, 256)]:
            raise IPFIXRFCError("IPFIX set has forbidden ID {}".format(self.set_id))

        self.length = pack[1]  # Total length of the Set, in octets, including the Set Header

    def to_dict(self):
        return self.__dict__

    def __repr__(self):
        return "<IPFIXSetHeader with set_id {} and length {}>".format(self.set_id, self.length)


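The set header layout can be illustrated with a hedged stdlib-only sketch (the values 256 and 20 below are illustrative, not taken from a real capture):

```python
import struct

# An IPFIX set header is two 16-bit network-order words:
# the set ID followed by the total set length (header included).
raw = struct.pack("!HH", 256, 20)
set_id, length = struct.unpack("!HH", raw)
assert set_id >= 256  # 256 and above identify data sets
assert length == 20   # total length in octets, including this 4-byte header
```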
class IPFIXExportPacket:
    """IPFIX export packet with header, templates, options and data flowsets
    """

    def __init__(self, data: bytes, templates: Dict[int, list]):
        self.header = IPFIXHeader(data[:IPFIXHeader.size])
        self.sets = []
        self._contains_new_templates = False
        self._flows = []
        self._templates = templates

        offset = IPFIXHeader.size
        while offset < self.header.length:
            new_set = IPFIXSet(data[offset:], templates)  # may raise IPFIXTemplateNotRecognized
            if new_set.is_template:
                self._contains_new_templates = True
                self._templates.update(new_set.templates)
                # Copy the items so entries can be deleted while iterating
                for template_id, template_fields in list(self._templates.items()):
                    if template_fields is None:
                        # Template withdrawal
                        del self._templates[template_id]
            elif new_set.is_data:
                self._flows += new_set.records

            self.sets.append(new_set)
            offset += new_set.get_length()

        # Here all data should be processed and offset set to the length
        if offset != self.header.length:
            raise IPFIXMalformedPacket

    @property
    def contains_new_templates(self) -> bool:
        return self._contains_new_templates

    @property
    def flows(self):
        return self._flows

    @property
    def templates(self):
        return self._templates

    def __repr__(self):
        return "<IPFIXExportPacket with {} sets, exported at {}>".format(
            len(self.sets), self.header.export_uptime
        )


def parse_fields(data: bytes, count: int) -> (list, int):
    """
    Parse fields from a bytes stream, based on the count of fields.
    Whether a field is an enterprise field or not is determined in this function.
    :param data:
    :param count:
    :return: List of fields and the new offset.
    """
    offset = 0
    fields = []  # type: List[Union[TemplateField, TemplateFieldEnterprise]]
    for ctr in range(count):
        if (data[offset] & (1 << 7)) != 0:  # enterprise flag set. Bitwise AND checks the bit only in the first byte/octet
            pack = struct.unpack("!HHI", data[offset:offset + 8])
            fields.append(
                TemplateFieldEnterprise(
                    id=(pack[0] & ~(1 << 15)),  # clear enterprise flag bit. Bitwise AND and INVERT work on two bytes
                    length=pack[1],  # field length
                    enterprise_number=pack[2]  # enterprise number
                )
            )
            offset += 8
        else:
            pack = struct.unpack("!HH", data[offset:offset + 4])
            fields.append(
                TemplateField(
                    id=pack[0],
                    length=pack[1]
                )
            )
            offset += 4
    return fields, offset


def rest_is_padding_zeroes(data: bytes, offset: int) -> bool:
    if offset <= len(data):
        # padding zeroes, so the rest of the bytes must sum to 0
        if sum(data[offset:]) != 0:
            return False
        return True

    # If offset > len(data) there is an error
    raise ValueError("netflow.ipfix.rest_is_padding_zeroes received a greater offset value than there is data")
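The enterprise branch of `parse_fields` can be demonstrated with a hedged stdlib-only sketch (the element ID 82, length 4 and enterprise number 9 are illustrative values, not from a real template):

```python
import struct

# If the top bit of the first 16-bit word is set, the field specifier is
# 8 bytes long: flagged element ID, field length, enterprise number.
raw = struct.pack("!HHI", (1 << 15) | 82, 4, 9)

assert raw[0] & (1 << 7)                  # enterprise flag, visible in the first octet
first_word = struct.unpack("!H", raw[:2])[0]
field_id = first_word & ~(1 << 15)        # clear the flag to recover the element ID
length, enterprise_number = struct.unpack("!HI", raw[2:8])
assert (field_id, length, enterprise_number) == (82, 4, 9)
```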
101 netflow/utils.py
@@ -1,101 +0,0 @@
#!/usr/bin/env python3

"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""

import struct
from typing import Union, Dict

from .ipfix import IPFIXExportPacket
from .v1 import V1ExportPacket
from .v5 import V5ExportPacket
from .v9 import V9ExportPacket


class UnknownExportVersion(Exception):
    def __init__(self, data, version):
        self.data = data
        self.version = version
        r = repr(data)
        data_str = ("{:.25}..." if len(r) >= 28 else "{}").format(r)
        super().__init__(
            "Unknown NetFlow version {} for data {}".format(version, data_str)
        )


def get_export_version(data):
    return struct.unpack('!H', data[:2])[0]


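A hedged sketch of what `get_export_version` relies on: every NetFlow/IPFIX export packet starts with a 16-bit network-order version number (10 means IPFIX). The packet bytes below are dummies, not a valid export:

```python
import struct

packet = struct.pack("!H", 10) + b"\x00" * 14  # payload octets are placeholders
version = struct.unpack("!H", packet[:2])[0]
assert version == 10  # would be dispatched to the IPFIX parser
```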
def parse_packet(data: Union[str, bytes], templates: Dict = None) \
        -> Union[V1ExportPacket, V5ExportPacket, V9ExportPacket, IPFIXExportPacket]:
    """
    Parse an exported packet, either from a string (hex) or from bytes.

    NetFlow version 9 and IPFIX use dynamic templates, which are sent by the exporter in regular intervals.
    These templates must be cached between exports and are re-used for incoming new export packets.

    The following pseudo-code might help to understand the use case better. First, the collector is started, a new
    templates dict is created with default keys and an empty list for buffered packets is added. Then the receiver
    loop is started. The collector tries to parse each arriving packet. If parsing fails due to unknown templates,
    the packet is queued for later re-parsing (this functionality is not handled in this code snippet).

    ```
    collector = netflow.collector
    coll = collector.start('0.0.0.0', 2055)
    templates = {"netflow": {}, "ipfix": {}}
    packets_with_unrecognized_templates = []
    while coll.receive_export():
        packet = coll.get_received_export_packet()
        try:
            parsed_packet = parse_packet(packet, templates)
        except (V9TemplateNotRecognized, IPFIXTemplateNotRecognized):
            packets_with_unrecognized_templates.append(packet)
    ```

    See the reference implementation of the collector for more information on how to use this function with templates.

    :raises ValueError: When the templates parameter was not passed, but templates must be used (v9, IPFIX).
    :raises UnknownExportVersion: When the exported version is not recognized.

    :param data: The export packet as string or bytes.
    :param templates: The templates dictionary with keys 'netflow' and 'ipfix' (created if not existing).
    :return: The parsed packet, or an exception.
    """
    if type(data) is str:
        # hex dump as string
        data = bytes.fromhex(data)
    elif type(data) is bytes:
        # check representation based on utf-8 decoding result
        try:
            # hex dump as bytes, but not hex
            dec = data.decode()
            data = bytes.fromhex(dec)
        except UnicodeDecodeError:
            # use data as given, assuming hex-formatted bytes
            pass

    version = get_export_version(data)

    if version in [9, 10] and templates is None:
        raise ValueError("{} packet detected, but no templates dict was passed! For correct parsing of packets with "
                         "templates, create a 'templates' dict and pass it into the 'parse_packet' function."
                         .format("NetFlow v9" if version == 9 else "IPFIX"))

    if version == 1:
        return V1ExportPacket(data)
    elif version == 5:
        return V5ExportPacket(data)
    elif version == 9:
        if "netflow" not in templates:
            templates["netflow"] = {}
        return V9ExportPacket(data, templates["netflow"])
    elif version == 10:
        if "ipfix" not in templates:
            templates["ipfix"] = {}
        return IPFIXExportPacket(data, templates["ipfix"])
    raise UnknownExportVersion(data, version)
@@ -1,86 +0,0 @@
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
"""
|
|
||||||
Netflow V1 collector and parser implementation in Python 3.
|
|
||||||
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
|
|
||||||
Created purely for fun. Not battled tested nor will it be.
|
|
||||||
|
|
||||||
Reference https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html
|
|
||||||
This script is specifically implemented in combination with softflowd. See https://github.com/djmdjm/softflowd
|
|
||||||
"""
|
|
||||||
|
|
||||||
import struct
|
|
||||||
|
|
||||||
__all__ = ["V1DataFlow", "V1ExportPacket", "V1Header"]
|
|
||||||
|
|
||||||
|
|
||||||
class V1DataFlow:
|
|
||||||
"""Holds one v1 DataRecord
|
|
||||||
"""
|
|
||||||
length = 48
|
|
||||||
|
|
||||||
def __init__(self, data):
|
|
||||||
pack = struct.unpack('!IIIHHIIIIHHxxBBBxxxxxxx', data)
|
|
||||||
fields = [
|
|
||||||
'IPV4_SRC_ADDR',
|
|
||||||
'IPV4_DST_ADDR',
|
|
||||||
'NEXT_HOP',
|
|
||||||
'INPUT',
|
|
||||||
'OUTPUT',
|
|
||||||
'IN_PACKETS',
|
|
||||||
'IN_OCTETS',
|
|
||||||
'FIRST_SWITCHED',
|
|
||||||
'LAST_SWITCHED',
|
|
||||||
'SRC_PORT',
|
|
||||||
'DST_PORT',
|
|
||||||
# Word at 36-37 is used for padding
|
|
||||||
'PROTO',
|
|
||||||
'TOS',
|
|
||||||
'TCP_FLAGS',
|
|
||||||
# Data at 41-47 is padding/reserved
|
|
||||||
]
|
|
||||||
|
|
||||||
self.data = {}
|
|
||||||
for idx, field in enumerate(fields):
|
|
||||||
self.data[field] = pack[idx]
|
|
||||||
self.__dict__.update(self.data) # Make data dict entries accessible as object attributes
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return "<DataRecord with data {}>".format(self.data)
|
|
||||||
|
|
||||||
|
|
||||||
class V1Header:
|
|
||||||
"""The header of the V1ExportPacket
|
|
||||||
"""
|
|
||||||
length = 16
|
|
||||||
|
|
||||||
def __init__(self, data):
|
|
||||||
pack = struct.unpack('!HHIII', data[:self.length])
|
|
||||||
self.version = pack[0]
|
|
||||||
self.count = pack[1]
|
|
||||||
self.uptime = pack[2]
|
|
||||||
self.timestamp = pack[3]
|
|
||||||
self.timestamp_nano = pack[4]
|
|
||||||
|
|
||||||
def to_dict(self):
|
|
||||||
return self.__dict__
|
|
||||||
|
|
||||||
|
|
||||||
class V1ExportPacket:
|
|
||||||
"""The flow record holds the header and data flowsets.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, data):
|
|
||||||
self.flows = []
|
|
||||||
self.header = V1Header(data)
|
|
||||||
|
|
||||||
offset = self.header.length
|
|
||||||
for flow_count in range(0, self.header.count):
|
|
||||||
end = offset + V1DataFlow.length
|
|
||||||
flow = V1DataFlow(data[offset:end])
|
|
||||||
self.flows.append(flow)
|
|
||||||
offset += flow.length
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return "<ExportPacket v{} with {} records>".format(
|
|
||||||
self.header.version, self.header.count)
|
|
|
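The v1 header is a fixed 16-byte structure, parsed with the format string `'!HHIII'` as in `V1Header` above. A self-contained round-trip sketch using synthetic values (the timestamps are made up for illustration):

```python
import struct

# Pack a synthetic 16-byte NetFlow v1 header: version=1, count=2,
# uptime=5000 ms, unix timestamp, residual nanoseconds=0
raw = struct.pack('!HHIII', 1, 2, 5000, 1600000000, 0)
assert len(raw) == 16  # 2 + 2 + 4 + 4 + 4 bytes

# Unpack it again with the same format string V1Header uses
version, count, uptime, timestamp, timestamp_nano = struct.unpack('!HHIII', raw)
print(version, count)  # 1 2
```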
@@ -1,94 +0,0 @@
#!/usr/bin/env python3

"""
Netflow V5 collector and parser implementation in Python 3.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Created purely for fun. Not battle tested nor will it be.

Reference: https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html
This script is specifically implemented in combination with softflowd. See https://github.com/djmdjm/softflowd
"""

import struct

__all__ = ["V5DataFlow", "V5ExportPacket", "V5Header"]


class V5DataFlow:
    """Holds one v5 DataRecord
    """
    length = 48

    def __init__(self, data):
        pack = struct.unpack("!IIIHHIIIIHHxBBBHHBBxx", data)
        fields = [
            'IPV4_SRC_ADDR',
            'IPV4_DST_ADDR',
            'NEXT_HOP',
            'INPUT',
            'OUTPUT',
            'IN_PACKETS',
            'IN_OCTETS',
            'FIRST_SWITCHED',
            'LAST_SWITCHED',
            'SRC_PORT',
            'DST_PORT',
            # Byte 36 is used for padding
            'TCP_FLAGS',
            'PROTO',
            'TOS',
            'SRC_AS',
            'DST_AS',
            'SRC_MASK',
            'DST_MASK',
            # Word 46 is used for padding
        ]

        self.data = {}
        for idx, field in enumerate(fields):
            self.data[field] = pack[idx]
        self.__dict__.update(self.data)  # Make data dict entries accessible as object attributes

    def __repr__(self):
        return "<DataRecord with data {}>".format(self.data)


class V5Header:
    """The header of the V5ExportPacket
    """
    length = 24

    def __init__(self, data):
        pack = struct.unpack('!HHIIIIBBH', data[:self.length])
        self.version = pack[0]
        self.count = pack[1]
        self.uptime = pack[2]
        self.timestamp = pack[3]
        self.timestamp_nano = pack[4]
        self.sequence = pack[5]
        self.engine_type = pack[6]
        self.engine_id = pack[7]
        self.sampling_interval = pack[8]

    def to_dict(self):
        return self.__dict__


class V5ExportPacket:
    """The flow record holds the header and data flowsets.
    """

    def __init__(self, data):
        self.flows = []
        self.header = V5Header(data)

        offset = self.header.length
        for flow_count in range(0, self.header.count):
            end = offset + V5DataFlow.length
            flow = V5DataFlow(data[offset:end])
            self.flows.append(flow)
            offset += flow.length

    def __repr__(self):
        return "<ExportPacket v{} with {} records>".format(
            self.header.version, self.header.count)
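Because v5 has no templates, a v5 export packet is entirely fixed-size: a 24-byte header followed by `count` 48-byte records, matching the `length` class attributes above. A small sketch with a synthetic header (field values are illustrative only):

```python
import struct

V5_HEADER_LEN = 24   # V5Header.length
V5_RECORD_LEN = 48   # V5DataFlow.length

def expected_v5_size(count: int) -> int:
    """Total packet size for a v5 export carrying `count` flow records."""
    return V5_HEADER_LEN + count * V5_RECORD_LEN

# Synthetic header: version=5, count=3, uptime, timestamp, nanoseconds,
# sequence=42, engine_type=0, engine_id=0, sampling_interval=0
hdr = struct.pack('!HHIIIIBBH', 5, 3, 1000, 1600000000, 0, 42, 0, 0, 0)
version, count = struct.unpack('!HH', hdr[:4])
print(expected_v5_size(count))  # 168
```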
615 netflow/v9.py
@@ -1,615 +0,0 @@
#!/usr/bin/env python3

"""
Netflow V9 collector and parser implementation in Python 3.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Created for learning purposes and because of unsatisfying alternatives.

Reference: https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html
This script is specifically implemented in combination with softflowd. See https://github.com/djmdjm/softflowd

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""

import ipaddress
import struct
import sys

from .ipfix import IPFIXFieldTypes, IPFIXDataTypes

__all__ = ["V9DataFlowSet", "V9DataRecord", "V9ExportPacket", "V9Header", "V9TemplateField",
           "V9TemplateFlowSet", "V9TemplateNotRecognized", "V9TemplateRecord",
           "V9OptionsTemplateFlowSet", "V9OptionsTemplateRecord", "V9OptionsDataRecord"]

V9_FIELD_TYPES_CONTAINING_IP = [8, 12, 15, 18, 27, 28, 62, 63]
V9_FIELD_TYPES = {
    0: 'UNKNOWN_FIELD_TYPE',  # fallback for unknown field types

    # Cisco specs for NetFlow v9
    # https://tools.ietf.org/html/rfc3954
    # https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html
    1: 'IN_BYTES',
    2: 'IN_PKTS',
    3: 'FLOWS',
    4: 'PROTOCOL',
    5: 'SRC_TOS',
    6: 'TCP_FLAGS',
    7: 'L4_SRC_PORT',
    8: 'IPV4_SRC_ADDR',
    9: 'SRC_MASK',
    10: 'INPUT_SNMP',
    11: 'L4_DST_PORT',
    12: 'IPV4_DST_ADDR',
    13: 'DST_MASK',
    14: 'OUTPUT_SNMP',
    15: 'IPV4_NEXT_HOP',
    16: 'SRC_AS',
    17: 'DST_AS',
    18: 'BGP_IPV4_NEXT_HOP',
    19: 'MUL_DST_PKTS',
    20: 'MUL_DST_BYTES',
    21: 'LAST_SWITCHED',
    22: 'FIRST_SWITCHED',
    23: 'OUT_BYTES',
    24: 'OUT_PKTS',
    25: 'MIN_PKT_LNGTH',
    26: 'MAX_PKT_LNGTH',
    27: 'IPV6_SRC_ADDR',
    28: 'IPV6_DST_ADDR',
    29: 'IPV6_SRC_MASK',
    30: 'IPV6_DST_MASK',
    31: 'IPV6_FLOW_LABEL',
    32: 'ICMP_TYPE',
    33: 'MUL_IGMP_TYPE',
    34: 'SAMPLING_INTERVAL',
    35: 'SAMPLING_ALGORITHM',
    36: 'FLOW_ACTIVE_TIMEOUT',
    37: 'FLOW_INACTIVE_TIMEOUT',
    38: 'ENGINE_TYPE',
    39: 'ENGINE_ID',
    40: 'TOTAL_BYTES_EXP',
    41: 'TOTAL_PKTS_EXP',
    42: 'TOTAL_FLOWS_EXP',
    # 43 vendor proprietary
    44: 'IPV4_SRC_PREFIX',
    45: 'IPV4_DST_PREFIX',
    46: 'MPLS_TOP_LABEL_TYPE',
    47: 'MPLS_TOP_LABEL_IP_ADDR',
    48: 'FLOW_SAMPLER_ID',
    49: 'FLOW_SAMPLER_MODE',
    50: 'FLOW_SAMPLER_RANDOM_INTERVAL',
    # 51 vendor proprietary
    52: 'MIN_TTL',
    53: 'MAX_TTL',
    54: 'IPV4_IDENT',
    55: 'DST_TOS',
    56: 'IN_SRC_MAC',
    57: 'OUT_DST_MAC',
    58: 'SRC_VLAN',
    59: 'DST_VLAN',
    60: 'IP_PROTOCOL_VERSION',
    61: 'DIRECTION',
    62: 'IPV6_NEXT_HOP',
    63: 'BPG_IPV6_NEXT_HOP',
    64: 'IPV6_OPTION_HEADERS',
    # 65-69 vendor proprietary
    70: 'MPLS_LABEL_1',
    71: 'MPLS_LABEL_2',
    72: 'MPLS_LABEL_3',
    73: 'MPLS_LABEL_4',
    74: 'MPLS_LABEL_5',
    75: 'MPLS_LABEL_6',
    76: 'MPLS_LABEL_7',
    77: 'MPLS_LABEL_8',
    78: 'MPLS_LABEL_9',
    79: 'MPLS_LABEL_10',
    80: 'IN_DST_MAC',
    81: 'OUT_SRC_MAC',
    82: 'IF_NAME',
    83: 'IF_DESC',
    84: 'SAMPLER_NAME',
    85: 'IN_PERMANENT_BYTES',
    86: 'IN_PERMANENT_PKTS',
    # 87 vendor proprietary
    88: 'FRAGMENT_OFFSET',
    89: 'FORWARDING_STATUS',
    90: 'MPLS_PAL_RD',
    91: 'MPLS_PREFIX_LEN',  # Number of consecutive bits in the MPLS prefix length.
    92: 'SRC_TRAFFIC_INDEX',  # BGP Policy Accounting Source Traffic Index
    93: 'DST_TRAFFIC_INDEX',  # BGP Policy Accounting Destination Traffic Index
    94: 'APPLICATION_DESCRIPTION',  # Application description
    95: 'APPLICATION_TAG',  # 8 bits of engine ID, followed by n bits of classification
    96: 'APPLICATION_NAME',  # Name associated with a classification
    98: 'postipDiffServCodePoint',  # The value of a Differentiated Services Code Point (DSCP)
    # encoded in the Differentiated Services Field, after modification
    99: 'replication_factor',  # Multicast replication factor
    100: 'DEPRECATED',  # DEPRECATED
    102: 'layer2packetSectionOffset',  # Layer 2 packet section offset. Potentially a generic offset
    103: 'layer2packetSectionSize',  # Layer 2 packet section size. Potentially a generic size
    104: 'layer2packetSectionData',  # Layer 2 packet section data
    # 105-127 reserved for future use by Cisco

    # ASA extensions
    # https://www.cisco.com/c/en/us/td/docs/security/asa/special/netflow/guide/asa_netflow.html
    148: 'NF_F_CONN_ID',  # An identifier of a unique flow for the device
    176: 'NF_F_ICMP_TYPE',  # ICMP type value
    177: 'NF_F_ICMP_CODE',  # ICMP code value
    178: 'NF_F_ICMP_TYPE_IPV6',  # ICMP IPv6 type value
    179: 'NF_F_ICMP_CODE_IPV6',  # ICMP IPv6 code value
    225: 'NF_F_XLATE_SRC_ADDR_IPV4',  # Post NAT Source IPv4 Address
    226: 'NF_F_XLATE_DST_ADDR_IPV4',  # Post NAT Destination IPv4 Address
    227: 'NF_F_XLATE_SRC_PORT',  # Post NATT Source Transport Port
    228: 'NF_F_XLATE_DST_PORT',  # Post NATT Destination Transport Port
    281: 'NF_F_XLATE_SRC_ADDR_IPV6',  # Post NAT Source IPv6 Address
    282: 'NF_F_XLATE_DST_ADDR_IPV6',  # Post NAT Destination IPv6 Address
    233: 'NF_F_FW_EVENT',  # High-level event code
    33002: 'NF_F_FW_EXT_EVENT',  # Extended event code
    323: 'NF_F_EVENT_TIME_MSEC',  # The time that the event occurred, which comes from IPFIX
    152: 'NF_F_FLOW_CREATE_TIME_MSEC',
    231: 'NF_F_FWD_FLOW_DELTA_BYTES',  # The delta number of bytes from source to destination
    232: 'NF_F_REV_FLOW_DELTA_BYTES',  # The delta number of bytes from destination to source
    33000: 'NF_F_INGRESS_ACL_ID',  # The input ACL that permitted or denied the flow
    33001: 'NF_F_EGRESS_ACL_ID',  # The output ACL that permitted or denied a flow
    40000: 'NF_F_USERNAME',  # AAA username

    # PaloAlto PAN-OS 8.0
    # https://www.paloaltonetworks.com/documentation/80/pan-os/pan-os/monitoring/netflow-monitoring/netflow-templates
    346: 'PANOS_privateEnterpriseNumber',
    56701: 'PANOS_APPID',
    56702: 'PANOS_USERID'
}
V9_SCOPE_TYPES = {
    1: "System",
    2: "Interface",
    3: "Line Card",
    4: "Cache",
    5: "Template"
}
class V9TemplateNotRecognized(KeyError):
    pass


class V9DataRecord:
    """This is a 'flow' as we want it from our source. What it contains is
    variable in NetFlow V9, so to work with the data you have to analyze the
    data dict keys (which are integers and can be mapped with the FIELD_TYPES
    dict).
    Should hold a 'data' dict with keys=field_type (integer) and value (in bytes).
    """

    def __init__(self):
        self.data = {}

    def __repr__(self):
        return "<DataRecord with data: {}>".format(self.data)
class V9DataFlowSet:
    """Holds one or multiple DataRecords which are all defined by the same
    template. This template is referenced in the field 'flowset_id' of this
    DataFlowSet and must not be zero.
    """

    def __init__(self, data, template):
        pack = struct.unpack('!HH', data[:4])

        self.template_id = pack[0]  # flowset_id is a reference to a template_id
        self.length = pack[1]
        self.flows = []

        offset = 4

        # As the field lengths are variable, V9 pads the flowset to the next 32 bit boundary
        padding_size = 4 - (self.length % 4)  # 4 bytes

        # For performance reasons, we use struct.unpack to get individual values. Here
        # we prepare the format string for parsing it. The format string is based on the template
        # fields and their lengths. The string can then be re-used for every data record in the data stream
        struct_format = '!'
        struct_len = 0
        for field in template.fields:
            # The length of the value byte slice is defined in the template
            flen = field.field_length
            if flen == 4:
                struct_format += 'L'
            elif flen == 2:
                struct_format += 'H'
            elif flen == 1:
                struct_format += 'B'
            else:
                struct_format += '%ds' % flen
            struct_len += flen

        while offset <= (self.length - padding_size):
            # Here we actually unpack the values. The struct format string is used in every
            # data record iteration, until the final offset reaches the end of the whole data stream
            unpacked_values = struct.unpack(struct_format, data[offset:offset + struct_len])

            new_record = V9DataRecord()
            for field, value in zip(template.fields, unpacked_values):
                flen = field.field_length
                fkey = V9_FIELD_TYPES[field.field_type]

                # Special handling of IP addresses to convert integers to strings to not lose precision in dump
                # TODO: might only be needed for IPv6
                if field.field_type in V9_FIELD_TYPES_CONTAINING_IP:
                    try:
                        ip = ipaddress.ip_address(value)
                    except ValueError:
                        print("IP address could not be parsed: {}".format(repr(value)))
                        continue
                    new_record.data[fkey] = ip.compressed
                elif flen in (1, 2, 4):
                    # These values are already converted to numbers by struct.unpack:
                    new_record.data[fkey] = value
                else:
                    # Caveat: this code assumes a little-endian system (like x86)
                    if sys.byteorder != "little":
                        print("v9.py uses bit shifting for little endianness. Your processor is not little endian")

                    fdata = 0
                    for idx, byte in enumerate(reversed(bytearray(value))):
                        fdata += byte << (idx * 8)
                    new_record.data[fkey] = fdata

                offset += flen

            new_record.__dict__.update(new_record.data)
            self.flows.append(new_record)

    def __repr__(self):
        return "<DataFlowSet with template {} of length {} holding {} flows>" \
            .format(self.template_id, self.length, len(self.flows))
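The format-string preparation in `V9DataFlowSet` maps each template field length to a `struct` code (4 → `L`, 2 → `H`, 1 → `B`, anything else → a raw byte string) so one compiled format covers a whole record. A standalone sketch with hypothetical field lengths (two 4-byte counters, two 2-byte ports, a 1-byte protocol, a 6-byte MAC):

```python
import struct

# Hypothetical field lengths as a v9 template might define them
field_lengths = [4, 4, 2, 2, 1, 6]

struct_format = '!'   # network byte order, no alignment padding
struct_len = 0
for flen in field_lengths:
    if flen == 4:
        struct_format += 'L'
    elif flen == 2:
        struct_format += 'H'
    elif flen == 1:
        struct_format += 'B'
    else:
        struct_format += '%ds' % flen  # odd lengths stay raw byte strings
    struct_len += flen

print(struct_format)  # !LLHHB6s
# With '!' the standard sizes apply, so calcsize matches the summed lengths
print(struct_len, struct.calcsize(struct_format))  # 19 19
```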
class V9TemplateField:
    """A field with type identifier and length.
    """

    def __init__(self, field_type, field_length):
        self.field_type = field_type  # integer
        self.field_length = field_length  # bytes

    def __repr__(self):
        return "<TemplateField type {}:{}, length {}>".format(
            self.field_type, V9_FIELD_TYPES[self.field_type], self.field_length)


class V9TemplateRecord:
    """A template record contained in a TemplateFlowSet.
    """

    def __init__(self, template_id, field_count, fields: list):
        self.template_id = template_id
        self.field_count = field_count
        self.fields = fields

    def __repr__(self):
        return "<TemplateRecord {} with {} fields: {}>".format(
            self.template_id, self.field_count,
            ' '.join([V9_FIELD_TYPES[field.field_type] for field in self.fields]))
class V9OptionsDataRecord:
    def __init__(self):
        self.scopes = {}
        self.data = {}

    def __repr__(self):
        return "<V9OptionsDataRecord with scopes {} and data {}>".format(self.scopes.keys(), self.data.keys())


class V9OptionsTemplateRecord:
    """An options template record contained in an options template flowset.
    """

    def __init__(self, template_id, scope_fields: dict, option_fields: dict):
        self.template_id = template_id
        self.scope_fields = scope_fields
        self.option_fields = option_fields

    def __repr__(self):
        return "<V9OptionsTemplateRecord with scope fields {} and option fields {}>".format(
            self.scope_fields.keys(), self.option_fields.keys())


class V9OptionsTemplateFlowSet:
    """An options template flowset.

    > Each Options Template FlowSet MAY contain multiple Options Template Records.

    Scope field types range from 1 to 5:
      1 System
      2 Interface
      3 Line Card
      4 Cache
      5 Template
    """

    def __init__(self, data: bytes):
        pack = struct.unpack('!HH', data[:4])
        self.flowset_id = pack[0]  # always 1
        self.flowset_length = pack[1]  # length of this flowset
        self.templates = {}

        offset = 4

        while offset < self.flowset_length:
            pack = struct.unpack("!HHH", data[offset:offset + 6])  # options template header
            template_id = pack[0]  # value above 255
            option_scope_length = pack[1]
            options_length = pack[2]

            offset += 6

            # Fetch all scope fields (most probably only one field)
            scopes = {}  # Holds "type: length" key-value pairs

            if option_scope_length % 4 != 0 or options_length % 4 != 0:
                raise ValueError(option_scope_length, options_length)

            for scope_counter in range(option_scope_length // 4):  # example: option_scope_length = 4 means one scope
                pack = struct.unpack("!HH", data[offset:offset + 4])
                scope_field_type = pack[0]  # values range from 1 to 5
                scope_field_length = pack[1]
                scopes[scope_field_type] = scope_field_length
                offset += 4

            # Fetch all option fields
            options = {}  # same "type: length" structure
            for option_counter in range(options_length // 4):  # now counting the options
                pack = struct.unpack("!HH", data[offset:offset + 4])
                option_field_type = pack[0]
                option_field_length = pack[1]
                options[option_field_type] = option_field_length
                offset += 4

            optionstemplate = V9OptionsTemplateRecord(template_id, scopes, options)

            self.templates[template_id] = optionstemplate

            # handle padding and add offset if needed
            if offset % 4 == 2:
                offset += 2

    def __repr__(self):
        return "<V9OptionsTemplateFlowSet with {} templates: {}>".format(len(self.templates), self.templates.keys())
class V9OptionsDataFlowset:
    """An options data flowset with option data records
    """

    def __init__(self, data: bytes, template: V9OptionsTemplateRecord):
        pack = struct.unpack('!HH', data[:4])

        self.template_id = pack[0]
        self.length = pack[1]
        self.option_data_records = []

        offset = 4

        while offset < self.length:
            new_options_record = V9OptionsDataRecord()

            for scope_type, length in template.scope_fields.items():
                type_name = V9_SCOPE_TYPES.get(scope_type, scope_type)  # Either a name, or the unknown int
                value = int.from_bytes(data[offset:offset + length], 'big')  # TODO: is this always an integer?
                new_options_record.scopes[type_name] = value
                offset += length

            for field_type, length in template.option_fields.items():
                type_name = V9_FIELD_TYPES.get(field_type, None)
                is_bytes = False

                if not type_name:  # Cisco refers to the IANA IPFIX table for types >256...
                    iana_type = IPFIXFieldTypes.by_id(field_type)  # try to get from IPFIX types
                    if iana_type:
                        type_name = iana_type.name
                        is_bytes = IPFIXDataTypes.is_bytes(iana_type)

                if not type_name:
                    raise ValueError

                if is_bytes:
                    value = data[offset:offset + length]
                else:
                    value = int.from_bytes(data[offset:offset + length], 'big')

                new_options_record.data[type_name] = value

                offset += length

            self.option_data_records.append(new_options_record)

            if offset % 4 == 2:
                offset += 2
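`V9OptionsDataFlowset` decodes variable-length integer fields with `int.from_bytes(..., 'big')`, while `V9DataFlowSet` does the same job with an explicit bit-shifting loop. The two are equivalent for big-endian network data, as this small check illustrates (the byte string is a made-up 4-byte counter):

```python
value = b'\x00\x01\x86\xa0'  # a synthetic 4-byte big-endian counter field

# Bit-shifting approach, as in V9DataFlowSet's odd-length branch:
# walk the bytes from least significant to most significant
fdata = 0
for idx, byte in enumerate(reversed(bytearray(value))):
    fdata += byte << (idx * 8)

# int.from_bytes, as used by the options data flowset parser
assert fdata == int.from_bytes(value, 'big')
print(fdata)  # 100000
```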
class V9TemplateFlowSet:
    """A template flowset, which holds an id that is used by data flowsets to
    reference back to the template. The template then has fields which hold
    identifiers of data types (e.g. "IP_SRC_ADDR", "PKTS"). This way the flow
    sender can dynamically put together data flowsets.
    """

    def __init__(self, data):
        pack = struct.unpack('!HH', data[:4])
        self.flowset_id = pack[0]  # always 0
        self.length = pack[1]  # total length including this header in bytes
        self.templates = {}

        offset = 4  # Skip header

        # Iterate through all template records in this template flowset
        while offset < self.length:
            pack = struct.unpack('!HH', data[offset:offset + 4])
            template_id = pack[0]
            field_count = pack[1]

            fields = []
            for field in range(field_count):
                # Get all fields of this template
                offset += 4
                field_type, field_length = struct.unpack('!HH', data[offset:offset + 4])
                if field_type not in V9_FIELD_TYPES:
                    field_type = 0  # Set field_type to UNKNOWN_FIELD_TYPE as fallback
                field = V9TemplateField(field_type, field_length)
                fields.append(field)

            # Create a template object with all collected data
            template = V9TemplateRecord(template_id, field_count, fields)

            # Append the new template to the global templates list
            self.templates[template.template_id] = template

            # Set offset to next template_id field
            offset += 4

    def __repr__(self):
        return "<TemplateFlowSet with id {} of length {} containing templates: {}>" \
            .format(self.flowset_id, self.length, self.templates.keys())
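The byte layout `V9TemplateFlowSet` walks is a sequence of 16-bit words: a flowset header (id 0, total length), then per template a record header (template id, field count) followed by one (type, length) pair per field. A self-contained sketch that builds and re-parses a synthetic template flowset with two fields (type 8 = IPV4_SRC_ADDR, 4 bytes; type 7 = L4_SRC_PORT, 2 bytes):

```python
import struct

# flowset_id=0, length=16, template_id=256, field_count=2,
# then the field (type, length) pairs: (8, 4) and (7, 2)
data = struct.pack('!HHHHHHHH', 0, 16, 256, 2, 8, 4, 7, 2)

flowset_id, length = struct.unpack('!HH', data[:4])
template_id, field_count = struct.unpack('!HH', data[4:8])
fields = [struct.unpack('!HH', data[8 + i * 4:12 + i * 4])
          for i in range(field_count)]
print(flowset_id, template_id, fields)  # 0 256 [(8, 4), (7, 2)]
```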
class V9Header:
    """The header of the V9ExportPacket
    """
    length = 20

    def __init__(self, data):
        pack = struct.unpack('!HHIIII', data[:self.length])
        self.version = pack[0]
        self.count = pack[1]  # not sure if correct. softflowd: number of flows
        self.uptime = pack[2]
        self.timestamp = pack[3]
        self.sequence = pack[4]
        self.source_id = pack[5]

    def to_dict(self):
        return self.__dict__
class V9ExportPacket:
    """The flow record holds the header and all template and data flowsets.

    TODO: refactor into two loops: first get all contained flowsets and examine template
    flowsets first. Then data flowsets.
    """

    def __init__(self, data: bytes, templates: dict):
        self.header = V9Header(data)
        self._templates = templates
        self._new_templates = False
        self._flows = []
        self._options = []

        offset = self.header.length
        skipped_flowsets_offsets = []

        while offset != len(data):
            pack = struct.unpack('!HH', data[offset:offset + 4])
            flowset_id = pack[0]  # = template id
            flowset_length = pack[1]

            # Data template flowsets
            if flowset_id == 0:  # TemplateFlowSets always have id 0
                tfs = V9TemplateFlowSet(data[offset:])
                # Update the templates with the provided templates, even if they are the same
                for id_, template in tfs.templates.items():
                    if id_ not in self._templates:
                        self._new_templates = True
                    self._templates[id_] = template
                if tfs.length == 0:
                    break
                offset += tfs.length
                continue

            # Option template flowsets
            elif flowset_id == 1:  # Option templates always use ID 1
                otfs = V9OptionsTemplateFlowSet(data[offset:])
                for id_, template in otfs.templates.items():
                    if id_ not in self._templates:
                        self._new_templates = True
                    self._templates[id_] = template
                offset += otfs.flowset_length
                if otfs.flowset_length == 0:
                    break
                continue

            # Data / option flowsets
            # First, check if the template is known
            if flowset_id not in self._templates:
                # Could not be parsed, continue to check for templates
                skipped_flowsets_offsets.append(offset)
                offset += flowset_length
                if flowset_length == 0:
                    break
                continue

            matched_template = self._templates[flowset_id]

            if isinstance(matched_template, V9TemplateRecord):
                dfs = V9DataFlowSet(data[offset:], matched_template)
                self._flows += dfs.flows
                if dfs.length == 0:
                    break
                offset += dfs.length

            elif isinstance(matched_template, V9OptionsTemplateRecord):
                odfs = V9OptionsDataFlowset(data[offset:], matched_template)
                self._options += odfs.option_data_records
                if odfs.length == 0:
                    break
                offset += odfs.length

            else:
                raise NotImplementedError

        # In the same export packet, re-try flowsets with previously unknown templates.
        # Might happen if an export packet first contains data flowsets, and template flowsets after
        if skipped_flowsets_offsets and self._new_templates:
            # Process flowsets in the data slice which occurred before the template sets
            # Handling of offset increases is not needed here
            for offset in skipped_flowsets_offsets:
                pack = struct.unpack('!H', data[offset:offset + 2])
                flowset_id = pack[0]

                if flowset_id not in self._templates:
                    raise V9TemplateNotRecognized

                matched_template = self._templates[flowset_id]
                if isinstance(matched_template, V9TemplateRecord):
                    dfs = V9DataFlowSet(data[offset:], matched_template)
                    self._flows += dfs.flows
                elif isinstance(matched_template, V9OptionsTemplateRecord):
                    odfs = V9OptionsDataFlowset(data[offset:], matched_template)
                    self._options += odfs.option_data_records

        elif skipped_flowsets_offsets:
            raise V9TemplateNotRecognized

    @property
    def contains_new_templates(self):
        return self._new_templates

    @property
    def flows(self):
        return self._flows

    @property
    def templates(self):
        return self._templates

    @property
    def options(self):
        return self._options

    def __repr__(self):
        s = " and new template(s)" if self.contains_new_templates else ""
        return "<V9ExportPacket with {} records{}>".format(self.header.count, s)
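Both options flowset parsers realign the read offset after each record with an `if offset % 4 == 2: offset += 2` check, since scope and option fields come in 2-byte units but flowsets are padded to 32-bit boundaries. A trivial standalone sketch of that rule:

```python
def align_to_word(offset: int) -> int:
    """Advance a 2-byte-misaligned offset to the next 4-byte boundary,
    mirroring the padding checks in the v9 options flowset parsers."""
    if offset % 4 == 2:
        offset += 2
    return offset

print(align_to_word(6))  # 8 (was mid-word, padded forward)
print(align_to_word(8))  # 8 (already aligned, unchanged)
```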
BIN netflow_0.12.2.orig.tar.gz.delta (new file)
Binary file not shown.
1 netflow_0.12.2.orig.tar.gz.id (new file)
@@ -0,0 +1 @@
647710280e4ea6e71e5fe4e639f91d73170e6557
BIN nf-workflow.png
Binary file not shown. Before Width: | Height: | Size: 19 KiB
27 setup.py
@@ -1,27 +0,0 @@
#!/usr/bin/env python3

from setuptools import setup

with open("README.md", "r") as fh:
    long_description = fh.read()

setup(
    name='netflow',
    version='0.12.2',
    description='NetFlow v1, v5, v9 and IPFIX tool suite implemented in Python 3',
    long_description=long_description,
    long_description_content_type='text/markdown',
    author='Dominik Pataky',
    author_email='software+pynetflow@dpataky.eu',
    url='https://github.com/bitkeks/python-netflow-v9-softflowd',
    packages=["netflow"],
    license='MIT',
    python_requires='>=3.5.3',
    keywords='netflow ipfix collector parser',
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Intended Audience :: Developers",
        "Intended Audience :: System Administrators"
    ],
)
325 tests/lib.py
@@ -1,325 +0,0 @@
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.

The test packets (defined below as hex streams) were extracted from "real"
softflowd exports based on a sample PCAP capture file.

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""

# The flowset with 2 templates (IPv4 and IPv6) and 8 flows with data
import queue
import random
import socket
import time

from netflow.collector import ThreadedNetFlowListener

# Invalid export hex stream
PACKET_INVALID = "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"

CONNECTION = ('127.0.0.1', 1337)
NUM_PACKETS = 1000


def emit_packets(packets, delay=0.0001):
    """Send the provided packets to the listener"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    for p in packets:
        sock.sendto(bytes.fromhex(p), CONNECTION)
        time.sleep(delay)
    sock.close()


def send_recv_packets(packets, delay=0.0001, store_packets=-1) -> (list, float, float):
    """Starts a listener, sends packets, receives packets

    returns a tuple: ([(ts, export), ...], time_started_sending, time_stopped_sending)
    """
    listener = ThreadedNetFlowListener(*CONNECTION)
    tstart = time.time()
    emit_packets(packets, delay=delay)
    time.sleep(0.5)  # Allow packets to be sent and received
    tend = time.time()
    listener.start()

    pkts = []
    to_pad = 0
    while True:
        try:
            packet = listener.get(timeout=0.5)
            if -1 == store_packets or store_packets > 0:
                # Case where a program yields from the queue and stores all packets.
                pkts.append(packet)
                if store_packets != -1 and len(pkts) > store_packets:
                    to_pad += len(pkts)  # Hack for testing
                    pkts.clear()
            else:
                # Performance measurements for cases where yielded objects are processed
                # immediately instead of stored. Add an empty tuple to retain counting possibility.
                pkts.append(())
        except queue.Empty:
            break
    listener.stop()
    listener.join()
    if to_pad > 0:
        pkts = [()] * to_pad + pkts
    return pkts, tstart, tend


def generate_packets(amount, version, template_every_x=100):
    packets = [PACKET_IPFIX]
    template = PACKET_IPFIX_TEMPLATE

    if version == 1:
        packets = [PACKET_V1]
    elif version == 5:
        packets = [PACKET_V5]
|
|
||||||
elif version == 9:
|
|
||||||
packets = [*PACKETS_V9]
|
|
||||||
template = PACKET_V9_TEMPLATE
|
|
||||||
|
|
||||||
if amount < template_every_x:
|
|
||||||
template_every_x = 10
|
|
||||||
|
|
||||||
# If the list of test packets is only one item big (the same packet is used over and over),
|
|
||||||
# do not use random.choice - it costs performance and results in the same packet every time.
|
|
||||||
def single_packet(pkts):
|
|
||||||
return pkts[0]
|
|
||||||
|
|
||||||
packet_func = single_packet
|
|
||||||
if len(packets) > 1:
|
|
||||||
packet_func = random.choice
|
|
||||||
|
|
||||||
for x in range(amount):
|
|
||||||
if x % template_every_x == 0 and version in [9, 10]:
|
|
||||||
# First packet always a template, then periodically
|
|
||||||
# Note: this was once based on random.random, but it costs performance
|
|
||||||
yield template
|
|
||||||
else:
|
|
||||||
yield packet_func(packets)
|
|
||||||
|
|
||||||
|
|
||||||
# Example export for v1 which contains two flows from one ICMP ping request/reply session
PACKET_V1 = "000100020001189b5e80c32c2fd41848ac110002ac11000100000000000000000000000a00000348" \
            "000027c700004af100000800000001000000000000000000ac110001ac1100020000000000000000" \
            "0000000a00000348000027c700004af100000000000001000000000000000000"

# Example export for v5 which contains three flows, two for ICMP ping and one multicast on interface (224.0.0.251)
PACKET_V5 = "00050003000379a35e80c58622a55ab00000000000000000ac110002ac1100010000000000000000" \
            "0000000a0000034800002f4c0000527600000800000001000000000000000000ac110001ac110002" \
            "00000000000000000000000a0000034800002f4c0000527600000000000001000000000000000000" \
            "ac110001e00000fb000000000000000000000001000000a90000e01c0000e01c14e914e900001100" \
            "0000000000000000"

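Every export hex stream above begins with two big-endian 16-bit header fields common to NetFlow v1 and v5: the version number and the record count. A minimal stdlib-only sketch decoding them from the constants above (the `header_fields` helper name is ours, not part of the test suite):

```python
import struct


def header_fields(hex_stream):
    """Decode (version, count) from the first 4 bytes of an export hex stream."""
    version, count = struct.unpack("!HH", bytes.fromhex(hex_stream)[:4])
    return version, count


# First bytes of PACKET_V1 and PACKET_V5 as defined above
print(header_fields("000100020001189b"))  # (1, 2): v1 export, 2 flow records
print(header_fields("00050003000379a3"))  # (5, 3): v5 export, 3 flow records
```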
PACKET_V9_TEMPLATE = "0009000a000000035c9f55980000000100000000000000400400000e00080004000c000400150004" \
                     "001600040001000400020004000a0004000e000400070002000b00020004000100060001003c0001" \
                     "00050001000000400800000e001b0010001c001000150004001600040001000400020004000a0004" \
                     "000e000400070002000b00020004000100060001003c000100050001040001447f0000017f000001" \
                     "fb3c1aaafb3c18fd000190100000004b00000000000000000050942c061b04007f0000017f000001" \
                     "fb3c1aaafb3c18fd00000f94000000360000000000000000942c0050061f04007f0000017f000001" \
                     "fb3c1cfcfb3c1a9b0000d3fc0000002a000000000000000000509434061b04007f0000017f000001" \
                     "fb3c1cfcfb3c1a9b00000a490000001e000000000000000094340050061f04007f0000017f000001" \
                     "fb3bb82cfb3ba48b000002960000000300000000000000000050942a061904007f0000017f000001" \
                     "fb3bb82cfb3ba48b00000068000000020000000000000000942a0050061104007f0000017f000001" \
                     "fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9110004007f0000017f000001" \
                     "fb3c1900fb3c18fe0000003c000000010000000000000000b3c9003511000400"

# This packet is special. We take PACKET_V9_TEMPLATE and re-order the templates and flows.
# The first line is the header, the smaller lines the templates and the long lines the flows (limited to 80 chars)
PACKET_V9_TEMPLATE_MIXED = ("0009000a000000035c9f55980000000100000000"  # header
                            "040001447f0000017f000001fb3c1aaafb3c18fd000190100000004b00000000000000000050942c"
                            "061b04007f0000017f000001fb3c1aaafb3c18fd00000f94000000360000000000000000942c0050"
                            "061f04007f0000017f000001fb3c1cfcfb3c1a9b0000d3fc0000002a000000000000000000509434"
                            "061b04007f0000017f000001fb3c1cfcfb3c1a9b00000a490000001e000000000000000094340050"
                            "061f04007f0000017f000001fb3bb82cfb3ba48b000002960000000300000000000000000050942a"
                            "061904007f0000017f000001fb3bb82cfb3ba48b00000068000000020000000000000000942a0050"
                            "061104007f0000017f000001fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9"
                            "110004007f0000017f000001fb3c1900fb3c18fe0000003c000000010000000000000000b3c90035"
                            "11000400"  # end of flow segments
                            "000000400400000e00080004000c000400150004001600040001000400020004"  # template 1024
                            "000a0004000e000400070002000b00020004000100060001003c000100050001"
                            "000000400800000e001b0010001c001000150004001600040001000400020004"  # template 2048
                            "000a0004000e000400070002000b00020004000100060001003c000100050001")

# Three packets without templates, each with 12 flows
PACKETS_V9 = [
    "0009000c000000035c9f55980000000200000000040001e47f0000017f000001fb3c1a17fb3c19fd"
    "000001480000000200000000000000000035ea82110004007f0000017f000001fb3c1a17fb3c19fd"
    "0000007a000000020000000000000000ea820035110004007f0000017f000001fb3c1a17fb3c19fd"
    "000000f80000000200000000000000000035c6e2110004007f0000017f000001fb3c1a17fb3c19fd"
    "0000007a000000020000000000000000c6e20035110004007f0000017f000001fb3c1a9efb3c1a9c"
    "0000004c0000000100000000000000000035adc1110004007f0000017f000001fb3c1a9efb3c1a9c"
    "0000003c000000010000000000000000adc10035110004007f0000017f000001fb3c1b74fb3c1b72"
    "0000004c0000000100000000000000000035d0b3110004007f0000017f000001fb3c1b74fb3c1b72"
    "0000003c000000010000000000000000d0b30035110004007f0000017f000001fb3c2f59fb3c1b71"
    "00001a350000000a000000000000000000509436061b04007f0000017f000001fb3c2f59fb3c1b71"
    "0000038a0000000a000000000000000094360050061b04007f0000017f000001fb3c913bfb3c9138"
    "0000004c0000000100000000000000000035e262110004007f0000017f000001fb3c913bfb3c9138"
    "0000003c000000010000000000000000e262003511000400",

    "0009000c000000035c9f55980000000300000000040001e47f0000017f000001fb3ca523fb3c913b"
    "0000030700000005000000000000000000509438061b04007f0000017f000001fb3ca523fb3c913b"
    "000002a200000005000000000000000094380050061b04007f0000017f000001fb3f7fe1fb3dbc97"
    "0002d52800000097000000000000000001bb8730061b04007f0000017f000001fb3f7fe1fb3dbc97"
    "0000146c000000520000000000000000873001bb061f04007f0000017f000001fb3d066ffb3d066c"
    "0000004c0000000100000000000000000035e5bd110004007f0000017f000001fb3d066ffb3d066c"
    "0000003c000000010000000000000000e5bd0035110004007f0000017f000001fb3d1a61fb3d066b"
    "000003060000000500000000000000000050943a061b04007f0000017f000001fb3d1a61fb3d066b"
    "000002a2000000050000000000000000943a0050061b04007f0000017f000001fb3fed00fb3f002c"
    "0000344000000016000000000000000001bbae50061f04007f0000017f000001fb3fed00fb3f002c"
    "00000a47000000120000000000000000ae5001bb061b04007f0000017f000001fb402f17fb402a75"
    "0003524c000000a5000000000000000001bbc48c061b04007f0000017f000001fb402f17fb402a75"
    "000020a60000007e0000000000000000c48c01bb061f0400",

    "0009000c000000035c9f55980000000400000000040001e47f0000017f000001fb3d7ba2fb3d7ba0"
    "0000004c0000000100000000000000000035a399110004007f0000017f000001fb3d7ba2fb3d7ba0"
    "0000003c000000010000000000000000a3990035110004007f0000017f000001fb3d8f85fb3d7b9f"
    "000003070000000500000000000000000050943c061b04007f0000017f000001fb3d8f85fb3d7b9f"
    "000002a2000000050000000000000000943c0050061b04007f0000017f000001fb3d9165fb3d7f6d"
    "0000c97b0000002a000000000000000001bbae48061b04007f0000017f000001fb3d9165fb3d7f6d"
    "000007f40000001a0000000000000000ae4801bb061b04007f0000017f000001fb3dbc96fb3dbc7e"
    "0000011e0000000200000000000000000035bd4f110004007f0000017f000001fb3dbc96fb3dbc7e"
    "0000008e000000020000000000000000bd4f0035110004007f0000017f000001fb3ddbb3fb3c1a18"
    "0000bfee0000002f00000000000000000050ae56061b04007f0000017f000001fb3ddbb3fb3c1a18"
    "00000982000000270000000000000000ae560050061b04007f0000017f000001fb3ddbb3fb3c1a18"
    "0000130e0000001200000000000000000050e820061b04007f0000017f000001fb3ddbb3fb3c1a18"
    "0000059c000000140000000000000000e8200050061b0400",
]

PACKET_V9_WITH_ZEROS = (
    "000900057b72e830620b717d78cf34e30102000001040048000000000000006e0000000101000000"
    "000a20076a06065c0800000d6b15c80000000b7b72e4487b72e448080000000000000438bf6401c7"
    "65ad1e0d6b15c8000000000001040048000000000000006700000001110000c951ac180b0306065c"
    "080035010000010000000b7b72e4487b72e448000000000000000443177b01c765ada501000001c3"
    "9c00350001040048000000000000004a000000010100000000ac19bc3206065c080000287048cd00"
    "00000b7b72e8307b72e83008000000000000048f071a01c765ae42287048cd000000000001040048"
    "000000000000004600000001060002cbef0a30681f06065c0801bb142a49180000000b7b72e8307b"
    "72e8300000000000000004801c7801c765ae1d142a49185a2b01bb00010400480000000000000046"
    "00000001060002fe800a2f601206065c0801bb142a49180000000b7b72e8307b72e8300000000000"
    "0000040806b001c765ae28142a4918d4b501bb000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
    "00000000000000000000000000000000000000000000000000000000000000000000000000000000"
)

# Example export for IPFIX (v10) with 4 templates, 1 option template and 8 data flow sets
PACKET_IPFIX_TEMPLATE = "000a05202d45a4700000001300000000000200400400000e00080004000c00040016000400150004" \
                        "0001000400020004000a0004000e000400070002000b00020004000100060001003c000100050001" \
                        "000200340401000b00080004000c000400160004001500040001000400020004000a0004000e0004" \
                        "00200002003c000100050001000200400800000e001b0010001c0010001600040015000400010004" \
                        "00020004000a0004000e000400070002000b00020004000100060001003c00010005000100020034" \
                        "0801000b001b0010001c001000160004001500040001000400020004000a0004000e0004008b0002" \
                        "003c0001000500010003001e010000050001008f000400a000080131000401320004013000020100" \
                        "001a00000a5900000171352e672100000001000000000001040000547f000001ac110002ff7ed688" \
                        "ff7ed73a000015c70000000d000000000000000001bbe1a6061b0400ac1100027f000001ff7ed688" \
                        "ff7ed73a0000074f000000130000000000000000e1a601bb061f04000401004cac110002ac110001" \
                        "ff7db9e0ff7dc1d0000000fc00000003000000000000000008000400ac110001ac110002ff7db9e0" \
                        "ff7dc1d0000000fc0000000300000000000000000000040008010220fde66f14e0f1960900000242" \
                        "ac110002ff0200000000000000000001ff110001ff7dfad6ff7e0e95000001b00000000600000000" \
                        "0000000087000600fde66f14e0f196090000affeaffeaffefdabcdef123456789000000000000001" \
                        "ff7e567fff7e664a0000020800000005000000000000000080000600fde66f14e0f1960900000000" \
                        "00000001fde66f14e0f196090000affeaffeaffeff7e567fff7e664a000002080000000500000000" \
                        "0000000081000600fe800000000000000042aafffe73bbfafde66f14e0f196090000affeaffeaffe" \
                        "ff7e6aaaff7e6aaa0000004800000001000000000000000087000600fde66f14e0f1960900000242" \
                        "ac110002fe800000000000000042aafffe73bbfaff7e6aaaff7e6aaa000000400000000100000000" \
                        "0000000088000600fe800000000000000042acfffe110002fe800000000000000042aafffe73bbfa" \
                        "ff7e7eaaff7e7eaa0000004800000001000000000000000087000600fe800000000000000042aaff" \
                        "fe73bbfafe800000000000000042acfffe110002ff7e7eaaff7e7eaa000000400000000100000000" \
                        "0000000088000600fe800000000000000042aafffe73bbfafe800000000000000042acfffe110002" \
                        "ff7e92aaff7e92aa0000004800000001000000000000000087000600fe800000000000000042acff" \
                        "fe110002fe800000000000000042aafffe73bbfaff7e92aaff7e92aa000000400000000100000000" \
                        "000000008800060008000044fde66f14e0f196090000affeaffeaffefd41b7143f86000000000000" \
                        "00000001ff7ec2a0ff7ec2a00000004a000000010000000000000000d20100351100060004000054" \
                        "ac1100027f000001ff7ed62eff7ed68700000036000000010000000000000000c496003511000400" \
                        "7f000001ac110002ff7ed62eff7ed687000000760000000100000000000000000035c49611000400" \
                        "08000044fde66f14e0f196090000affeaffeaffefd41b7143f8600000000000000000001ff7ef359" \
                        "ff7ef3590000004a000000010000000000000000b1e700351100060004000054ac1100027f000001" \
                        "ff7f06e4ff7f06e800000036000000010000000000000000a8f90035110004007f000001ac110002" \
                        "ff7f06e4ff7f06e8000000a60000000100000000000000000035a8f911000400"

# Example export for IPFIX with two data sets
PACKET_IPFIX = "000a00d02d45a47000000016000000000801007cfe800000000000000042acfffe110002fde66f14" \
               "e0f196090000000000000001ff7f0755ff7f07550000004800000001000000000000000087000600" \
               "fdabcdef123456789000000000000001fe800000000000000042acfffe110002ff7f0755ff7f0755" \
               "000000400000000100000000000000008800060008000044fde66f14e0f196090000affeaffeaffe" \
               "2a044e42020000000000000000000223ff7f06e9ff7f22d500000140000000040000000000000000" \
               "e54c01bb06020600"

PACKET_IPFIX_TEMPLATE_ETHER = "000a05002d45a4700000000d00000000" \
                              "000200500400001200080004000c000400160004001500040001000400020004000a0004000e0004" \
                              "00070002000b00020004000100060001003c000100050001003a0002003b00020038000600390006" \
                              "000200440401000f00080004000c000400160004001500040001000400020004000a0004000e0004" \
                              "00200002003c000100050001003a0002003b000200380006003900060002005008000012001b0010" \
                              "001c001000160004001500040001000400020004000a0004000e000400070002000b000200040001" \
                              "00060001003c000100050001003a0002003b00020038000600390006000200440801000f001b0010" \
                              "001c001000160004001500040001000400020004000a0004000e0004008b0002003c000100050001" \
                              "003a0002003b000200380006003900060003001e010000050001008f000400a00008013100040132" \
                              "0004013000020100001a00000009000000b0d80a558000000001000000000001040000747f000001" \
                              "ac110002e58b988be58b993e000015c70000000d000000000000000001bbe1a6061b040000000000" \
                              "123456affefeaffeaffeaffeac1100027f000001e58b988be58b993e0000074f0000001300000000" \
                              "00000000e1a601bb061f040000000000affeaffeaffe123456affefe0401006cac110002ac110001" \
                              "e58a7be3e58a83d3000000fc0000000300000000000000000800040000000000affeaffeaffe0242" \
                              "aa73bbfaac110001ac110002e58a7be3e58a83d3000000fc00000003000000000000000000000400" \
                              "00000000123456affefeaffeaffeaffe080102b0fde66f14e0f196090000affeaffeaffeff020000" \
                              "0000000000000001ff110001e58abcd9e58ad098000001b000000006000000000000000087000600" \
                              "00000000affeaffeaffe3333ff110001fde66f14e0f196090000affeaffeaffefde66f14e0f19609" \
                              "0000000000000001e58b1883e58b284e000002080000000500000000000000008000060000000000" \
                              "affeaffeaffe123456affefefdabcdef123456789000000000000001fde66f14e0f1960900000242" \
                              "ac110002e58b1883e58b284e0000020800000005000000000000000081000600000000000242aa73" \
                              "bbfaaffeaffeaffefe800000000000000042aafffe73bbfafde66f14e0f196090000affeaffeaffe" \
                              "e58b2caee58b2cae000000480000000100000000000000008700060000000000123456affefe0242" \
                              "ac110002fde66f14e0f196090000affeaffeaffefe800000000000000042aafffe73bbfae58b2cae" \
                              "e58b2cae000000400000000100000000000000008800060000000000affeaffeaffe123456affefe" \
                              "fe800000000000000042acfffe110002fe800000000000000042aafffe73bbfae58b40aee58b40ae" \
                              "000000480000000100000000000000008700060000000000affeaffeaffe123456affefefe800000" \
                              "000000000042aafffe73bbfafe800000000000000042acfffe110002e58b40aee58b40ae00000040" \
                              "0000000100000000000000008800060000000000123456affefeaffeaffeaffefe80000000000000" \
                              "0042aafffe73bbfafe800000000000000042acfffe110002e58b54aee58b54ae0000004800000001" \
                              "00000000000000008700060000000000123456affefeaffeaffeaffefe800000000000000042acff" \
                              "fe110002fe800000000000000042aafffe73bbfae58b54aee58b54ae000000400000000100000000" \
                              "000000008800060000000000affeaffeaffe123456affefe"

PACKET_IPFIX_ETHER = "000a02905e8b0aa90000001600000000" \
                     "08000054fde66f14e0f196090000affeaffeaffefd40abcdabcd00000000000000011111e58b84a4" \
                     "e58b84a40000004a000000010000000000000000d20100351100060000000000affeaffeaffe0242" \
                     "aa73bbfa04000074ac1100027f000001e58b9831e58b988a00000036000000010000000000000000" \
                     "c49600351100040000000000affeaffeaffe123456affefe7f000001ac110002e58b9831e58b988a" \
                     "000000760000000100000000000000000035c4961100040000000000123456affefeaffeaffeaffe" \
                     "08000054fde66f14e0f196090000affeaffeaffefd40abcdabcd00000000000000011111e58bb55c" \
                     "e58bb55c0000004a000000010000000000000000b1e700351100060000000000affeaffeaffe0242" \
                     "aa73bbfa04000074ac1100027f000001e58bc8e8e58bc8ec00000036000000010000000000000000" \
                     "a8f900351100040000000000affeaffeaffe123456affefe7f000001ac110002e58bc8e8e58bc8ec" \
                     "000000a60000000100000000000000000035a8f91100040000000000123456affefeaffeaffeaffe" \
                     "0801009cfe800000000000000042acfffe110002fdabcdef123456789000000000000001e58bc958" \
                     "e58bc958000000480000000100000000000000008700060000000000affeaffeaffe123456affefe" \
                     "fdabcdef123456789000000000000001fe800000000000000042acfffe110002e58bc958e58bc958" \
                     "000000400000000100000000000000008800060000000000123456affefeaffeaffeaffe08000054" \
                     "fde66f14e0f196090000affeaffeaffe2a044e42020000000000000000000223e58bc8ede58be4d8" \
                     "00000140000000040000000000000000e54c01bb0602060000000000affeaffeaffe123456affefe"

PACKET_IPFIX_PADDING = "000a01c064e0b1900000000200000000000200480400001000080004000c00040016000400150004" \
                       "0001000400020004000a0004000e0004003d00010088000100070002000b00020004000100060001" \
                       "003c000100050001000200400401000e00080004000c000400160004001500040001000400020004" \
                       "000a0004000e0004003d0001008800010020000200040001003c0001000500010002004808000010" \
                       "001b0010001c001000160004001500040001000400020004000a0004000e0004003d000100880001" \
                       "00070002000b00020004000100060001003c000100050001000200400801000e001b0010001c0010" \
                       "00160004001500040001000400020004000a0004000e0004003d000100880001008b000200040001" \
                       "003c00010005000100030022010000060001008f000400a000080131000401320004013000020052" \
                       "0010040100547f0000017f000001ffff07d0ffff0ff7000000fc0000000300000000000000000001" \
                       "08000104007f0000017f000001ffff07d0ffff0ff7000000fc000000030000000000000000000100" \
                       "0001040000000100002a0000b2da0000018a0db59d2e000000010000000000017465737463617074" \
                       "7572655f73696e67"
@@ -1,66 +0,0 @@
#!/usr/bin/env python3

"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import gzip
import json
import subprocess
import sys
import unittest

from tests.lib import *
from tests.lib import PACKET_V9_TEMPLATE, PACKETS_V9


class TestFlowExportAnalyzer(unittest.TestCase):
    def test_analyzer(self):
        """Test the analyzer by producing some packets, parsing them and then calling the analyzer
        in a subprocess, piping in a created gzip JSON collection (as if it were coming from a file).
        """
        # First create and parse some packets, which should get exported
        pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9])

        # Now the pkts must be transformed from their data structure to the "gzipped JSON representation",
        # which the collector uses for persistent storage.
        data_dicts = []  # list holding all entries
        for p in pkts:  # each pkt has its own entry with timestamp as key
            data_dicts.append({p.ts: {
                "client": p.client,
                "header": p.export.header.to_dict(),
                "flows": [f.data for f in p.export.flows]
            }})
        data = "\n".join([json.dumps(dd) for dd in data_dicts])  # join all entries together by newlines

        # Different stdout/stderr arguments for backwards compatibility
        pipe_output_param = {"capture_output": True}
        if sys.version_info < (3, 7):  # capture_output was added in Python 3.7
            pipe_output_param = {
                "stdout": subprocess.PIPE,
                "stderr": subprocess.PIPE
            }

        # Analyzer takes gzipped input either via stdin or from a file (here: stdin)
        gzipped_input = gzip.compress(data.encode())  # encode str to bytes

        # Run analyzer as CLI script with no packets ignored (parameter)
        analyzer = subprocess.run(
            [sys.executable, '-m', 'netflow.analyzer', '-p', '0'],
            input=gzipped_input,
            **pipe_output_param
        )

        # If stderr has content, print it and make sure there are no errors
        self.assertEqual(analyzer.stderr, b"", analyzer.stderr.decode())

        # Every 2 flows are written as a single line (any extras are dropped)
        num_flows = sum(len(list(item.values())[0]["flows"]) for item in data_dicts)
        self.assertEqual(len(analyzer.stdout.splitlines()) - 2, num_flows // 2)  # ignore two header lines


if __name__ == '__main__':
    unittest.main()
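The test above serializes parsed exports into the collector's storage format: one JSON object per line, keyed by timestamp, with the whole text gzip-compressed. A stdlib-only sketch of writing and reading that line-oriented format (the sample entry values are invented for illustration):

```python
import gzip
import json

# One export per line: {timestamp: {"client": ..., "header": ..., "flows": [...]}}
entries = [
    {"1234567890.0": {"client": ["127.0.0.1", 54321],
                      "header": {"version": 9, "count": 2},
                      "flows": [{"IPV4_SRC_ADDR": 2130706433}]}},
]

# Join entries by newlines, then gzip-compress the whole text
blob = gzip.compress("\n".join(json.dumps(e) for e in entries).encode())

# Reading it back, line by line, as a consumer of the dump would
decoded = [json.loads(line) for line in gzip.decompress(blob).decode().splitlines()]
print(decoded[0]["1234567890.0"]["header"]["version"])  # -> 9
```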
@@ -1,116 +0,0 @@
#!/usr/bin/env python3

"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
# TODO: tests with 500 packets fail with delay=0. Probably a problem with the UDP socket buffer
# TODO: add test for template withdrawal

import ipaddress
import unittest

from tests.lib import send_recv_packets, PACKET_IPFIX_TEMPLATE, PACKET_IPFIX, PACKET_IPFIX_ETHER, \
    PACKET_IPFIX_TEMPLATE_ETHER, PACKET_IPFIX_PADDING


class TestFlowExportIPFIX(unittest.TestCase):
    def test_recv_ipfix_packet(self):
        """
        Test the general sending of raw packets and the receiving and parsing of these packets.
        If this test runs successfully, the sender thread has sent a raw bytes packet towards a locally
        listening collector thread, and the collector has successfully received and parsed the packets.
        :return:
        """
        # send packet without any template, must fail to parse (packets are queued)
        pkts, _, _ = send_recv_packets([PACKET_IPFIX])
        self.assertEqual(len(pkts), 0)  # no export is parsed due to missing template

        # send packet with 5 templates and 20 flows, should parse correctly since the templates are known
        pkts, _, _ = send_recv_packets([PACKET_IPFIX_TEMPLATE])
        self.assertEqual(len(pkts), 1)

        p = pkts[0]
        self.assertEqual(p.client[0], "127.0.0.1")
        self.assertEqual(len(p.export.flows), 1 + 2 + 2 + 9 + 1 + 2 + 1 + 2)  # count flows
        self.assertEqual(len(p.export.templates), 4 + 1)  # count new templates

        # send template and multiple export packets
        pkts, _, _ = send_recv_packets([PACKET_IPFIX, PACKET_IPFIX_TEMPLATE, PACKET_IPFIX])
        self.assertEqual(len(pkts), 3)
        self.assertEqual(pkts[0].export.header.version, 10)

        # check amount of flows across all packets
        total_flows = 0
        for packet in pkts:
            total_flows += len(packet.export.flows)
        self.assertEqual(total_flows, 2 + 1 + (1 + 2 + 2 + 9 + 1 + 2 + 1 + 2) + 2 + 1)

    def test_ipfix_contents(self):
        """
        Inspect the content of exported flows, e.g. test the value of an option flow and the correct
        parsing of IPv4 and IPv6 addresses.
        :return:
        """
        p = send_recv_packets([PACKET_IPFIX_TEMPLATE])[0][0]

        flow = p.export.flows[0]
        self.assertEqual(flow.meteringProcessId, 2649)
        self.assertEqual(flow.selectorAlgorithm, 1)
        self.assertEqual(flow.systemInitTimeMilliseconds, 1585735165729)

        flow = p.export.flows[1]  # HTTPS flow from web server to client
        self.assertEqual(flow.destinationIPv4Address, 2886795266)
        self.assertEqual(ipaddress.ip_address(flow.destinationIPv4Address),
                         ipaddress.ip_address("172.17.0.2"))
        self.assertEqual(flow.protocolIdentifier, 6)  # TCP
        self.assertEqual(flow.sourceTransportPort, 443)
        self.assertEqual(flow.destinationTransportPort, 57766)
        self.assertEqual(flow.tcpControlBits, 0x1b)

        flow = p.export.flows[17]  # IPv6 flow
        self.assertEqual(flow.protocolIdentifier, 17)  # UDP
        self.assertEqual(flow.sourceIPv6Address, 0xfde66f14e0f196090000affeaffeaffe)
        self.assertEqual(ipaddress.ip_address(flow.sourceIPv6Address),  # Docker ULA
                         ipaddress.ip_address("fde6:6f14:e0f1:9609:0:affe:affe:affe"))

    def test_ipfix_contents_ether(self):
        """
        IPFIX content tests based on exports with the softflowd "-T ether" flag, meaning that layer 2
        information, like MAC addresses, is included in the export.
        :return:
        """
        pkts, _, _ = send_recv_packets([PACKET_IPFIX_TEMPLATE_ETHER, PACKET_IPFIX_ETHER])
        self.assertEqual(len(pkts), 2)
        p = pkts[0]

        # Inspect contents of specific flows
        flow = p.export.flows[0]
        self.assertEqual(flow.meteringProcessId, 9)
        self.assertEqual(flow.selectorAlgorithm, 1)
        self.assertEqual(flow.systemInitTimeMilliseconds, 759538800000)

        flow = p.export.flows[1]
        self.assertEqual(flow.destinationIPv4Address, 2886795266)
        self.assertTrue(hasattr(flow, "sourceMacAddress"))
        self.assertTrue(hasattr(flow, "postDestinationMacAddress"))
        self.assertEqual(flow.sourceMacAddress, 0x123456affefe)
        self.assertEqual(flow.postDestinationMacAddress, 0xaffeaffeaffe)

    def test_ipfix_padding(self):
        """
        Checks successful parsing of export packets that contain padding zeroes in an IPFIX set.
        The padding in the example data sits between the last two data sets, so successful parsing of the
        last data set indicates correct handling of padding zero bytes.
        """
        pkts, _, _ = send_recv_packets([PACKET_IPFIX_PADDING])
        self.assertEqual(len(pkts), 1)
        p = pkts[0]

        # Check the length of the whole export
        self.assertEqual(p.export.header.length, 448)

        # Check a specific value of the last flow in the export. Success means correct handling of padding in the set
        self.assertEqual(p.export.flows[-1].meteringProcessId, 45786)
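The address assertions above rely on `ipaddress.ip_address()` accepting raw integers and picking the address family from the value's magnitude (values up to 2^32 - 1 become IPv4, larger ones IPv6). A short stdlib sketch of the conversions the tests use:

```python
import ipaddress

# IPv4 addresses arrive in flow records as 32-bit integers,
# IPv6 addresses as 128-bit integers.
ipv4 = ipaddress.ip_address(2886795266)  # 0xAC110002
ipv6 = ipaddress.ip_address(0xfde66f14e0f196090000affeaffeaffe)

print(ipv4)  # 172.17.0.2
print(ipv6)  # fde6:6f14:e0f1:9609:0:affe:affe:affe
```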
@ -1,157 +0,0 @@
|
||||||
#!/usr/bin/env python3

"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
# TODO: tests with 500 packets fail with delay=0. Probably a problem with UDP sockets buffer

import ipaddress
import random
import unittest

from tests.lib import send_recv_packets, NUM_PACKETS, \
    PACKET_INVALID, PACKET_V1, PACKET_V5, PACKET_V9_WITH_ZEROS, \
    PACKET_V9_TEMPLATE, PACKET_V9_TEMPLATE_MIXED, PACKETS_V9


class TestFlowExportNetflow(unittest.TestCase):
    def _test_recv_all_packets(self, num, template_idx, delay=0.0001):
        """Fling packets at the server and test that it receives them all"""

        def gen_pkts(n, idx):
            for x in range(n):
                if x == idx:
                    yield PACKET_V9_TEMPLATE
                else:
                    yield random.choice(PACKETS_V9)

        pkts, tstart, tend = send_recv_packets(gen_pkts(num, template_idx), delay=delay)

        # check number of packets
        self.assertEqual(len(pkts), num)

        # check timestamps are when packets were sent, not processed
        self.assertTrue(all(tstart < p.ts < tend for p in pkts))

        # check number of "things" in the packets (flows + templates)
        # template packet = 10 things
        # other packets = 12 things
        self.assertEqual(sum(p.export.header.count for p in pkts), (num - 1) * 12 + 10)

        # check number of flows in the packets
        # template packet = 8 flows (2 templates)
        # other packets = 12 flows
        self.assertEqual(sum(len(p.export.flows) for p in pkts), (num - 1) * 12 + 8)

    def test_recv_all_packets_template_first(self):
        """Test all packets are received when the template is sent first"""
        self._test_recv_all_packets(NUM_PACKETS, 0)

    def test_recv_all_packets_template_middle(self):
        """Test all packets are received when the template is sent in the middle"""
        self._test_recv_all_packets(NUM_PACKETS, NUM_PACKETS // 2)

    def test_recv_all_packets_template_last(self):
        """Test all packets are received when the template is sent last"""
        self._test_recv_all_packets(NUM_PACKETS, NUM_PACKETS - 1)

    def test_recv_all_packets_slowly(self):
        """Test all packets are received when things are sent slooooowwwwwwwwlllllllyyyyyy"""
        self._test_recv_all_packets(3, 0, delay=1)

    def test_ignore_invalid_packets(self):
        """Test that invalid packets log a warning but are otherwise ignored"""
        with self.assertLogs(level='WARNING'):
            pkts, _, _ = send_recv_packets([
                PACKET_INVALID, PACKET_V9_TEMPLATE, random.choice(PACKETS_V9), PACKET_INVALID,
                random.choice(PACKETS_V9), PACKET_INVALID
            ])
        self.assertEqual(len(pkts), 3)

    def test_recv_v1_packet(self):
        """Test NetFlow v1 packet parsing"""
        pkts, _, _ = send_recv_packets([PACKET_V1])
        self.assertEqual(len(pkts), 1)

        # Take the parsed packet and check metadata
        p = pkts[0]
        self.assertEqual(p.client[0], "127.0.0.1")  # collector listens locally
        self.assertEqual(len(p.export.flows), 2)  # ping request and reply
        self.assertEqual(p.export.header.count, 2)  # same value, in header
        self.assertEqual(p.export.header.version, 1)

        # Check a specific IP address contained in a flow.
        # Since it might vary which flow of the pair is exported first, check both
        flow = p.export.flows[0]
        self.assertIn(
            ipaddress.ip_address(flow.IPV4_SRC_ADDR),  # convert to ipaddress obj because value is int
            [ipaddress.ip_address("172.17.0.1"), ipaddress.ip_address("172.17.0.2")]
        )
        self.assertEqual(flow.PROTO, 1)  # ICMP

    def test_recv_v5_packet(self):
        """Test NetFlow v5 packet parsing"""
        pkts, _, _ = send_recv_packets([PACKET_V5])
        self.assertEqual(len(pkts), 1)

        p = pkts[0]
        self.assertEqual(p.client[0], "127.0.0.1")
        self.assertEqual(len(p.export.flows), 3)  # ping request and reply, one multicast
        self.assertEqual(p.export.header.count, 3)
        self.assertEqual(p.export.header.version, 5)

        # Check a specific IP address contained in a flow.
        # Since it might vary which flow of the pair is exported first, check both
        flow = p.export.flows[0]
        self.assertIn(
            ipaddress.ip_address(flow.IPV4_SRC_ADDR),  # convert to ipaddress obj because value is int
            [ipaddress.ip_address("172.17.0.1"), ipaddress.ip_address("172.17.0.2")]  # matches multicast packet too
        )
        self.assertEqual(flow.PROTO, 1)  # ICMP

    def test_recv_v9_packet(self):
        """Test NetFlow v9 packet parsing"""

        # send packet without any template, must fail to parse (packets are queued)
        pkts, _, _ = send_recv_packets([PACKETS_V9[0]])
        self.assertEqual(len(pkts), 0)  # no export is parsed due to missing template

        # send an invalid packet with zero bytes, must fail to parse
        pkts, _, _ = send_recv_packets([PACKET_V9_WITH_ZEROS])
        self.assertEqual(len(pkts), 0)  # no export is parsed due to missing template

        # send packet with two templates and eight flows, should parse correctly since the templates are known
        pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE])
        self.assertEqual(len(pkts), 1)

        # and again, but with the templates at the end of the packet
        pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE_MIXED])
        self.assertEqual(len(pkts), 1)
        p = pkts[0]
        self.assertEqual(p.client[0], "127.0.0.1")
        self.assertEqual(len(p.export.flows), 8)  # count flows
        self.assertEqual(len(p.export.templates), 2)  # count new templates

        # Inspect contents of specific flows
        flow = p.export.flows[0]
        self.assertEqual(flow.PROTOCOL, 6)  # TCP
        self.assertEqual(flow.L4_SRC_PORT, 80)
        self.assertEqual(flow.IPV4_SRC_ADDR, "127.0.0.1")

        flow = p.export.flows[-1]  # last flow
        self.assertEqual(flow.PROTOCOL, 17)  # UDP
        self.assertEqual(flow.L4_DST_PORT, 53)

        # send template and multiple export packets
        pkts, _, _ = send_recv_packets([PACKET_V9_TEMPLATE, *PACKETS_V9])
        self.assertEqual(len(pkts), 4)
        self.assertEqual(pkts[0].export.header.version, 9)

        # check the number of flows across all packets
        total_flows = 0
        for packet in pkts:
            total_flows += len(packet.export.flows)
        self.assertEqual(total_flows, 8 + 12 + 12 + 12)
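The v9 tests above hinge on template handling: a data record cannot be decoded until the template describing its field layout has been received, which is why the template-less packet yields no export. A minimal sketch of that mechanism, with illustrative names (`handle_template`, `decode_record`) that are not the package's actual API:

```python
templates = {}  # template ID -> ordered list of (field name, byte length)

def handle_template(template_id, fields):
    """Register the field layout announced by a template record."""
    templates[template_id] = fields

def decode_record(template_id, payload):
    """Decode one data record using a previously seen template."""
    if template_id not in templates:
        raise KeyError("data record arrived before its template; queue it")
    record, offset = {}, 0
    for name, length in templates[template_id]:
        record[name] = int.from_bytes(payload[offset:offset + length], "big")
        offset += length
    return record

handle_template(256, [("IPV4_SRC_ADDR", 4), ("L4_SRC_PORT", 2)])
print(decode_record(256, bytes([127, 0, 0, 1, 0, 80])))
# -> {'IPV4_SRC_ADDR': 2130706433, 'L4_SRC_PORT': 80}
```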
@@ -1,188 +0,0 @@
#!/usr/bin/env python3

"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import cProfile
import io
import linecache
import pstats
import tracemalloc
import unittest

from tests.lib import send_recv_packets, generate_packets

NUM_PACKETS_PERFORMANCE = 2000


@unittest.skip("Not necessary in functional tests, used as analysis tool")
class TestNetflowIPFIXPerformance(unittest.TestCase):
    def setUp(self) -> None:
        """
        Before each test run, start tracemalloc profiling.
        :return:
        """
        tracemalloc.start()
        print("\n\n")

    def tearDown(self) -> None:
        """
        After each test run, stop tracemalloc.
        :return:
        """
        tracemalloc.stop()

    def _memory_of_version(self, version, store_packets=500) -> tracemalloc.Snapshot:
        """
        Create a memory snapshot of a collector run with packets of version :version:
        :param version:
        :return:
        """
        if not tracemalloc.is_tracing():
            raise RuntimeError
        pkts, t1, t2 = send_recv_packets(generate_packets(NUM_PACKETS_PERFORMANCE, version),
                                         store_packets=store_packets)
        self.assertEqual(len(pkts), NUM_PACKETS_PERFORMANCE)
        snapshot = tracemalloc.take_snapshot()
        del pkts
        return snapshot

    @staticmethod
    def _print_memory_statistics(snapshot: tracemalloc.Snapshot, key: str, topx: int = 10):
        """
        Print memory statistics from a tracemalloc.Snapshot in certain formats.
        :param snapshot:
        :param key:
        :param topx:
        :return:
        """
        if key not in ["filename", "lineno", "traceback"]:
            raise KeyError

        stats = snapshot.statistics(key)
        if key == "lineno":
            for idx, stat in enumerate(stats[:topx]):
                frame = stat.traceback[0]
                print("\n{idx:02d}: {filename}:{lineno} {size:.1f} KiB, count {count}".format(
                    idx=idx + 1, filename=frame.filename, lineno=frame.lineno, size=stat.size / 1024, count=stat.count
                ))

                lines = []
                lines_whitespaces = []
                for lineshift in range(-3, 2):
                    stat = linecache.getline(frame.filename, frame.lineno + lineshift)
                    lines_whitespaces.append(len(stat) - len(stat.lstrip(" ")))  # count leading whitespace
                    lines.append(stat.strip())
                lines_whitespaces = [x - min([y for y in lines_whitespaces if y > 0]) for x in lines_whitespaces]
                for lidx, stat in enumerate(lines):
                    print("  {}{}".format("> " if lidx == 3 else "| ", " " * lines_whitespaces.pop(0) + stat))
        elif key == "filename":
            for idx, stat in enumerate(stats[:topx]):
                frame = stat.traceback[0]
                print("{idx:02d}: {filename:80s} {size:6.1f} KiB, count {count:5<d}".format(
                    idx=idx + 1, filename=frame.filename, size=stat.size / 1024, count=stat.count
                ))
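The helpers above are built on tracemalloc snapshots. A minimal self-contained sketch of the same workflow (start tracing, allocate, snapshot, rank allocation sites by line), with a throwaway list standing in for the collector's stored packets:

```python
import tracemalloc

tracemalloc.start()
blobs = [bytes(1024) for _ in range(1000)]  # roughly 1 MiB of traced allocations
snapshot = tracemalloc.take_snapshot()
tracemalloc.stop()

# Rank allocation sites by line, as _print_memory_statistics does with key="lineno"
for stat in snapshot.statistics("lineno")[:3]:
    frame = stat.traceback[0]
    print("{}:{} {:.1f} KiB, count {}".format(frame.filename, frame.lineno, stat.size / 1024, stat.count))
```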
    def test_compare_memory(self):
        """
        Test memory usage of two collector runs with IPFIX and NetFlow v9 packets respectively.
        Then compare the two memory snapshots to make sure the libraries do not cross each other.
        TODO: more features could be tested, e.g. too big of a difference if one version is optimized better
        :return:
        """
        pkts, t1, t2 = send_recv_packets(generate_packets(NUM_PACKETS_PERFORMANCE, 10))
        self.assertEqual(len(pkts), NUM_PACKETS_PERFORMANCE)
        snapshot_ipfix = tracemalloc.take_snapshot()
        del pkts
        tracemalloc.clear_traces()

        pkts, t1, t2 = send_recv_packets(generate_packets(NUM_PACKETS_PERFORMANCE, 9))
        self.assertEqual(len(pkts), NUM_PACKETS_PERFORMANCE)
        snapshot_v9 = tracemalloc.take_snapshot()
        del pkts

        stats = snapshot_v9.compare_to(snapshot_ipfix, "lineno")
        for stat in stats:
            if stat.traceback[0].filename.endswith("netflow/ipfix.py"):
                self.assertEqual(stat.count, 0)
                self.assertEqual(stat.size, 0)

        stats = snapshot_ipfix.compare_to(snapshot_v9, "lineno")
        for stat in stats:
            if stat.traceback[0].filename.endswith("netflow/v9.py"):
                self.assertEqual(stat.count, 0)
                self.assertEqual(stat.size, 0)

    def test_memory_ipfix(self):
        """
        Test memory usage of the collector with IPFIX packets.
        Three iterations are done with different amounts of packets to be stored.
        With this approach, increased usage of memory can be captured when the ExportPacket objects are not deleted.
        :return:
        """
        for store_pkts in [0, 500, -1]:  # -1 is compatibility value for "store all"
            snapshot_ipfix = self._memory_of_version(10, store_packets=store_pkts)

            # TODO: this seems misleading, maybe reads the memory of the whole testing process?
            # system_memory = pathlib.Path("/proc/self/statm").read_text()
            # pagesize = resource.getpagesize()
            # print("Total RSS memory used: {:.1f} KiB".format(int(system_memory.split()[1]) * pagesize // 1024.))

            print("\nIPFIX memory usage with {} packets being stored".format(store_pkts))
            self._print_memory_statistics(snapshot_ipfix, "filename")
            if store_pkts == -1:
                # very verbose and most interesting in the iteration with all ExportPackets being stored in memory
                print("\nTraceback for run with all packets being stored in memory")
                self._print_memory_statistics(snapshot_ipfix, "lineno")

    def test_memory_v1(self):
        """
        Test memory with NetFlow v1
        :return:
        """
        snapshot_v1 = self._memory_of_version(1)
        print("\nNetFlow v1 memory usage by file")
        self._print_memory_statistics(snapshot_v1, "filename")

    def test_memory_v5(self):
        """
        Test memory with NetFlow v5
        :return:
        """
        snapshot_v5 = self._memory_of_version(5)
        print("\nNetFlow v5 memory usage by file")
        self._print_memory_statistics(snapshot_v5, "filename")

    def test_memory_v9(self):
        """
        Test memory usage of the collector with NetFlow v9 packets.
        :return:
        """
        snapshot_v9 = self._memory_of_version(9)
        print("\nNetFlow v9 memory usage by file")
        self._print_memory_statistics(snapshot_v9, "filename")
        print("\nNetFlow v9 memory usage by line")
        self._print_memory_statistics(snapshot_v9, "lineno")

    @unittest.skip("Does not work as expected due to threading")
    def test_time_ipfix(self):
        """
        Profile function calls and CPU time.
        TODO: this does not work with threading in the collector, yet
        :return:
        """
        profile = cProfile.Profile()
        profile.enable(subcalls=True, builtins=True)
        pkts, t1, t2 = send_recv_packets(generate_packets(NUM_PACKETS_PERFORMANCE, 10), delay=0, store_packets=500)
        self.assertEqual(len(pkts), NUM_PACKETS_PERFORMANCE)
        profile.disable()

        for sort_by in ['cumulative', 'calls']:
            s = io.StringIO()
            ps = pstats.Stats(profile, stream=s)
            ps.sort_stats(sort_by).print_stats("netflow")
            ps.sort_stats(sort_by).print_callees(.5)
            print(s.getvalue())
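The profiling pattern above can be reproduced in miniature with just cProfile and pstats; `busy` is a stand-in workload, not part of the test suite:

```python
import cProfile
import io
import pstats

def busy():
    """Stand-in workload for the collector run."""
    return sum(i * i for i in range(10000))

profile = cProfile.Profile()
profile.enable()
busy()
profile.disable()

# Render the report into a string buffer, restricted to lines matching "busy"
s = io.StringIO()
ps = pstats.Stats(profile, stream=s)
ps.sort_stats("cumulative").print_stats("busy")
print(s.getvalue())
```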