From b533835f2b3dfe7955df4f0de24d98797f4174e6 Mon Sep 17 00:00:00 2001 From: Ad Schellevis Date: Tue, 18 Jul 2023 16:23:46 +0200 Subject: [PATCH] Reporting: Unbound DNS - duckdb version upgrade handling o make sure DbConnection() throws a new StorageVersionException when storage versions mismatch o add restore_database() function to overwrite an existing database with the contents of an earlier backup made by the pre-upgrade hook o the logger is responsible for the database, on startup, it should validate the version and initialise a restore when there's a mismatch In case the storage version doesn't match, there are 3 options, the backup is locked (restore running), in which case we exit, the restore went fine and we can start the logger, or there is no backup available, in which case we have no other choice then to drop the unsupported database. While here, also fix a small issue in stats.py leading to NaN values being returned due to https://github.com/duckdb/duckdb/issues/4066 --- .../Unbound/Api/OverviewController.php | 3 +- src/opnsense/scripts/unbound/logger.py | 46 +++++++++++++++---- src/opnsense/scripts/unbound/stats.py | 2 + src/opnsense/site-python/duckdb_helper.py | 38 ++++++++++++++- 4 files changed, 77 insertions(+), 12 deletions(-) diff --git a/src/opnsense/mvc/app/controllers/OPNsense/Unbound/Api/OverviewController.php b/src/opnsense/mvc/app/controllers/OPNsense/Unbound/Api/OverviewController.php index b981579b7..b2e7d9b54 100644 --- a/src/opnsense/mvc/app/controllers/OPNsense/Unbound/Api/OverviewController.php +++ b/src/opnsense/mvc/app/controllers/OPNsense/Unbound/Api/OverviewController.php @@ -101,7 +101,8 @@ class OverviewController extends ApiControllerBase $response = (new Backend())->configdpRun('unbound qstats details', [1000]); } - $parsed = json_decode($response, true); + $parsed = json_decode($response, true) ?? 
[]; + /* Map the blocklist type keys to their corresponding description */ $nodes = (new \OPNsense\Unbound\Unbound())->getNodes(); diff --git a/src/opnsense/scripts/unbound/logger.py b/src/opnsense/scripts/unbound/logger.py index e7b3ebe6c..486ecdf17 100755 --- a/src/opnsense/scripts/unbound/logger.py +++ b/src/opnsense/scripts/unbound/logger.py @@ -32,16 +32,18 @@ import argparse import syslog import time import datetime +import os import pandas import signal import socket import duckdb sys.path.insert(0, "/usr/local/opnsense/site-python") -from duckdb_helper import DbConnection +from duckdb_helper import DbConnection, StorageVersionException, restore_database class DNSReader: - def __init__(self, target_pipe, flush_interval): - self.target_pipe = target_pipe + def __init__(self, source_pipe, target_db, flush_interval): + self.source_pipe = source_pipe + self.target_db = target_db self.timer = 0 self.cleanup_timer = 0 self.flush_interval = flush_interval @@ -67,7 +69,7 @@ class DNSReader: return host def _setup_db(self): - with DbConnection('/var/unbound/data/unbound.duckdb', read_only=False) as db: + with DbConnection(self.target_db, read_only=False) as db: db.connection.execute(""" CREATE TABLE IF NOT EXISTS query ( uuid UUID, @@ -152,7 +154,7 @@ class DNSReader: # read() callback cannot run to empty it. The dnsbl module side catches this with # a BlockingIOError, forcing it to re-buffer the query, making the process # "eventually consistent". Realistically this condition should never occur. 
- with DbConnection('/var/unbound/data/unbound.duckdb', read_only=False) as db: + with DbConnection(self.target_db, read_only=False) as db: self.timer = now if (now - self.cleanup_timer) > 3600: self.cleanup_timer = now @@ -191,7 +193,7 @@ class DNSReader: while not pipe_ready: try: # open() will block until a query has been pushed down the fifo - self.fd = open(self.target_pipe, 'r') + self.fd = open(self.source_pipe, 'r') pipe_ready = True except InterruptedError: self.close_logger() @@ -217,8 +219,8 @@ class DNSReader: # unbound closed pipe self.close_logger() -def run(pipe, flush_interval): - r = DNSReader(pipe, flush_interval) +def run(pipe, target_db, flush_interval): + r = DNSReader(pipe, target_db, flush_interval) try: r.run_logger() except InterruptedError: @@ -227,13 +229,37 @@ def run(pipe, flush_interval): raise if __name__ == '__main__': + syslog.openlog('unbound', facility=syslog.LOG_LOCAL4) parser = argparse.ArgumentParser() parser.add_argument('--pipe', help='named pipe file location', default='/var/unbound/data/dns_logger') + parser.add_argument('--targetdb', help='duckdb filename', default='/var/unbound/data/unbound.duckdb') + parser.add_argument('--backup_dir', help='backup directory', default='/var/cache/unbound.duckdb') parser.add_argument('--flush_interval', help='interval to flush to db', default=10) inputargs = parser.parse_args() + try: + with DbConnection(inputargs.targetdb, read_only=False) as db: + pass + except StorageVersionException: + try: + if restore_database(inputargs.backup_dir, inputargs.targetdb): + syslog.syslog( + syslog.LOG_NOTICE, + 'Database restored from %s due to version mismatch' % inputargs.backup_dir + ) + # XXX: remove contents of backup_dir? 
+ else: + syslog.syslog(syslog.LOG_ERR, 'Restore needed, but backup locked, exit...') + sys.exit(-1) + except FileNotFoundError: + # no backup to recover, remove database and proceed normal startup + if os.path.isfile(inputargs.targetdb): + syslog.syslog( + syslog.LOG_NOTICE, + 'Missing restore data, removing %s to proceed startup' % inputargs.targetdb + ) + os.remove(inputargs.targetdb) - syslog.openlog('unbound', facility=syslog.LOG_LOCAL4) syslog.syslog(syslog.LOG_NOTICE, 'Backgrounding unbound logging backend.') - run(inputargs.pipe, inputargs.flush_interval) + run(inputargs.pipe, inputargs.targetdb, inputargs.flush_interval) diff --git a/src/opnsense/scripts/unbound/stats.py b/src/opnsense/scripts/unbound/stats.py index 34ba50741..aa64bd22a 100755 --- a/src/opnsense/scripts/unbound/stats.py +++ b/src/opnsense/scripts/unbound/stats.py @@ -235,9 +235,11 @@ def handle_details(args): LIMIT ? """, [args.limit]).fetchdf().astype({'uuid': str}) + if not details.empty: # use a resolved hostname if possible details['client'] = np.where(details['hostname'].isnull(), details['client'], details['hostname']) + details['blocklist'].replace(np.nan, None, inplace=True) details = details.drop(['hostname', 'ipaddr'], axis=1) # map the integer types to a sensible description details['action'] = details['action'].map({0: 'Pass', 1: 'Block', 2: 'Drop'}) diff --git a/src/opnsense/site-python/duckdb_helper.py b/src/opnsense/site-python/duckdb_helper.py index 4ede3b01a..33ec22b21 100644 --- a/src/opnsense/site-python/duckdb_helper.py +++ b/src/opnsense/site-python/duckdb_helper.py @@ -30,6 +30,11 @@ import os import duckdb import fcntl + +class StorageVersionException(Exception): + pass + + class DbConnection: """ ContextManager wrapper for a DuckDb connection. Use this to synchronize @@ -61,7 +66,12 @@ class DbConnection: # in a timestamp adjusted for the current time zone. Since we want to store and query # UTC at all times also set the database time zone to UTC. 
This is scoped within the connection. self.connection.execute("SET TimeZone='UTC'") - except duckdb.IOException: + except duckdb.IOException as e: + if str(e).find('database file with version number') > -1: + # XXX: this is extremely wacky, apparently we are not able to read the current storage version + # via python so we can only watch for an exception... which is the same one for all types + raise StorageVersionException(str(e)) + # write-only, but no truncating, so use os-level open self._fd = os.open(self._path, os.O_WRONLY) # Try to obtain an exclusive lock and block when unable to. @@ -97,3 +107,29 @@ class DbConnection: return False return True + + + +def restore_database(path, target): + """ + :param path: backup source + :param target: duckdb target database + :return: bool success (false when locked) + """ + lock_fn = "%s/schema.sql" % path.rstrip('/') + if os.path.isfile(lock_fn): + with open(lock_fn, 'a+') as lockh: + try: + fcntl.flock(lockh, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError: + # locked + return False + if os.path.isfile(target): + os.remove(target) + with DbConnection(target, read_only=False) as db: + db.connection.execute("IMPORT DATABASE '%s';" % path) + else: + # import schema not found, raise exception to inform the caller there is no backup + raise FileNotFoundError(lock_fn) + + return True \ No newline at end of file