diff --git a/netflow/__init__.py b/netflow/__init__.py index 9078d70..ee9f070 100644 --- a/netflow/__init__.py +++ b/netflow/__init__.py @@ -1,35 +1,8 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 -import struct +""" +This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. -from netflow.v1 import V1ExportPacket -from netflow.v5 import V5ExportPacket -from netflow.v9 import V9ExportPacket, TemplateNotRecognized - -__all__ = ["TemplateNotRecognized", "UnknownNetFlowVersion", "parse_packet"] - - -class UnknownNetFlowVersion(Exception): - def __init__(self, data, version): - self.data = data - self.version = version - r = repr(data) - data_str = ("{:.25}..." if len(r) >= 28 else "{}").format(r) - super().__init__( - "Unknown NetFlow version {} for data {}".format(version, data_str) - ) - - -def get_netflow_version(data): - return struct.unpack('!H', data[:2])[0] - - -def parse_packet(data, templates): - version = get_netflow_version(data) - if version == 1: - return V1ExportPacket(data) - elif version == 5: - return V5ExportPacket(data) - elif version == 9: - return V9ExportPacket(data, templates) - raise UnknownNetFlowVersion(data, version) +Copyright 2017-2020 Dominik Pataky +Licensed under MIT License. See LICENSE. +""" diff --git a/analyzer.py b/netflow/analyzer.py old mode 100755 new mode 100644 similarity index 95% rename from analyzer.py rename to netflow/analyzer.py index 77d789c..3963234 --- a/analyzer.py +++ b/netflow/analyzer.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 """ -Example analyzing script for saved exports (by main.py, as JSON). This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. Copyright 2017-2020 Dominik Pataky @@ -30,7 +29,12 @@ IP_PROTOCOLS = { } Pair = namedtuple('Pair', ['src', 'dest']) + logger = logging.getLogger(__name__) +ch = logging.StreamHandler() +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +ch.setFormatter(formatter) +logger.addHandler(ch) def printv(message, *args_, **kwargs): @@ -177,6 +181,10 @@ class Connection: return self.src_flow["IN_PKTS"] + self.dest_flow["IN_PKTS"] +if __name__ == "netflow.analyzer": + logger.error("The analyzer is currently meant to be used as a CLI tool only.") + logger.error("Use 'python3 -m netflow.analyzer -h' in your console for additional help.") + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data") parser.add_argument("-f", "--file", dest="file", type=str, default=sys.stdin, @@ -187,7 +195,8 @@ if __name__ == "__main__": help="Enable verbose output.") parser.add_argument("--match-host", dest="match_host", type=str, default=None, help="Filter output by matching on the given host (matches source or destination)") - parser.add_argument("-n", "--no-dns", dest="no_dns", action="store_true", help="Disable DNS resolving of IP addresses") + parser.add_argument("-n", "--no-dns", dest="no_dns", action="store_true", + help="Disable DNS resolving of IP addresses") args = parser.parse_args() # Sanity check for IP address @@ -214,7 +223,7 @@ if __name__ == "__main__": for line in gzipped: entry = json.loads(line) if len(entry.keys()) != 1: - logger.warning("Line \"{}\" does not have exactly one timestamp key.") + logger.warning("The line does not have exactly one timestamp key: \"{}\"".format(line.keys())) try: ts = list(entry)[0] # timestamp from key diff --git a/main.py b/netflow/collector.py old mode 100755 new mode 100644 similarity index 92% rename from main.py rename to netflow/collector.py index 8bb5da6..59e56f9 --- a/main.py +++ b/netflow/collector.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - """ Example collector script for NetFlow v1, v5, and v9. This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. @@ -7,40 +5,38 @@ This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. Copyright 2017-2020 Dominik Pataky Licensed under MIT License. See LICENSE. """ - import argparse -from collections import namedtuple -import queue import gzip import json +from collections import namedtuple +import queue import logging -import sys import socket import socketserver import threading import time -from netflow import parse_packet, TemplateNotRecognized, UnknownNetFlowVersion +from .utils import UnknownNetFlowVersion, parse_packet +from .v9 import V9TemplateNotRecognized -logger = logging.getLogger(__name__) +RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data']) +ParsedPacket = namedtuple('ParsedPacket', ['ts', 'client', 'export']) + +# Amount of time to wait before dropping an undecodable ExportPacket +PACKET_TIMEOUT = 60 * 60 + +logger = logging.getLogger("netflow-collector") ch = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') ch.setFormatter(formatter) logger.addHandler(ch) -# Amount of time to wait before dropping an undecodable ExportPacket -PACKET_TIMEOUT = 60 * 60 - -RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data']) -ParsedPacket = namedtuple('ParsedPacket', ['ts', 'client', 'export']) - - class QueuingRequestHandler(socketserver.BaseRequestHandler): def handle(self): data = self.request[0] # get content, [1] would be the socket self.server.queue.put(RawPacket(time.time(), self.client_address, data)) - logger.debug( + logging.debug( "Received %d bytes of data from %s", len(data), self.client_address ) @@ -59,7 +55,7 @@ class QueuingUDPListener(socketserver.ThreadingUDPServer): super().__init__(interface, QueuingRequestHandler) -class NetFlowListener(threading.Thread): +class ThreadedNetFlowListener(threading.Thread): """A thread that listens for incoming NetFlow packets, processes them, and makes them available to consumers. @@ -70,7 +66,7 @@ class NetFlowListener(threading.Thread): - When joined, will wait for the listener to exit For example, a simple script that outputs data until killed with CTRL+C: - >>> listener = NetFlowListener('0.0.0.0', 2055) + >>> listener = ThreadedNetFlowListener('0.0.0.0', 2055) >>> print("Listening for NetFlow packets") >>> listener.start() # start processing packets >>> try: @@ -127,7 +123,7 @@ class NetFlowListener(threading.Thread): except UnknownNetFlowVersion as e: logger.error("%s, ignoring the packet", e) continue - except TemplateNotRecognized: + except V9TemplateNotRecognized: if time.time() - pkt.ts > PACKET_TIMEOUT: logger.warning("Dropping an old and undecodable v9 ExportPacket") else: @@ -164,9 +160,9 @@ class NetFlowListener(threading.Thread): def get_export_packets(host, port): - """A generator that will yield ExportPacket objects until it is killed""" - - listener = NetFlowListener(host, port) + """A threaded generator that will yield ExportPacket objects until it is killed + """ + listener = ThreadedNetFlowListener(host, port) listener.start() try: while True: @@ -176,6 +172,10 @@ def get_export_packets(host, port): listener.join() +if __name__ == "netflow.collector": + logger.error("The collector is currently meant to be used as a CLI tool only.") + logger.error("Use 'python3 -m netflow.collector -h' in your console for additional help.") + if __name__ == "__main__": parser = argparse.ArgumentParser(description="A sample netflow collector.") parser.add_argument("--host", type=str, default="0.0.0.0", diff --git a/netflow/utils.py b/netflow/utils.py new file mode 100644 index 0000000..9eb2eee --- /dev/null +++ b/netflow/utils.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. + +Copyright 2017-2020 Dominik Pataky +Licensed under MIT License. See LICENSE. +""" + +import struct + +from .v1 import V1ExportPacket +from .v5 import V5ExportPacket +from .v9 import V9ExportPacket + + +class UnknownNetFlowVersion(Exception): + def __init__(self, data, version): + self.data = data + self.version = version + r = repr(data) + data_str = ("{:.25}..." if len(r) >= 28 else "{}").format(r) + super().__init__( + "Unknown NetFlow version {} for data {}".format(version, data_str) + ) + + +def get_netflow_version(data): + return struct.unpack('!H', data[:2])[0] + + +def parse_packet(data, templates): + version = get_netflow_version(data) + if version == 1: + return V1ExportPacket(data) + elif version == 5: + return V5ExportPacket(data) + elif version == 9: + return V9ExportPacket(data, templates) + raise UnknownNetFlowVersion(data, version) \ No newline at end of file diff --git a/netflow/v1.py b/netflow/v1.py index f68e370..ff52366 100644 --- a/netflow/v1.py +++ b/netflow/v1.py @@ -13,6 +13,8 @@ See https://github.com/djmdjm/softflowd import struct +__all__ = ["V1DataFlow", "V1ExportPacket", "V1Header"] + class V1DataFlow: """Holds one v1 DataRecord diff --git a/netflow/v5.py b/netflow/v5.py index 60143da..f136620 100644 --- a/netflow/v5.py +++ b/netflow/v5.py @@ -10,6 +10,8 @@ This script is specifically implemented in combination with softflowd. See https import struct +__all__ = ["V5DataFlow", "V5ExportPacket", "V5Header"] + class V5DataFlow: """Holds one v5 DataRecord diff --git a/netflow/v9.py b/netflow/v9.py index 8d5080c..f8eee5d 100644 --- a/netflow/v9.py +++ b/netflow/v9.py @@ -16,8 +16,10 @@ Licensed under MIT License. See LICENSE. import ipaddress import struct +__all__ = ["V9DataFlowSet", "V9DataRecord", "V9ExportPacket", "V9Header", "V9TemplateField", + "V9TemplateFlowSet", "V9TemplateNotRecognized", "V9TemplateRecord"] -FIELD_TYPES = { +V9_FIELD_TYPES = { 0: 'UNKNOWN_FIELD_TYPE', # fallback for unknown field types # Cisco specs for NetFlow v9 @@ -154,7 +156,7 @@ FIELD_TYPES = { } -class TemplateNotRecognized(KeyError): +class V9TemplateNotRecognized(KeyError): pass @@ -188,7 +190,7 @@ class V9DataFlowSet: offset = 4 if self.template_id not in templates: - raise TemplateNotRecognized + raise V9TemplateNotRecognized template = templates[self.template_id] @@ -200,7 +202,7 @@ class V9DataFlowSet: for field in template.fields: flen = field.field_length - fkey = FIELD_TYPES[field.field_type] + fkey = V9_FIELD_TYPES[field.field_type] # The length of the value byte slice is defined in the template dataslice = data[offset:offset+flen] @@ -238,7 +240,7 @@ class V9TemplateField: def __repr__(self): return "".format( - self.field_type, FIELD_TYPES[self.field_type], self.field_length) + self.field_type, V9_FIELD_TYPES[self.field_type], self.field_length) class V9TemplateRecord: @@ -251,7 +253,7 @@ class V9TemplateRecord: def __repr__(self): return "".format( self.template_id, self.field_count, - ' '.join([FIELD_TYPES[field.field_type] for field in self.fields])) + ' '.join([V9_FIELD_TYPES[field.field_type] for field in self.fields])) class V9TemplateFlowSet: @@ -279,7 +281,7 @@ class V9TemplateFlowSet: # Get all fields of this template offset += 4 field_type, field_length = struct.unpack('!HH', data[offset:offset+4]) - if field_type not in FIELD_TYPES: + if field_type not in V9_FIELD_TYPES: field_type = 0 # Set field_type to UNKNOWN_FIELD_TYPE as fallback field = V9TemplateField(field_type, field_length) fields.append(field)