mirror of
https://github.com/lucaspalomodevelop/core.git
synced 2026-03-17 10:04:41 +00:00
Network insight, cleanups and restructures for https://github.com/opnsense/core/issues/2964
Adds configuration yaml support, using a simple config yaml containing the previously hardcoded values: ``` library_path: /path/to/core/src/opnsense/site-python flowd_source: /path/to/flowd.log database_dir: /path/to/netflow pid_filename: /path/to/netflow.pid ``` Startup using a local config: ``` flowd_aggregate.py -c my_config.yaml --console ``` Aside from path relocations, this commit should not contain any new features. The other scripts, not related to the daemon, have not been altered (yet)
This commit is contained in:
parent
47d4b9b1ed
commit
dc38875202
@ -1,6 +1,6 @@
|
||||
#!/usr/local/bin/python2.7
|
||||
"""
|
||||
Copyright (c) 2016 Ad Schellevis <ad@opnsense.org>
|
||||
Copyright (c) 2016-2018 Ad Schellevis <ad@opnsense.org>
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
@ -36,36 +36,36 @@ import glob
|
||||
import copy
|
||||
import syslog
|
||||
import traceback
|
||||
sys.path.insert(0, "/usr/local/opnsense/site-python")
|
||||
from sqlite3_helper import check_and_repair
|
||||
import argparse
|
||||
from lib import load_config
|
||||
from lib.parse import parse_flow
|
||||
from lib.aggregate import AggMetadata
|
||||
import lib.aggregates
|
||||
from daemonize import Daemonize
|
||||
|
||||
|
||||
MAX_FILE_SIZE_MB=10
|
||||
MAX_LOGS=10
|
||||
MAX_FILE_SIZE_MB = 10
|
||||
MAX_LOGS = 10
|
||||
|
||||
|
||||
def aggregate_flowd(do_vacuum=False):
|
||||
def aggregate_flowd(config, do_vacuum=False):
|
||||
""" aggregate collected flowd data
|
||||
:param config: script configuration
|
||||
:param do_vacuum: vacuum database after cleanup
|
||||
:return: None
|
||||
"""
|
||||
# init metadata (progress maintenance)
|
||||
metadata = AggMetadata()
|
||||
metadata = AggMetadata(config.database_dir)
|
||||
|
||||
# register aggregate classes to stream data to
|
||||
stream_agg_objects = list()
|
||||
for agg_class in lib.aggregates.get_aggregators():
|
||||
for resolution in agg_class.resolutions():
|
||||
stream_agg_objects.append(agg_class(resolution))
|
||||
stream_agg_objects.append(agg_class(resolution, config.database_dir))
|
||||
|
||||
# parse flow data and stream to registered consumers
|
||||
prev_recv = metadata.last_sync()
|
||||
commit_record_count = 0
|
||||
for flow_record in parse_flow(prev_recv):
|
||||
for flow_record in parse_flow(prev_recv, config.flowd_source):
|
||||
if flow_record is None or (prev_recv != flow_record['recv'] and commit_record_count > 100000):
|
||||
# commit data on receive timestamp change or last record
|
||||
for stream_agg_object in stream_agg_objects:
|
||||
@ -88,15 +88,17 @@ def aggregate_flowd(do_vacuum=False):
|
||||
del metadata
|
||||
|
||||
|
||||
def check_rotate():
|
||||
def check_rotate(filename, pid_filename):
|
||||
""" Checks if flowd log needs to be rotated, if so perform rotate.
|
||||
We keep [MAX_LOGS] number of logs containing approx. [MAX_FILE_SIZE_MB] data, the flowd data probably contains
|
||||
more detailed data then the stored aggregates.
|
||||
:param filename: flowd logfile
|
||||
:param pid_filename: filename of pid
|
||||
:return: None
|
||||
"""
|
||||
if os.path.getsize("/var/log/flowd.log")/1024/1024 > MAX_FILE_SIZE_MB:
|
||||
if os.path.getsize(filename)/1024/1024 > MAX_FILE_SIZE_MB:
|
||||
# max filesize reached rotate
|
||||
filenames = sorted(glob.glob('/var/log/flowd.log.*'), reverse=True)
|
||||
filenames = sorted(glob.glob('%s.*' % filename), reverse=True)
|
||||
file_sequence = len(filenames)
|
||||
for filename in filenames:
|
||||
sequence = filename.split('.')[-1]
|
||||
@ -106,11 +108,11 @@ def check_rotate():
|
||||
elif int(sequence) != 0:
|
||||
os.rename(filename, filename.replace('.%s' % sequence, '.%06d' % (int(sequence)+1)))
|
||||
file_sequence -= 1
|
||||
# rename /var/log/flowd.log
|
||||
os.rename('/var/log/flowd.log', '/var/log/flowd.log.000001')
|
||||
# rename flowd.log
|
||||
os.rename(filename, '%s.000001' % filename)
|
||||
# signal flowd for new log file
|
||||
if os.path.isfile('/var/run/flowd.pid'):
|
||||
pid = open('/var/run/flowd.pid').read().strip()
|
||||
if os.path.isfile(pid_filename):
|
||||
pid = open(pid_filename).read().strip()
|
||||
if pid.isdigit():
|
||||
try:
|
||||
os.kill(int(pid), signal.SIGUSR1)
|
||||
@ -119,6 +121,12 @@ def check_rotate():
|
||||
|
||||
|
||||
class Main(object):
|
||||
config = None
|
||||
|
||||
@classmethod
|
||||
def set_config(cls, config):
|
||||
cls.config = config
|
||||
|
||||
def __init__(self):
|
||||
""" construct, hook signal handler and run aggregators
|
||||
:return: None
|
||||
@ -133,7 +141,7 @@ class Main(object):
|
||||
"""
|
||||
# check database consistency / repair
|
||||
syslog.syslog(syslog.LOG_NOTICE, 'startup, check database.')
|
||||
check_and_repair('/var/netflow/*.sqlite')
|
||||
check_and_repair('%s/*.sqlite' % self.config.database_dir)
|
||||
|
||||
vacuum_interval = (60*60*8) # 8 hour vacuum cycle
|
||||
vacuum_countdown = None
|
||||
@ -148,14 +156,15 @@ class Main(object):
|
||||
|
||||
# run aggregate
|
||||
try:
|
||||
aggregate_flowd(do_vacuum)
|
||||
aggregate_flowd(self.config, do_vacuum)
|
||||
if do_vacuum:
|
||||
syslog.syslog(syslog.LOG_NOTICE, 'vacuum done')
|
||||
except:
|
||||
syslog.syslog(syslog.LOG_ERR, 'flowd aggregate died with message %s' % (traceback.format_exc()))
|
||||
return
|
||||
# rotate if needed
|
||||
check_rotate()
|
||||
check_rotate(self.config.flowd_source, self.config.pid_filename)
|
||||
|
||||
# wait for next pass, exit on sigterm
|
||||
for i in range(30):
|
||||
if self.running:
|
||||
@ -172,37 +181,52 @@ class Main(object):
|
||||
self.running = False
|
||||
|
||||
|
||||
if len(sys.argv) > 1 and 'console' in sys.argv[1:]:
|
||||
# command line start
|
||||
if 'profile' in sys.argv[1:]:
|
||||
# start with profiling
|
||||
import cProfile
|
||||
import StringIO
|
||||
import pstats
|
||||
if __name__ == '__main__':
|
||||
# parse arguments and load config
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('-c', '--config', help='configuration yaml', default=None)
|
||||
parser.add_argument('--console', dest='console', help='run in console', action='store_true')
|
||||
parser.add_argument('--profile', dest='profile', help='enable profiler', action='store_true')
|
||||
parser.add_argument('--repair', dest='repair', help='init repair', action='store_true')
|
||||
cmd_args = parser.parse_args()
|
||||
|
||||
pr = cProfile.Profile(builtins=False)
|
||||
pr.enable()
|
||||
Main()
|
||||
pr.disable()
|
||||
s = StringIO.StringIO()
|
||||
sortby = 'cumulative'
|
||||
ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
|
||||
ps.print_stats()
|
||||
print s.getvalue()
|
||||
Main.set_config(
|
||||
load_config(cmd_args.config)
|
||||
)
|
||||
|
||||
if cmd_args.console:
|
||||
# command line start
|
||||
if cmd_args.profile:
|
||||
# start with profiling
|
||||
import cProfile
|
||||
import StringIO
|
||||
import pstats
|
||||
|
||||
pr = cProfile.Profile(builtins=False)
|
||||
pr.enable()
|
||||
Main()
|
||||
pr.disable()
|
||||
s = StringIO.StringIO()
|
||||
sortby = 'cumulative'
|
||||
ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
|
||||
ps.print_stats()
|
||||
print s.getvalue()
|
||||
else:
|
||||
Main()
|
||||
elif cmd_args.repair:
|
||||
# force a database repair, when
|
||||
try:
|
||||
from sqlite3_helper import check_and_repair
|
||||
lck = open(Main.config.pid_filename, 'a+')
|
||||
fcntl.flock(lck, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
check_and_repair(filename_mask='%s/*.sqlite' % Main.config.database_dir, force_repair=True)
|
||||
lck.close()
|
||||
os.remove(Main.config.pid_filename)
|
||||
except IOError:
|
||||
# already running, exit status 99
|
||||
sys.exit(99)
|
||||
else:
|
||||
Main()
|
||||
elif len(sys.argv) > 1 and 'repair' in sys.argv[1:]:
|
||||
# force a database repair, when
|
||||
try:
|
||||
lck = open('/var/run/flowd_aggregate.pid', 'a+')
|
||||
fcntl.flock(lck, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
check_and_repair(filename_mask='/var/netflow/*.sqlite', force_repair=True)
|
||||
lck.close()
|
||||
os.remove('/var/run/flowd_aggregate.pid')
|
||||
except IOError:
|
||||
# already running, exit status 99
|
||||
sys.exit(99)
|
||||
else:
|
||||
# Daemonize flowd aggregator
|
||||
daemon = Daemonize(app="flowd_aggregate", pid='/var/run/flowd_aggregate.pid', action=Main)
|
||||
daemon.start()
|
||||
# Daemonize flowd aggregator
|
||||
from daemonize import Daemonize
|
||||
daemon = Daemonize(app="flowd_aggregate", pid=Main.config.pid_filename, action=Main)
|
||||
daemon.start()
|
||||
|
||||
@ -0,0 +1,58 @@
|
||||
"""
|
||||
Copyright (c) 2018 Ad Schellevis <ad@opnsense.org>
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
1. Redistributions of source code must retain the above copyright notice,
|
||||
this list of conditions and the following disclaimer.
|
||||
|
||||
2. Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in the
|
||||
documentation and/or other materials provided with the distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
|
||||
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
|
||||
AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
||||
AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
|
||||
OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
POSSIBILITY OF SUCH DAMAGE.
|
||||
"""
|
||||
import sys
|
||||
|
||||
|
||||
def load_config(config_yaml=None):
    """ Setup the script configuration object.

    :param config_yaml: optional path to a yaml file overriding the defaults
    :return: Config instance (side effect: prepends the configured
             library_path to sys.path so our libraries become importable)
    """
    if config_yaml:
        # yaml is only needed when an explicit configuration file is given
        import yaml
        # use safe_load: yaml.load without an explicit Loader is deprecated
        # and can construct arbitrary python objects from the input file
        with open(config_yaml, 'r') as cnf_stream:
            cnf_input = yaml.safe_load(cnf_stream)
        if cnf_input is None:
            # an empty yaml file parses to None, fall back to the defaults
            cnf_input = dict()
    else:
        cnf_input = dict()

    result = Config(**cnf_input)
    sys.path.insert(0, result.library_path)

    return result


class Config(object):
    """ Simple configuration wrapper for our netflow scripts, containing our defaults
    """
    # defaults, each may be overridden by a same-named yaml key
    library_path = '/usr/local/opnsense/site-python'
    pid_filename = '/var/run/flowd_aggregate.pid'
    flowd_source = '/var/log/flowd.log'
    database_dir = '/var/netflow'

    def __init__(self, **kwargs):
        """ Apply overrides for the class-level defaults.

        :param kwargs: settings from the yaml file; unknown keys are ignored
        """
        for key in kwargs:
            # only accept known settings, silently skip unknown yaml keys
            if hasattr(self, key):
                setattr(self, key, kwargs[key])
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
"""
|
||||
Copyright (c) 2016 Ad Schellevis <ad@opnsense.org>
|
||||
Copyright (c) 2016-2018 Ad Schellevis <ad@opnsense.org>
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
@ -27,11 +27,11 @@
|
||||
aggregate flow data (format in parse.py) into sqlite structured container per type/resolution.
|
||||
Implementations are collected in lib\aggregates\
|
||||
"""
|
||||
import syslog
|
||||
import os
|
||||
import datetime
|
||||
import sqlite3
|
||||
|
||||
|
||||
def convert_timestamp(val):
|
||||
""" convert timestamps from string (internal sqlite type) or seconds since epoch
|
||||
"""
|
||||
@ -57,20 +57,22 @@ def convert_timestamp(val):
|
||||
|
||||
return val
|
||||
|
||||
|
||||
sqlite3.register_converter('timestamp', convert_timestamp)
|
||||
|
||||
|
||||
class AggMetadata(object):
|
||||
""" store some metadata needed to keep track of parse progress
|
||||
"""
|
||||
def __init__(self):
|
||||
self._filename = '/var/netflow/metadata.sqlite'
|
||||
def __init__(self, database_dir='/var/netflow'):
|
||||
self._filename = '%s/metadata.sqlite' % database_dir
|
||||
# make sure the target directory exists
|
||||
target_path = os.path.dirname(self._filename)
|
||||
if not os.path.isdir(target_path):
|
||||
os.makedirs(target_path)
|
||||
# open sqlite database and cursor
|
||||
self._db_connection = sqlite3.connect(self._filename, timeout=60,
|
||||
detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
|
||||
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
|
||||
self._db_cursor = self._db_connection.cursor()
|
||||
# known tables
|
||||
self._tables = list()
|
||||
@ -87,7 +89,6 @@ class AggMetadata(object):
|
||||
def _update_known_tables(self):
|
||||
""" request known tables
|
||||
"""
|
||||
result = list()
|
||||
self._db_cursor.execute('SELECT name FROM sqlite_master')
|
||||
for record in self._db_cursor.fetchall():
|
||||
self._tables.append(record[0])
|
||||
@ -110,337 +111,3 @@ class AggMetadata(object):
|
||||
else:
|
||||
self._db_cursor.execute('select max(mtime) from sync_timestamp')
|
||||
return self._db_cursor.fetchall()[0][0]
|
||||
|
||||
class BaseFlowAggregator(object):
|
||||
# target location ('/var/netflow/<store>.sqlite')
|
||||
target_filename = None
|
||||
# list of fields to use in this aggregate
|
||||
agg_fields = None
|
||||
|
||||
@classmethod
|
||||
def resolutions(cls):
|
||||
""" sample resolutions for this aggregation
|
||||
:return: list of sample resolutions
|
||||
"""
|
||||
return list()
|
||||
|
||||
@classmethod
|
||||
def history_per_resolution(cls):
|
||||
""" history to keep in seconds per sample resolution
|
||||
:return: dict sample resolution / expire time (seconds)
|
||||
"""
|
||||
return dict()
|
||||
|
||||
@classmethod
|
||||
def seconds_per_day(cls, days):
|
||||
"""
|
||||
:param days: number of days
|
||||
:return: number of seconds
|
||||
"""
|
||||
return 60*60*24*days
|
||||
|
||||
def __init__(self, resolution):
|
||||
""" construct new flow sample class
|
||||
:return: None
|
||||
"""
|
||||
self.resolution = resolution
|
||||
# target table name, data_<resolution in seconds>
|
||||
self._db_connection = None
|
||||
self._update_cur = None
|
||||
self._known_targets = list()
|
||||
# construct update and insert sql statements
|
||||
tmp = 'update timeserie set last_seen = :flow_end, '
|
||||
tmp += 'octets = octets + :octets_consumed, packets = packets + :packets_consumed '
|
||||
tmp += 'where mtime = :mtime and %s '
|
||||
self._update_stmt = tmp % (' and '.join(map(lambda x: '%s = :%s' % (x, x), self.agg_fields)))
|
||||
tmp = 'insert into timeserie (mtime, last_seen, octets, packets, %s) '
|
||||
tmp += 'values (:mtime, :flow_end, :octets_consumed, :packets_consumed, %s)'
|
||||
self._insert_stmt = tmp % (','.join(self.agg_fields), ','.join(map(lambda x: ':%s' % x, self.agg_fields)))
|
||||
# open database
|
||||
self._open_db()
|
||||
self._fetch_known_targets()
|
||||
|
||||
def __del__(self):
|
||||
""" close database on destruct
|
||||
:return: None
|
||||
"""
|
||||
if self._db_connection is not None:
|
||||
self._db_connection.close()
|
||||
|
||||
def _fetch_known_targets(self):
|
||||
""" read known target table names from the sqlite db
|
||||
:return: None
|
||||
"""
|
||||
if self._db_connection is not None:
|
||||
self._known_targets = list()
|
||||
cur = self._db_connection.cursor()
|
||||
cur.execute('SELECT name FROM sqlite_master')
|
||||
for record in cur.fetchall():
|
||||
self._known_targets.append(record[0])
|
||||
cur.close()
|
||||
|
||||
def _create_target_table(self):
|
||||
""" construct target aggregate table, using resulution and list of agg_fields
|
||||
:return: None
|
||||
"""
|
||||
if self._db_connection is not None:
|
||||
# construct new aggregate table
|
||||
sql_text = list()
|
||||
sql_text.append('create table timeserie ( ')
|
||||
sql_text.append(' mtime timestamp')
|
||||
sql_text.append(', last_seen timestamp')
|
||||
for agg_field in self.agg_fields:
|
||||
sql_text.append(', %s varchar(255)' % agg_field)
|
||||
sql_text.append(', octets numeric')
|
||||
sql_text.append(', packets numeric')
|
||||
sql_text.append(', primary key(mtime, %s)' % ','.join(self.agg_fields))
|
||||
sql_text.append(')')
|
||||
cur = self._db_connection.cursor()
|
||||
cur.executescript('\n'.join(sql_text))
|
||||
cur.close()
|
||||
# read table names
|
||||
self._fetch_known_targets()
|
||||
|
||||
def is_db_open(self):
|
||||
""" check if target database is open
|
||||
:return: database connected (True/False)
|
||||
"""
|
||||
if self._db_connection is not None:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _open_db(self):
|
||||
""" open / create database
|
||||
:return: None
|
||||
"""
|
||||
if self.target_filename is not None:
|
||||
# make sure the target directory exists
|
||||
target_path = os.path.dirname(self.target_filename % self.resolution)
|
||||
if not os.path.isdir(target_path):
|
||||
os.makedirs(target_path)
|
||||
# open sqlite database
|
||||
self._db_connection = sqlite3.connect(self.target_filename % self.resolution, timeout=60,
|
||||
detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
|
||||
# open update/insert cursor
|
||||
self._update_cur = self._db_connection.cursor()
|
||||
|
||||
def commit(self):
|
||||
""" commit data
|
||||
:return: None
|
||||
"""
|
||||
if self._db_connection is not None:
|
||||
self._db_connection.commit()
|
||||
|
||||
|
||||
def add(self, flow):
|
||||
""" calculate timeslices per flow depending on sample resolution
|
||||
:param flow: flow data (from parse.py)
|
||||
:return: None
|
||||
"""
|
||||
# make sure target exists
|
||||
if 'timeserie' not in self._known_targets:
|
||||
self._create_target_table()
|
||||
|
||||
# push record(s) depending on resolution
|
||||
start_time = int(flow['flow_start'] / self.resolution) * self.resolution
|
||||
while start_time <= flow['flow_end']:
|
||||
consume_start_time = max(flow['flow_start'], start_time)
|
||||
consume_end_time = min(start_time + self.resolution, flow['flow_end'])
|
||||
if flow['duration_ms'] != 0:
|
||||
consume_perc = (consume_end_time - consume_start_time) / float(flow['duration_ms'] / 1000.0)
|
||||
else:
|
||||
consume_perc = 1
|
||||
if self.is_db_open():
|
||||
# upsert data
|
||||
flow['octets_consumed'] = consume_perc * flow['octets']
|
||||
flow['packets_consumed'] = consume_perc * flow['packets']
|
||||
flow['mtime'] = datetime.datetime.utcfromtimestamp(start_time)
|
||||
self._update_cur.execute(self._update_stmt, flow)
|
||||
if self._update_cur.rowcount == 0:
|
||||
self._update_cur.execute(self._insert_stmt, flow)
|
||||
# next start time
|
||||
start_time += self.resolution
|
||||
|
||||
def cleanup(self, do_vacuum = False):
|
||||
""" cleanup timeserie table
|
||||
:param expire: cleanup table, remove data older then [expire] seconds
|
||||
:param do_vacuum: vacuum database
|
||||
:return: None
|
||||
"""
|
||||
if self.is_db_open() and 'timeserie' in self._known_targets \
|
||||
and self.resolution in self.history_per_resolution():
|
||||
self._update_cur.execute('select max(mtime) as "[timestamp]" from timeserie')
|
||||
last_timestamp = self._update_cur.fetchall()[0][0]
|
||||
if type(last_timestamp) == datetime.datetime:
|
||||
expire = self.history_per_resolution()[self.resolution]
|
||||
expire_timestamp = last_timestamp - datetime.timedelta(seconds=expire)
|
||||
self._update_cur.execute('delete from timeserie where mtime < :expire', {'expire': expire_timestamp})
|
||||
self.commit()
|
||||
if do_vacuum:
|
||||
# vacuum database if requested
|
||||
syslog.syslog(syslog.LOG_NOTICE, 'vacuum %s' % (self.target_filename % self.resolution))
|
||||
self._update_cur.execute('vacuum')
|
||||
|
||||
def _parse_timestamp(self, timestamp):
|
||||
""" convert input to datetime.datetime or return if it already was of that type
|
||||
:param timestamp: timestamp to convert
|
||||
:return: datetime.datetime object
|
||||
"""
|
||||
if type(timestamp) in (int, float):
|
||||
return datetime.datetime.utcfromtimestamp(timestamp)
|
||||
elif type(timestamp) != datetime.datetime:
|
||||
return datetime.datetime.utcfromtimestamp(0)
|
||||
else:
|
||||
return timestamp
|
||||
|
||||
def _valid_fields(self, fields):
|
||||
""" cleanse fields (return only valid ones)
|
||||
:param fields: field list
|
||||
:return: list
|
||||
"""
|
||||
# validate field list (can only select fields in self.agg_fields)
|
||||
select_fields = list()
|
||||
for field in fields:
|
||||
if field.strip() in self.agg_fields:
|
||||
select_fields.append(field.strip())
|
||||
|
||||
return select_fields
|
||||
|
||||
def get_timeserie_data(self, start_time, end_time, fields):
|
||||
""" fetch data from aggregation source, groups by mtime and selected fields
|
||||
:param start_time: start timestamp
|
||||
:param end_time: end timestamp
|
||||
:param fields: fields to retrieve
|
||||
:return: iterator returning dict records (start_time, end_time, [fields], octets, packets)
|
||||
"""
|
||||
if self.is_db_open() and 'timeserie' in self._known_targets:
|
||||
# validate field list (can only select fields in self.agg_fields)
|
||||
select_fields = self._valid_fields(fields)
|
||||
if len(select_fields) == 0:
|
||||
# select "none", add static null as field
|
||||
select_fields.append('null')
|
||||
sql_select = 'select mtime as "start_time [timestamp]", %s' % ','.join(select_fields)
|
||||
sql_select += ', sum(octets) as octets, sum(packets) as packets\n'
|
||||
sql_select += 'from timeserie \n'
|
||||
sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
|
||||
sql_select += 'group by mtime, %s\n'% ','.join(select_fields)
|
||||
|
||||
# execute select query
|
||||
cur = self._db_connection.cursor()
|
||||
cur.execute(sql_select, {'start_time': self._parse_timestamp(start_time),
|
||||
'end_time': self._parse_timestamp(end_time)})
|
||||
#
|
||||
field_names = (map(lambda x:x[0], cur.description))
|
||||
for record in cur.fetchall():
|
||||
result_record = dict()
|
||||
for field_indx in range(len(field_names)):
|
||||
if len(record) > field_indx:
|
||||
result_record[field_names[field_indx]] = record[field_indx]
|
||||
if 'start_time' in result_record:
|
||||
result_record['end_time'] = result_record['start_time'] + datetime.timedelta(seconds=self.resolution)
|
||||
# send data
|
||||
yield result_record
|
||||
# close cursor
|
||||
cur.close()
|
||||
|
||||
def get_top_data(self, start_time, end_time, fields, value_field, data_filters=None, max_hits=100):
|
||||
""" Retrieve top (usage) from this aggregation.
|
||||
Fetch data from aggregation source, groups by selected fields, sorts by value_field descending
|
||||
use data_filter to filter before grouping.
|
||||
:param start_time: start timestamp
|
||||
:param end_time: end timestamp
|
||||
:param fields: fields to retrieve
|
||||
:param value_field: field to sum
|
||||
:param data_filter: filter data, use as field=value
|
||||
:param max_hits: maximum number of results, rest is summed into (other)
|
||||
:return: iterator returning dict records (start_time, end_time, [fields], octets, packets)
|
||||
"""
|
||||
result = list()
|
||||
if self.is_db_open() and 'timeserie' in self._known_targets:
|
||||
select_fields = self._valid_fields(fields)
|
||||
filter_fields = []
|
||||
query_params = {}
|
||||
if value_field == 'octets':
|
||||
value_sql = 'sum(octets)'
|
||||
elif value_field == 'packets':
|
||||
value_sql = 'sum(packets)'
|
||||
else:
|
||||
value_sql = '0'
|
||||
|
||||
# query filters, correct start_time for resolution
|
||||
query_params['start_time'] = self._parse_timestamp((int(start_time/self.resolution))*self.resolution)
|
||||
query_params['end_time'] = self._parse_timestamp(end_time)
|
||||
if data_filters:
|
||||
for data_filter in data_filters.split(','):
|
||||
tmp = data_filter.split('=')[0].strip()
|
||||
if tmp in self.agg_fields and data_filter.find('=') > -1:
|
||||
filter_fields.append(tmp)
|
||||
query_params[tmp] = '='.join(data_filter.split('=')[1:])
|
||||
|
||||
if len(select_fields) > 0:
|
||||
# construct sql query to filter and select data
|
||||
sql_select = 'select %s' % ','.join(select_fields)
|
||||
sql_select += ', %s as total, max(last_seen) last_seen \n' % value_sql
|
||||
sql_select += 'from timeserie \n'
|
||||
sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
|
||||
for filter_field in filter_fields:
|
||||
sql_select += ' and %s = :%s \n' % (filter_field, filter_field)
|
||||
sql_select += 'group by %s\n'% ','.join(select_fields)
|
||||
sql_select += 'order by %s desc ' % value_sql
|
||||
|
||||
# execute select query
|
||||
cur = self._db_connection.cursor()
|
||||
cur.execute(sql_select, query_params)
|
||||
|
||||
# fetch all data, to a max of [max_hits] rows.
|
||||
field_names = (map(lambda x:x[0], cur.description))
|
||||
for record in cur.fetchall():
|
||||
result_record = dict()
|
||||
for field_indx in range(len(field_names)):
|
||||
if len(record) > field_indx:
|
||||
result_record[field_names[field_indx]] = record[field_indx]
|
||||
if len(result) < max_hits:
|
||||
result.append(result_record)
|
||||
else:
|
||||
if len(result) == max_hits:
|
||||
# generate row for "rest of data"
|
||||
result.append({'total': 0})
|
||||
for key in result_record:
|
||||
if key not in result[-1]:
|
||||
result[-1][key] = ""
|
||||
result[-1]['total'] += result_record['total']
|
||||
# close cursor
|
||||
cur.close()
|
||||
|
||||
return result
|
||||
|
||||
def get_data(self, start_time, end_time):
|
||||
""" get detail data
|
||||
:param start_time: start timestamp
|
||||
:param end_time: end timestamp
|
||||
:return: iterator
|
||||
"""
|
||||
if self.is_db_open() and 'timeserie' in self._known_targets:
|
||||
query_params = {}
|
||||
query_params['start_time'] = self._parse_timestamp((int(start_time/self.resolution))*self.resolution)
|
||||
query_params['end_time'] = self._parse_timestamp(end_time)
|
||||
sql_select = 'select mtime start_time, '
|
||||
sql_select += '%s, octets, packets, last_seen as "last_seen [timestamp]" \n' % ','.join(self.agg_fields)
|
||||
sql_select += 'from timeserie \n'
|
||||
sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
|
||||
cur = self._db_connection.cursor()
|
||||
cur.execute(sql_select, query_params)
|
||||
|
||||
# fetch all data, to a max of [max_hits] rows.
|
||||
field_names = (map(lambda x:x[0], cur.description))
|
||||
while True:
|
||||
record = cur.fetchone()
|
||||
if record is None:
|
||||
break
|
||||
else:
|
||||
result_record=dict()
|
||||
for field_indx in range(len(field_names)):
|
||||
if len(record) > field_indx:
|
||||
result_record[field_names[field_indx]] = record[field_indx]
|
||||
yield result_record
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
"""
|
||||
Copyright (c) 2016 Ad Schellevis <ad@opnsense.org>
|
||||
Copyright (c) 2016-2018 Ad Schellevis <ad@opnsense.org>
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
@ -26,17 +26,357 @@
|
||||
--------------------------------------------------------------------------------------
|
||||
data aggregator loader
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
import sys
|
||||
import glob
|
||||
from lib.aggregate import BaseFlowAggregator
|
||||
import syslog
|
||||
import datetime
|
||||
import sqlite3
|
||||
|
||||
|
||||
class BaseFlowAggregator(object):
|
||||
# target location ('<store>.sqlite')
|
||||
target_filename = None
|
||||
# list of fields to use in this aggregate
|
||||
agg_fields = None
|
||||
|
||||
@classmethod
|
||||
def resolutions(cls):
|
||||
""" sample resolutions for this aggregation
|
||||
:return: list of sample resolutions
|
||||
"""
|
||||
return list()
|
||||
|
||||
@classmethod
|
||||
def history_per_resolution(cls):
|
||||
""" history to keep in seconds per sample resolution
|
||||
:return: dict sample resolution / expire time (seconds)
|
||||
"""
|
||||
return dict()
|
||||
|
||||
@classmethod
|
||||
def seconds_per_day(cls, days):
|
||||
"""
|
||||
:param days: number of days
|
||||
:return: number of seconds
|
||||
"""
|
||||
return 60*60*24*days
|
||||
|
||||
def __init__(self, resolution, database_dir='/var/netflow'):
    """ construct new flow sample class

    :param resolution: sample resolution in seconds for this aggregate
    :param database_dir: directory holding the sqlite database file
    :return: None
    """
    self.database_dir = database_dir
    self.resolution = resolution
    # target table name, data_<resolution in seconds>
    self._db_connection = None
    self._update_cur = None
    self._known_targets = list()
    # construct update and insert sql statements once; both are
    # parameterized on :mtime plus the class-level agg_fields so add()
    # can do an update-then-insert upsert per timeslice record
    tmp = 'update timeserie set last_seen = :flow_end, '
    tmp += 'octets = octets + :octets_consumed, packets = packets + :packets_consumed '
    tmp += 'where mtime = :mtime and %s '
    self._update_stmt = tmp % (' and '.join(map(lambda x: '%s = :%s' % (x, x), self.agg_fields)))
    tmp = 'insert into timeserie (mtime, last_seen, octets, packets, %s) '
    tmp += 'values (:mtime, :flow_end, :octets_consumed, :packets_consumed, %s)'
    self._insert_stmt = tmp % (','.join(self.agg_fields), ','.join(map(lambda x: ':%s' % x, self.agg_fields)))
    # open database and cache the table names it already contains
    self._open_db()
    self._fetch_known_targets()
