2023-07-04 23:44:38 +02:00

374 lines
11 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from functools import wraps
from urllib.parse import quote_plus
from flask import Markup, flash, g, redirect, render_template, request, url_for
from flask_babel import gettext
from flask_login.utils import decode_cookie
from flask_mail import Message
from flask_security import current_user
from psycopg2.errorcodes import UNIQUE_VIOLATION
from sqlalchemy.exc import SQLAlchemyError
from wtforms import FormField
from project import app, celery, mail
from project.access import (
get_admin_unit_for_manage,
get_admin_unit_for_manage_or_404,
get_admin_units_for_manage,
has_access,
)
from project.dateutils import berlin_tz, round_to_next_day
from project.models import Event, EventAttendanceMode, EventDate
from project.utils import get_place_str, strings_are_equal_ignoring_case
def set_current_admin_unit(admin_unit):
if admin_unit:
setattr(g, "manage_admin_unit", admin_unit)
def get_current_admin_unit(fallback=True, use_cookies=True, use_headers=False):
admin_unit = getattr(g, "manage_admin_unit", None)
if admin_unit:
return admin_unit
if current_user and current_user.is_authenticated:
if use_cookies:
admin_unit = get_current_admin_unit_from_cookies()
if not admin_unit and use_headers:
admin_unit = get_current_admin_unit_from_headers()
if not admin_unit and fallback:
admin_units = get_admin_units_for_manage()
if len(admin_units) > 0:
admin_unit = admin_units[0]
if admin_unit:
set_current_admin_unit(admin_unit)
return admin_unit
def get_current_admin_unit_for_api():
return get_current_admin_unit(fallback=False, use_cookies=False, use_headers=True)
def get_current_admin_unit_from_cookies():
try:
if request and request.cookies and "manage_admin_unit_id" in request.cookies:
encoded = request.cookies.get("manage_admin_unit_id")
manage_admin_unit_id = int(decode_cookie(encoded))
return get_admin_unit_for_manage(manage_admin_unit_id)
except Exception:
pass
return None
def get_current_admin_unit_from_headers():
try:
if request and request.headers and "X-OrganizationId" in request.headers:
manage_admin_unit_id_str = request.headers["X-OrganizationId"]
manage_admin_unit_id = int(manage_admin_unit_id_str)
return get_admin_unit_for_manage(manage_admin_unit_id)
except Exception:
pass
return None
def handleSqlError(e: SQLAlchemyError) -> str:
if not e.orig:
return str(e)
prefix = None
message = gettext(e.orig.message) if hasattr(e.orig, "message") else str(e.orig)
if e.orig.pgcode == UNIQUE_VIOLATION:
prefix = gettext(
"An entry with the entered values already exists. Duplicate entries are not allowed."
)
if not prefix:
return message
return "%s (%s)" % (prefix, message)
def get_pagination_urls(pagination, **kwargs):
result = {}
if pagination:
result["page"] = pagination.page
result["pages"] = pagination.pages
result["total"] = pagination.total
if pagination.has_prev:
args = request.args.copy()
args.update(kwargs)
args["page"] = pagination.prev_num
result["prev_url"] = url_for(request.endpoint, **args)
args["page"] = 1
result["first_url"] = url_for(request.endpoint, **args)
if pagination.has_next:
args = request.args.copy()
args.update(kwargs)
args["page"] = pagination.next_num
result["next_url"] = url_for(request.endpoint, **args)
args["page"] = pagination.pages
result["last_url"] = url_for(request.endpoint, **args)
return result
def flash_errors(form, prefix=None):
for field_name, errors in form.errors.items():
field = getattr(form, field_name)
field_label = field.label.text
if isinstance(field, FormField):
flash_errors(field.form, field_label)
continue
if prefix:
field_label = f"{prefix} {field_label}"
for error in errors:
flash(
gettext("Error in the %s field - %s") % (field_label, error),
"danger",
)
def flash_message(msg, url, link_text=None, category="success"):
if not link_text:
link_text = gettext("Show")
link = ' &ndash; <a href="%s">%s</a>' % (url, link_text)
message = Markup(msg + link)
flash(message, category)
def permission_missing(redirect_location, message=None):
if not message:
message = gettext("You do not have permission for this action")
flash(message, "danger")
return redirect(redirect_location)
def send_mail(recipient, subject, template, **context):
send_mails([recipient], subject, template, **context)
def send_mails(recipients, subject, template, **context):
if len(recipients) == 0: # pragma: no cover
return
body, html = render_mail_body(template, **context)
send_mails_with_body(recipients, subject, body, html)
def send_mail_async(recipient, subject, template, **context):
return send_mails_async([recipient], subject, template, **context)
def send_mails_async(recipients, subject, template, **context):
if len(recipients) == 0: # pragma: no cover
return
body, html = render_mail_body(template, **context)
return send_mails_with_body_async(recipients, subject, body, html)
def send_mails_with_body_async(recipients, subject, body, html):
from celery import group
from project.base_tasks import send_mail_with_body_task
result = group(
send_mail_with_body_task.s(r, subject, body, html) for r in recipients
).delay()
return result
def render_mail_body(template, **context):
body = render_template("email/%s.txt" % template, **context)
html = render_template("email/%s.html" % template, **context)
return body, html
def send_mails_with_body(recipients, subject, body, html):
# Send single mails, otherwise recipients will see each other
for recipient in recipients:
msg = Message(subject)
msg.recipients = [recipient]
msg.body = body
msg.html = html
send_mail_message(msg)
def send_mail_message(msg):
if not mail.default_sender:
app.logger.info(",".join(msg.recipients))
app.logger.info(msg.subject)
app.logger.info(msg.body)
return
mail.send(msg) # pragma: no cover
def non_match_for_deletion(str1: str, str2: str) -> bool:
return str1 != str2 and str1.casefold() != str2.casefold()
def truncate(data: str, length: int) -> str:
if not data:
return data
return (data[: length - 2] + "..") if len(data) > length else data
def get_share_links(url: str, title: str) -> dict:
share_links = dict()
encoded_url = quote_plus(url)
encoded_title = quote_plus(title)
share_links[
"facebook"
] = f"https://www.facebook.com/sharer/sharer.php?u={encoded_url}"
share_links[
"twitter"
] = f"https://twitter.com/intent/tweet?url={encoded_url}&text={encoded_title}"
share_links["email"] = f"mailto:?subject={encoded_title}&body={encoded_url}"
share_links["whatsapp"] = f"whatsapp://send?text={encoded_url}"
share_links["telegram"] = f"https://t.me/share/url?url={encoded_url}"
share_links["url"] = url
return share_links
def get_calendar_links_for_event_date(event_date: EventDate) -> dict:
calendar_links = dict()
url = url_for("event_date", id=event_date.id, _external=True)
encoded_url = quote_plus(url)
encoded_title = quote_plus(event_date.event.name)
encoded_timezone = quote_plus(berlin_tz.zone)
start_date = event_date.start
end_date = event_date.end if event_date.end else start_date
date_format = "%Y%m%dT%H%M%S"
if event_date.allday:
date_format = "%Y%m%d"
end_date = round_to_next_day(end_date)
start = start_date.astimezone(berlin_tz).strftime(date_format)
end = end_date.astimezone(berlin_tz).strftime(date_format)
if (
event_date.event.attendance_mode
and event_date.event.attendance_mode != EventAttendanceMode.online
):
location = get_place_str(event_date.event.event_place)
locationParam = f"&location={quote_plus(location)}"
else:
locationParam = ""
calendar_links[
"google"
] = f"http://www.google.com/calendar/event?action=TEMPLATE&text={encoded_title}&dates={start}/{end}&ctz={encoded_timezone}&details={encoded_url}{locationParam}"
calendar_links["ics"] = url_for("event_date_ical", id=event_date.id, _external=True)
return calendar_links
def get_calendar_links_for_event(event: Event) -> dict:
calendar_links = dict()
calendar_links["ics"] = url_for("event_ical", id=event.id, _external=True)
return calendar_links
def get_invitation_access_result(email: str):
from project.services.user import find_user_by_email
# Wenn der aktuelle Nutzer nicht der Empfänger der Einladung ist, Meldung ausgeben
if current_user.is_authenticated and not strings_are_equal_ignoring_case(
email, current_user.email
):
return permission_missing(
url_for("profile"),
gettext(
"The invitation was issued to another user. Sign in with the email address the invitation was sent to."
),
)
# Wenn Email nicht als Nutzer vorhanden, dann direkt zu Registrierung
if not find_user_by_email(email):
return redirect(url_for("security.register"))
# Wenn nicht angemeldet, dann zum Login
if not current_user.is_authenticated:
return app.login_manager.unauthorized()
return None
def get_celery_poll_result(): # pragma: no cover
try:
result = celery.AsyncResult(request.args["poll"])
ready = result.ready()
return {
"ready": ready,
"successful": result.successful() if ready else None,
"value": result.get() if ready else result.result,
}
except Exception as e:
return {
"ready": True,
"successful": False,
"error": getattr(e, "message", "Unknown error"),
}
def get_celery_poll_group_result(): # pragma: no cover
try:
result = celery.GroupResult.restore(request.args["poll"])
ready = result.ready()
return {
"ready": ready,
"count": len(result.children),
"completed": result.completed_count(),
"successful": result.successful() if ready else None,
}
except Exception as e:
return {
"ready": True,
"successful": False,
"error": getattr(e, "message", "Unknown error"),
}
def manage_required(permission=None):
def decorator(f):
@wraps(f)
def decorated_function(id, *args, **kwargs):
admin_unit = get_admin_unit_for_manage_or_404(id)
if permission and not has_access(admin_unit, permission):
return permission_missing(
url_for("manage_admin_unit", id=admin_unit.id)
)
set_current_admin_unit(admin_unit)
return f(id, *args, **kwargs)
return decorated_function
return decorator