From dc38875202efd181c6b898996cb8a5ea5e6b49a2 Mon Sep 17 00:00:00 2001 From: Ad Schellevis Date: Sat, 24 Nov 2018 15:39:24 +0100 Subject: [PATCH] Network insight, cleanups and restructures for https://github.com/opnsense/core/issues/2964 Adds configuration yaml support, using a simple config yaml containing the previously hardcoded values: ``` library_path: /path/to/core/src/opnsense/site-python flowd_source: /path/to/flowd.log database_dir: /path/to/netflow pid_filename: /path/to/netflow.pid ``` Startup using local config, using: ``` flowd_aggregate.py -c my_config.yaml --console ``` Further then path relocations, this commit should not contain any new features. The other scripts, not related to the daemon have not been altered (yet) --- .../scripts/netflow/flowd_aggregate.py | 128 ++++--- src/opnsense/scripts/netflow/lib/__init__.py | 58 +++ src/opnsense/scripts/netflow/lib/aggregate.py | 347 +---------------- .../netflow/lib/aggregates/__init__.py | 348 +++++++++++++++++- .../netflow/lib/aggregates/interface.py | 24 +- .../scripts/netflow/lib/aggregates/ports.py | 22 +- .../scripts/netflow/lib/aggregates/source.py | 37 +- src/opnsense/scripts/netflow/lib/parse.py | 23 +- .../conf/actions.d/actions_netflow.conf | 2 +- 9 files changed, 543 insertions(+), 446 deletions(-) diff --git a/src/opnsense/scripts/netflow/flowd_aggregate.py b/src/opnsense/scripts/netflow/flowd_aggregate.py index 9e0c31f11..3f5e27532 100755 --- a/src/opnsense/scripts/netflow/flowd_aggregate.py +++ b/src/opnsense/scripts/netflow/flowd_aggregate.py @@ -1,6 +1,6 @@ #!/usr/local/bin/python2.7 """ - Copyright (c) 2016 Ad Schellevis + Copyright (c) 2016-2018 Ad Schellevis All rights reserved. Redistribution and use in source and binary forms, with or without @@ -36,36 +36,36 @@ import glob import copy import syslog import traceback -sys.path.insert(0, "/usr/local/opnsense/site-python") -from sqlite3_helper import check_and_repair +import argparse +from lib import load_config from lib.parse import parse_flow from lib.aggregate import AggMetadata import lib.aggregates -from daemonize import Daemonize -MAX_FILE_SIZE_MB=10 -MAX_LOGS=10 +MAX_FILE_SIZE_MB = 10 +MAX_LOGS = 10 -def aggregate_flowd(do_vacuum=False): +def aggregate_flowd(config, do_vacuum=False): """ aggregate collected flowd data + :param config: script configuration :param do_vacuum: vacuum database after cleanup :return: None """ # init metadata (progress maintenance) - metadata = AggMetadata() + metadata = AggMetadata(config.database_dir) # register aggregate classes to stream data to stream_agg_objects = list() for agg_class in lib.aggregates.get_aggregators(): for resolution in agg_class.resolutions(): - stream_agg_objects.append(agg_class(resolution)) + stream_agg_objects.append(agg_class(resolution, config.database_dir)) # parse flow data and stream to registered consumers prev_recv = metadata.last_sync() commit_record_count = 0 - for flow_record in parse_flow(prev_recv): + for flow_record in parse_flow(prev_recv, config.flowd_source): if flow_record is None or (prev_recv != flow_record['recv'] and commit_record_count > 100000): # commit data on receive timestamp change or last record for stream_agg_object in stream_agg_objects: @@ -88,15 +88,17 @@ def aggregate_flowd(do_vacuum=False): del metadata -def check_rotate(): +def check_rotate(filename, pid_filename): """ Checks if flowd log needs to be rotated, if so perform rotate. We keep [MAX_LOGS] number of logs containing approx. [MAX_FILE_SIZE_MB] data, the flowd data probably contains more detailed data then the stored aggregates. + :param filename: flowd logfile + :param pid_filename: filename of pid :return: None """ - if os.path.getsize("/var/log/flowd.log")/1024/1024 > MAX_FILE_SIZE_MB: + if os.path.getsize(filename)/1024/1024 > MAX_FILE_SIZE_MB: # max filesize reached rotate - filenames = sorted(glob.glob('/var/log/flowd.log.*'), reverse=True) + filenames = sorted(glob.glob('%s.*' % filename), reverse=True) file_sequence = len(filenames) for filename in filenames: sequence = filename.split('.')[-1] @@ -106,11 +108,11 @@ def check_rotate(): elif int(sequence) != 0: os.rename(filename, filename.replace('.%s' % sequence, '.%06d' % (int(sequence)+1))) file_sequence -= 1 - # rename /var/log/flowd.log - os.rename('/var/log/flowd.log', '/var/log/flowd.log.000001') + # rename flowd.log + os.rename(filename, '%s.000001' % filename) # signal flowd for new log file - if os.path.isfile('/var/run/flowd.pid'): - pid = open('/var/run/flowd.pid').read().strip() + if os.path.isfile(pid_filename): + pid = open(pid_filename).read().strip() if pid.isdigit(): try: os.kill(int(pid), signal.SIGUSR1) @@ -119,6 +121,12 @@ def check_rotate(): class Main(object): + config = None + + @classmethod + def set_config(cls, config): + cls.config = config + def __init__(self): """ construct, hook signal handler and run aggregators :return: None @@ -133,7 +141,7 @@ class Main(object): """ # check database consistency / repair syslog.syslog(syslog.LOG_NOTICE, 'startup, check database.') - check_and_repair('/var/netflow/*.sqlite') + check_and_repair('%s/*.sqlite' % self.config.database_dir) vacuum_interval = (60*60*8) # 8 hour vacuum cycle vacuum_countdown = None @@ -148,14 +156,15 @@ class Main(object): # run aggregate try: - aggregate_flowd(do_vacuum) + aggregate_flowd(self.config, do_vacuum) if do_vacuum: syslog.syslog(syslog.LOG_NOTICE, 'vacuum done') except: syslog.syslog(syslog.LOG_ERR, 'flowd aggregate died with message %s' % (traceback.format_exc())) return # rotate if needed - check_rotate() + check_rotate(self.config.flowd_source, self.config.pid_filename) + # wait for next pass, exit on sigterm for i in range(30): if self.running: @@ -172,37 +181,52 @@ class Main(object): self.running = False -if len(sys.argv) > 1 and 'console' in sys.argv[1:]: - # command line start - if 'profile' in sys.argv[1:]: - # start with profiling - import cProfile - import StringIO - import pstats +if __name__ == '__main__': + # parse arguments and load config + parser = argparse.ArgumentParser() + parser.add_argument('-c', '--config', help='configuration yaml', default=None) + parser.add_argument('--console', dest='console', help='run in console', action='store_true') + parser.add_argument('--profile', dest='profile', help='enable profiler', action='store_true') + parser.add_argument('--repair', dest='repair', help='init repair', action='store_true') + cmd_args = parser.parse_args() - pr = cProfile.Profile(builtins=False) - pr.enable() - Main() - pr.disable() - s = StringIO.StringIO() - sortby = 'cumulative' - ps = pstats.Stats(pr, stream=s).sort_stats(sortby) - ps.print_stats() - print s.getvalue() + Main.set_config( + load_config(cmd_args.config) + ) + + if cmd_args.console: + # command line start + if cmd_args.profile: + # start with profiling + import cProfile + import StringIO + import pstats + + pr = cProfile.Profile(builtins=False) + pr.enable() + Main() + pr.disable() + s = StringIO.StringIO() + sortby = 'cumulative' + ps = pstats.Stats(pr, stream=s).sort_stats(sortby) + ps.print_stats() + print s.getvalue() + else: + Main() + elif cmd_args.repair: + # force a database repair, when + try: + from sqlite3_helper import check_and_repair + lck = open(Main.config.pid_filename, 'a+') + fcntl.flock(lck, fcntl.LOCK_EX | fcntl.LOCK_NB) + check_and_repair(filename_mask='%s/*.sqlite' % Main.config.database_dir, force_repair=True) + lck.close() + os.remove(Main.config.pid_filename) + except IOError: + # already running, exit status 99 + sys.exit(99) else: - Main() -elif len(sys.argv) > 1 and 'repair' in sys.argv[1:]: - # force a database repair, when - try: - lck = open('/var/run/flowd_aggregate.pid', 'a+') - fcntl.flock(lck, fcntl.LOCK_EX | fcntl.LOCK_NB) - check_and_repair(filename_mask='/var/netflow/*.sqlite', force_repair=True) - lck.close() - os.remove('/var/run/flowd_aggregate.pid') - except IOError: - # already running, exit status 99 - sys.exit(99) -else: - # Daemonize flowd aggregator - daemon = Daemonize(app="flowd_aggregate", pid='/var/run/flowd_aggregate.pid', action=Main) - daemon.start() + # Daemonize flowd aggregator + from daemonize import Daemonize + daemon = Daemonize(app="flowd_aggregate", pid=Main.config.pid_filename, action=Main) + daemon.start() diff --git a/src/opnsense/scripts/netflow/lib/__init__.py b/src/opnsense/scripts/netflow/lib/__init__.py index e69de29bb..e594a3ec4 100644 --- a/src/opnsense/scripts/netflow/lib/__init__.py +++ b/src/opnsense/scripts/netflow/lib/__init__.py @@ -0,0 +1,58 @@ +""" + Copyright (c) 2018 Ad Schellevis + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, + INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. +""" +import sys + + +def load_config(config_yaml=None): + """ setup configuration object + :param config_yaml: + :return: + """ + if config_yaml: + import yaml + cnf_input = yaml.load(open(config_yaml, 'r')) + else: + cnf_input = dict() + + result = Config(**cnf_input) + sys.path.insert(0, result.library_path) + + return result + + +class Config(object): + """ Simple configuration wrapper for our netflow scripts, containing our defaults + """ + library_path = '/usr/local/opnsense/site-python' + pid_filename = '/var/run/flowd_aggregate.pid' + flowd_source = '/var/log/flowd.log' + database_dir = '/var/netflow' + + def __init__(self, **kwargs): + for key in kwargs: + if hasattr(self, key): + setattr(self, key, kwargs[key]) + diff --git a/src/opnsense/scripts/netflow/lib/aggregate.py b/src/opnsense/scripts/netflow/lib/aggregate.py index 57bd0fc7e..cbc360f3e 100644 --- a/src/opnsense/scripts/netflow/lib/aggregate.py +++ b/src/opnsense/scripts/netflow/lib/aggregate.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2016 Ad Schellevis + Copyright (c) 2016-2018 Ad Schellevis All rights reserved. Redistribution and use in source and binary forms, with or without @@ -27,11 +27,11 @@ aggregate flow data (format in parse.py) into sqlite structured container per type/resolution. Implementations are collected in lib\aggregates\ """ -import syslog import os import datetime import sqlite3 + def convert_timestamp(val): """ convert timestamps from string (internal sqlite type) or seconds since epoch """ @@ -57,20 +57,22 @@ def convert_timestamp(val): return val + sqlite3.register_converter('timestamp', convert_timestamp) + class AggMetadata(object): """ store some metadata needed to keep track of parse progress """ - def __init__(self): - self._filename = '/var/netflow/metadata.sqlite' + def __init__(self, database_dir='/var/netflow'): + self._filename = '%s/metadata.sqlite' % database_dir # make sure the target directory exists target_path = os.path.dirname(self._filename) if not os.path.isdir(target_path): os.makedirs(target_path) # open sqlite database and cursor self._db_connection = sqlite3.connect(self._filename, timeout=60, - detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) + detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) self._db_cursor = self._db_connection.cursor() # known tables self._tables = list() @@ -87,7 +89,6 @@ class AggMetadata(object): def _update_known_tables(self): """ request known tables """ - result = list() self._db_cursor.execute('SELECT name FROM sqlite_master') for record in self._db_cursor.fetchall(): self._tables.append(record[0]) @@ -110,337 +111,3 @@ class AggMetadata(object): else: self._db_cursor.execute('select max(mtime) from sync_timestamp') return self._db_cursor.fetchall()[0][0] - -class BaseFlowAggregator(object): - # target location ('/var/netflow/.sqlite') - target_filename = None - # list of fields to use in this aggregate - agg_fields = None - - @classmethod - def resolutions(cls): - """ sample resolutions for this aggregation - :return: list of sample resolutions - """ - return list() - - @classmethod - def history_per_resolution(cls): - """ history to keep in seconds per sample resolution - :return: dict sample resolution / expire time (seconds) - """ - return dict() - - @classmethod - def seconds_per_day(cls, days): - """ - :param days: number of days - :return: number of seconds - """ - return 60*60*24*days - - def __init__(self, resolution): - """ construct new flow sample class - :return: None - """ - self.resolution = resolution - # target table name, data_ - self._db_connection = None - self._update_cur = None - self._known_targets = list() - # construct update and insert sql statements - tmp = 'update timeserie set last_seen = :flow_end, ' - tmp += 'octets = octets + :octets_consumed, packets = packets + :packets_consumed ' - tmp += 'where mtime = :mtime and %s ' - self._update_stmt = tmp % (' and '.join(map(lambda x: '%s = :%s' % (x, x), self.agg_fields))) - tmp = 'insert into timeserie (mtime, last_seen, octets, packets, %s) ' - tmp += 'values (:mtime, :flow_end, :octets_consumed, :packets_consumed, %s)' - self._insert_stmt = tmp % (','.join(self.agg_fields), ','.join(map(lambda x: ':%s' % x, self.agg_fields))) - # open database - self._open_db() - self._fetch_known_targets() - - def __del__(self): - """ close database on destruct - :return: None - """ - if self._db_connection is not None: - self._db_connection.close() - - def _fetch_known_targets(self): - """ read known target table names from the sqlite db - :return: None - """ - if self._db_connection is not None: - self._known_targets = list() - cur = self._db_connection.cursor() - cur.execute('SELECT name FROM sqlite_master') - for record in cur.fetchall(): - self._known_targets.append(record[0]) - cur.close() - - def _create_target_table(self): - """ construct target aggregate table, using resulution and list of agg_fields - :return: None - """ - if self._db_connection is not None: - # construct new aggregate table - sql_text = list() - sql_text.append('create table timeserie ( ') - sql_text.append(' mtime timestamp') - sql_text.append(', last_seen timestamp') - for agg_field in self.agg_fields: - sql_text.append(', %s varchar(255)' % agg_field) - sql_text.append(', octets numeric') - sql_text.append(', packets numeric') - sql_text.append(', primary key(mtime, %s)' % ','.join(self.agg_fields)) - sql_text.append(')') - cur = self._db_connection.cursor() - cur.executescript('\n'.join(sql_text)) - cur.close() - # read table names - self._fetch_known_targets() - - def is_db_open(self): - """ check if target database is open - :return: database connected (True/False) - """ - if self._db_connection is not None: - return True - else: - return False - - def _open_db(self): - """ open / create database - :return: None - """ - if self.target_filename is not None: - # make sure the target directory exists - target_path = os.path.dirname(self.target_filename % self.resolution) - if not os.path.isdir(target_path): - os.makedirs(target_path) - # open sqlite database - self._db_connection = sqlite3.connect(self.target_filename % self.resolution, timeout=60, - detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) - # open update/insert cursor - self._update_cur = self._db_connection.cursor() - - def commit(self): - """ commit data - :return: None - """ - if self._db_connection is not None: - self._db_connection.commit() - - - def add(self, flow): - """ calculate timeslices per flow depending on sample resolution - :param flow: flow data (from parse.py) - :return: None - """ - # make sure target exists - if 'timeserie' not in self._known_targets: - self._create_target_table() - - # push record(s) depending on resolution - start_time = int(flow['flow_start'] / self.resolution) * self.resolution - while start_time <= flow['flow_end']: - consume_start_time = max(flow['flow_start'], start_time) - consume_end_time = min(start_time + self.resolution, flow['flow_end']) - if flow['duration_ms'] != 0: - consume_perc = (consume_end_time - consume_start_time) / float(flow['duration_ms'] / 1000.0) - else: - consume_perc = 1 - if self.is_db_open(): - # upsert data - flow['octets_consumed'] = consume_perc * flow['octets'] - flow['packets_consumed'] = consume_perc * flow['packets'] - flow['mtime'] = datetime.datetime.utcfromtimestamp(start_time) - self._update_cur.execute(self._update_stmt, flow) - if self._update_cur.rowcount == 0: - self._update_cur.execute(self._insert_stmt, flow) - # next start time - start_time += self.resolution - - def cleanup(self, do_vacuum = False): - """ cleanup timeserie table - :param expire: cleanup table, remove data older then [expire] seconds - :param do_vacuum: vacuum database - :return: None - """ - if self.is_db_open() and 'timeserie' in self._known_targets \ - and self.resolution in self.history_per_resolution(): - self._update_cur.execute('select max(mtime) as "[timestamp]" from timeserie') - last_timestamp = self._update_cur.fetchall()[0][0] - if type(last_timestamp) == datetime.datetime: - expire = self.history_per_resolution()[self.resolution] - expire_timestamp = last_timestamp - datetime.timedelta(seconds=expire) - self._update_cur.execute('delete from timeserie where mtime < :expire', {'expire': expire_timestamp}) - self.commit() - if do_vacuum: - # vacuum database if requested - syslog.syslog(syslog.LOG_NOTICE, 'vacuum %s' % (self.target_filename % self.resolution)) - self._update_cur.execute('vacuum') - - def _parse_timestamp(self, timestamp): - """ convert input to datetime.datetime or return if it already was of that type - :param timestamp: timestamp to convert - :return: datetime.datetime object - """ - if type(timestamp) in (int, float): - return datetime.datetime.utcfromtimestamp(timestamp) - elif type(timestamp) != datetime.datetime: - return datetime.datetime.utcfromtimestamp(0) - else: - return timestamp - - def _valid_fields(self, fields): - """ cleanse fields (return only valid ones) - :param fields: field list - :return: list - """ - # validate field list (can only select fields in self.agg_fields) - select_fields = list() - for field in fields: - if field.strip() in self.agg_fields: - select_fields.append(field.strip()) - - return select_fields - - def get_timeserie_data(self, start_time, end_time, fields): - """ fetch data from aggregation source, groups by mtime and selected fields - :param start_time: start timestamp - :param end_time: end timestamp - :param fields: fields to retrieve - :return: iterator returning dict records (start_time, end_time, [fields], octets, packets) - """ - if self.is_db_open() and 'timeserie' in self._known_targets: - # validate field list (can only select fields in self.agg_fields) - select_fields = self._valid_fields(fields) - if len(select_fields) == 0: - # select "none", add static null as field - select_fields.append('null') - sql_select = 'select mtime as "start_time [timestamp]", %s' % ','.join(select_fields) - sql_select += ', sum(octets) as octets, sum(packets) as packets\n' - sql_select += 'from timeserie \n' - sql_select += 'where mtime >= :start_time and mtime < :end_time\n' - sql_select += 'group by mtime, %s\n'% ','.join(select_fields) - - # execute select query - cur = self._db_connection.cursor() - cur.execute(sql_select, {'start_time': self._parse_timestamp(start_time), - 'end_time': self._parse_timestamp(end_time)}) - # - field_names = (map(lambda x:x[0], cur.description)) - for record in cur.fetchall(): - result_record = dict() - for field_indx in range(len(field_names)): - if len(record) > field_indx: - result_record[field_names[field_indx]] = record[field_indx] - if 'start_time' in result_record: - result_record['end_time'] = result_record['start_time'] + datetime.timedelta(seconds=self.resolution) - # send data - yield result_record - # close cursor - cur.close() - - def get_top_data(self, start_time, end_time, fields, value_field, data_filters=None, max_hits=100): - """ Retrieve top (usage) from this aggregation. - Fetch data from aggregation source, groups by selected fields, sorts by value_field descending - use data_filter to filter before grouping. - :param start_time: start timestamp - :param end_time: end timestamp - :param fields: fields to retrieve - :param value_field: field to sum - :param data_filter: filter data, use as field=value - :param max_hits: maximum number of results, rest is summed into (other) - :return: iterator returning dict records (start_time, end_time, [fields], octets, packets) - """ - result = list() - if self.is_db_open() and 'timeserie' in self._known_targets: - select_fields = self._valid_fields(fields) - filter_fields = [] - query_params = {} - if value_field == 'octets': - value_sql = 'sum(octets)' - elif value_field == 'packets': - value_sql = 'sum(packets)' - else: - value_sql = '0' - - # query filters, correct start_time for resolution - query_params['start_time'] = self._parse_timestamp((int(start_time/self.resolution))*self.resolution) - query_params['end_time'] = self._parse_timestamp(end_time) - if data_filters: - for data_filter in data_filters.split(','): - tmp = data_filter.split('=')[0].strip() - if tmp in self.agg_fields and data_filter.find('=') > -1: - filter_fields.append(tmp) - query_params[tmp] = '='.join(data_filter.split('=')[1:]) - - if len(select_fields) > 0: - # construct sql query to filter and select data - sql_select = 'select %s' % ','.join(select_fields) - sql_select += ', %s as total, max(last_seen) last_seen \n' % value_sql - sql_select += 'from timeserie \n' - sql_select += 'where mtime >= :start_time and mtime < :end_time\n' - for filter_field in filter_fields: - sql_select += ' and %s = :%s \n' % (filter_field, filter_field) - sql_select += 'group by %s\n'% ','.join(select_fields) - sql_select += 'order by %s desc ' % value_sql - - # execute select query - cur = self._db_connection.cursor() - cur.execute(sql_select, query_params) - - # fetch all data, to a max of [max_hits] rows. - field_names = (map(lambda x:x[0], cur.description)) - for record in cur.fetchall(): - result_record = dict() - for field_indx in range(len(field_names)): - if len(record) > field_indx: - result_record[field_names[field_indx]] = record[field_indx] - if len(result) < max_hits: - result.append(result_record) - else: - if len(result) == max_hits: - # generate row for "rest of data" - result.append({'total': 0}) - for key in result_record: - if key not in result[-1]: - result[-1][key] = "" - result[-1]['total'] += result_record['total'] - # close cursor - cur.close() - - return result - - def get_data(self, start_time, end_time): - """ get detail data - :param start_time: start timestamp - :param end_time: end timestamp - :return: iterator - """ - if self.is_db_open() and 'timeserie' in self._known_targets: - query_params = {} - query_params['start_time'] = self._parse_timestamp((int(start_time/self.resolution))*self.resolution) - query_params['end_time'] = self._parse_timestamp(end_time) - sql_select = 'select mtime start_time, ' - sql_select += '%s, octets, packets, last_seen as "last_seen [timestamp]" \n' % ','.join(self.agg_fields) - sql_select += 'from timeserie \n' - sql_select += 'where mtime >= :start_time and mtime < :end_time\n' - cur = self._db_connection.cursor() - cur.execute(sql_select, query_params) - - # fetch all data, to a max of [max_hits] rows. - field_names = (map(lambda x:x[0], cur.description)) - while True: - record = cur.fetchone() - if record is None: - break - else: - result_record=dict() - for field_indx in range(len(field_names)): - if len(record) > field_indx: - result_record[field_names[field_indx]] = record[field_indx] - yield result_record diff --git a/src/opnsense/scripts/netflow/lib/aggregates/__init__.py b/src/opnsense/scripts/netflow/lib/aggregates/__init__.py index 40f6c8f4c..84b76cb3f 100644 --- a/src/opnsense/scripts/netflow/lib/aggregates/__init__.py +++ b/src/opnsense/scripts/netflow/lib/aggregates/__init__.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2016 Ad Schellevis + Copyright (c) 2016-2018 Ad Schellevis All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,17 +26,357 @@ -------------------------------------------------------------------------------------- data aggregator loader """ -import sys import os +import sys import glob -from lib.aggregate import BaseFlowAggregator +import syslog +import datetime +import sqlite3 + + +class BaseFlowAggregator(object): + # target location ('.sqlite') + target_filename = None + # list of fields to use in this aggregate + agg_fields = None + + @classmethod + def resolutions(cls): + """ sample resolutions for this aggregation + :return: list of sample resolutions + """ + return list() + + @classmethod + def history_per_resolution(cls): + """ history to keep in seconds per sample resolution + :return: dict sample resolution / expire time (seconds) + """ + return dict() + + @classmethod + def seconds_per_day(cls, days): + """ + :param days: number of days + :return: number of seconds + """ + return 60*60*24*days + + def __init__(self, resolution, database_dir='/var/netflow'): + """ construct new flow sample class + :return: None + """ + self.database_dir = database_dir + self.resolution = resolution + # target table name, data_ + self._db_connection = None + self._update_cur = None + self._known_targets = list() + # construct update and insert sql statements + tmp = 'update timeserie set last_seen = :flow_end, ' + tmp += 'octets = octets + :octets_consumed, packets = packets + :packets_consumed ' + tmp += 'where mtime = :mtime and %s ' + self._update_stmt = tmp % (' and '.join(map(lambda x: '%s = :%s' % (x, x), self.agg_fields))) + tmp = 'insert into timeserie (mtime, last_seen, octets, packets, %s) ' + tmp += 'values (:mtime, :flow_end, :octets_consumed, :packets_consumed, %s)' + self._insert_stmt = tmp % (','.join(self.agg_fields), ','.join(map(lambda x: ':%s' % x, self.agg_fields))) + # open database + self._open_db() + self._fetch_known_targets() + + def __del__(self): + """ close database on destruct + :return: None + """ + if self._db_connection is not None: + self._db_connection.close() + + def _fetch_known_targets(self): + """ read known target table names from the sqlite db + :return: None + """ + if self._db_connection is not None: + self._known_targets = list() + cur = self._db_connection.cursor() + cur.execute('SELECT name FROM sqlite_master') + for record in cur.fetchall(): + self._known_targets.append(record[0]) + cur.close() + + def _create_target_table(self): + """ construct target aggregate table, using resulution and list of agg_fields + :return: None + """ + if self._db_connection is not None: + # construct new aggregate table + sql_text = list() + sql_text.append('create table timeserie ( ') + sql_text.append(' mtime timestamp') + sql_text.append(', last_seen timestamp') + for agg_field in self.agg_fields: + sql_text.append(', %s varchar(255)' % agg_field) + sql_text.append(', octets numeric') + sql_text.append(', packets numeric') + sql_text.append(', primary key(mtime, %s)' % ','.join(self.agg_fields)) + sql_text.append(')') + cur = self._db_connection.cursor() + cur.executescript('\n'.join(sql_text)) + cur.close() + # read table names + self._fetch_known_targets() + + def is_db_open(self): + """ check if target database is open + :return: database connected (True/False) + """ + if self._db_connection is not None: + return True + else: + return False + + def _open_db(self): + """ open / create database + :return: None + """ + if self.target_filename is not None: + # make sure the target directory exists + if not os.path.isdir(self.database_dir): + os.makedirs(self.database_dir) + # open sqlite database + self._db_connection = sqlite3.connect( + ("%s/%s" % (self.database_dir, self.target_filename)) % self.resolution, timeout=60, + detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES + ) + # open update/insert cursor + self._update_cur = self._db_connection.cursor() + + def commit(self): + """ commit data + :return: None + """ + if self._db_connection is not None: + self._db_connection.commit() + + def add(self, flow): + """ calculate timeslices per flow depending on sample resolution + :param flow: flow data (from parse.py) + :return: None + """ + # make sure target exists + if 'timeserie' not in self._known_targets: + self._create_target_table() + + # push record(s) depending on resolution + start_time = int(flow['flow_start'] / self.resolution) * self.resolution + while start_time <= flow['flow_end']: + consume_start_time = max(flow['flow_start'], start_time) + consume_end_time = min(start_time + self.resolution, flow['flow_end']) + if flow['duration_ms'] != 0: + consume_perc = (consume_end_time - consume_start_time) / float(flow['duration_ms'] / 1000.0) + else: + consume_perc = 1 + if self.is_db_open(): + # upsert data + flow['octets_consumed'] = consume_perc * flow['octets'] + flow['packets_consumed'] = consume_perc * flow['packets'] + flow['mtime'] = datetime.datetime.utcfromtimestamp(start_time) + self._update_cur.execute(self._update_stmt, flow) + if self._update_cur.rowcount == 0: + self._update_cur.execute(self._insert_stmt, flow) + # next start time + start_time += self.resolution + + def cleanup(self, do_vacuum=False): + """ cleanup timeserie table + :param do_vacuum: vacuum database + :return: None + """ + if self.is_db_open() and 'timeserie' in self._known_targets \ + and self.resolution in self.history_per_resolution(): + self._update_cur.execute('select max(mtime) as "[timestamp]" from timeserie') + last_timestamp = self._update_cur.fetchall()[0][0] + if type(last_timestamp) == datetime.datetime: + expire = self.history_per_resolution()[self.resolution] + expire_timestamp = last_timestamp - datetime.timedelta(seconds=expire) + self._update_cur.execute('delete from timeserie where mtime < :expire', {'expire': expire_timestamp}) + self.commit() + if do_vacuum: + # vacuum database if requested + syslog.syslog(syslog.LOG_NOTICE, 'vacuum %s' % (self.target_filename % self.resolution)) + self._update_cur.execute('vacuum') + + @staticmethod + def _parse_timestamp(timestamp): + """ convert input to datetime.datetime or return if it already was of that type + :param timestamp: timestamp to convert + :return: datetime.datetime object + """ + if type(timestamp) in (int, float): + return datetime.datetime.utcfromtimestamp(timestamp) + elif type(timestamp) != datetime.datetime: + return datetime.datetime.utcfromtimestamp(0) + else: + return timestamp + + def _valid_fields(self, fields): + """ cleanse fields (return only valid ones) + :param fields: field list + :return: list + """ + # validate field list (can only select fields in self.agg_fields) + select_fields = list() + for field in fields: + if field.strip() in self.agg_fields: + select_fields.append(field.strip()) + + return select_fields + + def get_timeserie_data(self, start_time, end_time, fields): + """ fetch data from aggregation source, groups by mtime and selected fields + :param start_time: start timestamp + :param end_time: end timestamp + :param fields: fields to retrieve + :return: iterator returning dict records (start_time, end_time, [fields], octets, packets) + """ + if self.is_db_open() and 'timeserie' in self._known_targets: + # validate field list (can only select fields in self.agg_fields) + select_fields = self._valid_fields(fields) + if len(select_fields) == 0: + # select "none", add static null as field + select_fields.append('null') + sql_select = 'select mtime as "start_time [timestamp]", %s' % ','.join(select_fields) + sql_select += ', sum(octets) as octets, sum(packets) as packets\n' + sql_select += 'from timeserie \n' + sql_select += 'where mtime >= :start_time and mtime < :end_time\n' + sql_select += 'group by mtime, %s\n' % ','.join(select_fields) + + # execute select query + cur = self._db_connection.cursor() + cur.execute(sql_select, {'start_time': self._parse_timestamp(start_time), + 'end_time': self._parse_timestamp(end_time)}) + # + field_names = (map(lambda x: x[0], cur.description)) + for record in cur.fetchall(): + result_record = dict() + for field_indx in range(len(field_names)): + if len(record) > field_indx: + result_record[field_names[field_indx]] = record[field_indx] + if 'start_time' in result_record: + result_record['end_time'] = result_record['start_time'] \ + + datetime.timedelta(seconds=self.resolution) + # send data + yield result_record + # close cursor + cur.close() + + def get_top_data(self, start_time, end_time, fields, value_field, data_filters=None, max_hits=100): + """ Retrieve top (usage) from this aggregation. + Fetch data from aggregation source, groups by selected fields, sorts by value_field descending + use data_filter to filter before grouping. + :param start_time: start timestamp + :param end_time: end timestamp + :param fields: fields to retrieve + :param value_field: field to sum + :param data_filters: filter data, use as field=value + :param max_hits: maximum number of results, rest is summed into (other) + :return: iterator returning dict records (start_time, end_time, [fields], octets, packets) + """ + result = list() + if self.is_db_open() and 'timeserie' in self._known_targets: + select_fields = self._valid_fields(fields) + filter_fields = [] + query_params = {} + if value_field == 'octets': + value_sql = 'sum(octets)' + elif value_field == 'packets': + value_sql = 'sum(packets)' + else: + value_sql = '0' + + # query filters, correct start_time for resolution + query_params['start_time'] = self._parse_timestamp((int(start_time/self.resolution))*self.resolution) + query_params['end_time'] = self._parse_timestamp(end_time) + if data_filters: + for data_filter in data_filters.split(','): + tmp = data_filter.split('=')[0].strip() + if tmp in self.agg_fields and data_filter.find('=') > -1: + filter_fields.append(tmp) + query_params[tmp] = '='.join(data_filter.split('=')[1:]) + + if len(select_fields) > 0: + # construct sql query to filter and select data + sql_select = 'select %s' % ','.join(select_fields) + sql_select += ', %s as total, max(last_seen) last_seen \n' % value_sql + sql_select += 'from timeserie \n' + sql_select += 'where mtime >= :start_time and mtime < :end_time\n' + for filter_field in filter_fields: + sql_select += ' and %s = :%s \n' % (filter_field, filter_field) + sql_select += 'group by %s\n' % ','.join(select_fields) + sql_select += 'order by %s desc ' % value_sql + + # execute select query + cur = self._db_connection.cursor() + cur.execute(sql_select, query_params) + + # fetch all data, to a max of [max_hits] rows. + field_names = (map(lambda x: x[0], cur.description)) + for record in cur.fetchall(): + result_record = dict() + for field_indx in range(len(field_names)): + if len(record) > field_indx: + result_record[field_names[field_indx]] = record[field_indx] + if len(result) < max_hits: + result.append(result_record) + else: + if len(result) == max_hits: + # generate row for "rest of data" + result.append({'total': 0}) + for key in result_record: + if key not in result[-1]: + result[-1][key] = "" + result[-1]['total'] += result_record['total'] + # close cursor + cur.close() + + return result + + def get_data(self, start_time, end_time): + """ get detail data + :param start_time: start timestamp + :param end_time: end timestamp + :return: iterator + """ + if self.is_db_open() and 'timeserie' in self._known_targets: + query_params = dict() + query_params['start_time'] = self._parse_timestamp((int(start_time/self.resolution))*self.resolution) + query_params['end_time'] = self._parse_timestamp(end_time) + sql_select = 'select mtime start_time, ' + sql_select += '%s, octets, packets, last_seen as "last_seen [timestamp]" \n' % ','.join(self.agg_fields) + sql_select += 'from timeserie \n' + sql_select += 'where mtime >= :start_time and mtime < :end_time\n' + cur = self._db_connection.cursor() + cur.execute(sql_select, query_params) + + # fetch all data, to a max of [max_hits] rows. + field_names = (map(lambda x: x[0], cur.description)) + while True: + record = cur.fetchone() + if record is None: + break + else: + result_record = dict() + for field_indx in range(len(field_names)): + if len(record) > field_indx: + result_record[field_names[field_indx]] = record[field_indx] + yield result_record + def get_aggregators(): """ collect and return available aggregators :return: list of class references """ result = list() - for filename in glob.glob('%s/*.py'%os.path.dirname(__file__)): + for filename in glob.glob('%s/*.py' % os.path.dirname(__file__)): filename_base = os.path.basename(filename) if filename_base[0:2] != '__': module_name = 'lib.aggregates.%s' % '.'.join(filename_base.split('.')[:-1]) diff --git a/src/opnsense/scripts/netflow/lib/aggregates/interface.py b/src/opnsense/scripts/netflow/lib/aggregates/interface.py index 805d1c1a6..77213fe9a 100644 --- a/src/opnsense/scripts/netflow/lib/aggregates/interface.py +++ b/src/opnsense/scripts/netflow/lib/aggregates/interface.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2016 Ad Schellevis + Copyright (c) 2016-2018 Ad Schellevis All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,12 +26,13 @@ -------------------------------------------------------------------------------------- data aggregator type """ -from lib.aggregate import BaseFlowAggregator +from . import BaseFlowAggregator + class FlowInterfaceTotals(BaseFlowAggregator): """ collect interface totals """ - target_filename = '/var/netflow/interface_%06d.sqlite' + target_filename = 'interface_%06d.sqlite' agg_fields = ['if', 'direction'] @classmethod @@ -40,25 +41,26 @@ class FlowInterfaceTotals(BaseFlowAggregator): :return: list of sample resolutions """ # sample in 30 seconds, 5 minutes, 1 hour and 1 day - return [30, 300, 3600, 86400] + return [30, 300, 3600, 86400] @classmethod def history_per_resolution(cls): """ :return: dict sample resolution / expire time (seconds) """ - return {30: cls.seconds_per_day(1), - 300: cls.seconds_per_day(7), - 3600: cls.seconds_per_day(31), - 86400: cls.seconds_per_day(365) - } + return { + 30: cls.seconds_per_day(1), + 300: cls.seconds_per_day(7), + 3600: cls.seconds_per_day(31), + 86400: cls.seconds_per_day(365) + } - def __init__(self, resolution): + def __init__(self, resolution, database_dir='/var/netflow'): """ :param resolution: sample resultion (seconds) :return: None """ - super(FlowInterfaceTotals, self).__init__(resolution) + super(FlowInterfaceTotals, self).__init__(resolution, database_dir) def add(self, flow): """ combine up/down flow into interface and direction diff --git a/src/opnsense/scripts/netflow/lib/aggregates/ports.py b/src/opnsense/scripts/netflow/lib/aggregates/ports.py index 732c0770b..4e1e7fa94 100644 --- a/src/opnsense/scripts/netflow/lib/aggregates/ports.py +++ b/src/opnsense/scripts/netflow/lib/aggregates/ports.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2016 Ad Schellevis + Copyright (c) 2016-2018 Ad Schellevis All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,12 +26,13 @@ -------------------------------------------------------------------------------------- data aggregator type """ -from lib.aggregate import BaseFlowAggregator +from . import BaseFlowAggregator + class FlowDstPortTotals(BaseFlowAggregator): """ collect interface totals """ - target_filename = '/var/netflow/dst_port_%06d.sqlite' + target_filename = 'dst_port_%06d.sqlite' agg_fields = ['if', 'protocol', 'dst_port'] @classmethod @@ -39,7 +40,7 @@ class FlowDstPortTotals(BaseFlowAggregator): """ :return: list of sample resolutions """ - return [300, 3600, 86400] + return [300, 3600, 86400] @classmethod def history_per_resolution(cls): @@ -48,17 +49,18 @@ class FlowDstPortTotals(BaseFlowAggregator): """ # only save daily totals for a longer period of time, we probably only want to answer questions like # "top usage over the last 30 seconds, 5 minutes, etc.." - return {300: 3600, - 3600: 86400, - 86400: cls.seconds_per_day(365) - } + return { + 300: 3600, + 3600: 86400, + 86400: cls.seconds_per_day(365) + } - def __init__(self, resolution): + def __init__(self, resolution, database_dir='/var/netflow'): """ :param resolution: sample resultion (seconds) :return: None """ - super(FlowDstPortTotals, self).__init__(resolution) + super(FlowDstPortTotals, self).__init__(resolution, database_dir) def add(self, flow): # most likely service (destination) port diff --git a/src/opnsense/scripts/netflow/lib/aggregates/source.py b/src/opnsense/scripts/netflow/lib/aggregates/source.py index b4d75f57b..b39752870 100644 --- a/src/opnsense/scripts/netflow/lib/aggregates/source.py +++ b/src/opnsense/scripts/netflow/lib/aggregates/source.py @@ -1,5 +1,5 @@ """ - Copyright (c) 2016 Ad Schellevis + Copyright (c) 2016-2018 Ad Schellevis All rights reserved. Redistribution and use in source and binary forms, with or without @@ -26,12 +26,13 @@ -------------------------------------------------------------------------------------- data aggregator type """ -from lib.aggregate import BaseFlowAggregator +from . import BaseFlowAggregator + class FlowSourceAddrTotals(BaseFlowAggregator): """ collect source totals """ - target_filename = '/var/netflow/src_addr_%06d.sqlite' + target_filename = 'src_addr_%06d.sqlite' agg_fields = ['if', 'src_addr', 'direction'] @classmethod @@ -39,7 +40,7 @@ class FlowSourceAddrTotals(BaseFlowAggregator): """ :return: list of sample resolutions """ - return [300, 3600, 86400] + return [300, 3600, 86400] @classmethod def history_per_resolution(cls): @@ -48,17 +49,18 @@ class FlowSourceAddrTotals(BaseFlowAggregator): """ # only save daily totals for a longer period of time, we probably only want to answer questions like # "top usage over the last 30 seconds, 5 minutes, etc.." - return {300: 3600, - 3600: 86400, - 86400: cls.seconds_per_day(365) - } + return { + 300: 3600, + 3600: 86400, + 86400: cls.seconds_per_day(365) + } - def __init__(self, resolution): + def __init__(self, resolution, database_dir='/var/netflow'): """ :param resolution: sample resultion (seconds) :return: None """ - super(FlowSourceAddrTotals, self).__init__(resolution) + super(FlowSourceAddrTotals, self).__init__(resolution, database_dir) def add(self, flow): # most likely service (destination) port @@ -70,10 +72,11 @@ class FlowSourceAddrTotals(BaseFlowAggregator): flow['direction'] = 'out' super(FlowSourceAddrTotals, self).add(flow) + class FlowSourceAddrDetails(BaseFlowAggregator): """ collect source details on a daily resolution """ - target_filename = '/var/netflow/src_addr_details_%06d.sqlite' + target_filename = 'src_addr_details_%06d.sqlite' agg_fields = ['if', 'direction', 'src_addr', 'dst_addr', 'service_port', 'protocol'] @classmethod @@ -81,21 +84,23 @@ class FlowSourceAddrDetails(BaseFlowAggregator): """ :return: list of sample resolutions """ - return [86400] + return [86400] @classmethod def history_per_resolution(cls): """ :return: dict sample resolution / expire time (seconds) """ - return {86400: cls.seconds_per_day(62)} + return { + 86400: cls.seconds_per_day(62) + } - def __init__(self, resolution): + def __init__(self, resolution, database_dir='/var/netflow'): """ - :param resolution: sample resultion (seconds) + :param resolution: sample resolution (seconds) :return: None """ - super(FlowSourceAddrDetails, self).__init__(resolution) + super(FlowSourceAddrDetails, self).__init__(resolution, database_dir) def add(self, flow): # most likely service (destination) port diff --git a/src/opnsense/scripts/netflow/lib/parse.py b/src/opnsense/scripts/netflow/lib/parse.py index 930b34515..af6cca6d8 100644 --- a/src/opnsense/scripts/netflow/lib/parse.py +++ b/src/opnsense/scripts/netflow/lib/parse.py @@ -48,8 +48,6 @@ PARSE_FLOW_FIELDS = [ {'check': flowd.FIELD_GATEWAY_ADDR, 'target': 'gateway_addr'}, {'check': flowd.FIELD_FLOW_TIMES, 'target': 'netflow_ver'}] -# location of flowd logfiles to use -FLOWD_LOG_FILES = '/var/log/flowd.log*' class Interfaces(object): """ mapper for local interface index to interface name (1 -> em0 for example) @@ -57,34 +55,35 @@ class Interfaces(object): def __init__(self): """ construct local interface mapping """ - self._ifIndex = dict() + self._if_index = dict() with tempfile.NamedTemporaryFile() as output_stream: subprocess.call(['/sbin/ifconfig', '-l'], stdout=output_stream, stderr=open(os.devnull, 'wb')) output_stream.seek(0) - ifIndex=1 + if_index = 1 for line in output_stream.read().split('\n')[0].split(): - self._ifIndex[str(ifIndex)] = line - ifIndex += 1 + self._if_index["%s" % if_index] = line + if_index += 1 - def if_device(self, ifIndex): + def if_device(self, if_index): """ convert index to device (if found) """ - if str(ifIndex) in self._ifIndex: + if "%s" % if_index in self._if_index: # found, return interface name - return self._ifIndex[str(ifIndex)] + return self._if_index["%s" % if_index] else: # not found, return index - return str(ifIndex) + return "%s" % if_index -def parse_flow(recv_stamp): +def parse_flow(recv_stamp, flowd_source='/var/log/flowd.log'): """ parse flowd logs and yield records (dict type) :param recv_stamp: last receive timestamp (recv) + :param flowd_source: flowd logfile :return: iterator flow details """ interfaces = Interfaces() parse_done = False - for filename in sorted(glob.glob(FLOWD_LOG_FILES)): + for filename in sorted(glob.glob('%s*' % flowd_source)): if parse_done: # log file contains older data (recv_stamp), break break diff --git a/src/opnsense/service/conf/actions.d/actions_netflow.conf b/src/opnsense/service/conf/actions.d/actions_netflow.conf index 23de1f5ff..fa0f4aa34 100644 --- a/src/opnsense/service/conf/actions.d/actions_netflow.conf +++ b/src/opnsense/service/conf/actions.d/actions_netflow.conf @@ -59,7 +59,7 @@ type:script message:start netflow data aggregator [aggregate.repair] -command:/usr/local/opnsense/scripts/netflow/flowd_aggregate.py repair && /usr/local/etc/rc.d/flowd_aggregate start +command:/usr/local/opnsense/scripts/netflow/flowd_aggregate.py --repair && /usr/local/etc/rc.d/flowd_aggregate start parameters: type:script message:force database repair