Fix datarecord saving bug; cleanup; license
This commit is contained in:
parent
2d7c905d41
commit
546f96122f
|
@ -1,13 +1,14 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Netflow V9 collector implementation in Python 3.
|
||||
Netflow V9 collector and parser implementation in Python 3.
|
||||
Created for learning purposes and unsatisfying alternatives.
|
||||
|
||||
This script is specifically implemented in combination with softflowd.
|
||||
See https://github.com/djmdjm/softflowd
|
||||
|
||||
(C) 2016 Dominik Pataky <dom@netdecorator.org>
|
||||
Licensed under MIT License. See LICENSE.
|
||||
"""
|
||||
|
||||
from collections import namedtuple
|
||||
|
@ -105,34 +106,39 @@ field_types = {
|
|||
89: 'FORWARDING STATUS',
|
||||
}
|
||||
|
||||
# We need to save the templates our NetFlow device send over time. Templates
|
||||
# are not resended every time a flow is sent to the collector.
|
||||
_templates = {}
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.bind((HOST, PORT))
|
||||
print("Listening on interface {}:{}".format(HOST, PORT))
|
||||
|
||||
|
||||
class DataRecord:
|
||||
"""Should hold a 'data' dict with keys=field_type and value (in bytes).
|
||||
"""This is a 'flow' as we want it from our source. What it contains is
|
||||
variable in NetFlow V9, so to work with the data you have to analyze the
|
||||
data dict keys (which are integers and can be mapped with the field_types
|
||||
dict).
|
||||
|
||||
Should hold a 'data' dict with keys=field_type (integer) and value (in bytes).
|
||||
"""
|
||||
data = {}
|
||||
def __init__(self):
|
||||
self.data = {}
|
||||
|
||||
def __repr__(self):
|
||||
return "<DataRecord with data: {}>".format(self.data)
|
||||
|
||||
|
||||
class DataFlowSet:
|
||||
"""
|
||||
"""Holds one or multiple DataRecord which are all defined after the same
|
||||
template. This template is referenced in the field 'flowset_id' of this
|
||||
DataFlowSet and must not be zero.
|
||||
"""
|
||||
def __init__(self, data, templates):
|
||||
pack = struct.unpack('!HH', data[:4])
|
||||
|
||||
self.template_id = pack[0] # flowset_id is reference to template_id
|
||||
self.template_id = pack[0] # flowset_id is reference to a template_id
|
||||
self.length = pack[1]
|
||||
self.flows = []
|
||||
|
||||
offset = 4
|
||||
template = templates[self.template_id]
|
||||
padding_size = 4 - (self.length % 4)
|
||||
|
||||
# As the field lengths are variable V9 has padding to next 32 Bit
|
||||
padding_size = 4 - (self.length % 4) # 4 Byte
|
||||
|
||||
while offset <= (self.length - padding_size):
|
||||
new_record = DataRecord()
|
||||
|
@ -142,23 +148,14 @@ class DataFlowSet:
|
|||
fkey = field_types[field.field_type]
|
||||
fdata = None
|
||||
|
||||
# The length of the value byte slice is defined in the template
|
||||
dataslice = data[offset:offset+flen]
|
||||
|
||||
if flen == 1:
|
||||
fdata = struct.unpack('!B', dataslice)
|
||||
elif flen == 2:
|
||||
fdata = struct.unpack('!H', dataslice)
|
||||
elif flen == 4:
|
||||
fdata = struct.unpack('!I', dataslice)
|
||||
elif flen == 8:
|
||||
fdata = struct.unpack('!Q', dataslice)
|
||||
elif flen == 16:
|
||||
# IPv6 address
|
||||
fdata = int.from_bytes(dataslice, byteorder='big')
|
||||
else:
|
||||
raise ValueError("Length of field was not 1/2/4/8/16")
|
||||
# Better solution than struct.unpack with variable field length
|
||||
fdata = int.from_bytes(dataslice, byteorder='big')
|
||||
|
||||
new_record.data[fkey] = fdata
|
||||
|
||||
offset += flen
|
||||
|
||||
self.flows.append(new_record)
|
||||
|
@ -177,7 +174,7 @@ class TemplateField:
|
|||
)
|
||||
|
||||
|
||||
class Template:
|
||||
class TemplateRecord:
|
||||
"""A template record contained in a TemplateFlowSet.
|
||||
"""
|
||||
def __init__(self, template_id, field_count, fields):
|
||||
|
@ -186,7 +183,7 @@ class Template:
|
|||
self.fields = fields
|
||||
|
||||
def __repr__(self):
|
||||
return "<Template {} with {} fields: {}>".format(
|
||||
return "<TemplateRecord {} with {} fields: {}>".format(
|
||||
self.template_id, self.field_count,
|
||||
' '.join([field_types[field.field_type] for field in self.fields])
|
||||
)
|
||||
|
@ -221,7 +218,7 @@ class TemplateFlowSet:
|
|||
fields.append(field)
|
||||
|
||||
# Create a tempalte object with all collected data
|
||||
template = Template(template_id, field_count, fields)
|
||||
template = TemplateRecord(template_id, field_count, fields)
|
||||
|
||||
# Append the new template to the global templates list
|
||||
self.templates[template.template_id] = template
|
||||
|
@ -247,9 +244,9 @@ class Header:
|
|||
class ExportPacket:
|
||||
"""The flow record holds the header and all template and data flowsets.
|
||||
"""
|
||||
def __init__(self, data):
|
||||
def __init__(self, data, templates):
|
||||
self.header = Header(data)
|
||||
self.templates = {}
|
||||
self.templates = templates
|
||||
self.flows = []
|
||||
|
||||
offset = 20
|
||||
|
@ -257,10 +254,10 @@ class ExportPacket:
|
|||
flowset_id = struct.unpack('!H', data[offset:offset+2])[0]
|
||||
if flowset_id == 0: # TemplateFlowSet always have id 0
|
||||
tfs = TemplateFlowSet(data[offset:])
|
||||
_templates.update(tfs.templates)
|
||||
self.templates.update(tfs.templates)
|
||||
offset += tfs.length
|
||||
else:
|
||||
dfs = DataFlowSet(data[offset:], _templates)
|
||||
dfs = DataFlowSet(data[offset:], self.templates)
|
||||
self.flows += dfs.flows
|
||||
offset += dfs.length
|
||||
|
||||
|
@ -268,11 +265,22 @@ class ExportPacket:
|
|||
return "<ExportPacket version {} counting {} records>".format(
|
||||
self.header.version, self.header.count)
|
||||
|
||||
while 1:
|
||||
(data, sender) = sock.recvfrom(8192)
|
||||
print("Received data from {}, length {}".format(sender, len(data)))
|
||||
|
||||
export = ExportPacket(data)
|
||||
print(export)
|
||||
for r in export.flows:
|
||||
print(r.data)
|
||||
if __name__ == "__main__":
|
||||
# We need to save the templates our NetFlow device send over time. Templates
|
||||
# are not resended every time a flow is sent to the collector.
|
||||
_templates = {}
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.bind((HOST, PORT))
|
||||
print("Listening on interface {}:{}".format(HOST, PORT))
|
||||
|
||||
counter = 0
|
||||
while 1:
|
||||
(data, sender) = sock.recvfrom(8192)
|
||||
print("Received data from {}, length {}".format(sender, len(data)))
|
||||
|
||||
export = ExportPacket(data, _templates)
|
||||
_templates.update(export.templates)
|
||||
|
||||
print("Processed ExportPacket with {} flows.".format(export.header.count))
|
||||
|
|
Loading…
Reference in a new issue