diff --git a/analyze_json.py b/analyze_json.py old mode 100644 new mode 100755 index 0f20bea..55bfbb0 --- a/analyze_json.py +++ b/analyze_json.py @@ -8,43 +8,35 @@ Copyright 2017-2019 Dominik Pataky Licensed under MIT License. See LICENSE. """ +import argparse +from collections import namedtuple +import contextlib from datetime import datetime +import functools import ipaddress import json -import os.path -import sys import socket -from collections import namedtuple +import sys -Pair = namedtuple('Pair', 'src dest') -def getIPs(flow): - use_ipv4 = False # optimistic default case of IPv6 +Pair = namedtuple('Pair', ['src', 'dest']) - if 'IP_PROTOCOL_VERSION' in flow and flow['IP_PROTOCOL_VERSION'] == 4: - use_ipv4 = True - elif 'IPV4_SRC_ADDR' in flow or 'IPV4_DST_ADDR' in flow: - use_ipv4 = True - - if use_ipv4: - return Pair( - ipaddress.ip_address(flow['IPV4_SRC_ADDR']), - ipaddress.ip_address(flow['IPV4_DST_ADDR'])) - - # else: return IPv6 pair - return Pair( - ipaddress.ip_address(flow['IPV6_SRC_ADDR']), - ipaddress.ip_address(flow['IPV6_DST_ADDR'])) +@functools.lru_cache(maxsize=None) +def resolve_hostname(ip): + return socket.getfqdn(ip) class Connection: """Connection model for two flows. The direction of the data flow can be seen by looking at the size. 'src' describes the peer which sends more data towards the other. This - does NOT have to mean, that 'src' was the initiator of the connection. + does NOT have to mean that 'src' was the initiator of the connection. 
""" def __init__(self, flow1, flow2): + if not flow1 or not flow2: + raise Exception("A connection requires two flows") + if flow1['IN_BYTES'] >= flow2['IN_BYTES']: src = flow1 dest = flow2 @@ -52,7 +44,7 @@ class Connection: src = flow2 dest = flow1 - ips = getIPs(src) + ips = self.get_ips(src) self.src = ips.src self.dest = ips.dest self.src_port = src['L4_SRC_PORT'] @@ -63,12 +55,33 @@ class Connection: self.duration = src['LAST_SWITCHED'] - src['FIRST_SWITCHED'] if self.duration < 0: # 32 bit int has its limits. Handling overflow here + # TODO: Should be handled in the collection phase self.duration = (2**32 - src['FIRST_SWITCHED']) + src['LAST_SWITCHED'] def __repr__(self): return "".format( self.src, self.dest, self.human_size) + @staticmethod + def get_ips(flow): + # TODO: These values should be parsed into strings in the collection phase. + # The floating point representation of an IPv6 address in JSON + # could lose precision. + + # IPv4 + if (flow.get('IP_PROTOCOL_VERSION') == 4 or 'IPV4_SRC_ADDR' in flow or + 'IPV4_DST_ADDR' in flow): + return Pair( + ipaddress.ip_address(flow['IPV4_SRC_ADDR']), + ipaddress.ip_address(flow['IPV4_DST_ADDR']) + ) + + # IPv6 + return Pair( + ipaddress.ip_address(flow['IPV6_SRC_ADDR']), + ipaddress.ip_address(flow['IPV6_DST_ADDR']) + ) + @property def human_size(self): # Calculate a human readable size of the traffic @@ -96,47 +109,40 @@ class Connection: @property def hostnames(self): # Resolve the IPs of this flows to their hostname - src_hostname = socket.getfqdn(self.src.compressed) - dest_hostname = socket.getfqdn(self.dest.compressed) - + src_hostname = resolve_hostname(self.src.compressed) + dest_hostname = resolve_hostname(self.dest.compressed) return Pair(src_hostname, dest_hostname) @property def service(self): # Resolve ports to their services, if known - service = "unknown" - try: - # Try service of sending peer first - service = socket.getservbyport(self.src_port) - except OSError: - # Resolving the sport did 
not work, trying dport - try: - service = socket.getservbyport(self.dest_port) - except OSError: - pass - return service + # Try source port, fallback to dest port, otherwise "unknown" + with contextlib.suppress(OSError): + return socket.getservbyport(self.src_port) + with contextlib.suppress(OSError): + return socket.getservbyport(self.dest_port) + return "unknown" -# Handle CLI args and load the data dump -if len(sys.argv) < 2: - exit("Use {} .json".format(sys.argv[0])) -filename = sys.argv[1] -if not os.path.exists(filename): - exit("File {} does not exist!".format(filename)) -with open(filename, 'r') as fh: - data = json.loads(fh.read()) +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data") + parser.add_argument('filename', nargs='?', type=argparse.FileType('r'), + default=sys.stdin, + help="The file to analyze (defaults to stdin if not provided)") + args = parser.parse_args() + data = json.load(args.filename) -# Go through data and disect every flow saved inside the dump -for export in sorted(data): - timestamp = datetime.fromtimestamp(float(export)).strftime("%Y-%m-%d %H:%M.%S") + # Go through data and dissect every flow saved inside the dump + for key in sorted(data): + timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S") - flows = data[export] - pending = None # Two flows normally appear together for duplex connection - for flow in flows: - if not pending: - pending = flow - else: + flows = data[key] + pending = None # Two flows normally appear together for duplex connection + for flow in flows: + if not pending: + pending = flow + continue con = Connection(pending, flow) print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to"\ " {dest_host} ({dest})".format( diff --git a/main.py b/main.py index 11570a6..f76d915 100755 --- a/main.py +++ b/main.py @@ -127,7 +127,7 @@ class NetFlowListener: # data back into the queue and try to 
decode them again if export.contains_new_templates and to_retry: __log__.debug("Received new template(s)") - __log__.debug("Will re-attempt to decode %d old v9 ExportPackets", + __log__.debug("Will re-attempt to decode %d old ExportPackets", len(to_retry)) for p in to_retry: self.input.put(p)