Refactor code to make programmatic access to flows easier

This commit splits the packet collecting and processing out into a
thread that provides a queue-like `get(block=True, timeout=None)`
method for retrieving processed `ExportPacket` objects.

This is much easier to use than the previous approach of starting a
generator and sending it a value when you want it to stop. The
`get_export_packets` generator is an example of using it - it simply
starts the thread and yields values from it.
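
For example, a consumer can now simply iterate over the generator. A
minimal sketch (not part of the diff below; the host/port values and
flow field names are borrowed from the NetFlowListener docstring
example):

    for ts, export in get_export_packets('0.0.0.0', 2055):
        print("Time: {}".format(ts))
        for f in export.flows:
            print(" - {IPV4_SRC_ADDR} sent data to {IPV4_DST_ADDR}".format(**f))
        # CTRL+C (or closing the generator) runs its cleanup code,
        # which stops and joins the listener thread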
Carey Metcalfe 2019-10-16 22:55:17 -04:00
parent ef151f8d28
commit 11dc92269c

main.py (152 changed lines)

@@ -10,7 +10,7 @@ Licensed under MIT License. See LICENSE.
import argparse
from collections import namedtuple
from queue import Queue
import queue
import json
import logging
import sys
@@ -46,54 +46,118 @@ class QueuingUDPListener(socketserver.ThreadingUDPServer):
        super().__init__(interface, QueuingRequestHandler)


class NetFlowListener(threading.Thread):
    """A thread that listens for incoming NetFlow packets, processes them, and
    makes them available to consumers.

    - When initialized, will start listening for NetFlow packets on the provided
      host and port and queuing them for processing.
    - When started, will start processing and parsing queued packets.
    - When stopped, will shut down the listener and stop processing.
    - When joined, will wait for the listener to exit

    For example, a simple script that outputs data until killed with CTRL+C:

    >>> listener = NetFlowListener('0.0.0.0', 2055)
    >>> print("Listening for NetFlow packets")
    >>> listener.start() # start processing packets
    >>> try:
    ...     while True:
    ...         ts, export = listener.get()
    ...         print("Time: {}".format(ts))
    ...         for f in export.flows:
    ...             print(" - {IPV4_SRC_ADDR} sent data to {IPV4_DST_ADDR}"
    ...                   "".format(**f))
    ... finally:
    ...     print("Stopping...")
    ...     listener.stop()
    ...     listener.join()
    ...     print("Stopped!")
    """
    def __init__(self, host, port):
        __log__.info("Starting the NetFlow listener on {}:{}".format(host, port))
        self.output = queue.Queue()
        self.input = queue.Queue()
        self.server = QueuingUDPListener((host, port), self.input)
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.start()
        self._shutdown = threading.Event()
        super().__init__()
    def get(self, block=True, timeout=None):
        """Get a processed flow.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a flow is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the queue.Empty exception if no flow was available within that time.
        Otherwise ('block' is false), return a flow if one is immediately
        available, else raise the queue.Empty exception ('timeout' is ignored
        in that case).
        """
        return self.output.get(block, timeout)
    def run(self):
        # Process packets from the queue
        try:
            templates = {}
            to_retry = []
            while not self._shutdown.is_set():
                try:
                    # 0.5s delay to limit CPU usage while waiting for new packets
                    pkt = self.input.get(block=True, timeout=0.5)
                except queue.Empty:
                    continue

                try:
                    export = ExportPacket(pkt.data, templates)
                except TemplateNotRecognized:
                    if time.time() - pkt.ts > PACKET_TIMEOUT:
                        __log__.warning("Dropping an old and undecodable ExportPacket")
                    else:
                        to_retry.append(pkt)
                        __log__.debug("Failed to decode an ExportPacket - will "
                                      "re-attempt when a new template is discovered")
                    continue

                __log__.debug("Processed a v%d ExportPacket with %d flows.",
                              export.header.version, export.header.count)

                # If any new templates were discovered, dump the unprocessable
                # data back into the queue and try to decode them again
                if export.contains_new_templates and to_retry:
                    __log__.debug("Received new template(s)")
                    __log__.debug("Will re-attempt to decode %d old v9 ExportPackets",
                                  len(to_retry))
                    for p in to_retry:
                        self.input.put(p)
                    to_retry.clear()

                self.output.put((pkt.ts, export))
        finally:
            self.server.shutdown()
            self.server.server_close()

    def stop(self):
        __log__.info("Shutting down the NetFlow listener")
        self._shutdown.set()

    def join(self):
        self.thread.join()
        super().join()
def get_export_packets(host, port):
    """A generator that will yield ExportPacket objects until it is killed
    or has a truthy value sent to it"""
    """A generator that will yield ExportPacket objects until it is killed"""

    __log__.info("Starting the NetFlow listener on {}:{}".format(host, port))
    queue = Queue()
    server = QueuingUDPListener((host, port), queue)
    thread = threading.Thread(target=server.serve_forever)
    thread.start()

    # Process packets from the queue
    templates = {}
    to_retry = []
    listener = NetFlowListener(host, port)
    listener.start()
    try:
        while True:
            pkt = queue.get()
            try:
                export = ExportPacket(pkt.data, templates)
            except TemplateNotRecognized:
                if time.time() - pkt.ts > PACKET_TIMEOUT:
                    __log__.warning("Dropping an old and undecodable ExportPacket")
                else:
                    to_retry.append(pkt)
                    __log__.debug("Failed to decode an ExportPacket - will "
                                  "re-attempt when a new template is discovered")
                continue

            __log__.debug("Processed an ExportPacket with %d flows.",
                          export.header.count)

            # If any new templates were discovered, dump the unprocessable
            # data back into the queue and try to decode them again
            if export.contains_new_templates and to_retry:
                __log__.debug("Received new template(s)")
                __log__.debug("Will re-attempt to decode %d old ExportPackets",
                              len(to_retry))
                for p in to_retry:
                    queue.put(p)
                to_retry.clear()

            stop = yield pkt.ts, export
            if stop:
                break
            yield listener.get()
    finally:
        __log__.info("Shutting down the NetFlow listener")
        server.shutdown()
        server.server_close()
        thread.join()
        listener.stop()
        listener.join()
if __name__ == "__main__":