Firewall: Aliases - add "URL Table in JSON format (IPs)" type which parses json payloads and extracts addresses, closes https://github.com/opnsense/core/issues/8107

While here, also fix a minor issue in https://github.com/opnsense/core/pull/8238 to calculate a proper alias has value when auth properties are specified.
This commit is contained in:
Ad Schellevis 2025-01-26 16:34:11 +01:00
parent 3654b42210
commit 03a8812a3b
5 changed files with 71 additions and 23 deletions

View File

@ -30,6 +30,7 @@
<port>Port(s)</port>
<url>URL (IPs)</url>
<urltable>URL Table (IPs)</urltable>
<urljson>URL Table in JSON format (IPs)</urljson>
<geoip>GeoIP</geoip>
<networkgroup>Network group</networkgroup>
<mac>MAC address</mac>
@ -45,6 +46,7 @@
</check001>
</Constraints>
</type>
<path_expression type="TextField"/>
<proto type="OptionField">
<Multiple>Y</Multiple>
<OptionValues>

View File

@ -329,6 +329,7 @@
$("#row_alias\\.updatefreq").hide();
$("#row_alias\\.authtype").hide();
$("#row_alias\\.interface").hide();
$("#row_alias\\.path_expression").hide();
$("#copy-paste").hide();
switch ($(this).val()) {
case 'authgroup':
@ -357,6 +358,9 @@
$("#alias\\.proto").selectpicker('hide');
$("#alias_type_default").show();
break;
case 'urljson':
$("#row_alias\\.path_expression").show();
/* FALLTHROUGH */
case 'urltable':
$("#row_alias\\.updatefreq").show();
/* FALLTHROUGH */
@ -897,6 +901,25 @@
<span class="help-block" id="help_block_alias.content"></span>
</td>
</tr>
<tr id="row_alias.path_expression">
<td>
<div class="control-label" id="control_label_alias.path_expression">
<a id="help_for_alias.path_expression" href="#" class="showhelp"><i class="fa fa-info-circle"></i></a>
<b>{{lang._('Path expression')}}</b>
</div>
</td>
<td>
<input type="text" class="form-control" size="50" id="alias.path_expression"/>
<div class="hidden" data-for="help_for_alias.path_expression">
<small>
{{lang._('Simplified expression to select a field inside a container, a dot [.] is used as field separator (e.g. container.fieldname).')}}
</small>
</div>
</td>
<td>
<span class="help-block" id="help_block_alias.authtype"></span>
</td>
</tr>
<tr id="row_alias.authtype">
<td>
<div class="control-label" id="control_label_alias.authtype">

View File

@ -60,9 +60,7 @@ class Alias(object):
'ssl_no_verify': ssl_no_verify,
'timeout': timeout,
'interface': None,
'proto': 'IPv4,IPv6',
'password': None,
'authtype': None,
'proto': 'IPv4,IPv6'
}
self._ttl = ttl
self._name = None
@ -72,13 +70,10 @@ class Alias(object):
for subelem in elem:
if subelem.tag == 'type':
self._type = subelem.text
elif subelem.tag == 'proto':
self._properties['proto'] = subelem.text
self._properties['type'] = subelem.text
elif subelem.tag == 'name':
self._name = subelem.text
self._properties['name'] = self._name
elif subelem.tag == 'interface':
self._properties['interface'] = subelem.text
elif subelem.tag == 'ttl':
tmp = subelem.text.strip()
if len(tmp.split('.')) <= 2 and tmp.replace('.', '').isdigit():
@ -92,12 +87,8 @@ class Alias(object):
self._items = set(sorted(subelem.text.split()))
elif subelem.tag == 'url':
self._items = set(sorted(subelem.text.split()))
elif subelem.tag == 'authtype':
self._properties['authtype'] = subelem.text
elif subelem.tag == 'password':
self._properties['password'] = subelem.text
elif subelem.tag == 'username':
self._properties['username'] = subelem.text
else:
self._properties[subelem.tag] = subelem.text
# we'll save the calculated hash for the unparsed alias content
self._filename_alias_hash = '/var/db/aliastables/%s.md5.txt' % self._name
@ -117,8 +108,9 @@ class Alias(object):
:return: md5 (string)
"""
tmp = ','.join(sorted(list(self._items)))
if self._properties['proto']:
tmp = '%s[%s]' % (tmp, self._properties['proto'])
for fieldname in ['proto', 'path_expression', 'authtype', 'username', 'password']:
if fieldname in self._properties:
tmp = '%s[%s]' % (tmp, self._properties[fieldname])
return md5(tmp.encode()).hexdigest()
def changed(self):
@ -231,7 +223,7 @@ class Alias(object):
"""
if self._type in ['host', 'network', 'networkgroup']:
return BaseContentParser(**self._properties)
elif self._type in ['url', 'urltable']:
elif self._type in ['url', 'urltable', 'urljson']:
return UriParser(**self._properties)
elif self._type == 'geoip':
return GEOIP(**self._properties)

View File

@ -28,6 +28,7 @@ import re
import syslog
import requests
import urllib3
import ujson
from .base import BaseContentParser
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@ -41,17 +42,45 @@ class UriParser(BaseContentParser):
self._authtype = authtype
self._username = username
self._password = password
self._type = kwargs.get('type', None)
# optional path expresion
if kwargs.get('path_expression', None):
self._path_expression = kwargs['path_expression'].split('.')
else:
self._path_expression = []
def _parse_line(self, line):
""" return unparsed (raw) alias entries without dependencies
:param line: string item to parse
:return: iterator
"""
if self._type == 'urljson':
try:
record = ujson.loads(line)
except ValueError:
record = {}
for field in self._path_expression:
if type(record) is dict and field in record:
record = record[field]
else:
return
if type(record) is str:
yield record
elif type(record) is list:
for item in record:
yield item
else:
raw_address = re.split(r'[\s,;|#]+', line)[0]
if raw_address and not raw_address.startswith('//'):
yield raw_address
def iter_addresses(self, url):
""" return unparsed (raw) alias entries without dependencies
""" parse addresses, yield only valid addresses and networks
:param url: url
:return: iterator
"""
# set request parameters
req_opts = dict()
req_opts['url'] = url
req_opts['stream'] = True
req_opts['timeout'] = self._timeout
req_opts = {'url': url, 'stream': True, 'timeout': self._timeout}
if self._ssl_no_verify:
req_opts['verify'] = False
@ -70,8 +99,7 @@ class UriParser(BaseContentParser):
lines = req.raw.read().decode().splitlines()
syslog.syslog(syslog.LOG_NOTICE, 'fetch alias url %s (lines: %s)' % (url, len(lines)))
for line in lines:
raw_address = re.split(r'[\s,;|#]+', line)[0]
if raw_address and not raw_address.startswith('//'):
for raw_address in self._parse_line(line):
for address in super().iter_addresses(raw_address):
yield address
else:

View File

@ -15,6 +15,9 @@
<type>{{ alias.type }}</type>
{% if alias.enabled|default('0') == '0'%}
<address></address>
{% elif alias.type.startswith('urljson') %}
<url>{{ alias.content|e|encode_idna }}</url>
<path_expression>{{ alias.path_expression|e }}</path_expression>
{% elif alias.type.startswith('urltable') %}
<url>{{ alias.content|e|encode_idna }}</url>
{% elif alias.type.startswith('url') %}