From 11dc92269c2a73f32aaba38a624c34b718d1842c Mon Sep 17 00:00:00 2001
From: Carey Metcalfe
Date: Wed, 16 Oct 2019 22:55:17 -0400
Subject: [PATCH] Refactor code to make programmatic access to flows easier

This commit splits the packet collecting and processing out into a
thread that provides a queue-like `get(block=True, timeout=None)`
function for getting processed `ExportPackets`. This makes it much
easier to use than the previous approach of starting a generator and
sending a truthy value to it when you want it to stop.

The `get_export_packets` generator is an example of using it - it just
starts the thread and yields values from it.
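For example, a downstream script can now consume flows with a plain
for-loop (a rough sketch; the module name `main` and port 2055 are
just placeholders):

    from main import get_export_packets

    # Yields (timestamp, ExportPacket) tuples until interrupted
    for ts, export in get_export_packets("0.0.0.0", 2055):
        print(ts, export.header.count)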
+ """ + return self.output.get(block, timeout) + + def run(self): + # Process packets from the queue + try: + templates = {} + to_retry = [] + while not self._shutdown.is_set(): + try: + # 0.5s delay to limit CPU usage while waiting for new packets + pkt = self.input.get(block=True, timeout=0.5) + except queue.Empty: + continue + + try: + export = ExportPacket(pkt.data, templates) + except TemplateNotRecognized: + if time.time() - pkt.ts > PACKET_TIMEOUT: + __log__.warning("Dropping an old and undecodable ExportPacket") + else: + to_retry.append(pkt) + __log__.debug("Failed to decode a ExportPacket - will " + "re-attempt when a new template is discovered") + continue + + __log__.debug("Processed an ExportPacket with %d flows.", + export.header.version, export.header.count) + + # If any new templates were discovered, dump the unprocessable + # data back into the queue and try to decode them again + if export.contains_new_templates and to_retry: + __log__.debug("Received new template(s)") + __log__.debug("Will re-attempt to decode %d old v9 ExportPackets", + len(to_retry)) + for p in to_retry: + self.input.put(p) + to_retry.clear() + + self.output.put((pkt.ts, export)) + finally: + self.server.shutdown() + self.server.server_close() + + def stop(self): + __log__.info("Shutting down the NetFlow listener") + self._shutdown.set() + + def join(self): + self.thread.join() + super().join() + + def get_export_packets(host, port): - """A generator that will yield ExportPacket objects until it is killed - or has a truthy value sent to it""" + """A generator that will yield ExportPacket objects until it is killed""" - __log__.info("Starting the NetFlow listener on {}:{}".format(host, port)) - queue = Queue() - server = QueuingUDPListener((host, port), queue) - thread = threading.Thread(target=server.serve_forever) - thread.start() - - # Process packets from the queue - templates = {} - to_retry = [] + listener = NetFlowListener(host, port) + listener.start() try: while True: - pkt = queue.get() - try: - export = ExportPacket(pkt.data, templates) - except TemplateNotRecognized: - if time.time() - pkt.ts > PACKET_TIMEOUT: - __log__.warning("Dropping an old and undecodable ExportPacket") - else: - to_retry.append(pkt) - __log__.debug("Failed to decode an ExportPacket - will " - "re-attempt when a new template is dicovered") - continue - - __log__.debug("Processed an ExportPacket with %d flows.", - export.header.count) - - # If any new templates were discovered, dump the unprocessable - # data back into the queue and try to decode them again - if export.contains_new_templates and to_retry: - __log__.debug("Recieved new template(s)") - __log__.debug("Will re-attempt to decode %d old ExportPackets", - len(to_retry)) - for p in to_retry: - queue.put(p) - to_retry.clear() - - stop = yield pkt.ts, export - if stop: - break + yield listener.get() finally: - __log__.info("Shutting down the NetFlow listener") - server.shutdown() - server.server_close() - thread.join() + listener.stop() + listener.join() if __name__ == "__main__":