Add JSON export and analyzing example script
This commit is contained in:
parent
92d8e724bf
commit
898d220a91
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -4,3 +4,5 @@ dist*
|
||||||
.*python_netflow_v9_softflowd.egg-info/
|
.*python_netflow_v9_softflowd.egg-info/
|
||||||
*.swp
|
*.swp
|
||||||
*.swo
|
*.swo
|
||||||
|
__pycache__
|
||||||
|
*.json
|
||||||
|
|
36
analyze_json.py
Normal file
36
analyze_json.py
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# Example analyzing script for saved exports
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
import ipaddress
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
exit("Use {} <filename>.json".format(sys.argv[0]))
|
||||||
|
|
||||||
|
filename = sys.argv[1]
|
||||||
|
|
||||||
|
with open(filename, 'r') as fh:
|
||||||
|
data = json.loads(fh.read())
|
||||||
|
|
||||||
|
for export in sorted(data):
|
||||||
|
timestamp = datetime.fromtimestamp(float(export))
|
||||||
|
print("\n{}".format(timestamp))
|
||||||
|
|
||||||
|
flows = data[export]
|
||||||
|
for flow in flows:
|
||||||
|
count_bytes = flow['IN_BYTES']
|
||||||
|
count_packets = flow['IN_PKTS']
|
||||||
|
|
||||||
|
if flow['IP_PROTOCOL_VERSION'] == 4:
|
||||||
|
src = ipaddress.ip_address(flow['IPV4_SRC_ADDR'])
|
||||||
|
dest = ipaddress.ip_address(flow['IPV4_DST_ADDR'])
|
||||||
|
|
||||||
|
elif flow['IP_PROTOCOL_VERSION'] == 6:
|
||||||
|
src = ipaddress.ip_address(flow['IPV6_SRC_ADDR'])
|
||||||
|
dest = ipaddress.ip_address(flow['IPV6_DST_ADDR'])
|
||||||
|
|
||||||
|
print("Flow from {src} to {dest} with {packets} packets, size {size}".
|
||||||
|
format(src=src, dest=dest, packets=count_packets, size=count_bytes))
|
55
main.py
55
main.py
|
@ -3,12 +3,10 @@ import logging
|
||||||
import argparse
|
import argparse
|
||||||
import sys
|
import sys
|
||||||
import socketserver
|
import socketserver
|
||||||
|
import time
|
||||||
|
import json
|
||||||
|
import os.path
|
||||||
|
|
||||||
try:
|
|
||||||
from netflow.collector_v9 import ExportPacket
|
|
||||||
except ImportError:
|
|
||||||
print("Netflow v9 not installed as package! Running from directory.")
|
|
||||||
from src.netflow.collector_v9 import ExportPacket
|
|
||||||
|
|
||||||
logging.getLogger().setLevel(logging.INFO)
|
logging.getLogger().setLevel(logging.INFO)
|
||||||
ch = logging.StreamHandler(sys.stdout)
|
ch = logging.StreamHandler(sys.stdout)
|
||||||
|
@ -17,13 +15,22 @@ formatter = logging.Formatter('%(message)s')
|
||||||
ch.setFormatter(formatter)
|
ch.setFormatter(formatter)
|
||||||
logging.getLogger().addHandler(ch)
|
logging.getLogger().addHandler(ch)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from netflow.collector_v9 import ExportPacket
|
||||||
|
except ImportError:
|
||||||
|
logging.warn("Netflow v9 not installed as package! Running from directory.")
|
||||||
|
from src.netflow.collector_v9 import ExportPacket
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description='A sample netflow collector.')
|
parser = argparse.ArgumentParser(description='A sample netflow collector.')
|
||||||
|
parser.add_argument('--host', type=str, default='',
|
||||||
parser.add_argument('-chost', type=str, default='',
|
|
||||||
help='collector listening address')
|
help='collector listening address')
|
||||||
parser.add_argument('-cport', type=int, default=2055,
|
parser.add_argument('--port', '-p', type=int, default=2055,
|
||||||
help='collector listener port')
|
help='collector listener port')
|
||||||
|
parser.add_argument('--file', '-o', type=str, dest='output_file',
|
||||||
|
default="{}.json".format(int(time.time())),
|
||||||
|
help='collector export JSON file')
|
||||||
|
parser.add_argument('--debug', '-D', action='store_true',
|
||||||
|
help='Enable debug output')
|
||||||
|
|
||||||
class SoftflowUDPHandler(socketserver.BaseRequestHandler):
|
class SoftflowUDPHandler(socketserver.BaseRequestHandler):
|
||||||
# We need to save the templates our NetFlow device
|
# We need to save the templates our NetFlow device
|
||||||
|
@ -37,20 +44,42 @@ class SoftflowUDPHandler(socketserver.BaseRequestHandler):
|
||||||
server = socketserver.UDPServer((host, port), cls)
|
server = socketserver.UDPServer((host, port), cls)
|
||||||
return server
|
return server
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def set_output_file(cls, path):
|
||||||
|
cls.output_file = path
|
||||||
|
|
||||||
def handle(self):
|
def handle(self):
|
||||||
|
if not os.path.exists(self.output_file):
|
||||||
|
with open(self.output_file, 'w') as fh:
|
||||||
|
fh.write(json.dumps({}))
|
||||||
|
|
||||||
|
with open(self.output_file, 'r') as fh:
|
||||||
|
existing_data = json.loads(fh.read())
|
||||||
|
|
||||||
data = self.request[0]
|
data = self.request[0]
|
||||||
host = self.client_address[0]
|
host = self.client_address[0]
|
||||||
s = "Received data from {}, length {}".format(host, len(data))
|
s = "Received data from {}, length {}".format(host, len(data))
|
||||||
logging.info(s)
|
logging.debug(s)
|
||||||
export = ExportPacket(data, self.TEMPLATES)
|
export = ExportPacket(data, self.TEMPLATES)
|
||||||
self.TEMPLATES.update(export.templates)
|
self.TEMPLATES.update(export.templates)
|
||||||
s = "Processed ExportPacket with {} flows.".format(export.header.count)
|
s = "Processed ExportPacket with {} flows.".format(export.header.count)
|
||||||
logging.info(s)
|
logging.debug(s)
|
||||||
return export
|
|
||||||
|
# Append new flows
|
||||||
|
existing_data[time.time()] = [flow.data for flow in export.flows]
|
||||||
|
|
||||||
|
with open(self.output_file, 'w') as fh:
|
||||||
|
fh.write(json.dumps(existing_data))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
server = SoftflowUDPHandler.get_server(args.chost, args.cport)
|
SoftflowUDPHandler.set_output_file(args.output_file)
|
||||||
|
server = SoftflowUDPHandler.get_server(args.host, args.port)
|
||||||
|
|
||||||
|
if args.debug:
|
||||||
|
logging.getLogger().setLevel(logging.DEBUG)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
logging.debug("Starting the NetFlow listener")
|
logging.debug("Starting the NetFlow listener")
|
||||||
|
|
|
@ -270,24 +270,3 @@ class ExportPacket:
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "<ExportPacket version {} counting {} records>".format(
|
return "<ExportPacket version {} counting {} records>".format(
|
||||||
self.header.version, self.header.count)
|
self.header.version, self.header.count)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
# We need to save the templates our NetFlow device send over time. Templates
|
|
||||||
# are not resended every time a flow is sent to the collector.
|
|
||||||
_templates = {}
|
|
||||||
HOST = sys.argv[1]
|
|
||||||
PORT = int(sys.argv[2])
|
|
||||||
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
sock.bind((HOST, PORT))
|
|
||||||
print("Listening on interface {}:{}".format(HOST, PORT))
|
|
||||||
|
|
||||||
while 1:
|
|
||||||
(data, sender) = sock.recvfrom(8192)
|
|
||||||
print("Received data from {}, length {}".format(sender, len(data)))
|
|
||||||
|
|
||||||
export = ExportPacket(data, _templates)
|
|
||||||
_templates.update(export.templates)
|
|
||||||
|
|
||||||
print("Processed ExportPacket with {} flows.".format(export.header.count))
|
|
||||||
|
|
Loading…
Reference in a new issue