|
||||
|
||||
def __del__(self):
    """ close database on destruct

    NOTE: finalizer timing is up to the interpreter; callers needing a
    deterministic close should not rely on this alone.
    :return: None
    """
    if self._db_connection is not None:
        self._db_connection.close()
|
||||
|
||||
def _fetch_known_targets(self):
|
||||
""" read known target table names from the sqlite db
|
||||
:return: None
|
||||
"""
|
||||
if self._db_connection is not None:
|
||||
self._known_targets = list()
|
||||
cur = self._db_connection.cursor()
|
||||
cur.execute('SELECT name FROM sqlite_master')
|
||||
for record in cur.fetchall():
|
||||
self._known_targets.append(record[0])
|
||||
cur.close()
|
||||
|
||||
def _create_target_table(self):
    """ construct target aggregate table, using resolution and list of agg_fields

    Builds the `timeserie` table with one varchar column per agg_field and a
    composite primary key on (mtime, agg_fields) so each timeslice/key
    combination exists at most once.
    :return: None
    """
    if self._db_connection is not None:
        # construct new aggregate table
        sql_text = list()
        sql_text.append('create table timeserie ( ')
        sql_text.append(' mtime timestamp')
        sql_text.append(', last_seen timestamp')
        for agg_field in self.agg_fields:
            sql_text.append(', %s varchar(255)' % agg_field)
        sql_text.append(', octets numeric')
        sql_text.append(', packets numeric')
        sql_text.append(', primary key(mtime, %s)' % ','.join(self.agg_fields))
        sql_text.append(')')
        cur = self._db_connection.cursor()
        cur.executescript('\n'.join(sql_text))
        cur.close()
        # refresh cached table names so callers see the new table
        self._fetch_known_targets()
