diff --git a/netflow/ipfix.py b/netflow/ipfix.py index a66f729..5773da1 100644 --- a/netflow/ipfix.py +++ b/netflow/ipfix.py @@ -10,6 +10,144 @@ Licensed under MIT License. See LICENSE. import struct +# Source: https://www.iana.org/assignments/ipfix/ipfix-information-elements.csv +IPFIX_FIELD_TYPES = { + 0: "Reserved", 1: 'octetDeltaCount', 2: "packetDeltaCount", 3: "deltaFlowCount", 4: "protocolIdentifier", + 5: "ipClassOfService", 6: "tcpControlBits", 7: "sourceTransportPort", 8: "sourceIPv4Address", + 9: "sourceIPv4PrefixLength", 10: "ingressInterface", 11: "destinationTransportPort", + 12: "destinationIPv4Address", 13: "destinationIPv4PrefixLength", 14: "egressInterface", + 15: "ipNextHopIPv4Address", 16: "bgpSourceAsNumber", 17: "bgpDestinationAsNumber", 18: "bgpNextHopIPv4Address", + 19: "postMCastPacketDeltaCount", 20: "postMCastOctetDeltaCount", 21: "flowEndSysUpTime", + 22: "flowStartSysUpTime", 23: "postOctetDeltaCount", 24: "postPacketDeltaCount", 25: "minimumIpTotalLength", + 26: "maximumIpTotalLength", 27: "sourceIPv6Address", 28: "destinationIPv6Address", + 29: "sourceIPv6PrefixLength", 30: "destinationIPv6PrefixLength", 31: "flowLabelIPv6", 32: "icmpTypeCodeIPv4", + 33: "igmpType", 34: "samplingInterval", 35: "samplingAlgorithm", 36: "flowActiveTimeout", + 37: "flowIdleTimeout", 38: "engineType", 39: "engineId", 40: "exportedOctetTotalCount", + 41: "exportedMessageTotalCount", 42: "exportedFlowRecordTotalCount", 43: "ipv4RouterSc", + 44: "sourceIPv4Prefix", 45: "destinationIPv4Prefix", 46: "mplsTopLabelType", 47: "mplsTopLabelIPv4Address", + 48: "samplerId", 49: "samplerMode", 50: "samplerRandomInterval", 51: "classId", 52: "minimumTTL", + 53: "maximumTTL", 54: "fragmentIdentification", 55: "postIpClassOfService", 56: "sourceMacAddress", + 57: "postDestinationMacAddress", 58: "vlanId", 59: "postVlanId", 60: "ipVersion", 61: "flowDirection", + 62: "ipNextHopIPv6Address", 63: "bgpNextHopIPv6Address", 64: "ipv6ExtensionHeaders", + 70: "mplsTopLabelStackSection", 71: "mplsLabelStackSection2", 72: "mplsLabelStackSection3", + 73: "mplsLabelStackSection4", 74: "mplsLabelStackSection5", 75: "mplsLabelStackSection6", + 76: "mplsLabelStackSection7", 77: "mplsLabelStackSection8", 78: "mplsLabelStackSection9", + 79: "mplsLabelStackSection10", 80: "destinationMacAddress", 81: "postSourceMacAddress", 82: "interfaceName", + 83: "interfaceDescription", 84: "samplerName", 85: "octetTotalCount", 86: "packetTotalCount", + 87: "flagsAndSamplerId", 88: "fragmentOffset", 89: "forwardingStatus", 90: "mplsVpnRouteDistinguisher", + 91: "mplsTopLabelPrefixLength", 92: "srcTrafficIndex", 93: "dstTrafficIndex", 94: "applicationDescription", + 95: "applicationId", 96: "applicationName", 97: "Assigned for NetFlow v9 compatibility", + 98: "postIpDiffServCodePoint", 99: "multicastReplicationFactor", 100: "className", + 101: "classificationEngineId", 102: "layer2packetSectionOffset", 103: "layer2packetSectionSize", + 104: "layer2packetSectionData", 128: "bgpNextAdjacentAsNumber", 129: "bgpPrevAdjacentAsNumber", + 130: "exporterIPv4Address", 131: "exporterIPv6Address", 132: "droppedOctetDeltaCount", + 133: "droppedPacketDeltaCount", 134: "droppedOctetTotalCount", 135: "droppedPacketTotalCount", + 136: "flowEndReason", 137: "commonPropertiesId", 138: "observationPointId", 139: "icmpTypeCodeIPv6", + 140: "mplsTopLabelIPv6Address", 141: "lineCardId", 142: "portId", 143: "meteringProcessId", + 144: "exportingProcessId", 145: "templateId", 146: "wlanChannelId", 147: "wlanSSID", 148: "flowId", + 149: "observationDomainId", 150: "flowStartSeconds", 151: "flowEndSeconds", 152: "flowStartMilliseconds", + 153: "flowEndMilliseconds", 154: "flowStartMicroseconds", 155: "flowEndMicroseconds", + 156: "flowStartNanoseconds", 157: "flowEndNanoseconds", 158: "flowStartDeltaMicroseconds", + 159: "flowEndDeltaMicroseconds", 160: "systemInitTimeMilliseconds", 161: "flowDurationMilliseconds", + 162: "flowDurationMicroseconds", 163: "observedFlowTotalCount", 164: "ignoredPacketTotalCount", + 165: "ignoredOctetTotalCount", 166: "notSentFlowTotalCount", 167: "notSentPacketTotalCount", + 168: "notSentOctetTotalCount", 169: "destinationIPv6Prefix", 170: "sourceIPv6Prefix", + 171: "postOctetTotalCount", 172: "postPacketTotalCount", 173: "flowKeyIndicator", + 174: "postMCastPacketTotalCount", 175: "postMCastOctetTotalCount", 176: "icmpTypeIPv4", 177: "icmpCodeIPv4", + 178: "icmpTypeIPv6", 179: "icmpCodeIPv6", 180: "udpSourcePort", 181: "udpDestinationPort", + 182: "tcpSourcePort", 183: "tcpDestinationPort", 184: "tcpSequenceNumber", 185: "tcpAcknowledgementNumber", + 186: "tcpWindowSize", 187: "tcpUrgentPointer", 188: "tcpHeaderLength", 189: "ipHeaderLength", + 190: "totalLengthIPv4", 191: "payloadLengthIPv6", 192: "ipTTL", 193: "nextHeaderIPv6", + 194: "mplsPayloadLength", 195: "ipDiffServCodePoint", 196: "ipPrecedence", 197: "fragmentFlags", + 198: "octetDeltaSumOfSquares", 199: "octetTotalSumOfSquares", 200: "mplsTopLabelTTL", + 201: "mplsLabelStackLength", 202: "mplsLabelStackDepth", 203: "mplsTopLabelExp", 204: "ipPayloadLength", + 205: "udpMessageLength", 206: "isMulticast", 207: "ipv4IHL", 208: "ipv4Options", 209: "tcpOptions", + 210: "paddingOctets", 211: "collectorIPv4Address", 212: "collectorIPv6Address", 213: "exportInterface", + 214: "exportProtocolVersion", 215: "exportTransportProtocol", 216: "collectorTransportPort", + 217: "exporterTransportPort", 218: "tcpSynTotalCount", 219: "tcpFinTotalCount", 220: "tcpRstTotalCount", + 221: "tcpPshTotalCount", 222: "tcpAckTotalCount", 223: "tcpUrgTotalCount", 224: "ipTotalLength", + 225: "postNATSourceIPv4Address", 226: "postNATDestinationIPv4Address", 227: "postNAPTSourceTransportPort", + 228: "postNAPTDestinationTransportPort", 229: "natOriginatingAddressRealm", 230: "natEvent", + 231: "initiatorOctets", 232: "responderOctets", 233: "firewallEvent", 234: "ingressVRFID", 235: "egressVRFID", + 236: "VRFname", 237: "postMplsTopLabelExp", 238: "tcpWindowScale", 239: "biflowDirection", + 240: "ethernetHeaderLength", 241: "ethernetPayloadLength", 242: "ethernetTotalLength", 243: "dot1qVlanId", + 244: "dot1qPriority", 245: "dot1qCustomerVlanId", 246: "dot1qCustomerPriority", 247: "metroEvcId", + 248: "metroEvcType", 249: "pseudoWireId", 250: "pseudoWireType", 251: "pseudoWireControlWord", + 252: "ingressPhysicalInterface", 253: "egressPhysicalInterface", 254: "postDot1qVlanId", + 255: "postDot1qCustomerVlanId", 256: "ethernetType", 257: "postIpPrecedence", + 258: "collectionTimeMilliseconds", 259: "exportSctpStreamId", 260: "maxExportSeconds", + 261: "maxFlowEndSeconds", 262: "messageMD5Checksum", 263: "messageScope", 264: "minExportSeconds", + 265: "minFlowStartSeconds", 266: "opaqueOctets", 267: "sessionScope", 268: "maxFlowEndMicroseconds", + 269: "maxFlowEndMilliseconds", 270: "maxFlowEndNanoseconds", 271: "minFlowStartMicroseconds", + 272: "minFlowStartMilliseconds", 273: "minFlowStartNanoseconds", 274: "collectorCertificate", + 275: "exporterCertificate", 276: "dataRecordsReliability", 277: "observationPointType", + 278: "newConnectionDeltaCount", 279: "connectionSumDurationSeconds", 280: "connectionTransactionId", + 281: "postNATSourceIPv6Address", 282: "postNATDestinationIPv6Address", 283: "natPoolId", 284: "natPoolName", + 285: "anonymizationFlags", 286: "anonymizationTechnique", 287: "informationElementIndex", 288: "p2pTechnology", + 289: "tunnelTechnology", 290: "encryptedTechnology", 291: "basicList", 292: "subTemplateList", + 293: "subTemplateMultiList", 294: "bgpValidityState", 295: "IPSecSPI", 296: "greKey", 297: "natType", + 298: "initiatorPackets", 299: "responderPackets", 300: "observationDomainName", 301: "selectionSequenceId", + 302: "selectorId", 303: "informationElementId", 304: "selectorAlgorithm", 305: "samplingPacketInterval", + 306: "samplingPacketSpace", 307: "samplingTimeInterval", 308: "samplingTimeSpace", 309: "samplingSize", + 310: "samplingPopulation", 311: "samplingProbability", 312: "dataLinkFrameSize", 313: "ipHeaderPacketSection", + 314: "ipPayloadPacketSection", 315: "dataLinkFrameSection", 316: "mplsLabelStackSection", + 317: "mplsPayloadPacketSection", 318: "selectorIdTotalPktsObserved", 319: "selectorIdTotalPktsSelected", + 320: "absoluteError", 321: "relativeError", 322: "observationTimeSeconds", 323: "observationTimeMilliseconds", + 324: "observationTimeMicroseconds", 325: "observationTimeNanoseconds", 326: "digestHashValue", + 327: "hashIPPayloadOffset", 328: "hashIPPayloadSize", 329: "hashOutputRangeMin", 330: "hashOutputRangeMax", + 331: "hashSelectedRangeMin", 332: "hashSelectedRangeMax", 333: "hashDigestOutput", 334: "hashInitialiserValue", + 335: "selectorName", 336: "upperCILimit", 337: "lowerCILimit", 338: "confidenceLevel", + 339: "informationElementDataType", 340: "informationElementDescription", 341: "informationElementName", + 342: "informationElementRangeBegin", 343: "informationElementRangeEnd", 344: "informationElementSemantics", + 345: "informationElementUnits", 346: "privateEnterpriseNumber", 347: "virtualStationInterfaceId", + 348: "virtualStationInterfaceName", 349: "virtualStationUUID", 350: "virtualStationName", + 351: "layer2SegmentId", 352: "layer2OctetDeltaCount", 353: "layer2OctetTotalCount", + 354: "ingressUnicastPacketTotalCount", 355: "ingressMulticastPacketTotalCount", + 356: "ingressBroadcastPacketTotalCount", 357: "egressUnicastPacketTotalCount", + 358: "egressBroadcastPacketTotalCount", 359: "monitoringIntervalStartMilliSeconds", + 360: "monitoringIntervalEndMilliSeconds", 361: "portRangeStart", 362: "portRangeEnd", 363: "portRangeStepSize", + 364: "portRangeNumPorts", 365: "staMacAddress", 366: "staIPv4Address", 367: "wtpMacAddress", + 368: "ingressInterfaceType", 369: "egressInterfaceType", 370: "rtpSequenceNumber", 371: "userName", + 372: "applicationCategoryName", 373: "applicationSubCategoryName", 374: "applicationGroupName", + 375: "originalFlowsPresent", 376: "originalFlowsInitiated", 377: "originalFlowsCompleted", + 378: "distinctCountOfSourceIPAddress", 379: "distinctCountOfDestinationIPAddress", + 380: "distinctCountOfSourceIPv4Address", 381: "distinctCountOfDestinationIPv4Address", + 382: "distinctCountOfSourceIPv6Address", 383: "distinctCountOfDestinationIPv6Address", + 384: "valueDistributionMethod", 385: "rfc3550JitterMilliseconds", 386: "rfc3550JitterMicroseconds", + 387: "rfc3550JitterNanoseconds", 388: "dot1qDEI", 389: "dot1qCustomerDEI", 390: "flowSelectorAlgorithm", + 391: "flowSelectedOctetDeltaCount", 392: "flowSelectedPacketDeltaCount", 393: "flowSelectedFlowDeltaCount", + 394: "selectorIDTotalFlowsObserved", 395: "selectorIDTotalFlowsSelected", 396: "samplingFlowInterval", + 397: "samplingFlowSpacing", 398: "flowSamplingTimeInterval", 399: "flowSamplingTimeSpacing", + 400: "hashFlowDomain", 401: "transportOctetDeltaCount", 402: "transportPacketDeltaCount", + 403: "originalExporterIPv4Address", 404: "originalExporterIPv6Address", 405: "originalObservationDomainId", + 406: "intermediateProcessId", 407: "ignoredDataRecordTotalCount", 408: "dataLinkFrameType", + 409: "sectionOffset", 410: "sectionExportedOctets", 411: "dot1qServiceInstanceTag", + 412: "dot1qServiceInstanceId", 413: "dot1qServiceInstancePriority", 414: "dot1qCustomerSourceMacAddress", + 415: "dot1qCustomerDestinationMacAddress", 416: "", 417: "postLayer2OctetDeltaCount", + 418: "postMCastLayer2OctetDeltaCount", 419: "", 420: "postLayer2OctetTotalCount", + 421: "postMCastLayer2OctetTotalCount", 422: "minimumLayer2TotalLength", 423: "maximumLayer2TotalLength", + 424: "droppedLayer2OctetDeltaCount", 425: "droppedLayer2OctetTotalCount", 426: "ignoredLayer2OctetTotalCount", + 427: "notSentLayer2OctetTotalCount", 428: "layer2OctetDeltaSumOfSquares", 429: "layer2OctetTotalSumOfSquares", + 430: "layer2FrameDeltaCount", 431: "layer2FrameTotalCount", 432: "pseudoWireDestinationIPv4Address", + 433: "ignoredLayer2FrameTotalCount", 434: "mibObjectValueInteger", 435: "mibObjectValueOctetString", + 436: "mibObjectValueOID", 437: "mibObjectValueBits", 438: "mibObjectValueIPAddress", + 439: "mibObjectValueCounter", 440: "mibObjectValueGauge", 441: "mibObjectValueTimeTicks", + 442: "mibObjectValueUnsigned", 443: "mibObjectValueTable", 444: "mibObjectValueRow", + 445: "mibObjectIdentifier", 446: "mibSubIdentifier", 447: "mibIndexIndicator", 448: "mibCaptureTimeSemantics", + 449: "mibContextEngineID", 450: "mibContextName", 451: "mibObjectName", 452: "mibObjectDescription", + 453: "mibObjectSyntax", 454: "mibModuleName", 455: "mobileIMSI", 456: "mobileMSISDN", 457: "httpStatusCode", + 458: "sourceTransportPortsLimit", 459: "httpRequestMethod", 460: "httpRequestHost", 461: "httpRequestTarget", + 462: "httpMessageVersion", 463: "natInstanceID", 464: "internalAddressRealm", 465: "externalAddressRealm", + 466: "natQuotaExceededEvent", 467: "natThresholdEvent", 468: "httpUserAgent", 469: "httpContentType", + 470: "httpReasonPhrase", 471: "maxSessionEntries", 472: "maxBIBEntries", 473: "maxEntriesPerUser", + 474: "maxSubscribers", 475: "maxFragmentsPendingReassembly", 476: "addressPoolHighThreshold", + 477: "addressPoolLowThreshold", 478: "addressPortMappingHighThreshold", 479: "addressPortMappingLowThreshold", + 480: "addressPortMappingPerUserHighThreshold", 481: "globalAddressMappingHighThreshold", 482: "vpnIdentifier", + 483: "bgpCommunity", 484: "bgpSourceCommunityList", 485: "bgpDestinationCommunityList", + 486: "bgpExtendedCommunity", 487: "bgpSourceExtendedCommunityList", 488: "bgpDestinationExtendedCommunityList", + 489: "bgpLargeCommunity", 490: "bgpSourceLargeCommunityList", 491: "bgpDestinationLargeCommunityList" +} + class IPFIXMalformedRecord(Exception): pass @@ -23,6 +161,14 @@ class IPFIXMalformedPacket(Exception): pass +class IPFIXTemplateError(Exception): + pass + + +class IPFIXTemplateNotRecognized(KeyError): + pass + + class IPFIXHeader: """The header of the IPFIX export packet """ @@ -52,11 +198,8 @@ class IPFIXTemplateRecord: if len(self.fields) != self.field_count: raise IPFIXMalformedRecord - # if offset % 4 != 0: # padding included in record, must be zero - # to_fill = (4 - offset % 4) - # if data[offset:offset + to_fill] != 0: - # raise IPFIXMalformedRecord - # offset += to_fill + # TODO: if padding is needed, implement here + self._length = offset def get_length(self): @@ -88,11 +231,8 @@ class IPFIXOptionsTemplateRecord: raise IPFIXMalformedRecord offset += offset_add - # if offset % 4 != 0: # padding included in record, must be zero - # to_fill = (4 - offset % 4) - # if data[offset:offset + to_fill] != 0: - # raise IPFIXMalformedRecord - # offset += to_fill + # TODO: if padding is needed, implement here + self._length = offset def get_length(self): @@ -105,13 +245,58 @@ class IPFIXOptionsTemplateRecord: class IPFIXDataRecord: - def __init__(self, data, template_id): - pass + """The IPFIX data record with fields and their value. + The field types are identified by the corresponding template. + In contrast to the NetFlow v9 implementation, this one does not use an extra class for the fields. + """ + + def __init__(self, data, template): + self.fields = [] + offset = 0 + unpacker = "!" + + # Iterate through all fields of this template and build the unpack format string + # TODO: this does not handle signed/unsigned or other types + # See https://www.iana.org/assignments/ipfix/ipfix.xhtml + for field_type, field_length in template: + if field_length == 1: + unpacker += "B" + elif field_length == 2: + unpacker += "H" + elif field_length == 4: + unpacker += "I" + elif field_length == 8: + unpacker += "Q" + else: + # TODO: IPv6 fields have 16 bytes, but struct does not support 16 bytes + raise IPFIXTemplateError("Template field_length {} not handled in unpacker".format(field_length)) + offset += field_length + + pack = struct.unpack(unpacker, data[0:offset]) + + # Iterate through template again, but taking the unpacked value this time + for (field_type, _), value in zip(template, pack): + self.fields.append((field_type, value)) + + self._length = offset + + def get_length(self): + return self._length + + @property + def data(self): + return { + IPFIX_FIELD_TYPES.get(key, key): value for (key, value) in self.fields + } + + def __repr__(self): + return "".format(len(self.fields)) class IPFIXSet: - """A set containing the set header and a collection of records (templates, options, data) + """A set containing the set header and a collection of records (one of templates, options, data) """ + def __init__(self, data, templates): self.header = IPFIXSetHeader(data[0:IPFIXSetHeader.size]) self.records = [] @@ -121,23 +306,38 @@ class IPFIXSet: while offset < self.header.length: # length of whole set template_record = IPFIXTemplateRecord(data[offset:]) self.records.append(template_record) + templates[template_record.template_id] = template_record.fields offset += template_record.get_length() + elif self.header.set_id == 3: # options template while offset < self.header.length: optionstemplate_record = IPFIXOptionsTemplateRecord(data[offset:]) self.records.append(optionstemplate_record) + templates[optionstemplate_record.template_id] = optionstemplate_record.scope_fields + \ + optionstemplate_record.fields offset += optionstemplate_record.get_length() - elif self.header.set_id >= 256: # data set - pass - # while offset < self.header.length: - # data_record = IPFIXDataRecord(data[offset:], None) - # self.records.append(data_record) - # offset += 0 + + elif self.header.set_id >= 256: # data set, set_id is template id + while offset < self.header.length: + template = templates.get(self.header.set_id) + if not template: + raise IPFIXTemplateNotRecognized + data_record = IPFIXDataRecord(data[offset:], template) + self.records.append(data_record) + offset += data_record.get_length() self._length = offset def get_length(self): return self._length + @property + def is_template(self): + return self.header.set_id in [2, 3] + + @property + def is_data(self): + return self.header.set_id >= 256 + def __repr__(self): return "".format(self.header.set_id, len(self.records)) @@ -175,10 +375,17 @@ class IPFIXExportPacket: def __init__(self, data, templates): self.header = IPFIXHeader(data[:IPFIXHeader.size]) self.sets = [] + self._contains_new_templates = False + self._flows = [] offset = IPFIXHeader.size while offset < self.header.length: new_set = IPFIXSet(data[offset:], templates) + if new_set.is_template: + self._contains_new_templates = True + elif new_set.is_data: + self._flows += new_set.records + self.sets.append(new_set) offset += new_set.get_length() @@ -186,6 +393,14 @@ class IPFIXExportPacket: if offset != self.header.length: raise IPFIXMalformedPacket + @property + def contains_new_templates(self): + return self._contains_new_templates + + @property + def flows(self): + return self._flows + def __repr__(self): return "".format( len(self.sets), self.header.export_uptime