Reporting: Unbound DNS - duckdb version upgrade handling

o make sure DbConnection() throws a new StorageVersionException when storage versions mismatch
o add restore_database() function to overwrite an existing database with the contents of an earlier backup made by the pre-upgrade hook
o the logger is responsible for the database; on startup it should validate the version and initiate a restore when there's a mismatch

In case the storage version doesn't match, there are 3 options: the backup is locked (restore running), in which case we exit; the restore went fine and we can start the logger; or there is no backup available, in which case we have no other choice than to drop the unsupported database.

While here, also fix a small issue in stats.py leading to NaN values being returned due to https://github.com/duckdb/duckdb/issues/4066
This commit is contained in:
Ad Schellevis 2023-07-18 16:23:46 +02:00
parent 2be79db6f8
commit b533835f2b
4 changed files with 77 additions and 12 deletions

View File

@ -101,7 +101,8 @@ class OverviewController extends ApiControllerBase
$response = (new Backend())->configdpRun('unbound qstats details', [1000]);
}
$parsed = json_decode($response, true);
$parsed = json_decode($response, true) ?? [];
/* Map the blocklist type keys to their corresponding description */
$nodes = (new \OPNsense\Unbound\Unbound())->getNodes();

View File

@ -32,16 +32,18 @@ import argparse
import syslog
import time
import datetime
import os
import pandas
import signal
import socket
import duckdb
sys.path.insert(0, "/usr/local/opnsense/site-python")
from duckdb_helper import DbConnection
from duckdb_helper import DbConnection, StorageVersionException, restore_database
class DNSReader:
def __init__(self, target_pipe, flush_interval):
self.target_pipe = target_pipe
def __init__(self, source_pipe, target_db, flush_interval):
self.source_pipe = source_pipe
self.target_db = target_db
self.timer = 0
self.cleanup_timer = 0
self.flush_interval = flush_interval
@ -67,7 +69,7 @@ class DNSReader:
return host
def _setup_db(self):
with DbConnection('/var/unbound/data/unbound.duckdb', read_only=False) as db:
with DbConnection(self.target_db, read_only=False) as db:
db.connection.execute("""
CREATE TABLE IF NOT EXISTS query (
uuid UUID,
@ -152,7 +154,7 @@ class DNSReader:
# read() callback cannot run to empty it. The dnsbl module side catches this with
# a BlockingIOError, forcing it to re-buffer the query, making the process
# "eventually consistent". Realistically this condition should never occur.
with DbConnection('/var/unbound/data/unbound.duckdb', read_only=False) as db:
with DbConnection(self.target_db, read_only=False) as db:
self.timer = now
if (now - self.cleanup_timer) > 3600:
self.cleanup_timer = now
@ -191,7 +193,7 @@ class DNSReader:
while not pipe_ready:
try:
# open() will block until a query has been pushed down the fifo
self.fd = open(self.target_pipe, 'r')
self.fd = open(self.source_pipe, 'r')
pipe_ready = True
except InterruptedError:
self.close_logger()
@ -217,8 +219,8 @@ class DNSReader:
# unbound closed pipe
self.close_logger()
def run(pipe, flush_interval):
r = DNSReader(pipe, flush_interval)
def run(pipe, target_db, flush_interval):
r = DNSReader(pipe, target_db, flush_interval)
try:
r.run_logger()
except InterruptedError:
@ -227,13 +229,37 @@ def run(pipe, flush_interval):
raise
if __name__ == '__main__':
syslog.openlog('unbound', facility=syslog.LOG_LOCAL4)
parser = argparse.ArgumentParser()
parser.add_argument('--pipe', help='named pipe file location', default='/var/unbound/data/dns_logger')
parser.add_argument('--targetdb', help='duckdb filename', default='/var/unbound/data/unbound.duckdb')
parser.add_argument('--backup_dir', help='backup directory', default='/var/cache/unbound.duckdb')
parser.add_argument('--flush_interval', help='interval to flush to db', default=10)
inputargs = parser.parse_args()
try:
with DbConnection(inputargs.targetdb, read_only=False) as db:
pass
except StorageVersionException:
try:
if restore_database(inputargs.backup_dir, inputargs.targetdb):
syslog.syslog(
syslog.LOG_NOTICE,
'Database restored from %s due to version mismatch' % inputargs.backup_dir
)
# XXX: remove contents of backup_dir?
else:
syslog.syslog(syslog.LOG_ERR, 'Restore needed, but backup locked, exit...')
sys.exit(-1)
except FileNotFoundError:
# no backup to recover, remove database and proceed normal startup
if os.path.isfile(inputargs.targetdb):
syslog.syslog(
syslog.LOG_NOTICE,
'Missing restore data, removing %s to proceed startup' % inputargs.targetdb
)
os.remove(inputargs.targetdb)
syslog.openlog('unbound', facility=syslog.LOG_LOCAL4)
syslog.syslog(syslog.LOG_NOTICE, 'Backgrounding unbound logging backend.')
run(inputargs.pipe, inputargs.flush_interval)
run(inputargs.pipe, inputargs.targetdb, inputargs.flush_interval)

View File

@ -235,9 +235,11 @@ def handle_details(args):
LIMIT ?
""", [args.limit]).fetchdf().astype({'uuid': str})
if not details.empty:
# use a resolved hostname if possible
details['client'] = np.where(details['hostname'].isnull(), details['client'], details['hostname'])
details['blocklist'].replace(np.nan, None, inplace=True)
details = details.drop(['hostname', 'ipaddr'], axis=1)
# map the integer types to a sensible description
details['action'] = details['action'].map({0: 'Pass', 1: 'Block', 2: 'Drop'})

