Add client info to stored data

Until now, packets arriving at the collector's interface were stored by
timestamp, with the exported flows in the payload. This format is now
extended to also store the client's IP address and port, allowing
multiple clients to export flows to the same collector instance.
Dominik Pataky 2019-11-03 13:57:06 +01:00
parent 1646a52f17
commit eff99fc6e3
2 changed files with 14 additions and 12 deletions
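
As an illustration of the new on-disk shape (the values below are invented, not part of the diff), each appended line now maps a timestamp to a dict carrying the exporting client and its flows:

    # Illustrative entry as written by main.py after this commit; the
    # timestamp, address and flow fields are made-up example values.
    entry = {
        "1572786000.0": {                    # key: packet timestamp
            "client": ("127.0.0.1", 52310),  # self.client_address, a (host, port)
                                             # tuple; JSON serializes it as a list
            "flows": [{"PROTO": 6}],         # flow.data dicts from the decoder
        }
    }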


@@ -172,8 +172,8 @@ if __name__ == "__main__":
     # Go through data and dissect every flow saved inside the dump
     for key in sorted(data):
         timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")
-        flows = data[key]
+        client = data[key]["client"]
+        flows = data[key]["flows"]
         pending = None  # Two flows normally appear together for duplex connection
         for flow in flows:
             if not pending:

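The loop above assumes `data` maps timestamps to these entries. The analyzer's loading code is not part of this diff; a loader consistent with the writer in main.py might look like this (the file name is a placeholder):

    import gzip
    import json

    data = {}
    with gzip.open("dump.gz", "rb") as fh:   # placeholder file name
        for line in fh:
            data.update(json.loads(line))    # one {timestamp: entry} per line
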
main.py

@@ -26,16 +26,15 @@ logger = logging.getLogger(__name__)
 
 # Amount of time to wait before dropping an undecodable ExportPacket
 PACKET_TIMEOUT = 60 * 60
 
-# TODO: Add source IP
-RawPacket = namedtuple('RawPacket', ['ts', 'data'])
+RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data'])
 
 
 class QueuingRequestHandler(socketserver.BaseRequestHandler):
     def handle(self):
-        data = self.request[0]
-        self.server.queue.put(RawPacket(time.time(), data))
+        data = self.request[0]  # get content, [1] would be the socket
+        self.server.queue.put(RawPacket(time.time(), self.client_address, data))
         logger.debug(
-            "Received %d bytes of data from %s", len(data), self.client_address[0]
+            "Received %d bytes of data from %s", len(data), self.client_address
         )
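
The new client field comes straight from socketserver: for UDP servers, the handler's client_address attribute holds the sender's (host, port) tuple, which is why the [0] index (host only) could be dropped from the log call. A minimal standalone sketch, not taken from this repo:

    import socketserver

    class EchoHandler(socketserver.BaseRequestHandler):
        def handle(self):
            data, sock = self.request            # UDP request: (payload, socket)
            host, port = self.client_address     # sender's (host, port) tuple
            sock.sendto(data, self.client_address)

    with socketserver.UDPServer(("127.0.0.1", 9999), EchoHandler) as server:
        server.handle_request()  # serve a single datagram, then return
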
@@ -107,7 +106,7 @@ class NetFlowListener(threading.Thread):
         while not self._shutdown.is_set():
             try:
                 # 0.5s delay to limit CPU usage while waiting for new packets
-                pkt = self.input.get(block=True, timeout=0.5)
+                pkt: RawPacket = self.input.get(block=True, timeout=0.5)
             except queue.Empty:
                 continue
@@ -130,7 +129,7 @@ class NetFlowListener(threading.Thread):
             # If any new templates were discovered, dump the unprocessable
             # data back into the queue and try to decode them again
-            if (export.header.version == 9 and export.contains_new_templates and to_retry):
+            if export.header.version == 9 and export.contains_new_templates and to_retry:
                 logger.debug("Received new template(s)")
                 logger.debug("Will re-attempt to decode %d old v9 ExportPackets",
                              len(to_retry))
@@ -138,7 +137,7 @@ class NetFlowListener(threading.Thread):
                     self.input.put(p)
                 to_retry.clear()
-            self.output.put((pkt.ts, export))
+            self.output.put((pkt.ts, pkt.client, export))
         finally:
             self.server.shutdown()
             self.server.server_close()
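
For context, the surrounding logic parks v9 packets that reference not-yet-seen templates and re-queues them once a packet announcing new templates arrives; only the output tuple changes in this commit. A rough paraphrase of that retry pattern, with hypothetical names where the real ones sit outside the diff:

    class UnknownTemplateError(Exception):
        """Hypothetical stand-in for the decoder's template error."""

    def drain(input_queue, output_queue, decode):
        to_retry = []
        while True:
            pkt = input_queue.get()
            try:
                export = decode(pkt.data)
            except UnknownTemplateError:
                to_retry.append(pkt)        # park until templates arrive
                continue
            if export.contains_new_templates and to_retry:
                for old in to_retry:        # give parked packets another try
                    input_queue.put(old)
                to_retry.clear()
            output_queue.put((pkt.ts, pkt.client, export))
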
@@ -198,8 +197,11 @@ if __name__ == "__main__":
     # 3. the disk usage of files with JSON and its full strings as keys is reduced by using gzipped files
     # This also means that the files have to be handled differently, because they are gzipped and not formatted as
     # one single big JSON dump, but rather many little JSON dumps, separated by line breaks.
-    for ts, export in get_export_packets(args.host, args.port):
-        entry = {ts: [flow.data for flow in export.flows]}
+    for ts, client, export in get_export_packets(args.host, args.port):
+        entry = {ts: {
+            "client": client,
+            "flows": [flow.data for flow in export.flows]}
+        }
         line = json.dumps(entry).encode() + b"\n"  # byte encoded line
         with gzip.open(args.output_file, "ab") as fh:  # open as append, not reading the whole file
             fh.write(line)
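
Reading the dump back mirrors the writer: the file is a gzip stream of newline-separated JSON objects, so it can be consumed line by line. A sketch of a reader that groups flows per client (file name and grouping are assumptions, not part of the commit):

    import gzip
    import json
    from collections import defaultdict

    per_client = defaultdict(list)
    with gzip.open("dump.gz", "rb") as fh:   # placeholder file name
        for line in fh:
            for ts, entry in json.loads(line).items():
                # JSON turns the (host, port) tuple into a list; re-tuple it
                per_client[tuple(entry["client"])].extend(entry["flows"])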