netflow/netflow/ipfix.py
Dominik Pataky 4b8cbf92bc IPFIX: implement field types of 16 bytes in parser
Python struct does not natively support 16 byte fields. But since IPFIX
uses fields of length 16 bytes for at least IPv6 addresses, they must be
processed in the IPFIX parser. This commit adds support for 16 byte
fields by handling them as special struct.unpack cases.
2020-04-01 11:34:34 +02:00

781 lines
32 KiB
Python

#!/usr/bin/env python3
"""
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Reference: https://tools.ietf.org/html/rfc7011
Copyright 2016-2020 Dominik Pataky <dev@bitkeks.eu>
Licensed under MIT License. See LICENSE.
"""
import struct
# Source: https://www.iana.org/assignments/ipfix/ipfix-information-elements.csv
#
# Lookup table from the numeric IPFIX information element ID to its name.
# The trailing comment on each entry notes the abstract data type of the element;
# that type information is NOT used by the parser in this file, which decodes
# values purely by the field length announced in the template.
# Not every ID has an entry: e.g. 65-69 and 105-127 are absent from this table,
# and IDs 416 and 419 map to an empty name. Consumers that look up IDs should use
# .get() with a fallback, as IPFIXDataRecord.data does.
IPFIX_FIELD_TYPES = {
0: "Reserved", #
1: "octetDeltaCount", # unsigned64
2: "packetDeltaCount", # unsigned64
3: "deltaFlowCount", # unsigned64
4: "protocolIdentifier", # unsigned8
5: "ipClassOfService", # unsigned8
6: "tcpControlBits", # unsigned16
7: "sourceTransportPort", # unsigned16
8: "sourceIPv4Address", # ipv4Address
9: "sourceIPv4PrefixLength", # unsigned8
10: "ingressInterface", # unsigned32
11: "destinationTransportPort", # unsigned16
12: "destinationIPv4Address", # ipv4Address
13: "destinationIPv4PrefixLength", # unsigned8
14: "egressInterface", # unsigned32
15: "ipNextHopIPv4Address", # ipv4Address
16: "bgpSourceAsNumber", # unsigned32
17: "bgpDestinationAsNumber", # unsigned32
18: "bgpNextHopIPv4Address", # ipv4Address
19: "postMCastPacketDeltaCount", # unsigned64
20: "postMCastOctetDeltaCount", # unsigned64
21: "flowEndSysUpTime", # unsigned32
22: "flowStartSysUpTime", # unsigned32
23: "postOctetDeltaCount", # unsigned64
24: "postPacketDeltaCount", # unsigned64
25: "minimumIpTotalLength", # unsigned64
26: "maximumIpTotalLength", # unsigned64
27: "sourceIPv6Address", # ipv6Address
28: "destinationIPv6Address", # ipv6Address
29: "sourceIPv6PrefixLength", # unsigned8
30: "destinationIPv6PrefixLength", # unsigned8
31: "flowLabelIPv6", # unsigned32
32: "icmpTypeCodeIPv4", # unsigned16
33: "igmpType", # unsigned8
34: "samplingInterval", # unsigned32
35: "samplingAlgorithm", # unsigned8
36: "flowActiveTimeout", # unsigned16
37: "flowIdleTimeout", # unsigned16
38: "engineType", # unsigned8
39: "engineId", # unsigned8
40: "exportedOctetTotalCount", # unsigned64
41: "exportedMessageTotalCount", # unsigned64
42: "exportedFlowRecordTotalCount", # unsigned64
43: "ipv4RouterSc", # ipv4Address
44: "sourceIPv4Prefix", # ipv4Address
45: "destinationIPv4Prefix", # ipv4Address
46: "mplsTopLabelType", # unsigned8
47: "mplsTopLabelIPv4Address", # ipv4Address
48: "samplerId", # unsigned8
49: "samplerMode", # unsigned8
50: "samplerRandomInterval", # unsigned32
51: "classId", # unsigned8
52: "minimumTTL", # unsigned8
53: "maximumTTL", # unsigned8
54: "fragmentIdentification", # unsigned32
55: "postIpClassOfService", # unsigned8
56: "sourceMacAddress", # macAddress
57: "postDestinationMacAddress", # macAddress
58: "vlanId", # unsigned16
59: "postVlanId", # unsigned16
60: "ipVersion", # unsigned8
61: "flowDirection", # unsigned8
62: "ipNextHopIPv6Address", # ipv6Address
63: "bgpNextHopIPv6Address", # ipv6Address
64: "ipv6ExtensionHeaders", # unsigned32
70: "mplsTopLabelStackSection", # octetArray
71: "mplsLabelStackSection2", # octetArray
72: "mplsLabelStackSection3", # octetArray
73: "mplsLabelStackSection4", # octetArray
74: "mplsLabelStackSection5", # octetArray
75: "mplsLabelStackSection6", # octetArray
76: "mplsLabelStackSection7", # octetArray
77: "mplsLabelStackSection8", # octetArray
78: "mplsLabelStackSection9", # octetArray
79: "mplsLabelStackSection10", # octetArray
80: "destinationMacAddress", # macAddress
81: "postSourceMacAddress", # macAddress
82: "interfaceName", # string
83: "interfaceDescription", # string
84: "samplerName", # string
85: "octetTotalCount", # unsigned64
86: "packetTotalCount", # unsigned64
87: "flagsAndSamplerId", # unsigned32
88: "fragmentOffset", # unsigned16
89: "forwardingStatus", # unsigned8
90: "mplsVpnRouteDistinguisher", # octetArray
91: "mplsTopLabelPrefixLength", # unsigned8
92: "srcTrafficIndex", # unsigned32
93: "dstTrafficIndex", # unsigned32
94: "applicationDescription", # string
95: "applicationId", # octetArray
96: "applicationName", # string
97: "Assigned for NetFlow v9 compatibility", #
98: "postIpDiffServCodePoint", # unsigned8
99: "multicastReplicationFactor", # unsigned32
100: "className", # string
101: "classificationEngineId", # unsigned8
102: "layer2packetSectionOffset", # unsigned16
103: "layer2packetSectionSize", # unsigned16
104: "layer2packetSectionData", # octetArray
128: "bgpNextAdjacentAsNumber", # unsigned32
129: "bgpPrevAdjacentAsNumber", # unsigned32
130: "exporterIPv4Address", # ipv4Address
131: "exporterIPv6Address", # ipv6Address
132: "droppedOctetDeltaCount", # unsigned64
133: "droppedPacketDeltaCount", # unsigned64
134: "droppedOctetTotalCount", # unsigned64
135: "droppedPacketTotalCount", # unsigned64
136: "flowEndReason", # unsigned8
137: "commonPropertiesId", # unsigned64
138: "observationPointId", # unsigned64
139: "icmpTypeCodeIPv6", # unsigned16
140: "mplsTopLabelIPv6Address", # ipv6Address
141: "lineCardId", # unsigned32
142: "portId", # unsigned32
143: "meteringProcessId", # unsigned32
144: "exportingProcessId", # unsigned32
145: "templateId", # unsigned16
146: "wlanChannelId", # unsigned8
147: "wlanSSID", # string
148: "flowId", # unsigned64
149: "observationDomainId", # unsigned32
150: "flowStartSeconds", # dateTimeSeconds
151: "flowEndSeconds", # dateTimeSeconds
152: "flowStartMilliseconds", # dateTimeMilliseconds
153: "flowEndMilliseconds", # dateTimeMilliseconds
154: "flowStartMicroseconds", # dateTimeMicroseconds
155: "flowEndMicroseconds", # dateTimeMicroseconds
156: "flowStartNanoseconds", # dateTimeNanoseconds
157: "flowEndNanoseconds", # dateTimeNanoseconds
158: "flowStartDeltaMicroseconds", # unsigned32
159: "flowEndDeltaMicroseconds", # unsigned32
160: "systemInitTimeMilliseconds", # dateTimeMilliseconds
161: "flowDurationMilliseconds", # unsigned32
162: "flowDurationMicroseconds", # unsigned32
163: "observedFlowTotalCount", # unsigned64
164: "ignoredPacketTotalCount", # unsigned64
165: "ignoredOctetTotalCount", # unsigned64
166: "notSentFlowTotalCount", # unsigned64
167: "notSentPacketTotalCount", # unsigned64
168: "notSentOctetTotalCount", # unsigned64
169: "destinationIPv6Prefix", # ipv6Address
170: "sourceIPv6Prefix", # ipv6Address
171: "postOctetTotalCount", # unsigned64
172: "postPacketTotalCount", # unsigned64
173: "flowKeyIndicator", # unsigned64
174: "postMCastPacketTotalCount", # unsigned64
175: "postMCastOctetTotalCount", # unsigned64
176: "icmpTypeIPv4", # unsigned8
177: "icmpCodeIPv4", # unsigned8
178: "icmpTypeIPv6", # unsigned8
179: "icmpCodeIPv6", # unsigned8
180: "udpSourcePort", # unsigned16
181: "udpDestinationPort", # unsigned16
182: "tcpSourcePort", # unsigned16
183: "tcpDestinationPort", # unsigned16
184: "tcpSequenceNumber", # unsigned32
185: "tcpAcknowledgementNumber", # unsigned32
186: "tcpWindowSize", # unsigned16
187: "tcpUrgentPointer", # unsigned16
188: "tcpHeaderLength", # unsigned8
189: "ipHeaderLength", # unsigned8
190: "totalLengthIPv4", # unsigned16
191: "payloadLengthIPv6", # unsigned16
192: "ipTTL", # unsigned8
193: "nextHeaderIPv6", # unsigned8
194: "mplsPayloadLength", # unsigned32
195: "ipDiffServCodePoint", # unsigned8
196: "ipPrecedence", # unsigned8
197: "fragmentFlags", # unsigned8
198: "octetDeltaSumOfSquares", # unsigned64
199: "octetTotalSumOfSquares", # unsigned64
200: "mplsTopLabelTTL", # unsigned8
201: "mplsLabelStackLength", # unsigned32
202: "mplsLabelStackDepth", # unsigned32
203: "mplsTopLabelExp", # unsigned8
204: "ipPayloadLength", # unsigned32
205: "udpMessageLength", # unsigned16
206: "isMulticast", # unsigned8
207: "ipv4IHL", # unsigned8
208: "ipv4Options", # unsigned32
209: "tcpOptions", # unsigned64
210: "paddingOctets", # octetArray
211: "collectorIPv4Address", # ipv4Address
212: "collectorIPv6Address", # ipv6Address
213: "exportInterface", # unsigned32
214: "exportProtocolVersion", # unsigned8
215: "exportTransportProtocol", # unsigned8
216: "collectorTransportPort", # unsigned16
217: "exporterTransportPort", # unsigned16
218: "tcpSynTotalCount", # unsigned64
219: "tcpFinTotalCount", # unsigned64
220: "tcpRstTotalCount", # unsigned64
221: "tcpPshTotalCount", # unsigned64
222: "tcpAckTotalCount", # unsigned64
223: "tcpUrgTotalCount", # unsigned64
224: "ipTotalLength", # unsigned64
225: "postNATSourceIPv4Address", # ipv4Address
226: "postNATDestinationIPv4Address", # ipv4Address
227: "postNAPTSourceTransportPort", # unsigned16
228: "postNAPTDestinationTransportPort", # unsigned16
229: "natOriginatingAddressRealm", # unsigned8
230: "natEvent", # unsigned8
231: "initiatorOctets", # unsigned64
232: "responderOctets", # unsigned64
233: "firewallEvent", # unsigned8
234: "ingressVRFID", # unsigned32
235: "egressVRFID", # unsigned32
236: "VRFname", # string
237: "postMplsTopLabelExp", # unsigned8
238: "tcpWindowScale", # unsigned16
239: "biflowDirection", # unsigned8
240: "ethernetHeaderLength", # unsigned8
241: "ethernetPayloadLength", # unsigned16
242: "ethernetTotalLength", # unsigned16
243: "dot1qVlanId", # unsigned16
244: "dot1qPriority", # unsigned8
245: "dot1qCustomerVlanId", # unsigned16
246: "dot1qCustomerPriority", # unsigned8
247: "metroEvcId", # string
248: "metroEvcType", # unsigned8
249: "pseudoWireId", # unsigned32
250: "pseudoWireType", # unsigned16
251: "pseudoWireControlWord", # unsigned32
252: "ingressPhysicalInterface", # unsigned32
253: "egressPhysicalInterface", # unsigned32
254: "postDot1qVlanId", # unsigned16
255: "postDot1qCustomerVlanId", # unsigned16
256: "ethernetType", # unsigned16
257: "postIpPrecedence", # unsigned8
258: "collectionTimeMilliseconds", # dateTimeMilliseconds
259: "exportSctpStreamId", # unsigned16
260: "maxExportSeconds", # dateTimeSeconds
261: "maxFlowEndSeconds", # dateTimeSeconds
262: "messageMD5Checksum", # octetArray
263: "messageScope", # unsigned8
264: "minExportSeconds", # dateTimeSeconds
265: "minFlowStartSeconds", # dateTimeSeconds
266: "opaqueOctets", # octetArray
267: "sessionScope", # unsigned8
268: "maxFlowEndMicroseconds", # dateTimeMicroseconds
269: "maxFlowEndMilliseconds", # dateTimeMilliseconds
270: "maxFlowEndNanoseconds", # dateTimeNanoseconds
271: "minFlowStartMicroseconds", # dateTimeMicroseconds
272: "minFlowStartMilliseconds", # dateTimeMilliseconds
273: "minFlowStartNanoseconds", # dateTimeNanoseconds
274: "collectorCertificate", # octetArray
275: "exporterCertificate", # octetArray
276: "dataRecordsReliability", # boolean
277: "observationPointType", # unsigned8
278: "newConnectionDeltaCount", # unsigned32
279: "connectionSumDurationSeconds", # unsigned64
280: "connectionTransactionId", # unsigned64
281: "postNATSourceIPv6Address", # ipv6Address
282: "postNATDestinationIPv6Address", # ipv6Address
283: "natPoolId", # unsigned32
284: "natPoolName", # string
285: "anonymizationFlags", # unsigned16
286: "anonymizationTechnique", # unsigned16
287: "informationElementIndex", # unsigned16
288: "p2pTechnology", # string
289: "tunnelTechnology", # string
290: "encryptedTechnology", # string
291: "basicList", # basicList
292: "subTemplateList", # subTemplateList
293: "subTemplateMultiList", # subTemplateMultiList
294: "bgpValidityState", # unsigned8
295: "IPSecSPI", # unsigned32
296: "greKey", # unsigned32
297: "natType", # unsigned8
298: "initiatorPackets", # unsigned64
299: "responderPackets", # unsigned64
300: "observationDomainName", # string
301: "selectionSequenceId", # unsigned64
302: "selectorId", # unsigned64
303: "informationElementId", # unsigned16
304: "selectorAlgorithm", # unsigned16
305: "samplingPacketInterval", # unsigned32
306: "samplingPacketSpace", # unsigned32
307: "samplingTimeInterval", # unsigned32
308: "samplingTimeSpace", # unsigned32
309: "samplingSize", # unsigned32
310: "samplingPopulation", # unsigned32
311: "samplingProbability", # float64
312: "dataLinkFrameSize", # unsigned16
313: "ipHeaderPacketSection", # octetArray
314: "ipPayloadPacketSection", # octetArray
315: "dataLinkFrameSection", # octetArray
316: "mplsLabelStackSection", # octetArray
317: "mplsPayloadPacketSection", # octetArray
318: "selectorIdTotalPktsObserved", # unsigned64
319: "selectorIdTotalPktsSelected", # unsigned64
320: "absoluteError", # float64
321: "relativeError", # float64
322: "observationTimeSeconds", # dateTimeSeconds
323: "observationTimeMilliseconds", # dateTimeMilliseconds
324: "observationTimeMicroseconds", # dateTimeMicroseconds
325: "observationTimeNanoseconds", # dateTimeNanoseconds
326: "digestHashValue", # unsigned64
327: "hashIPPayloadOffset", # unsigned64
328: "hashIPPayloadSize", # unsigned64
329: "hashOutputRangeMin", # unsigned64
330: "hashOutputRangeMax", # unsigned64
331: "hashSelectedRangeMin", # unsigned64
332: "hashSelectedRangeMax", # unsigned64
333: "hashDigestOutput", # boolean
334: "hashInitialiserValue", # unsigned64
335: "selectorName", # string
336: "upperCILimit", # float64
337: "lowerCILimit", # float64
338: "confidenceLevel", # float64
339: "informationElementDataType", # unsigned8
340: "informationElementDescription", # string
341: "informationElementName", # string
342: "informationElementRangeBegin", # unsigned64
343: "informationElementRangeEnd", # unsigned64
344: "informationElementSemantics", # unsigned8
345: "informationElementUnits", # unsigned16
346: "privateEnterpriseNumber", # unsigned32
347: "virtualStationInterfaceId", # octetArray
348: "virtualStationInterfaceName", # string
349: "virtualStationUUID", # octetArray
350: "virtualStationName", # string
351: "layer2SegmentId", # unsigned64
352: "layer2OctetDeltaCount", # unsigned64
353: "layer2OctetTotalCount", # unsigned64
354: "ingressUnicastPacketTotalCount", # unsigned64
355: "ingressMulticastPacketTotalCount", # unsigned64
356: "ingressBroadcastPacketTotalCount", # unsigned64
357: "egressUnicastPacketTotalCount", # unsigned64
358: "egressBroadcastPacketTotalCount", # unsigned64
359: "monitoringIntervalStartMilliSeconds", # dateTimeMilliseconds
360: "monitoringIntervalEndMilliSeconds", # dateTimeMilliseconds
361: "portRangeStart", # unsigned16
362: "portRangeEnd", # unsigned16
363: "portRangeStepSize", # unsigned16
364: "portRangeNumPorts", # unsigned16
365: "staMacAddress", # macAddress
366: "staIPv4Address", # ipv4Address
367: "wtpMacAddress", # macAddress
368: "ingressInterfaceType", # unsigned32
369: "egressInterfaceType", # unsigned32
370: "rtpSequenceNumber", # unsigned16
371: "userName", # string
372: "applicationCategoryName", # string
373: "applicationSubCategoryName", # string
374: "applicationGroupName", # string
375: "originalFlowsPresent", # unsigned64
376: "originalFlowsInitiated", # unsigned64
377: "originalFlowsCompleted", # unsigned64
378: "distinctCountOfSourceIPAddress", # unsigned64
379: "distinctCountOfDestinationIPAddress", # unsigned64
380: "distinctCountOfSourceIPv4Address", # unsigned32
381: "distinctCountOfDestinationIPv4Address", # unsigned32
382: "distinctCountOfSourceIPv6Address", # unsigned64
383: "distinctCountOfDestinationIPv6Address", # unsigned64
384: "valueDistributionMethod", # unsigned8
385: "rfc3550JitterMilliseconds", # unsigned32
386: "rfc3550JitterMicroseconds", # unsigned32
387: "rfc3550JitterNanoseconds", # unsigned32
388: "dot1qDEI", # boolean
389: "dot1qCustomerDEI", # boolean
390: "flowSelectorAlgorithm", # unsigned16
391: "flowSelectedOctetDeltaCount", # unsigned64
392: "flowSelectedPacketDeltaCount", # unsigned64
393: "flowSelectedFlowDeltaCount", # unsigned64
394: "selectorIDTotalFlowsObserved", # unsigned64
395: "selectorIDTotalFlowsSelected", # unsigned64
396: "samplingFlowInterval", # unsigned64
397: "samplingFlowSpacing", # unsigned64
398: "flowSamplingTimeInterval", # unsigned64
399: "flowSamplingTimeSpacing", # unsigned64
400: "hashFlowDomain", # unsigned16
401: "transportOctetDeltaCount", # unsigned64
402: "transportPacketDeltaCount", # unsigned64
403: "originalExporterIPv4Address", # ipv4Address
404: "originalExporterIPv6Address", # ipv6Address
405: "originalObservationDomainId", # unsigned32
406: "intermediateProcessId", # unsigned32
407: "ignoredDataRecordTotalCount", # unsigned64
408: "dataLinkFrameType", # unsigned16
409: "sectionOffset", # unsigned16
410: "sectionExportedOctets", # unsigned16
411: "dot1qServiceInstanceTag", # octetArray
412: "dot1qServiceInstanceId", # unsigned32
413: "dot1qServiceInstancePriority", # unsigned8
414: "dot1qCustomerSourceMacAddress", # macAddress
415: "dot1qCustomerDestinationMacAddress", # macAddress
416: "", # no name recorded here; presumably deprecated in the IANA registry - TODO confirm
417: "postLayer2OctetDeltaCount", # unsigned64
418: "postMCastLayer2OctetDeltaCount", # unsigned64
419: "", # no name recorded here; presumably deprecated in the IANA registry - TODO confirm
420: "postLayer2OctetTotalCount", # unsigned64
421: "postMCastLayer2OctetTotalCount", # unsigned64
422: "minimumLayer2TotalLength", # unsigned64
423: "maximumLayer2TotalLength", # unsigned64
424: "droppedLayer2OctetDeltaCount", # unsigned64
425: "droppedLayer2OctetTotalCount", # unsigned64
426: "ignoredLayer2OctetTotalCount", # unsigned64
427: "notSentLayer2OctetTotalCount", # unsigned64
428: "layer2OctetDeltaSumOfSquares", # unsigned64
429: "layer2OctetTotalSumOfSquares", # unsigned64
430: "layer2FrameDeltaCount", # unsigned64
431: "layer2FrameTotalCount", # unsigned64
432: "pseudoWireDestinationIPv4Address", # ipv4Address
433: "ignoredLayer2FrameTotalCount", # unsigned64
434: "mibObjectValueInteger", # signed32
435: "mibObjectValueOctetString", # octetArray
436: "mibObjectValueOID", # octetArray
437: "mibObjectValueBits", # octetArray
438: "mibObjectValueIPAddress", # ipv4Address
439: "mibObjectValueCounter", # unsigned64
440: "mibObjectValueGauge", # unsigned32
441: "mibObjectValueTimeTicks", # unsigned32
442: "mibObjectValueUnsigned", # unsigned32
443: "mibObjectValueTable", # subTemplateList
444: "mibObjectValueRow", # subTemplateList
445: "mibObjectIdentifier", # octetArray
446: "mibSubIdentifier", # unsigned32
447: "mibIndexIndicator", # unsigned64
448: "mibCaptureTimeSemantics", # unsigned8
449: "mibContextEngineID", # octetArray
450: "mibContextName", # string
451: "mibObjectName", # string
452: "mibObjectDescription", # string
453: "mibObjectSyntax", # string
454: "mibModuleName", # string
455: "mobileIMSI", # string
456: "mobileMSISDN", # string
457: "httpStatusCode", # unsigned16
458: "sourceTransportPortsLimit", # unsigned16
459: "httpRequestMethod", # string
460: "httpRequestHost", # string
461: "httpRequestTarget", # string
462: "httpMessageVersion", # string
463: "natInstanceID", # unsigned32
464: "internalAddressRealm", # octetArray
465: "externalAddressRealm", # octetArray
466: "natQuotaExceededEvent", # unsigned32
467: "natThresholdEvent", # unsigned32
468: "httpUserAgent", # string
469: "httpContentType", # string
470: "httpReasonPhrase", # string
471: "maxSessionEntries", # unsigned32
472: "maxBIBEntries", # unsigned32
473: "maxEntriesPerUser", # unsigned32
474: "maxSubscribers", # unsigned32
475: "maxFragmentsPendingReassembly", # unsigned32
476: "addressPoolHighThreshold", # unsigned32
477: "addressPoolLowThreshold", # unsigned32
478: "addressPortMappingHighThreshold", # unsigned32
479: "addressPortMappingLowThreshold", # unsigned32
480: "addressPortMappingPerUserHighThreshold", # unsigned32
481: "globalAddressMappingHighThreshold", # unsigned32
482: "vpnIdentifier", # octetArray
483: "bgpCommunity", # unsigned32
484: "bgpSourceCommunityList", # basicList
485: "bgpDestinationCommunityList", # basicList
486: "bgpExtendedCommunity", # octetArray
487: "bgpSourceExtendedCommunityList", # basicList
488: "bgpDestinationExtendedCommunityList", # basicList
489: "bgpLargeCommunity", # octetArray
490: "bgpSourceLargeCommunityList", # basicList
491: "bgpDestinationLargeCommunityList", # basicList
}
class IPFIXMalformedRecord(Exception):
    """A (options) template record yielded a different number of fields than it declared."""
