From abce1f57dd5fd66664d6b98d3b5e614881483d60 Mon Sep 17 00:00:00 2001 From: Dominik Pataky Date: Sun, 29 Mar 2020 22:14:45 +0200 Subject: [PATCH] Move collector and analyzer into the package, callable via CLI Beginning with this commit, the reference implementations of the collector and analyzer are now included in the package. They are callable by running `python3 -m netflow.collector` or `.analyzer`, with the same flags as before. Use `-h` to list them. Additional fixes are contained in this commit as well, e.g. adding more version prefixes and moving parts of code from __init__ to utils, to fix circular imports. --- netflow/__init__.py | 39 ++++---------------------- analyzer.py => netflow/analyzer.py | 15 ++++++++-- main.py => netflow/collector.py | 44 +++++++++++++++--------------- netflow/utils.py | 40 +++++++++++++++++++++++++++ netflow/v1.py | 2 ++ netflow/v5.py | 2 ++ netflow/v9.py | 16 ++++++----- 7 files changed, 93 insertions(+), 65 deletions(-) rename analyzer.py => netflow/analyzer.py (95%) mode change 100755 => 100644 rename main.py => netflow/collector.py (92%) mode change 100755 => 100644 create mode 100644 netflow/utils.py diff --git a/netflow/__init__.py b/netflow/__init__.py index 9078d70..ee9f070 100644 --- a/netflow/__init__.py +++ b/netflow/__init__.py @@ -1,35 +1,8 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 -import struct +""" +This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. -from netflow.v1 import V1ExportPacket -from netflow.v5 import V5ExportPacket -from netflow.v9 import V9ExportPacket, TemplateNotRecognized - -__all__ = ["TemplateNotRecognized", "UnknownNetFlowVersion", "parse_packet"] - - -class UnknownNetFlowVersion(Exception): - def __init__(self, data, version): - self.data = data - self.version = version - r = repr(data) - data_str = ("{:.25}..." if len(r) >= 28 else "{}").format(r) - super().__init__( - "Unknown NetFlow version {} for data {}".format(version, data_str) - ) - - -def get_netflow_version(data): - return struct.unpack('!H', data[:2])[0] - - -def parse_packet(data, templates): - version = get_netflow_version(data) - if version == 1: - return V1ExportPacket(data) - elif version == 5: - return V5ExportPacket(data) - elif version == 9: - return V9ExportPacket(data, templates) - raise UnknownNetFlowVersion(data, version) +Copyright 2017-2020 Dominik Pataky +Licensed under MIT License. See LICENSE. +""" diff --git a/analyzer.py b/netflow/analyzer.py old mode 100755 new mode 100644 similarity index 95% rename from analyzer.py rename to netflow/analyzer.py index 77d789c..3963234 --- a/analyzer.py +++ b/netflow/analyzer.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 """ -Example analyzing script for saved exports (by main.py, as JSON). This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. Copyright 2017-2020 Dominik Pataky @@ -30,7 +29,12 @@ IP_PROTOCOLS = { } Pair = namedtuple('Pair', ['src', 'dest']) + logger = logging.getLogger(__name__) +ch = logging.StreamHandler() +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +ch.setFormatter(formatter) +logger.addHandler(ch) def printv(message, *args_, **kwargs): @@ -177,6 +181,10 @@ class Connection: return self.src_flow["IN_PKTS"] + self.dest_flow["IN_PKTS"] +if __name__ == "netflow.analyzer": + logger.error("The analyzer is currently meant to be used as a CLI tool only.") + logger.error("Use 'python3 -m netflow.analyzer -h' in your console for additional help.") + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data") parser.add_argument("-f", "--file", dest="file", type=str, default=sys.stdin, @@ -187,7 +195,8 @@ if __name__ == "__main__": help="Enable verbose output.") parser.add_argument("--match-host", dest="match_host", type=str, default=None, help="Filter output by matching on the given host (matches source or destination)") - parser.add_argument("-n", "--no-dns", dest="no_dns", action="store_true", help="Disable DNS resolving of IP addresses") + parser.add_argument("-n", "--no-dns", dest="no_dns", action="store_true", + help="Disable DNS resolving of IP addresses") args = parser.parse_args() # Sanity check for IP address @@ -214,7 +223,7 @@ if __name__ == "__main__": for line in gzipped: entry = json.loads(line) if len(entry.keys()) != 1: - logger.warning("Line \"{}\" does not have exactly one timestamp key.") + logger.warning("The line does not have exactly one timestamp key: \"{}\"".format(line.keys())) try: ts = list(entry)[0] # timestamp from key diff --git a/main.py b/netflow/collector.py old mode 100755 new mode 100644 similarity index 92% rename from main.py rename to netflow/collector.py index 8bb5da6..59e56f9 --- a/main.py +++ b/netflow/collector.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - """ Example collector script for NetFlow v1, v5, and v9. This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. @@ -7,40 +5,38 @@ This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. Copyright 2017-2020 Dominik Pataky Licensed under MIT License. See LICENSE. """ - import argparse -from collections import namedtuple -import queue import gzip import json +from collections import namedtuple +import queue import logging -import sys import socket import socketserver import threading import time -from netflow import parse_packet, TemplateNotRecognized, UnknownNetFlowVersion +from .utils import UnknownNetFlowVersion, parse_packet +from .v9 import V9TemplateNotRecognized -logger = logging.getLogger(__name__) +RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data']) +ParsedPacket = namedtuple('ParsedPacket', ['ts', 'client', 'export']) + +# Amount of time to wait before dropping an undecodable ExportPacket +PACKET_TIMEOUT = 60 * 60 + +logger = logging.getLogger("netflow-collector") ch = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') ch.setFormatter(formatter) logger.addHandler(ch) -# Amount of time to wait before dropping an undecodable ExportPacket -PACKET_TIMEOUT = 60 * 60 - -RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data']) -ParsedPacket = namedtuple('ParsedPacket', ['ts', 'client', 'export']) - - class QueuingRequestHandler(socketserver.BaseRequestHandler): def handle(self): data = self.request[0] # get content, [1] would be the socket self.server.queue.put(RawPacket(time.time(), self.client_address, data)) - logger.debug( + logging.debug( "Received %d bytes of data from %s", len(data), self.client_address ) @@ -59,7 +55,7 @@ class QueuingUDPListener(socketserver.ThreadingUDPServer): super().__init__(interface, QueuingRequestHandler) -class NetFlowListener(threading.Thread): +class ThreadedNetFlowListener(threading.Thread): """A thread that listens for incoming NetFlow packets, processes them, and makes them available to consumers. @@ -70,7 +66,7 @@ class NetFlowListener(threading.Thread): - When joined, will wait for the listener to exit For example, a simple script that outputs data until killed with CTRL+C: - >>> listener = NetFlowListener('0.0.0.0', 2055) + >>> listener = ThreadedNetFlowListener('0.0.0.0', 2055) >>> print("Listening for NetFlow packets") >>> listener.start() # start processing packets >>> try: @@ -127,7 +123,7 @@ class NetFlowListener(threading.Thread): except UnknownNetFlowVersion as e: logger.error("%s, ignoring the packet", e) continue - except TemplateNotRecognized: + except V9TemplateNotRecognized: if time.time() - pkt.ts > PACKET_TIMEOUT: logger.warning("Dropping an old and undecodable v9 ExportPacket") else: @@ -164,9 +160,9 @@ class NetFlowListener(threading.Thread): def get_export_packets(host, port): - """A generator that will yield ExportPacket objects until it is killed""" - - listener = NetFlowListener(host, port) + """A threaded generator that will yield ExportPacket objects until it is killed + """ + listener = ThreadedNetFlowListener(host, port) listener.start() try: while True: @@ -176,6 +172,10 @@ def get_export_packets(host, port): listener.join() +if __name__ == "netflow.collector": + logger.error("The collector is currently meant to be used as a CLI tool only.") + logger.error("Use 'python3 -m netflow.collector -h' in your console for additional help.") + if __name__ == "__main__": parser = argparse.ArgumentParser(description="A sample netflow collector.") parser.add_argument("--host", type=str, default="0.0.0.0", diff --git a/netflow/utils.py b/netflow/utils.py new file mode 100644 index 0000000..9eb2eee --- /dev/null +++ b/netflow/utils.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. + +Copyright 2017-2020 Dominik Pataky +Licensed under MIT License. See LICENSE. +""" + +import struct + +from .v1 import V1ExportPacket +from .v5 import V5ExportPacket +from .v9 import V9ExportPacket + + +class UnknownNetFlowVersion(Exception): + def __init__(self, data, version): + self.data = data + self.version = version + r = repr(data) + data_str = ("{:.25}..." if len(r) >= 28 else "{}").format(r) + super().__init__( + "Unknown NetFlow version {} for data {}".format(version, data_str) + ) + + +def get_netflow_version(data): + return struct.unpack('!H', data[:2])[0] + + +def parse_packet(data, templates): + version = get_netflow_version(data) + if version == 1: + return V1ExportPacket(data) + elif version == 5: + return V5ExportPacket(data) + elif version == 9: + return V9ExportPacket(data, templates) + raise UnknownNetFlowVersion(data, version) \ No newline at end of file diff --git a/netflow/v1.py b/netflow/v1.py index f68e370..ff52366 100644 --- a/netflow/v1.py +++ b/netflow/v1.py @@ -13,6 +13,8 @@ See https://github.com/djmdjm/softflowd import struct +__all__ = ["V1DataFlow", "V1ExportPacket", "V1Header"] + class V1DataFlow: """Holds one v1 DataRecord diff --git a/netflow/v5.py b/netflow/v5.py index 60143da..f136620 100644 --- a/netflow/v5.py +++ b/netflow/v5.py @@ -10,6 +10,8 @@ This script is specifically implemented in combination with softflowd. See https import struct +__all__ = ["V5DataFlow", "V5ExportPacket", "V5Header"] + class V5DataFlow: """Holds one v5 DataRecord diff --git a/netflow/v9.py b/netflow/v9.py index 8d5080c..f8eee5d 100644 --- a/netflow/v9.py +++ b/netflow/v9.py @@ -16,8 +16,10 @@ Licensed under MIT License. See LICENSE. import ipaddress import struct +__all__ = ["V9DataFlowSet", "V9DataRecord", "V9ExportPacket", "V9Header", "V9TemplateField", + "V9TemplateFlowSet", "V9TemplateNotRecognized", "V9TemplateRecord"] -FIELD_TYPES = { +V9_FIELD_TYPES = { 0: 'UNKNOWN_FIELD_TYPE', # fallback for unknown field types # Cisco specs for NetFlow v9 @@ -154,7 +156,7 @@ FIELD_TYPES = { } -class TemplateNotRecognized(KeyError): +class V9TemplateNotRecognized(KeyError): pass @@ -188,7 +190,7 @@ class V9DataFlowSet: offset = 4 if self.template_id not in templates: - raise TemplateNotRecognized + raise V9TemplateNotRecognized template = templates[self.template_id] @@ -200,7 +202,7 @@ class V9DataFlowSet: for field in template.fields: flen = field.field_length - fkey = FIELD_TYPES[field.field_type] + fkey = V9_FIELD_TYPES[field.field_type] # The length of the value byte slice is defined in the template dataslice = data[offset:offset+flen] @@ -238,7 +240,7 @@ class V9TemplateField: def __repr__(self): return "".format( - self.field_type, FIELD_TYPES[self.field_type], self.field_length) + self.field_type, V9_FIELD_TYPES[self.field_type], self.field_length) class V9TemplateRecord: @@ -251,7 +253,7 @@ class V9TemplateRecord: def __repr__(self): return "".format( self.template_id, self.field_count, - ' '.join([FIELD_TYPES[field.field_type] for field in self.fields])) + ' '.join([V9_FIELD_TYPES[field.field_type] for field in self.fields])) class V9TemplateFlowSet: @@ -279,7 +281,7 @@ class V9TemplateFlowSet: # Get all fields of this template offset += 4 field_type, field_length = struct.unpack('!HH', data[offset:offset+4]) - if field_type not in FIELD_TYPES: + if field_type not in V9_FIELD_TYPES: field_type = 0 # Set field_type to UNKNOWN_FIELD_TYPE as fallback field = V9TemplateField(field_type, field_length) fields.append(field)