2022-07-29 16:15:27 +00:00

369 lines
13 KiB
Python

"""
Serializers control the translation of client data to and from Python objects,
while Django itself handles the database abstraction.
"""
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from drf_yasg.utils import swagger_serializer_method
from ipam.api.serializers import NestedPrefixSerializer
from netbox.api import ContentTypeField
from netbox.api.serializers import NetBoxModelSerializer
from rest_framework import serializers
from utilities.api import get_serializer_for_model
from ..constants import ACL_HOST_ASSIGNMENT_MODELS, ACL_INTERFACE_ASSIGNMENT_MODELS
from ..models import (
AccessList,
ACLExtendedRule,
ACLInterfaceAssignment,
ACLStandardRule,
)
from .nested_serializers import (
NestedAccessListSerializer,
NestedACLExtendedRuleSerializer,
NestedACLInterfaceAssignmentSerializer,
NestedACLStandardRuleSerializer,
)
__all__ = [
"AccessListSerializer",
"ACLInterfaceAssignmentSerializer",
"ACLStandardRuleSerializer",
"ACLExtendedRuleSerializer",
]
# Sets a standard error message for ACL rules with an action of remark, but no remark set.
error_message_no_remark = "Action is set to remark, you MUST add a remark."
# Sets a standard error message for ACL rules with an action of remark, but no source_prefix is set.
error_message_action_remark_source_prefix_set = (
"Action is set to remark, Source Prefix CANNOT be set."
)
# Sets a standard error message for ACL rules with an action not set to remark, but no remark is set.
error_message_remark_without_action_remark = (
"CANNOT set remark unless action is set to remark."
)
# Sets a standard error message for ACL rules no associated to an ACL of the same type.
error_message_acl_type = "Provided parent Access List is not of right type."
class AccessListSerializer(NetBoxModelSerializer):
"""
Defines the serializer for the django AccessList model & associates it to a view.
"""
url = serializers.HyperlinkedIdentityField(
view_name="plugins-api:netbox_access_lists-api:accesslist-detail",
)
rule_count = serializers.IntegerField(read_only=True)
assigned_object_type = ContentTypeField(
queryset=ContentType.objects.filter(ACL_HOST_ASSIGNMENT_MODELS),
)
assigned_object = serializers.SerializerMethodField(read_only=True)
class Meta:
"""
Associates the django model AccessList & fields to the serializer.
"""
model = AccessList
fields = (
"id",
"url",
"display",
"name",
"assigned_object_type",
"assigned_object_id",
"assigned_object",
"type",
"default_action",
"comments",
"tags",
"custom_fields",
"created",
"last_updated",
"rule_count",
)
@swagger_serializer_method(serializer_or_field=serializers.DictField)
def get_assigned_object(self, obj):
serializer = get_serializer_for_model(obj.assigned_object, prefix="Nested")
context = {"request": self.context["request"]}
return serializer(obj.assigned_object, context=context).data
def validate(self, data):
"""
Validates api inputs before processing:
- Check that the GFK object is valid.
- Check if Access List has no existing rules before change the Access List's type.
"""
error_message = {}
# Check that the GFK object is valid.
if "assigned_object_type" in data and "assigned_object_id" in data:
try:
assigned_object = data["assigned_object_type"].get_object_for_this_type(
id=data["assigned_object_id"]
)
except ObjectDoesNotExist:
# Sets a standard error message for invalid GFK
error_message_invalid_gfk = f"Invalid assigned_object {data['assigned_object_type']} ID {data['assigned_object_id']}"
error_message["assigned_object_type"] = [error_message_invalid_gfk]
error_message["assigned_object_id"] = [error_message_invalid_gfk]
# Check if Access List has no existing rules before change the Access List's type.
if (
self.instance
and self.instance.type != data.get("type")
and self.instance.rule_count > 0
):
error_message["type"] = [
"This ACL has ACL rules associated, CANNOT change ACL type."
]
if error_message:
raise serializers.ValidationError(error_message)
return super().validate(data)
class ACLInterfaceAssignmentSerializer(NetBoxModelSerializer):
"""
Defines the serializer for the django ACLInterfaceAssignment model & associates it to a view.
"""
url = serializers.HyperlinkedIdentityField(
view_name="plugins-api:netbox_access_lists-api:aclinterfaceassignment-detail",
)
assigned_object_type = ContentTypeField(
queryset=ContentType.objects.filter(ACL_INTERFACE_ASSIGNMENT_MODELS),
)
assigned_object = serializers.SerializerMethodField(read_only=True)
class Meta:
"""
Associates the django model ACLInterfaceAssignment & fields to the serializer.
"""
model = ACLInterfaceAssignment
fields = (
"id",
"url",
"access_list",
"direction",
"assigned_object_type",
"assigned_object_id",
"assigned_object",
"comments",
"tags",
"custom_fields",
"created",
"last_updated",
)
@swagger_serializer_method(serializer_or_field=serializers.DictField)
def get_assigned_object(self, obj):
serializer = get_serializer_for_model(obj.assigned_object, prefix="Nested")
context = {"request": self.context["request"]}
return serializer(obj.assigned_object, context=context).data
def validate(self, data):
"""
Validate the AccessList django model model's inputs before allowing it to update the instance.
- Check that the GFK object is valid.
- Check that the associated interface's parent host has the selected ACL defined.
"""
error_message = {}
acl_host = data["access_list"].assigned_object
# Check that the GFK object is vlaid.
if "assigned_object_type" in data and "assigned_object_id" in data:
try:
assigned_object = data["assigned_object_type"].get_object_for_this_type(
id=data["assigned_object_id"]
)
except ObjectDoesNotExist:
# Sets a standard error message for invalid GFK
error_message_invalid_gfk = f"Invalid assigned_object {data['assigned_object_type']} ID {data['assigned_object_id']}"
error_message["assigned_object_type"] = [error_message_invalid_gfk]
error_message["assigned_object_id"] = [error_message_invalid_gfk]
if data["assigned_object_type"].model == "interface":
interface_host = (
data["assigned_object_type"]
.get_object_for_this_type(id=data["assigned_object_id"])
.device
)
elif data["assigned_object_type"].model == "vminterface":
interface_host = (
data["assigned_object_type"]
.get_object_for_this_type(id=data["assigned_object_id"])
.virtual_machine
)
# Check that the associated interface's parent host has the selected ACL defined.
if acl_host != interface_host:
error_acl_not_assigned_to_host = (
"Access List not present on the selected interface's host."
)
error_message["access_list"] = [error_acl_not_assigned_to_host]
error_message["assigned_object_id"] = [error_acl_not_assigned_to_host]
if error_message:
raise serializers.ValidationError(error_message)
return super().validate(data)
class ACLStandardRuleSerializer(NetBoxModelSerializer):
"""
Defines the serializer for the django ACLStandardRule model & associates it to a view.
"""
url = serializers.HyperlinkedIdentityField(
view_name="plugins-api:netbox_access_lists-api:aclstandardrule-detail",
)
access_list = NestedAccessListSerializer()
source_prefix = NestedPrefixSerializer(
required=False,
allow_null=True,
default=None,
)
class Meta:
"""
Associates the django model ACLStandardRule & fields to the serializer.
"""
model = ACLStandardRule
fields = (
"id",
"url",
"display",
"access_list",
"index",
"action",
"tags",
"description",
"remark",
"created",
"custom_fields",
"last_updated",
"source_prefix",
)
def validate(self, data):
"""
Validate the ACLStandardRule django model model's inputs before allowing it to update the instance:
- Check if action set to remark, but no remark set.
- Check if action set to remark, but source_prefix set.
"""
error_message = {}
# Check if action set to remark, but no remark set.
if data.get("action") == "remark" and data.get("remark") is None:
error_message["remark"] = [error_message_no_remark]
# Check if action set to remark, but source_prefix set.
if data.get("source_prefix"):
error_message["source_prefix"] = [
error_message_action_remark_source_prefix_set
]
if error_message:
raise serializers.ValidationError(error_message)
return super().validate(data)
class ACLExtendedRuleSerializer(NetBoxModelSerializer):
"""
Defines the serializer for the django ACLExtendedRule model & associates it to a view.
"""
url = serializers.HyperlinkedIdentityField(
view_name="plugins-api:netbox_access_lists-api:aclextendedrule-detail",
)
access_list = NestedAccessListSerializer()
source_prefix = NestedPrefixSerializer(
required=False,
allow_null=True,
default=None,
)
destination_prefix = NestedPrefixSerializer(
required=False,
allow_null=True,
default=None,
)
class Meta:
"""
Associates the django model ACLExtendedRule & fields to the serializer.
"""
model = ACLExtendedRule
fields = (
"id",
"url",
"display",
"access_list",
"index",
"action",
"tags",
"description",
"created",
"custom_fields",
"last_updated",
"source_prefix",
"source_ports",
"destination_prefix",
"destination_ports",
"protocol",
"remark",
)
def validate(self, data):
"""
Validate the ACLExtendedRule django model model's inputs before allowing it to update the instance:
- Check if action set to remark, but no remark set.
- Check if action set to remark, but source_prefix set.
- Check if action set to remark, but source_ports set.
- Check if action set to remark, but destination_prefix set.
- Check if action set to remark, but destination_ports set.
- Check if action set to remark, but protocol set.
- Check if action set to remark, but protocol set.
"""
error_message = {}
# Check if action set to remark, but no remark set.
if data.get("action") == "remark" and data.get("remark") is None:
error_message["remark"] = [error_message_no_remark]
# Check if action set to remark, but source_prefix set.
if data.get("source_prefix"):
error_message["source_prefix"] = [
error_message_action_remark_source_prefix_set
]
# Check if action set to remark, but source_ports set.
if data.get("source_ports"):
error_message["source_ports"] = [
"Action is set to remark, Source Ports CANNOT be set."
]
# Check if action set to remark, but destination_prefix set.
if data.get("destination_prefix"):
error_message["destination_prefix"] = [
"Action is set to remark, Destination Prefix CANNOT be set."
]
# Check if action set to remark, but destination_ports set.
if data.get("destination_ports"):
error_message["destination_ports"] = [
"Action is set to remark, Destination Ports CANNOT be set."
]
# Check if action set to remark, but protocol set.
if data.get("protocol"):
error_message["protocol"] = [
"Action is set to remark, Protocol CANNOT be set."
]
if error_message:
raise serializers.ValidationError(error_message)
return super().validate(data)