|
||||
|
||||
def is_db_open(self):
|
||||
""" check if target database is open
|
||||
:return: database connected (True/False)
|
||||
"""
|
||||
if self._db_connection is not None:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _open_db(self):
    """ open / create database

    Does nothing when the subclass did not set target_filename.
    NOTE(review): target_filename appears to contain a %-placeholder filled
    with self.resolution (e.g. one database per resolution) -- confirm
    against the subclasses defining it.
    :return: None
    """
    if self.target_filename is not None:
        # make sure the target directory exists
        if not os.path.isdir(self.database_dir):
            os.makedirs(self.database_dir)
        # open sqlite database; PARSE_DECLTYPES/PARSE_COLNAMES enable the
        # registered timestamp converter on declared/aliased columns
        self._db_connection = sqlite3.connect(
            ("%s/%s" % (self.database_dir, self.target_filename)) % self.resolution, timeout=60,
            detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
        )
        # open update/insert cursor, reused by add() and cleanup()
        self._update_cur = self._db_connection.cursor()
|
||||
|
||||
def commit(self):
|
||||
""" commit data
|
||||
:return: None
|
||||
"""
|
||||
if self._db_connection is not None:
|
||||
self._db_connection.commit()
|
||||
|
||||
def add(self, flow):
    """ distribute a single flow over the resolution-sized time buckets it
        spans and upsert the consumed octets/packets per bucket
    :param flow: flow data (from parse.py)
    :return: None
    """
    # create the aggregate table on first use
    if 'timeserie' not in self._known_targets:
        self._create_target_table()

    # walk all resolution-aligned slices overlapping [flow_start, flow_end]
    slice_start = int(flow['flow_start'] / self.resolution) * self.resolution
    while slice_start <= flow['flow_end']:
        window_begin = max(flow['flow_start'], slice_start)
        window_end = min(slice_start + self.resolution, flow['flow_end'])
        # fraction of the flow's duration covered by this slice
        if flow['duration_ms'] != 0:
            share = (window_end - window_begin) / float(flow['duration_ms'] / 1000.0)
        else:
            share = 1
        if self.is_db_open():
            # upsert data: try update first, insert when no row matched
            flow['octets_consumed'] = share * flow['octets']
            flow['packets_consumed'] = share * flow['packets']
            flow['mtime'] = datetime.datetime.utcfromtimestamp(slice_start)
            self._update_cur.execute(self._update_stmt, flow)
            if self._update_cur.rowcount == 0:
                self._update_cur.execute(self._insert_stmt, flow)
        # next slice
        slice_start += self.resolution