View File

@ -30,6 +30,11 @@ import os
import duckdb
import fcntl
class StorageVersionException(Exception):
    """Raised when a database file cannot be opened because of its storage version."""
class DbConnection:
"""
ContextManager wrapper for a DuckDb connection. Use this to synchronize
@ -61,7 +66,12 @@ class DbConnection:
# in a timestamp adjusted for the current time zone. Since we want to store and query
# UTC at all times also set the database time zone to UTC. This is scoped within the connection.
self.connection.execute("SET TimeZone='UTC'")
except duckdb.IOException:
except duckdb.IOException as e:
if str(e).find('database file with version number') > -1:
# XXX: this is extremely wacky, apparently we are not able to read the current storage version
# via python so we can only watch for an exception... which is the same one for all types
raise StorageVersionException(str(e))
# write-only, but no truncating, so use os-level open
self._fd = os.open(self._path, os.O_WRONLY)
# Try to obtain an exclusive lock and block when unable to.
@ -97,3 +107,29 @@ class DbConnection:
return False
return True
def restore_database(path, target):
    """
    Overwrite the target database with the contents of an exported backup.

    The backup's schema.sql is used as lock file: an exclusive, non-blocking
    flock is taken on it for the duration of the restore. When the lock can
    not be obtained, nothing is touched and False is returned.

    :param path: backup source
    :param target: duckdb target database
    :return: bool success (false when locked)
    :raises FileNotFoundError: when path contains no schema.sql (no backup available)
    """
    lock_fn = "%s/schema.sql" % path.rstrip('/')
    if not os.path.isfile(lock_fn):
        # import schema not found, raise exception to inform the caller there is no backup
        raise FileNotFoundError(lock_fn)

    with open(lock_fn, 'a+') as lockh:
        try:
            fcntl.flock(lockh, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # locked
            return False
        # start from a clean slate, the existing file is replaced by the import
        if os.path.isfile(target):
            os.remove(target)
        with DbConnection(target, read_only=False) as db:
            # double embedded single quotes so the path stays a single SQL string literal
            db.connection.execute("IMPORT DATABASE '%s';" % path.replace("'", "''"))

    return True