class IPFIXRFCError(Exception):
    """The received data violates a rule of the IPFIX specification, e.g. a forbidden set ID."""
class IPFIXMalformedPacket(Exception):
    """The parsed sets of an export packet do not add up to the length stated in its header."""
class IPFIXTemplateError(Exception):
    """A template describes a field which the data record parser is not able to decode."""
class IPFIXTemplateNotRecognized(KeyError):
    """A data set references a template ID for which no template is known (yet)."""
class IPFIXHeader:
    """The header of the IPFIX export packet.

    The header is decoded in network byte order as two 16 bit values
    (version, length) followed by three 32 bit values.
    """
    size = 16  # number of bytes consumed by the header

    def __init__(self, data):
        """Decode the header fields from ``data``.

        ``data`` must hold exactly ``size`` bytes, otherwise struct.error is raised.
        """
        (
            self.version,
            self.length,
            self.export_uptime,
            self.sequence_number,
            # NOTE: the attribute name is misspelled ("obervation"). It is part of the
            # public interface and of to_dict(), therefore it is kept unchanged.
            self.obervation_domain_id,
        ) = struct.unpack("!HHIII", data)

    def to_dict(self):
        """Return the attribute dictionary of this header (the live dict, not a copy)."""
        return vars(self)
class IPFIXTemplateRecord:
    """A template record: template ID, field count and the list of field specifiers.

    Attributes:
        template_id: ID under which data sets reference this template.
        field_count: number of field specifiers announced by the record.
        fields: the field specifiers as returned by parse_fields().
    """

    def __init__(self, data):
        """Parse one template record located at the beginning of ``data``.

        ``data`` may extend beyond the record; get_length() tells how many bytes were used.

        Raises:
            IPFIXMalformedRecord: if the number of parsed fields differs from field_count.
        """
        header_size = 4
        template_id, field_count = struct.unpack("!HH", data[:header_size])
        self.template_id = template_id  # range 256 to 65535
        self.field_count = field_count  # Number of fields in this Template Record

        parsed, consumed = parse_fields(data[header_size:], field_count)
        self.fields = parsed
        if len(parsed) != field_count:
            raise IPFIXMalformedRecord

        # TODO: if padding is needed, implement here
        self._length = header_size + consumed

    def get_length(self):
        """Return the number of bytes this record occupies (header plus field specifiers)."""
        return self._length

    def __repr__(self):
        amount = len(self.fields)
        return "<IPFIXTemplateRecord with {} fields>".format(amount)
