diff --git a/analyze_json.py b/analyze_json.py index 5e532e2..ee979b9 100755 --- a/analyze_json.py +++ b/analyze_json.py @@ -22,7 +22,7 @@ import sys Pair = namedtuple('Pair', ['src', 'dest']) -@functools.lru_cache(maxsize=0) +@functools.lru_cache(maxsize=128) def resolve_hostname(ip): return socket.getfqdn(ip) @@ -48,6 +48,7 @@ class Connection: raise Exception("A connection requires two flows") # Assume the size that sent the most data is the source + # TODO: this might not always be right, maybe use earlier timestamp? size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS']) size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS']) if size1 >= size2: @@ -61,7 +62,7 @@ class Connection: self.src = ips.src self.dest = ips.dest self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT']) - self.dest_port = fallback(src, ['L4_DST_PORT', 'DST_PORT']) + self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT']) self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS']) # Duration is given in milliseconds @@ -82,8 +83,9 @@ class Connection: # could lose precision. # IPv4 - if (flow.get('IP_PROTOCOL_VERSION') == 4 or 'IPV4_SRC_ADDR' in flow or - 'IPV4_DST_ADDR' in flow): + if flow.get('IP_PROTOCOL_VERSION') == 4 \ + or 'IPV4_SRC_ADDR' in flow \ + or 'IPV4_DST_ADDR' in flow: return Pair( ipaddress.ip_address(flow['IPV4_SRC_ADDR']), ipaddress.ip_address(flow['IPV4_DST_ADDR']) @@ -157,10 +159,7 @@ if __name__ == "__main__": pending = flow continue con = Connection(pending, flow) - print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to"\ - " {dest_host} ({dest})".format( - timestamp=timestamp, service=con.service.upper(), - src_host=con.hostnames.src, src=con.src, - dest_host=con.hostnames.dest, dest=con.dest, - size=con.human_size, duration=con.human_duration)) + print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to {dest_host} ({dest})" \ + .format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src, + dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration)) pending = None diff --git a/main.py b/main.py index 5b7c15c..9680abd 100755 --- a/main.py +++ b/main.py @@ -34,7 +34,7 @@ class QueuingRequestHandler(socketserver.BaseRequestHandler): data = self.request[0] self.server.queue.put(RawPacket(time.time(), data)) __log__.debug( - "Recieved %d bytes of data from %s", len(data), self.client_address[0] + "Received %d bytes of data from %s", len(data), self.client_address[0] ) @@ -191,6 +191,7 @@ if __name__ == "__main__": pass if data: + # TODO: this should be done periodically to not lose any data (only saved in memory) __log__.info("Outputting collected data to '%s'", args.output_file) with open(args.output_file, 'w') as f: json.dump(data, f) diff --git a/netflow/__init__.py b/netflow/__init__.py index e6bf6ed..9078d70 100644 --- a/netflow/__init__.py +++ b/netflow/__init__.py @@ -8,6 +8,7 @@ from netflow.v9 import V9ExportPacket, TemplateNotRecognized __all__ = ["TemplateNotRecognized", "UnknownNetFlowVersion", "parse_packet"] + class UnknownNetFlowVersion(Exception): def __init__(self, data, version): self.data = data diff --git a/setup.py b/setup.py index 2f77a8c..a57aab7 100644 --- a/setup.py +++ b/setup.py @@ -1,11 +1,10 @@ #!/usr/bin/env python3 from setuptools import setup -import os setup(name='netflow', - version='0.7.0', - description='NetFlow v1, v5, and v9 parser and collector implemented in Python 3. Developed to be used with softflowd v0.9.9', + version='0.8.0', + description='NetFlow v1, v5, and v9 collector, parser and analyzer implemented in Python 3.', author='Dominik Pataky', author_email='dev@bitkeks.eu', packages=["netflow"], diff --git a/tests.py b/tests.py index 0dcc9a6..23372a4 100755 --- a/tests.py +++ b/tests.py @@ -22,6 +22,8 @@ import unittest from main import NetFlowListener +# TODO: add tests for v1 and v5 +# TODO: tests with 500 packets fail? # The flowset with 2 templates and 8 flows TEMPLATE_PACKET = '0009000a000000035c9f55980000000100000000000000400400000e00080004000c000400150004001600040001000400020004000a0004000e000400070002000b00020004000100060001003c000100050001000000400800000e001b0010001c001000150004001600040001000400020004000a0004000e000400070002000b00020004000100060001003c000100050001040001447f0000017f000001fb3c1aaafb3c18fd000190100000004b00000000000000000050942c061b04007f0000017f000001fb3c1aaafb3c18fd00000f94000000360000000000000000942c0050061f04007f0000017f000001fb3c1cfcfb3c1a9b0000d3fc0000002a000000000000000000509434061b04007f0000017f000001fb3c1cfcfb3c1a9b00000a490000001e000000000000000094340050061f04007f0000017f000001fb3bb82cfb3ba48b000002960000000300000000000000000050942a061904007f0000017f000001fb3bb82cfb3ba48b00000068000000020000000000000000942a0050061104007f0000017f000001fb3c1900fb3c18fe0000004c0000000100000000000000000035b3c9110004007f0000017f000001fb3c1900fb3c18fe0000003c000000010000000000000000b3c9003511000400' @@ -36,7 +38,7 @@ PACKETS = [ INVALID_PACKET = "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" CONNECTION = ('127.0.0.1', 1337) -NUM_PACKETS = 500 +NUM_PACKETS = 50 def emit_packets(packets, delay=0):