diff --git a/analyze_json.py b/analyze_json.py
index 5c653dd..992506e 100755
--- a/analyze_json.py
+++ b/analyze_json.py
@@ -13,16 +13,20 @@ from collections import namedtuple
 import contextlib
 from datetime import datetime
 import functools
+import gzip
 import ipaddress
 import json
+import logging
+import os.path
 import socket
 import sys
 
 Pair = namedtuple('Pair', ['src', 'dest'])
+logger = logging.getLogger(__name__)
 
 
-@functools.lru_cache(maxsize=128)
+@functools.lru_cache(maxsize=None)
 def resolve_hostname(ip):
     return socket.getfqdn(ip)
 
 
@@ -139,14 +143,38 @@ class Connection:
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
-    parser.add_argument('filename', nargs='?', type=argparse.FileType('r'),
-                        default=sys.stdin,
+    parser.add_argument('-f', '--file', dest='file', type=str, default=sys.stdin,
                         help="The file to analyze (defaults to stdin if not provided)")
     args = parser.parse_args()
 
-    data = json.load(args.filename)
+    # Using a file and using stdin differ in how they are passed to gzip.open
+    file = args.file
+    mode = "rb"  # reading a gzipped file from disk
+    if file != sys.stdin and not os.path.exists(file):
+        exit("File {} does not exist!".format(file))
 
-    # Go through data and disect every flow saved inside the dump
+    if file == sys.stdin:
+        file = sys.stdin.buffer
+        mode = "rt"  # reading a gzipped stream from stdin
+
+    data = {}
+
+    with gzip.open(file, mode) as gzipped:
+        # "for line in" iterates lazily over the lines in the file
+        for line in gzipped:
+            entry = json.loads(line)
+            if len(entry.keys()) != 1:
+                logger.warning("Line \"{}\" does not have exactly one timestamp key.".format(line))
+
+            try:
+                ts = list(entry)[0]  # timestamp from key
+            except IndexError:
+                logger.error("Saved line \"{}\" has no timestamp key!".format(line))
+                continue
+
+            data[ts] = entry[ts]
+
+    # Go through data and dissect every flow saved inside the dump
     for key in sorted(data):
         timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")
 
@@ -157,7 +185,7 @@ if __name__ == "__main__":
                 pending = flow
                 continue
             con = Connection(pending, flow)
-            print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to {dest_host} ({dest})" \
+            print("{timestamp}: {service:<10} | {size:8} | {duration:9} | {src_host} ({src}) to {dest_host} ({dest})" \
                 .format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src,
                         dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration))
             pending = None
diff --git a/main.py b/main.py
index 5e4931b..ffb9bd8 100755
--- a/main.py
+++ b/main.py
@@ -11,6 +11,7 @@ Licensed under MIT License. See LICENSE.
 import argparse
 from collections import namedtuple
 import queue
+import gzip
 import json
 import logging
 import sys
@@ -20,7 +21,6 @@ import time
 
 from netflow import parse_packet, TemplateNotRecognized, UnknownNetFlowVersion
 
-
 logger = logging.getLogger(__name__)
 
 # Amount of time to wait before dropping an undecodable ExportPacket
@@ -122,7 +122,7 @@ class NetFlowListener(threading.Thread):
                 else:
                     to_retry.append(pkt)
                     logger.debug("Failed to decode a v9 ExportPacket - will "
-                        "re-attempt when a new template is discovered")
+                                 "re-attempt when a new template is discovered")
                 continue
 
             logger.debug("Processed a v%d ExportPacket with %d flows.",
@@ -172,8 +172,8 @@ if __name__ == "__main__":
     parser.add_argument("--port", "-p", type=int, default=2055,
                         help="collector listener port")
     parser.add_argument("--file", "-o", type=str, dest="output_file",
-                        default="{}.json".format(int(time.time())),
-                        help="collector export JSON file")
+                        default="{}.gz".format(int(time.time())),
+                        help="collector export multiline JSON file")
     parser.add_argument("--debug", "-D", action="store_true",
                         help="Enable debug output")
     args = parser.parse_args()
@@ -183,19 +183,26 @@ if __name__ == "__main__":
     if args.debug:
         logger.setLevel(logging.DEBUG)
 
-    data = {}
     try:
-        # TODO: For a long-running processes, this will consume loads of memory
+        # With every parsed flow a new line is appended to the output file. Previous versions kept the whole data
+        # dict in memory and regularly dumped it onto disk. That approach was fragile: it a) consumed a lot of
+        # memory and CPU (packets were dropped because storing one flow took longer than the arrival of the next
+        # flow) and b) corrupted the exported JSON file if the collector crashed during the write, rendering every
+        # flow collected during that run useless (the file contained one large JSON dict representing the whole
+        # 'data' dict).
+
+        # In the new approach, each received flow is parsed as before, but is then appended to a gzipped file right
+        # away. This improves three aspects:
+        #     1. collected flow data is no longer held in memory
+        #     2. received and parsed flows are persisted reliably
+        #     3. gzip compression offsets the disk overhead of JSON's repeated full-string keys
+        # It also means the files have to be handled differently: they are gzipped and contain not one single big
+        # JSON dump, but many small JSON dumps separated by line breaks.
         for ts, export in get_export_packets(args.host, args.port):
-            data[ts] = [flow.data for flow in export.flows]
+            entry = {ts: [flow.data for flow in export.flows]}
+            line = json.dumps(entry).encode() + b"\n"  # byte-encoded line
+            with gzip.open(args.output_file, "ab") as fh:  # append mode, so the existing file is never read back in
+                fh.write(line)
     except KeyboardInterrupt:
         logger.info("Received KeyboardInterrupt, passing through")
         pass
-
-    if data:
-        # TODO: this should be done periodically to not lose any data (only saved in memory)
-        logger.info("Outputting collected data to '%s'", args.output_file)
-        with open(args.output_file, 'w') as f:
-            json.dump(data, f)
-    else:
-        logger.info("No data collected")
\ No newline at end of file
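
Review note (not part of the patch): the short sketch below illustrates the line-delimited, gzipped JSON format that main.py now writes and analyze_json.py now reads. The file name flows.gz and the sample flow record are invented for this example; real field names depend on the exported NetFlow template.

    # Round trip of the gzipped, line-delimited JSON format used by the patch.
    import gzip
    import json
    import time

    # One export packet becomes one dict keyed by its timestamp (sample values are made up).
    sample = {str(time.time()): [{"IPV4_SRC_ADDR": 2130706433, "IN_BYTES": 1337}]}

    # Writer side: append exactly one JSON object per line to the gzipped file.
    with gzip.open("flows.gz", "ab") as fh:
        fh.write(json.dumps(sample).encode() + b"\n")

    # Reader side: decompress lazily and parse each line on its own.
    with gzip.open("flows.gz", "rb") as fh:
        for line in fh:
            entry = json.loads(line)
            ts = list(entry)[0]  # the single timestamp key
            print(ts, entry[ts])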
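A possible end-to-end usage with the flags touched by this patch, assuming the scripts are executable and the file name is only an example: start the collector with ./main.py -o flows.gz, let it append flows, then analyze the dump with ./analyze_json.py -f flows.gz, or stream the gzipped file through stdin with cat flows.gz | ./analyze_json.py.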