class IPFIXOptionsTemplateRecord:
    """An options template record: header, scope field specifiers, then option field specifiers.

    Attributes:
        template_id: ID under which data sets reference this template.
        field_count: total number of field specifiers, scope fields included.
        scope_field_count: how many of the leading field specifiers are scope fields.
        scope_fields: the scope field specifiers as returned by parse_fields().
        fields: the remaining field specifiers as returned by parse_fields().
    """

    def __init__(self, data):
        """Parse one options template record located at the beginning of ``data``.

        Raises:
            IPFIXMalformedRecord: if the parsed scope fields or the sum of all parsed
                fields do not match the counts announced in the record header.
        """
        header_size = 6
        # template_id: range 256 to 65535
        # field_count: includes count of scope fields
        # scope_field_count: A scope field count of N specifies that the first N Field
        # Specifiers in the Template Record are Scope Fields. The Scope Field Count
        # MUST NOT be zero (this is not enforced here).
        (
            self.template_id,
            self.field_count,
            self.scope_field_count,
        ) = struct.unpack("!HHH", data[:header_size])

        scope_fields, scope_bytes = parse_fields(data[header_size:], self.scope_field_count)
        self.scope_fields = scope_fields
        if len(scope_fields) != self.scope_field_count:
            raise IPFIXMalformedRecord

        options_start = header_size + scope_bytes
        options_count = self.field_count - self.scope_field_count
        option_fields, option_bytes = parse_fields(data[options_start:], options_count)
        self.fields = option_fields
        if len(option_fields) + len(scope_fields) != self.field_count:
            raise IPFIXMalformedRecord

        # TODO: if padding is needed, implement here
        self._length = options_start + option_bytes

    def get_length(self):
        """Return the number of bytes this record occupies (header plus all field specifiers)."""
        return self._length

    def __repr__(self):
        scope_amount = len(self.scope_fields)
        option_amount = len(self.fields)
        return "<IPFIXOptionsTemplateRecord with {} scope fields and {} fields>".format(
            scope_amount, option_amount
        )