|
||||
|
||||
def cleanup(self, do_vacuum=False):
    """ cleanup timeserie table: drop samples older than the configured
        retention period for this resolution (relative to the newest sample)
    :param do_vacuum: vacuum database after cleanup
    :return: None
    """
    if self.is_db_open() and 'timeserie' in self._known_targets \
            and self.resolution in self.history_per_resolution():
        self._update_cur.execute('select max(mtime) as "[timestamp]" from timeserie')
        last_timestamp = self._update_cur.fetchall()[0][0]
        # isinstance instead of an exact type() comparison: also accepts
        # datetime subclasses; None (empty table) still falls through
        if isinstance(last_timestamp, datetime.datetime):
            expire = self.history_per_resolution()[self.resolution]
            expire_timestamp = last_timestamp - datetime.timedelta(seconds=expire)
            self._update_cur.execute('delete from timeserie where mtime < :expire', {'expire': expire_timestamp})
            self.commit()
        if do_vacuum:
            # vacuum database if requested
            syslog.syslog(syslog.LOG_NOTICE, 'vacuum %s' % (self.target_filename % self.resolution))
            self._update_cur.execute('vacuum')
|
||||
|
||||
@staticmethod
def _parse_timestamp(timestamp):
    """ convert numeric input to datetime.datetime, pass datetimes through
    :param timestamp: timestamp to convert (epoch seconds or datetime.datetime)
    :return: datetime.datetime object (epoch 0 for unsupported input types)
    """
    # isinstance() instead of exact type() checks: numeric and datetime
    # subclasses are accepted as well instead of silently mapping to epoch 0
    if isinstance(timestamp, datetime.datetime):
        return timestamp
    elif isinstance(timestamp, (int, float)):
        return datetime.datetime.utcfromtimestamp(timestamp)
    else:
        # unsupported type, default to the epoch
        return datetime.datetime.utcfromtimestamp(0)
