#!/usr/bin/env python3 """ This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd. Copyright 2016-2020 Dominik Pataky Licensed under MIT License. See LICENSE. """ import io import linecache import cProfile import pathlib import pstats import resource import tracemalloc import unittest from pstats import SortKey from tests.lib import send_recv_packets, generate_packets NUM_PACKETS_PERFORMANCE = 5000 class TestNetflowIPFIXPerformance(unittest.TestCase): def setUp(self) -> None: """ Before each test run, start tracemalloc profiling. :return: """ tracemalloc.start() print("\n\n") def tearDown(self) -> None: """ After each test run, stop tracemalloc. :return: """ tracemalloc.stop() def _memory_of_version(self, version, store_packets=500) -> tracemalloc.Snapshot: """ Create memory snapshot of collector run with packets of version :version: :param version: :return: """ if not tracemalloc.is_tracing(): raise RuntimeError pkts, t1, t2 = send_recv_packets(generate_packets(NUM_PACKETS_PERFORMANCE, version), store_packets=store_packets) self.assertEqual(len(pkts), NUM_PACKETS_PERFORMANCE) snapshot = tracemalloc.take_snapshot() del pkts return snapshot @staticmethod def _print_memory_statistics(snapshot: tracemalloc.Snapshot, key: str, topx: int = 10): """ Print memory statistics from a tracemalloc.Snapshot in certain formats. :param snapshot: :param key: :param topx: :return: """ if key not in ["filename", "lineno", "traceback"]: raise KeyError stats = snapshot.statistics(key) if key == "lineno": print("\n## Detailed memory of traceback, based on lines ##") for idx, stat in enumerate(stats[:topx]): frame = stat.traceback[0] print("\n{idx:02d}: {filename}:{lineno} {size:.1f} KiB, count {count}".format( idx=idx+1, filename=frame.filename, lineno=frame.lineno, size=stat.size / 1024, count=stat.count )) lines = [] lines_whitespaces = [] for lineshift in range(-3, 2): stat = linecache.getline(frame.filename, frame.lineno + lineshift) lines_whitespaces.append(len(stat) - len(stat.lstrip(" "))) # count lines.append(stat.strip()) lines_whitespaces = [x - min([y for y in lines_whitespaces if y > 0]) for x in lines_whitespaces] for lidx, stat in enumerate(lines): print(" {}{}".format("> " if lidx == 3 else "| ", " " * lines_whitespaces.pop(0) + stat)) elif key == "filename": print("\n## Detailed memory by file ##") for idx, stat in enumerate(stats[:topx]): frame = stat.traceback[0] print("{idx:02d}: {filename:80s} {size:6.1f} KiB, count {count:5