Services: Unbound DNS: Blocklist - move whitelist (passlist) handling to the unbound plugin instead of the existing prefiltering option. closes https://github.com/opnsense/core/pull/8415

The previous handling "skimmed" the blocklist using regular expressions, but when these lists include wildcards, an entry is only excluded when the expression matches the exact blocklist item (e.g. a *.org.domain entry in a blocklist will still block a.org.domain, even when a.org.domain is on the passlist).

By moving the evaluation to the place where requests are evaluated, we can match each queried domain against the provided regular expressions and let through the domains the user most likely intended to pass.
Although there is a performance penalty, it should be limited since we only compile the regex once.
This commit is contained in:
Ad Schellevis 2025-03-08 16:14:47 +01:00
parent f59c938a58
commit aa2cff3e66
3 changed files with 53 additions and 44 deletions

View File

@ -67,6 +67,12 @@ class BaseBlocklistHandler:
"""
pass
def get_passlist_patterns(self):
    """
    Default hook for handlers that do not provide a passlist.

    Derived classes override this to supply the regular expressions
    that should be excluded from blocklist matching.

    :return: list of regex patterns, empty in this base implementation
    """
    no_patterns = list()
    return no_patterns
def _blocklist_reader(self, uri):
"""
Used by a derived class to define a caching and/or download routine.
@ -195,12 +201,26 @@ class BlocklistParser:
def update_blocklist(self):
blocklists = {}
global_passlist = set()
merged = {}
for handler in self.handlers:
for pattern in handler.get_passlist_patterns():
try:
re.compile(pattern, re.IGNORECASE)
global_passlist.add(pattern)
except re.error:
syslog.syslog(syslog.LOG_ERR,
'blocklist download : skip invalid whitelist exclude pattern "%s" (%s)' % (
pattern, handler.__class__.__name__
)
)
blocklists[handler.priority] = handler.get_blocklist()
merged['data'] = self._merge_results(blocklists)
merged['config'] = self._get_config()
wp = '|'.join(global_passlist)
merged['config']['global_passlist_regex'] = wp
syslog.syslog(syslog.LOG_NOTICE, 'blocklist processed : exclude domains matching %s' % wp)
# check if there are wildcards in the dataset
has_wildcards = False

View File

