diff --git a/analyzer.py b/analyzer.py
index bde7bc4..77d789c 100755
--- a/analyzer.py
+++ b/analyzer.py
@@ -22,17 +22,28 @@ import socket
 import sys
 
 
+IP_PROTOCOLS = {
+    1: "ICMP",
+    6: "TCP",
+    17: "UDP",
+    58: "ICMPv6"
+}
+
 Pair = namedtuple('Pair', ['src', 'dest'])
 logger = logging.getLogger(__name__)
 
 
-def printv(message, *args, **kwargs):
-    if args.verbose == True:
-        print(message.format(*args, **kwargs))
+def printv(message, *args_, **kwargs):
+    if args.verbose:
+        print(message.format(*args_, **kwargs))
 
 
 @functools.lru_cache(maxsize=None)
-def resolve_hostname(ip):
+def resolve_hostname(ip: str) -> str:
+    if args.no_dns:
+        # If no DNS resolution is requested, simply return the IP string
+        return ip
+    # else resolve the IP address to a hostname and return the hostname
     return socket.getfqdn(ip)
 
 
@@ -43,6 +54,30 @@ def fallback(d, keys):
     raise KeyError(", ".join(keys))
 
 
+def human_size(size_bytes):
+    """Return a byte count as a short human readable string (B, K, M or G, base 1024)."""
+    if size_bytes < 1024:
+        return "%dB" % size_bytes
+    elif size_bytes / 1024. < 1024:
+        return "%.2fK" % (size_bytes / 1024.)
+    elif size_bytes / 1024.**2 < 1024:
+        return "%.2fM" % (size_bytes / 1024.**2)
+    else:
+        return "%.2fG" % (size_bytes / 1024.**3)
+
+
+def human_duration(seconds):
+    """Return a duration given in seconds as 'N sec', 'MM:SS min' or 'H:MM.SS hours'."""
+    if seconds < 60:
+        # seconds
+        return "%d sec" % seconds
+    if seconds >= 60**2:
+        # hours (the bound is inclusive, so exactly 3600 s reads "1:00.00 hours", not "60:00 min")
+        return "%d:%02d.%02d hours" % (seconds // 60**2, seconds % 60**2 // 60, seconds % 60)
+    # minutes
+    return "%02d:%02d min" % (seconds // 60, seconds % 60)
+
+
 class Connection:
     """Connection model for two flows.
     The direction of the data flow can be seen by looking at the size.
@@ -113,27 +148,12 @@ class Connection:
 
     @property
     def human_size(self):
-        # Calculate a human readable size of the traffic
-        if self.size < 1024:
-            return "%dB" % self.size
-        elif self.size / 1024. < 1024:
-            return "%.2fK" % (self.size / 1024.)
-        elif self.size / 1024.**2 < 1024:
-            return "%.2fM" % (self.size / 1024.**2)
-        else:
-            return "%.2fG" % (self.size / 1024.**3)
+        return human_size(self.size)
 
     @property
     def human_duration(self):
         duration = self.duration // 1000  # uptime in milliseconds, floor it
-        if duration < 60:
-            # seconds
-            return "%d sec" % duration
-        if duration / 60 > 60:
-            # hours
-            return "%d:%02d.%02d hours" % (duration / 60**2, duration % 60**2 / 60, duration % 60)
-        # minutes
-        return "%02d:%02d min" % (duration / 60, duration % 60)
+        return human_duration(duration)
 
     @property
     def hostnames(self):
@@ -146,8 +166,6 @@ class Connection:
     def service(self):
         # Resolve ports to their services, if known
         default = "({} {})".format(self.src_port, self.dest_port)
-        if self.src_port > 10000:
-            return default
         with contextlib.suppress(OSError):
             return socket.getservbyport(self.src_port)
         with contextlib.suppress(OSError):
@@ -161,14 +179,24 @@ class Connection:
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
-    parser.add_argument('-f', '--file', dest='file', type=str, default=sys.stdin,
+    parser.add_argument("-f", "--file", dest="file", type=str, default=sys.stdin,
                         help="The file to analyze (defaults to stdin if not provided)")
-    parser.add_argument('-p', '--packets', dest='packets_threshold', type=int, default=10,
+    parser.add_argument("-p", "--packets", dest="packets_threshold", type=int, default=10,
                         help="Number of packets representing the lower bound in connections to be processed")
-    parser.add_argument('-v', '--verbose', dest="verbose", action="store_true",
+    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
                         help="Enable verbose output.")
+    parser.add_argument("--match-host", dest="match_host", type=str, default=None,
+                        help="Filter output by matching on the given host (matches source or destination)")
+    parser.add_argument("-n", "--no-dns", dest="no_dns", action="store_true", help="Disable DNS resolving of IP addresses")
     args = parser.parse_args()
 
+    # Sanity check for IP address (validation only, the raw string is what gets matched later)
+    if args.match_host:
+        try:
+            ipaddress.ip_address(args.match_host)
+        except ValueError:
+            sys.exit("IP address '{}' is neither IPv4 nor IPv6".format(args.match_host))
+
     # Using a file and using stdin differ in their further usage for gzip.open
     file = args.file
     mode = "rb"  # reading files
@@ -219,17 +247,23 @@ if __name__ == "__main__":
                 # TODO: handle fitting, yet mismatching (here: 1 second) pairs
                 pass
 
-            if first_switched not in pending:
-                pending[first_switched] = {}
-
             # Find the peer for this connection
-            if flow["IP_PROTOCOL_VERSION"] == 4:
+            if "IPV4_SRC_ADDR" in flow or flow.get("IP_PROTOCOL_VERSION") == 4:
                 local_peer = flow["IPV4_SRC_ADDR"]
                 remote_peer = flow["IPV4_DST_ADDR"]
             else:
                 local_peer = flow["IPV6_SRC_ADDR"]
                 remote_peer = flow["IPV6_DST_ADDR"]
 
+            # Match on host filter passed in as argument
+            if args.match_host and args.match_host not in (local_peer, remote_peer):
+                # If a match_host is given but neither local_peer nor remote_peer match
+                continue
+
+            if first_switched not in pending:
+                pending[first_switched] = {}
+
+            # Match peers
             if remote_peer in pending[first_switched]:
                 # The destination peer put itself into the pending dict, getting and removing entry
                 peer_flow = pending[first_switched].pop(remote_peer)
@@ -250,7 +284,8 @@ if __name__ == "__main__":
                     print("-" * 100)
                     first_line = False
 
-                print("{timestamp} | {service:<14} | {size:8} | {duration:9} | {packets:7} | Between {src_host} ({src}) and {dest_host} ({dest})" \
+                print("{timestamp} | {service:<14} | {size:8} | {duration:9} | {packets:7} | "
+                      "Between {src_host} ({src}) and {dest_host} ({dest})" \
                       .format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src,
                               dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration,
                               packets=con.total_packets))
@@ -263,7 +298,7 @@ if __name__ == "__main__":
         exit(0)
 
     if len(pending) > 0:
-        print(f"There are {len(pending)} first_switched entries left in the pending dict!")
+        print(f"\nThere are {len(pending)} first_switched entries left in the pending dict!")
         all_noise = True
         for first_switched, flows in sorted(pending.items(), key=lambda x: x[0]):
             for peer, flow in flows.items():
@@ -272,10 +307,21 @@ if __name__ == "__main__":
                     continue
                 all_noise = False
 
-                if flow["IP_PROTOCOL_VERSION"] == 4:
-                    print(first_switched, peer, flow["IPV4_DST_ADDR"], flow["IN_PKTS"])
-                else:
-                    print(first_switched, peer, flow["IPV6_DST_ADDR"], flow["IN_PKTS"])
+                src = flow.get("IPV4_SRC_ADDR") or flow.get("IPV6_SRC_ADDR")
+                src_host = resolve_hostname(src)
+                src_text = f"{src}" if src == src_host else f"{src_host} ({src})"
+                dst = flow.get("IPV4_DST_ADDR") or flow.get("IPV6_DST_ADDR")
+                dst_host = resolve_hostname(dst)
+                dst_text = f"{dst}" if dst == dst_host else f"{dst_host} ({dst})"
+                proto = flow["PROTOCOL"]
+                size = flow["IN_BYTES"]
+                packets = flow["IN_PKTS"]
+                src_port = flow.get("L4_SRC_PORT", 0)
+                dst_port = flow.get("L4_DST_PORT", 0)
+
+                print(f"From {src_text}:{src_port} to {dst_text}:{dst_port} with "
+                      f"proto {IP_PROTOCOLS.get(proto, 'UNKNOWN')} and size {human_size(size)}"
+                      f" ({packets} packets)")
 
         if all_noise:
             print("They were all noise!")