Extend analyzer with --no-dns and --match-host; minor fixes

This commit extends the analyzer script with two new flags:
* --no-dns disables DNS resolution of IP addresses to hostnames, improving speed
* --match-host <IP address> filters out all flows whose source or destination does not match the given IP

Additional small things were changed; the script is still a work in
progress. In particular, the "pairing" of two flows will be removed in
future versions.
This commit is contained in:
Dominik Pataky 2020-03-19 18:07:32 +01:00
parent 4639601798
commit f8c5717002

View file

@ -22,17 +22,28 @@ import socket
import sys
# Maps the numeric value of a flow's "PROTOCOL" field to a readable protocol
# name. Numbers missing from this table are shown as 'UNKNOWN' where the
# table is looked up with .get().
IP_PROTOCOLS = {
1: "ICMP",
6: "TCP",
17: "UDP",
58: "ICMPv6"
}
Pair = namedtuple('Pair', ['src', 'dest'])
logger = logging.getLogger(__name__)
def printv(message, *args_, **kwargs):
    """Print a formatted message, but only when verbose output is enabled.

    ``message`` is a ``str.format`` template; the positional and keyword
    arguments are passed straight through to ``message.format``.

    The varargs parameter is deliberately named ``args_`` so that it does not
    shadow the module-level ``args`` namespace (the parsed command line) whose
    ``verbose`` flag is consulted here.
    """
    if args.verbose:
        print(message.format(*args_, **kwargs))
@functools.lru_cache(maxsize=None)
def resolve_hostname(ip: str) -> str:
    """Return the hostname for ``ip``, or ``ip`` itself when DNS is disabled.

    Results are memoized per IP string by ``functools.lru_cache`` with no size
    limit, so each distinct address triggers at most one lookup.

    Reads the module-level ``args`` namespace (parsed command line): when
    ``args.no_dns`` is set, no resolution is attempted.
    """
    if args.no_dns:
        # If no DNS resolution is requested, simply return the IP string
        return ip
    # else resolve the IP address to a hostname and return the hostname
    return socket.getfqdn(ip)
@ -43,6 +54,30 @@ def fallback(d, keys):
raise KeyError(", ".join(keys))
def human_size(size_bytes):
    """Return a human readable size string for a byte count.

    Values below 1024 are printed as whole bytes (``"512B"``); larger values
    are scaled by powers of 1024 and printed with two decimals and a ``K``,
    ``M`` or ``G`` suffix. ``G`` is the largest unit, so anything beyond it is
    still expressed in ``G``.
    """
    if size_bytes < 1024:
        return "%dB" % size_bytes

    scaled = size_bytes / 1024.
    for suffix in ("K", "M"):
        # Compare the value as it will be *printed*: something like 1023.999
        # would otherwise be shown as "1024.00K" instead of "1.00M".
        if round(scaled, 2) < 1024:
            return "%.2f%s" % (scaled, suffix)
        scaled /= 1024.
    return "%.2fG" % scaled
def human_duration(seconds):
    """Return a human readable string for a duration given in seconds.

    * below one minute:  ``"<s> sec"``
    * below one hour:    ``"MM:SS min"``
    * one hour or more:  ``"H:MM.SS hours"``
    """
    if seconds < 60:
        # seconds
        return "%d sec" % seconds

    minutes, secs = divmod(seconds, 60)
    if minutes < 60:
        # minutes
        return "%02d:%02d min" % (minutes, secs)

    # hours; exactly 60 minutes already counts as an hour, so that 3600s is
    # shown as "1:00.00 hours" rather than "60:00 min"
    hours, minutes = divmod(minutes, 60)
    return "%d:%02d.%02d hours" % (hours, minutes, secs)
