mirror of
https://github.com/lucaspalomodevelop/netbox-acls.git
synced 2026-03-13 07:29:40 +00:00
380 lines
14 KiB
Python
380 lines
14 KiB
Python
"""
|
|
Serializers control the translation of client data to and from Python objects,
|
|
while Django itself handles the database abstraction.
|
|
"""
|
|
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from drf_yasg.utils import swagger_serializer_method
|
|
from ipam.api.serializers import NestedPrefixSerializer
|
|
from netbox.api.fields import ContentTypeField
|
|
from netbox.api.serializers import NetBoxModelSerializer
|
|
from netbox.constants import NESTED_SERIALIZER_PREFIX
|
|
from rest_framework import serializers
|
|
from utilities.api import get_serializer_for_model
|
|
|
|
from ..constants import ACL_HOST_ASSIGNMENT_MODELS, ACL_INTERFACE_ASSIGNMENT_MODELS
|
|
from ..models import (
|
|
AccessList,
|
|
ACLExtendedRule,
|
|
ACLInterfaceAssignment,
|
|
ACLStandardRule,
|
|
)
|
|
from .nested_serializers import NestedAccessListSerializer
|
|
|
|
# Public API of this module: the four model serializers defined below.
__all__ = [
    "AccessListSerializer",
    "ACLInterfaceAssignmentSerializer",
    "ACLStandardRuleSerializer",
    "ACLExtendedRuleSerializer",
]
|
|
|
|
# Standard error message: a rule's action is "remark" but no remark text was given.
error_message_no_remark = "Action is set to remark, you MUST add a remark."

# Standard error message: a rule's action is "remark" but a source_prefix was given.
error_message_action_remark_source_prefix_set = (
    "Action is set to remark, Source Prefix CANNOT be set."
)

# Standard error message: a remark was given although the rule's action is not "remark".
error_message_remark_without_action_remark = (
    "CANNOT set remark unless action is set to remark."
)

# Standard error message: a rule is not associated to an ACL of the same type.
error_message_acl_type = "Provided parent Access List is not of right type."
|
|
|
|
|
|
class AccessListSerializer(NetBoxModelSerializer):
    """
    Defines the serializer for the django AccessList model & associates it to a view.
    """

    url = serializers.HyperlinkedIdentityField(
        view_name="plugins-api:netbox_acls-api:accesslist-detail",
    )
    # Read-only rule counter; validate() reads the same attribute from the instance.
    rule_count = serializers.IntegerField(read_only=True)
    # Only content types matching ACL_HOST_ASSIGNMENT_MODELS are accepted.
    assigned_object_type = ContentTypeField(
        queryset=ContentType.objects.filter(ACL_HOST_ASSIGNMENT_MODELS),
    )
    # Populated by get_assigned_object().
    assigned_object = serializers.SerializerMethodField(read_only=True)

    class Meta:
        """
        Associates the django model AccessList & fields to the serializer.
        """

        model = AccessList
        fields = (
            "id",
            "url",
            "display",
            "name",
            "assigned_object_type",
            "assigned_object_id",
            "assigned_object",
            "type",
            "default_action",
            "comments",
            "tags",
            "custom_fields",
            "created",
            "last_updated",
            "rule_count",
        )

    @swagger_serializer_method(serializer_or_field=serializers.DictField)
    def get_assigned_object(self, obj):
        """
        Returns the nested serialized representation of the Access List's assigned object.

        The serializer class is looked up from the assigned object with the nested
        serializer prefix, and is given the current request as context.
        """
        serializer = get_serializer_for_model(
            obj.assigned_object,
            prefix=NESTED_SERIALIZER_PREFIX,
        )
        context = {"request": self.context["request"]}
        return serializer(obj.assigned_object, context=context).data

    def validate(self, data):
        """
        Validates api inputs before processing:
          - Check that the GFK object is valid.
          - Check if Access List has no existing rules before change the Access List's type.

        Returns the result of the parent class's validate().
        Raises serializers.ValidationError, keyed by field name, if any check fails.
        """
        error_message = {}

        # Check that the GFK object is valid.
        if "assigned_object_type" in data and "assigned_object_id" in data:
            # TODO: This can be removed after https://github.com/netbox-community/netbox/issues/10221 is fixed.
            try:
                # Only the existence of the object matters here; the result is not needed.
                data["assigned_object_type"].get_object_for_this_type(
                    id=data["assigned_object_id"],
                )
            except ObjectDoesNotExist:
                # Sets a standard error message for invalid GFK
                error_message_invalid_gfk = f"Invalid assigned_object {data['assigned_object_type']} ID {data['assigned_object_id']}"
                error_message["assigned_object_type"] = [error_message_invalid_gfk]
                error_message["assigned_object_id"] = [error_message_invalid_gfk]

        # Check if Access List has no existing rules before change the Access List's type.
        # The comparison only runs when "type" was actually submitted: on a partial
        # update that omits it, data.get("type") would be None and would look like a
        # type change, rejecting unrelated edits to any ACL that has rules.
        if (
            self.instance
            and "type" in data
            and self.instance.type != data["type"]
            and self.instance.rule_count > 0
        ):
            error_message["type"] = [
                "This ACL has ACL rules associated, CANNOT change ACL type.",
            ]

        if error_message:
            raise serializers.ValidationError(error_message)

        return super().validate(data)
