diff --git a/src/opnsense/scripts/netflow/get_timeseries.py b/src/opnsense/scripts/netflow/get_timeseries.py index c6c14e6e9..526999efa 100755 --- a/src/opnsense/scripts/netflow/get_timeseries.py +++ b/src/opnsense/scripts/netflow/get_timeseries.py @@ -70,7 +70,7 @@ if valid_params: for agg_class in lib.aggregates.get_aggregators(): if app_params['provider'] == agg_class.__name__: obj = agg_class(resolution) - for record in obj.get_data(start_time, end_time, key_fields): + for record in obj.get_timeserie_data(start_time, end_time, key_fields): record_key = [] for key_field in key_fields: if key_field in record and record[key_field] != None: diff --git a/src/opnsense/scripts/netflow/get_top_usage.py b/src/opnsense/scripts/netflow/get_top_usage.py new file mode 100755 index 000000000..a213c6a57 --- /dev/null +++ b/src/opnsense/scripts/netflow/get_top_usage.py @@ -0,0 +1,101 @@ +#!/usr/local/bin/python2.7 + +""" + Copyright (c) 2016 Ad Schellevis + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, + INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. + + -------------------------------------------------------------------------------------- + fetch top usage from data provider +""" +import time +import datetime +import os +import sys +import ujson +sys.path.insert(0, "/usr/local/opnsense/site-python") +from lib.parse import parse_flow +from lib.aggregate import BaseFlowAggregator +import lib.aggregates +import params + + +app_params = {'start_time': '', + 'end_time': '', + 'key_fields': '', + 'value_field': '', + 'filter': '', + 'max_hits': '', + 'provider': '' + } +params.update_params(app_params) + +# handle input parameters +valid_params = False +if app_params['start_time'].isdigit(): + start_time = int(app_params['start_time']) + if app_params['end_time'].isdigit(): + end_time = int(app_params['end_time']) + if app_params['max_hits'].isdigit(): + max_hits = int(app_params['max_hits']) + if app_params['key_fields']: + key_fields = app_params['key_fields'].split(',') + if app_params['value_field']: + value_field = app_params['value_field'] + valid_params = True +data_filter=app_params['filter'] + +timeseries=dict() +if valid_params: + # collect requested top + result = dict() + for agg_class in lib.aggregates.get_aggregators(): + if app_params['provider'] == agg_class.__name__: + # provider may specify multiple resolutions, we need to find the one most likely to serve the + # beginning of our timeframe + resolutions = sorted(agg_class.resolutions()) + history_per_resolution = agg_class.history_per_resolution() + for resolution in resolutions: + if (resolution in history_per_resolution \ + and time.time() - history_per_resolution[resolution] <= start_time ) \ + or resolutions[-1] == resolution: + selected_resolution = resolution + break + obj = agg_class(selected_resolution) + result = obj.get_top_data(start_time, end_time, key_fields, value_field, data_filter, max_hits) + print (ujson.dumps(result)) +else: + print ('missing parameters :') + tmp = list() + for key in app_params: + tmp.append('/%s %s' % (key, app_params[key])) + print (' %s %s'%(sys.argv[0], ' '.join(tmp))) + print ('') + print (' start_time : start time (seconds since epoch)') + print (' end_time : end timestamp (seconds since epoch)') + print (' key_fields : key field(s)') + print (' value_field : field to sum') + print (' filter : apply filter =value') + print (' provider : data provider classname') + print (' max_hits : maximum number of hits (+1 for rest of data)') + print (' sample : if provided, use these keys to generate sample data (e.g. em0,em1,em2)') diff --git a/src/opnsense/scripts/netflow/lib/aggregate.py b/src/opnsense/scripts/netflow/lib/aggregate.py index ee0204351..372062a03 100644 --- a/src/opnsense/scripts/netflow/lib/aggregate.py +++ b/src/opnsense/scripts/netflow/lib/aggregate.py @@ -267,7 +267,32 @@ class BaseFlowAggregator(object): # vacuum database if requested self._update_cur.execute('vacuum') - def get_data(self, start_time, end_time, fields): + def _parse_timestamp(self, timestamp): + """ convert input to datetime.datetime or return if it already was of that type + :param timestamp: timestamp to convert + :return: datetime.datetime object + """ + if type(timestamp) in (int, float): + return datetime.datetime.fromtimestamp(timestamp) + elif type(timestamp) != datetime.datetime: + return datetime.datetime.fromtimestamp(0) + else: + return timestamp + + def _valid_fields(self, fields): + """ cleanse fields (return only valid ones) + :param fields: field list + :return: list + """ + # validate field list (can only select fields in self.agg_fields) + select_fields = list() + for field in fields: + if field.strip() in self.agg_fields: + select_fields.append(field.strip()) + + return select_fields + + def get_timeserie_data(self, start_time, end_time, fields): """ fetch data from aggregation source, groups by mtime and selected fields :param start_time: start timestamp :param end_time: end timestamp @@ -276,10 +301,7 @@ class BaseFlowAggregator(object): """ if self.is_db_open() and 'timeserie' in self._known_targets: # validate field list (can only select fields in self.agg_fields) - select_fields = list() - for field in self.agg_fields: - if field in fields: - select_fields.append(field) + select_fields = self._valid_fields(fields) if len(select_fields) == 0: # select "none", add static null as field select_fields.append('null') @@ -288,20 +310,11 @@ class BaseFlowAggregator(object): sql_select += 'from timeserie \n' sql_select += 'where mtime >= :start_time and mtime < :end_time\n' sql_select += 'group by mtime, %s\n'% ','.join(select_fields) - # make sure start- and end time are of datetime.datetime type - if type(start_time) in (int, float): - start_time = datetime.datetime.fromtimestamp(start_time) - elif type(start_time) != datetime.datetime: - start_time = datetime.datetime.fromtimestamp(0) - - if type(end_time) in (int, float): - end_time = datetime.datetime.fromtimestamp(end_time) - elif type(end_time) != datetime.datetime: - end_time = datetime.datetime.fromtimestamp(0) # execute select query cur = self._db_connection.cursor() - cur.execute(sql_select, {'start_time': start_time, 'end_time': end_time}) + cur.execute(sql_select, {'start_time': self._parse_timestamp(start_time), + 'end_time': self._parse_timestamp(end_time)}) # field_names = (map(lambda x:x[0], cur.description)) for record in cur.fetchall(): @@ -315,3 +328,72 @@ class BaseFlowAggregator(object): yield result_record # close cursor cur.close() + + def get_top_data(self, start_time, end_time, fields, value_field, data_filter=None, max_hits=100): + """ Retrieve top (usage) from this aggregation. + Fetch data from aggregation source, groups by selected fields, sorts by value_field descending + use data_filter to filter before grouping. + :param start_time: start timestamp + :param end_time: end timestamp + :param fields: fields to retrieve + :param value_field: field to sum + :param data_filter: filter data, use as field=value + :param max_hits: maximum number of results, rest is summed into (other) + :return: iterator returning dict records (start_time, end_time, [fields], octets, packets) + """ + result = list() + select_fields = self._valid_fields(fields) + filter_fields = [] + query_params = {} + if value_field == 'octets': + value_sql = 'sum(octets)' + elif value_field == 'packets': + value_sql = 'sum(packets)' + else: + value_sql = '0' + + # query filters + query_params['start_time'] = self._parse_timestamp(start_time) + query_params['end_time'] = self._parse_timestamp(end_time) + if data_filter: + tmp = data_filter.split('=')[0].strip() + if tmp in self.agg_fields and data_filter.find('=') > -1: + filter_fields.append(tmp) + query_params[tmp] = '='.join(data_filter.split('=')[1:]) + + if len(select_fields) > 0: + # construct sql query to filter and select data + sql_select = 'select %s' % ','.join(select_fields) + sql_select += ', %s as total \n' % value_sql + sql_select += 'from timeserie \n' + sql_select += 'where mtime >= :start_time and mtime < :end_time\n' + for filter_field in filter_fields: + sql_select += ' and %s = :%s \n' % (filter_field, filter_field) + sql_select += 'group by %s\n'% ','.join(select_fields) + sql_select += 'order by %s desc ' % value_sql + + # execute select query + cur = self._db_connection.cursor() + cur.execute(sql_select, query_params) + + # fetch all data, to a max of [max_hits] rows. + field_names = (map(lambda x:x[0], cur.description)) + for record in cur.fetchall(): + result_record = dict() + for field_indx in range(len(field_names)): + if len(record) > field_indx: + result_record[field_names[field_indx]] = record[field_indx] + if len(result) < max_hits: + result.append(result_record) + else: + if len(result) == max_hits: + # generate row for "rest of data" + result.append({'total': 0}) + for key in result_record: + if key not in result[-1]: + result[-1][key] = "" + result[-1]['total'] += result_record['total'] + # close cursor + cur.close() + + return result