diff --git a/main.py b/main.py
old mode 100644
new mode 100755
index 575ea67..9e5f08f
--- a/main.py
+++ b/main.py
@@ -8,133 +8,123 @@
 Copyright 2017-2019 Dominik Pataky
 Licensed under MIT License. See LICENSE.
 """
-import logging
 import argparse
+from collections import namedtuple
+from queue import Queue
+import json
+import logging
 import sys
 import socketserver
+import threading
 import time
-import json
-import os.path
+
+from netflow.v9 import ExportPacket, TemplateNotRecognized
 
-logging.getLogger().setLevel(logging.INFO)
-ch = logging.StreamHandler(sys.stdout)
-ch.setLevel(logging.DEBUG)
-formatter = logging.Formatter('%(message)s')
-ch.setFormatter(formatter)
-logging.getLogger().addHandler(ch)
+__log__ = logging.getLogger(__name__)
 
-try:
-    from netflow.collector_v9 import ExportPacket, TemplateNotRecognized
-except ImportError:
-    logging.warning("Netflow v9 not installed as package! Running from directory.")
-    from src.netflow.collector_v9 import ExportPacket, TemplateNotRecognized
+# Amount of time to wait before dropping an undecodable ExportPacket
+PACKET_TIMEOUT = 60 * 60
 
-parser = argparse.ArgumentParser(description="A sample netflow collector.")
-parser.add_argument("--host", type=str, default="",
-                    help="collector listening address")
-parser.add_argument("--port", "-p", type=int, default=2055,
-                    help="collector listener port")
-parser.add_argument("--file", "-o", type=str, dest="output_file",
-                    default="{}.json".format(int(time.time())),
-                    help="collector export JSON file")
-parser.add_argument("--debug", "-D", action="store_true",
-                    help="Enable debug output")
-
-
-class SoftflowUDPHandler(socketserver.BaseRequestHandler):
-    # We need to save the templates our NetFlow device
-    # send over time. Templates are not resended every
-    # time a flow is sent to the collector.
-    templates = {}
-    buffered = {}
-
-    @classmethod
-    def set_output_file(cls, path):
-        cls.output_file = path
+RawPacket = namedtuple('RawPacket', ['ts', 'data'])
 
+
+class QueuingRequestHandler(socketserver.BaseRequestHandler):
     def handle(self):
-        if not os.path.exists(self.output_file):
-            with open(self.output_file, 'w') as fh:
-                json.dump({}, fh)
-
-        with open(self.output_file, 'r') as fh:
-            try:
-                existing_data = json.load(fh)
-            except json.decoder.JSONDecodeError as ex:
-                logging.error("Malformed JSON output file. Cannot read existing data, aborting.")
-                return
-
         data = self.request[0]
-        host = self.client_address[0]
-        logging.debug("Received data from {}, length {}".format(host, len(data)))
+        self.server.queue.put(RawPacket(time.time(), data))
+        __log__.debug(
+            "Received %d bytes of data from %s", len(data), self.client_address[0]
+        )
 
-        export = None
-        try:
-            export = ExportPacket(data, self.templates)
-        except TemplateNotRecognized:
-            self.buffered[time.time()] = data
-            logging.warning("Received data with unknown template, data stored in buffer!")
-            return
-
-        if not export:
-            logging.error("Error with exception handling while disecting export, export is None")
-            return
+
+class QueuingUDPListener(socketserver.ThreadingUDPServer):
+    """A threaded UDP server that adds a (time, data) tuple to a queue for
+    every request it sees.
+    """
+    def __init__(self, interface, queue):
+        self.queue = queue
+        super().__init__(interface, QueuingRequestHandler)
 
-        logging.debug("Processed ExportPacket with {} flows.".format(export.header.count))
-        logging.debug("Size of buffer: {}".format(len(self.buffered)))
 
-        # In case the export held some new templates
-        self.templates.update(export.templates)
+def get_export_packets(host, port):
+    """A generator that will yield ExportPacket objects until it is killed
+    or has a truthy value sent to it."""
 
-        remain_buffered = {}
-        processed = []
-        for timestamp, data in self.buffered.items():
+    __log__.info("Starting the NetFlow listener on {}:{}".format(host, port))
+    queue = Queue()
+    server = QueuingUDPListener((host, port), queue)
+    thread = threading.Thread(target=server.serve_forever)
+    thread.start()
+
+    # Process packets from the queue
+    templates = {}
+    to_retry = []
+    try:
+        while True:
+            pkt = queue.get()
             try:
-                buffered_export = ExportPacket(data, self.templates)
-                processed.append(timestamp)
+                export = ExportPacket(pkt.data, templates)
             except TemplateNotRecognized:
-                remain_buffered[timestamp] = data
-                logging.debug("Template of buffered ExportPacket still not recognized")
+                if time.time() - pkt.ts > PACKET_TIMEOUT:
+                    __log__.warning("Dropping an old and undecodable ExportPacket")
+                else:
+                    to_retry.append(pkt)
+                    __log__.debug("Failed to decode an ExportPacket - will "
+                                  "re-attempt when a new template is discovered")
                 continue
 
-            logging.debug("Processed buffered ExportPacket with {} flows.".format(buffered_export.header.count))
-            existing_data[timestamp] = [flow.data for flow in buffered_export.flows]
-
-        # Delete processed items from the buffer
-        for pro in processed:
-            del self.buffered[pro]
+            __log__.debug("Processed an ExportPacket with %d flows.",
+                          export.header.count)
 
-        # Update the buffer
-        self.buffered.update(remain_buffered)
-
-        # Append new flows
-        existing_data[time.time()] = [flow.data for flow in export.flows]
-
-        with open(self.output_file, 'w') as fh:
-            json.dump(existing_data, fh)
+            # If any new templates were discovered, dump the unprocessable
+            # data back into the queue and try to decode them again
+            if export.contains_new_templates and to_retry:
+                __log__.debug("Received new template(s)")
+                __log__.debug("Will re-attempt to decode %d old ExportPackets",
+                              len(to_retry))
+                for p in to_retry:
+                    queue.put(p)
+                to_retry.clear()
+
+            stop = yield pkt.ts, export
+            if stop:
+                break
+    finally:
+        __log__.info("Shutting down the NetFlow listener")
+        server.shutdown()
+        server.server_close()
+        thread.join()
 
 
 if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="A sample netflow collector.")
+    parser.add_argument("--host", type=str, default="0.0.0.0",
default="0.0.0.0", + help="collector listening address") + parser.add_argument("--port", "-p", type=int, default=2055, + help="collector listener port") + parser.add_argument("--file", "-o", type=str, dest="output_file", + default="{}.json".format(int(time.time())), + help="collector export JSON file") + parser.add_argument("--debug", "-D", action="store_true", + help="Enable debug output") args = parser.parse_args() + logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s") + if args.debug: - logging.getLogger().setLevel(logging.DEBUG) - - output_file = args.output_file - SoftflowUDPHandler.set_output_file(output_file) - - host = args.host - port = args.port - logging.info("Listening on interface {}:{}".format(host, port)) - server = socketserver.UDPServer((host, port), SoftflowUDPHandler) + __log__.setLevel(logging.DEBUG) + data = {} try: - logging.debug("Starting the NetFlow listener") - server.serve_forever(poll_interval=0.5) - except (IOError, SystemExit): - raise + # TODO: For a long-running processes, this will consume loads of memory + for ts, export in get_export_packets(args.host, args.port): + data[ts] = [flow.data for flow in export.flows] except KeyboardInterrupt: - raise + pass - server.server_close() + if data: + __log__.info("Outputting collected data to '%s'", args.output_file) + with open(args.output_file, 'w') as f: + json.dump(data, f) + else: + __log__.info("No data collected") diff --git a/src/netflow/__init__.py b/netflow/__init__.py similarity index 100% rename from src/netflow/__init__.py rename to netflow/__init__.py diff --git a/src/netflow/collector_v9.py b/netflow/v9.py similarity index 92% rename from src/netflow/collector_v9.py rename to netflow/v9.py index 4d64ee1..5c01953 100644 --- a/src/netflow/collector_v9.py +++ b/netflow/v9.py @@ -11,12 +11,10 @@ Copyright 2017, 2018 Dominik Pataky Licensed under MIT License. See LICENSE. """ -import socket import struct -import sys -field_types = { +FIELD_TYPES = { 0: 'UNKNOWN_FIELD_TYPE', # fallback for unknown field types # Cisco specs for NetFlow v9 @@ -153,10 +151,14 @@ field_types = { } +class TemplateNotRecognized(KeyError): + pass + + class DataRecord: """This is a 'flow' as we want it from our source. What it contains is variable in NetFlow V9, so to work with the data you have to analyze the - data dict keys (which are integers and can be mapped with the field_types + data dict keys (which are integers and can be mapped with the FIELD_TYPES dict). Should hold a 'data' dict with keys=field_type (integer) and value (in bytes). @@ -195,7 +197,7 @@ class DataFlowSet: for field in template.fields: flen = field.field_length - fkey = field_types[field.field_type] + fkey = FIELD_TYPES[field.field_type] fdata = None # The length of the value byte slice is defined in the template @@ -218,20 +220,18 @@ class DataFlowSet: class TemplateField: - """A field with type identifier and length. - """ + """A field with type identifier and length.""" def __init__(self, field_type, field_length): self.field_type = field_type # integer self.field_length = field_length # bytes def __repr__(self): return "".format( - self.field_type, field_types[self.field_type], self.field_length) + self.field_type, FIELD_TYPES[self.field_type], self.field_length) class TemplateRecord: - """A template record contained in a TemplateFlowSet. 
- """ + """A template record contained in a TemplateFlowSet.""" def __init__(self, template_id, field_count, fields): self.template_id = template_id self.field_count = field_count @@ -240,7 +240,7 @@ class TemplateRecord: def __repr__(self): return "".format( self.template_id, self.field_count, - ' '.join([field_types[field.field_type] for field in self.fields])) + ' '.join([FIELD_TYPES[field.field_type] for field in self.fields])) class TemplateFlowSet: @@ -268,7 +268,7 @@ class TemplateFlowSet: # Get all fields of this template offset += 4 field_type, field_length = struct.unpack('!HH', data[offset:offset+4]) - if field_type not in field_types: + if field_type not in FIELD_TYPES: field_type = 0 # Set field_type to UNKNOWN_FIELD_TYPE as fallback field = TemplateField(field_type, field_length) fields.append(field) @@ -288,8 +288,7 @@ class TemplateFlowSet: class Header: - """The header of the ExportPacket. - """ + """The header of the ExportPacket.""" def __init__(self, data): pack = struct.unpack('!HHIIII', data[:20]) @@ -302,11 +301,11 @@ class Header: class ExportPacket: - """The flow record holds the header and all template and data flowsets. - """ + """The flow record holds the header and all template and data flowsets.""" def __init__(self, data, templates): self.header = Header(data) self.templates = templates + self._new_templates = False self.flows = [] offset = 20 @@ -314,6 +313,12 @@ class ExportPacket: flowset_id = struct.unpack('!H', data[offset:offset+2])[0] if flowset_id == 0: # TemplateFlowSet always have id 0 tfs = TemplateFlowSet(data[offset:]) + # Check for any new/changed templates + if not self._new_templates: + for id_, template in tfs.templates.items(): + if id_ not in self.templates or self.templates[id_] != template: + self._new_templates = True + break self.templates.update(tfs.templates) offset += tfs.length else: @@ -321,10 +326,10 @@ class ExportPacket: self.flows += dfs.flows offset += dfs.length + @property + def contains_new_templates(self): + return self._new_templates + def __repr__(self): return "".format( self.header.version, self.header.count) - - -class TemplateNotRecognized(KeyError): - pass