From 4a3a693f46f9939736b0c5fc32c0cf2372863c59 Mon Sep 17 00:00:00 2001
From: Ad Schellevis
Date: Fri, 1 Apr 2016 16:37:00 +0200
Subject: [PATCH] (netflow) add rc script flowd_aggregate, finish initial
 flowd aggregator version

---
 src/etc/rc.d/flowd_aggregate                  |  55 +++++++
 .../scripts/netflow/flowd_aggregate.py        | 136 +++++++++++++++---
 src/opnsense/scripts/netflow/lib/aggregate.py |  14 ++
 3 files changed, 183 insertions(+), 22 deletions(-)
 create mode 100755 src/etc/rc.d/flowd_aggregate

diff --git a/src/etc/rc.d/flowd_aggregate b/src/etc/rc.d/flowd_aggregate
new file mode 100755
index 000000000..0d7587d52
--- /dev/null
+++ b/src/etc/rc.d/flowd_aggregate
@@ -0,0 +1,55 @@
+#!/bin/sh
+#
+# $FreeBSD$
+#
+# PROVIDE: flowd_aggregate
+# REQUIRE: SERVERS
+# KEYWORD: shutdown
+#
+
+. /etc/rc.subr
+
+name=flowd_aggregate
+rcvar=flowd_aggregate_enable
+command=/usr/local/opnsense/scripts/netflow/flowd_aggregate.py
+command_interpreter=/usr/local/bin/python2.7
+pidfile="/var/run/${name}.pid"
+load_rc_config $name
+
+# Set defaults
+: ${flowd_aggregate_enable:=NO}
+
+stop_cmd=flowd_aggregate_stop
+
+# stop flowd_aggregate
+flowd_aggregate_stop()
+{
+    if [ -z "$rc_pid" ]; then
+        [ -n "$rc_fast" ] && return 0
+        _run_rc_notrunning
+        return 1
+    fi
+
+    echo -n "Stopping ${name}."
+    # first ask gently to exit
+    kill -15 ${rc_pid}
+
+    # wait max 5 seconds for gentle exit
+    for i in $(seq 1 50);
+    do
+        if [ -z "`/bin/ps -ax | /usr/bin/awk '{print $1;}' | /usr/bin/grep "^${rc_pid}"`" ]; then
+            break
+        fi
+        sleep 0.1
+    done
+
+    # kill any remaining flowd_aggregate processes (if still running)
+    for flowd_aggregate_pid in `/bin/ps -ax | grep 'flowd_aggregate.py' | /usr/bin/awk '{print $1;}' `
+    do
+        kill -9 $flowd_aggregate_pid >/dev/null 2>&1
+    done
+
+    echo "..done"
+}
+
+run_rc_command $1
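Note: the stop routine above escalates in two steps: SIGTERM first, a poll loop of up to five seconds, then SIGKILL for anything still running. A minimal sketch of the same escalation pattern in Python, for reference; graceful_stop and its parameters are illustrative names, not part of this patch:

import os
import signal
import time

def graceful_stop(pid, timeout=5.0, interval=0.1):
    """ ask a process to exit via SIGTERM, escalate to SIGKILL after timeout """
    try:
        os.kill(pid, signal.SIGTERM)    # first ask gently to exit
    except OSError:
        return                          # process already gone
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            os.kill(pid, 0)             # signal 0 only probes for existence
        except OSError:
            return                      # exited within the grace period
        time.sleep(interval)
    try:
        os.kill(pid, signal.SIGKILL)    # force kill as a last resort
    except OSError:
        pass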
diff --git a/src/opnsense/scripts/netflow/flowd_aggregate.py b/src/opnsense/scripts/netflow/flowd_aggregate.py
index 3cf13220f..3c010a9f1 100755
--- a/src/opnsense/scripts/netflow/flowd_aggregate.py
+++ b/src/opnsense/scripts/netflow/flowd_aggregate.py
@@ -31,30 +31,122 @@
 import time
 import datetime
 import os
 import sys
+import signal
+import glob
+sys.path.insert(0, "/usr/local/opnsense/site-python")
 from lib.parse import parse_flow
-from lib.aggregate import BaseFlowAggregator, AggMetadata
+from lib.aggregate import AggMetadata
 import lib.aggregates
+from daemonize import Daemonize
 
-# init metadata (progress maintenance)
-metadata = AggMetadata()
-# register aggregate classes to stream data to
-stream_agg_objects = list()
-resolutions = [60, 60*5]
-for agg_class in lib.aggregates.get_aggregators():
-    for resolution in resolutions:
-        stream_agg_objects.append(agg_class(resolution))
+MAX_FILE_SIZE_MB = 10
+MAX_LOGS = 10
 
-# parse flow data and stream to registered consumers
-prev_recv=metadata.last_sync()
-for flow_record in parse_flow(prev_recv):
-    if flow_record is None or prev_recv != flow_record['recv']:
-        # commit data on receive timestamp change or last record
-        for stream_agg_object in stream_agg_objects:
-            stream_agg_object.commit()
-        metadata.update_sync_time(prev_recv)
-    if flow_record is not None:
-        # send to aggregator
-        for stream_agg_object in stream_agg_objects:
-            stream_agg_object.add(flow_record)
-        prev_recv = flow_record['recv']
+
+
+def aggregate_flowd():
+    """ aggregate collected flowd data
+    :return: None
+    """
+    # init metadata (progress maintenance)
+    metadata = AggMetadata()
+
+    # register aggregate classes to stream data to
+    stream_agg_objects = list()
+    resolutions = [60, 60*5]
+    for agg_class in lib.aggregates.get_aggregators():
+        for resolution in agg_class.resolutions():
+            stream_agg_objects.append(agg_class(resolution))
+
+    # parse flow data and stream to registered consumers
+    prev_recv = metadata.last_sync()
+    for flow_record in parse_flow(prev_recv):
+        if flow_record is None or prev_recv != flow_record['recv']:
+            # commit data on receive timestamp change or last record
+            for stream_agg_object in stream_agg_objects:
+                stream_agg_object.commit()
+            metadata.update_sync_time(prev_recv)
+        if flow_record is not None:
+            # send to aggregator
+            for stream_agg_object in stream_agg_objects:
+                stream_agg_object.add(flow_record)
+            prev_recv = flow_record['recv']
+
+    # expire old data
+    for stream_agg_object in stream_agg_objects:
+        stream_agg_object.cleanup()
+        del stream_agg_object
+    del metadata
+
+
+def check_rotate():
+    """ Check if the flowd log needs to be rotated and, if so, rotate it.
+    We keep at most [MAX_LOGS] logs of approx. [MAX_FILE_SIZE_MB] MB each; the flowd data probably contains
+    more detailed data than the stored aggregates.
+    :return: None
+    """
+    if os.path.getsize("/var/log/flowd.log")/1024/1024 > MAX_FILE_SIZE_MB:
+        # max filesize reached, rotate
+        filenames = sorted(glob.glob('/var/log/flowd.log.*'), reverse=True)
+        file_sequence = len(filenames)
+        for filename in filenames:
+            sequence = filename.split('.')[-1]
+            if sequence.isdigit():
+                if file_sequence >= MAX_LOGS:
+                    os.remove(filename)
+                elif int(sequence) != 0:
+                    os.rename(filename, filename.replace('.%s' % sequence, '.%06d' % (int(sequence)+1)))
+            file_sequence -= 1
+        # rename /var/log/flowd.log
+        os.rename('/var/log/flowd.log', '/var/log/flowd.log.000001')
+        # signal flowd to reopen its log file
+        if os.path.isfile('/var/run/flowd.pid'):
+            pid = open('/var/run/flowd.pid').read().strip()
+            if pid.isdigit():
+                try:
+                    os.kill(int(pid), signal.SIGUSR1)
+                except OSError:
+                    pass
+
+
+class Main(object):
+    def __init__(self):
+        """ construct, hook signal handler and run aggregators
+        :return: None
+        """
+        self.running = True
+        signal.signal(signal.SIGTERM, self.signal_handler)
+        self.run()
+
+    def run(self):
+        """ run, endless loop, until sigterm is received
+        :return: None
+        """
+        while self.running:
+            # run aggregate
+            aggregate_flowd()
+            # rotate if needed
+            check_rotate()
+            # wait for next pass, exit on sigterm
+            for i in range(120):
+                if self.running:
+                    time.sleep(0.5)
+                else:
+                    break
+
+    def signal_handler(self, sig, frame):
+        """ end (run) loop on signal
+        :param sig: signal
+        :param frame: frame
+        :return: None
+        """
+        self.running = False
+
+
+if len(sys.argv) > 1 and 'console' in sys.argv[1:]:
+    # command line start
+    Main()
+else:
+    # Daemonize flowd aggregator
+    daemon = Daemonize(app="flowd_aggregate", pid='/var/run/flowd_aggregate.pid', action=Main)
+    daemon.start()
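Note: aggregate_flowd() only commits when the recv timestamp changes (or the stream ends), so all flow records sharing a receive timestamp land in a single commit per aggregator. A toy illustration of that batching rule, with a hypothetical in-memory consumer standing in for the sqlite-backed aggregators:

class ToyAggregator(object):
    """ hypothetical consumer: buffers adds, counts commits """
    def __init__(self):
        self.pending, self.commits = [], 0

    def add(self, record):
        self.pending.append(record)

    def commit(self):
        self.commits += 1
        self.pending = []

def stream(records, consumers):
    """ same batching rule as aggregate_flowd(): commit on recv change """
    prev_recv = None
    for record in records + [None]:       # None marks the end of the stream
        if record is None or prev_recv != record['recv']:
            for consumer in consumers:
                consumer.commit()
        if record is not None:
            for consumer in consumers:
                consumer.add(record)
            prev_recv = record['recv']

agg = ToyAggregator()
stream([{'recv': 1}, {'recv': 1}, {'recv': 2}], [agg])
print(agg.commits)  # 3: first record, recv change 1 -> 2, end of stream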
diff --git a/src/opnsense/scripts/netflow/lib/aggregate.py b/src/opnsense/scripts/netflow/lib/aggregate.py
index d4545e697..f19a4c01f 100644
--- a/src/opnsense/scripts/netflow/lib/aggregate.py
+++ b/src/opnsense/scripts/netflow/lib/aggregate.py
@@ -49,6 +49,13 @@
         # cache known tables
         self._update_known_tables()
 
+    def __del__(self):
+        """ close database on destruct
+        :return: None
+        """
+        if self._db_connection is not None:
+            self._db_connection.close()
+
     def _update_known_tables(self):
         """ request known tables
         """
@@ -123,6 +130,13 @@
         self._open_db()
         self._fetch_known_targets()
 
+    def __del__(self):
+        """ close database on destruct
+        :return: None
+        """
+        if self._db_connection is not None:
+            self._db_connection.close()
+
     def _fetch_known_targets(self):
         """ read known target table names from the sqlite db
         :return: None
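Note: both __del__ hooks use the same guard-then-close idiom; since _db_connection presumably starts out as None before the database is opened, teardown of a half-constructed object cannot raise. A standalone sketch of the pattern, assuming a plain sqlite3 connection; MetadataStore is a hypothetical stand-in for AggMetadata:

import sqlite3

class MetadataStore(object):
    def __init__(self, filename):
        self._db_connection = None      # guard value: nothing to close yet
        self._db_connection = sqlite3.connect(filename)

    def __del__(self):
        """ close database on destruct """
        if self._db_connection is not None:
            self._db_connection.close()

store = MetadataStore(':memory:')
del store   # in CPython the refcount drop runs __del__, closing the db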