class IPFIXDataRecord:
    """The IPFIX data record with fields and their value.

    The field types are identified by the corresponding template.
    In contrast to the NetFlow v9 implementation, this one does not use an extra class for the fields.

    Attributes:
        fields: list of ``(field_type, value)`` tuples in template order. All values are
            decoded as unsigned big endian integers, regardless of the real data type.
    """

    # Maps the supported field lengths (in bytes) to their struct format.
    # struct has no 16 byte integer, so these fields are fetched as raw bytes ("16s")
    # and converted to an integer afterwards.
    _STRUCT_CODES = {1: "B", 2: "H", 4: "I", 8: "Q", 16: "16s"}

    def __init__(self, data, template):
        """Decode one data record from the beginning of ``data``.

        Args:
            data: bytes starting at the record; may be longer than the record itself.
            template: sequence of field specifiers. Each specifier is a tuple whose first
                item is the field ID and whose second item is the field length. Enterprise
                specific specifiers carry the enterprise number as a third item.

        Raises:
            IPFIXTemplateError: if a field has a length other than 1, 2, 4, 8 or 16 bytes.
            struct.error: if ``data`` is shorter than the record described by the template.
        """
        self.fields = []
        # Positions (indexes into self.fields) of enterprise specific fields. Their IDs
        # live in a different namespace than the IANA IDs and must not be resolved to names.
        self._enterprise_positions = set()

        # Build the unpack format string from the field lengths.
        # TODO: this does not handle signed/unsigned or other types
        # See https://www.iana.org/assignments/ipfix/ipfix.xhtml
        codes = []
        offset = 0
        for field in template:
            # Access by index instead of tuple unpacking: enterprise specific
            # specifiers have three items, regular ones only two.
            field_length = field[1]
            code = self._STRUCT_CODES.get(field_length)
            if code is None:
                raise IPFIXTemplateError("Template field_length {} not handled in unpacker".format(field_length))
            codes.append(code)
            offset += field_length

        values = struct.unpack("!" + "".join(codes), data[0:offset])

        for position, (field, value) in enumerate(zip(template, values)):
            if field[1] == 16:
                # The usage of int.from_bytes is based on the assumption that
                # 16 bytes fields are only used for IPv6
                value = int.from_bytes(value, "big")  # network = big
            if len(field) > 2:
                self._enterprise_positions.add(position)
            self.fields.append((field[0], value))

        self._length = offset

    def get_length(self):
        """Return the number of bytes of ``data`` which belong to this record."""
        return self._length

    @property
    def data(self):
        """Return the fields as dict, keyed by field name where the name is known.

        Fields without an entry in IPFIX_FIELD_TYPES and enterprise specific fields
        are keyed by their numeric field ID instead.
        """
        return {
            (key if position in self._enterprise_positions else IPFIX_FIELD_TYPES.get(key, key)): value
            for position, (key, value) in enumerate(self.fields)
        }

    def __repr__(self):
        return "<IPFIXDataRecord with {} entries>".format(len(self.fields))
