# eventcally/project/services/admin_unit.py
# 2023-10-01 14:33:48 +02:00
#
# 486 lines
# 15 KiB
# Python

import datetime
from sqlalchemy import and_, func, or_
from sqlalchemy.orm import joinedload, load_only
from project import db
from project.models import (
AdminUnit,
AdminUnitInvitation,
AdminUnitMember,
AdminUnitMemberInvitation,
AdminUnitMemberRole,
AdminUnitRelation,
CustomWidget,
EventEventLists,
EventList,
EventOrganizer,
EventPlace,
Location,
)
from project.services.image import upsert_image_with_data
from project.services.location import assign_location_values
from project.services.reference import (
get_newest_reference_requests,
get_newest_references,
)
from project.services.search_params import (
AdminUnitSearchParams,
EventPlaceSearchParams,
OrganizerSearchParams,
)
def insert_admin_unit_for_user(admin_unit, user, invitation=None):
    """Persist an admin unit together with its first member, organizer and place.

    The user is added as a member with the "admin" and "event_verifier"
    roles. An organizer copying the admin unit's contact fields is always
    created. A place is created only when the admin unit has a location, and
    a relation from the inviting admin unit is upserted only when an
    invitation is passed.

    Args:
        admin_unit: The admin unit to add to the session.
        user: The user who becomes a member of the admin unit.
        invitation: Optional invitation; its ``admin_unit_id`` identifies the
            inviting admin unit.

    Returns:
        Tuple ``(organizer, place, relation)``. ``place`` and ``relation`` are
        ``None`` when they were not created.
    """
    session = db.session
    session.add(admin_unit)

    # Add the user as an admin.
    add_user_to_admin_unit_with_roles(user, admin_unit, ["admin", "event_verifier"])
    # This commit runs before ``admin_unit.id`` is read below.
    session.commit()

    # Create the organizer.
    organizer = EventOrganizer()
    organizer.admin_unit_id = admin_unit.id
    for field in ("name", "url", "email", "phone", "fax"):
        setattr(organizer, field, getattr(admin_unit, field))
    organizer.location = Location()
    assign_location_values(organizer.location, admin_unit.location)
    if admin_unit.logo:
        organizer.logo = upsert_image_with_data(
            organizer.logo,
            admin_unit.logo.data,
            admin_unit.logo.encoding_format,
        )
    session.add(organizer)

    # Create the place (named after the city of the admin unit's location).
    place = None
    if admin_unit.location:
        place = EventPlace()
        place.admin_unit_id = admin_unit.id
        place.name = admin_unit.location.city
        place.location = Location()
        assign_location_values(place.location, admin_unit.location)
        session.add(place)

    # Create the relation from the inviting admin unit to the new one.
    relation = None
    if invitation:
        inviter_id = invitation.admin_unit_id
        inviting_admin_unit = get_admin_unit_by_id(inviter_id)
        relation = upsert_admin_unit_relation(inviter_id, admin_unit.id)
        relation.invited = True
        # Each flag requires both the inviter's capability and the
        # corresponding setting on the invitation.
        relation.auto_verify_event_reference_requests = (
            inviting_admin_unit.incoming_reference_requests_allowed
            and invitation.relation_auto_verify_event_reference_requests
        )
        relation.verify = (
            inviting_admin_unit.can_verify_other and invitation.relation_verify
        )

    session.commit()
    return (organizer, place, relation)
def get_admin_unit_by_id(id):
    """Return the first admin unit whose ``id`` equals ``id``, or ``None``."""
    matching = AdminUnit.query.filter(AdminUnit.id == id)
    return matching.first()
def get_admin_unit_by_name(unit_name):
    """Return the first admin unit whose ``name`` equals ``unit_name``, or ``None``."""
    matching = AdminUnit.query.filter(AdminUnit.name == unit_name)
    return matching.first()
def get_admin_unit_member_role(role_name):
    """Return the first member role whose ``name`` equals ``role_name``, or ``None``."""
    matching = AdminUnitMemberRole.query.filter(AdminUnitMemberRole.name == role_name)
    return matching.first()
def find_admin_unit_member_invitation(email, admin_unit_id):
    """Return the first member invitation for ``email`` in the given admin unit.

    The e-mail comparison is case-insensitive (both sides are lower-cased in
    SQL). Returns ``None`` when no invitation matches.
    """
    same_unit = AdminUnitMemberInvitation.admin_unit_id == admin_unit_id
    same_email = func.lower(AdminUnitMemberInvitation.email) == func.lower(email)
    # Multiple criteria passed to filter() are combined with AND.
    return AdminUnitMemberInvitation.query.filter(same_unit, same_email).first()
