Add buffering of exports with unknown template

Until now, exports which were received, but their template was not known,
resulted in KeyError exceptions due to a missing key in the template dict.
With this release, these exports are buffered until a template export
updates this dict, and all buffered exports are again examined.

Release v0.7.0

Fixes #4
Fixes #5
This commit is contained in:
Dominik Pataky 2019-03-31 20:51:34 +02:00
parent 5c7ec0aef8
commit 85e6af4bd2
4 changed files with 92 additions and 44 deletions

View file

@ -1,10 +1,10 @@
#!/usr/bin/env python3
"""
Example analyzing script for saved exports (as JSON).
This file belongs to https://github.com/cooox/python-netflow-v9-softflowd.
Example analyzing script for saved exports (by main.py, as JSON).
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2017, 2018 Dominik Pataky <dev@bitkeks.eu>
Copyright 2017-2019 Dominik Pataky <dev@bitkeks.eu>
Licensed under MIT License. See LICENSE.
"""
@ -138,12 +138,10 @@ for export in sorted(data):
pending = flow
else:
con = Connection(pending, flow)
if con.size > 1024**2:
#~ if con.service == "http":
print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to"\
" {dest_host} ({dest})".format(
timestamp=timestamp, service=con.service.upper(),
src_host=con.hostnames.src, src=con.src,
dest_host=con.hostnames.dest, dest=con.dest,
size=con.human_size, duration=con.human_duration))
print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to"\
" {dest_host} ({dest})".format(
timestamp=timestamp, service=con.service.upper(),
src_host=con.hostnames.src, src=con.src,
dest_host=con.hostnames.dest, dest=con.dest,
size=con.human_size, duration=con.human_duration))
pending = None

105
main.py
View file

