Refactor storing data and writing to disk - using gzip and lines

In previous versions, collected flows (parsed data) were stored in
memory by the collector. At regular intervals, or at shutdown, this
single dict was dumped to disk as JSON.

With this commit, the behaviour changes to line-based JSON dumps: each
received export is written as a single JSON line and gzipped onto disk
for storage efficiency. The analyze_json script is updated as well to
handle the gzipped files in this new format.
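
For illustration only (not part of the commit; record_export() and the
example flow dict are made up), a minimal sketch of the format this
produces: every export becomes one JSON object, serialized onto a
single line and appended to a gzip file.

    import gzip
    import json
    import time

    def record_export(path, flows):
        # One dict per export: {timestamp: [flow dicts]}, dumped as a single line.
        entry = {str(time.time()): flows}
        line = json.dumps(entry).encode() + b"\n"
        # "ab" appends a new gzip member; data already on disk is never rewritten.
        with gzip.open(path, "ab") as fh:
            fh.write(line)

    record_export("flows.gz", [{"IPV4_SRC_ADDR": "10.0.0.1", "IN_BYTES": 512}])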

See the comments in main.py for more details.

Fixes #10
Dominik Pataky 2019-11-03 11:59:28 +01:00
parent 3dee135a22
commit 6b9d20c8a6
2 changed files with 56 additions and 21 deletions

analyze_json.py

@@ -13,16 +13,20 @@ from collections import namedtuple
import contextlib
from datetime import datetime
import functools
import gzip
import ipaddress
import json
import logging
import os.path
import socket
import sys
Pair = namedtuple('Pair', ['src', 'dest'])
logger = logging.getLogger(__name__)
@functools.lru_cache(maxsize=128)
@functools.lru_cache(maxsize=None)
def resolve_hostname(ip):
return socket.getfqdn(ip)
@@ -139,14 +143,38 @@ class Connection:
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
parser.add_argument('filename', nargs='?', type=argparse.FileType('r'),
default=sys.stdin,
parser.add_argument('-f', '--file', dest='file', type=str, default=sys.stdin,
help="The file to analyze (defaults to stdin if not provided)")
args = parser.parse_args()
data = json.load(args.filename)
# Using a file and using stdin differ in their further usage for gzip.open
file = args.file
mode = "rb" # reading files
if file != sys.stdin and not os.path.exists(file):
exit("File {} does not exist!".format(file))
# Go through data and disect every flow saved inside the dump
if file == sys.stdin:
file = sys.stdin.buffer
mode = "rt" # reading from stdin
data = {}
with gzip.open(file, mode) as gzipped:
# "for line in" lazy-loads all lines in the file
for line in gzipped:
entry = json.loads(line)
if len(entry.keys()) != 1:
logger.warning("Line \"{}\" does not have exactly one timestamp key.".format(line))
try:
ts = list(entry)[0] # timestamp from key
except (KeyError, IndexError):
logger.error("Saved line \"{}\" has no timestamp key!".format(line))
continue
data[ts] = entry[ts]
# Go through data and dissect every flow saved inside the dump
for key in sorted(data):
timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")
@@ -157,7 +185,7 @@ if __name__ == "__main__":
pending = flow
continue
con = Connection(pending, flow)
print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to {dest_host} ({dest})" \
print("{timestamp}: {service:<10} | {size:8} | {duration:9} | {src_host} ({src}) to {dest_host} ({dest})" \
.format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src,
dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration))
pending = None
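
Since the analyzer now expects gzipped line-based dumps instead of one
plain JSON file, a typical invocation would look like this (assuming
the script is named analyze_json.py; the dump filename is only an
example):

    python3 analyze_json.py -f 1572778768.gz
    cat 1572778768.gz | python3 analyze_json.py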

main.py (37 changed lines)

@@ -11,6 +11,7 @@ Licensed under MIT License. See LICENSE.
import argparse
from collections import namedtuple
import queue
import gzip
import json
import logging
import sys
@@ -20,7 +21,6 @@ import time
from netflow import parse_packet, TemplateNotRecognized, UnknownNetFlowVersion
logger = logging.getLogger(__name__)
# Amount of time to wait before dropping an undecodable ExportPacket
@@ -122,7 +122,7 @@ class NetFlowListener(threading.Thread):
else:
to_retry.append(pkt)
logger.debug("Failed to decode a v9 ExportPacket - will "
"re-attempt when a new template is discovered")
"re-attempt when a new template is discovered")
continue
logger.debug("Processed a v%d ExportPacket with %d flows.",
@@ -172,8 +172,8 @@ if __name__ == "__main__":
parser.add_argument("--port", "-p", type=int, default=2055,
help="collector listener port")
parser.add_argument("--file", "-o", type=str, dest="output_file",
default="{}.json".format(int(time.time())),
help="collector export JSON file")
default="{}.gz".format(int(time.time())),
help="collector export multiline JSON file")
parser.add_argument("--debug", "-D", action="store_true",
help="Enable debug output")
args = parser.parse_args()
@@ -183,19 +183,26 @@ if __name__ == "__main__":
if args.debug:
logger.setLevel(logging.DEBUG)
data = {}
try:
# TODO: For a long-running processes, this will consume loads of memory
# With every parsed flow a new line is appended to the output file. In previous versions, this was implemented
# by storing the whole data dict in memory and dumping it regularly onto disk. This was extremely fragile, as
# it a) consumed a lot of memory and CPU (packets were dropped because storing one flow took longer than the
# arrival of the next) and b) broke the exported JSON file if the collector crashed during the write process,
# rendering all flows collected during that run useless (the file contained one single large JSON dict
# representing the 'data' dict).
# In this new approach, each received flow is parsed as usual, but it gets appended to a gzipped file each time.
# All in all, this improves in three aspects:
# 1. collected flow data is not stored in memory any more
# 2. received and parsed flows are persisted reliably
# 3. disk usage is reduced, since the verbose JSON with its repeated string keys compresses well with gzip
# This also means that the files have to be handled differently, because they are gzipped and not formatted as
# one single big JSON dump, but rather many little JSON dumps, separated by line breaks.
for ts, export in get_export_packets(args.host, args.port):
data[ts] = [flow.data for flow in export.flows]
entry = {ts: [flow.data for flow in export.flows]}
line = json.dumps(entry).encode() + b"\n" # byte encoded line
with gzip.open(args.output_file, "ab") as fh: # open as append, not reading the whole file
fh.write(line)
except KeyboardInterrupt:
logger.info("Received KeyboardInterrupt, passing through")
pass
if data:
# TODO: this should be done periodically to not lose any data (only saved in memory)
logger.info("Outputting collected data to '%s'", args.output_file)
with open(args.output_file, 'w') as f:
json.dump(data, f)
else:
logger.info("No data collected")