def get_admin_unit_member_invitations(email):
    """Return all member invitations addressed to ``email`` (case-insensitive)."""
    same_email = func.lower(AdminUnitMemberInvitation.email) == func.lower(email)
    invitations = AdminUnitMemberInvitation.query.filter(same_email)
    return invitations.all()
def insert_admin_unit_member_invitation(admin_unit_id, email, role_names):
    """Create, commit and return a member invitation.

    Args:
        admin_unit_id: Id of the admin unit the invitation belongs to.
        email: E-mail address the invitation is addressed to.
        role_names: Iterable of role name strings; stored on the invitation
            as a single comma-separated string.

    Returns:
        The committed ``AdminUnitMemberInvitation``.
    """
    roles_csv = ",".join(role_names)

    new_invitation = AdminUnitMemberInvitation()
    new_invitation.admin_unit_id = admin_unit_id
    new_invitation.email = email
    new_invitation.roles = roles_csv

    session = db.session
    session.add(new_invitation)
    session.commit()
    return new_invitation
def upsert_admin_unit_member_role(role_name, role_title, permissions):
    """Create or update the member role named ``role_name``.

    A missing role is created and added to the session. In both cases the
    title and permissions are overwritten with the given values. This
    function does not commit.

    Returns:
        The new or existing ``AdminUnitMemberRole``.
    """
    role = AdminUnitMemberRole.query.filter(
        AdminUnitMemberRole.name == role_name
    ).first()

    if role is None:
        role = AdminUnitMemberRole(name=role_name)
        db.session.add(role)

    role.title = role_title
    role.permissions = permissions
    return role
def add_user_to_admin_unit(user, admin_unit):
    """Return the membership of ``user`` in ``admin_unit``, creating it if missing.

    A new membership is appended to ``admin_unit.members`` and added to the
    session. This function does not commit.
    """
    member = (
        AdminUnitMember.query.with_parent(admin_unit)
        .filter(AdminUnitMember.user_id == user.id)
        .first()
    )
    if member is not None:
        return member

    member = AdminUnitMember(user=user, admin_unit_id=admin_unit.id)
    admin_unit.members.append(member)
    db.session.add(member)
    return member
def add_user_to_admin_unit_with_roles(user, admin_unit, role_names):
    """Ensure ``user`` is a member of ``admin_unit`` and add the named roles.

    Returns:
        The membership object returned by ``add_user_to_admin_unit``.
    """
    membership = add_user_to_admin_unit(user, admin_unit)
    add_roles_to_admin_unit_member(membership, role_names)
    return membership
def add_roles_to_admin_unit_member(member, role_names):
    """Add every role named in ``role_names`` to ``member``.

    Names for which no role is found are skipped silently.
    """
    for name in role_names:
        role = get_admin_unit_member_role(name)
        if role:
            add_role_to_admin_unit_member(member, role)
def add_role_to_admin_unit_member(admin_unit_member, role):
    """Append ``role`` to the member's roles unless a same-named role is already attached."""
    already_assigned = (
        AdminUnitMemberRole.query.with_parent(admin_unit_member)
        .filter(AdminUnitMemberRole.name == role.name)
        .first()
    )
    if already_assigned is None:
        admin_unit_member.roles.append(role)
def get_member_for_admin_unit_by_user_id(admin_unit_id, user_id):
    """Return the membership of the given user in the given admin unit, or ``None``."""
    in_unit = AdminUnitMember.admin_unit_id == admin_unit_id
    is_user = AdminUnitMember.user_id == user_id
    return AdminUnitMember.query.filter(in_unit, is_user).first()
def get_admin_unit_member(id):
    """Return the first admin unit member whose ``id`` equals ``id``, or ``None``."""
    matching = AdminUnitMember.query.filter(AdminUnitMember.id == id)
    return matching.first()