|
||||
|
||||
def _valid_fields(self, fields):
    """ cleanse fields (return only valid ones)
    :param fields: field list
    :return: list
    """
    # keep only (stripped) fields that belong to this aggregate
    return [field.strip() for field in fields if field.strip() in self.agg_fields]
|
||||
|
||||
def get_timeserie_data(self, start_time, end_time, fields):
    """ fetch data from aggregation source, groups by mtime and selected fields
    :param start_time: start timestamp
    :param end_time: end timestamp
    :param fields: fields to retrieve
    :return: iterator returning dict records (start_time, end_time, [fields], octets, packets)
    """
    if self.is_db_open() and 'timeserie' in self._known_targets:
        # validate field list (can only select fields in self.agg_fields)
        select_fields = self._valid_fields(fields)
        if len(select_fields) == 0:
            # select "none", add static null as field
            select_fields.append('null')
        sql_select = 'select mtime as "start_time [timestamp]", %s' % ','.join(select_fields)
        sql_select += ', sum(octets) as octets, sum(packets) as packets\n'
        sql_select += 'from timeserie \n'
        sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
        sql_select += 'group by mtime, %s\n' % ','.join(select_fields)

        # execute select query
        cur = self._db_connection.cursor()
        cur.execute(sql_select, {'start_time': self._parse_timestamp(start_time),
                                 'end_time': self._parse_timestamp(end_time)})
        # list comprehension instead of map(): map() returns a non-indexable
        # iterator on Python 3, which broke the original index-based loop
        field_names = [field[0] for field in cur.description]
        for record in cur.fetchall():
            # zip() pairs names with values and stops at the shorter sequence,
            # matching the original length-guarded index loop
            result_record = dict(zip(field_names, record))
            if 'start_time' in result_record:
                result_record['end_time'] = result_record['start_time'] \
                    + datetime.timedelta(seconds=self.resolution)
            # send data
            yield result_record
        # close cursor
        cur.close()