class Connection:
"""Connection model for two flows.
The direction of the data flow can be seen by looking at the size.
@ -113,27 +148,12 @@ class Connection:
@property
def human_size(self):
    """Human readable size of the traffic in ``self.size``.

    Formatting is delegated to the module-level ``human_size`` function so
    that the same rendering is used wherever sizes are printed.
    """
    return human_size(self.size)
@property
def human_duration(self):
    """Human readable duration derived from ``self.duration``.

    Formatting is delegated to the module-level ``human_duration`` function,
    which takes the duration in seconds.
    """
    duration = self.duration // 1000  # uptime in milliseconds, floor it
    return human_duration(duration)
@property
def hostnames(self):
@ -146,8 +166,6 @@ class Connection:
def service(self):
# Resolve ports to their services, if known
default = "({} {})".format(self.src_port, self.dest_port)
if self.src_port > 10000:
return default
with contextlib.suppress(OSError):
return socket.getservbyport(self.src_port)
with contextlib.suppress(OSError):
@ -161,14 +179,24 @@ class Connection:
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
parser.add_argument('-f', '--file', dest='file', type=str, default=sys.stdin,
parser.add_argument("-f", "--file", dest="file", type=str, default=sys.stdin,
help="The file to analyze (defaults to stdin if not provided)")
parser.add_argument('-p', '--packets', dest='packets_threshold', type=int, default=10,
parser.add_argument("-p", "--packets", dest="packets_threshold", type=int, default=10,
help="Number of packets representing the lower bound in connections to be processed")
parser.add_argument('-v', '--verbose', dest="verbose", action="store_true",
parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
help="Enable verbose output.")
parser.add_argument("--match-host", dest="match_host", type=str, default=None,
help="Filter output by matching on the given host (matches source or destination)")
parser.add_argument("-n", "--no-dns", dest="no_dns", action="store_true", help="Disable DNS resolving of IP addresses")
args = parser.parse_args()
# Sanity check for IP address
if args.match_host:
try:
match_host = ipaddress.ip_address(args.match_host)
except ValueError as ex:
exit("IP address '{}' is neither IPv4 nor IPv6".format(args.match_host))
# Using a file and using stdin differ in their further usage for gzip.open
file = args.file
mode = "rb" # reading files
@ -219,17 +247,23 @@ if __name__ == "__main__":
# TODO: handle fitting, yet mismatching (here: 1 second) pairs
pass
if first_switched not in pending:
pending[first_switched] = {}
# Find the peer for this connection
if flow["IP_PROTOCOL_VERSION"] == 4:
if "IPV4_SRC_ADDR" in flow or flow.get("IP_PROTOCOL_VERSION") == 4:
local_peer = flow["IPV4_SRC_ADDR"]
remote_peer = flow["IPV4_DST_ADDR"]
else:
local_peer = flow["IPV6_SRC_ADDR"]
remote_peer = flow["IPV6_DST_ADDR"]
# Match on host filter passed in as argument
if args.match_host and not any([local_peer == args.match_host, remote_peer == args.match_host]):
# If a match_host is given but neither local_peer nor remote_peer match
continue
if first_switched not in pending:
pending[first_switched] = {}
# Match peers
if remote_peer in pending[first_switched]:
# The destination peer put itself into the pending dict, getting and removing entry
peer_flow = pending[first_switched].pop(remote_peer)
@ -250,7 +284,8 @@ if __name__ == "__main__":
print("-" * 100)
first_line = False
print("{timestamp} | {service:<14} | {size:8} | {duration:9} | {packets:7} | Between {src_host} ({src}) and {dest_host} ({dest})" \
print("{timestamp} | {service:<14} | {size:8} | {duration:9} | {packets:7} | "
"Between {src_host} ({src}) and {dest_host} ({dest})" \
.format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src,
dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration,
packets=con.total_packets))
@ -263,7 +298,7 @@ if __name__ == "__main__":
exit(0)
if len(pending) > 0:
print(f"There are {len(pending)} first_switched entries left in the pending dict!")
print(f"\nThere are {len(pending)} first_switched entries left in the pending dict!")
all_noise = True
for first_switched, flows in sorted(pending.items(), key=lambda x: x[0]):
for peer, flow in flows.items():
@ -272,10 +307,21 @@ if __name__ == "__main__":
continue
all_noise = False
if flow["IP_PROTOCOL_VERSION"] == 4:
print(first_switched, peer, flow["IPV4_DST_ADDR"], flow["IN_PKTS"])
else:
print(first_switched, peer, flow["IPV6_DST_ADDR"], flow["IN_PKTS"])
src = flow.get("IPV4_SRC_ADDR") or flow.get("IPV6_SRC_ADDR")
src_host = resolve_hostname(src)
src_text = f"{src}" if src == src_host else f"{src_host} ({src})"
dst = flow.get("IPV4_DST_ADDR") or flow.get("IPV6_DST_ADDR")
dst_host = resolve_hostname(dst)
dst_text = f"{dst}" if dst == dst_host else f"{dst_host} ({dst})"
proto = flow["PROTOCOL"]
size = flow["IN_BYTES"]
packets = flow["IN_PKTS"]
src_port = flow.get("L4_SRC_PORT", 0)
dst_port = flow.get("L4_DST_PORT", 0)
print(f"From {src_text}:{src_port} to {dst_text}:{dst_port} with "
f"proto {IP_PROTOCOLS.get(proto, 'UNKNOWN')} and size {human_size(size)}"
f" ({packets} packets)")
if all_noise:
print("They were all noise!")