From 85e6af4bd2a9e28bd382ed117c01c360b283417d Mon Sep 17 00:00:00 2001 From: Dominik Pataky Date: Sun, 31 Mar 2019 20:51:34 +0200 Subject: [PATCH] Add buffering of exports with unknown template Until now, exports which were received, but their template was not known, resulted in KeyError exceptions due to a missing key in the template dict. With this release, these exports are buffered until a template export updates this dict, and all buffered exports are again examined. Release v0.7.0 Fixes #4 Fixes #5 --- analyze_json.py | 20 ++++--- main.py | 105 +++++++++++++++++++++++++----------- setup.py | 3 +- src/netflow/collector_v9.py | 8 +++ 4 files changed, 92 insertions(+), 44 deletions(-) diff --git a/analyze_json.py b/analyze_json.py index 4013482..0f20bea 100644 --- a/analyze_json.py +++ b/analyze_json.py @@ -1,10 +1,10 @@ #!/usr/bin/env python3 """ -Example analyzing script for saved exports (as JSON). -This file belongs to https://github.com/cooox/python-netflow-v9-softflowd. +Example analyzing script for saved exports (by main.py, as JSON). +This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. -Copyright 2017, 2018 Dominik Pataky +Copyright 2017-2019 Dominik Pataky Licensed under MIT License. See LICENSE. """ @@ -138,12 +138,10 @@ for export in sorted(data): pending = flow else: con = Connection(pending, flow) - if con.size > 1024**2: - #~ if con.service == "http": - print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to"\ - " {dest_host} ({dest})".format( - timestamp=timestamp, service=con.service.upper(), - src_host=con.hostnames.src, src=con.src, - dest_host=con.hostnames.dest, dest=con.dest, - size=con.human_size, duration=con.human_duration)) + print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to"\ + " {dest_host} ({dest})".format( + timestamp=timestamp, service=con.service.upper(), + src_host=con.hostnames.src, src=con.src, + dest_host=con.hostnames.dest, dest=con.dest, + size=con.human_size, duration=con.human_duration)) pending = None diff --git a/main.py b/main.py index a07ef7d..575ea67 100644 --- a/main.py +++ b/main.py @@ -2,9 +2,9 @@ """ Example collector script for NetFlow v9. -This file belongs to https://github.com/cooox/python-netflow-v9-softflowd. +This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. -Copyright 2017, 2018 Dominik Pataky +Copyright 2017-2019 Dominik Pataky Licensed under MIT License. See LICENSE. """ @@ -25,33 +25,29 @@ ch.setFormatter(formatter) logging.getLogger().addHandler(ch) try: - from netflow.collector_v9 import ExportPacket + from netflow.collector_v9 import ExportPacket, TemplateNotRecognized except ImportError: - logging.warn("Netflow v9 not installed as package! Running from directory.") - from src.netflow.collector_v9 import ExportPacket + logging.warning("Netflow v9 not installed as package! Running from directory.") + from src.netflow.collector_v9 import ExportPacket, TemplateNotRecognized -parser = argparse.ArgumentParser(description='A sample netflow collector.') -parser.add_argument('--host', type=str, default='', - help='collector listening address') -parser.add_argument('--port', '-p', type=int, default=2055, - help='collector listener port') -parser.add_argument('--file', '-o', type=str, dest='output_file', +parser = argparse.ArgumentParser(description="A sample netflow collector.") +parser.add_argument("--host", type=str, default="", + help="collector listening address") +parser.add_argument("--port", "-p", type=int, default=2055, + help="collector listener port") +parser.add_argument("--file", "-o", type=str, dest="output_file", default="{}.json".format(int(time.time())), - help='collector export JSON file') -parser.add_argument('--debug', '-D', action='store_true', - help='Enable debug output') + help="collector export JSON file") +parser.add_argument("--debug", "-D", action="store_true", + help="Enable debug output") + class SoftflowUDPHandler(socketserver.BaseRequestHandler): # We need to save the templates our NetFlow device # send over time. Templates are not resended every # time a flow is sent to the collector. - TEMPLATES = {} - - @classmethod - def get_server(cls, host, port): - logging.info("Listening on interface {}:{}".format(host, port)) - server = socketserver.UDPServer((host, port), cls) - return server + templates = {} + buffered = {} @classmethod def set_output_file(cls, path): @@ -60,36 +56,79 @@ class SoftflowUDPHandler(socketserver.BaseRequestHandler): def handle(self): if not os.path.exists(self.output_file): with open(self.output_file, 'w') as fh: - fh.write(json.dumps({})) + json.dump({}, fh) with open(self.output_file, 'r') as fh: - existing_data = json.loads(fh.read()) + try: + existing_data = json.load(fh) + except json.decoder.JSONDecodeError as ex: + logging.error("Malformed JSON output file. Cannot read existing data, aborting.") + return data = self.request[0] host = self.client_address[0] - s = "Received data from {}, length {}".format(host, len(data)) - logging.debug(s) - export = ExportPacket(data, self.TEMPLATES) - self.TEMPLATES.update(export.templates) - s = "Processed ExportPacket with {} flows.".format(export.header.count) - logging.debug(s) + logging.debug("Received data from {}, length {}".format(host, len(data))) + + export = None + try: + export = ExportPacket(data, self.templates) + except TemplateNotRecognized: + self.buffered[time.time()] = data + logging.warning("Received data with unknown template, data stored in buffer!") + return + + if not export: + logging.error("Error with exception handling while disecting export, export is None") + return + + logging.debug("Processed ExportPacket with {} flows.".format(export.header.count)) + logging.debug("Size of buffer: {}".format(len(self.buffered))) + + # In case the export held some new templates + self.templates.update(export.templates) + + remain_buffered = {} + processed = [] + for timestamp, data in self.buffered.items(): + try: + buffered_export = ExportPacket(data, self.templates) + processed.append(timestamp) + except TemplateNotRecognized: + remain_buffered[timestamp] = data + logging.debug("Template of buffered ExportPacket still not recognized") + continue + logging.debug("Processed buffered ExportPacket with {} flows.".format(buffered_export.header.count)) + existing_data[timestamp] = [flow.data for flow in buffered_export.flows] + + # Delete processed items from the buffer + for pro in processed: + del self.buffered[pro] + + # Update the buffer + self.buffered.update(remain_buffered) # Append new flows existing_data[time.time()] = [flow.data for flow in export.flows] with open(self.output_file, 'w') as fh: - fh.write(json.dumps(existing_data)) + json.dump(existing_data, fh) if __name__ == "__main__": args = parser.parse_args() - SoftflowUDPHandler.set_output_file(args.output_file) - server = SoftflowUDPHandler.get_server(args.host, args.port) if args.debug: logging.getLogger().setLevel(logging.DEBUG) + output_file = args.output_file + SoftflowUDPHandler.set_output_file(output_file) + + host = args.host + port = args.port + logging.info("Listening on interface {}:{}".format(host, port)) + server = socketserver.UDPServer((host, port), SoftflowUDPHandler) + try: logging.debug("Starting the NetFlow listener") server.serve_forever(poll_interval=0.5) @@ -97,3 +136,5 @@ if __name__ == "__main__": raise except KeyboardInterrupt: raise + + server.server_close() diff --git a/setup.py b/setup.py index 44746e0..fac2a9c 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 + from setuptools import setup, find_packages import os @@ -6,7 +7,7 @@ data_files = [(d, [os.path.join(d, f) for f in files]) for d, folders, files in os.walk(os.path.join('src', 'config'))] setup(name='netflow-v9', - version='0.6.1', + version='0.7.0', description='NetFlow v9 parser and collector implemented in Python 3. Developed to be used with softflowd v0.9.9', author='Dominik Pataky', author_email='dev@bitkeks.eu', diff --git a/src/netflow/collector_v9.py b/src/netflow/collector_v9.py index 230d2aa..4d64ee1 100644 --- a/src/netflow/collector_v9.py +++ b/src/netflow/collector_v9.py @@ -181,6 +181,10 @@ class DataFlowSet: self.flows = [] offset = 4 + + if self.template_id not in templates: + raise TemplateNotRecognized + template = templates[self.template_id] # As the field lengths are variable V9 has padding to next 32 Bit @@ -320,3 +324,7 @@ class ExportPacket: def __repr__(self): return "".format( self.header.version, self.header.count) + + +class TemplateNotRecognized(KeyError): + pass