class IPFIXSet:
    """A set containing the set header and a collection of records (one of templates, options, data)

    Attributes:
        header: the IPFIXSetHeader of this set.
        records: the parsed records, all of the same kind (template, options template or data).
    """

    def __init__(self, data, templates):
        """Parse one set located at the beginning of ``data``.

        Args:
            data: bytes starting at the set header; may extend beyond the end of the set.
            templates: dict mapping template IDs to lists of field specifiers. Templates
                found in this set are stored in it, data sets are decoded with it.

        Raises:
            IPFIXTemplateNotRecognized: if a data set references an unknown (or empty) template.
        """
        self.header = IPFIXSetHeader(data[0:IPFIXSetHeader.size])
        self.records = []

        offset = IPFIXSetHeader.size
        set_end = self.header.length  # length of whole set, header included

        # A set may end with padding octets which are shorter than any allowable record
        # of that set. The shortest (options) template record consists of template ID and
        # field count, so less than four remaining bytes can only be padding.
        min_template_record = 4

        if self.header.set_id == 2:  # template set
            while set_end - offset >= min_template_record:
                template_record = IPFIXTemplateRecord(data[offset:])
                self.records.append(template_record)
                templates[template_record.template_id] = template_record.fields
                offset += template_record.get_length()

        elif self.header.set_id == 3:  # options template
            while set_end - offset >= min_template_record:
                optionstemplate_record = IPFIXOptionsTemplateRecord(data[offset:])
                self.records.append(optionstemplate_record)
                templates[optionstemplate_record.template_id] = optionstemplate_record.scope_fields + \
                    optionstemplate_record.fields
                offset += optionstemplate_record.get_length()

        elif self.header.set_id >= 256:  # data set, set_id is template id
            if offset < set_end:
                template = templates.get(self.header.set_id)
                if not template:
                    raise IPFIXTemplateNotRecognized

                while offset < set_end:
                    data_record = IPFIXDataRecord(data[offset:], template)
                    self.records.append(data_record)
                    offset += data_record.get_length()
                    # All records of this set share the template and therefore the length.
                    # A remainder which cannot hold another record is padding.
                    if set_end - offset < data_record.get_length():
                        break

        # Skipped padding still belongs to this set, so the next set starts at the
        # length stated in the header. If parsing consumed more than that (inconsistent
        # set), report the consumed bytes as before.
        self._length = max(offset, set_end)

    def get_length(self):
        """Return the number of bytes of ``data`` which belong to this set, padding included."""
        return self._length

    @property
    def is_template(self):
        """True if this set holds template or options template records."""
        return self.header.set_id in [2, 3]

    @property
    def is_data(self):
        """True if this set holds data records."""
        return self.header.set_id >= 256

    def __repr__(self):
        return "<IPFIXSet with set_id {} and {} records>".format(self.header.set_id, len(self.records))