|
||||
|
||||
def get_top_data(self, start_time, end_time, fields, value_field, data_filters=None, max_hits=100):
    """ Retrieve top (usage) from this aggregation.
    Fetch data from aggregation source, groups by selected fields, sorts by value_field descending
    use data_filter to filter before grouping.
    :param start_time: start timestamp
    :param end_time: end timestamp
    :param fields: fields to retrieve
    :param value_field: field to sum ('octets' or 'packets', anything else sums zero)
    :param data_filters: filter data, use as field=value[,field=value]
    :param max_hits: maximum number of results, rest is summed into (other)
    :return: list of dict records ([fields], total, last_seen)
    """
    result = list()
    if self.is_db_open() and 'timeserie' in self._known_targets:
        select_fields = self._valid_fields(fields)
        filter_fields = []
        query_params = {}
        # only known value fields may be summed, anything else yields zero
        if value_field == 'octets':
            value_sql = 'sum(octets)'
        elif value_field == 'packets':
            value_sql = 'sum(packets)'
        else:
            value_sql = '0'

        # query filters, correct start_time for resolution
        query_params['start_time'] = self._parse_timestamp((int(start_time/self.resolution))*self.resolution)
        query_params['end_time'] = self._parse_timestamp(end_time)
        if data_filters:
            for data_filter in data_filters.split(','):
                tmp = data_filter.split('=')[0].strip()
                # only accept filters on known aggregation fields
                if tmp in self.agg_fields and data_filter.find('=') > -1:
                    filter_fields.append(tmp)
                    query_params[tmp] = '='.join(data_filter.split('=')[1:])

        if len(select_fields) > 0:
            # construct sql query to filter and select data; filter values are
            # bound as parameters, field names were validated above
            sql_select = 'select %s' % ','.join(select_fields)
            sql_select += ', %s as total, max(last_seen) last_seen \n' % value_sql
            sql_select += 'from timeserie \n'
            sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
            for filter_field in filter_fields:
                sql_select += ' and %s = :%s \n' % (filter_field, filter_field)
            sql_select += 'group by %s\n' % ','.join(select_fields)
            sql_select += 'order by %s desc ' % value_sql

            # execute select query
            cur = self._db_connection.cursor()
            cur.execute(sql_select, query_params)

            # fetch all data, to a max of [max_hits] rows.
            # list comprehension instead of map(): map() returns a
            # non-indexable iterator on Python 3
            field_names = [field[0] for field in cur.description]
            for record in cur.fetchall():
                # zip() stops at the shorter sequence, matching the original
                # length-guarded index loop
                result_record = dict(zip(field_names, record))
                if len(result) < max_hits:
                    result.append(result_record)
                else:
                    if len(result) == max_hits:
                        # generate row for "rest of data" (only once)
                        result.append({'total': 0})
                        for key in result_record:
                            if key not in result[-1]:
                                result[-1][key] = ""
                    result[-1]['total'] += result_record['total']
            # close cursor
            cur.close()

    return result
