Move collector and analyzer into the package, callable via CLI

Beginning with this commit, the reference implementations of the
collector and analyzer are now included in the package. They are
callable by running `python3 -m netflow.collector` or `.analyzer`, with
the same flags as before. Use `-h` to list them.

Additional fixes are contained in this commit as well, e.g. adding more
version prefixes and moving parts of code from __init__ to utils, to fix
circular imports.
This commit is contained in:
Dominik Pataky 2020-03-29 22:14:45 +02:00
parent 9d2bc21ae2
commit abce1f57dd
7 changed files with 93 additions and 65 deletions

View file

@ -1,35 +1,8 @@
#!/usr/bin/env python #!/usr/bin/env python3
import struct """
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
from netflow.v1 import V1ExportPacket Copyright 2017-2020 Dominik Pataky <dev@bitkeks.eu>
from netflow.v5 import V5ExportPacket Licensed under MIT License. See LICENSE.
from netflow.v9 import V9ExportPacket, TemplateNotRecognized """
__all__ = ["TemplateNotRecognized", "UnknownNetFlowVersion", "parse_packet"]
class UnknownNetFlowVersion(Exception):
def __init__(self, data, version):
self.data = data
self.version = version
r = repr(data)
data_str = ("{:.25}..." if len(r) >= 28 else "{}").format(r)
super().__init__(
"Unknown NetFlow version {} for data {}".format(version, data_str)
)
def get_netflow_version(data):
return struct.unpack('!H', data[:2])[0]
def parse_packet(data, templates):
version = get_netflow_version(data)
if version == 1:
return V1ExportPacket(data)
elif version == 5:
return V5ExportPacket(data)
elif version == 9:
return V9ExportPacket(data, templates)
raise UnknownNetFlowVersion(data, version)

15
analyzer.py → netflow/analyzer.py Executable file → Normal file
View file

@ -1,7 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Example analyzing script for saved exports (by main.py, as JSON).
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2017-2020 Dominik Pataky <dev@bitkeks.eu> Copyright 2017-2020 Dominik Pataky <dev@bitkeks.eu>
@ -30,7 +29,12 @@ IP_PROTOCOLS = {
} }
Pair = namedtuple('Pair', ['src', 'dest']) Pair = namedtuple('Pair', ['src', 'dest'])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
def printv(message, *args_, **kwargs): def printv(message, *args_, **kwargs):
@ -177,6 +181,10 @@ class Connection:
return self.src_flow["IN_PKTS"] + self.dest_flow["IN_PKTS"] return self.src_flow["IN_PKTS"] + self.dest_flow["IN_PKTS"]
if __name__ == "netflow.analyzer":
logger.error("The analyzer is currently meant to be used as a CLI tool only.")
logger.error("Use 'python3 -m netflow.analyzer -h' in your console for additional help.")
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data") parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
parser.add_argument("-f", "--file", dest="file", type=str, default=sys.stdin, parser.add_argument("-f", "--file", dest="file", type=str, default=sys.stdin,
@ -187,7 +195,8 @@ if __name__ == "__main__":
help="Enable verbose output.") help="Enable verbose output.")
parser.add_argument("--match-host", dest="match_host", type=str, default=None, parser.add_argument("--match-host", dest="match_host", type=str, default=None,
help="Filter output by matching on the given host (matches source or destination)") help="Filter output by matching on the given host (matches source or destination)")
parser.add_argument("-n", "--no-dns", dest="no_dns", action="store_true", help="Disable DNS resolving of IP addresses") parser.add_argument("-n", "--no-dns", dest="no_dns", action="store_true",
help="Disable DNS resolving of IP addresses")
args = parser.parse_args() args = parser.parse_args()
# Sanity check for IP address # Sanity check for IP address
@ -214,7 +223,7 @@ if __name__ == "__main__":
for line in gzipped: for line in gzipped:
entry = json.loads(line) entry = json.loads(line)
if len(entry.keys()) != 1: if len(entry.keys()) != 1:
logger.warning("Line \"{}\" does not have exactly one timestamp key.") logger.warning("The line does not have exactly one timestamp key: \"{}\"".format(line.keys()))
try: try:
ts = list(entry)[0] # timestamp from key ts = list(entry)[0] # timestamp from key

44
main.py → netflow/collector.py Executable file → Normal file
View file

