Improve collector script and restructure code

- Moved the netflow library out of the src directory
- The UDP listener was restructured so that multiple threads can receive
  packets and push them into a queue. The main thread then pulls the
  packets off the queue one at a time and processes them. This means
  that the collector will never drop a packet because it was blocked on
  processing the previous one.
- Added a property to the ExportPacket class to expose whether any new
  templates are contained in it.
- The collector will now only retry parsing past packets when a new
  template is found. Also refactored the retry logic a bit to remove
  duplicate code (retrying just pushes the packets back into the main
  queue to be processed again like all the other packets).
- The collector no longer continually reads and writes to/from the disk.
  It just caches the data in memory until it exits instead.
This commit is contained in:
Carey Metcalfe 2019-10-05 03:07:02 -04:00
parent ce2be709d6
commit ef151f8d28
3 changed files with 113 additions and 118 deletions

186
main.py Normal file → Executable file
View file

@ -8,133 +8,123 @@ Copyright 2017-2019 Dominik Pataky <dev@bitkeks.eu>
Licensed under MIT License. See LICENSE.
"""
import logging
import argparse
from collections import namedtuple
from queue import Queue
import json
import logging
import sys
import socketserver
import threading
import time
import json
import os.path
from netflow.v9 import ExportPacket, TemplateNotRecognized
# NOTE(review): this root-logger configuration looks like superseded
# residue -- the __main__ block below configures logging via
# logging.basicConfig as well; keeping both double-configures logging.
logging.getLogger().setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(message)s')
ch.setFormatter(formatter)
logging.getLogger().addHandler(ch)

# Module-level logger used throughout the collector.
__log__ = logging.getLogger(__name__)

# NOTE(review): this fallback import appears to duplicate the
# `from netflow.v9 import ...` import above -- presumably residue from the
# package restructuring; confirm which import path is current.
try:
    from netflow.collector_v9 import ExportPacket, TemplateNotRecognized
except ImportError:
    logging.warning("Netflow v9 not installed as package! Running from directory.")
    from src.netflow.collector_v9 import ExportPacket, TemplateNotRecognized

# Amount of time to wait before dropping an undecodable ExportPacket
PACKET_TIMEOUT = 60 * 60

# NOTE(review): this module-level argument parser duplicates the parser
# built inside the __main__ block below; likely refactor residue.
parser = argparse.ArgumentParser(description="A sample netflow collector.")
parser.add_argument("--host", type=str, default="",
                    help="collector listening address")
parser.add_argument("--port", "-p", type=int, default=2055,
                    help="collector listener port")
parser.add_argument("--file", "-o", type=str, dest="output_file",
                    default="{}.json".format(int(time.time())),
                    help="collector export JSON file")
parser.add_argument("--debug", "-D", action="store_true",
                    help="Enable debug output")
class SoftflowUDPHandler(socketserver.BaseRequestHandler):
    """Legacy UDP handler that caches templates and an output-file path.

    NOTE(review): appears superseded by QueuingRequestHandler defined
    later in this file -- confirm before relying on it.
    """

    # Templates received from the NetFlow device are cached across
    # requests; a device does not re-send them with every flow.
    templates = {}
    # Raw packets that could not be decoded yet (no matching template).
    buffered = {}

    @classmethod
    def set_output_file(cls, path):
        """Remember, on the class, the JSON file path to write output to."""
        setattr(cls, "output_file", path)
RawPacket = namedtuple('RawPacket', ['ts', 'data'])
class QueuingRequestHandler(socketserver.BaseRequestHandler):
    """Receives UDP datagrams and queues them for the consuming thread.

    The handler does no parsing itself: it only timestamps the raw
    payload and pushes it onto the owning server's queue, so listener
    threads never block on (and never drop packets because of) the
    processing of a previous packet.
    """

    def handle(self):
        # For a UDP server, self.request is a (data, socket) pair.
        data = self.request[0]
        self.server.queue.put(RawPacket(time.time(), data))
        # Fixed typo in log message: "Recieved" -> "Received".
        __log__.debug(
            "Received %d bytes of data from %s", len(data), self.client_address[0]
        )
class QueuingUDPListener(socketserver.ThreadingUDPServer):
    """A threaded UDP server that adds a (time, data) tuple to a queue for
    every request it sees
    """

    def __init__(self, interface, queue):
        # Stash the queue before binding the socket so a request that
        # arrives immediately can already be enqueued by the handler.
        self.queue = queue
        super().__init__(interface, QueuingRequestHandler)
def get_export_packets(host, port):
    """A generator that yields (timestamp, ExportPacket) tuples until it
    is killed or has a truthy value sent to it.

    Listener threads only enqueue raw packets; all parsing happens here
    in the consuming thread. A packet that cannot be decoded yet (its
    template has not arrived) is retried whenever a later packet carries
    new templates, and dropped once it is older than PACKET_TIMEOUT.
    """
    __log__.info("Starting the NetFlow listener on {}:{}".format(host, port))
    queue = Queue()
    server = QueuingUDPListener((host, port), queue)
    thread = threading.Thread(target=server.serve_forever)
    thread.start()

    # Process packets from the queue
    templates = {}
    to_retry = []
    try:
        while True:
            pkt = queue.get()
            try:
                export = ExportPacket(pkt.data, templates)
            except TemplateNotRecognized:
                if time.time() - pkt.ts > PACKET_TIMEOUT:
                    __log__.warning("Dropping an old and undecodable ExportPacket")
                else:
                    to_retry.append(pkt)
                    # Fixed typo: "dicovered" -> "discovered".
                    __log__.debug("Failed to decode an ExportPacket - will "
                                  "re-attempt when a new template is discovered")
                continue

            __log__.debug("Processed an ExportPacket with %d flows.",
                          export.header.count)

            # If any new templates were discovered, dump the unprocessable
            # data back into the queue and try to decode them again
            if export.contains_new_templates and to_retry:
                # Fixed typo: "Recieved" -> "Received".
                __log__.debug("Received new template(s)")
                __log__.debug("Will re-attempt to decode %d old ExportPackets",
                              len(to_retry))
                for p in to_retry:
                    queue.put(p)
                to_retry.clear()

            stop = yield pkt.ts, export
            if stop:
                break
    finally:
        # Always shut the server down cleanly, even on GeneratorExit.
        __log__.info("Shutting down the NetFlow listener")
        server.shutdown()
        server.server_close()
        thread.join()
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="A sample netflow collector.")
    parser.add_argument("--host", type=str, default="0.0.0.0",
                        help="collector listening address")
    parser.add_argument("--port", "-p", type=int, default=2055,
                        help="collector listener port")
    parser.add_argument("--file", "-o", type=str, dest="output_file",
                        default="{}.json".format(int(time.time())),
                        help="collector export JSON file")
    parser.add_argument("--debug", "-D", action="store_true",
                        help="Enable debug output")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    if args.debug:
        __log__.setLevel(logging.DEBUG)

    # Collected flows are cached in memory and written out once on exit,
    # instead of continually reading/writing the output file.
    data = {}
    try:
        # TODO: For a long-running processes, this will consume loads of memory
        for ts, export in get_export_packets(args.host, args.port):
            data[ts] = [flow.data for flow in export.flows]
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the collector; fall through
        # and dump whatever was collected.
        pass

    if data:
        __log__.info("Outputting collected data to '%s'", args.output_file)
        with open(args.output_file, 'w') as f:
            json.dump(data, f)
    else:
        __log__.info("No data collected")

View file

@ -11,12 +11,10 @@ Copyright 2017, 2018 Dominik Pataky <dev@bitkeks.eu>
Licensed under MIT License. See LICENSE.
"""
import socket
import struct
import sys
field_types = {
FIELD_TYPES = {
0: 'UNKNOWN_FIELD_TYPE', # fallback for unknown field types
# Cisco specs for NetFlow v9
@ -153,10 +151,14 @@ field_types = {
}
class TemplateNotRecognized(KeyError):
    # Raised while decoding a data flowset whose template id is not yet
    # known; callers buffer the raw packet and retry it later.
    pass
class DataRecord:
"""This is a 'flow' as we want it from our source. What it contains is
variable in NetFlow V9, so to work with the data you have to analyze the
data dict keys (which are integers and can be mapped with the field_types
data dict keys (which are integers and can be mapped with the FIELD_TYPES
dict).
Should hold a 'data' dict with keys=field_type (integer) and value (in bytes).
@ -195,7 +197,7 @@ class DataFlowSet:
for field in template.fields:
flen = field.field_length
fkey = field_types[field.field_type]
fkey = FIELD_TYPES[field.field_type]
fdata = None
# The length of the value byte slice is defined in the template
@ -218,20 +220,18 @@ class DataFlowSet:
class TemplateField:
    """A field with type identifier and length."""

    def __init__(self, field_type, field_length):
        self.field_type = field_type  # integer key into FIELD_TYPES
        self.field_length = field_length  # length of the field's value, in bytes

    def __repr__(self):
        return "<TemplateField type {}:{}, length {}>".format(
            self.field_type, FIELD_TYPES[self.field_type], self.field_length)
class TemplateRecord:
"""A template record contained in a TemplateFlowSet.
"""
"""A template record contained in a TemplateFlowSet."""
def __init__(self, template_id, field_count, fields):
self.template_id = template_id
self.field_count = field_count
@ -240,7 +240,7 @@ class TemplateRecord:
def __repr__(self):
return "<TemplateRecord {} with {} fields: {}>".format(
self.template_id, self.field_count,
' '.join([field_types[field.field_type] for field in self.fields]))
' '.join([FIELD_TYPES[field.field_type] for field in self.fields]))
class TemplateFlowSet:
@ -268,7 +268,7 @@ class TemplateFlowSet:
# Get all fields of this template
offset += 4
field_type, field_length = struct.unpack('!HH', data[offset:offset+4])
if field_type not in field_types:
if field_type not in FIELD_TYPES:
field_type = 0 # Set field_type to UNKNOWN_FIELD_TYPE as fallback
field = TemplateField(field_type, field_length)
fields.append(field)
@ -288,8 +288,7 @@ class TemplateFlowSet:
class Header:
"""The header of the ExportPacket.
"""
"""The header of the ExportPacket."""
def __init__(self, data):
pack = struct.unpack('!HHIIII', data[:20])
@ -302,11 +301,11 @@ class Header:
class ExportPacket:
"""The flow record holds the header and all template and data flowsets.
"""
"""The flow record holds the header and all template and data flowsets."""
def __init__(self, data, templates):
self.header = Header(data)
self.templates = templates
self._new_templates = False
self.flows = []
offset = 20
@ -314,6 +313,12 @@ class ExportPacket:
flowset_id = struct.unpack('!H', data[offset:offset+2])[0]
if flowset_id == 0: # TemplateFlowSet always have id 0
tfs = TemplateFlowSet(data[offset:])
# Check for any new/changed templates
if not self._new_templates:
for id_, template in tfs.templates.items():
if id_ not in self.templates or self.templates[id_] != template:
self._new_templates = True
break
self.templates.update(tfs.templates)
offset += tfs.length
else:
@ -321,10 +326,10 @@ class ExportPacket:
self.flows += dfs.flows
offset += dfs.length
@property
def contains_new_templates(self):
    """Whether parsing this packet discovered any new or changed templates."""
    return self._new_templates
def __repr__(self):
    # version/count come from the Header parsed off the packet's first bytes.
    return "<ExportPacket version {} counting {} records>".format(
        self.header.version, self.header.count)
class TemplateNotRecognized(KeyError):
    # NOTE(review): this appears to duplicate the TemplateNotRecognized
    # defined earlier in this file -- likely diff residue from the class
    # being moved; confirm only one definition exists in the real source.
    pass