|
|
|
|
|
|
class ACLInterfaceAssignmentSerializer(NetBoxModelSerializer):
    """
    Defines the serializer for the django ACLInterfaceAssignment model & associates it to a view.
    """

    url = serializers.HyperlinkedIdentityField(
        view_name="plugins-api:netbox_acls-api:aclinterfaceassignment-detail",
    )
    access_list = NestedAccessListSerializer()
    # Only content types matching ACL_INTERFACE_ASSIGNMENT_MODELS are accepted.
    assigned_object_type = ContentTypeField(
        queryset=ContentType.objects.filter(ACL_INTERFACE_ASSIGNMENT_MODELS),
    )
    # Populated by get_assigned_object().
    assigned_object = serializers.SerializerMethodField(read_only=True)

    class Meta:
        """
        Associates the django model ACLInterfaceAssignment & fields to the serializer.
        """

        model = ACLInterfaceAssignment
        fields = (
            "id",
            "url",
            "access_list",
            "direction",
            "assigned_object_type",
            "assigned_object_id",
            "assigned_object",
            "comments",
            "tags",
            "custom_fields",
            "created",
            "last_updated",
        )

    @swagger_serializer_method(serializer_or_field=serializers.DictField)
    def get_assigned_object(self, obj):
        """
        Returns the nested serialized representation of the assignment's assigned object.

        The serializer class is looked up from the assigned object with the nested
        serializer prefix, and is given the current request as context.
        """
        serializer = get_serializer_for_model(
            obj.assigned_object,
            prefix=NESTED_SERIALIZER_PREFIX,
        )
        context = {"request": self.context["request"]}
        return serializer(obj.assigned_object, context=context).data

    def validate(self, data):
        """
        Validate the ACLInterfaceAssignment django model's inputs before allowing it to update the instance.
          - Check that the GFK object is valid.
          - Check that the associated interface's parent host has the selected ACL defined.

        Returns the result of the parent class's validate().
        Raises serializers.ValidationError, keyed by field name, if any check fails.
        """
        error_message = {}
        acl_host = data["access_list"].assigned_object
        assigned_object = None

        # Check that the GFK object is valid.
        if "assigned_object_type" in data and "assigned_object_id" in data:
            # TODO: This can be removed after https://github.com/netbox-community/netbox/issues/10221 is fixed.
            try:
                assigned_object = data["assigned_object_type"].get_object_for_this_type(
                    id=data["assigned_object_id"],
                )
            except ObjectDoesNotExist:
                # Sets a standard error message for invalid GFK
                error_message_invalid_gfk = f"Invalid assigned_object {data['assigned_object_type']} ID {data['assigned_object_id']}"
                error_message["assigned_object_type"] = [error_message_invalid_gfk]
                error_message["assigned_object_id"] = [error_message_invalid_gfk]

        # The host comparison needs the assigned object. When the lookup above failed,
        # repeating it would raise ObjectDoesNotExist again (this time uncaught), so the
        # comparison is skipped and the invalid-GFK error is reported on its own.
        if not error_message:
            # Attribute holding the parent host for each supported interface model.
            host_attribute = {
                "interface": "device",
                "vminterface": "virtual_machine",
            }.get(data["assigned_object_type"].model)

            if host_attribute is None:
                interface_host = None
            else:
                if assigned_object is None:
                    # Reached only when the guarded lookup above did not run.
                    assigned_object = data[
                        "assigned_object_type"
                    ].get_object_for_this_type(id=data["assigned_object_id"])
                interface_host = getattr(assigned_object, host_attribute)

            # Check that the associated interface's parent host has the selected ACL defined.
            if acl_host != interface_host:
                error_acl_not_assigned_to_host = (
                    "Access List not present on the selected interface's host."
                )
                error_message["access_list"] = [error_acl_not_assigned_to_host]
                error_message["assigned_object_id"] = [error_acl_not_assigned_to_host]

        if error_message:
            raise serializers.ValidationError(error_message)

        return super().validate(data)
