From 937e6401988532a35a70a639091e8cce8195e85b Mon Sep 17 00:00:00 2001 From: Dominik Pataky Date: Tue, 31 Mar 2020 22:36:00 +0200 Subject: [PATCH] IPFIX: implement data records and template handling; add IANA types Second half of the IPFIX implementation now adds the support for data records. The templates are also extracted, allowing the collector to use them across exports. The field types were extracted from the IANA assignment list at https://www.iana.org/assignments/ipfix/ipfix-information-elements.csv Please note that the IPFIX implementation was made from scratch and differs from the NetFlow v9 implementation, as there was little copy/paste. --- netflow/ipfix.py | 253 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 234 insertions(+), 19 deletions(-) diff --git a/netflow/ipfix.py b/netflow/ipfix.py index a66f729..5773da1 100644 --- a/netflow/ipfix.py +++ b/netflow/ipfix.py @@ -10,6 +10,144 @@ Licensed under MIT License. See LICENSE. import struct +# Source: https://www.iana.org/assignments/ipfix/ipfix-information-elements.csv +IPFIX_FIELD_TYPES = { + 0: "Reserved", 1: 'octetDeltaCount', 2: "packetDeltaCount", 3: "deltaFlowCount", 4: "protocolIdentifier", + 5: "ipClassOfService", 6: "tcpControlBits", 7: "sourceTransportPort", 8: "sourceIPv4Address", + 9: "sourceIPv4PrefixLength", 10: "ingressInterface", 11: "destinationTransportPort", + 12: "destinationIPv4Address", 13: "destinationIPv4PrefixLength", 14: "egressInterface", + 15: "ipNextHopIPv4Address", 16: "bgpSourceAsNumber", 17: "bgpDestinationAsNumber", 18: "bgpNextHopIPv4Address", + 19: "postMCastPacketDeltaCount", 20: "postMCastOctetDeltaCount", 21: "flowEndSysUpTime", + 22: "flowStartSysUpTime", 23: "postOctetDeltaCount", 24: "postPacketDeltaCount", 25: "minimumIpTotalLength", + 26: "maximumIpTotalLength", 27: "sourceIPv6Address", 28: "destinationIPv6Address", + 29: "sourceIPv6PrefixLength", 30: "destinationIPv6PrefixLength", 31: "flowLabelIPv6", 32: "icmpTypeCodeIPv4", + 33: "igmpType", 34: "samplingInterval", 35: "samplingAlgorithm", 36: "flowActiveTimeout", + 37: "flowIdleTimeout", 38: "engineType", 39: "engineId", 40: "exportedOctetTotalCount", + 41: "exportedMessageTotalCount", 42: "exportedFlowRecordTotalCount", 43: "ipv4RouterSc", + 44: "sourceIPv4Prefix", 45: "destinationIPv4Prefix", 46: "mplsTopLabelType", 47: "mplsTopLabelIPv4Address", + 48: "samplerId", 49: "samplerMode", 50: "samplerRandomInterval", 51: "classId", 52: "minimumTTL", + 53: "maximumTTL", 54: "fragmentIdentification", 55: "postIpClassOfService", 56: "sourceMacAddress", + 57: "postDestinationMacAddress", 58: "vlanId", 59: "postVlanId", 60: "ipVersion", 61: "flowDirection", + 62: "ipNextHopIPv6Address", 63: "bgpNextHopIPv6Address", 64: "ipv6ExtensionHeaders", + 70: "mplsTopLabelStackSection", 71: "mplsLabelStackSection2", 72: "mplsLabelStackSection3", + 73: "mplsLabelStackSection4", 74: "mplsLabelStackSection5", 75: "mplsLabelStackSection6", + 76: "mplsLabelStackSection7", 77: "mplsLabelStackSection8", 78: "mplsLabelStackSection9", + 79: "mplsLabelStackSection10", 80: "destinationMacAddress", 81: "postSourceMacAddress", 82: "interfaceName", + 83: "interfaceDescription", 84: "samplerName", 85: "octetTotalCount", 86: "packetTotalCount", + 87: "flagsAndSamplerId", 88: "fragmentOffset", 89: "forwardingStatus", 90: "mplsVpnRouteDistinguisher", + 91: "mplsTopLabelPrefixLength", 92: "srcTrafficIndex", 93: "dstTrafficIndex", 94: "applicationDescription", + 95: "applicationId", 96: "applicationName", 97: "Assigned for NetFlow v9 compatibility", + 98: "postIpDiffServCodePoint", 99: "multicastReplicationFactor", 100: "className", + 101: "classificationEngineId", 102: "layer2packetSectionOffset", 103: "layer2packetSectionSize", + 104: "layer2packetSectionData", 128: "bgpNextAdjacentAsNumber", 129: "bgpPrevAdjacentAsNumber", + 130: "exporterIPv4Address", 131: "exporterIPv6Address", 132: "droppedOctetDeltaCount", + 133: "droppedPacketDeltaCount", 134: "droppedOctetTotalCount", 135: "droppedPacketTotalCount", + 136: "flowEndReason", 137: "commonPropertiesId", 138: "observationPointId", 139: "icmpTypeCodeIPv6", + 140: "mplsTopLabelIPv6Address", 141: "lineCardId", 142: "portId", 143: "meteringProcessId", + 144: "exportingProcessId", 145: "templateId", 146: "wlanChannelId", 147: "wlanSSID", 148: "flowId", + 149: "observationDomainId", 150: "flowStartSeconds", 151: "flowEndSeconds", 152: "flowStartMilliseconds", + 153: "flowEndMilliseconds", 154: "flowStartMicroseconds", 155: "flowEndMicroseconds", + 156: "flowStartNanoseconds", 157: "flowEndNanoseconds", 158: "flowStartDeltaMicroseconds", + 159: "flowEndDeltaMicroseconds", 160: "systemInitTimeMilliseconds", 161: "flowDurationMilliseconds", + 162: "flowDurationMicroseconds", 163: "observedFlowTotalCount", 164: "ignoredPacketTotalCount", + 165: "ignoredOctetTotalCount", 166: "notSentFlowTotalCount", 167: "notSentPacketTotalCount", + 168: "notSentOctetTotalCount", 169: "destinationIPv6Prefix", 170: "sourceIPv6Prefix", + 171: "postOctetTotalCount", 172: "postPacketTotalCount", 173: "flowKeyIndicator", + 174: "postMCastPacketTotalCount", 175: "postMCastOctetTotalCount", 176: "icmpTypeIPv4", 177: "icmpCodeIPv4", + 178: "icmpTypeIPv6", 179: "icmpCodeIPv6", 180: "udpSourcePort", 181: "udpDestinationPort", + 182: "tcpSourcePort", 183: "tcpDestinationPort", 184: "tcpSequenceNumber", 185: "tcpAcknowledgementNumber", + 186: "tcpWindowSize", 187: "tcpUrgentPointer", 188: "tcpHeaderLength", 189: "ipHeaderLength", + 190: "totalLengthIPv4", 191: "payloadLengthIPv6", 192: "ipTTL", 193: "nextHeaderIPv6", + 194: "mplsPayloadLength", 195: "ipDiffServCodePoint", 196: "ipPrecedence", 197: "fragmentFlags", + 198: "octetDeltaSumOfSquares", 199: "octetTotalSumOfSquares", 200: "mplsTopLabelTTL", + 201: "mplsLabelStackLength", 202: "mplsLabelStackDepth", 203: "mplsTopLabelExp", 204: "ipPayloadLength", + 205: "udpMessageLength", 206: "isMulticast", 207: "ipv4IHL", 208: "ipv4Options", 209: "tcpOptions", + 210: "paddingOctets", 211: "collectorIPv4Address", 212: "collectorIPv6Address", 213: "exportInterface", + 214: "exportProtocolVersion", 215: "exportTransportProtocol", 216: "collectorTransportPort", + 217: "exporterTransportPort", 218: "tcpSynTotalCount", 219: "tcpFinTotalCount", 220: "tcpRstTotalCount", + 221: "tcpPshTotalCount", 222: "tcpAckTotalCount", 223: "tcpUrgTotalCount", 224: "ipTotalLength", + 225: "postNATSourceIPv4Address", 226: "postNATDestinationIPv4Address", 227: "postNAPTSourceTransportPort", + 228: "postNAPTDestinationTransportPort", 229: "natOriginatingAddressRealm", 230: "natEvent", + 231: "initiatorOctets", 232: "responderOctets", 233: "firewallEvent", 234: "ingressVRFID", 235: "egressVRFID", + 236: "VRFname", 237: "postMplsTopLabelExp", 238: "tcpWindowScale", 239: "biflowDirection", + 240: "ethernetHeaderLength", 241: "ethernetPayloadLength", 242: "ethernetTotalLength", 243: "dot1qVlanId", + 244: "dot1qPriority", 245: "dot1qCustomerVlanId", 246: "dot1qCustomerPriority", 247: "metroEvcId", + 248: "metroEvcType", 249: "pseudoWireId", 250: "pseudoWireType", 251: "pseudoWireControlWord", + 252: "ingressPhysicalInterface", 253: "egressPhysicalInterface", 254: "postDot1qVlanId", + 255: "postDot1qCustomerVlanId", 256: "ethernetType", 257: "postIpPrecedence", + 258: "collectionTimeMilliseconds", 259: "exportSctpStreamId", 260: "maxExportSeconds", + 261: "maxFlowEndSeconds", 262: "messageMD5Checksum", 263: "messageScope", 264: "minExportSeconds", + 265: "minFlowStartSeconds", 266: "opaqueOctets", 267: "sessionScope", 268: "maxFlowEndMicroseconds", + 269: "maxFlowEndMilliseconds", 270: "maxFlowEndNanoseconds", 271: "minFlowStartMicroseconds", + 272: "minFlowStartMilliseconds", 273: "minFlowStartNanoseconds", 274: "collectorCertificate", + 275: "exporterCertificate", 276: "dataRecordsReliability", 277: "observationPointType", + 278: "newConnectionDeltaCount", 279: "connectionSumDurationSeconds", 280: "connectionTransactionId", + 281: "postNATSourceIPv6Address", 282: "postNATDestinationIPv6Address", 283: "natPoolId", 284: "natPoolName", + 285: "anonymizationFlags", 286: "anonymizationTechnique", 287: "informationElementIndex", 288: "p2pTechnology", + 289: "tunnelTechnology", 290: "encryptedTechnology", 291: "basicList", 292: "subTemplateList", + 293: "subTemplateMultiList", 294: "bgpValidityState", 295: "IPSecSPI", 296: "greKey", 297: "natType", + 298: "initiatorPackets", 299: "responderPackets", 300: "observationDomainName", 301: "selectionSequenceId", + 302: "selectorId", 303: "informationElementId", 304: "selectorAlgorithm", 305: "samplingPacketInterval", + 306: "samplingPacketSpace", 307: "samplingTimeInterval", 308: "samplingTimeSpace", 309: "samplingSize", + 310: "samplingPopulation", 311: "samplingProbability", 312: "dataLinkFrameSize", 313: "ipHeaderPacketSection", + 314: "ipPayloadPacketSection", 315: "dataLinkFrameSection", 316: "mplsLabelStackSection", + 317: "mplsPayloadPacketSection", 318: "selectorIdTotalPktsObserved", 319: "selectorIdTotalPktsSelected", + 320: "absoluteError", 321: "relativeError", 322: "observationTimeSeconds", 323: "observationTimeMilliseconds", + 324: "observationTimeMicroseconds", 325: "observationTimeNanoseconds", 326: "digestHashValue", + 327: "hashIPPayloadOffset", 328: "hashIPPayloadSize", 329: "hashOutputRangeMin", 330: "hashOutputRangeMax", + 331: "hashSelectedRangeMin", 332: "hashSelectedRangeMax", 333: "hashDigestOutput", 334: "hashInitialiserValue", + 335: "selectorName", 336: "upperCILimit", 337: "lowerCILimit", 338: "confidenceLevel", + 339: "informationElementDataType", 340: "informationElementDescription", 341: "informationElementName", + 342: "informationElementRangeBegin", 343: "informationElementRangeEnd", 344: "informationElementSemantics", + 345: "informationElementUnits", 346: "privateEnterpriseNumber", 347: "virtualStationInterfaceId", + 348: "virtualStationInterfaceName", 349: "virtualStationUUID", 350: "virtualStationName", + 351: "layer2SegmentId", 352: "layer2OctetDeltaCount", 353: "layer2OctetTotalCount", + 354: "ingressUnicastPacketTotalCount", 355: "ingressMulticastPacketTotalCount", + 356: "ingressBroadcastPacketTotalCount", 357: "egressUnicastPacketTotalCount", + 358: "egressBroadcastPacketTotalCount", 359: "monitoringIntervalStartMilliSeconds", + 360: "monitoringIntervalEndMilliSeconds", 361: "portRangeStart", 362: "portRangeEnd", 363: "portRangeStepSize", + 364: "portRangeNumPorts", 365: "staMacAddress", 366: "staIPv4Address", 367: "wtpMacAddress", + 368: "ingressInterfaceType", 369: "egressInterfaceType", 370: "rtpSequenceNumber", 371: "userName", + 372: "applicationCategoryName", 373: "applicationSubCategoryName", 374: "applicationGroupName", + 375: "originalFlowsPresent", 376: "originalFlowsInitiated", 377: "originalFlowsCompleted", + 378: "distinctCountOfSourceIPAddress", 379: "distinctCountOfDestinationIPAddress", + 380: "distinctCountOfSourceIPv4Address", 381: "distinctCountOfDestinationIPv4Address", + 382: "distinctCountOfSourceIPv6Address", 383: "distinctCountOfDestinationIPv6Address", + 384: "valueDistributionMethod", 385: "rfc3550JitterMilliseconds", 386: "rfc3550JitterMicroseconds", + 387: "rfc3550JitterNanoseconds", 388: "dot1qDEI", 389: "dot1qCustomerDEI", 390: "flowSelectorAlgorithm", + 391: "flowSelectedOctetDeltaCount", 392: "flowSelectedPacketDeltaCount", 393: "flowSelectedFlowDeltaCount", + 394: "selectorIDTotalFlowsObserved", 395: "selectorIDTotalFlowsSelected", 396: "samplingFlowInterval", + 397: "samplingFlowSpacing", 398: "flowSamplingTimeInterval", 399: "flowSamplingTimeSpacing", + 400: "hashFlowDomain", 401: "transportOctetDeltaCount", 402: "transportPacketDeltaCount", + 403: "originalExporterIPv4Address", 404: "originalExporterIPv6Address", 405: "originalObservationDomainId", + 406: "intermediateProcessId", 407: "ignoredDataRecordTotalCount", 408: "dataLinkFrameType", + 409: "sectionOffset", 410: "sectionExportedOctets", 411: "dot1qServiceInstanceTag", + 412: "dot1qServiceInstanceId", 413: "dot1qServiceInstancePriority", 414: "dot1qCustomerSourceMacAddress", + 415: "dot1qCustomerDestinationMacAddress", 416: "", 417: "postLayer2OctetDeltaCount", + 418: "postMCastLayer2OctetDeltaCount", 419: "", 420: "postLayer2OctetTotalCount", + 421: "postMCastLayer2OctetTotalCount", 422: "minimumLayer2TotalLength", 423: "maximumLayer2TotalLength", + 424: "droppedLayer2OctetDeltaCount", 425: "droppedLayer2OctetTotalCount", 426: "ignoredLayer2OctetTotalCount", + 427: "notSentLayer2OctetTotalCount", 428: "layer2OctetDeltaSumOfSquares", 429: "layer2OctetTotalSumOfSquares", + 430: "layer2FrameDeltaCount", 431: "layer2FrameTotalCount", 432: "pseudoWireDestinationIPv4Address", + 433: "ignoredLayer2FrameTotalCount", 434: "mibObjectValueInteger", 435: "mibObjectValueOctetString", + 436: "mibObjectValueOID", 437: "mibObjectValueBits", 438: "mibObjectValueIPAddress", + 439: "mibObjectValueCounter", 440: "mibObjectValueGauge", 441: "mibObjectValueTimeTicks", + 442: "mibObjectValueUnsigned", 443: "mibObjectValueTable", 444: "mibObjectValueRow", + 445: "mibObjectIdentifier", 446: "mibSubIdentifier", 447: "mibIndexIndicator", 448: "mibCaptureTimeSemantics", + 449: "mibContextEngineID", 450: "mibContextName", 451: "mibObjectName", 452: "mibObjectDescription", + 453: "mibObjectSyntax", 454: "mibModuleName", 455: "mobileIMSI", 456: "mobileMSISDN", 457: "httpStatusCode", + 458: "sourceTransportPortsLimit", 459: "httpRequestMethod", 460: "httpRequestHost", 461: "httpRequestTarget", + 462: "httpMessageVersion", 463: "natInstanceID", 464: "internalAddressRealm", 465: "externalAddressRealm", + 466: "natQuotaExceededEvent", 467: "natThresholdEvent", 468: "httpUserAgent", 469: "httpContentType", + 470: "httpReasonPhrase", 471: "maxSessionEntries", 472: "maxBIBEntries", 473: "maxEntriesPerUser", + 474: "maxSubscribers", 475: "maxFragmentsPendingReassembly", 476: "addressPoolHighThreshold", + 477: "addressPoolLowThreshold", 478: "addressPortMappingHighThreshold", 479: "addressPortMappingLowThreshold", + 480: "addressPortMappingPerUserHighThreshold", 481: "globalAddressMappingHighThreshold", 482: "vpnIdentifier", + 483: "bgpCommunity", 484: "bgpSourceCommunityList", 485: "bgpDestinationCommunityList", + 486: "bgpExtendedCommunity", 487: "bgpSourceExtendedCommunityList", 488: "bgpDestinationExtendedCommunityList", + 489: "bgpLargeCommunity", 490: "bgpSourceLargeCommunityList", 491: "bgpDestinationLargeCommunityList" +} + class IPFIXMalformedRecord(Exception): pass @@ -23,6 +161,14 @@ class IPFIXMalformedPacket(Exception): pass +class IPFIXTemplateError(Exception): + pass + + +class IPFIXTemplateNotRecognized(KeyError): + pass + + class IPFIXHeader: """The header of the IPFIX export packet """ @@ -52,11 +198,8 @@ class IPFIXTemplateRecord: if len(self.fields) != self.field_count: raise IPFIXMalformedRecord - # if offset % 4 != 0: # padding included in record, must be zero - # to_fill = (4 - offset % 4) - # if data[offset:offset + to_fill] != 0: - # raise IPFIXMalformedRecord - # offset += to_fill + # TODO: if padding is needed, implement here + self._length = offset def get_length(self): @@ -88,11 +231,8 @@ class IPFIXOptionsTemplateRecord: raise IPFIXMalformedRecord offset += offset_add - # if offset % 4 != 0: # padding included in record, must be zero - # to_fill = (4 - offset % 4) - # if data[offset:offset + to_fill] != 0: - # raise IPFIXMalformedRecord - # offset += to_fill + # TODO: if padding is needed, implement here + self._length = offset def get_length(self): @@ -105,13 +245,58 @@ class IPFIXOptionsTemplateRecord: class IPFIXDataRecord: - def __init__(self, data, template_id): - pass + """The IPFIX data record with fields and their value. + The field types are identified by the corresponding template. + In contrast to the NetFlow v9 implementation, this one does not use an extra class for the fields. + """ + + def __init__(self, data, template): + self.fields = [] + offset = 0 + unpacker = "!" + + # Iterate through all fields of this template and build the unpack format string + # TODO: this does not handle signed/unsigned or other types + # See https://www.iana.org/assignments/ipfix/ipfix.xhtml + for field_type, field_length in template: + if field_length == 1: + unpacker += "B" + elif field_length == 2: + unpacker += "H" + elif field_length == 4: + unpacker += "I" + elif field_length == 8: + unpacker += "Q" + else: + # TODO: IPv6 fields have 16 bytes, but struct does not support 16 bytes + raise IPFIXTemplateError("Template field_length {} not handled in unpacker".format(field_length)) + offset += field_length + + pack = struct.unpack(unpacker, data[0:offset]) + + # Iterate through template again, but taking the unpacked value this time + for (field_type, _), value in zip(template, pack): + self.fields.append((field_type, value)) + + self._length = offset + + def get_length(self): + return self._length + + @property + def data(self): + return { + IPFIX_FIELD_TYPES.get(key, key): value for (key, value) in self.fields + } + + def __repr__(self): + return "".format(len(self.fields)) class IPFIXSet: - """A set containing the set header and a collection of records (templates, options, data) + """A set containing the set header and a collection of records (one of templates, options, data) """ + def __init__(self, data, templates): self.header = IPFIXSetHeader(data[0:IPFIXSetHeader.size]) self.records = [] @@ -121,23 +306,38 @@ class IPFIXSet: while offset < self.header.length: # length of whole set template_record = IPFIXTemplateRecord(data[offset:]) self.records.append(template_record) + templates[template_record.template_id] = template_record.fields offset += template_record.get_length() + elif self.header.set_id == 3: # options template while offset < self.header.length: optionstemplate_record = IPFIXOptionsTemplateRecord(data[offset:]) self.records.append(optionstemplate_record) + templates[optionstemplate_record.template_id] = optionstemplate_record.scope_fields + \ + optionstemplate_record.fields offset += optionstemplate_record.get_length() - elif self.header.set_id >= 256: # data set - pass - # while offset < self.header.length: - # data_record = IPFIXDataRecord(data[offset:], None) - # self.records.append(data_record) - # offset += 0 + + elif self.header.set_id >= 256: # data set, set_id is template id + while offset < self.header.length: + template = templates.get(self.header.set_id) + if not template: + raise IPFIXTemplateNotRecognized + data_record = IPFIXDataRecord(data[offset:], template) + self.records.append(data_record) + offset += data_record.get_length() self._length = offset def get_length(self): return self._length + @property + def is_template(self): + return self.header.set_id in [2, 3] + + @property + def is_data(self): + return self.header.set_id >= 256 + def __repr__(self): return "".format(self.header.set_id, len(self.records)) @@ -175,10 +375,17 @@ class IPFIXExportPacket: def __init__(self, data, templates): self.header = IPFIXHeader(data[:IPFIXHeader.size]) self.sets = [] + self._contains_new_templates = False + self._flows = [] offset = IPFIXHeader.size while offset < self.header.length: new_set = IPFIXSet(data[offset:], templates) + if new_set.is_template: + self._contains_new_templates = True + elif new_set.is_data: + self._flows += new_set.records + self.sets.append(new_set) offset += new_set.get_length() @@ -186,6 +393,14 @@ class IPFIXExportPacket: if offset != self.header.length: raise IPFIXMalformedPacket + @property + def contains_new_templates(self): + return self._contains_new_templates + + @property + def flows(self): + return self._flows + def __repr__(self): return "".format( len(self.sets), self.header.export_uptime