def get_admin_unit_query(params: AdminUnitSearchParams):
    """Build the (unexecuted) query for searching admin units.

    The query outer-joins the admin unit's location and applies, in order:

    * verified-only filter unless ``params.include_unverified`` is set,
    * verifier-only filter when ``params.only_verifier`` is set,
    * reference-request filter (excludes the requesting unit itself),
    * incoming-verification postal code filter,
    * the trackable filters provided by ``params.get_trackable_query``,
    * postal code prefix filter on the joined location,
    * keyword filter on ``name`` / ``short_name``.

    Args:
        params: Search parameters. ``params.postal_code`` may be a single
            string or a list/tuple of strings; each is matched as a prefix.

    Returns:
        The SQLAlchemy query, ordered by the trackable ordering first and
        then by lower-cased name (prefix keyword matches first, if a keyword
        is given).
    """
    query = AdminUnit.query.join(AdminUnit.location, isouter=True)

    if not params.include_unverified:
        query = query.filter(AdminUnit.is_verified)

    if params.only_verifier:
        query = query.filter(
            and_(
                AdminUnit.can_verify_other,
                AdminUnit.incoming_verification_requests_allowed,
            )
        )

    if params.reference_request_for_admin_unit_id:
        query = query.filter(
            and_(
                AdminUnit.id != params.reference_request_for_admin_unit_id,
                AdminUnit.incoming_reference_requests_allowed,
            )
        )

    if params.incoming_verification_requests_postal_code:
        query = query.filter(
            or_(
                AdminUnit.incoming_verification_requests_postal_codes.contains(
                    [params.incoming_verification_requests_postal_code]
                ),
                # NOTE(review): "{}" is presumably the empty-array literal,
                # i.e. units without a postal code restriction - confirm.
                AdminUnit.incoming_verification_requests_postal_codes == "{}",
            )
        )

    query = params.get_trackable_query(query, AdminUnit)

    if params.postal_code:
        postal_codes = params.postal_code
        # isinstance (rather than an exact type check) also accepts tuples
        # and list subclasses; a single value is wrapped in a list.
        if not isinstance(postal_codes, (list, tuple)):  # pragma: no cover
            postal_codes = [postal_codes]

        # Non-empty here because an empty sequence is falsy and skipped above.
        prefix_filters = [
            Location.postalCode.ilike(postal_code + "%")
            for postal_code in postal_codes
        ]
        query = query.filter(or_(*prefix_filters))

    if params.keyword:
        like_keyword = "%" + params.keyword + "%"
        order_keyword = params.keyword + "%"
        query = query.filter(
            or_(
                AdminUnit.name.ilike(like_keyword),
                AdminUnit.short_name.ilike(like_keyword),
            )
        )

        query = params.get_trackable_order_by(query, AdminUnit)
        # Descending order on the boolean prefix match so that prefix
        # matches are listed first.
        query = query.order_by(
            AdminUnit.name.ilike(order_keyword).desc(),
            AdminUnit.short_name.ilike(order_keyword).desc(),
            func.lower(AdminUnit.name),
        )
    else:
        query = params.get_trackable_order_by(query, AdminUnit)
        query = query.order_by(func.lower(AdminUnit.name))

    return query
def get_organizer_query(params: OrganizerSearchParams):
    """Build the query for the organizers of ``params.admin_unit_id``.

    When ``params.name`` is set, only organizers whose name contains it
    (case-insensitive) are returned, with prefix matches ordered first. The
    trackable ordering from ``params`` is always applied before the name
    ordering.
    """
    query = EventOrganizer.query.filter(
        EventOrganizer.admin_unit_id == params.admin_unit_id
    )
    query = params.get_trackable_query(query, EventOrganizer)
    by_name = func.lower(EventOrganizer.name)

    if not params.name:
        query = params.get_trackable_order_by(query, EventOrganizer)
        return query.order_by(by_name)

    contains_pattern = "%" + params.name + "%"
    prefix_pattern = params.name + "%"
    query = params.get_trackable_order_by(query, EventOrganizer)
    query = query.filter(EventOrganizer.name.ilike(contains_pattern))
    return query.order_by(
        EventOrganizer.name.ilike(prefix_pattern).desc(),
        by_name,
    )
def get_custom_widget_query(admin_unit_id, name=None):
    """Build the query for an admin unit's custom widgets, ordered by lower-cased name.

    Args:
        admin_unit_id: Id of the owning admin unit.
        name: Optional case-insensitive substring the widget name must contain.
    """
    widgets = CustomWidget.query.filter(CustomWidget.admin_unit_id == admin_unit_id)

    if name:
        widgets = widgets.filter(CustomWidget.name.ilike("%" + name + "%"))

    return widgets.order_by(func.lower(CustomWidget.name))
def get_place_query(params: EventPlaceSearchParams):
    """Build the query for the places of ``params.admin_unit_id``.

    Applies the trackable filters from ``params``, an optional
    case-insensitive substring filter on the name, then the trackable
    ordering followed by lower-cased name.
    """
    places = EventPlace.query.filter(EventPlace.admin_unit_id == params.admin_unit_id)
    places = params.get_trackable_query(places, EventPlace)

    if params.name:
        contains_pattern = "%" + params.name + "%"
        places = places.filter(EventPlace.name.ilike(contains_pattern))

    places = params.get_trackable_order_by(places, EventPlace)
    places = places.order_by(func.lower(EventPlace.name))
    return places