@ -37,7 +37,6 @@ class DefaultBlocklistHandler(BaseBlocklistHandler):
def __init__(self):
super().__init__('/usr/local/etc/unbound/unbound-blocklists.conf')
self.priority = 100
self._whitelist_pattern = self._get_excludes()
def get_config(self):
cfg = {}
@ -53,25 +52,22 @@ class DefaultBlocklistHandler(BaseBlocklistHandler):
for blocklist, bl_shortcode in self._blocklists_in_config():
per_file_stats = {'uri': blocklist, 'skip': 0, 'blocklist': 0, 'wildcard': 0}
for domain in self._domains_in_blocklist(blocklist):
if self._whitelist_pattern.match(domain):
per_file_stats['skip'] += 1
else:
if self.domain_pattern.match(domain):
per_file_stats['blocklist'] += 1
if domain in result:
# duplicate domain, signify in dataset for debugging purposes
if 'duplicates' in result[domain]:
result[domain]['duplicates'] += ',%s' % bl_shortcode
else:
result[domain]['duplicates'] = '%s' % bl_shortcode
if self.domain_pattern.match(domain):
per_file_stats['blocklist'] += 1
if domain in result:
# duplicate domain, signify in dataset for debugging purposes
if 'duplicates' in result[domain]:
result[domain]['duplicates'] += ',%s' % bl_shortcode
else:
if domain.startswith('*.'):
result[domain[2:]] = {'bl': bl_shortcode, 'wildcard': True}
per_file_stats['wildcard'] += 1
else:
result[domain] = {'bl': bl_shortcode, 'wildcard': False}
result[domain]['duplicates'] = '%s' % bl_shortcode
else:
per_file_stats['skip'] += 1
if domain.startswith('*.'):
result[domain[2:]] = {'bl': bl_shortcode, 'wildcard': True}
per_file_stats['wildcard'] += 1
else:
result[domain] = {'bl': bl_shortcode, 'wildcard': False}
else:
per_file_stats['skip'] += 1
syslog.syslog(
syslog.LOG_NOTICE,
'blocklist: %(uri)s (exclude: %(skip)d block: %(blocklist)d wildcard: %(wildcard)d)' % per_file_stats
@ -81,9 +77,8 @@ class DefaultBlocklistHandler(BaseBlocklistHandler):
for key, value in self.cnf['include'].items():
if key.startswith('custom'):
entry = value.rstrip().lower()
if not self._whitelist_pattern.match(entry):
if self.domain_pattern.match(entry):
result[entry] = {'bl': 'Manual', 'wildcard': False}
if self.domain_pattern.match(entry):
result[entry] = {'bl': 'Manual', 'wildcard': False}
elif key.startswith('wildcard'):
entry = value.rstrip().lower()
if self.domain_pattern.match(entry):
@ -140,27 +135,7 @@ class DefaultBlocklistHandler(BaseBlocklistHandler):
else:
syslog.syslog(syslog.LOG_ERR, 'unable to download blocklist from %s and no cache available' % uri)
def _get_excludes(self):
whitelist_pattern = re.compile('$^') # match nothing
def get_passlist_patterns(self):
if self.cnf.has_section('exclude'):
exclude_list = set()
for exclude_item in self.cnf['exclude']:
pattern = self.cnf['exclude'][exclude_item]
try:
re.compile(pattern, re.IGNORECASE)
exclude_list.add(pattern)
except re.error:
syslog.syslog(syslog.LOG_ERR,
'blocklist download : skip invalid whitelist exclude pattern "%s" (%s)' % (
exclude_item, pattern
)
)
if not exclude_list:
exclude_list.add('$^')
wp = '|'.join(exclude_list)
whitelist_pattern = re.compile(wp, re.IGNORECASE)
syslog.syslog(syslog.LOG_NOTICE, 'blocklist download : exclude domains matching %s' % wp)
return whitelist_pattern
return list(self.cnf['exclude'].values())
return []

View File

@ -37,6 +37,7 @@
import os
import json
import time
import re
import errno
import uuid
import ipaddress
@ -324,6 +325,9 @@ class DNSBL:
return False
domain = query.domain.rstrip('.').lower()
if mod_env['context'].global_pass_regex and mod_env['context'].global_pass_regex.match(domain):
return False
sub = domain
match = None
while match is None:
@ -360,6 +364,7 @@ class ModuleContext:
self.env = env
self.dst_addr = '0.0.0.0'
self.rcode = RCODE_NOERROR
self.global_pass_regex = None
if self.env:
self.dnssec_enabled = 'validator' in self.env.cfg.module_conf
@ -370,6 +375,15 @@ class ModuleContext:
self.config = config
self.dst_addr = self.config.get('dst_addr', '0.0.0.0')
self.rcode = RCODE_NXDOMAIN if self.config.get('rcode') == 'NXDOMAIN' else RCODE_NOERROR
passlist = self.config.get('global_passlist_regex', None)
if passlist:
# when a pass/white list is offered, we need to be absolutely sure we can use the regex.
# compile and skip when invalid.
try:
self.global_pass_regex = re.compile(passlist, re.IGNORECASE)
except re.error:
log_err("dnsbl_module: unable to compile regex in global_passlist_regex")
self.global_pass_regex = None
def time_diff_ms(start):
    """
    Return the time elapsed since `start`, rounded to whole milliseconds.

    :param start: reference timestamp in seconds, compared against time.time()
    :return: rounded number of milliseconds between `start` and now
    """
    elapsed_seconds = time.time() - start
    elapsed_ms = elapsed_seconds * 1000
    return round(elapsed_ms)