|
||||
|
||||
def get_data(self, start_time, end_time):
    """ get detail data (all aggregation fields per sample)
    :param start_time: start timestamp
    :param end_time: end timestamp
    :return: iterator yielding dict records (start_time, [agg_fields], octets, packets, last_seen)
    """
    if self.is_db_open() and 'timeserie' in self._known_targets:
        query_params = dict()
        # align start_time to the sample resolution
        query_params['start_time'] = self._parse_timestamp((int(start_time/self.resolution))*self.resolution)
        query_params['end_time'] = self._parse_timestamp(end_time)
        sql_select = 'select mtime start_time, '
        sql_select += '%s, octets, packets, last_seen as "last_seen [timestamp]" \n' % ','.join(self.agg_fields)
        sql_select += 'from timeserie \n'
        sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
        cur = self._db_connection.cursor()
        cur.execute(sql_select, query_params)

        # stream rows one by one; list comprehension instead of map(): map()
        # returns a non-indexable iterator on Python 3
        field_names = [field[0] for field in cur.description]
        while True:
            record = cur.fetchone()
            if record is None:
                break
            # zip() stops at the shorter sequence, matching the original
            # length-guarded index loop
            yield dict(zip(field_names, record))
        # release the cursor once all rows were consumed (the original
        # left it to the garbage collector)
        cur.close()