|
|
|
|
|
|
class ACLStandardRuleSerializer(NetBoxModelSerializer):
    """
    Defines the serializer for the django ACLStandardRule model & associates it to a view.
    """

    url = serializers.HyperlinkedIdentityField(
        view_name="plugins-api:netbox_acls-api:aclstandardrule-detail",
    )
    access_list = NestedAccessListSerializer()
    # Optional; omitted or null values resolve to None.
    source_prefix = NestedPrefixSerializer(
        required=False,
        allow_null=True,
        default=None,
    )

    class Meta:
        """
        Associates the django model ACLStandardRule & fields to the serializer.
        """

        model = ACLStandardRule
        fields = (
            "id",
            "url",
            "display",
            "access_list",
            "index",
            "action",
            "tags",
            "description",
            "remark",
            "created",
            "custom_fields",
            "last_updated",
            "source_prefix",
        )

    def validate(self, data):
        """
        Validate the ACLStandardRule django model's inputs before allowing it to update the instance:
          - Check if action set to remark, but no remark set.
          - Check if action set to remark, but source_prefix set.

        Returns the result of the parent class's validate().
        Raises serializers.ValidationError, keyed by field name, if any check fails.
        """
        error_message = {}

        # Both checks apply only to remark rules; a rule with any other action may
        # carry a source_prefix and needs no remark.
        if data.get("action") == "remark":
            # Check if action set to remark, but no remark set.
            if data.get("remark") is None:
                error_message["remark"] = [error_message_no_remark]
            # Check if action set to remark, but source_prefix set.
            if data.get("source_prefix"):
                error_message["source_prefix"] = [
                    error_message_action_remark_source_prefix_set,
                ]

        if error_message:
            raise serializers.ValidationError(error_message)

        return super().validate(data)
|
|
|
|
|
|
class ACLExtendedRuleSerializer(NetBoxModelSerializer):
    """
    Defines the serializer for the django ACLExtendedRule model & associates it to a view.
    """

    url = serializers.HyperlinkedIdentityField(
        view_name="plugins-api:netbox_acls-api:aclextendedrule-detail",
    )
    access_list = NestedAccessListSerializer()
    # Optional; omitted or null values resolve to None.
    source_prefix = NestedPrefixSerializer(
        required=False,
        allow_null=True,
        default=None,
    )
    # Optional; omitted or null values resolve to None.
    destination_prefix = NestedPrefixSerializer(
        required=False,
        allow_null=True,
        default=None,
    )

    class Meta:
        """
        Associates the django model ACLExtendedRule & fields to the serializer.
        """

        model = ACLExtendedRule
        fields = (
            "id",
            "url",
            "display",
            "access_list",
            "index",
            "action",
            "tags",
            "description",
            "created",
            "custom_fields",
            "last_updated",
            "source_prefix",
            "source_ports",
            "destination_prefix",
            "destination_ports",
            "protocol",
            "remark",
        )

    def validate(self, data):
        """
        Validate the ACLExtendedRule django model's inputs before allowing it to update the instance:
          - Check if action set to remark, but no remark set.
          - Check if action set to remark, but source_prefix set.
          - Check if action set to remark, but source_ports set.
          - Check if action set to remark, but destination_prefix set.
          - Check if action set to remark, but destination_ports set.
          - Check if action set to remark, but protocol set.

        Returns the result of the parent class's validate().
        Raises serializers.ValidationError, keyed by field name, if any check fails.
        """
        error_message = {}

        # All checks apply only to remark rules; a rule with any other action may
        # carry prefixes, ports and a protocol, and needs no remark.
        if data.get("action") == "remark":
            # Check if action set to remark, but no remark set.
            if data.get("remark") is None:
                error_message["remark"] = [error_message_no_remark]

            # Fields that must stay empty on a remark rule, with the error each one reports.
            fields_forbidden_with_remark = {
                "source_prefix": error_message_action_remark_source_prefix_set,
                "source_ports": "Action is set to remark, Source Ports CANNOT be set.",
                "destination_prefix": "Action is set to remark, Destination Prefix CANNOT be set.",
                "destination_ports": "Action is set to remark, Destination Ports CANNOT be set.",
                "protocol": "Action is set to remark, Protocol CANNOT be set.",
            }
            for field_name, message in fields_forbidden_with_remark.items():
                if data.get(field_name):
                    error_message[field_name] = [message]

        if error_message:
            raise serializers.ValidationError(error_message)

        return super().validate(data)
|