class IPFIXSetHeader:
    """Header of a set (collection of records)

    Consists of the set ID and the set length, each a 16 bit value in network byte order.
    """
    size = 4  # number of bytes consumed by the header

    def __init__(self, data):
        """Decode set ID and length from ``data``, which must hold exactly ``size`` bytes.

        Raises:
            IPFIXRFCError: if the set ID is one of the unused or reserved values.
        """
        set_id, length = struct.unpack("!HH", data)

        # A value of 2 is reserved for Template Sets.
        # A value of 3 is reserved for Options Template Sets. Values from 4
        # to 255 are reserved for future use. Values 256 and above are used
        # for Data Sets. The Set ID values of 0 and 1 are not used, for
        # historical reasons [RFC3954].
        self.set_id = set_id
        if set_id < 2 or 4 <= set_id <= 255:
            raise IPFIXRFCError("IPFIX set has forbidden ID {}".format(self.set_id))

        self.length = length  # Total length of the Set, in octets, including the Set Header

    def to_dict(self):
        """Return the attribute dictionary of this header (the live dict, not a copy)."""
        return vars(self)

    def __repr__(self):
        return "<IPFIXSetHeader with set_id {} and length {}>".format(self.set_id, self.length)
class IPFIXExportPacket:
    """IPFIX export packet with header, templates, options and data flowsets

    Attributes:
        header: the IPFIXHeader of the packet.
        sets: all parsed IPFIXSet objects in packet order.
    """

    def __init__(self, data, templates):
        """Parse a whole export packet.

        Args:
            data: the bytes of the packet, beginning with the packet header.
            templates: dict of known templates, handed to every IPFIXSet. It is updated
                with the templates found in this packet.

        Raises:
            IPFIXMalformedPacket: if the parsed sets do not end exactly at the length
                stated in the packet header.
        """
        self.header = IPFIXHeader(data[:IPFIXHeader.size])
        self.sets = []
        self._contains_new_templates = False
        self._flows = []

        position = IPFIXHeader.size
        packet_length = self.header.length
        while position < packet_length:
            current_set = IPFIXSet(data[position:], templates)
            if current_set.is_template:
                self._contains_new_templates = True
            elif current_set.is_data:
                self._flows.extend(current_set.records)
            self.sets.append(current_set)
            position += current_set.get_length()

        # Here all data should be processed and offset set to the length
        if position != packet_length:
            raise IPFIXMalformedPacket

    @property
    def contains_new_templates(self):
        """True if at least one template or options template set was part of the packet."""
        return self._contains_new_templates

    @property
    def flows(self):
        """The records of all data sets of the packet, in packet order."""
        return self._flows

    def __repr__(self):
        set_amount = len(self.sets)
        return "<IPFIXExportPacket with {} sets, exported at {}>".format(
            set_amount, self.header.export_uptime
        )