|
||||
|
||||
|
||||
def get_aggregators():
|
||||
""" collect and return available aggregators
|
||||
:return: list of class references
|
||||
"""
|
||||
result = list()
|
||||
for filename in glob.glob('%s/*.py'%os.path.dirname(__file__)):
|
||||
for filename in glob.glob('%s/*.py' % os.path.dirname(__file__)):
|
||||
filename_base = os.path.basename(filename)
|
||||
if filename_base[0:2] != '__':
|
||||
module_name = 'lib.aggregates.%s' % '.'.join(filename_base.split('.')[:-1])
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
"""
|
||||
Copyright (c) 2016 Ad Schellevis <ad@opnsense.org>
|
||||
Copyright (c) 2016-2018 Ad Schellevis <ad@opnsense.org>
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
@ -26,12 +26,13 @@
|
||||
--------------------------------------------------------------------------------------
|
||||
data aggregator type
|
||||
"""
|
||||
from lib.aggregate import BaseFlowAggregator
|
||||
from . import BaseFlowAggregator
|
||||
|
||||
|
||||
class FlowInterfaceTotals(BaseFlowAggregator):
|
||||
""" collect interface totals
|
||||
"""
|
||||
target_filename = '/var/netflow/interface_%06d.sqlite'
|
||||
target_filename = 'interface_%06d.sqlite'
|
||||
agg_fields = ['if', 'direction']
|
||||
|
||||
@classmethod
|
||||
@ -40,25 +41,26 @@ class FlowInterfaceTotals(BaseFlowAggregator):
|
||||
:return: list of sample resolutions
|
||||
"""
|
||||
# sample in 30 seconds, 5 minutes, 1 hour and 1 day
|
||||
return [30, 300, 3600, 86400]
|
||||
return [30, 300, 3600, 86400]
|
||||
|
||||
@classmethod
|
||||
def history_per_resolution(cls):
|
||||
"""
|
||||
:return: dict sample resolution / expire time (seconds)
|
||||
"""
|
||||
return {30: cls.seconds_per_day(1),
|
||||
300: cls.seconds_per_day(7),
|
||||
3600: cls.seconds_per_day(31),
|
||||
86400: cls.seconds_per_day(365)
|
||||
}
|
||||
return {
|
||||
30: cls.seconds_per_day(1),
|
||||
300: cls.seconds_per_day(7),
|
||||
3600: cls.seconds_per_day(31),
|
||||
86400: cls.seconds_per_day(365)
|
||||
}
|
||||
|
||||
def __init__(self, resolution):
|
||||
def __init__(self, resolution, database_dir='/var/netflow'):
|
||||
"""
|
||||
:param resolution: sample resolution (seconds)
|
||||
:return: None
|
||||
"""
|
||||
super(FlowInterfaceTotals, self).__init__(resolution)
|
||||
super(FlowInterfaceTotals, self).__init__(resolution, database_dir)
|
||||
|
||||
def add(self, flow):
|
||||
""" combine up/down flow into interface and direction
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
"""
|
||||
Copyright (c) 2016 Ad Schellevis <ad@opnsense.org>
|
||||
Copyright (c) 2016-2018 Ad Schellevis <ad@opnsense.org>
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
@ -26,12 +26,13 @@
|
||||
--------------------------------------------------------------------------------------
|
||||
data aggregator type
|
||||
"""
|
||||
from lib.aggregate import BaseFlowAggregator
|
||||
from . import BaseFlowAggregator
|
||||
|
||||
|
||||
class FlowDstPortTotals(BaseFlowAggregator):
|
||||
""" collect interface totals
|
||||
"""
|
||||
target_filename = '/var/netflow/dst_port_%06d.sqlite'
|
||||
target_filename = 'dst_port_%06d.sqlite'
|
||||
agg_fields = ['if', 'protocol', 'dst_port']
|
||||
|
||||
@classmethod
|
||||
@ -39,7 +40,7 @@ class FlowDstPortTotals(BaseFlowAggregator):
|
||||
"""
|
||||
:return: list of sample resolutions
|
||||
"""
|
||||
return [300, 3600, 86400]
|
||||
return [300, 3600, 86400]
|
||||
|
||||
@classmethod
|
||||
def history_per_resolution(cls):
|
||||
@ -48,17 +49,18 @@ class FlowDstPortTotals(BaseFlowAggregator):
|
||||
"""
|
||||
# only save daily totals for a longer period of time, we probably only want to answer questions like
|
||||
# "top usage over the last 30 seconds, 5 minutes, etc.."
|
||||
return {300: 3600,
|
||||
3600: 86400,
|
||||
86400: cls.seconds_per_day(365)
|
||||
}
|
||||
return {
|
||||
300: 3600,
|
||||
3600: 86400,
|
||||
86400: cls.seconds_per_day(365)
|
||||
}
|
||||
|
||||
def __init__(self, resolution):
|
||||
def __init__(self, resolution, database_dir='/var/netflow'):
|
||||
"""
|
||||
:param resolution: sample resolution (seconds)
|
||||
:return: None
|
||||
"""
|
||||
super(FlowDstPortTotals, self).__init__(resolution)
|
||||
super(FlowDstPortTotals, self).__init__(resolution, database_dir)
|
||||
|
||||
def add(self, flow):
|
||||
# most likely service (destination) port
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
"""
|
||||
Copyright (c) 2016 Ad Schellevis <ad@opnsense.org>
|
||||
Copyright (c) 2016-2018 Ad Schellevis <ad@opnsense.org>
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
@ -26,12 +26,13 @@
|
||||
--------------------------------------------------------------------------------------
|
||||
data aggregator type
|
||||
"""
|
||||
from lib.aggregate import BaseFlowAggregator
|
||||
from . import BaseFlowAggregator
|
||||
|
||||
|
||||
class FlowSourceAddrTotals(BaseFlowAggregator):
|
||||
""" collect source totals
|
||||
"""
|
||||
target_filename = '/var/netflow/src_addr_%06d.sqlite'
|
||||
target_filename = 'src_addr_%06d.sqlite'
|
||||
agg_fields = ['if', 'src_addr', 'direction']
|
||||
|
||||
@classmethod
|
||||
@ -39,7 +40,7 @@ class FlowSourceAddrTotals(BaseFlowAggregator):
|
||||
"""
|
||||
:return: list of sample resolutions
|
||||
"""
|
||||
return [300, 3600, 86400]
|
||||
return [300, 3600, 86400]
|
||||
|
||||
@classmethod
|
||||
def history_per_resolution(cls):
|
||||
@ -48,17 +49,18 @@ class FlowSourceAddrTotals(BaseFlowAggregator):
|
||||
"""
|
||||
# only save daily totals for a longer period of time, we probably only want to answer questions like
|
||||
# "top usage over the last 30 seconds, 5 minutes, etc.."
|
||||
return {300: 3600,
|
||||
3600: 86400,
|
||||
86400: cls.seconds_per_day(365)
|
||||
}
|
||||
return {
|
||||
300: 3600,
|
||||
3600: 86400,
|
||||
86400: cls.seconds_per_day(365)
|
||||
}
|
||||
|
||||
def __init__(self, resolution):
|
||||
def __init__(self, resolution, database_dir='/var/netflow'):
|
||||
"""
|
||||
:param resolution: sample resolution (seconds)
|
||||
:return: None
|
||||
"""
|
||||
super(FlowSourceAddrTotals, self).__init__(resolution)
|
||||
super(FlowSourceAddrTotals, self).__init__(resolution, database_dir)
|
||||
|
||||
def add(self, flow):
|
||||
# most likely service (destination) port
|
||||
@ -70,10 +72,11 @@ class FlowSourceAddrTotals(BaseFlowAggregator):
|
||||
flow['direction'] = 'out'
|
||||
super(FlowSourceAddrTotals, self).add(flow)
|
||||
|
||||
|
||||
class FlowSourceAddrDetails(BaseFlowAggregator):
|
||||
""" collect source details on a daily resolution
|
||||
"""
|
||||
target_filename = '/var/netflow/src_addr_details_%06d.sqlite'
|
||||
target_filename = 'src_addr_details_%06d.sqlite'
|
||||
agg_fields = ['if', 'direction', 'src_addr', 'dst_addr', 'service_port', 'protocol']
|
||||
|
||||
@classmethod
|
||||
@ -81,21 +84,23 @@ class FlowSourceAddrDetails(BaseFlowAggregator):
|
||||
"""
|
||||
:return: list of sample resolutions
|
||||
"""
|
||||
return [86400]
|
||||
return [86400]
|
||||
|
||||
@classmethod
|
||||
def history_per_resolution(cls):
|
||||
"""
|
||||
:return: dict sample resolution / expire time (seconds)
|
||||
"""
|
||||
return {86400: cls.seconds_per_day(62)}
|
||||
return {
|
||||
86400: cls.seconds_per_day(62)
|
||||
}
|
||||
|
||||
def __init__(self, resolution):
|
||||
def __init__(self, resolution, database_dir='/var/netflow'):
|
||||
"""
|
||||
:param resolution: sample resultion (seconds)
|
||||
:param resolution: sample resolution (seconds)
|
||||
:return: None
|
||||
"""
|
||||
super(FlowSourceAddrDetails, self).__init__(resolution)
|
||||
super(FlowSourceAddrDetails, self).__init__(resolution, database_dir)
|
||||
|
||||
def add(self, flow):
|
||||
# most likely service (destination) port
|
||||
|
||||
@ -48,8 +48,6 @@ PARSE_FLOW_FIELDS = [
|
||||
{'check': flowd.FIELD_GATEWAY_ADDR, 'target': 'gateway_addr'},
|
||||
{'check': flowd.FIELD_FLOW_TIMES, 'target': 'netflow_ver'}]
|
||||
|
||||
# location of flowd logfiles to use
|
||||
FLOWD_LOG_FILES = '/var/log/flowd.log*'
|
||||
|
||||
class Interfaces(object):
|
||||
""" mapper for local interface index to interface name (1 -> em0 for example)
|
||||
@ -57,34 +55,35 @@ class Interfaces(object):
|
||||
def __init__(self):
|
||||
""" construct local interface mapping
|
||||
"""
|
||||
self._ifIndex = dict()
|
||||
self._if_index = dict()
|
||||
with tempfile.NamedTemporaryFile() as output_stream:
|
||||
subprocess.call(['/sbin/ifconfig', '-l'], stdout=output_stream, stderr=open(os.devnull, 'wb'))
|
||||
output_stream.seek(0)
|
||||
ifIndex=1
|
||||
if_index = 1
|
||||
for line in output_stream.read().split('\n')[0].split():
|
||||
self._ifIndex[str(ifIndex)] = line
|
||||
ifIndex += 1
|
||||
self._if_index["%s" % if_index] = line
|
||||
if_index += 1
|
||||
|
||||
def if_device(self, ifIndex):
|
||||
def if_device(self, if_index):
|
||||
""" convert index to device (if found)
|
||||
"""
|
||||
if str(ifIndex) in self._ifIndex:
|
||||
if "%s" % if_index in self._if_index:
|
||||
# found, return interface name
|
||||
return self._ifIndex[str(ifIndex)]
|
||||
return self._if_index["%s" % if_index]
|
||||
else:
|
||||
# not found, return index
|
||||
return str(ifIndex)
|
||||
return "%s" % if_index
|
||||
|
||||
|
||||
def parse_flow(recv_stamp):
|
||||
def parse_flow(recv_stamp, flowd_source='/var/log/flowd.log'):
|
||||
""" parse flowd logs and yield records (dict type)
|
||||
:param recv_stamp: last receive timestamp (recv)
|
||||
:param flowd_source: flowd logfile
|
||||
:return: iterator flow details
|
||||
"""
|
||||
interfaces = Interfaces()
|
||||
parse_done = False
|
||||
for filename in sorted(glob.glob(FLOWD_LOG_FILES)):
|
||||
for filename in sorted(glob.glob('%s*' % flowd_source)):
|
||||
if parse_done:
|
||||
# log file contains older data (recv_stamp), break
|
||||
break
|
||||
|
||||
@ -59,7 +59,7 @@ type:script
|
||||
message:start netflow data aggregator
|
||||
|
||||
[aggregate.repair]
|
||||
command:/usr/local/opnsense/scripts/netflow/flowd_aggregate.py repair && /usr/local/etc/rc.d/flowd_aggregate start
|
||||
command:/usr/local/opnsense/scripts/netflow/flowd_aggregate.py --repair && /usr/local/etc/rc.d/flowd_aggregate start
|
||||
parameters:
|
||||
type:script
|
||||
message:force database repair
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user