@ -1,5 +1,3 @@
#!/usr/bin/env python3
""" """
Example collector script for NetFlow v1, v5, and v9. Example collector script for NetFlow v1, v5, and v9.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
@ -7,40 +5,38 @@ This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2017-2020 Dominik Pataky <dev@bitkeks.eu> Copyright 2017-2020 Dominik Pataky <dev@bitkeks.eu>
Licensed under MIT License. See LICENSE. Licensed under MIT License. See LICENSE.
""" """
import argparse import argparse
from collections import namedtuple
import queue
import gzip import gzip
import json import json
from collections import namedtuple
import queue
import logging import logging
import sys
import socket import socket
import socketserver import socketserver
import threading import threading
import time import time
from netflow import parse_packet, TemplateNotRecognized, UnknownNetFlowVersion from .utils import UnknownNetFlowVersion, parse_packet
from .v9 import V9TemplateNotRecognized
logger = logging.getLogger(__name__) RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data'])
ParsedPacket = namedtuple('ParsedPacket', ['ts', 'client', 'export'])
# Amount of time to wait before dropping an undecodable ExportPacket
PACKET_TIMEOUT = 60 * 60
logger = logging.getLogger("netflow-collector")
ch = logging.StreamHandler() ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter) ch.setFormatter(formatter)
logger.addHandler(ch) logger.addHandler(ch)
# Amount of time to wait before dropping an undecodable ExportPacket
PACKET_TIMEOUT = 60 * 60
RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data'])
ParsedPacket = namedtuple('ParsedPacket', ['ts', 'client', 'export'])
class QueuingRequestHandler(socketserver.BaseRequestHandler): class QueuingRequestHandler(socketserver.BaseRequestHandler):
def handle(self): def handle(self):
data = self.request[0] # get content, [1] would be the socket data = self.request[0] # get content, [1] would be the socket
self.server.queue.put(RawPacket(time.time(), self.client_address, data)) self.server.queue.put(RawPacket(time.time(), self.client_address, data))
logger.debug( logging.debug(
"Received %d bytes of data from %s", len(data), self.client_address "Received %d bytes of data from %s", len(data), self.client_address
) )
@ -59,7 +55,7 @@ class QueuingUDPListener(socketserver.ThreadingUDPServer):
super().__init__(interface, QueuingRequestHandler) super().__init__(interface, QueuingRequestHandler)
class NetFlowListener(threading.Thread): class ThreadedNetFlowListener(threading.Thread):
"""A thread that listens for incoming NetFlow packets, processes them, and """A thread that listens for incoming NetFlow packets, processes them, and
makes them available to consumers. makes them available to consumers.
@ -70,7 +66,7 @@ class NetFlowListener(threading.Thread):
- When joined, will wait for the listener to exit - When joined, will wait for the listener to exit
For example, a simple script that outputs data until killed with CTRL+C: For example, a simple script that outputs data until killed with CTRL+C:
>>> listener = NetFlowListener('0.0.0.0', 2055) >>> listener = ThreadedNetFlowListener('0.0.0.0', 2055)
>>> print("Listening for NetFlow packets") >>> print("Listening for NetFlow packets")
>>> listener.start() # start processing packets >>> listener.start() # start processing packets
>>> try: >>> try:
@ -127,7 +123,7 @@ class NetFlowListener(threading.Thread):
except UnknownNetFlowVersion as e: except UnknownNetFlowVersion as e:
logger.error("%s, ignoring the packet", e) logger.error("%s, ignoring the packet", e)
continue continue
except TemplateNotRecognized: except V9TemplateNotRecognized:
if time.time() - pkt.ts > PACKET_TIMEOUT: if time.time() - pkt.ts > PACKET_TIMEOUT:
logger.warning("Dropping an old and undecodable v9 ExportPacket") logger.warning("Dropping an old and undecodable v9 ExportPacket")
else: else:
@ -164,9 +160,9 @@ class NetFlowListener(threading.Thread):
def get_export_packets(host, port): def get_export_packets(host, port):
"""A generator that will yield ExportPacket objects until it is killed""" """A threaded generator that will yield ExportPacket objects until it is killed
"""
listener = NetFlowListener(host, port) listener = ThreadedNetFlowListener(host, port)
listener.start() listener.start()
try: try:
while True: while True:
@ -176,6 +172,10 @@ def get_export_packets(host, port):
listener.join() listener.join()
if __name__ == "netflow.collector":
logger.error("The collector is currently meant to be used as a CLI tool only.")
logger.error("Use 'python3 -m netflow.collector -h' in your console for additional help.")
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A sample netflow collector.") parser = argparse.ArgumentParser(description="A sample netflow collector.")
parser.add_argument("--host", type=str, default="0.0.0.0", parser.add_argument("--host", type=str, default="0.0.0.0",

40
netflow/utils.py Normal file
View file

@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2017-2020 Dominik Pataky <dev@bitkeks.eu>
Licensed under MIT License. See LICENSE.
"""
import struct
from .v1 import V1ExportPacket
from .v5 import V5ExportPacket
from .v9 import V9ExportPacket
class UnknownNetFlowVersion(Exception):
def __init__(self, data, version):
self.data = data
self.version = version
r = repr(data)
data_str = ("{:.25}..." if len(r) >= 28 else "{}").format(r)
super().__init__(
"Unknown NetFlow version {} for data {}".format(version, data_str)
)
def get_netflow_version(data):
return struct.unpack('!H', data[:2])[0]
def parse_packet(data, templates):
version = get_netflow_version(data)
if version == 1:
return V1ExportPacket(data)
elif version == 5:
return V5ExportPacket(data)
elif version == 9:
return V9ExportPacket(data, templates)
raise UnknownNetFlowVersion(data, version)

View file

@ -13,6 +13,8 @@ See https://github.com/djmdjm/softflowd
import struct import struct
__all__ = ["V1DataFlow", "V1ExportPacket", "V1Header"]
class V1DataFlow: class V1DataFlow:
"""Holds one v1 DataRecord """Holds one v1 DataRecord

View file

@ -10,6 +10,8 @@ This script is specifically implemented in combination with softflowd. See https
import struct import struct
__all__ = ["V5DataFlow", "V5ExportPacket", "V5Header"]
class V5DataFlow: class V5DataFlow:
"""Holds one v5 DataRecord """Holds one v5 DataRecord

View file

@ -16,8 +16,10 @@ Licensed under MIT License. See LICENSE.
import ipaddress import ipaddress
import struct import struct
__all__ = ["V9DataFlowSet", "V9DataRecord", "V9ExportPacket", "V9Header", "V9TemplateField",
"V9TemplateFlowSet", "V9TemplateNotRecognized", "V9TemplateRecord"]
FIELD_TYPES = { V9_FIELD_TYPES = {
0: 'UNKNOWN_FIELD_TYPE', # fallback for unknown field types 0: 'UNKNOWN_FIELD_TYPE', # fallback for unknown field types
# Cisco specs for NetFlow v9 # Cisco specs for NetFlow v9
@ -154,7 +156,7 @@ FIELD_TYPES = {
} }
class TemplateNotRecognized(KeyError): class V9TemplateNotRecognized(KeyError):
pass pass
@ -188,7 +190,7 @@ class V9DataFlowSet:
offset = 4 offset = 4
if self.template_id not in templates: if self.template_id not in templates:
raise TemplateNotRecognized raise V9TemplateNotRecognized
template = templates[self.template_id] template = templates[self.template_id]
@ -200,7 +202,7 @@ class V9DataFlowSet:
for field in template.fields: for field in template.fields:
flen = field.field_length flen = field.field_length
fkey = FIELD_TYPES[field.field_type] fkey = V9_FIELD_TYPES[field.field_type]
# The length of the value byte slice is defined in the template # The length of the value byte slice is defined in the template
dataslice = data[offset:offset+flen] dataslice = data[offset:offset+flen]
@ -238,7 +240,7 @@ class V9TemplateField:
def __repr__(self): def __repr__(self):
return "<TemplateField type {}:{}, length {}>".format( return "<TemplateField type {}:{}, length {}>".format(
self.field_type, FIELD_TYPES[self.field_type], self.field_length) self.field_type, V9_FIELD_TYPES[self.field_type], self.field_length)
class V9TemplateRecord: class V9TemplateRecord:
@ -251,7 +253,7 @@ class V9TemplateRecord:
def __repr__(self): def __repr__(self):
return "<TemplateRecord {} with {} fields: {}>".format( return "<TemplateRecord {} with {} fields: {}>".format(
self.template_id, self.field_count, self.template_id, self.field_count,
' '.join([FIELD_TYPES[field.field_type] for field in self.fields])) ' '.join([V9_FIELD_TYPES[field.field_type] for field in self.fields]))
class V9TemplateFlowSet: class V9TemplateFlowSet:
@ -279,7 +281,7 @@ class V9TemplateFlowSet:
# Get all fields of this template # Get all fields of this template
offset += 4 offset += 4
field_type, field_length = struct.unpack('!HH', data[offset:offset+4]) field_type, field_length = struct.unpack('!HH', data[offset:offset+4])
if field_type not in FIELD_TYPES: if field_type not in V9_FIELD_TYPES:
field_type = 0 # Set field_type to UNKNOWN_FIELD_TYPE as fallback field_type = 0 # Set field_type to UNKNOWN_FIELD_TYPE as fallback
field = V9TemplateField(field_type, field_length) field = V9TemplateField(field_type, field_length)
fields.append(field) fields.append(field)