Add support for v1 and v5 NetFlow packets

Thanks to @alieissa for the initial v1 and v5 code
Carey Metcalfe 2019-10-16 23:23:51 -04:00
parent 186b648c4d
commit 96817f1f8d
6 changed files with 233 additions and 19 deletions


@@ -26,6 +26,16 @@ Pair = namedtuple('Pair', ['src', 'dest'])
 def resolve_hostname(ip):
     return socket.getfqdn(ip)
 
+
+def fallback(d, keys):
+    for k in keys:
+        try:
+            return d[k]
+        except KeyError:
+            pass
+    raise KeyError(", ".join(keys))
+
+
 class Connection:
     """Connection model for two flows.
     The direction of the data flow can be seen by looking at the size.

@@ -37,7 +47,10 @@ class Connection:
         if not flow1 or not flow2:
             raise Exception("A connection requires two flows")
 
-        if flow1['IN_BYTES'] >= flow2['IN_BYTES']:
+        # Assume the size that sent the most data is the source
+        size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS'])
+        size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS'])
+        if size1 >= size2:
             src = flow1
             dest = flow2
         else:

@@ -47,9 +60,9 @@ class Connection:
         ips = self.get_ips(src)
         self.src = ips.src
         self.dest = ips.dest
-        self.src_port = src['L4_SRC_PORT']
-        self.dest_port = src['L4_DST_PORT']
-        self.size = src['IN_BYTES']
+        self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT'])
+        self.dest_port = fallback(src, ['L4_DST_PORT', 'DST_PORT'])
+        self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS'])
 
         # Duration is given in milliseconds
         self.duration = src['LAST_SWITCHED'] - src['FIRST_SWITCHED']
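For reference, a small sketch (not part of the commit) of what the fallback helper buys: v9 flow records carry IN_BYTES and L4_SRC_PORT, while the v1/v5 records added below expose IN_OCTETS and SRC_PORT, so the Connection model simply takes the first key that exists.

# Same helper as added above, repeated so the snippet runs on its own
def fallback(d, keys):
    for k in keys:
        try:
            return d[k]
        except KeyError:
            pass
    raise KeyError(", ".join(keys))

v9_flow = {'IN_BYTES': 512, 'L4_SRC_PORT': 52431}   # v9 field names
v5_flow = {'IN_OCTETS': 512, 'SRC_PORT': 52431}     # v1/v5 field names

assert fallback(v9_flow, ['IN_BYTES', 'IN_OCTETS']) == 512
assert fallback(v5_flow, ['IN_BYTES', 'IN_OCTETS']) == 512
# fallback({}, ['IN_BYTES', 'IN_OCTETS']) raises KeyError: 'IN_BYTES, IN_OCTETS'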

main.py

@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 
 """
-Example collector script for NetFlow v9.
+Example collector script for NetFlow v1, v5, and v9.
 This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
 
 Copyright 2017-2019 Dominik Pataky <dev@bitkeks.eu>

@@ -18,7 +18,7 @@ import socketserver
 import threading
 import time
 
-from netflow.v9 import ExportPacket, TemplateNotRecognized
+from netflow import parse_packet, TemplateNotRecognized, UnknownNetFlowVersion
 
 __log__ = logging.getLogger(__name__)

@@ -26,6 +26,7 @@ __log__ = logging.getLogger(__name__)
 # Amount of time to wait before dropping an undecodable ExportPacket
 PACKET_TIMEOUT = 60 * 60
 
+# TODO: Add source IP
 RawPacket = namedtuple('RawPacket', ['ts', 'data'])
 
 class QueuingRequestHandler(socketserver.BaseRequestHandler):
@@ -110,24 +111,27 @@ class NetFlowListener(threading.Thread):
                     continue
 
                 try:
-                    export = ExportPacket(pkt.data, templates)
+                    export = parse_packet(pkt.data, templates)
+                except UnknownNetFlowVersion as e:
+                    __log__.error("%s, ignoring the packet", e)
+                    continue
                 except TemplateNotRecognized:
                     if time.time() - pkt.ts > PACKET_TIMEOUT:
-                        __log__.warning("Dropping an old and undecodable ExportPacket")
+                        __log__.warning("Dropping an old and undecodable v9 ExportPacket")
                     else:
                         to_retry.append(pkt)
-                        __log__.debug("Failed to decode a ExportPacket - will "
+                        __log__.debug("Failed to decode a v9 ExportPacket - will "
                                       "re-attempt when a new template is discovered")
                     continue
 
-                __log__.debug("Processed an ExportPacket with %d flows.",
-                              export.header.count)
+                __log__.debug("Processed a v%d ExportPacket with %d flows.",
+                              export.header.version, export.header.count)
 
                 # If any new templates were discovered, dump the unprocessable
                 # data back into the queue and try to decode them again
-                if export.contains_new_templates and to_retry:
+                if (export.header.version == 9 and export.contains_new_templates and to_retry):
                     __log__.debug("Received new template(s)")
-                    __log__.debug("Will re-attempt to decode %d old ExportPackets",
+                    __log__.debug("Will re-attempt to decode %d old v9 ExportPackets",
                                   len(to_retry))
                     for p in to_retry:
                         self.input.put(p)
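A tiny sketch of the new error path (packet bytes made up here): anything whose version field is not 1, 5 or 9 now surfaces as UnknownNetFlowVersion before the v9 parser ever sees it, which is what the listener above logs and drops.

import struct

from netflow import parse_packet, UnknownNetFlowVersion

bogus = struct.pack('!H', 7) + b'\x00' * 14   # first two bytes claim "version 7"
try:
    parse_packet(bogus, {})
except UnknownNetFlowVersion as exc:
    print(exc)   # e.g. Unknown NetFlow version 7 for data b'\x07\x00\x00...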


@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+import struct
+
+from netflow.v1 import V1ExportPacket
+from netflow.v5 import V5ExportPacket
+from netflow.v9 import V9ExportPacket, TemplateNotRecognized
+
+__all__ = ["TemplateNotRecognized", "UnknownNetFlowVersion", "parse_packet"]
+
+
+class UnknownNetFlowVersion(Exception):
+    def __init__(self, data, version):
+        self.data = data
+        self.version = version
+        r = repr(data)
+        data_str = ("{:.25}..." if len(r) >= 28 else "{}").format(r)
+        super().__init__(
+            "Unknown NetFlow version {} for data {}".format(version, data_str)
+        )
+
+
+def get_netflow_version(data):
+    return struct.unpack('!H', data[:2])[0]
+
+
+def parse_packet(data, templates):
+    version = get_netflow_version(data)
+    if version == 1:
+        return V1ExportPacket(data)
+    elif version == 5:
+        return V5ExportPacket(data)
+    elif version == 9:
+        return V9ExportPacket(data, templates)
+    raise UnknownNetFlowVersion(data, version)
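A minimal usage sketch of this dispatcher (the handler function is assumed, not from the commit): the caller keeps one templates dict alive across packets; v1 and v5 packets never use it, while v9 data flowsets can raise TemplateNotRecognized until the matching template flowset has arrived.

from netflow import parse_packet, TemplateNotRecognized, UnknownNetFlowVersion

templates = {}   # shared v9 template cache, ignored by the v1/v5 parsers

def handle(data):
    """Hypothetical handler: returns a parsed export packet or None."""
    try:
        export = parse_packet(data, templates)
    except UnknownNetFlowVersion:
        return None   # not a v1/v5/v9 packet, drop it
    except TemplateNotRecognized:
        return None   # v9 data arrived before its template, retry later
    for flow in export.flows:
        print(flow.data)   # each record is exposed as a plain dict
    return export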

netflow/v1.py (new file)

@@ -0,0 +1,74 @@
+#!/usr/bin/env python3
+
+"""
+Netflow V1 collector and parser implementation in Python 3.
+Created purely for fun. Not battle tested nor will it be.
+
+Reference: https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html
+
+This script is specifically implemented in combination with softflowd.
+See https://github.com/djmdjm/softflowd
+"""
+
+import struct
+
+
+class DataFlow:
+    """Holds one v1 DataRecord"""
+    length = 48
+
+    def __init__(self, data):
+        self.data = {}
+        self.data['IPV4_SRC_ADDR'] = struct.unpack('!I', data[:4])[0]
+        self.data['IPV4_DST_ADDR'] = struct.unpack('!I', data[4:8])[0]
+        self.data['NEXT_HOP'] = struct.unpack('!I', data[8:12])[0]
+        self.data['INPUT'] = struct.unpack('!H', data[12:14])[0]
+        self.data['OUTPUT'] = struct.unpack('!H', data[14:16])[0]
+        self.data['IN_PACKETS'] = struct.unpack('!I', data[16:20])[0]
+        self.data['IN_OCTETS'] = struct.unpack('!I', data[20:24])[0]
+        self.data['FIRST_SWITCHED'] = struct.unpack('!I', data[24:28])[0]
+        self.data['LAST_SWITCHED'] = struct.unpack('!I', data[28:32])[0]
+        self.data['SRC_PORT'] = struct.unpack('!H', data[32:34])[0]
+        self.data['DST_PORT'] = struct.unpack('!H', data[34:36])[0]
+        # Word at offset 36 is used for padding
+        self.data['PROTO'] = struct.unpack('!B', data[38:39])[0]
+        self.data['TOS'] = struct.unpack('!B', data[39:40])[0]
+        self.data['TCP_FLAGS'] = struct.unpack('!B', data[40:41])[0]
+        # Bytes 41-48 are padding
+
+    def __repr__(self):
+        return "<DataRecord with data {}>".format(self.data)
+
+
+class Header:
+    """The header of the V1ExportPacket"""
+    length = 16
+
+    def __init__(self, data):
+        header = struct.unpack('!HHIII', data[:self.length])
+        self.version = header[0]
+        self.count = header[1]
+        self.uptime = header[2]
+        self.timestamp = header[3]
+        self.timestamp_nano = header[4]
+
+
+class V1ExportPacket:
+    """The flow record holds the header and data flowsets."""
+
+    def __init__(self, data):
+        self.flows = []
+        self.header = Header(data)
+
+        offset = self.header.length
+        for flow_count in range(0, self.header.count):
+            flow = DataFlow(data[offset:])
+            self.flows.append(flow)
+            offset += flow.length
+
+    def __repr__(self):
+        return "<ExportPacket v{} with {} records>".format(
+            self.header.version, self.header.count)
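A round-trip sketch with made-up values (not from the commit): pack one 16-byte v1 header and one 48-byte record with struct, then let V1ExportPacket recover the fields at the offsets listed above.

import struct

from netflow.v1 import V1ExportPacket

# 16-byte header: version=1, count=1, uptime, unix timestamp, nanoseconds
header = struct.pack('!HHIII', 1, 1, 1000, 1571270000, 0)

# 48-byte record; the 2x/7x groups are the padding bytes the parser skips
record = struct.pack(
    '!IIIHHIIIIHH2xBBB7x',
    0x0A000001, 0x0A000002, 0,   # IPV4_SRC_ADDR 10.0.0.1, IPV4_DST_ADDR 10.0.0.2, NEXT_HOP
    1, 2,                        # INPUT, OUTPUT interface indices
    10, 840,                     # IN_PACKETS, IN_OCTETS
    100, 200,                    # FIRST_SWITCHED, LAST_SWITCHED
    52431, 80,                   # SRC_PORT, DST_PORT
    6, 0, 0x18)                  # PROTO (TCP), TOS, TCP_FLAGS

export = V1ExportPacket(header + record)
assert export.header.count == 1
assert export.flows[0].data['DST_PORT'] == 80
assert export.flows[0].data['IN_OCTETS'] == 840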

netflow/v5.py (new file)

@@ -0,0 +1,82 @@
+#!/usr/bin/env python3
+
+"""
+Netflow V5 collector and parser implementation in Python 3.
+Created purely for fun. Not battle tested nor will it be.
+
+Reference: https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html
+
+This script is specifically implemented in combination with softflowd.
+See https://github.com/djmdjm/softflowd
+"""
+
+import struct
+
+
+class DataFlow:
+    """Holds one v5 DataRecord"""
+    length = 48
+
+    def __init__(self, data):
+        self.data = {}
+        self.data['IPV4_SRC_ADDR'] = struct.unpack('!I', data[:4])[0]
+        self.data['IPV4_DST_ADDR'] = struct.unpack('!I', data[4:8])[0]
+        self.data['NEXT_HOP'] = struct.unpack('!I', data[8:12])[0]
+        self.data['INPUT'] = struct.unpack('!H', data[12:14])[0]
+        self.data['OUTPUT'] = struct.unpack('!H', data[14:16])[0]
+        self.data['IN_PACKETS'] = struct.unpack('!I', data[16:20])[0]
+        self.data['IN_OCTETS'] = struct.unpack('!I', data[20:24])[0]
+        self.data['FIRST_SWITCHED'] = struct.unpack('!I', data[24:28])[0]
+        self.data['LAST_SWITCHED'] = struct.unpack('!I', data[28:32])[0]
+        self.data['SRC_PORT'] = struct.unpack('!H', data[32:34])[0]
+        self.data['DST_PORT'] = struct.unpack('!H', data[34:36])[0]
+        # Byte 36 is used for padding
+        self.data['TCP_FLAGS'] = struct.unpack('!B', data[37:38])[0]
+        self.data['PROTO'] = struct.unpack('!B', data[38:39])[0]
+        self.data['TOS'] = struct.unpack('!B', data[39:40])[0]
+        self.data['SRC_AS'] = struct.unpack('!H', data[40:42])[0]
+        self.data['DST_AS'] = struct.unpack('!H', data[42:44])[0]
+        self.data['SRC_MASK'] = struct.unpack('!B', data[44:45])[0]
+        self.data['DST_MASK'] = struct.unpack('!B', data[45:46])[0]
+        # Word at offset 46 is used for padding
+
+    def __repr__(self):
+        return "<DataRecord with data {}>".format(self.data)
+
+
+class Header:
+    """The header of the V5ExportPacket"""
+    length = 24
+
+    def __init__(self, data):
+        header = struct.unpack('!HHIIIIBBH', data[:self.length])
+        self.version = header[0]
+        self.count = header[1]
+        self.uptime = header[2]
+        self.timestamp = header[3]
+        self.timestamp_nano = header[4]
+        self.sequence = header[5]
+        self.engine_type = header[6]
+        self.engine_id = header[7]
+        self.sampling_interval = header[8]
+
+
+class V5ExportPacket:
+    """The flow record holds the header and data flowsets."""
+
+    def __init__(self, data):
+        self.flows = []
+        self.header = Header(data)
+
+        offset = self.header.length
+        for flow_count in range(0, self.header.count):
+            flow = DataFlow(data[offset:])
+            self.flows.append(flow)
+            offset += flow.length
+
+    def __repr__(self):
+        return "<ExportPacket v{} with {} records>".format(
+            self.header.version, self.header.count)
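The v5 header, sketched the same way with invented values: 24 bytes ('!HHIIIIBBH') that add flow sequence, engine and sampling fields on top of the v1 header, followed by the same 48-byte record size.

import struct

from netflow.v5 import V5ExportPacket

# 24-byte header: version=5, count=0 (no records), uptime, timestamp,
# nanoseconds, flow sequence, engine type, engine id, sampling interval
header = struct.pack('!HHIIIIBBH', 5, 0, 1000, 1571270000, 0, 42, 0, 0, 0)

export = V5ExportPacket(header)
assert export.header.version == 5
assert export.header.sequence == 42
assert export.flows == []   # count=0, so no DataFlow records follow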


@@ -4,6 +4,8 @@
 Netflow V9 collector and parser implementation in Python 3.
 Created for learning purposes and unsatisfying alternatives.
 
+Reference: https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html
+
 This script is specifically implemented in combination with softflowd.
 See https://github.com/djmdjm/softflowd

@@ -258,7 +260,7 @@ class TemplateFlowSet:
         offset = 4  # Skip header
 
         # Iterate through all template records in this template flowset
-        while offset != self.length:
+        while offset < self.length:
             pack = struct.unpack('!HH', data[offset:offset+4])
             template_id = pack[0]
             field_count = pack[1]

@@ -288,9 +290,12 @@ class TemplateFlowSet:
 
 class Header:
-    """The header of the ExportPacket."""
+    """The header of the V9ExportPacket"""
+
+    length = 20
+
     def __init__(self, data):
-        pack = struct.unpack('!HHIIII', data[:20])
+        pack = struct.unpack('!HHIIII', data[:self.length])
 
         self.version = pack[0]
         self.count = pack[1]  # not sure if correct. softflowd: no of flows

@@ -300,15 +305,16 @@ class Header:
         self.source_id = pack[5]
 
 
-class ExportPacket:
+class V9ExportPacket:
     """The flow record holds the header and all template and data flowsets."""
 
     def __init__(self, data, templates):
         self.header = Header(data)
         self.templates = templates
         self._new_templates = False
         self.flows = []
 
-        offset = 20
+        offset = self.header.length
         while offset != len(data):
             flowset_id = struct.unpack('!H', data[offset:offset+2])[0]
             if flowset_id == 0:  # TemplateFlowSet always have id 0

@@ -331,5 +337,6 @@ class ExportPacket:
         return self._new_templates
 
     def __repr__(self):
-        return "<ExportPacket version {} counting {} records>".format(
-            self.header.version, self.header.count)
+        s = " and new template(s)" if self.contains_new_templates else ""
+        return "<ExportPacket v{} with {} records{}>".format(
+            self.header.version, self.header.count, s)