diff --git a/analyze_json.py b/analyze_json.py
index ee93d07..a81a8c2 100755
--- a/analyze_json.py
+++ b/analyze_json.py
@@ -172,8 +172,8 @@ if __name__ == "__main__":
     # Go through data and dissect every flow saved inside the dump
     for key in sorted(data):
         timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")
-
-        flows = data[key]
+        client = data[key]["client"]
+        flows = data[key]["flows"]
         pending = None  # Two flows normally appear together for duplex connection
         for flow in flows:
             if not pending:
diff --git a/main.py b/main.py
index ffb9bd8..0169c17 100755
--- a/main.py
+++ b/main.py
@@ -26,16 +26,15 @@ logger = logging.getLogger(__name__)
 
 # Amount of time to wait before dropping an undecodable ExportPacket
 PACKET_TIMEOUT = 60 * 60
 
-# TODO: Add source IP
-RawPacket = namedtuple('RawPacket', ['ts', 'data'])
+RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data'])
 
 
 class QueuingRequestHandler(socketserver.BaseRequestHandler):
     def handle(self):
-        data = self.request[0]
-        self.server.queue.put(RawPacket(time.time(), data))
+        data = self.request[0]  # get content, [1] would be the socket
+        self.server.queue.put(RawPacket(time.time(), self.client_address, data))
         logger.debug(
-            "Received %d bytes of data from %s", len(data), self.client_address[0]
+            "Received %d bytes of data from %s", len(data), self.client_address
         )
 
@@ -107,7 +106,7 @@ class NetFlowListener(threading.Thread):
         while not self._shutdown.is_set():
             try:
                 # 0.5s delay to limit CPU usage while waiting for new packets
-                pkt = self.input.get(block=True, timeout=0.5)
+                pkt: RawPacket = self.input.get(block=True, timeout=0.5)
             except queue.Empty:
                 continue
 
@@ -130,7 +129,7 @@
 
             # If any new templates were discovered, dump the unprocessable
             # data back into the queue and try to decode them again
-            if (export.header.version == 9 and export.contains_new_templates and to_retry):
+            if export.header.version == 9 and export.contains_new_templates and to_retry:
                 logger.debug("Received new template(s)")
                 logger.debug("Will re-attempt to decode %d old v9 ExportPackets", len(to_retry))
@@ -138,7 +137,7 @@
                 for p in to_retry:
                     self.input.put(p)
                 to_retry.clear()
-            self.output.put((pkt.ts, export))
+            self.output.put((pkt.ts, pkt.client, export))
         finally:
             self.server.shutdown()
             self.server.server_close()
@@ -198,8 +197,11 @@ if __name__ == "__main__":
     # 3. the disk usage of files with JSON and its full strings as keys is reduced by using gzipped files
     # This also means that the files have to be handled differently, because they are gzipped and not formatted as
     # one single big JSON dump, but rather many little JSON dumps, separated by line breaks.
-    for ts, export in get_export_packets(args.host, args.port):
-        entry = {ts: [flow.data for flow in export.flows]}
+    for ts, client, export in get_export_packets(args.host, args.port):
+        entry = {ts: {
+            "client": client,
+            "flows": [flow.data for flow in export.flows]}
+        }
         line = json.dumps(entry).encode() + b"\n"  # byte encoded line
         with gzip.open(args.output_file, "ab") as fh:  # open as append, not reading the whole file
             fh.write(line)
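
With this change, each line of the output file is an independent JSON object that maps a timestamp key to a `client` address (the exporter's `client_address` tuple, serialized as a list) and a `flows` list. A minimal reader sketch for the new format, assuming a placeholder path `dump.json.gz` in place of whatever `args.output_file` points to:

```python
import gzip
import json
from datetime import datetime

# Placeholder path -- substitute the file passed as args.output_file.
# Repeated appends produce concatenated gzip members; Python's gzip
# module reads them back transparently as a single stream.
with gzip.open("dump.json.gz", "rb") as fh:
    for line in fh:  # one small JSON dump per line, as written by main.py
        entry = json.loads(line)
        for key, value in entry.items():
            ts = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M:%S")
            client = value["client"]  # [host, port] of the exporting device
            flows = value["flows"]
            print(f"{ts} from {client[0]}: {len(flows)} flow(s)")
```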