def parse_fields(data, count: int) -> (list, int):
    """Parse ``count`` field specifiers from the beginning of ``data``.

    A field specifier consists of a 16 bit field ID and a 16 bit field length. If the
    most significant bit of the ID (the enterprise bit) is set, a 32 bit enterprise
    number follows and the specifier is eight instead of four bytes long.

    Args:
        data: bytes starting at the first field specifier.
        count: number of field specifiers to read.

    Returns:
        A tuple ``(fields, length)``. ``fields`` is a list with one tuple per specifier:
        ``(id, field_length)`` for regular fields and ``(id, field_length, enterprise_number)``
        for enterprise specific fields, with the enterprise bit removed from the ID.
        ``length`` is the number of bytes consumed.

    Raises:
        IndexError: if ``data`` ends before the next specifier starts.
        struct.error: if ``data`` ends in the middle of a specifier.
    """
    # The enterprise bit is the most significant bit of the 16 bit field ID,
    # which is bit 7 of the first byte of the specifier.
    enterprise_bit = 1 << 15

    offset = 0
    fields = []
    for _ in range(count):
        if data[offset] & (1 << 7) != 0:  # enterprise flag set
            field_id, field_length, enterprise_number = struct.unpack_from("!HHI", data, offset)
            fields.append((
                field_id & ~enterprise_bit,  # ID, clear enterprise flag bit
                field_length,
                enterprise_number
            ))
            offset += 8
        else:
            fields.append(struct.unpack_from("!HH", data, offset))
            offset += 4
    return fields, offset