From a72fda79e058e91a433d535c8783d1d20dd12d34 Mon Sep 17 00:00:00 2001 From: Ad Schellevis Date: Fri, 1 Apr 2016 09:54:22 +0200 Subject: [PATCH] (netflow/flowd) move resolution to BaseFlowAggregator, add cleanup() to expire old data --- src/opnsense/scripts/netflow/lib/aggregate.py | 42 ++++++++++++++++++- .../netflow/lib/aggregates/interface.py | 18 ++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/src/opnsense/scripts/netflow/lib/aggregate.py b/src/opnsense/scripts/netflow/lib/aggregate.py index 86b4c17d9..843431e41 100644 --- a/src/opnsense/scripts/netflow/lib/aggregate.py +++ b/src/opnsense/scripts/netflow/lib/aggregate.py @@ -41,7 +41,8 @@ class AggMetadata(object): if not os.path.isdir(target_path): os.makedirs(target_path) # open sqlite database and cursor - self._db_connection = sqlite3.connect(self._filename) + self._db_connection = sqlite3.connect(self._filename, + detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) self._db_cursor = self._db_connection.cursor() # known tables self._tables = list() @@ -81,8 +82,23 @@ class BaseFlowAggregator(object): # list of fields to use in this aggregate agg_fields = None + @classmethod + def resolutions(cls): + """ sample resolutions for this aggregation + :return: list of sample resolutions + """ + return list() + + @classmethod + def history_per_resolution(cls): + """ history to keep in seconds per sample resolution + :return: dict sample resolution / expire time (seconds) + """ + return dict() + def __init__(self, resolution): """ construct new flow sample class + :return: None """ self.resolution = resolution # target table name, data_ @@ -101,6 +117,7 @@ class BaseFlowAggregator(object): def _fetch_known_targets(self): """ read known target table names from the sqlite db + :return: None """ if self._db_connection is not None: self._known_targets = list() @@ -112,6 +129,7 @@ class BaseFlowAggregator(object): def _create_target_table(self): """ construct target aggregate table, using resulution and list of agg_fields + :return: None """ if self._db_connection is not None: # construct new aggregate table @@ -132,6 +150,7 @@ class BaseFlowAggregator(object): def is_db_open(self): """ check if target database is open + :return: database connected (True/False) """ if self._db_connection is not None: return True @@ -140,6 +159,7 @@ class BaseFlowAggregator(object): def _open_db(self): """ open / create database + :return: None """ if self.target_filename is not None: # make sure the target directory exists @@ -147,12 +167,14 @@ class BaseFlowAggregator(object): if not os.path.isdir(target_path): os.makedirs(target_path) # open sqlite database - self._db_connection = sqlite3.connect(self.target_filename % self.resolution) + self._db_connection = sqlite3.connect(self.target_filename % self.resolution, + detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) # open update/insert cursor self._update_cur = self._db_connection.cursor() def commit(self): """ commit data + :return: None """ if self._db_connection is not None: self._db_connection.commit() @@ -160,6 +182,7 @@ class BaseFlowAggregator(object): def add(self, flow): """ calculate timeslices per flow depending on sample resolution :param flow: flow data (from parse.py) + :return: None """ # make sure target exists if 'timeserie' not in self._known_targets: @@ -184,3 +207,18 @@ class BaseFlowAggregator(object): self._update_cur.execute(self._insert_stmt, flow) # next start time start_time += self.resolution + + def cleanup(self): + """ cleanup timeserie table + :param expire: cleanup table, remove data older then [expire] seconds + :return: None + """ + if 'timeserie' in self._known_targets and self.resolution in self.history_per_resolution(): + self._update_cur.execute('select max(mtime) as "[timestamp]" from timeserie') + last_timestamp = self._update_cur.fetchall()[0][0] + if type(last_timestamp) == datetime.datetime: + expire = self.history_per_resolution()[self.resolution] + expire_timestamp = last_timestamp - datetime.timedelta(seconds=expire) + self._update_cur.execute('delete from timeserie where mtime < :expire', {'expire': expire_timestamp}) + self.commit() + # todo: might need vacuum at some point. diff --git a/src/opnsense/scripts/netflow/lib/aggregates/interface.py b/src/opnsense/scripts/netflow/lib/aggregates/interface.py index cbf5b3cde..0f1be0267 100644 --- a/src/opnsense/scripts/netflow/lib/aggregates/interface.py +++ b/src/opnsense/scripts/netflow/lib/aggregates/interface.py @@ -34,5 +34,23 @@ class FlowInterfaceTotals(BaseFlowAggregator): target_filename = '/var/netflow/interface_%06d.sqlite' agg_fields = ['if_in', 'if_out'] + @classmethod + def resolutions(cls): + """ + :return: list of sample resolutions + """ + return [60, 60*5] + + @classmethod + def history_per_resolution(cls): + """ + :return: dict sample resolution / expire time (seconds) + """ + return {60: 60*60} + def __init__(self, resolution): + """ + :param resolution: sample resultion (seconds) + :return: None + """ super(FlowInterfaceTotals, self).__init__(resolution)