def get_event_list_query(admin_unit_id, name=None, event_id=None):
    """Build the query for an admin unit's event lists, ordered by lower-cased name.

    Args:
        admin_unit_id: Id of the owning admin unit.
        name: Optional case-insensitive substring the list name must contain.
        event_id: Accepted but not used by this function.
    """
    event_lists = EventList.query.filter(EventList.admin_unit_id == admin_unit_id)

    if name:
        event_lists = event_lists.filter(EventList.name.ilike("%" + name + "%"))

    return event_lists.order_by(func.lower(EventList.name))
def get_event_list_status_query(admin_unit_id, event_id, name=None):
    """Build a query yielding ``(EventList, event_count)`` rows for an admin unit.

    ``event_count`` is a labelled subquery counting the ``EventEventLists``
    rows that link ``event_id`` to the respective list.

    Args:
        admin_unit_id: Id of the owning admin unit.
        event_id: Id of the event whose list membership is counted.
        name: Optional case-insensitive substring the list name must contain.
    """
    # Subquery referencing EventList.id of the outer query.
    links_to_event = db.session.query(func.count(EventEventLists.id)).filter(
        EventEventLists.event_id == event_id,
        EventEventLists.list_id == EventList.id,
    )
    event_count = links_to_event.label("event_count")

    status_query = db.session.query(EventList, event_count)
    status_query = status_query.filter(EventList.admin_unit_id == admin_unit_id)

    if name:
        status_query = status_query.filter(EventList.name.ilike("%" + name + "%"))

    status_query = status_query.group_by(EventList.id)
    return status_query.order_by(func.lower(EventList.name))
def insert_admin_unit_relation(source_admin_unit_id: int, target_admin_unit_id: int):
    """Create a relation between two admin units and add it to the session.

    This function does not commit.

    Returns:
        The new ``AdminUnitRelation``.
    """
    relation = AdminUnitRelation(
        source_admin_unit_id=source_admin_unit_id,
        target_admin_unit_id=target_admin_unit_id,
    )
    db.session.add(relation)
    return relation
def get_admin_unit_relation(source_admin_unit_id: int, target_admin_unit_id: int):
    """Return the relation from the source to the target admin unit, or ``None``."""
    from_source = AdminUnitRelation.source_admin_unit_id == source_admin_unit_id
    to_target = AdminUnitRelation.target_admin_unit_id == target_admin_unit_id
    return AdminUnitRelation.query.filter(from_source, to_target).first()
def upsert_admin_unit_relation(source_admin_unit_id: int, target_admin_unit_id: int):
    """Return the existing relation between the two admin units, or insert a new one."""
    existing = get_admin_unit_relation(source_admin_unit_id, target_admin_unit_id)
    if existing is not None:
        return existing
    return insert_admin_unit_relation(source_admin_unit_id, target_admin_unit_id)
def get_admin_unit_relations_for_reference_requests(
    target_admin_unit_id: int, limit: int
):
    """Return up to ``limit`` incoming relations usable for reference requests.

    Only relations targeting ``target_admin_unit_id`` whose source admin unit
    allows incoming reference requests are returned. Relations that
    auto-verify reference requests come first, then verifying ones, then
    invited ones, then the most recently created.

    Returns:
        List of ``AdminUnitRelation`` objects with ``source_admin_unit``
        eagerly loaded (``id`` and ``name`` columns only).
    """
    eager_source = joinedload(AdminUnitRelation.source_admin_unit).load_only(
        AdminUnit.id, AdminUnit.name
    )

    relations = AdminUnitRelation.query.join(
        AdminUnit,
        AdminUnitRelation.source_admin_unit_id == AdminUnit.id,
    )
    relations = relations.options(eager_source)
    relations = relations.filter(
        and_(
            AdminUnitRelation.target_admin_unit_id == target_admin_unit_id,
            AdminUnit.incoming_reference_requests_allowed,
        )
    )
    relations = relations.order_by(
        AdminUnitRelation.auto_verify_event_reference_requests.desc(),
        AdminUnitRelation.verify.desc(),
        AdminUnitRelation.invited.desc(),
        AdminUnitRelation.created_at.desc(),
    )
    return relations.limit(limit).all()
