Allow analyze_json.py to accept input via stdin

This will make testing much cleaner in the future (no temp files needed)

Also increase performance by memoizing the hostname lookup
This commit is contained in:
Carey Metcalfe 2019-10-05 03:06:06 -04:00
parent 11dc92269c
commit 8e6d0c54e8
2 changed files with 60 additions and 54 deletions

112
analyze_json.py Normal file → Executable file
View file

@@ -8,43 +8,35 @@ Copyright 2017-2019 Dominik Pataky <dev@bitkeks.eu>
Licensed under MIT License. See LICENSE. Licensed under MIT License. See LICENSE.
""" """
import argparse
from collections import namedtuple
import contextlib
from datetime import datetime from datetime import datetime
import functools
import ipaddress import ipaddress
import json import json
import os.path
import sys
import socket import socket
from collections import namedtuple import sys
Pair = namedtuple('Pair', 'src dest')
def getIPs(flow): Pair = namedtuple('Pair', ['src', 'dest'])
use_ipv4 = False # optimistic default case of IPv6
if 'IP_PROTOCOL_VERSION' in flow and flow['IP_PROTOCOL_VERSION'] == 4:
use_ipv4 = True
elif 'IPV4_SRC_ADDR' in flow or 'IPV4_DST_ADDR' in flow:
use_ipv4 = True
if use_ipv4:
return Pair(
ipaddress.ip_address(flow['IPV4_SRC_ADDR']),
ipaddress.ip_address(flow['IPV4_DST_ADDR']))
# else: return IPv6 pair
return Pair(
ipaddress.ip_address(flow['IPV6_SRC_ADDR']),
ipaddress.ip_address(flow['IPV6_DST_ADDR']))
@functools.lru_cache(maxsize=None)
def resolve_hostname(ip):
    """Resolve an IP address string to its fully qualified domain name.

    Results are memoized for the lifetime of the process, so repeated
    flows between the same peers cost only one DNS lookup per address.
    Note: ``maxsize=0`` would create a zero-entry cache (i.e. no
    memoization at all); ``maxsize=None`` gives an unbounded cache,
    which is fine for a short-lived analysis script.
    """
    return socket.getfqdn(ip)
class Connection: class Connection:
"""Connection model for two flows. """Connection model for two flows.
The direction of the data flow can be seen by looking at the size. The direction of the data flow can be seen by looking at the size.
'src' describes the peer which sends more data towards the other. This 'src' describes the peer which sends more data towards the other. This
does NOT have to mean, that 'src' was the initiator of the connection. does NOT have to mean that 'src' was the initiator of the connection.
""" """
def __init__(self, flow1, flow2): def __init__(self, flow1, flow2):
if not flow1 or not flow2:
raise Exception("A connection requires two flows")
if flow1['IN_BYTES'] >= flow2['IN_BYTES']: if flow1['IN_BYTES'] >= flow2['IN_BYTES']:
src = flow1 src = flow1
dest = flow2 dest = flow2
@@ -52,7 +44,7 @@ class Connection:
src = flow2 src = flow2
dest = flow1 dest = flow1
ips = getIPs(src) ips = self.get_ips(src)
self.src = ips.src self.src = ips.src
self.dest = ips.dest self.dest = ips.dest
self.src_port = src['L4_SRC_PORT'] self.src_port = src['L4_SRC_PORT']
@@ -63,12 +55,33 @@ class Connection:
self.duration = src['LAST_SWITCHED'] - src['FIRST_SWITCHED'] self.duration = src['LAST_SWITCHED'] - src['FIRST_SWITCHED']
if self.duration < 0: if self.duration < 0:
# 32 bit int has its limits. Handling overflow here # 32 bit int has its limits. Handling overflow here
# TODO: Should be handled in the collection phase
self.duration = (2**32 - src['FIRST_SWITCHED']) + src['LAST_SWITCHED'] self.duration = (2**32 - src['FIRST_SWITCHED']) + src['LAST_SWITCHED']
def __repr__(self): def __repr__(self):
return "<Connection from {} to {}, size {}>".format( return "<Connection from {} to {}, size {}>".format(
self.src, self.dest, self.human_size) self.src, self.dest, self.human_size)
@staticmethod
def get_ips(flow):
    """Return a Pair of (src, dest) IP addresses parsed from a flow dict.

    A flow is treated as IPv4 if it declares protocol version 4 or
    carries either IPv4 address field; otherwise it is assumed IPv6.

    TODO: These values should be parsed into strings in the collection
    phase. The floating point representation of an IPv6 address in JSON
    could lose precision.
    """
    uses_v4 = (
        flow.get('IP_PROTOCOL_VERSION') == 4
        or 'IPV4_SRC_ADDR' in flow
        or 'IPV4_DST_ADDR' in flow
    )
    if uses_v4:
        src_key, dest_key = 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR'
    else:
        src_key, dest_key = 'IPV6_SRC_ADDR', 'IPV6_DST_ADDR'
    return Pair(
        ipaddress.ip_address(flow[src_key]),
        ipaddress.ip_address(flow[dest_key]),
    )
