From 898d220a910ff5d4e8020af78815c482fb077692 Mon Sep 17 00:00:00 2001 From: Dominik Pataky Date: Sat, 28 Oct 2017 19:00:18 +0200 Subject: [PATCH] Add JSON export and analyzing example script --- .gitignore | 2 ++ analyze_json.py | 36 ++++++++++++++++++++++++ main.py | 55 ++++++++++++++++++++++++++++--------- src/netflow/collector_v9.py | 21 -------------- 4 files changed, 80 insertions(+), 34 deletions(-) create mode 100644 analyze_json.py diff --git a/.gitignore b/.gitignore index e22d2d9..e3a9264 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ dist* .*python_netflow_v9_softflowd.egg-info/ *.swp *.swo +__pycache__ +*.json diff --git a/analyze_json.py b/analyze_json.py new file mode 100644 index 0000000..a88a5e7 --- /dev/null +++ b/analyze_json.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +# Example analyzing script for saved exports + +from datetime import datetime +import ipaddress +import json +import sys + +if len(sys.argv) < 2: + exit("Use {} .json".format(sys.argv[0])) + +filename = sys.argv[1] + +with open(filename, 'r') as fh: + data = json.loads(fh.read()) + +for export in sorted(data): + timestamp = datetime.fromtimestamp(float(export)) + print("\n{}".format(timestamp)) + + flows = data[export] + for flow in flows: + count_bytes = flow['IN_BYTES'] + count_packets = flow['IN_PKTS'] + + if flow['IP_PROTOCOL_VERSION'] == 4: + src = ipaddress.ip_address(flow['IPV4_SRC_ADDR']) + dest = ipaddress.ip_address(flow['IPV4_DST_ADDR']) + + elif flow['IP_PROTOCOL_VERSION'] == 6: + src = ipaddress.ip_address(flow['IPV6_SRC_ADDR']) + dest = ipaddress.ip_address(flow['IPV6_DST_ADDR']) + + print("Flow from {src} to {dest} with {packets} packets, size {size}". + format(src=src, dest=dest, packets=count_packets, size=count_bytes)) diff --git a/main.py b/main.py index 4dba32a..e1d22ad 100644 --- a/main.py +++ b/main.py @@ -3,12 +3,10 @@ import logging import argparse import sys import socketserver +import time +import json +import os.path -try: - from netflow.collector_v9 import ExportPacket -except ImportError: - print("Netflow v9 not installed as package! Running from directory.") - from src.netflow.collector_v9 import ExportPacket logging.getLogger().setLevel(logging.INFO) ch = logging.StreamHandler(sys.stdout) @@ -17,13 +15,22 @@ formatter = logging.Formatter('%(message)s') ch.setFormatter(formatter) logging.getLogger().addHandler(ch) +try: + from netflow.collector_v9 import ExportPacket +except ImportError: + logging.warn("Netflow v9 not installed as package! Running from directory.") + from src.netflow.collector_v9 import ExportPacket + parser = argparse.ArgumentParser(description='A sample netflow collector.') - -parser.add_argument('-chost', type=str, default='', +parser.add_argument('--host', type=str, default='', help='collector listening address') -parser.add_argument('-cport', type=int, default=2055, +parser.add_argument('--port', '-p', type=int, default=2055, help='collector listener port') - +parser.add_argument('--file', '-o', type=str, dest='output_file', + default="{}.json".format(int(time.time())), + help='collector export JSON file') +parser.add_argument('--debug', '-D', action='store_true', + help='Enable debug output') class SoftflowUDPHandler(socketserver.BaseRequestHandler): # We need to save the templates our NetFlow device @@ -37,20 +44,42 @@ class SoftflowUDPHandler(socketserver.BaseRequestHandler): server = socketserver.UDPServer((host, port), cls) return server + @classmethod + def set_output_file(cls, path): + cls.output_file = path + def handle(self): + if not os.path.exists(self.output_file): + with open(self.output_file, 'w') as fh: + fh.write(json.dumps({})) + + with open(self.output_file, 'r') as fh: + existing_data = json.loads(fh.read()) + data = self.request[0] host = self.client_address[0] s = "Received data from {}, length {}".format(host, len(data)) - logging.info(s) + logging.debug(s) export = ExportPacket(data, self.TEMPLATES) self.TEMPLATES.update(export.templates) s = "Processed ExportPacket with {} flows.".format(export.header.count) - logging.info(s) - return export + logging.debug(s) + + # Append new flows + existing_data[time.time()] = [flow.data for flow in export.flows] + + with open(self.output_file, 'w') as fh: + fh.write(json.dumps(existing_data)) + + if __name__ == "__main__": args = parser.parse_args() - server = SoftflowUDPHandler.get_server(args.chost, args.cport) + SoftflowUDPHandler.set_output_file(args.output_file) + server = SoftflowUDPHandler.get_server(args.host, args.port) + + if args.debug: + logging.getLogger().setLevel(logging.DEBUG) try: logging.debug("Starting the NetFlow listener") diff --git a/src/netflow/collector_v9.py b/src/netflow/collector_v9.py index 728a68e..1ac75e3 100644 --- a/src/netflow/collector_v9.py +++ b/src/netflow/collector_v9.py @@ -270,24 +270,3 @@ class ExportPacket: def __repr__(self): return "".format( self.header.version, self.header.count) - - -if __name__ == "__main__": - # We need to save the templates our NetFlow device send over time. Templates - # are not resended every time a flow is sent to the collector. - _templates = {} - HOST = sys.argv[1] - PORT = int(sys.argv[2]) - - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.bind((HOST, PORT)) - print("Listening on interface {}:{}".format(HOST, PORT)) - - while 1: - (data, sender) = sock.recvfrom(8192) - print("Received data from {}, length {}".format(sender, len(data))) - - export = ExportPacket(data, _templates) - _templates.update(export.templates) - - print("Processed ExportPacket with {} flows.".format(export.header.count))