def get_admin_units_for_reference_requests(admin_unit_id: int, limit: int):
    """Return up to ``limit`` other admin units that allow incoming reference requests.

    The admin unit identified by ``admin_unit_id`` is excluded. Only the
    ``id`` and ``name`` columns are loaded.
    """
    candidates = AdminUnit.query.options(load_only(AdminUnit.id, AdminUnit.name))
    candidates = candidates.filter(
        and_(
            AdminUnit.id != admin_unit_id,
            AdminUnit.incoming_reference_requests_allowed,
        )
    )
    return candidates.limit(limit).all()
def get_admin_unit_invitation_query(admin_unit):
    """Build the query for the organization invitations issued by ``admin_unit``."""
    issued_by_unit = AdminUnitInvitation.admin_unit_id == admin_unit.id
    return AdminUnitInvitation.query.filter(issued_by_unit)
def get_admin_unit_organization_invitations_query(email):
    """Build the query for organization invitations addressed to ``email`` (case-insensitive)."""
    same_email = func.lower(AdminUnitInvitation.email) == func.lower(email)
    return AdminUnitInvitation.query.filter(same_email)
def get_admin_unit_organization_invitations(email):
    """Return all organization invitations addressed to ``email`` as a list."""
    invitations = get_admin_unit_organization_invitations_query(email)
    return invitations.all()
def create_ical_events_for_admin_unit(
    admin_unit: AdminUnit,
) -> list:  # list[icalendar.Event]
    """Create the iCal events of an admin unit, starting one month before today.

    Private events are excluded and events referenced by the admin unit are
    included, as configured on the search parameters below.
    """
    # Imported inside the function, as in the original module layout.
    from dateutil.relativedelta import relativedelta

    from project.dateutils import get_today
    from project.services.event import create_ical_events_for_search
    from project.services.search_params import EventSearchParams

    one_month_ago = get_today() - relativedelta(months=1)

    search = EventSearchParams()
    search.date_from = one_month_ago
    search.admin_unit_id = admin_unit.id
    search.can_read_private_events = False
    search.include_admin_unit_references = True

    return create_ical_events_for_search(search)
def get_admin_units_with_due_delete_request():
    """Return all admin units whose deletion was requested more than three days ago.

    Returns:
        List of ``AdminUnit`` objects with ``deletion_requested_at`` earlier
        than the cut-off.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take the aware
    # UTC time and drop the tzinfo to get the same naive UTC value as before.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    due = now_utc - datetime.timedelta(days=3)
    return AdminUnit.query.filter(AdminUnit.deletion_requested_at < due).all()
def delete_admin_unit(admin_unit: AdminUnit):
    """Mark ``admin_unit`` for deletion in the session and commit."""
    session = db.session
    session.delete(admin_unit)
    session.commit()
def get_admin_unit_suggestions_for_reference_requests(admin_unit, max_choices=5):
    """Suggest admin units that ``admin_unit`` could send reference requests to.

    Candidates are collected from four sources in order, each one only asked
    for as many entries as are still missing up to ``max_choices``:

    1. newest outgoing reference requests,
    2. newest outgoing references,
    3. incoming relations,
    4. any other admin unit that allows incoming reference requests.

    Candidates from sources 1-3 are preselected; those from source 4 are not.
    An admin unit already collected is not added again.

    Returns:
        Tuple ``(admin_unit_choices, selected_ids)``: the list of suggested
        admin units and the list of ids of the preselected ones.
    """
    seen_ids = set()
    choices = []
    preselected_ids = []

    def collect(candidates, selected=True):
        # Add unseen candidates, optionally marking them as preselected.
        for candidate in candidates:
            candidate_id = candidate.id
            if candidate_id in seen_ids:
                continue
            seen_ids.add(candidate_id)
            choices.append(candidate)
            if selected:
                preselected_ids.append(candidate_id)

    def remaining():
        # Number of suggestions still missing up to max_choices.
        return max_choices - len(choices)

    # Newest outgoing reference requests (queried unconditionally).
    reference_requests = get_newest_reference_requests(admin_unit.id, remaining())
    collect([request.admin_unit for request in reference_requests])

    # Newest outgoing references.
    if remaining() > 0:
        references = get_newest_references(admin_unit.id, remaining())
        collect([reference.admin_unit for reference in references])

    # Incoming relations that automatically verify the organization or events.
    if remaining() > 0:
        relations = get_admin_unit_relations_for_reference_requests(
            admin_unit.id, remaining()
        )
        collect([relation.source_admin_unit for relation in relations])

    # Organizations that allow incoming reference requests (not preselected).
    if remaining() > 0:
        open_admin_units = get_admin_units_for_reference_requests(
            admin_unit.id, remaining()
        )
        collect(open_admin_units, False)

    return (choices, preselected_ids)