@property @property
def human_size(self): def human_size(self):
# Calculate a human readable size of the traffic # Calculate a human readable size of the traffic
@@ -96,47 +109,40 @@ class Connection:
@property @property
def hostnames(self): def hostnames(self):
# Resolve the IPs of this flows to their hostname # Resolve the IPs of this flows to their hostname
src_hostname = socket.getfqdn(self.src.compressed) src_hostname = resolve_hostname(self.src.compressed)
dest_hostname = socket.getfqdn(self.dest.compressed) dest_hostname = resolve_hostname(self.dest.compressed)
return Pair(src_hostname, dest_hostname) return Pair(src_hostname, dest_hostname)
@property @property
def service(self): def service(self):
# Resolve ports to their services, if known # Resolve ports to their services, if known
service = "unknown" # Try source port, fallback to dest port, otherwise "unknown"
try: with contextlib.suppress(OSError):
# Try service of sending peer first return socket.getservbyport(self.src_port)
service = socket.getservbyport(self.src_port) with contextlib.suppress(OSError):
except OSError: return socket.getservbyport(self.dest_port)
# Resolving the sport did not work, trying dport return "unknown"
try:
service = socket.getservbyport(self.dest_port)
except OSError:
pass
return service
# Handle CLI args and load the data dump if __name__ == "__main__":
if len(sys.argv) < 2: parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
exit("Use {} <filename>.json".format(sys.argv[0])) parser.add_argument('filename', nargs='?', type=argparse.FileType('r'),
filename = sys.argv[1] default=sys.stdin,
if not os.path.exists(filename): help="The file to analyze (defaults to stdin if not provided)")
exit("File {} does not exist!".format(filename)) args = parser.parse_args()
with open(filename, 'r') as fh:
data = json.loads(fh.read())
data = json.load(args.filename)
# Go through data and dissect every flow saved inside the dump # Go through data and dissect every flow saved inside the dump
for export in sorted(data): for key in sorted(data):
timestamp = datetime.fromtimestamp(float(export)).strftime("%Y-%m-%d %H:%M.%S") timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")
flows = data[export] flows = data[key]
pending = None # Two flows normally appear together for duplex connection pending = None # Two flows normally appear together for duplex connection
for flow in flows: for flow in flows:
if not pending: if not pending:
pending = flow pending = flow
else: continue
con = Connection(pending, flow) con = Connection(pending, flow)
print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to"\ print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to"\
" {dest_host} ({dest})".format( " {dest_host} ({dest})".format(

View file

@@ -127,7 +127,7 @@ class NetFlowListener(threading.Thread):
# data back into the queue and try to decode them again # data back into the queue and try to decode them again
if export.contains_new_templates and to_retry: if export.contains_new_templates and to_retry:
__log__.debug("Received new template(s)") __log__.debug("Received new template(s)")
__log__.debug("Will re-attempt to decode %d old v9 ExportPackets", __log__.debug("Will re-attempt to decode %d old ExportPackets",
len(to_retry)) len(to_retry))
for p in to_retry: for p in to_retry:
self.input.put(p) self.input.put(p)