@ -2,9 +2,9 @@
"""
Example collector script for NetFlow v9.
This file belongs to https://github.com/cooox/python-netflow-v9-softflowd.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2017, 2018 Dominik Pataky <dev@bitkeks.eu>
Copyright 2017-2019 Dominik Pataky <dev@bitkeks.eu>
Licensed under MIT License. See LICENSE.
"""
@ -25,33 +25,29 @@ ch.setFormatter(formatter)
logging.getLogger().addHandler(ch)
try:
from netflow.collector_v9 import ExportPacket
from netflow.collector_v9 import ExportPacket, TemplateNotRecognized
except ImportError:
logging.warn("Netflow v9 not installed as package! Running from directory.")
from src.netflow.collector_v9 import ExportPacket
logging.warning("Netflow v9 not installed as package! Running from directory.")
from src.netflow.collector_v9 import ExportPacket, TemplateNotRecognized
parser = argparse.ArgumentParser(description='A sample netflow collector.')
parser.add_argument('--host', type=str, default='',
help='collector listening address')
parser.add_argument('--port', '-p', type=int, default=2055,
help='collector listener port')
parser.add_argument('--file', '-o', type=str, dest='output_file',
parser = argparse.ArgumentParser(description="A sample netflow collector.")
parser.add_argument("--host", type=str, default="",
help="collector listening address")
parser.add_argument("--port", "-p", type=int, default=2055,
help="collector listener port")
parser.add_argument("--file", "-o", type=str, dest="output_file",
default="{}.json".format(int(time.time())),
help='collector export JSON file')
parser.add_argument('--debug', '-D', action='store_true',
help='Enable debug output')
help="collector export JSON file")
parser.add_argument("--debug", "-D", action="store_true",
help="Enable debug output")
class SoftflowUDPHandler(socketserver.BaseRequestHandler):
# We need to save the templates our NetFlow device
# send over time. Templates are not resended every
# time a flow is sent to the collector.
TEMPLATES = {}
@classmethod
def get_server(cls, host, port):
logging.info("Listening on interface {}:{}".format(host, port))
server = socketserver.UDPServer((host, port), cls)
return server
templates = {}
buffered = {}
@classmethod
def set_output_file(cls, path):
@ -60,36 +56,79 @@ class SoftflowUDPHandler(socketserver.BaseRequestHandler):
def handle(self):
if not os.path.exists(self.output_file):
with open(self.output_file, 'w') as fh:
fh.write(json.dumps({}))
json.dump({}, fh)
with open(self.output_file, 'r') as fh:
existing_data = json.loads(fh.read())
try:
existing_data = json.load(fh)
except json.decoder.JSONDecodeError as ex:
logging.error("Malformed JSON output file. Cannot read existing data, aborting.")
return
data = self.request[0]
host = self.client_address[0]
s = "Received data from {}, length {}".format(host, len(data))
logging.debug(s)
export = ExportPacket(data, self.TEMPLATES)
self.TEMPLATES.update(export.templates)
s = "Processed ExportPacket with {} flows.".format(export.header.count)
logging.debug(s)
logging.debug("Received data from {}, length {}".format(host, len(data)))
export = None
try:
export = ExportPacket(data, self.templates)
except TemplateNotRecognized:
self.buffered[time.time()] = data
logging.warning("Received data with unknown template, data stored in buffer!")
return
if not export:
logging.error("Error with exception handling while disecting export, export is None")
return
logging.debug("Processed ExportPacket with {} flows.".format(export.header.count))
logging.debug("Size of buffer: {}".format(len(self.buffered)))
# In case the export held some new templates
self.templates.update(export.templates)
remain_buffered = {}
processed = []
for timestamp, data in self.buffered.items():
try:
buffered_export = ExportPacket(data, self.templates)
processed.append(timestamp)
except TemplateNotRecognized:
remain_buffered[timestamp] = data
logging.debug("Template of buffered ExportPacket still not recognized")
continue
logging.debug("Processed buffered ExportPacket with {} flows.".format(buffered_export.header.count))
existing_data[timestamp] = [flow.data for flow in buffered_export.flows]
# Delete processed items from the buffer
for pro in processed:
del self.buffered[pro]
# Update the buffer
self.buffered.update(remain_buffered)
# Append new flows
existing_data[time.time()] = [flow.data for flow in export.flows]
with open(self.output_file, 'w') as fh:
fh.write(json.dumps(existing_data))
json.dump(existing_data, fh)
if __name__ == "__main__":
args = parser.parse_args()
SoftflowUDPHandler.set_output_file(args.output_file)
server = SoftflowUDPHandler.get_server(args.host, args.port)
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
output_file = args.output_file
SoftflowUDPHandler.set_output_file(output_file)
host = args.host
port = args.port
logging.info("Listening on interface {}:{}".format(host, port))
server = socketserver.UDPServer((host, port), SoftflowUDPHandler)
try:
logging.debug("Starting the NetFlow listener")
server.serve_forever(poll_interval=0.5)
@ -97,3 +136,5 @@ if __name__ == "__main__":
raise
except KeyboardInterrupt:
raise
server.server_close()

View file

@ -1,4 +1,5 @@
#!/usr/bin/env python3
from setuptools import setup, find_packages
import os
@ -6,7 +7,7 @@ data_files = [(d, [os.path.join(d, f) for f in files])
for d, folders, files in os.walk(os.path.join('src', 'config'))]
setup(name='netflow-v9',
version='0.6.1',
version='0.7.0',
description='NetFlow v9 parser and collector implemented in Python 3. Developed to be used with softflowd v0.9.9',
author='Dominik Pataky',
author_email='dev@bitkeks.eu',

View file

@ -181,6 +181,10 @@ class DataFlowSet:
self.flows = []
offset = 4
if self.template_id not in templates:
raise TemplateNotRecognized
template = templates[self.template_id]
# As the field lengths are variable V9 has padding to next 32 Bit
@ -320,3 +324,7 @@ class ExportPacket:
def __repr__(self):
return "<ExportPacket version {} counting {} records>".format(
self.header.version, self.header.count)
class TemplateNotRecognized(KeyError):
pass