#!/usr/bin/env python3

"""
Netflow V9 collector and parser implementation in Python 3.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Created for learning purposes and unsatisfying alternatives.

Reference: https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html

This script is specifically implemented in combination with softflowd.
See https://github.com/djmdjm/softflowd

Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""

import ipaddress
import struct
import sys

from .ipfix import IPFIXFieldTypes, IPFIXDataTypes

__all__ = ["V9DataFlowSet", "V9DataRecord", "V9ExportPacket", "V9Header", "V9TemplateField",
           "V9TemplateFlowSet", "V9TemplateNotRecognized", "V9TemplateRecord",
           "V9OptionsTemplateFlowSet", "V9OptionsTemplateRecord", "V9OptionsDataRecord"]

# Field types whose raw value is an IPv4/IPv6 address and must be rendered as a
# string instead of an integer (see V9DataFlowSet).
V9_FIELD_TYPES_CONTAINING_IP = [8, 12, 15, 18, 27, 28, 62, 63]

V9_FIELD_TYPES = {
    0: 'UNKNOWN_FIELD_TYPE',  # fallback for unknown field types

    # Cisco specs for NetFlow v9
    # https://tools.ietf.org/html/rfc3954
    # https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html
    1: 'IN_BYTES',
    2: 'IN_PKTS',
    3: 'FLOWS',
    4: 'PROTOCOL',
    5: 'SRC_TOS',
    6: 'TCP_FLAGS',
    7: 'L4_SRC_PORT',
    8: 'IPV4_SRC_ADDR',
    9: 'SRC_MASK',
    10: 'INPUT_SNMP',
    11: 'L4_DST_PORT',
    12: 'IPV4_DST_ADDR',
    13: 'DST_MASK',
    14: 'OUTPUT_SNMP',
    15: 'IPV4_NEXT_HOP',
    16: 'SRC_AS',
    17: 'DST_AS',
    18: 'BGP_IPV4_NEXT_HOP',
    19: 'MUL_DST_PKTS',
    20: 'MUL_DST_BYTES',
    21: 'LAST_SWITCHED',
    22: 'FIRST_SWITCHED',
    23: 'OUT_BYTES',
    24: 'OUT_PKTS',
    25: 'MIN_PKT_LNGTH',
    26: 'MAX_PKT_LNGTH',
    27: 'IPV6_SRC_ADDR',
    28: 'IPV6_DST_ADDR',
    29: 'IPV6_SRC_MASK',
    30: 'IPV6_DST_MASK',
    31: 'IPV6_FLOW_LABEL',
    32: 'ICMP_TYPE',
    33: 'MUL_IGMP_TYPE',
    34: 'SAMPLING_INTERVAL',
    35: 'SAMPLING_ALGORITHM',
    36: 'FLOW_ACTIVE_TIMEOUT',
    37: 'FLOW_INACTIVE_TIMEOUT',
    38: 'ENGINE_TYPE',
    39: 'ENGINE_ID',
    40: 'TOTAL_BYTES_EXP',
    41: 'TOTAL_PKTS_EXP',
    42: 'TOTAL_FLOWS_EXP',
    # 43 vendor proprietary
    44: 'IPV4_SRC_PREFIX',
    45: 'IPV4_DST_PREFIX',
    46: 'MPLS_TOP_LABEL_TYPE',
    47: 'MPLS_TOP_LABEL_IP_ADDR',
    48: 'FLOW_SAMPLER_ID',
    49: 'FLOW_SAMPLER_MODE',
    50: 'FLOW_SAMPLER_RANDOM_INTERVAL',
    # 51 vendor proprietary
    52: 'MIN_TTL',
    53: 'MAX_TTL',
    54: 'IPV4_IDENT',
    55: 'DST_TOS',
    56: 'IN_SRC_MAC',
    57: 'OUT_DST_MAC',
    58: 'SRC_VLAN',
    59: 'DST_VLAN',
    60: 'IP_PROTOCOL_VERSION',
    61: 'DIRECTION',
    62: 'IPV6_NEXT_HOP',
    63: 'BPG_IPV6_NEXT_HOP',  # NOTE(review): upstream label typo ("BPG"); kept because it is a data key
    64: 'IPV6_OPTION_HEADERS',
    # 65-69 vendor proprietary
    70: 'MPLS_LABEL_1',
    71: 'MPLS_LABEL_2',
    72: 'MPLS_LABEL_3',
    73: 'MPLS_LABEL_4',
    74: 'MPLS_LABEL_5',
    75: 'MPLS_LABEL_6',
    76: 'MPLS_LABEL_7',
    77: 'MPLS_LABEL_8',
    78: 'MPLS_LABEL_9',
    79: 'MPLS_LABEL_10',
    80: 'IN_DST_MAC',
    81: 'OUT_SRC_MAC',
    82: 'IF_NAME',
    83: 'IF_DESC',
    84: 'SAMPLER_NAME',
    85: 'IN_PERMANENT_BYTES',
    86: 'IN_PERMANENT_PKTS',
    # 87 vendor property
    88: 'FRAGMENT_OFFSET',
    89: 'FORWARDING_STATUS',
    90: 'MPLS_PAL_RD',
    91: 'MPLS_PREFIX_LEN',  # Number of consecutive bits in the MPLS prefix length.
    92: 'SRC_TRAFFIC_INDEX',  # BGP Policy Accounting Source Traffic Index
    93: 'DST_TRAFFIC_INDEX',  # BGP Policy Accounting Destination Traffic Index
    94: 'APPLICATION_DESCRIPTION',  # Application description
    95: 'APPLICATION_TAG',  # 8 bits of engine ID, followed by n bits of classification
    96: 'APPLICATION_NAME',  # Name associated with a classification
    98: 'postipDiffServCodePoint',  # The value of a Differentiated Services Code Point (DSCP)
    # encoded in the Differentiated Services Field, after modification
    99: 'replication_factor',  # Multicast replication factor
    100: 'DEPRECATED',  # DEPRECATED
    102: 'layer2packetSectionOffset',  # Layer 2 packet section offset. Potentially a generic offset
    103: 'layer2packetSectionSize',  # Layer 2 packet section size. Potentially a generic size
    104: 'layer2packetSectionData',  # Layer 2 packet section data
    # 105-127 reserved for future use by Cisco

    # ASA extensions
    # https://www.cisco.com/c/en/us/td/docs/security/asa/special/netflow/guide/asa_netflow.html
    148: 'NF_F_CONN_ID',  # An identifier of a unique flow for the device
    176: 'NF_F_ICMP_TYPE',  # ICMP type value
    177: 'NF_F_ICMP_CODE',  # ICMP code value
    178: 'NF_F_ICMP_TYPE_IPV6',  # ICMP IPv6 type value
    179: 'NF_F_ICMP_CODE_IPV6',  # ICMP IPv6 code value
    225: 'NF_F_XLATE_SRC_ADDR_IPV4',  # Post NAT Source IPv4 Address
    226: 'NF_F_XLATE_DST_ADDR_IPV4',  # Post NAT Destination IPv4 Address
    227: 'NF_F_XLATE_SRC_PORT',  # Post NATT Source Transport Port
    228: 'NF_F_XLATE_DST_PORT',  # Post NATT Destination Transport Port
    281: 'NF_F_XLATE_SRC_ADDR_IPV6',  # Post NAT Source IPv6 Address
    282: 'NF_F_XLATE_DST_ADDR_IPV6',  # Post NAT Destination IPv6 Address
    233: 'NF_F_FW_EVENT',  # High-level event code
    33002: 'NF_F_FW_EXT_EVENT',  # Extended event code
    323: 'NF_F_EVENT_TIME_MSEC',  # The time that the event occurred, which comes from IPFIX
    152: 'NF_F_FLOW_CREATE_TIME_MSEC',
    231: 'NF_F_FWD_FLOW_DELTA_BYTES',  # The delta number of bytes from source to destination
    232: 'NF_F_REV_FLOW_DELTA_BYTES',  # The delta number of bytes from destination to source
    33000: 'NF_F_INGRESS_ACL_ID',  # The input ACL that permitted or denied the flow
    33001: 'NF_F_EGRESS_ACL_ID',  # The output ACL that permitted or denied a flow
    40000: 'NF_F_USERNAME',  # AAA username

    # PaloAlto PAN-OS 8.0
    # https://www.paloaltonetworks.com/documentation/80/pan-os/pan-os/monitoring/netflow-monitoring/netflow-templates
    346: 'PANOS_privateEnterpriseNumber',
    56701: 'PANOS_APPID',
    56702: 'PANOS_USERID'
}

V9_SCOPE_TYPES = {
    1: "System",
    2: "Interface",
    3: "Line Card",
    4: "Cache",
    5: "Template"
}


class V9TemplateNotRecognized(KeyError):
    """Raised when a data flowset references a template id that is unknown."""
    pass


class V9DataRecord:
    """This is a 'flow' as we want it from our source. What it contains is variable
    in NetFlow V9, so to work with the data you have to analyze the data dict keys
    (which are integers and can be mapped with the FIELD_TYPES dict).

    Should hold a 'data' dict with keys=field_type (integer) and value (in bytes).
    """

    def __init__(self):
        self.data = {}

    def __repr__(self):
        return "<DataRecord with data: {}>".format(self.data)


class V9DataFlowSet:
    """Holds one or multiple DataRecord which are all defined after the same
    template. This template is referenced in the field 'flowset_id' of this
    DataFlowSet and must not be zero.
    """

    def __init__(self, data, template):
        pack = struct.unpack('!HH', data[:4])

        self.template_id = pack[0]  # flowset_id is reference to a template_id
        self.length = pack[1]
        self.flows = []

        offset = 4

        # As the field lengths are variable V9 has padding to next 32 Bit
        padding_size = 4 - (self.length % 4)  # 4 Byte

        # For performance reasons, we use struct.unpack to get individual values. Here
        # we prepare the format string for parsing it. The format string is based on the template fields and their
        # lengths. The string can then be re-used for every data record in the data stream
        struct_format = '!'
        struct_len = 0
        for field in template.fields:
            # The length of the value byte slice is defined in the template
            flen = field.field_length
            if flen == 4:
                struct_format += 'L'
            elif flen == 2:
                struct_format += 'H'
            elif flen == 1:
                struct_format += 'B'
            else:
                # Catch-all: read the raw bytes and convert manually below
                struct_format += '%ds' % flen
            struct_len += flen

        while offset <= (self.length - padding_size):
            # Here we actually unpack the values, the struct format string is used in every data record
            # iteration, until the final offset reaches the end of the whole data stream
            unpacked_values = struct.unpack(struct_format, data[offset:offset + struct_len])

            new_record = V9DataRecord()
            for field, value in zip(template.fields, unpacked_values):
                flen = field.field_length
                fkey = V9_FIELD_TYPES[field.field_type]

                # Special handling of IP addresses to convert integers to strings to not lose precision in dump
                # TODO: might only be needed for IPv6
                if field.field_type in V9_FIELD_TYPES_CONTAINING_IP:
                    try:
                        ip = ipaddress.ip_address(value)
                    except ValueError:
                        print("IP address could not be parsed: {}".format(repr(value)))
                        continue
                    new_record.data[fkey] = ip.compressed
                elif flen in (1, 2, 4):
                    # These values are already converted to numbers by struct.unpack:
                    new_record.data[fkey] = value
                else:
                    # Caveat: this code assumes little-endian system (like x86)
                    if sys.byteorder != "little":
                        print("v9.py uses bit shifting for little endianness. "
                              "Your processor is not little endian")

                    fdata = 0
                    for idx, byte in enumerate(reversed(bytearray(value))):
                        fdata += byte << (idx * 8)
                    new_record.data[fkey] = fdata

                offset += flen

            new_record.__dict__.update(new_record.data)
            self.flows.append(new_record)

    def __repr__(self):
        return "<DataFlowSet with template {} of length {} holding {} flows>" \
            .format(self.template_id, self.length, len(self.flows))


class V9TemplateField:
    """A field with type identifier and length.
    """

    def __init__(self, field_type, field_length):
        self.field_type = field_type  # integer
        self.field_length = field_length  # bytes

    def __repr__(self):
        return "<TemplateField type {}:{}, length {}>".format(
            self.field_type, V9_FIELD_TYPES[self.field_type], self.field_length)


class V9TemplateRecord:
    """A template record contained in a TemplateFlowSet.
    """

    def __init__(self, template_id, field_count, fields: list):
        self.template_id = template_id
        self.field_count = field_count
        self.fields = fields

    def __repr__(self):
        return "<TemplateRecord {} with {} fields: {}>".format(
            self.template_id, self.field_count,
            ' '.join([V9_FIELD_TYPES[field.field_type] for field in self.fields]))


class V9OptionsDataRecord:
    """Holds the scope and option values parsed from one options data record."""

    def __init__(self):
        self.scopes = {}
        self.data = {}

    def __repr__(self):
        return "<V9OptionsDataRecord with scopes {} and data {}>".format(
            self.scopes.keys(), self.data.keys())


class V9OptionsTemplateRecord:
    """An options template record contained in an options template flowset.
    """

    def __init__(self, template_id, scope_fields: dict, option_fields: dict):
        self.template_id = template_id
        self.scope_fields = scope_fields
        self.option_fields = option_fields

    def __repr__(self):
        return "<V9OptionsTemplateRecord with scope fields {} and option fields {}>".format(
            self.scope_fields.keys(), self.option_fields.keys())


class V9OptionsTemplateFlowSet:
    """An options template flowset.

    > Each Options Template FlowSet MAY contain multiple Options Template Records.

    Scope field types range from 1 to 5:
      1 System
      2 Interface
      3 Line Card
      4 Cache
      5 Template
    """

    def __init__(self, data: bytes):
        pack = struct.unpack('!HH', data[:4])
        self.flowset_id = pack[0]  # always 1
        self.flowset_length = pack[1]  # length of this flowset
        self.templates = {}

        offset = 4

        while offset < self.flowset_length:
            pack = struct.unpack("!HHH", data[offset:offset + 6])  # options template header
            template_id = pack[0]  # value above 255
            option_scope_length = pack[1]
            options_length = pack[2]

            offset += 6

            # Fetch all scope fields (most probably only one field)
            scopes = {}  # Holds "type: length" key-value pairs

            if option_scope_length % 4 != 0 or options_length % 4 != 0:
                raise ValueError(option_scope_length, options_length)

            for scope_counter in range(option_scope_length // 4):  # example: option_scope_length = 4 means one scope
                pack = struct.unpack("!HH", data[offset:offset + 4])
                scope_field_type = pack[0]  # values range from 1 to 5
                scope_field_length = pack[1]
                scopes[scope_field_type] = scope_field_length
                offset += 4

            # Fetch all option fields
            options = {}  # same
            for option_counter in range(options_length // 4):  # now counting the options
                pack = struct.unpack("!HH", data[offset:offset + 4])
                option_field_type = pack[0]
                option_field_length = pack[1]
                options[option_field_type] = option_field_length
                offset += 4

            optionstemplate = V9OptionsTemplateRecord(template_id, scopes, options)

            self.templates[template_id] = optionstemplate

            # handle padding and add offset if needed
            if offset % 4 == 2:
                offset += 2

    def __repr__(self):
        return "<V9OptionsTemplateFlowSet with {} templates: {}>".format(
            len(self.templates), self.templates.keys())


class V9OptionsDataFlowset:
    """An options data flowset with option data records
    """

    def __init__(self, data: bytes, template: V9OptionsTemplateRecord):
        pack = struct.unpack('!HH', data[:4])

        self.template_id = pack[0]
        self.length = pack[1]
        self.option_data_records = []

        offset = 4

        while offset < self.length:
            new_options_record = V9OptionsDataRecord()

            for scope_type, length in template.scope_fields.items():
                type_name = V9_SCOPE_TYPES.get(scope_type, scope_type)  # Either name, or unknown int
                value = int.from_bytes(data[offset:offset + length], 'big')  # TODO: is this always integer?
                new_options_record.scopes[type_name] = value
                offset += length

            for field_type, length in template.option_fields.items():
                type_name = V9_FIELD_TYPES.get(field_type, None)
                is_bytes = False

                if not type_name:  # Cisco refers to the IANA IPFIX table for types >256...
                    iana_type = IPFIXFieldTypes.by_id(field_type)  # try to get from IPFIX types
                    if iana_type:
                        type_name = iana_type.name
                        is_bytes = IPFIXDataTypes.is_bytes(iana_type)

                if not type_name:
                    raise ValueError

                value = None
                if is_bytes:
                    value = data[offset:offset + length]
                else:
                    value = int.from_bytes(data[offset:offset + length], 'big')

                new_options_record.data[type_name] = value

                offset += length

            self.option_data_records.append(new_options_record)

            if offset % 4 == 2:
                offset += 2


class V9TemplateFlowSet:
    """A template flowset, which holds an id that is used by data flowsets to
    reference back to the template. The template then has fields which hold
    identifiers of data types (eg "IP_SRC_ADDR", "PKTS"..). This way the flow
    sender can dynamically put together data flowsets.
    """

    def __init__(self, data):
        pack = struct.unpack('!HH', data[:4])
        self.flowset_id = pack[0]  # always 0
        self.length = pack[1]  # total length including this header in bytes
        self.templates = {}

        offset = 4  # Skip header

        # Iterate through all template records in this template flowset
        while offset < self.length:
            pack = struct.unpack('!HH', data[offset:offset + 4])
            template_id = pack[0]
            field_count = pack[1]

            fields = []
            for field in range(field_count):
                # Get all fields of this template
                offset += 4
                field_type, field_length = struct.unpack('!HH', data[offset:offset + 4])
                if field_type not in V9_FIELD_TYPES:
                    field_type = 0  # Set field_type to UNKNOWN_FIELD_TYPE as fallback
                field = V9TemplateField(field_type, field_length)
                fields.append(field)

            # Create a template object with all collected data
            template = V9TemplateRecord(template_id, field_count, fields)

            # Append the new template to the global templates list
            self.templates[template.template_id] = template

            # Set offset to next template_id field
            offset += 4

    def __repr__(self):
        return "<TemplateFlowSet with id {} of length {} containing templates: {}>" \
            .format(self.flowset_id, self.length, self.templates.keys())


class V9Header:
    """The header of the V9ExportPacket
    """
    length = 20

    def __init__(self, data):
        pack = struct.unpack('!HHIIII', data[:self.length])
        self.version = pack[0]
        self.count = pack[1]  # not sure if correct. softflowd: no of flows
        self.uptime = pack[2]
        self.timestamp = pack[3]
        self.sequence = pack[4]
        self.source_id = pack[5]

    def to_dict(self):
        return self.__dict__


class V9ExportPacket:
    """The flow record holds the header and all template and data flowsets.

    TODO: refactor into two loops: first get all contained flowsets and examine template
    flowsets first. Then data flowsets.
    """

    def __init__(self, data: bytes, templates: dict):
        self.header = V9Header(data)
        self._templates = templates
        self._new_templates = False
        self._flows = []
        self._options = []

        offset = self.header.length
        skipped_flowsets_offsets = []
        while offset != len(data):
            pack = struct.unpack('!HH', data[offset:offset + 4])
            flowset_id = pack[0]  # = template id
            flowset_length = pack[1]

            # Data template flowsets
            if flowset_id == 0:  # TemplateFlowSet always have id 0
                tfs = V9TemplateFlowSet(data[offset:])

                # Update the templates with the provided templates, even if they are the same
                for id_, template in tfs.templates.items():
                    if id_ not in self._templates:
                        self._new_templates = True
                    self._templates[id_] = template

                if tfs.length == 0:
                    break
                offset += tfs.length
                continue

            # Option template flowsets
            elif flowset_id == 1:  # Option templates always use ID 1
                otfs = V9OptionsTemplateFlowSet(data[offset:])
                for id_, template in otfs.templates.items():
                    if id_ not in self._templates:
                        self._new_templates = True
                    self._templates[id_] = template
                offset += otfs.flowset_length
                if otfs.flowset_length == 0:
                    break
                continue

            # Data / option flowsets
            # First, check if template is known
            if flowset_id not in self._templates:
                # Could not be parsed, continue to check for templates
                skipped_flowsets_offsets.append(offset)
                offset += flowset_length
                if flowset_length == 0:
                    break
                continue

            matched_template = self._templates[flowset_id]

            if isinstance(matched_template, V9TemplateRecord):
                dfs = V9DataFlowSet(data[offset:], matched_template)
                self._flows += dfs.flows
                if dfs.length == 0:
                    break
                offset += dfs.length
            elif isinstance(matched_template, V9OptionsTemplateRecord):
                odfs = V9OptionsDataFlowset(data[offset:], matched_template)
                self._options += odfs.option_data_records
                if odfs.length == 0:
                    break
                offset += odfs.length
            else:
                raise NotImplementedError

        # In the same export packet, re-try flowsets with previously unknown templates.
        # Might happen, if an export packet first contains data flowsets, and template flowsets after
        if skipped_flowsets_offsets and self._new_templates:
            # Process flowsets in the data slice which occured before the template sets
            # Handling of offset increases is not needed here
            for offset in skipped_flowsets_offsets:
                pack = struct.unpack('!H', data[offset:offset + 2])
                flowset_id = pack[0]
                if flowset_id not in self._templates:
                    raise V9TemplateNotRecognized

                matched_template = self._templates[flowset_id]
                if isinstance(matched_template, V9TemplateRecord):
                    dfs = V9DataFlowSet(data[offset:], matched_template)
                    self._flows += dfs.flows
                elif isinstance(matched_template, V9OptionsTemplateRecord):
                    odfs = V9OptionsDataFlowset(data[offset:], matched_template)
                    self._options += odfs.option_data_records
        elif skipped_flowsets_offsets:
            raise V9TemplateNotRecognized

    @property
    def contains_new_templates(self):
        return self._new_templates

    @property
    def flows(self):
        return self._flows

    @property
    def templates(self):
        return self._templates

    @property
    def options(self):
        return self._options

    def __repr__(self):
        s = " and new template(s)" if self.contains_new_templates else ""
        return "<ExportPacket v9 with {} records{}>".format(self.header.count, s)