diff --git a/.vscode/settings.json b/.vscode/settings.json index 7f35355..bf81191 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,5 +3,11 @@ "python.formatting.provider": "black", "python.linting.enabled": true, "python.linting.pylintEnabled": false, - "python.linting.flake8Enabled": true + "python.linting.flake8Enabled": true, + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.nosetestsEnabled": false, + "python.testing.pytestEnabled": true } \ No newline at end of file diff --git a/project/__init__.py b/project/__init__.py index 6f4026f..faa803c 100644 --- a/project/__init__.py +++ b/project/__init__.py @@ -51,7 +51,7 @@ mail_server = os.getenv("MAIL_SERVER") if mail_server is None: app.config["MAIL_SUPPRESS_SEND"] = True app.config["MAIL_DEFAULT_SENDER"] = "test@oveda.de" -else: +else: # pragma: no cover app.config["MAIL_SUPPRESS_SEND"] = False app.config["MAIL_SERVER"] = mail_server app.config["MAIL_PORT"] = os.getenv("MAIL_PORT") @@ -120,5 +120,5 @@ from project.views import ( widget, ) -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover app.run() diff --git a/project/access.py b/project/access.py index b1f6fce..1ae3c21 100644 --- a/project/access.py +++ b/project/access.py @@ -3,6 +3,7 @@ from flask_security import current_user from flask_security.utils import FsPermNeed from flask_principal import Permission from project.models import AdminUnitMember, AdminUnit +from project.services.admin_unit import get_member_for_admin_unit_by_user_id def has_current_user_permission(permission): @@ -10,6 +11,13 @@ def has_current_user_permission(permission): return user_perm.can() +def has_admin_unit_member_role(admin_unit_member, role_name): + for role in admin_unit_member.roles: + if role.name == role_name: + return True + return False + + def has_admin_unit_member_permission(admin_unit_member, permission): for role in admin_unit_member.roles: if permission in role.get_permissions(): @@ -17,16 +25,26 @@ def has_admin_unit_member_permission(admin_unit_member, permission): return False +def get_current_user_member_for_admin_unit(admin_unit_id): + return get_member_for_admin_unit_by_user_id(admin_unit_id, current_user.id) + + def has_current_user_member_permission_for_admin_unit(admin_unit_id, permission): - admin_unit_member = AdminUnitMember.query.filter_by( - admin_unit_id=admin_unit_id, user_id=current_user.id - ).first() + admin_unit_member = get_current_user_member_for_admin_unit(admin_unit_id) if admin_unit_member is not None: if has_admin_unit_member_permission(admin_unit_member, permission): return True return False +def has_current_user_member_role_for_admin_unit(admin_unit_id, role_name): + admin_unit_member = get_current_user_member_for_admin_unit(admin_unit_id) + if admin_unit_member is not None: + if has_admin_unit_member_role(admin_unit_member, role_name): + return True + return False + + def has_current_user_permission_for_admin_unit(admin_unit, permission): if not current_user.is_authenticated: return False @@ -49,44 +67,10 @@ def access_or_401(admin_unit, permission): abort(401) -def can_list_admin_unit_members(admin_unit): - return has_current_user_permission_for_admin_unit( - admin_unit, "admin_unit.members:read" - ) - - -def can_create_event(admin_unit): - return has_current_user_permission_for_admin_unit(admin_unit, "event:create") - - -def can_update_event(event): - return has_current_user_permission_for_admin_unit(event.admin_unit, "event:update") - - -def can_delete_event(event): - return has_current_user_permission_for_admin_unit(event.admin_unit, "event:delete") - - def can_reference_event(event): return len(get_admin_units_for_event_reference(event)) > 0 -def can_update_organizer(organizer): - return get_admin_unit_for_manage(organizer.admin_unit_id) is not None - - -def can_create_admin_unit(): - return current_user.is_authenticated - - -def can_verify_event_for_admin_unit(admin_unit): - return has_current_user_permission_for_admin_unit(admin_unit, "event:verify") - - -def can_verify_event(event): - return can_verify_event_for_admin_unit(event.admin_unit) - - def get_admin_units_with_current_user_permission(permission): result = list() diff --git a/project/dateutils.py b/project/dateutils.py index 58d6013..6baf2f3 100644 --- a/project/dateutils.py +++ b/project/dateutils.py @@ -35,7 +35,7 @@ def form_input_to_date(date_str, hour=0, minute=0, second=0): def form_input_from_date(date): - return date.strftime("%Y-%m-%d") + return date.strftime("%Y-%m-%d") if date else "" def dates_from_recurrence_rule(start, recurrence_rule): diff --git a/project/forms/planing.py b/project/forms/planing.py index 01197a9..c40821c 100644 --- a/project/forms/planing.py +++ b/project/forms/planing.py @@ -1,4 +1,3 @@ -from flask import request from flask_babelex import lazy_gettext from flask_wtf import FlaskForm from wtforms import HiddenField, StringField, SubmitField, SelectField @@ -32,6 +31,3 @@ class PlaningForm(FlaskForm): ) submit = SubmitField(lazy_gettext("Find")) - - def is_submitted(self): - return "submit" in request.args diff --git a/project/forms/widgets.py b/project/forms/widgets.py index c43295d..91ea8a5 100644 --- a/project/forms/widgets.py +++ b/project/forms/widgets.py @@ -1,5 +1,6 @@ from wtforms import DateTimeField, SelectMultipleField, SelectField -from wtforms.widgets import html_params, HTMLString, ListWidget, CheckboxInput +from wtforms.widgets import html_params, ListWidget, CheckboxInput +from markupsafe import Markup from wtforms.validators import StopValidation from datetime import datetime from flask_babelex import to_user_timezone, gettext @@ -37,7 +38,7 @@ class CustomDateTimeWidget: time_minute_params = html_params(name=field.name, id=id + "-minute", **kwargs) clear_button_id = id + "-clear-button" - return HTMLString( + return Markup( '
:
'.format( date_params, clear_button_id, @@ -80,7 +81,7 @@ class CustomDateWidget: date = date_value.strftime("%Y-%m-%d") date_params = html_params(name=field.name, id=id, value=date, **kwargs) - return HTMLString(''.format(date_params)) + return Markup(''.format(date_params)) class CustomDateField(DateTimeField): diff --git a/project/init_data.py b/project/init_data.py index dc16803..2633da0 100644 --- a/project/init_data.py +++ b/project/init_data.py @@ -1,6 +1,7 @@ from project import app, db -from project.services.user import upsert_user_role, add_roles_to_user +from project.services.user import upsert_user_role, add_admin_roles_to_user from project.services.admin_unit import upsert_admin_unit_member_role +from project.services.event import upsert_event_category from project.models import Location @@ -41,8 +42,27 @@ def create_initial_data(): upsert_user_role("admin", "Administrator", admin_permissions) upsert_user_role("event_verifier", "Event expert", event_permissions) - add_roles_to_user("grams.daniel@gmail.com", ["admin", "event_verifier"]) + add_admin_roles_to_user("grams.daniel@gmail.com") Location.update_coordinates() + upsert_event_category("Art") + upsert_event_category("Book") + upsert_event_category("Movie") + upsert_event_category("Family") + upsert_event_category("Festival") + upsert_event_category("Religious") + upsert_event_category("Shopping") + upsert_event_category("Comedy") + upsert_event_category("Music") + upsert_event_category("Dance") + upsert_event_category("Nightlife") + upsert_event_category("Theater") + upsert_event_category("Dining") + upsert_event_category("Conference") + upsert_event_category("Meetup") + upsert_event_category("Fitness") + upsert_event_category("Sports") + upsert_event_category("Other") + db.session.commit() diff --git a/project/jsonld.py b/project/jsonld.py index 6fb6be5..99bc077 100644 --- a/project/jsonld.py +++ b/project/jsonld.py @@ -1,5 +1,4 @@ import datetime -import decimal from json import JSONEncoder from flask import url_for from project.models import EventAttendanceMode, EventStatus @@ -13,8 +12,6 @@ class DateTimeEncoder(JSONEncoder): def default(self, obj): if isinstance(obj, (datetime.date, datetime.datetime)): return (obj.astimezone(berlin_tz)).isoformat() - if isinstance(obj, decimal.Decimal): - return float(obj) def get_sd_for_admin_unit(admin_unit): @@ -29,7 +26,7 @@ def get_sd_for_admin_unit(admin_unit): return result -def get_sd_for_organizer_organization(organizer): +def get_sd_for_organizer(organizer): result = {} result["@type"] = "Organization" result["name"] = organizer.name @@ -49,10 +46,6 @@ def get_sd_for_organizer_organization(organizer): return result -def get_sd_for_organizer(organizer): - return get_sd_for_organizer_organization(organizer) - - def get_sd_for_location(location): result = {} result["@type"] = "PostalAddress" @@ -84,7 +77,7 @@ def get_sd_for_place(place, use_ref=True): if place.location: result["address"] = get_sd_for_location(place.location) - if place.location.latitude != 0: + if place.location.coordinate: result["geo"] = get_sd_for_geo(place.location) if place.photo_id: @@ -134,7 +127,7 @@ def get_sd_for_event_date(event_date): result["previousStartDate"] = event.previous_start_date if event.accessible_for_free: - result["accessible_for_free"] = event.accessible_for_free + result["isAccessibleForFree"] = event.accessible_for_free if event.age_from and event.age_to: result["typicalAgeRange"] = "%d-%d" % (event.age_from, event.age_to) diff --git a/project/models.py b/project/models.py index ed13717..ca52846 100644 --- a/project/models.py +++ b/project/models.py @@ -217,7 +217,7 @@ class Location(db.Model, TrackableMixin): ) ).all() - for location in locations: + for location in locations: # pragma: no cover location.update_coordinate() db.session.commit() @@ -242,9 +242,6 @@ class EventPlace(db.Model, TrackableMixin): description = Column(UnicodeText()) admin_unit_id = db.Column(db.Integer, db.ForeignKey("adminunit.id"), nullable=True) - def is_empty(self): - return not self.name - @listens_for(EventPlace, "before_insert") @listens_for(EventPlace, "before_update") @@ -321,9 +318,6 @@ class EventOrganizer(db.Model, TrackableMixin): logo = db.relationship("Image", uselist=False) admin_unit_id = db.Column(db.Integer, db.ForeignKey("adminunit.id"), nullable=True) - def is_empty(self): - return not self.name - @listens_for(EventOrganizer, "before_insert") @listens_for(EventOrganizer, "before_update") @@ -412,12 +406,8 @@ class EventSuggestion(db.Model, TrackableMixin): @listens_for(EventSuggestion, "before_insert") @listens_for(EventSuggestion, "before_update") def purge_event_suggestion(mapper, connect, self): - if self.organizer and self.organizer.is_empty(): - self.organizer_id = None if self.organizer_id is not None: self.organizer_text = None - if self.event_place and self.event_place.is_empty(): - self.event_place_id = None if self.event_place_id is not None: self.event_place_text = None if self.photo and self.photo.is_empty(): @@ -490,10 +480,6 @@ class Event(db.Model, TrackableMixin): @listens_for(Event, "before_insert") @listens_for(Event, "before_update") def purge_event(mapper, connect, self): - if self.organizer and self.organizer.is_empty(): - self.organizer_id = None - if self.event_place and self.event_place.is_empty(): - self.event_place_id = None if self.photo and self.photo.is_empty(): self.photo_id = None diff --git a/project/oauth.py b/project/oauth.py index a5377dd..e12fc59 100644 --- a/project/oauth.py +++ b/project/oauth.py @@ -15,7 +15,7 @@ blueprint = make_google_blueprint( # create/login local user on successful OAuth login @oauth_authorized.connect_via(blueprint) -def google_logged_in(blueprint, token): +def google_logged_in(blueprint, token): # pragma: no cover if not token: flash("Failed to log in.", category="error") return False @@ -60,7 +60,7 @@ def google_logged_in(blueprint, token): # notify on OAuth provider error @oauth_error.connect_via(blueprint) -def google_error(blueprint, message, response): +def google_error(blueprint, message, response): # pragma: no cover msg = "OAuth error from {name}! message={message} response={response}".format( name=blueprint.name, message=message, response=response ) diff --git a/project/scrape/form.py b/project/scrape/form.py new file mode 100644 index 0000000..2f2d1b5 --- /dev/null +++ b/project/scrape/form.py @@ -0,0 +1,131 @@ +import urllib.parse +from werkzeug.datastructures import MultiDict + + +# see https://www.w3.org/TR/html52/sec-forms.html +class Form: + def __init__(self, form): + assert form.name == "form" # feed me BeautifulSoup
tags + + self.form = form + self._action = form.get("action", "") + self._method = form.get("method", "GET") + self._enctype = form.get("enctype", "") + + self.fields = MultiDict() + self.buttons = {} + + for field in form.find_all(("input", "button", "select", "textarea")): + name = field.get("name") + if not name: + continue + + field_value = field.get("value") + + if field.name == "select": + options = field.find_all("option") + selected_values = list() + + for option in options: + if option.has_attr("selected"): + selected_values.append(option.get("value")) + + if len(selected_values) == 0 and len(options) > 0: + selected_values.append(options[0].get("value")) + + if field.has_attr("multiple"): + field_value = selected_values + elif len(selected_values) > 0: + field_value = selected_values[0] + elif field.name == "textarea": + field_value = field.text.strip() + + self.fields.add(name, (field, field_value)) + + if field.name in ("input", "button"): + if field.get("type") == "submit": + self.buttons[name] = ( + field.get("formaction"), + field.get("formmethod"), + ) + + def _get_default_button(self): + if self.buttons: + return next(iter(self.buttons.keys())) + + def get_action(self, button=None, relative_to=""): # pragma: no cover + # Get default submit button if none specified + if button is None: + button = self._get_default_button() + + # Use the submit button's formaction if available + action = None if button is None else self.buttons[button][0] + action = action or self._action + return urllib.parse.urljoin(relative_to, action) + + def get_method(self, button=None): # pragma: no cover + if button is None: + button = self._get_default_button() + + method = None if button is None else self.buttons[button][1] + return method or self._method + + def _fill_impl(self, button, values): + filled = MultiDict() + + for form_name, (field, default_value) in self.fields.items(multi=True): + # Skip disabled fields + if field.has_attr("disabled"): # pragma: no cover + continue + + # Skip buttons that are not the submit button + is_button = (field.name == "button") or ( + field.name == "input" + and field.get("type") in ("submit", "image", "reset", "button") + ) + if is_button and form_name != button: # pragma: no cover + continue + + # Skip radio buttons and checkboxes that are not checked + is_radio_or_checkbox = field.name == "input" and field.get("type") in ( + "radio", + "checkbox", + ) + if is_radio_or_checkbox and not field.has_attr("checked"): + continue + + # Add the default value + if default_value is None: # pragma: no cover + if is_button: + default_value = "Submit" + elif is_radio_or_checkbox: + default_value = "on" + else: + default_value = "" + + if type(default_value) is list: + filled.setlist(form_name, default_value) + else: + filled.add(form_name, default_value) + + if self._enctype: + filled.add("content_type", self._enctype) + + # Override any form values with our input + for key, value in values.items(): + if key in filled: + filled.pop(key) + if type(value) is list: + filled.setlist(key, value) + else: + filled.add(key, value) + return filled + + def fill(self, *args): # pragma: no cover + if len(args) == 0: + return self._fill_impl(self._get_default_button(), {}) + elif len(args) == 1: + return self._fill_impl(self._get_default_button(), args[0]) + elif len(args) == 2: + return self._fill_impl(args[0], args[1]) + raise ValueError("Expected fill(values) or fill(button, values)") diff --git a/project/scrape/scrape_fp.py b/project/scrape/scrape_fp.py index 5e1bb60..54dce4b 100644 --- a/project/scrape/scrape_fp.py +++ b/project/scrape/scrape_fp.py @@ -1,6 +1,6 @@ from project import ( db, - get_admin_unit, + get_admin_unit_by_name, update_event_dates_with_recurrence_rule, upsert_event_category, ) @@ -70,7 +70,7 @@ def scrape(debug): js_assigns[key] = value break - admin_unit = get_admin_unit("Ferienpass Goslar") + admin_unit = get_admin_unit_by_name("Ferienpass Goslar") category = upsert_event_category("Other") for js_event in js_assigns["events"]: diff --git a/project/scrape/scrape_hi.py b/project/scrape/scrape_hi.py index ec58203..dc95b61 100644 --- a/project/scrape/scrape_hi.py +++ b/project/scrape/scrape_hi.py @@ -14,13 +14,13 @@ from project.models import ( EventOrganizer, ) from sqlalchemy import and_, not_ -from project.services.admin_unit import get_admin_unit +from project.services.admin_unit import get_admin_unit_by_name from project.services.event import ( upsert_event_category, update_event_dates_with_recurrence_rule, ) -admin_unit = get_admin_unit("Harzinfo") +admin_unit = get_admin_unit_by_name("Harzinfo") category = upsert_event_category("Other") base_url = "https://www.harzinfo.de" url = base_url + "/?ndssearch=fullsearch&no_cache=1&L=0" diff --git a/project/services/admin_unit.py b/project/services/admin_unit.py index 874be7d..f36f01a 100644 --- a/project/services/admin_unit.py +++ b/project/services/admin_unit.py @@ -1,18 +1,49 @@ from project import db -from project.models import AdminUnit, AdminUnitMember, AdminUnitMemberRole +from project.models import ( + AdminUnit, + AdminUnitMember, + AdminUnitMemberRole, + AdminUnitMemberInvitation, + EventOrganizer, + Location, +) +from project.services.location import assign_location_values +from project.services.image import upsert_image_with_data +from sqlalchemy import and_ -def upsert_admin_unit(unit_name, short_name=None): - admin_unit = AdminUnit.query.filter_by(name=unit_name).first() - if admin_unit is None: - admin_unit = AdminUnit(name=unit_name) - db.session.add(admin_unit) +def insert_admin_unit_for_user(admin_unit, user): + db.session.add(admin_unit) - admin_unit.short_name = short_name - return admin_unit + # Nutzer als Admin hinzufügen + add_user_to_admin_unit_with_roles(user, admin_unit, ["admin", "event_verifier"]) + db.session.commit() + + # Organizer anlegen + organizer = EventOrganizer() + organizer.admin_unit_id = admin_unit.id + organizer.name = admin_unit.name + organizer.url = admin_unit.url + organizer.email = admin_unit.email + organizer.phone = admin_unit.phone + organizer.fax = admin_unit.fax + organizer.location = Location() + assign_location_values(organizer.location, admin_unit.location) + if admin_unit.logo: + organizer.logo = upsert_image_with_data( + organizer.logo, + admin_unit.logo.data, + admin_unit.logo.encoding_format, + ) + db.session.add(organizer) + db.session.commit() -def get_admin_unit(unit_name): +def get_admin_unit_by_id(id): + return AdminUnit.query.filter_by(id=id).first() + + +def get_admin_unit_by_name(unit_name): return AdminUnit.query.filter_by(name=unit_name).first() @@ -20,6 +51,25 @@ def get_admin_unit_member_role(role_name): return AdminUnitMemberRole.query.filter_by(name=role_name).first() +def find_admin_unit_member_invitation(email, admin_unit_id): + return AdminUnitMemberInvitation.query.filter( + and_( + AdminUnitMemberInvitation.admin_unit_id == admin_unit_id, + AdminUnitMemberInvitation.email == email, + ) + ).first() + + +def insert_admin_unit_member_invitation(admin_unit_id, email, role_names): + invitation = AdminUnitMemberInvitation() + invitation.admin_unit_id = admin_unit_id + invitation.email = email + invitation.roles = ",".join(role_names) + db.session.add(invitation) + db.session.commit() + return invitation + + def upsert_admin_unit_member_role(role_name, role_title, permissions): result = AdminUnitMemberRole.query.filter_by(name=role_name).first() if result is None: @@ -53,6 +103,8 @@ def add_user_to_admin_unit_with_roles(user, admin_unit, role_names): def add_roles_to_admin_unit_member(member, role_names): for role_name in role_names: role = get_admin_unit_member_role(role_name) + if not role: + continue add_role_to_admin_unit_member(member, role) @@ -64,3 +116,13 @@ def add_role_to_admin_unit_member(admin_unit_member, role): is None ): admin_unit_member.roles.append(role) + + +def get_member_for_admin_unit_by_user_id(admin_unit_id, user_id): + return AdminUnitMember.query.filter_by( + admin_unit_id=admin_unit_id, user_id=user_id + ).first() + + +def get_admin_unit_member(id): + return AdminUnitMember.query.filter_by(id=id).first() diff --git a/project/services/event.py b/project/services/event.py index f956d0b..28a4330 100644 --- a/project/services/event.py +++ b/project/services/event.py @@ -84,11 +84,8 @@ def get_event_dates_query(params): date_filter = and_(date_filter, EventDate.start < params.date_to) # PostgreSQL specific https://stackoverflow.com/a/25597632 - if params.weekday: - if type(params.weekday) is list: - weekdays = params.weekday - else: - weekdays = [params.weekday] + if params.weekday and type(params.weekday) is list: + weekdays = params.weekday date_filter = and_(date_filter, extract("dow", EventDate.start).in_(weekdays)) return ( @@ -125,9 +122,9 @@ def get_events_query(params): ) -def update_event_dates_with_recurrence_rule(event, start, end): - event.start = start - event.end = end +def update_event_dates_with_recurrence_rule(event): + start = event.start + end = event.end if end: time_difference = relativedelta(end, start) @@ -168,3 +165,12 @@ def update_event_dates_with_recurrence_rule(event, start, end): event.dates = [date for date in event.dates if date not in dates_to_remove] event.dates.extend(dates_to_add) + + +def insert_event(event): + update_event_dates_with_recurrence_rule(event) + db.session.add(event) + + +def update_event(event): + update_event_dates_with_recurrence_rule(event) diff --git a/project/services/event_suggestion.py b/project/services/event_suggestion.py index b1c3d06..f96288f 100644 --- a/project/services/event_suggestion.py +++ b/project/services/event_suggestion.py @@ -1,7 +1,13 @@ +from project import db from project.models import EventReviewStatus, EventSuggestion from sqlalchemy import and_ +def insert_event_suggestion(event_suggestion): + event_suggestion.review_status = EventReviewStatus.inbox + db.session.add(event_suggestion) + + def get_event_reviews_badge_query(admin_unit): return EventSuggestion.query.filter( and_( diff --git a/project/services/image.py b/project/services/image.py new file mode 100644 index 0000000..ffe795f --- /dev/null +++ b/project/services/image.py @@ -0,0 +1,17 @@ +from project.models import Image +import base64 + + +def upsert_image_with_data(image, data, encoding_format="image/jpeg"): + if image is None: + image = Image() + + image.data = data + image.encoding_format = encoding_format + + return image + + +def upsert_image_with_base64_str(image, base64_str, encoding_format): + data = base64.b64decode(base64_str) + return upsert_image_with_data(image, data, encoding_format) diff --git a/project/services/location.py b/project/services/location.py index bdcd0c2..adbdfae 100644 --- a/project/services/location.py +++ b/project/services/location.py @@ -1,21 +1,3 @@ -from project import db -from project.models import Location - - -def upsert_location(street, postalCode, city, latitude=0, longitude=0, state=None): - result = Location.query.filter_by( - street=street, postalCode=postalCode, city=city, state=state - ).first() - if result is None: - result = Location(street=street, postalCode=postalCode, city=city, state=state) - db.session.add(result) - - result.latitude = latitude - result.longitude = longitude - - return result - - def assign_location_values(target, origin): if origin: target.street = origin.street diff --git a/project/services/organizer.py b/project/services/organizer.py index 19440e9..9c4245c 100644 --- a/project/services/organizer.py +++ b/project/services/organizer.py @@ -4,10 +4,14 @@ from sqlalchemy import and_ from sqlalchemy.sql import func -def upsert_event_organizer(admin_unit_id, name): - result = EventOrganizer.query.filter( +def get_event_organizer(admin_unit_id, name): + return EventOrganizer.query.filter( and_(EventOrganizer.name == name, EventOrganizer.admin_unit_id == admin_unit_id) ).first() + + +def upsert_event_organizer(admin_unit_id, name): + result = get_event_organizer(admin_unit_id, name) if result is None: result = EventOrganizer(name=name, admin_unit_id=admin_unit_id) result.location = Location() diff --git a/project/services/place.py b/project/services/place.py index ca96f71..a394d58 100644 --- a/project/services/place.py +++ b/project/services/place.py @@ -3,18 +3,15 @@ from project.models import EventPlace, Location from sqlalchemy.sql import and_, func -def upsert_event_place(admin_unit_id, organizer_id, name): +def upsert_event_place(admin_unit_id, name): result = EventPlace.query.filter( and_( EventPlace.name == name, EventPlace.admin_unit_id == admin_unit_id, - EventPlace.organizer_id == organizer_id, ) ).first() if result is None: - result = EventPlace( - name=name, admin_unit_id=admin_unit_id, organizer_id=organizer_id - ) + result = EventPlace(name=name, admin_unit_id=admin_unit_id) result.location = Location() db.session.add(result) diff --git a/project/services/user.py b/project/services/user.py index bb3a6d2..2666955 100644 --- a/project/services/user.py +++ b/project/services/user.py @@ -17,6 +17,10 @@ def add_roles_to_user(user_name, role_names): user_datastore.add_role_to_user(user, role_name) +def add_admin_roles_to_user(user_name): + add_roles_to_user(user_name, ["admin", "event_verifier"]) + + def upsert_user_role(role_name, role_title, permissions): role = user_datastore.find_or_create_role(role_name) role.title = role_title @@ -27,3 +31,7 @@ def upsert_user_role(role_name, role_title, permissions): def find_user_by_email(email): return user_datastore.find_user(email=email) + + +def get_user(id): + return user_datastore.get_user(id) diff --git a/project/static/jquery.recurrenceinput.js b/project/static/jquery.recurrenceinput.js index dce1249..82bbd3d 100644 --- a/project/static/jquery.recurrenceinput.js +++ b/project/static/jquery.recurrenceinput.js @@ -164,7 +164,7 @@ rangeByEndDateHuman: 'ends on', including: ', and also', - except Exception: ', except for', + except: ', except for', cancel: 'Cancel', save: 'Save', diff --git a/project/static/site.js b/project/static/site.js index ed0e70a..86f7a98 100644 --- a/project/static/site.js +++ b/project/static/site.js @@ -51,7 +51,7 @@ jQuery.tools.recurrenceinput.localize('de', { rangeByEndDate: 'Bis ', rangeByEndDateHuman: 'endet am ', including: ', und auch ', - except Exception: ', ausser für', + except: ', ausser für', cancel: 'Abbrechen', save: 'Speichern', recurrenceStart: 'Beginn der Wiederholung', diff --git a/project/views/admin_unit.py b/project/views/admin_unit.py index 759df7b..c85eadf 100644 --- a/project/views/admin_unit.py +++ b/project/views/admin_unit.py @@ -6,14 +6,12 @@ from sqlalchemy.exc import SQLAlchemyError from project.access import get_admin_unit_for_manage_or_404, has_access from project.forms.admin_unit import CreateAdminUnitForm, UpdateAdminUnitForm from project.views.utils import ( - upsert_image_with_data, handleSqlError, permission_missing, flash_errors, ) -from project.models import AdminUnit, Location, EventOrganizer -from project.services.admin_unit import add_user_to_admin_unit_with_roles -from project.services.location import assign_location_values +from project.models import AdminUnit, Location +from project.services.admin_unit import insert_admin_unit_for_user def update_admin_unit_with_form(admin_unit, form): @@ -31,33 +29,7 @@ def admin_unit_create(): update_admin_unit_with_form(admin_unit, form) try: - db.session.add(admin_unit) - - # Aktuellen Nutzer als Admin hinzufügen - add_user_to_admin_unit_with_roles( - current_user, admin_unit, ["admin", "event_verifier"] - ) - db.session.commit() - - # Organizer anlegen - organizer = EventOrganizer() - organizer.admin_unit_id = admin_unit.id - organizer.name = admin_unit.name - organizer.url = admin_unit.url - organizer.email = admin_unit.email - organizer.phone = admin_unit.phone - organizer.fax = admin_unit.fax - organizer.location = Location() - assign_location_values(organizer.location, admin_unit.location) - if admin_unit.logo: - organizer.logo = upsert_image_with_data( - organizer.logo, - admin_unit.logo.data, - admin_unit.logo.encoding_format, - ) - db.session.add(organizer) - db.session.commit() - + insert_admin_unit_for_user(admin_unit, current_user) flash(gettext("Admin unit successfully created"), "success") return redirect(url_for("manage_admin_unit", id=admin_unit.id)) except SQLAlchemyError as e: diff --git a/project/views/event.py b/project/views/event.py index b7fd596..59eea0b 100644 --- a/project/views/event.py +++ b/project/views/event.py @@ -26,7 +26,8 @@ from project.views.utils import ( from project.utils import get_event_category_name from project.services.event import ( upsert_event_category, - update_event_dates_with_recurrence_rule, + insert_event, + update_event, ) from project.services.place import get_event_places from sqlalchemy.sql import func @@ -112,7 +113,7 @@ def event_create_for_admin_unit_id(id): event_suggestion.review_status = EventReviewStatus.verified event_suggestion.rejection_resaon = None - db.session.add(event) + insert_event(event) db.session.commit() if event_suggestion: @@ -148,6 +149,7 @@ def event_update(event_id): update_event_with_form(event, form) try: + update_event(event) db.session.commit() flash_message( gettext("Event successfully updated"), @@ -257,7 +259,6 @@ def update_event_with_form(event, form, event_suggestion=None): event.categories = EventCategory.query.filter( EventCategory.id.in_(form.category_ids.data) ).all() - update_event_dates_with_recurrence_rule(event, form.start.data, form.end.data) def get_user_rights(event): diff --git a/project/views/event_place.py b/project/views/event_place.py index d2a27c3..c5a949c 100644 --- a/project/views/event_place.py +++ b/project/views/event_place.py @@ -38,6 +38,9 @@ def manage_admin_unit_places_create(id): except SQLAlchemyError as e: db.session.rollback() flash(handleSqlError(e), "danger") + else: + flash_errors(form) + return render_template("event_place/create.html", form=form) @@ -61,6 +64,8 @@ def event_place_update(id): except SQLAlchemyError as e: db.session.rollback() flash(handleSqlError(e), "danger") + else: + flash_errors(form) return render_template("event_place/update.html", form=form, place=place) diff --git a/project/views/organizer.py b/project/views/organizer.py index a443ce4..30f3113 100644 --- a/project/views/organizer.py +++ b/project/views/organizer.py @@ -1,5 +1,5 @@ from project import app, db -from project.models import EventOrganizer, Location +from project.models import EventOrganizer from flask import render_template, flash, url_for, redirect from flask_babelex import gettext from flask_security import auth_required @@ -27,7 +27,6 @@ def manage_admin_unit_organizer_create(id): if form.validate_on_submit(): organizer = EventOrganizer() organizer.admin_unit_id = admin_unit.id - organizer.location = Location() update_organizer_with_form(organizer, form) try: @@ -40,6 +39,8 @@ def manage_admin_unit_organizer_create(id): except SQLAlchemyError as e: db.session.rollback() flash(handleSqlError(e), "danger") + else: + flash_errors(form) return render_template("organizer/create.html", form=form) @@ -63,6 +64,8 @@ def organizer_update(id): except SQLAlchemyError as e: db.session.rollback() flash(handleSqlError(e), "danger") + else: + flash_errors(form) return render_template("organizer/update.html", form=form, organizer=organizer) diff --git a/project/views/reference_request_review.py b/project/views/reference_request_review.py index 83f63c5..318cc75 100644 --- a/project/views/reference_request_review.py +++ b/project/views/reference_request_review.py @@ -49,9 +49,9 @@ def event_reference_request_review(id): if request.review_status == EventReferenceRequestReviewStatus.verified: reference = create_event_reference_for_request(request) reference.rating = form.rating.data - msg = gettext("Request successfully updated") - else: msg = gettext("Reference successfully created") + else: + msg = gettext("Request successfully updated") db.session.commit() send_reference_request_review_status_mails(request) diff --git a/project/views/utils.py b/project/views/utils.py index 4138cdf..53ac1b1 100644 --- a/project/views/utils.py +++ b/project/views/utils.py @@ -1,8 +1,9 @@ -from project.models import Image, Analytics +from project.models import Analytics from project import db, mail from flask_babelex import gettext from flask import request, url_for, render_template, flash, redirect, Markup from flask_mail import Message +from sqlalchemy.exc import SQLAlchemyError def track_analytics(key, value1, value2): @@ -17,22 +18,15 @@ def track_analytics(key, value1, value2): return result -def handleSqlError(e): - message = str(e.__dict__["orig"]) +def handleSqlError(e: SQLAlchemyError) -> str: + if e.orig: + message = str(e.orig) + else: + message = str(e) print(message) return message -def upsert_image_with_data(image, data, encoding_format="image/jpeg"): - if image is None: - image = Image() - - image.data = data - image.encoding_format = encoding_format - - return image - - def get_pagination_urls(pagination, **kwargs): result = {} @@ -86,6 +80,7 @@ def send_mails(recipients, subject, template, **context): msg.html = render_template("email/%s.html" % template, **context) if not mail.default_sender: + print(",".join(msg.recipients)) print(msg.subject) print(msg.body) return diff --git a/project/views/widget.py b/project/views/widget.py index 995e8d8..689bd33 100644 --- a/project/views/widget.py +++ b/project/views/widget.py @@ -13,6 +13,7 @@ from flask_babelex import gettext from flask_security import current_user from sqlalchemy.sql import func from sqlalchemy.exc import SQLAlchemyError +from project.services.event_suggestion import insert_event_suggestion from project.services.event import get_event_dates_query from project.services.event_search import EventSearchParams from project.services.place import get_event_places @@ -127,7 +128,7 @@ def event_suggestion_create_for_admin_unit(au_short_name): event_suggestion.review_status = EventReviewStatus.inbox try: - db.session.add(event_suggestion) + insert_event_suggestion(event_suggestion) db.session.commit() send_event_inbox_mails(admin_unit, event_suggestion) diff --git a/requirements.txt b/requirements.txt index f0eae85..b2032df 100644 --- a/requirements.txt +++ b/requirements.txt @@ -63,6 +63,7 @@ pyflakes==2.2.0 pyparsing==2.4.7 pytest==6.1.2 pytest-cov==2.10.1 +pytest-mock==3.3.1 python-dateutil==2.8.1 python-dotenv==0.15.0 python-editor==1.0.4 diff --git a/tests/conftest.py b/tests/conftest.py index be756b9..1d7246a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,7 @@ import pytest import os +from .utils import UtilActions +from .seeder import Seeder def pytest_generate_tests(metafunc): @@ -7,14 +9,38 @@ def pytest_generate_tests(metafunc): @pytest.fixture -def client(): - from project import app, db +def app(): + from project import app app.config["TESTING"] = True app.testing = True - client = app.test_client() + return app + + +@pytest.fixture +def db(app): + from project import db + from project.init_data import create_initial_data + with app.app_context(): db.drop_all() db.create_all() - yield client + create_initial_data() + + return db + + +@pytest.fixture +def client(app, db): + return app.test_client() + + +@pytest.fixture +def utils(client, app): + return UtilActions(client, app) + + +@pytest.fixture +def seeder(app, db, utils): + return Seeder(app, db, utils) diff --git a/tests/seeder.py b/tests/seeder.py new file mode 100644 index 0000000..de8253d --- /dev/null +++ b/tests/seeder.py @@ -0,0 +1,240 @@ +class Seeder(object): + def __init__(self, app, db, utils): + self._app = app + self._db = db + self._utils = utils + + def setup_base(self, admin=False): + user_id = self.create_user(admin=admin) + self._utils.login() + admin_unit_id = self.create_admin_unit(user_id) + return (user_id, admin_unit_id) + + def setup_base_event_verifier(self): + owner_id = self.create_user("owner@owner") + admin_unit_id = self.create_admin_unit(owner_id, "Other crew") + member_id = self.create_admin_unit_member_event_verifier(admin_unit_id) + self._utils.login() + return (owner_id, admin_unit_id, member_id) + + def create_user( + self, email="test@test.de", password="MeinPasswortIstDasBeste", admin=False + ): + from project.services.user import upsert_user, add_admin_roles_to_user + + with self._app.app_context(): + user = upsert_user(email, password) + + if admin: + add_admin_roles_to_user(email) + + self._db.session.commit() + user_id = user.id + + return user_id + + def create_admin_unit(self, user_id, name="Meine Crew"): + from project.models import AdminUnit + from project.services.user import get_user + from project.services.admin_unit import insert_admin_unit_for_user + + with self._app.app_context(): + user = get_user(user_id) + admin_unit = AdminUnit() + admin_unit.name = name + admin_unit.short_name = name.lower().replace(" ", "") + insert_admin_unit_for_user(admin_unit, user) + self._db.session.commit() + admin_unit_id = admin_unit.id + + return admin_unit_id + + def create_admin_unit_member(self, admin_unit_id, role_names): + from project.services.user import get_user + from project.services.admin_unit import ( + get_admin_unit_by_id, + add_user_to_admin_unit_with_roles, + ) + + with self._app.app_context(): + user_id = self.create_user() + user = get_user(user_id) + admin_unit = get_admin_unit_by_id(admin_unit_id) + member = add_user_to_admin_unit_with_roles(user, admin_unit, role_names) + self._db.session.commit() + member_id = member.id + + return member_id + + def create_invitation(self, admin_unit_id, email, role_names=["admin"]): + from project.services.admin_unit import insert_admin_unit_member_invitation + + with self._app.app_context(): + invitation = insert_admin_unit_member_invitation( + admin_unit_id, email, role_names + ) + invitation_id = invitation.id + + return invitation_id + + def create_admin_unit_member_event_verifier(self, admin_unit_id): + return self.create_admin_unit_member(admin_unit_id, ["event_verifier"]) + + def upsert_event_place(self, admin_unit_id, name): + from project.services.place import upsert_event_place + + with self._app.app_context(): + place = upsert_event_place(admin_unit_id, name) + self._db.session.commit() + place_id = place.id + + return place_id + + def upsert_default_event_place(self, admin_unit_id): + from project.services.admin_unit import get_admin_unit_by_id + + with self._app.app_context(): + admin_unit = get_admin_unit_by_id(admin_unit_id) + place_id = self.upsert_event_place(admin_unit_id, admin_unit.name) + + return place_id + + def upsert_event_organizer(self, admin_unit_id, name): + from project.services.organizer import upsert_event_organizer + + with self._app.app_context(): + organizer = upsert_event_organizer(admin_unit_id, name) + self._db.session.commit() + organizer_id = organizer.id + + return organizer_id + + def upsert_default_event_organizer(self, admin_unit_id): + from project.services.admin_unit import get_admin_unit_by_id + + with self._app.app_context(): + admin_unit = get_admin_unit_by_id(admin_unit_id) + organizer_id = self.upsert_event_organizer(admin_unit_id, admin_unit.name) + + return organizer_id + + def create_event(self, admin_unit_id): + from project.models import Event + from project.services.event import insert_event, upsert_event_category + from project.dateutils import now + + with self._app.app_context(): + event = Event() + event.admin_unit_id = admin_unit_id + event.categories = [upsert_event_category("Other")] + event.name = "Name" + event.description = "Beschreibung" + event.start = now + event.event_place_id = self.upsert_default_event_place(admin_unit_id) + event.organizer_id = self.upsert_default_event_organizer(admin_unit_id) + insert_event(event) + self._db.session.commit() + event_id = event.id + return event_id + + def get_default_image_base64(self): + return """/9j/4AAQSkZJRgABAQEBLAEsAAD/2wBDAAcFBQYFBAcGBgYIBwcICxILCwoKCxYPEA0SGhYbGhkWGRgcICgiHB4mHhgZIzAkJiorLS4tGyIyNTEsNSgsLSz/2wBDAQcICAsJCxULCxUsHRkdLCwsLCwsLCwsLCwsLCwsLCwsLCwsLCwsLCwsLCwsLCwsLCwsLCwsLCwsLCwsLCwsLCz/wAARCABcAFoDASIAAhEBAxEB/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/8QAHwEAAwEBAQEBAQEBAQAAAAAAAAECAwQFBgcICQoL/8QAtREAAgECBAQDBAcFBAQAAQJ3AAECAxEEBSExBhJBUQdhcRMiMoEIFEKRobHBCSMzUvAVYnLRChYkNOEl8RcYGRomJygpKjU2Nzg5OkNERUZHSElKU1RVVldYWVpjZGVmZ2hpanN0dXZ3eHl6goOEhYaHiImKkpOUlZaXmJmaoqOkpaanqKmqsrO0tba3uLm6wsPExcbHyMnK0tPU1dbX2Nna4uPk5ebn6Onq8vP09fb3+Pn6/9oADAMBAAIRAxEAPwD5tooooAKAK6jwl4E1TxXLvhQW9mpw9zKDt+gH8R+n4kcV7LoHw48OaCqOLRb25X/ltcjec+y9B7cZ960jTcgPAbDQ9V1Xmw066uxnBMMTMB+IFbsPw48VPaSN/YsofIwGdAxGDnAJz6V9GKQqhQAqjoAMAVbg06S40m61FceXZsik+u7g/llTWjpJbsD5TvvCev6arPd6PeQovJcxEqPxHFZJGK+uN361ia14R0HxBGRf6dC8h/5aoNkg/wCBDn8OlDo9gPmGivQfGHwqvtCje90tn1CxXllx+9jHqQPvD3H5V58RisXFx0YBRRRUgFdr8PPAx8T3pu7wMmm27fMRwZW/uj29T+HeuZ0TTJta1q20+3/1k77c/wB0dz+Ayfwr6P0uwt9H0u3sLVQkMCBV9T6k+56mt6cOZ3A0LeGG0t47e3iSKGNQqIgwFA6AVat4JrlsRrx3Y8AVHYWxu5Tu4jX7x/pW9GVjQIgCqOgFdqiYVavJotyCHSIh/rXZz6L8orpdE1jQ7XwxHZ3AZA6MJ0eNjuYkh8nHPORWDJcLFG0jHCqCSfQV1Oi+G7iLSY/tFy8U0oDSIqAbfneTB98uc/SufEpJImjOU27nGR6bBPbJIjSxlhuG/k4zxnpzj6fSqVzZz23JG9P7y/19K1ole1U2k3+utmML8Y5Xj8iMEexFPMgIwea6YxTirGftpRlZnO768p+JXw+iMMuvaRFsZcvdQIOCO7qO3uPxr17UbMQHzoh+7J5H93/61ZxYEYOCD1BrKcE9GdcZKSuj5WIwcUV1nxD8NDw74hJt0C2V3mSEDgKc/Mv4Z/IiuTrgkrOzKPSfhBpga/vdUkTIhUQxsemW5b8QAP8AvqvWA/41wXwuaGLwr5at+9kleVl/8d/9lru7QhryIHpuz+VehSjaKE3ZXOltUFvbrGOo6n1PepfM96p+b70qOj3dmsiq6NdQBlYZBBlTII7it37queYveZv6LoEmurHcTqI9O3BwXGTcAEEAD+4SOp6jp13UySTWBvVfDYUM8h2i1hIUEAJ37f8A689K7f8AsHSFXjSrEH/r3T/Csd7FL6ZrzTbG0MFq22NBDGFu2GQ4zjgDkKePmBJyMZ8qc3N3Z6UIqCsjJ1DQbk2cOp21mkU7xKLq0Tajbl/iUDgn1GewxzxWHHcpLGro25W6GvTLe0028tkmSytyjjI3QBT9CCMg+oPNcL4wWK38TNHDGkafZo+EAA+9J2FdOHqO/IznrwVuYznKyIUblWGCK5uYGGZ4yeVOK2fNrH1QgXmf7yg12SRGHl71jifihpw1Dwe9wFBls5FlBxzt6N/PP4V4lnHYflX0Lr5ik8P3sMr7VnhaIHrywIr56z/nFefiI2dztPSvAt3FaadYzyI8qROxeNJPLJ+ZuNxBx27GvV5LzRZriyfTD5bsfnia4EhIMcbAjgdC7qeOqn0OPE/A2oSw28iwkrPbyiaNweQe35Fc/jXqvje1uY7fTLq51f8AtKYq3lSSEecYi25HYbiQCWbbnHyle+QvXBpqImrqx0vm0+Eia+s4jnD3cCnaxU/61OhHI/CuUstenaFGfbMpHO7qPxrZ0rWLWTWtMVzJEzX1sMEZGTMnf/61bVItRZ5cXadj0zXZNJttM1NIbvUhJDBKon+23BiSUISELb8bvb8OpAOamp6JHbHbb61Hb2katKU1KdRFGwXySAJOQwYYA6cg9Ky4bnSIfCttAv8AZks7aYXeFkXzmU2RlM5z8xy3BOMdc81UXUbGfSdYCX1oxlsrKJSJkOWhVGcdeq7jn0xzXjHqnUp5Ftpd3PBBfSSvd+V9mOpzJtYRBpBuDEZysmD3JHIBzXP+Jora28QAWhnMb2kMgM8zysdxc9XJPT3q+dR02X+0IP8AhJ9Lt/N1GS4giUq88m4bQEAf59wJAABJJ9a5nxJr9jNrz7L1Ll4baOGRIIyvksrODGwLHDjGCM/h69GH1qIwr/AO82sy+bzdQjjxIzFR8sUZkcjqSFHJAHJ7VSuNeba3lIIlA5ZzuP8Ah/Os3w3Fe6n4sjv1Z/NgJuUzEZmkKEfKiBlLkZGQpyBk9q9KatG7MMMryuHxAisNOvR9lv8A7REsW9kDrJ5Y2juvGc7uOCMD1rwMISOo/OvXfiXrjXKXRaRpUhiWzhLtIzMMnqZPmyCzcHpj8a8i+TuDXn4hvRM7i/oWpf2bq0czn903ySf7p/wOD+Fe9eE7n+19Kn8PSXaxrcLtjQiNY2ywbcAADJLkYXcQAM5YDivnMHFdt4P8TSwPFbGd4J0+WCVWIJHTbn6dPXp6ZKM18DA7aWSOw1S4giJNuH+T96kuB2O5PlPvirSzyRywTwMpkgmjnUMcBijhgM/hXQ/2lpvjdbW1vFNjNF8kfkKMRJtBdznCrDGqMducnPXILPzeo6Te6DDayTfMtwm5wBxEx5CE9m2FWI4xuHFehGal7ktzmrUOZ80dzrV+JN8nw4Xwr/YVs5XS/wCzvtBv2Gf3Xl79vlfjjP412I+NmmKuF0PUh9Wi/wDi68WW8RuuV/Wn/aI/+ei1m8LTMfaVk9UekX3xalkmvBa6ChhmuIp0a4udrLsWMY2qpHWPru79K43U9Vl1fU5b2W2jtmkLnZHIXHzTSy9cD/nrjp/DnvxkNdxr0JY+wqzpdm2sy3Ae4+y21rF5srqm9tu5UGBkZ+Z17gAZOfWo0adL3kHLVqqz2K1zO9xIttbguzkLheSx7AV1hjtvC3ghhJIo1S6Y5jIFxFKVcjCkFosLkHPEiMCBwwwWdppvg6xOoS3iyangoUhuow6xOOGj2gkMUZHWQErgup9H808W+J3V5LiRo31C45+VAOcYLkDAyeST3OT61E582r2R2QgqasjmvGepm6vRaq5YRHdId27Ln/D+p9K5mnSOzuzOSzE5JPUmm15s5OUuYsKUHBzSUVAHV6J4uMW2HUGYgYCzjkj/AHu5+vXivXPD/wARpUuDcXqRajDKrHzIcRsXJUl2K43MQgU5IOM88tu+eRU9reXNo5a3nkiJ67TjP19a6Y1na01dAfRFgukeI7x7e10W1SRLXzhgTJvnMg3JhCxEYEjY+XIEa5IGabe6Z4OspLgLfTyvDdvGUEgI2LcbQB3YGIbsjPJ68bT4rb+LtSGfNEE5OOXTHb/ZIFdJb6hLNapKwQFhnABwK64+9qm7Bc6DV5LSTVZ/sMMMVqjlIvKLkOoOA3zktkjn+gqGy1G40u4+1W03lOqspYgEFSMEEEEEYPeuIu/FV8jNEkdumDwwU5/nj9Kwb3Vb6+ZjcXLuCfu9F46cDiipiIwXLa4XOx8ReNvNuJpI5ze3cpy8x5Udvx9scY/KuFnuJLmZpZnLyMclj1NMJNJXBUqSm9QCiiisgP/Z""" + + def get_default_image_upload(self): + from werkzeug.datastructures import FileStorage + from io import BytesIO + + return FileStorage( + stream=BytesIO(b"this is a test"), + filename="default.png", + content_type="image/png", + ) + + def get_default_image_upload_base64(self): + base64_str = self.get_default_image_base64() + return "data:image/png;base64,{}".format(base64_str) + + def upsert_default_image(self): + from project.services.image import upsert_image_with_base64_str + + with self._app.app_context(): + base64_str = self.get_default_image_base64() + image = upsert_image_with_base64_str(None, base64_str, "image/png") + self._db.session.add(image) + self._db.session.commit() + image_id = image.id + + return image_id + + def create_event_suggestion(self, admin_unit_id, free_text=False): + from project.models import EventSuggestion + from project.services.event_suggestion import insert_event_suggestion + from project.dateutils import now + + with self._app.app_context(): + suggestion = EventSuggestion() + suggestion.admin_unit_id = admin_unit_id + suggestion.contact_name = "Vorname Nachname" + suggestion.contact_email = "vorname@nachname.de" + suggestion.contact_email_notice = True + suggestion.name = "Vorschlag" + suggestion.description = "Beschreibung" + suggestion.start = now + suggestion.photo_id = self.upsert_default_image() + + if free_text: + suggestion.event_place_text = "Freitext Ort" + suggestion.organizer_text = "Freitext Organisator" + else: + suggestion.event_place_id = self.upsert_default_event_place( + admin_unit_id + ) + suggestion.organizer_id = self.upsert_default_event_organizer( + admin_unit_id + ) + insert_event_suggestion(suggestion) + self._db.session.commit() + suggestion_id = suggestion.id + return suggestion_id + + def create_reference(self, event_id, admin_unit_id): + from project.models import EventReference + + with self._app.app_context(): + reference = EventReference() + reference.event_id = event_id + reference.admin_unit_id = admin_unit_id + self._db.session.add(reference) + self._db.session.commit() + reference_id = reference.id + return reference_id + + def create_any_reference(self, admin_unit_id): + other_user_id = self.create_user("other@test.de") + other_admin_unit_id = self.create_admin_unit(other_user_id, "Other Crew") + event_id = self.create_event(other_admin_unit_id) + reference_id = self.create_reference(event_id, admin_unit_id) + return (other_user_id, other_admin_unit_id, event_id, reference_id) + + def create_reference_request(self, event_id, admin_unit_id): + from project.models import ( + EventReferenceRequest, + EventReferenceRequestReviewStatus, + ) + + with self._app.app_context(): + reference_request = EventReferenceRequest() + reference_request.event_id = event_id + reference_request.admin_unit_id = admin_unit_id + reference_request.review_status = EventReferenceRequestReviewStatus.inbox + self._db.session.add(reference_request) + self._db.session.commit() + reference_request_id = reference_request.id + return reference_request_id + + def create_incoming_reference_request(self, admin_unit_id): + other_user_id = self.create_user("other@test.de") + other_admin_unit_id = self.create_admin_unit(other_user_id, "Other Crew") + event_id = self.create_event(other_admin_unit_id) + reference_request_id = self.create_reference_request(event_id, admin_unit_id) + return (other_user_id, other_admin_unit_id, event_id, reference_request_id) diff --git a/tests/services/test_event.py b/tests/services/test_event.py new file mode 100644 index 0000000..aee5c15 --- /dev/null +++ b/tests/services/test_event.py @@ -0,0 +1,38 @@ +def test_update_event_dates_with_recurrence_rule(client, seeder, utils, app): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + with app.app_context(): + from project.models import Event + from project.dateutils import create_berlin_date + from project.services.event import update_event_dates_with_recurrence_rule + + event = Event.query.get(event_id) + event.start = create_berlin_date(2030, 12, 31, 14, 30) + event.end = create_berlin_date(2030, 12, 31, 16, 30) + update_event_dates_with_recurrence_rule(event) + + len_dates = len(event.dates) + assert len_dates == 1 + + event_date = event.dates[0] + assert event_date.start == event.start + assert event_date.end == event.end + + # Update again + update_event_dates_with_recurrence_rule(event) + + len_dates = len(event.dates) + assert len_dates == 1 + + event_date = event.dates[0] + assert event_date.start == event.start + assert event_date.end == event.end + + # Wiederholt sich alle 1 Tage, endet nach 7 Ereigniss(en) + event.recurrence_rule = "RRULE:FREQ=DAILY;COUNT=7" + + update_event_dates_with_recurrence_rule(event) + + len_dates = len(event.dates) + assert len_dates == 7 diff --git a/tests/services/test_event_search.py b/tests/services/test_event_search.py new file mode 100644 index 0000000..ce5a408 --- /dev/null +++ b/tests/services/test_event_search.py @@ -0,0 +1,10 @@ +def test_date_str(client, seeder, utils): + from project.dateutils import create_berlin_date + from project.services.event_search import EventSearchParams + + params = EventSearchParams() + params.date_from = create_berlin_date(2030, 12, 30, 0) + params.date_to = create_berlin_date(2030, 12, 31, 0) + + assert params.date_from_str == "2030-12-30" + assert params.date_to_str == "2030-12-31" diff --git a/tests/test___init__.py b/tests/test___init__.py new file mode 100644 index 0000000..ced0855 --- /dev/null +++ b/tests/test___init__.py @@ -0,0 +1,10 @@ +def test_mail_server(): + import os + + os.environ["DATABASE_URL"] = "postgresql://postgres@localhost/gsevpt_tests" + os.environ["MAIL_SERVER"] = "mailserver.com" + + from project import app + + app.config["TESTING"] = True + app.testing = True diff --git a/tests/test_access.py b/tests/test_access.py new file mode 100644 index 0000000..5588386 --- /dev/null +++ b/tests/test_access.py @@ -0,0 +1,27 @@ +def test_has_admin_unit_member_role(client, app, db, seeder): + owner_id, admin_unit_id, member_id = seeder.setup_base_event_verifier() + + with app.app_context(): + from project.models import AdminUnitMember + from project.access import has_admin_unit_member_role + + member = AdminUnitMember.query.get(member_id) + assert has_admin_unit_member_role(member, "admin") is False + + +def test_has_current_user_member_role_for_admin_unit(client, app, db, seeder): + owner_id, admin_unit_id, member_id = seeder.setup_base_event_verifier() + + with app.test_request_context(): + with app.app_context(): + from project.models import AdminUnitMember + from project.access import has_current_user_member_role_for_admin_unit + from flask_login import login_user + + member = AdminUnitMember.query.get(member_id) + login_user(member.user) + + assert ( + has_current_user_member_role_for_admin_unit(admin_unit_id, "admin") + is False + ) diff --git a/tests/test_dateutils.py b/tests/test_dateutils.py new file mode 100644 index 0000000..1b3ed34 --- /dev/null +++ b/tests/test_dateutils.py @@ -0,0 +1,64 @@ +def test_calculate_occurrences(): + result = get_calculate_occurrences("RRULE:FREQ=DAILY;COUNT=7") + assert result["batch"]["batch_size"] == 10 + + number_of_occurences = len(result["occurrences"]) + assert number_of_occurences == 7 + + occurence = result["occurrences"][0] + assert occurence["date"] == "20300101T000000" + assert occurence["formattedDate"] == '"01.01.2030"' + + +def test_calculate_occurrences_exdate(): + result = get_calculate_occurrences( + "RRULE:FREQ=DAILY;COUNT=2\nEXDATE:20300102T000000" + ) + + number_of_occurences = len(result["occurrences"]) + assert number_of_occurences == 2 + + +def test_calculate_occurrences_rdate(): + result = get_calculate_occurrences( + "RRULE:FREQ=DAILY;COUNT=2\nEXDATE:20300102T000000\nRDATE:20300103T000000" + ) + + number_of_occurences = len(result["occurrences"]) + assert number_of_occurences == 3 + + +def test_calculate_occurrences_exdateStart(): + result = get_calculate_occurrences( + "RRULE:FREQ=DAILY;COUNT=20\nEXDATE:20300102T000000", 10 + ) + + number_of_occurences = len(result["occurrences"]) + assert number_of_occurences == 10 + + +def test_calculate_occurrences_exdateBefore(): + result = get_calculate_occurrences( + "RRULE:FREQ=DAILY;COUNT=20\nEXDATE:20290102T000000", 10 + ) + + number_of_occurences = len(result["occurrences"]) + assert number_of_occurences == 10 + + +def test_calculate_occurrences_count(): + result = get_calculate_occurrences("RRULE:FREQ=DAILY;COUNT=100") + + number_of_occurences = len(result["occurrences"]) + assert number_of_occurences == 10 + + +def get_calculate_occurrences(rrule_str, start=0): + from datetime import datetime + from project.dateutils import calculate_occurrences + + start_date = datetime(2030, 1, 1) + date_format = '"%d.%m.%Y"' + batch_size = 10 + + return calculate_occurrences(start_date, date_format, rrule_str, start, batch_size) diff --git a/tests/test_i10n.py b/tests/test_i10n.py new file mode 100644 index 0000000..8dd73be --- /dev/null +++ b/tests/test_i10n.py @@ -0,0 +1,4 @@ +def test_send_mails(client, app, utils): + from project.i10n import print_dynamic_texts + + print_dynamic_texts() diff --git a/tests/test_jsonld.py b/tests/test_jsonld.py new file mode 100644 index 0000000..be7aa17 --- /dev/null +++ b/tests/test_jsonld.py @@ -0,0 +1,230 @@ +import pytest + + +def test_get_sd_for_admin_unit(client, app, db, seeder): + user_id, admin_unit_id = seeder.setup_base() + + with app.app_context(): + from project.jsonld import get_sd_for_admin_unit + from project.models import AdminUnit + + admin_unit = AdminUnit.query.get(admin_unit_id) + admin_unit.url = "http://www.goslar.de" + + result = get_sd_for_admin_unit(admin_unit) + assert result["url"] == "http://www.goslar.de" + + +def test_get_sd_for_organizer(client, app, db, seeder): + user_id, admin_unit_id = seeder.setup_base() + organizer_id = seeder.upsert_default_event_organizer(admin_unit_id) + + with app.app_context(): + from project.jsonld import get_sd_for_organizer + from project.models import EventOrganizer + + organizer = EventOrganizer.query.get(organizer_id) + organizer.email = "info@goslar.de" + organizer.phone = "12345" + organizer.fax = "67890" + organizer.url = "http://www.goslar.de" + + result = get_sd_for_organizer(organizer) + assert result["email"] == "info@goslar.de" + assert result["phone"] == "12345" + assert result["faxNumber"] == "67890" + assert result["url"] == "http://www.goslar.de" + + +def test_get_sd_for_place(client, app, db, utils, seeder): + user_id, admin_unit_id = seeder.setup_base() + place_id = seeder.upsert_default_event_place(admin_unit_id) + + with app.app_context(): + from project.jsonld import get_sd_for_place + from project.models import EventPlace, Location, Image + from math import isclose + + place = EventPlace.query.get(place_id) + place.url = "http://www.goslar.de" + + photo = Image() + photo.data = b"something" + place.photo = photo + + location = Location() + location.street = "Markt 7" + location.postalCode = "38640" + location.city = "Goslar" + location.latitude = 51.9077888 + location.longitude = 10.4333312 + place.location = location + + db.session.commit() + + with app.test_request_context(): + result = get_sd_for_place(place) + + assert result["photo"] == utils.get_url("image", id=photo.id) + assert result["url"] == "http://www.goslar.de" + assert result["address"]["streetAddress"] == "Markt 7" + assert result["address"]["postalCode"] == "38640" + assert result["address"]["addressLocality"] == "Goslar" + assert isclose(result["geo"]["latitude"], 51.9077888) + assert isclose(result["geo"]["longitude"], 10.4333312) + + +def test_get_sd_for_place_noCoordinates(client, app, db, utils, seeder): + user_id, admin_unit_id = seeder.setup_base() + place_id = seeder.upsert_default_event_place(admin_unit_id) + + with app.app_context(): + from project.jsonld import get_sd_for_place + from project.models import EventPlace, Location + + place = EventPlace.query.get(place_id) + + location = Location() + location.street = "Markt 7" + location.postalCode = "38640" + location.city = "Goslar" + place.location = location + + db.session.commit() + + result = get_sd_for_place(place) + assert "geo" not in result + + +def test_get_sd_for_event_date(client, app, db, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + with app.app_context(): + from project.jsonld import get_sd_for_event_date + from project.models import Event, Image + from project.services.event import update_event + from project.dateutils import create_berlin_date + + event = Event.query.get(event_id) + event.start = create_berlin_date(2030, 12, 31, 14, 30) + event.end = create_berlin_date(2030, 12, 31, 16, 30) + event.previous_start_date = create_berlin_date(2030, 12, 30, 14, 30) + event.external_link = "www.goslar.de" + event.ticket_link = "www.tickets.de" + event.accessible_for_free = True + + photo = Image() + photo.data = b"something" + event.photo = photo + + update_event(event) + db.session.commit() + event_date = event.dates[0] + + with app.test_request_context(): + result = get_sd_for_event_date(event_date) + + assert result["startDate"] == event.start + assert result["endDate"] == event.end + assert result["previousStartDate"] == event.previous_start_date + assert result["isAccessibleForFree"] + assert result["url"][0] == utils.get_url("event_date", id=event_date.id) + assert result["url"][1] == "www.goslar.de" + assert result["url"][2] == "www.tickets.de" + assert result["image"] == utils.get_url("image", id=photo.id) + + +@pytest.mark.parametrize( + "age_from, age_to, typicalAgeRange", + [(18, None, "18-"), (None, 14, "-14"), (9, 99, "9-99")], +) +def test_get_sd_for_event_date_ageRange( + client, app, db, seeder, utils, age_from, age_to, typicalAgeRange +): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + with app.app_context(): + from project.jsonld import get_sd_for_event_date + from project.models import Event + from project.services.event import update_event + + event = Event.query.get(event_id) + event.age_from = age_from + event.age_to = age_to + + update_event(event) + db.session.commit() + event_date = event.dates[0] + + with app.test_request_context(): + result = get_sd_for_event_date(event_date) + + assert result["typicalAgeRange"] == typicalAgeRange + + +@pytest.mark.parametrize( + "attendance_mode, eventAttendanceMode", + [ + (1, "OfflineEventAttendanceMode"), + (2, "OnlineEventAttendanceMode"), + (3, "MixedEventAttendanceMode"), + ], +) +def test_get_sd_for_event_date_eventAttendanceMode( + client, app, db, seeder, utils, attendance_mode, eventAttendanceMode +): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + with app.app_context(): + from project.jsonld import get_sd_for_event_date + from project.models import Event, EventAttendanceMode + from project.services.event import update_event + + event = Event.query.get(event_id) + event.attendance_mode = EventAttendanceMode(attendance_mode) + + update_event(event) + db.session.commit() + event_date = event.dates[0] + + with app.test_request_context(): + result = get_sd_for_event_date(event_date) + + assert result["eventAttendanceMode"] == eventAttendanceMode + + +@pytest.mark.parametrize( + "status, eventStatus", + [ + (1, "EventScheduled"), + (2, "EventCancelled"), + (3, "EventMovedOnline"), + (4, "EventPostponed"), + (5, "EventRescheduled"), + ], +) +def test_get_sd_for_event_date_eventStatus( + client, app, db, seeder, utils, status, eventStatus +): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + with app.app_context(): + from project.jsonld import get_sd_for_event_date + from project.models import Event, EventStatus + from project.services.event import update_event + + event = Event.query.get(event_id) + event.status = EventStatus(status) + + update_event(event) + db.session.commit() + event_date = event.dates[0] + + with app.test_request_context(): + result = get_sd_for_event_date(event_date) + + assert result["eventStatus"] == eventStatus diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 0000000..b3751aa --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,23 @@ +def test_location_update_coordinate(client, app, db): + from project.models import Location + + location = Location() + location.latitude = 51.9077888 + location.longitude = 10.4333312 + location.update_coordinate() + + assert location.coordinate is not None + + +def test_event_category(client, app, db, seeder): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + with app.app_context(): + from project.models import Event + + event = Event.query.get(event_id) + event.categories = [] + db.session.commit() + + assert event.category is None diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..daeddef --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,147 @@ +import re +from flask import g, url_for +from sqlalchemy.exc import IntegrityError +from bs4 import BeautifulSoup + + +class UtilActions(object): + def __init__(self, client, app): + self._client = client + self._app = app + + def register(self, email="test@test.de", password="MeinPasswortIstDasBeste"): + response = self._client.get("/register") + assert response.status_code == 200 + + with self._client: + response = self._client.post( + "/register", + data={ + "email": email, + "password": password, + "password_confirm": password, + "csrf_token": self.get_csrf(response), + "submit": "Register", + }, + follow_redirects=True, + ) + assert response.status_code == 200 + assert g.identity.user.email == email + + def login(self, email="test@test.de", password="MeinPasswortIstDasBeste"): + from project.services.user import find_user_by_email + + response = self._client.get("/login") + assert response.status_code == 200 + + with self._client: + response = self._client.post( + "/login", + data={ + "email": email, + "password": password, + "csrf_token": self.get_csrf(response), + "submit": "Anmelden", + }, + follow_redirects=True, + ) + assert response.status_code == 200 + assert g.identity.user.email == email + + with self._app.app_context(): + user = find_user_by_email(email) + user_id = user.id + + return user_id + + def logout(self): + return self._client.get("/logout") + + def get_csrf(self, response, prefix=None): + name = "csrf_token" + if prefix: + name = prefix + "-" + name + + pattern = ( + '' + ) + return ( + re.search(pattern.encode("utf-8"), response.data).group(1).decode("utf-8") + ) + + def create_form_data(self, response, values: dict) -> dict: + from project.scrape.form import Form + + soup = BeautifulSoup(response.data, "html.parser") + form = Form(soup.find("form")) + return form.fill(values) + + def post_form(self, url, response, values: dict): + data = self.create_form_data(response, values) + return self._client.post(url, data=data) + + def post_json(self, url, data: dict): + response = self._client.post(url, json=data) + assert response.content_type == "application/json" + return response.json + + def mock_db_commit(self, mocker): + mocked_commit = mocker.patch("project.db.session.commit") + mocked_commit.side_effect = IntegrityError( + "MockException", "MockException", None + ) + + def mock_send_mails(self, mocker): + return mocker.patch("project.views.utils.send_mails") + + def assert_send_mail_called(self, mock, recipient): + mock.assert_called_once() + args, kwargs = mock.call_args + assert args[0] == [recipient] + + def get_url(self, endpoint, **values): + with self._app.test_request_context(): + url = url_for(endpoint, **values, _external=False) + return url + + def get_ok(self, url): + response = self._client.get(url) + self.assert_response_ok(response) + return response + + def assert_response_ok(self, response): + assert response.status_code == 200 + + def get_unauthorized(self, url): + response = self._client.get(url) + self.assert_response_unauthorized(response) + return response + + def assert_response_unauthorized(self, response): + assert response.status_code == 401 + + def assert_response_notFound(self, response): + assert response.status_code == 404 + + def get_endpoint(self, endpoint, **values): + return self._client.get(self.get_url(endpoint, **values)) + + def get_endpoint_ok(self, endpoint, **values): + return self.get_ok(self.get_url(endpoint, **values)) + + def assert_response_redirect(self, response, endpoint, **values): + assert response.status_code == 302 + + redirect_url = "http://localhost" + self.get_url(endpoint, **values) + assert response.headers["Location"] == redirect_url + + def assert_response_db_error(self, response): + assert response.status_code == 200 + assert b"MockException" in response.data + + def assert_response_error_message(self, response, error_message=b"alert-danger"): + assert response.status_code == 200 + assert error_message in response.data + + def assert_response_permission_missing(self, response, endpoint, **values): + self.assert_response_redirect(response, endpoint, **values) diff --git a/tests/views/__init__.py b/tests/views/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/views/test_admin.py b/tests/views/test_admin.py new file mode 100644 index 0000000..4fdeef6 --- /dev/null +++ b/tests/views/test_admin.py @@ -0,0 +1,20 @@ +def test_normal_user(client, seeder, utils): + seeder.create_user() + utils.login() + response = client.get("/admin") + assert response.status_code == 403 + + +def test_admin_user(client, seeder, utils, app): + seeder.create_user(admin=True) + utils.login() + response = client.get("/admin") + assert response.status_code == 200 + + +def test_admin_units(client, seeder, utils, app): + seeder.create_user(admin=True) + user = utils.login() + seeder.create_admin_unit(user, "Meine Crew") + response = client.get("/admin/admin_units") + assert b"Meine Crew" in response.data diff --git a/tests/views/test_admin_unit.py b/tests/views/test_admin_unit.py new file mode 100644 index 0000000..0dd83b3 --- /dev/null +++ b/tests/views/test_admin_unit.py @@ -0,0 +1,127 @@ +def create_form_data(response, utils): + return { + "csrf_token": utils.get_csrf(response), + "name": "Meine Crew", + "short_name": "meine_crew", + "location-csrf_token": utils.get_csrf(response, "location"), + "location-postalCode": "38640", + "location-city": "Goslar", + "logo-csrf_token": utils.get_csrf(response, "logo"), + "submit": "Submit", + } + + +def test_create(client, app, utils, seeder): + utils.register() + response = client.get("/admin_unit/create") + assert response.status_code == 200 + + data = create_form_data(response, utils) + data["logo-image_file"] = seeder.get_default_image_upload() + + with client: + response = client.post( + "/admin_unit/create", + data=data, + ) + assert response.status_code == 302 + + with app.app_context(): + from project.services.admin_unit import get_admin_unit_by_name + from project.services.organizer import get_event_organizer + from project.access import has_current_user_member_role_for_admin_unit + + admin_unit = get_admin_unit_by_name("Meine Crew") + assert admin_unit is not None + assert admin_unit.name == "Meine Crew" + assert admin_unit.location.city == "Goslar" + assert admin_unit.location.postalCode == "38640" + assert has_current_user_member_role_for_admin_unit(admin_unit.id, "admin") + assert has_current_user_member_role_for_admin_unit( + admin_unit.id, "event_verifier" + ) + + organizer = get_event_organizer(admin_unit.id, "Meine Crew") + assert organizer.name == "Meine Crew" + assert organizer.location.city == "Goslar" + assert organizer.location.postalCode == "38640" + assert organizer is not None + + +def test_create_duplicate(client, app, utils, seeder): + seeder.create_user() + user_id = utils.login() + seeder.create_admin_unit(user_id, "Meine Crew") + + response = client.get("/admin_unit/create") + assert response.status_code == 200 + + with client: + response = client.post( + "/admin_unit/create", + data=create_form_data(response, utils), + ) + assert response.status_code == 200 + assert b"duplicate" in response.data + + +def test_update(client, app, utils, seeder): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Initial name") + + url = "/admin_unit/%d/update" % admin_unit_id + response = client.get(url) + assert response.status_code == 200 + + with client: + response = client.post( + url, + data=create_form_data(response, utils), + ) + assert response.status_code == 302 + + with app.app_context(): + from project.services.admin_unit import get_admin_unit_by_id + + admin_unit_from_db = get_admin_unit_by_id(admin_unit_id) + assert admin_unit_from_db is not None + assert admin_unit_from_db.name == "Meine Crew" + + +def test_update_duplicate(client, app, utils, seeder): + user_id = seeder.create_user() + utils.login() + seeder.create_admin_unit(user_id, "Meine Crew") + admin_unit_id = seeder.create_admin_unit(user_id, "Other Crew") + + url = "/admin_unit/%d/update" % admin_unit_id + response = client.get(url) + assert response.status_code == 200 + + with client: + response = client.post( + url, + data=create_form_data(response, utils), + ) + assert response.status_code == 200 + assert b"duplicate" in response.data + + +def test_update_permission_missing(client, app, db, utils, seeder): + owner_id = seeder.create_user("owner@owner") + admin_unit_id = seeder.create_admin_unit(owner_id, "Other crew") + seeder.create_admin_unit_member_event_verifier(admin_unit_id) + utils.login() + + url = "/admin_unit/%d/update" % admin_unit_id + response = client.get(url) + assert response.status_code == 302 + + +def test_list(client, app, utils, seeder): + seeder.create_user() + user_id = utils.login() + seeder.create_admin_unit(user_id, "Meine Crew") + response = client.get("/manage/admin_units") + assert b"Meine Crew" in response.data diff --git a/tests/views/test_admin_unit_member.py b/tests/views/test_admin_unit_member.py new file mode 100644 index 0000000..7a12410 --- /dev/null +++ b/tests/views/test_admin_unit_member.py @@ -0,0 +1,151 @@ +def test_update(client, app, utils, seeder): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + member_id = seeder.create_admin_unit_member_event_verifier(admin_unit_id) + + url = "/manage/member/%d/update" % member_id + response = client.get(url) + assert response.status_code == 200 + + with client: + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "roles": "admin", + "submit": "Submit", + }, + ) + assert response.status_code == 302 + + with app.app_context(): + from project.services.admin_unit import get_admin_unit_member + + member = get_admin_unit_member(member_id) + assert len(member.roles) == 1 + assert any(r.name == "admin" for r in member.roles) + + +def test_update_db_error(client, app, utils, seeder, mocker): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + member_id = seeder.create_admin_unit_member_event_verifier(admin_unit_id) + + url = "/manage/member/%d/update" % member_id + response = client.get(url) + assert response.status_code == 200 + + with client: + utils.mock_db_commit(mocker) + + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "roles": "admin", + "submit": "Submit", + }, + ) + assert response.status_code == 200 + assert b"MockException" in response.data + + +def test_update_permission_missing(client, app, db, utils, seeder): + owner_id = seeder.create_user("owner@owner") + admin_unit_id = seeder.create_admin_unit(owner_id, "Other crew") + member_id = seeder.create_admin_unit_member_event_verifier(admin_unit_id) + utils.login() + + url = "/manage/member/%d/update" % member_id + response = client.get(url) + assert response.status_code == 302 + + +def test_delete(client, app, utils, seeder): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + member_id = seeder.create_admin_unit_member_event_verifier(admin_unit_id) + + url = "/manage/member/%d/delete" % member_id + response = client.get(url) + assert response.status_code == 200 + + with client: + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "email": "test@test.de", + "submit": "Submit", + }, + ) + assert response.status_code == 302 + + with app.app_context(): + from project.services.admin_unit import get_admin_unit_member + + member = get_admin_unit_member(member_id) + assert member is None + + +def test_delete_db_error(client, app, utils, seeder, mocker): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + member_id = seeder.create_admin_unit_member_event_verifier(admin_unit_id) + + url = "/manage/member/%d/delete" % member_id + response = client.get(url) + assert response.status_code == 200 + + with client: + utils.mock_db_commit(mocker) + + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "email": "test@test.de", + "submit": "Submit", + }, + ) + + assert response.status_code == 200 + assert b"MockException" in response.data + + +def test_delete_email_does_not_match(client, app, utils, seeder): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + member_id = seeder.create_admin_unit_member_event_verifier(admin_unit_id) + + url = "/manage/member/%d/delete" % member_id + response = client.get(url) + assert response.status_code == 200 + + with client: + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "email": "wrong@test.de", + "submit": "Submit", + }, + ) + assert response.status_code == 200 + assert b"Die eingegebene Email passt nicht zur Email" in response.data + + +def test_delete_permission_missing(client, app, db, utils, seeder): + owner_id = seeder.create_user("owner@owner") + admin_unit_id = seeder.create_admin_unit(owner_id, "Other crew") + member_id = seeder.create_admin_unit_member_event_verifier(admin_unit_id) + utils.login() + + url = "/manage/member/%d/delete" % member_id + response = client.get(url) + assert response.status_code == 302 diff --git a/tests/views/test_admin_unit_member_invitation.py b/tests/views/test_admin_unit_member_invitation.py new file mode 100644 index 0000000..b29044c --- /dev/null +++ b/tests/views/test_admin_unit_member_invitation.py @@ -0,0 +1,343 @@ +def test_create(client, app, utils, seeder, mocker): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + + url = "/manage/admin_unit/%d/members/invite" % admin_unit_id + response = client.get(url) + assert response.status_code == 200 + + with client: + mail_mock = utils.mock_send_mails(mocker) + email = "new@member.de" + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "email": email, + "roles": "admin", + "submit": "Submit", + }, + ) + assert response.status_code == 302 + utils.assert_send_mail_called(mail_mock, email) + + with app.app_context(): + from project.services.admin_unit import find_admin_unit_member_invitation + + invitation = find_admin_unit_member_invitation(email, admin_unit_id) + assert invitation.roles == "admin" + + +def test_create_db_error(client, app, utils, seeder, mocker): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + + url = "/manage/admin_unit/%d/members/invite" % admin_unit_id + response = client.get(url) + assert response.status_code == 200 + + with client: + utils.mock_db_commit(mocker) + email = "new@member.de" + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "email": email, + "roles": "admin", + "submit": "Submit", + }, + ) + assert response.status_code == 200 + assert b"MockException" in response.data + + +def test_create_permission_missing(client, app, db, utils, seeder): + owner_id = seeder.create_user("owner@owner") + admin_unit_id = seeder.create_admin_unit(owner_id, "Other crew") + seeder.create_admin_unit_member_event_verifier(admin_unit_id) + utils.login() + + url = "/manage/admin_unit/%d/members/invite" % admin_unit_id + response = client.get(url) + assert response.status_code == 302 + + +def test_read_accept(client, app, db, utils, seeder): + user_id = seeder.create_user() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + + email = "new@member.de" + new_member_user_id = seeder.create_user(email) + utils.login(email) + + invitation_id = seeder.create_invitation(admin_unit_id, email) + + url = "/invitations/%d" % invitation_id + response = client.get(url) + assert response.status_code == 200 + + with client: + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "accept": "Akzeptieren", + }, + ) + assert response.status_code == 302 + assert response.headers["Location"] == "http://localhost/manage" + + with app.app_context(): + from project.services.admin_unit import ( + find_admin_unit_member_invitation, + get_member_for_admin_unit_by_user_id, + ) + + invitation = find_admin_unit_member_invitation(email, admin_unit_id) + assert invitation is None + + member = get_member_for_admin_unit_by_user_id( + admin_unit_id, new_member_user_id + ) + assert len(member.roles) == 1 + assert any(r.name == "admin" for r in member.roles) + + +def test_read_accept_WrongRole(client, app, db, utils, seeder): + user_id = seeder.create_user() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + + email = "new@member.de" + seeder.create_user(email) + utils.login(email) + + invitation_id = seeder.create_invitation(admin_unit_id, email, ["wrongrole"]) + + url = "/invitations/%d" % invitation_id + response = client.get(url) + assert response.status_code == 200 + + with client: + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "accept": "Akzeptieren", + }, + ) + utils.assert_response_redirect(response, "manage") + + +def test_read_decline(client, app, db, utils, seeder): + user_id = seeder.create_user() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + + email = "new@member.de" + new_member_user_id = seeder.create_user(email) + utils.login(email) + + invitation_id = seeder.create_invitation(admin_unit_id, email) + + url = "/invitations/%d" % invitation_id + response = client.get(url) + assert response.status_code == 200 + + with client: + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "decline": "Ablehnen", + }, + ) + assert response.status_code == 302 + assert response.headers["Location"] == "http://localhost/manage" + + with app.app_context(): + from project.services.admin_unit import ( + find_admin_unit_member_invitation, + get_member_for_admin_unit_by_user_id, + ) + + invitation = find_admin_unit_member_invitation(email, admin_unit_id) + assert invitation is None + + member = get_member_for_admin_unit_by_user_id( + admin_unit_id, new_member_user_id + ) + assert member is None + + +def test_read_db_error(client, app, utils, seeder, mocker): + user_id = seeder.create_user() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + + email = "new@member.de" + seeder.create_user(email) + utils.login(email) + + invitation_id = seeder.create_invitation(admin_unit_id, email) + + url = "/invitations/%d" % invitation_id + response = client.get(url) + assert response.status_code == 200 + + with client: + utils.mock_db_commit(mocker) + + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "accept": "Akzeptieren", + }, + ) + + assert response.status_code == 200 + assert b"MockException" in response.data + + +def test_read_new_member_not_registered(client, app, utils, seeder): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + email = "new@member.de" + + invitation_id = seeder.create_invitation(admin_unit_id, email) + + url = "/invitations/%d" % invitation_id + response = client.get(url) + assert response.status_code == 302 + assert response.headers["Location"] == "http://localhost/register" + + +def test_read_new_member_not_authenticated(client, app, utils, seeder): + user_id = seeder.create_user() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + + email = "new@member.de" + seeder.create_user(email) + + invitation_id = seeder.create_invitation(admin_unit_id, email) + + url = "/invitations/%d" % invitation_id + response = client.get(url) + assert response.status_code == 302 + assert response.headers["Location"].startswith("http://localhost/login") + + +def test_read_currentUserDoesNotMatchInvitationEmail(client, app, db, utils, seeder): + user_id = seeder.create_user() + utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + + email = "new@member.de" + seeder.create_user(email) + + invitation_id = seeder.create_invitation(admin_unit_id, email) + + url = "/invitations/%d" % invitation_id + response = client.get(url) + assert response.status_code == 302 + assert response.headers["Location"] == "http://localhost/profile" + + +def test_delete(client, app, utils, seeder): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + email = "new@member.de" + + invitation_id = seeder.create_invitation(admin_unit_id, email) + + url = "/manage/invitation/%d/delete" % invitation_id + response = client.get(url) + assert response.status_code == 200 + + with client: + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "email": email, + "submit": "Submit", + }, + ) + assert response.status_code == 302 + + with app.app_context(): + from project.services.admin_unit import find_admin_unit_member_invitation + + invitation = find_admin_unit_member_invitation(email, admin_unit_id) + assert invitation is None + + +def test_delete_db_error(client, app, utils, seeder, mocker): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + email = "new@member.de" + + invitation_id = seeder.create_invitation(admin_unit_id, email) + + url = "/manage/invitation/%d/delete" % invitation_id + response = client.get(url) + assert response.status_code == 200 + + with client: + utils.mock_db_commit(mocker) + + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "email": email, + "submit": "Submit", + }, + ) + + assert response.status_code == 200 + assert b"MockException" in response.data + + +def test_delete_email_does_not_match(client, app, utils, seeder): + seeder.create_user() + user_id = utils.login() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + email = "new@member.de" + + invitation_id = seeder.create_invitation(admin_unit_id, email) + + url = "/manage/invitation/%d/delete" % invitation_id + response = client.get(url) + assert response.status_code == 200 + + with client: + response = client.post( + url, + data={ + "csrf_token": utils.get_csrf(response), + "email": "wrong@test.de", + "submit": "Submit", + }, + ) + assert response.status_code == 200 + assert b"Die eingegebene Email passt nicht zur Email" in response.data + + +def test_delete_permission_missing(client, app, db, utils, seeder): + owner_id = seeder.create_user("owner@owner") + admin_unit_id = seeder.create_admin_unit(owner_id, "Other crew") + email = "new@member.de" + seeder.create_admin_unit_member_event_verifier(admin_unit_id) + utils.login() + + invitation_id = seeder.create_invitation(admin_unit_id, email) + + url = "/manage/invitation/%d/delete" % invitation_id + + response = client.get(url) + assert response.status_code == 302 diff --git a/tests/views/test_api.py b/tests/views/test_api.py new file mode 100644 index 0000000..fe0b84e --- /dev/null +++ b/tests/views/test_api.py @@ -0,0 +1,56 @@ +def test_events(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + seeder.create_event(admin_unit_id) + + url = utils.get_url("api_events") + utils.get_ok(url) + + +def test_event_dates(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + seeder.create_event(admin_unit_id) + + url = utils.get_url("api_event_dates") + utils.get_ok(url) + + url = utils.get_url("api_event_dates", keyword="name") + utils.get_ok(url) + + url = utils.get_url("api_event_dates", category_id=2000) + utils.get_ok(url) + + url = utils.get_url("api_event_dates", category_id=0) + utils.get_ok(url) + + url = utils.get_url("api_event_dates", weekday=1) + utils.get_ok(url) + + url = utils.get_url( + "api_event_dates", coordinate="51.9077888,10.4333312", distance=500 + ) + utils.get_ok(url) + + url = utils.get_url("api_event_dates", date_from="2020-10-03", date_to="2021-10-03") + utils.get_ok(url) + + organizer_id = seeder.upsert_default_event_organizer(admin_unit_id) + url = utils.get_url("api_event_dates", organizer_id=organizer_id) + utils.get_ok(url) + + +def test_infoscreen(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + seeder.create_event(admin_unit_id) + au_short_name = "meinecrew" + + url = utils.get_url("api_infoscreen", au_short_name=au_short_name) + utils.get_ok(url) + + +def test_event_places(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + organizer_id = seeder.upsert_default_event_organizer(admin_unit_id) + seeder.upsert_event_place(admin_unit_id, "Mein Ort") + + url = utils.get_url("api_event_places", id=organizer_id) + utils.get_ok(url) diff --git a/tests/test_app.py b/tests/views/test_app.py similarity index 100% rename from tests/test_app.py rename to tests/views/test_app.py diff --git a/tests/views/test_auth.py b/tests/views/test_auth.py new file mode 100644 index 0000000..3fb6f61 --- /dev/null +++ b/tests/views/test_auth.py @@ -0,0 +1,15 @@ +from project.services.user import find_user_by_email + + +def test_register(client, app, utils): + utils.register("test@test.de", "MeinPasswortIstDasBeste") + + with app.app_context(): + user = find_user_by_email("test@test.de") + assert user is not None + + +def test_login(client, app, db, utils, seeder): + seeder.create_user("test@test.de", "MeinPasswortIstDasBeste") + user_id = utils.login("test@test.de", "MeinPasswortIstDasBeste") + assert user_id is not None diff --git a/tests/views/test_event.py b/tests/views/test_event.py new file mode 100644 index 0000000..463c4d9 --- /dev/null +++ b/tests/views/test_event.py @@ -0,0 +1,365 @@ +import pytest + + +def test_read(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + url = utils.get_url("event", event_id=event_id) + utils.get_ok(url) + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_create(client, app, utils, seeder, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + place_id = seeder.upsert_default_event_place(admin_unit_id) + organizer_id = seeder.upsert_default_event_organizer(admin_unit_id) + + url = utils.get_url("event_create_for_admin_unit_id", id=admin_unit_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + response = utils.post_form( + url, + response, + { + "name": "Name", + "description": "Beschreibung", + "start": ["2030-12-31", "23", "59"], + "event_place_id": place_id, + "organizer_id": organizer_id, + "photo-image_base64": seeder.get_default_image_upload_base64(), + }, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_events", id=admin_unit_id + ) + + with app.app_context(): + from project.models import Event + + event = ( + Event.query.filter(Event.admin_unit_id == admin_unit_id) + .filter(Event.name == "Name") + .first() + ) + assert event is not None + + +def test_create_newPlaceAndOrganizer(client, app, utils, seeder, mocker): + user_id, admin_unit_id = seeder.setup_base() + + url = utils.get_url("event_create_for_admin_unit_id", id=admin_unit_id) + response = utils.get_ok(url) + + response = utils.post_form( + url, + response, + { + "name": "Name", + "description": "Beschreibung", + "start": ["2030-12-31", "23", "59"], + "organizer_choice": 2, + "new_organizer-name": "Neuer Veranstalter", + "event_place_choice": 2, + "new_event_place-name": "Neuer Ort", + }, + ) + utils.assert_response_redirect( + response, "manage_admin_unit_events", id=admin_unit_id + ) + + with app.app_context(): + from project.models import Event + + event = ( + Event.query.filter(Event.admin_unit_id == admin_unit_id) + .filter(Event.name == "Name") + .first() + ) + assert event is not None + + +def test_create_missingName(client, app, utils, seeder, mocker): + user_id, admin_unit_id = seeder.setup_base() + + url = utils.get_url("event_create_for_admin_unit_id", id=admin_unit_id) + response = utils.get_ok(url) + + response = utils.post_form( + url, + response, + {}, + ) + + utils.assert_response_error_message(response) + + +def test_create_missingPlace(client, app, utils, seeder, mocker): + user_id, admin_unit_id = seeder.setup_base() + + url = utils.get_url("event_create_for_admin_unit_id", id=admin_unit_id) + response = utils.get_ok(url) + + response = utils.post_form( + url, + response, + { + "name": "Name", + "description": "Beschreibung", + "start": ["2030-12-31", "23", "59"], + }, + ) + + utils.assert_response_error_message(response) + + +def test_create_missingOrganizer(client, app, utils, seeder, mocker): + user_id, admin_unit_id = seeder.setup_base() + place_id = seeder.upsert_default_event_place(admin_unit_id) + organizer_id = seeder.upsert_default_event_organizer(admin_unit_id) + + url = utils.get_url("event_create_for_admin_unit_id", id=admin_unit_id) + response = utils.get_ok(url) + + response = utils.post_form( + url, + response, + { + "name": "Name", + "description": "Beschreibung", + "start": ["31.12.2030", "23", "59"], + "event_place_id": place_id, + "organizer_id": organizer_id, + }, + ) + + utils.assert_response_error_message(response) + + +def test_create_invalidDateFormat(client, app, utils, seeder, mocker): + user_id, admin_unit_id = seeder.setup_base() + place_id = seeder.upsert_default_event_place(admin_unit_id) + + url = utils.get_url("event_create_for_admin_unit_id", id=admin_unit_id) + response = utils.get_ok(url) + + response = utils.post_form( + url, + response, + { + "name": "Name", + "description": "Beschreibung", + "start": ["2030-12-31", "23", "59"], + "event_place_id": place_id, + }, + ) + + utils.assert_response_error_message(response) + + +def test_duplicate(client, app, utils, seeder, mocker): + user_id, admin_unit_id = seeder.setup_base() + template_event_id = seeder.create_event(admin_unit_id) + + url = utils.get_url( + "event_create_for_admin_unit_id", + id=admin_unit_id, + template_id=template_event_id, + ) + response = utils.get_ok(url) + + response = utils.post_form(url, response, {}) + utils.assert_response_redirect( + response, "manage_admin_unit_events", id=admin_unit_id + ) + + with app.app_context(): + from project.models import Event + + events = ( + Event.query.filter(Event.admin_unit_id == admin_unit_id) + .filter(Event.name == "Name") + .all() + ) + assert len(events) == 2 + + +@pytest.mark.parametrize("free_text", [True, False]) +def test_create_fromSuggestion(client, app, utils, seeder, mocker, free_text): + user_id, admin_unit_id = seeder.setup_base() + suggestion_id = seeder.create_event_suggestion(admin_unit_id, free_text) + + url = utils.get_url( + "event_create_for_admin_unit_id", + id=admin_unit_id, + event_suggestion_id=suggestion_id, + ) + response = utils.get_ok(url) + + response = utils.post_form(url, response, {}) + utils.assert_response_redirect( + response, "manage_admin_unit_events", id=admin_unit_id + ) + + with app.app_context(): + from project.models import Event, EventSuggestion + + event = ( + Event.query.filter(Event.admin_unit_id == admin_unit_id) + .filter(Event.name == "Vorschlag") + .first() + ) + assert event is not None + + suggestion = EventSuggestion.query.get(suggestion_id) + assert suggestion is not None + assert suggestion.verified + assert suggestion.event_id == event.id + + +def test_create_verifiedSuggestionRedirectsToReviewStatus( + client, app, utils, seeder, mocker +): + user_id, admin_unit_id = seeder.setup_base() + suggestion_id = seeder.create_event_suggestion(admin_unit_id) + + url = utils.get_url( + "event_create_for_admin_unit_id", + id=admin_unit_id, + event_suggestion_id=suggestion_id, + ) + response = utils.get_ok(url) + + response = utils.post_form(url, response, {}) + utils.assert_response_redirect( + response, "manage_admin_unit_events", id=admin_unit_id + ) + + response = client.get(url) + utils.assert_response_redirect( + response, "event_suggestion_review_status", event_suggestion_id=suggestion_id + ) + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_update(client, seeder, utils, app, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + url = utils.get_url("event_update", event_id=event_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + response = utils.post_form( + url, + response, + { + "name": "Neuer Name", + }, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_events", id=admin_unit_id + ) + + with app.app_context(): + from project.models import Event + + event = ( + Event.query.filter(Event.admin_unit_id == admin_unit_id) + .filter(Event.name == "Neuer Name") + .first() + ) + assert event is not None + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_delete(client, seeder, utils, app, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + url = utils.get_url("event_delete", event_id=event_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + response = utils.post_form( + url, + response, + { + "name": "Name", + }, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_events", id=admin_unit_id + ) + + with app.app_context(): + from project.models import Event + + event = ( + Event.query.filter(Event.admin_unit_id == admin_unit_id) + .filter(Event.name == "Name") + .first() + ) + assert event is None + + +def test_delete_nameDoesNotMatch(client, seeder, utils, app, mocker): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + url = utils.get_url("event_delete", event_id=event_id) + response = utils.get_ok(url) + + response = utils.post_form( + url, + response, + { + "name": "Falscher Name", + }, + ) + + utils.assert_response_error_message( + response, b"Der eingegebene Name entspricht nicht dem Namen der Veranstaltung" + ) + + +def test_rrule(client, seeder, utils, app): + url = utils.get_url("event_rrule") + json = utils.post_json( + url, + { + "year": 2020, + "month": 11, + "day": 25, + "rrule": "RRULE:FREQ=DAILY;COUNT=7", + "start": 0, + }, + ) + + assert json["batch"]["batch_size"] == 10 + + occurence = json["occurrences"][0] + assert occurence["date"] == "20201125T000000" + assert occurence["formattedDate"] == '"25.11.2020"' diff --git a/tests/views/test_event_date.py b/tests/views/test_event_date.py new file mode 100644 index 0000000..25048d1 --- /dev/null +++ b/tests/views/test_event_date.py @@ -0,0 +1,24 @@ +def test_read(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + seeder.create_event(admin_unit_id) + + url = utils.get_url("event_date", id=1) + utils.get_ok(url) + + url = utils.get_url("event_date", id=1, src="home") + response = client.get(url) + utils.assert_response_redirect(response, "event_date", id=1) + + +def test_list(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + seeder.create_event(admin_unit_id) + + url = utils.get_url("event_dates") + utils.get_ok(url) + + url = utils.get_url("event_dates", keyword="name") + utils.get_ok(url) + + url = utils.get_url("event_dates", category_id=2000) + utils.get_ok(url) diff --git a/tests/views/test_event_place.py b/tests/views/test_event_place.py new file mode 100644 index 0000000..f460d40 --- /dev/null +++ b/tests/views/test_event_place.py @@ -0,0 +1,135 @@ +import pytest + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_create(client, app, utils, seeder, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + + url = utils.get_url("manage_admin_unit_places_create", id=admin_unit_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + response = utils.post_form( + url, + response, + { + "name": "Neuer Ort", + }, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_event_places", id=admin_unit_id + ) + + with app.app_context(): + from project.models import EventPlace + + place = ( + EventPlace.query.filter(EventPlace.admin_unit_id == admin_unit_id) + .filter(EventPlace.name == "Neuer Ort") + .first() + ) + assert place is not None + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_update(client, seeder, utils, app, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + place_id = seeder.upsert_default_event_place(admin_unit_id) + + url = utils.get_url("event_place_update", id=place_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + response = utils.post_form( + url, + response, + { + "name": "Neuer Name", + }, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_event_places", id=admin_unit_id + ) + + with app.app_context(): + from project.models import EventPlace + + place = EventPlace.query.get(place_id) + assert place.name == "Neuer Name" + + +@pytest.mark.parametrize("db_error", [True, False]) +@pytest.mark.parametrize("non_match", [True, False]) +def test_delete(client, seeder, utils, app, mocker, db_error, non_match): + user_id, admin_unit_id = seeder.setup_base() + place_id = seeder.upsert_event_place(admin_unit_id, "Mein Ort") + + url = utils.get_url("event_place_delete", id=place_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + form_name = "Mein Ort" + + if non_match: + form_name = "Falscher Name" + + response = utils.post_form( + url, + response, + { + "name": form_name, + }, + ) + + if non_match: + utils.assert_response_error_message( + response, b"Der eingegebene Name entspricht nicht dem Namen des Ortes" + ) + return + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_event_places", id=admin_unit_id + ) + + with app.app_context(): + from project.models import EventPlace + + place = EventPlace.query.get(place_id) + assert place is None + + +def test_create_Unauthorized(client, app, utils, seeder): + owner_id = seeder.create_user("owner@owner") + admin_unit_id = seeder.create_admin_unit(owner_id, "Other crew") + seeder.create_admin_unit_member(admin_unit_id, []) + utils.login() + + url = utils.get_url("manage_admin_unit_places_create", id=admin_unit_id) + utils.get_unauthorized(url) + + +def test_create_admin(client, app, utils, seeder): + user_id, admin_unit_id = seeder.setup_base(admin=True) + + url = utils.get_url("manage_admin_unit_places_create", id=admin_unit_id) + utils.get_ok(url) diff --git a/tests/views/test_event_suggestion.py b/tests/views/test_event_suggestion.py new file mode 100644 index 0000000..4b40f02 --- /dev/null +++ b/tests/views/test_event_suggestion.py @@ -0,0 +1,76 @@ +import pytest + + +def test_review(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + event_suggestion_id = seeder.create_event_suggestion(admin_unit_id) + + url = utils.get_url( + "event_suggestion_review", event_suggestion_id=event_suggestion_id + ) + utils.get_ok(url) + + +def test_review_status(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + event_suggestion_id = seeder.create_event_suggestion(admin_unit_id) + + url = utils.get_url( + "event_suggestion_review_status", event_suggestion_id=event_suggestion_id + ) + utils.get_ok(url) + + +@pytest.mark.parametrize("db_error", [True, False]) +@pytest.mark.parametrize("is_verified", [True, False]) +def test_reject(client, app, utils, seeder, mocker, db, db_error, is_verified): + user_id, admin_unit_id = seeder.setup_base() + event_suggestion_id = seeder.create_event_suggestion(admin_unit_id) + + url = utils.get_url( + "event_suggestion_reject", event_suggestion_id=event_suggestion_id + ) + + if is_verified: + with app.app_context(): + from project.models import EventSuggestion, EventReviewStatus + + suggestion = EventSuggestion.query.get(event_suggestion_id) + suggestion.review_status = EventReviewStatus.verified + db.session.commit() + + response = client.get(url) + utils.assert_response_redirect( + response, "event_suggestion_review", event_suggestion_id=event_suggestion_id + ) + return + + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + mail_mock = utils.mock_send_mails(mocker) + response = utils.post_form( + url, + response, + { + "rejection_resaon": 0, + }, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_event_reviews", id=admin_unit_id + ) + utils.assert_send_mail_called(mail_mock, "vorname@nachname.de") + + with app.app_context(): + from project.models import EventSuggestion, EventReviewStatus + + suggestion = EventSuggestion.query.get(event_suggestion_id) + assert suggestion.review_status == EventReviewStatus.rejected + assert suggestion.rejection_resaon is None diff --git a/tests/views/test_image.py b/tests/views/test_image.py new file mode 100644 index 0000000..65377a1 --- /dev/null +++ b/tests/views/test_image.py @@ -0,0 +1,6 @@ +def test_read(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + image_id = seeder.upsert_default_image() + + url = utils.get_url("image", id=image_id) + utils.get_ok(url) diff --git a/tests/views/test_manage.py b/tests/views/test_manage.py new file mode 100644 index 0000000..2ffea5b --- /dev/null +++ b/tests/views/test_manage.py @@ -0,0 +1,134 @@ +import pytest + + +def test_index_noCookie(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + + response = utils.get_endpoint("manage") + utils.assert_response_redirect(response, "manage_admin_units") + + +def test_index_withValidCookie(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + client.set_cookie("localhost", "manage_admin_unit_id", str(admin_unit_id)) + + response = utils.get_endpoint("manage") + utils.assert_response_redirect(response, "manage_admin_unit", id=admin_unit_id) + + +def test_index_withInvalidCookie(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + client.set_cookie("localhost", "manage_admin_unit_id", "invalid") + + response = utils.get_endpoint("manage") + utils.assert_response_redirect(response, "manage_admin_units") + + +def test_admin_unit(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + + response = utils.get_endpoint("manage_admin_unit", id=admin_unit_id) + utils.assert_response_redirect( + response, "manage_admin_unit_events", id=admin_unit_id + ) + + +def test_admin_unit_404(client, seeder, utils): + owner_id = seeder.create_user("owner@owner") + admin_unit_id = seeder.create_admin_unit(owner_id, "Other crew") + seeder.create_user() + utils.login() + + response = utils.get_endpoint("manage_admin_unit", id=admin_unit_id) + utils.assert_response_notFound(response) + + +def test_admin_unit_event_reviews(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + + utils.get_endpoint_ok("manage_admin_unit_event_reviews", id=admin_unit_id) + + +def test_admin_unit_events(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + + utils.get_endpoint_ok( + "manage_admin_unit_events", + id=admin_unit_id, + date_from="2020-10-03", + date_to="2021-10-03", + ) + + +def test_admin_unit_events_invalidDateFormat(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + + utils.get_endpoint_ok( + "manage_admin_unit_events", + id=admin_unit_id, + date_from="03.10.2020", + date_to="2021-10-03", + ) + utils.get_endpoint_ok( + "manage_admin_unit_events", id=admin_unit_id, date_from="", date_to="" + ) + + +def test_admin_unit_organizers(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + + utils.get_endpoint_ok("manage_admin_unit_organizers", id=admin_unit_id) + + +def test_admin_unit_event_places(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + + utils.get_endpoint_ok("manage_admin_unit_event_places", id=admin_unit_id) + + +def test_admin_unit_members(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + + utils.get_endpoint_ok("manage_admin_unit_members", id=admin_unit_id) + + +def test_admin_unit_members_permission_missing(client, seeder, utils): + owner_id, admin_unit_id, member_id = seeder.setup_base_event_verifier() + + response = utils.get_endpoint("manage_admin_unit_members", id=admin_unit_id) + utils.assert_response_permission_missing( + response, "manage_admin_unit", id=admin_unit_id + ) + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_admin_unit_widgets(client, seeder, utils, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + + url = utils.get_url("manage_admin_unit_widgets", id=admin_unit_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + response = utils.post_form(url, response, {}) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_widgets", id=admin_unit_id + ) + + +def test_admin_unit_widgets_permission_missing(client, seeder, utils, mocker): + owner_id, admin_unit_id, member_id = seeder.setup_base_event_verifier() + + url = utils.get_url("manage_admin_unit_widgets", id=admin_unit_id) + response = utils.get_ok(url) + response = utils.post_form(url, response, {}) + + utils.assert_response_permission_missing( + response, "manage_admin_unit", id=admin_unit_id + ) diff --git a/tests/views/test_organizer.py b/tests/views/test_organizer.py new file mode 100644 index 0000000..e05e7fe --- /dev/null +++ b/tests/views/test_organizer.py @@ -0,0 +1,121 @@ +import pytest + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_create(client, app, utils, seeder, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + + url = utils.get_url("manage_admin_unit_organizer_create", id=admin_unit_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + response = utils.post_form( + url, + response, + { + "name": "Neuer Organisator", + "logo-image_file": seeder.get_default_image_upload(), + }, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_organizers", id=admin_unit_id + ) + + with app.app_context(): + from project.models import EventOrganizer + + organizer = ( + EventOrganizer.query.filter(EventOrganizer.admin_unit_id == admin_unit_id) + .filter(EventOrganizer.name == "Neuer Organisator") + .first() + ) + assert organizer is not None + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_update(client, seeder, utils, app, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + organizer_id = seeder.upsert_default_event_organizer(admin_unit_id) + + url = utils.get_url("organizer_update", id=organizer_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + response = utils.post_form( + url, + response, + { + "name": "Neuer Name", + "logo-delete_flag": "y", + }, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_organizers", id=admin_unit_id + ) + + with app.app_context(): + from project.models import EventOrganizer + + organizer = EventOrganizer.query.get(organizer_id) + assert organizer.name == "Neuer Name" + + +@pytest.mark.parametrize("db_error", [True, False]) +@pytest.mark.parametrize("non_match", [True, False]) +def test_delete(client, seeder, utils, app, mocker, db_error, non_match): + user_id, admin_unit_id = seeder.setup_base() + organizer_id = seeder.upsert_event_organizer(admin_unit_id, "Mein Organisator") + + url = utils.get_url("organizer_delete", id=organizer_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + form_name = "Mein Organisator" + + if non_match: + form_name = "Falscher Name" + + response = utils.post_form( + url, + response, + { + "name": form_name, + }, + ) + + if non_match: + utils.assert_response_error_message( + response, + b"Der eingegebene Name entspricht nicht dem Namen des Veranstalters", + ) + return + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_organizers", id=admin_unit_id + ) + + with app.app_context(): + from project.models import EventOrganizer + + organizer = EventOrganizer.query.get(organizer_id) + assert organizer is None diff --git a/tests/views/test_planing.py b/tests/views/test_planing.py new file mode 100644 index 0000000..b4af6bb --- /dev/null +++ b/tests/views/test_planing.py @@ -0,0 +1,9 @@ +def test_list(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + seeder.create_event(admin_unit_id) + + url = utils.get_url("planing") + utils.get_ok(url) + + url = utils.get_url("planing", keyword="name") + utils.get_ok(url) diff --git a/tests/views/test_reference.py b/tests/views/test_reference.py new file mode 100644 index 0000000..624fee0 --- /dev/null +++ b/tests/views/test_reference.py @@ -0,0 +1,163 @@ +import pytest + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_create(client, app, utils, seeder, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + other_admin_unit_id = seeder.create_admin_unit(user_id, "Other Crew") + event_id = seeder.create_event(other_admin_unit_id) + + url = utils.get_url("event_reference", event_id=event_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + response = utils.post_form( + url, + response, + {"admin_unit_id": admin_unit_id}, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect(response, "event", event_id=event_id) + + with app.app_context(): + from project.models import EventReference + + reference = ( + EventReference.query.filter(EventReference.admin_unit_id == admin_unit_id) + .filter(EventReference.event_id == event_id) + .first() + ) + assert reference is not None + + +def test_create_401(client, app, utils, seeder, mocker): + seeder.create_user() + seeder._utils.login() + + owner_id = seeder.create_user("owner@owner") + other_admin_unit_id = seeder.create_admin_unit(owner_id, "Other Crew") + event_id = seeder.create_event(other_admin_unit_id) + + url = utils.get_url("event_reference", event_id=event_id) + response = client.get(url) + assert response.status_code == 401 + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_update(client, seeder, utils, app, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + ( + other_user_id, + other_admin_unit_id, + event_id, + reference_id, + ) = seeder.create_any_reference(admin_unit_id) + + url = utils.get_url("event_reference_update", id=reference_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + response = utils.post_form( + url, + response, + { + "rating": 70, + }, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_references_incoming", id=admin_unit_id + ) + + with app.app_context(): + from project.models import EventReference + + reference = EventReference.query.get(reference_id) + assert reference.rating == 70 + + +@pytest.mark.parametrize("db_error", [True, False]) +@pytest.mark.parametrize("non_match", [True, False]) +def test_delete(client, seeder, utils, app, mocker, db_error, non_match): + user_id, admin_unit_id = seeder.setup_base() + ( + other_user_id, + other_admin_unit_id, + event_id, + reference_id, + ) = seeder.create_any_reference(admin_unit_id) + + url = utils.get_url("reference_delete", id=reference_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + form_name = "Name" + + if non_match: + form_name = "Falscher Name" + + response = utils.post_form( + url, + response, + { + "name": form_name, + }, + ) + + if non_match: + utils.assert_response_error_message( + response, + b"Der eingegebene Name entspricht nicht dem Namen der Veranstaltung", + ) + return + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_references_incoming", id=admin_unit_id + ) + + with app.app_context(): + from project.models import EventReference + + reference = EventReference.query.get(reference_id) + assert reference is None + + +def test_admin_unit_references_incoming(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + ( + other_user_id, + other_admin_unit_id, + event_id, + reference_id, + ) = seeder.create_any_reference(admin_unit_id) + + utils.get_endpoint_ok("manage_admin_unit_references_incoming", id=admin_unit_id) + + +def test_admin_unit_references_outgoing(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + other_user_id = seeder.create_user("other@test.de") + other_admin_unit_id = seeder.create_admin_unit(other_user_id, "Other Crew") + seeder.create_reference(event_id, other_admin_unit_id) + + utils.get_endpoint_ok("manage_admin_unit_references_outgoing", id=admin_unit_id) diff --git a/tests/views/test_reference_request.py b/tests/views/test_reference_request.py new file mode 100644 index 0000000..9aa433d --- /dev/null +++ b/tests/views/test_reference_request.py @@ -0,0 +1,70 @@ +import pytest + + +@pytest.mark.parametrize("db_error", [True, False]) +def test_create(client, app, utils, seeder, mocker, db_error): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + other_user_id = seeder.create_user("other@test.de") + other_admin_unit_id = seeder.create_admin_unit(other_user_id, "Other Crew") + + url = utils.get_url("event_reference_request_create", event_id=event_id) + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + mail_mock = utils.mock_send_mails(mocker) + response = utils.post_form( + url, + response, + {"admin_unit_id": other_admin_unit_id}, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect(response, "event", event_id=event_id) + utils.assert_send_mail_called(mail_mock, "other@test.de") + + with app.app_context(): + from project.models import ( + EventReferenceRequest, + EventReferenceRequestReviewStatus, + ) + + reference_request = ( + EventReferenceRequest.query.filter( + EventReferenceRequest.admin_unit_id == other_admin_unit_id + ) + .filter(EventReferenceRequest.event_id == event_id) + .first() + ) + assert reference_request is not None + assert ( + reference_request.review_status == EventReferenceRequestReviewStatus.inbox + ) + + +def test_admin_unit_reference_requests_incoming(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + seeder.create_incoming_reference_request(admin_unit_id) + + utils.get_endpoint_ok( + "manage_admin_unit_reference_requests_incoming", id=admin_unit_id + ) + + +def test_admin_unit_reference_requests_outgoing(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + + other_user_id = seeder.create_user("other@test.de") + other_admin_unit_id = seeder.create_admin_unit(other_user_id, "Other Crew") + seeder.create_reference_request(event_id, other_admin_unit_id) + + utils.get_endpoint_ok( + "manage_admin_unit_reference_requests_outgoing", id=admin_unit_id + ) diff --git a/tests/views/test_reference_request_review.py b/tests/views/test_reference_request_review.py new file mode 100644 index 0000000..4dab7d1 --- /dev/null +++ b/tests/views/test_reference_request_review.py @@ -0,0 +1,162 @@ +import pytest + + +@pytest.mark.parametrize("db_error", [True, False]) +@pytest.mark.parametrize("is_verified", [True, False]) +def test_review_verify(client, seeder, utils, app, mocker, db, db_error, is_verified): + user_id, admin_unit_id = seeder.setup_base() + ( + other_user_id, + other_admin_unit_id, + event_id, + reference_request_id, + ) = seeder.create_incoming_reference_request(admin_unit_id) + + url = utils.get_url("event_reference_request_review", id=reference_request_id) + + if is_verified: + with app.app_context(): + from project.models import ( + EventReferenceRequest, + EventReferenceRequestReviewStatus, + ) + + reference_request = EventReferenceRequest.query.get(reference_request_id) + reference_request.review_status = EventReferenceRequestReviewStatus.verified + db.session.commit() + + response = client.get(url) + utils.assert_response_redirect( + response, "manage_admin_unit_reference_requests_incoming", id=admin_unit_id + ) + return + + response = utils.get_ok(url) + + if db_error: + utils.mock_db_commit(mocker) + + mail_mock = utils.mock_send_mails(mocker) + response = utils.post_form( + url, + response, + { + "review_status": 2, + }, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + utils.assert_response_redirect( + response, "manage_admin_unit_reference_requests_incoming", id=admin_unit_id + ) + utils.assert_send_mail_called(mail_mock, "other@test.de") + + with app.app_context(): + from project.models import ( + EventReferenceRequest, + EventReferenceRequestReviewStatus, + EventReference, + ) + + reference_request = EventReferenceRequest.query.get(reference_request_id) + assert reference_request.verified + + reference = EventReference.query.get(reference_request_id) + assert reference.event_id == event_id + assert reference.admin_unit_id == admin_unit_id + + +def test_review_reject(client, seeder, utils, app, mocker): + user_id, admin_unit_id = seeder.setup_base() + ( + other_user_id, + other_admin_unit_id, + event_id, + reference_request_id, + ) = seeder.create_incoming_reference_request(admin_unit_id) + + url = utils.get_url("event_reference_request_review", id=reference_request_id) + response = utils.get_ok(url) + + response = utils.post_form( + url, + response, + { + "review_status": 3, + "rejection_reason": 0, + }, + ) + + utils.assert_response_redirect( + response, "manage_admin_unit_reference_requests_incoming", id=admin_unit_id + ) + + with app.app_context(): + from project.models import ( + EventReferenceRequest, + EventReferenceRequestReviewStatus, + ) + + reference_request = EventReferenceRequest.query.get(reference_request_id) + assert ( + reference_request.review_status + == EventReferenceRequestReviewStatus.rejected + ) + assert reference_request.rejection_reason is None + + +def test_review_status(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + ( + other_user_id, + other_admin_unit_id, + event_id, + reference_request_id, + ) = seeder.create_incoming_reference_request(admin_unit_id) + + url = utils.get_url( + "event_reference_request_review_status", id=reference_request_id + ) + utils.get_ok(url) + + +def test_review_status_401_third(client, seeder, utils): + seeder.create_user() + seeder._utils.login() + + third_user_id = seeder.create_user("third@third.de") + third_admin_unit_id = seeder.create_admin_unit(third_user_id, "Third Crew") + ( + other_user_id, + other_admin_unit_id, + event_id, + reference_request_id, + ) = seeder.create_incoming_reference_request(third_admin_unit_id) + + url = utils.get_url( + "event_reference_request_review_status", id=reference_request_id + ) + response = client.get(url) + assert response.status_code == 401 + + +def test_review_status_401_unauthorized(client, seeder, utils): + seeder.create_user() + + third_user_id = seeder.create_user("third@third.de") + third_admin_unit_id = seeder.create_admin_unit(third_user_id, "Third Crew") + ( + other_user_id, + other_admin_unit_id, + event_id, + reference_request_id, + ) = seeder.create_incoming_reference_request(third_admin_unit_id) + + url = utils.get_url( + "event_reference_request_review_status", id=reference_request_id + ) + response = client.get(url) + assert response.status_code == 401 diff --git a/tests/views/test_root.py b/tests/views/test_root.py new file mode 100644 index 0000000..25377db --- /dev/null +++ b/tests/views/test_root.py @@ -0,0 +1,27 @@ +def test_home(client, seeder, utils): + url = utils.get_url("home") + utils.get_ok(url) + + url = utils.get_url("home", src="infoscreen") + response = client.get(url) + utils.assert_response_redirect(response, "home") + + +def test_example(client, seeder, utils): + url = utils.get_url("example") + utils.get_ok(url) + + +def test_impressum(client, seeder, utils): + url = utils.get_url("impressum") + utils.get_ok(url) + + +def test_datenschutz(client, seeder, utils): + url = utils.get_url("datenschutz") + utils.get_ok(url) + + +def test_developer(client, seeder, utils): + url = utils.get_url("developer") + utils.get_ok(url) diff --git a/tests/views/test_user.py b/tests/views/test_user.py new file mode 100644 index 0000000..38ef52e --- /dev/null +++ b/tests/views/test_user.py @@ -0,0 +1,6 @@ +def test_profile(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + seeder.create_event(admin_unit_id) + + url = utils.get_url("profile", id=1) + utils.get_ok(url) diff --git a/tests/views/test_utils.py b/tests/views/test_utils.py new file mode 100644 index 0000000..d5ee5df --- /dev/null +++ b/tests/views/test_utils.py @@ -0,0 +1,31 @@ +def test_send_mails(client, seeder, app, utils): + from project.models import AdminUnitMemberInvitation + from project.views.utils import send_mail + + user_id, admin_unit_id = seeder.setup_base() + email = "new@member.de" + invitation_id = seeder.create_invitation(admin_unit_id, email) + + with app.test_request_context(): + with app.app_context(): + from project import mail + + mail.default_sender = None + invitation = AdminUnitMemberInvitation.query.get(invitation_id) + send_mail( + email, + "You have received an invitation", + "invitation_notice", + invitation=invitation, + ) + + +def test_get_pagination_urls(client, seeder, app, utils): + user_id, admin_unit_id = seeder.setup_base() + + for i in range(31): + seeder.upsert_event_organizer(admin_unit_id, "Organizer %d" % i) + + utils.get_endpoint_ok( + "manage_admin_unit_organizers", id=admin_unit_id, page=2, per_page=10 + ) diff --git a/tests/views/test_widget.py b/tests/views/test_widget.py new file mode 100644 index 0000000..4eb4788 --- /dev/null +++ b/tests/views/test_widget.py @@ -0,0 +1,186 @@ +import pytest + + +def test_event_dates(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + seeder.create_event(admin_unit_id) + au_short_name = "meinecrew" + + url = utils.get_url("widget_event_dates", au_short_name=au_short_name) + utils.get_ok(url) + + url = utils.get_url( + "widget_event_dates", au_short_name=au_short_name, keyword="name" + ) + utils.get_ok(url) + + url = utils.get_url( + "widget_event_dates", au_short_name=au_short_name, category_id=1 + ) + utils.get_ok(url) + + url = utils.get_url( + "widget_event_dates", + au_short_name=au_short_name, + coordinate="51.9077888,10.4333312", + distance=500, + ) + utils.get_ok(url) + + url = utils.get_url( + "widget_event_dates", + au_short_name=au_short_name, + date_from="2020-10-03", + date_to="2021-10-03", + ) + utils.get_ok(url) + + +def test_event_date(client, seeder, utils, app, db): + user_id, admin_unit_id = seeder.setup_base() + event_id = seeder.create_event(admin_unit_id) + au_short_name = "meinecrew" + + with app.app_context(): + from project.models import AdminUnit + from colour import Color + + admin_unit = AdminUnit.query.get(admin_unit_id) + admin_unit.widget_font = "Arial" + admin_unit.widget_background_color = Color("#F5F5F5") + admin_unit.widget_primary_color = Color("#000000") + admin_unit.widget_link_color = Color("#FF0000") + db.session.commit() + + url = utils.get_url("widget_event_date", au_short_name=au_short_name, id=event_id) + utils.get_ok(url) + + +def test_infoscreen(client, seeder, utils): + user_id, admin_unit_id = seeder.setup_base() + seeder.create_event(admin_unit_id) + au_short_name = "meinecrew" + + url = utils.get_url("widget_infoscreen", au_short_name=au_short_name) + utils.get_ok(url) + + organizer_id = seeder.upsert_default_event_organizer(admin_unit_id) + url = utils.get_url( + "widget_infoscreen", au_short_name=au_short_name, organizer_id=organizer_id + ) + utils.get_ok(url) + + +def get_create_data(): + return { + "accept_tos": "y", + "name": "Vorschlag", + "start": ["2030-12-31", "23", "59"], + "contact_name": "Vorname Nachname", + "contact_email": "vorname@nachname.de", + "contact_email_notice": "y", + "event_place_id": "Freitext Ort", + "organizer_id": "Freitext Organisator", + } + + +@pytest.mark.parametrize("db_error", [True, False]) +@pytest.mark.parametrize("free_text", [True, False]) +def test_event_suggestion_create_for_admin_unit( + client, app, seeder, utils, mocker, db_error, free_text +): + user_id = seeder.create_user() + admin_unit_id = seeder.create_admin_unit(user_id, "Meine Crew") + au_short_name = "meinecrew" + + url = utils.get_url( + "event_suggestion_create_for_admin_unit", au_short_name=au_short_name + ) + response = utils.get_ok(url) + + data = get_create_data() + if not free_text: + data["event_place_id"] = seeder.upsert_default_event_place(admin_unit_id) + data["organizer_id"] = seeder.upsert_default_event_organizer(admin_unit_id) + + if db_error: + utils.mock_db_commit(mocker) + + mail_mock = utils.mock_send_mails(mocker) + + response = utils.post_form( + url, + response, + data, + ) + + if db_error: + utils.assert_response_db_error(response) + return + + with app.app_context(): + from project.models import ( + EventSuggestion, + EventReviewStatus, + ) + + suggestion = ( + EventSuggestion.query.filter(EventSuggestion.admin_unit_id == admin_unit_id) + .filter(EventSuggestion.name == "Vorschlag") + .first() + ) + assert suggestion is not None + assert suggestion.review_status == EventReviewStatus.inbox + suggestion_id = suggestion.id + + utils.assert_response_redirect( + response, "event_suggestion_review_status", event_suggestion_id=suggestion_id + ) + utils.assert_send_mail_called(mail_mock, "test@test.de") + + +def test_event_suggestion_create_for_admin_unit_emptyFreeText( + client, app, seeder, utils, mocker +): + user_id = seeder.create_user() + seeder.create_admin_unit(user_id, "Meine Crew") + au_short_name = "meinecrew" + + url = utils.get_url( + "event_suggestion_create_for_admin_unit", au_short_name=au_short_name + ) + response = utils.get_ok(url) + + data = get_create_data() + data["event_place_id"] = " " + data["organizer_id"] = " " + + response = utils.post_form( + url, + response, + data, + ) + utils.assert_response_error_message(response) + + +def test_event_suggestion_create_for_admin_unit_invalidEventPlaceId( + client, app, seeder, utils, mocker +): + user_id = seeder.create_user() + seeder.create_admin_unit(user_id, "Meine Crew") + au_short_name = "meinecrew" + + url = utils.get_url( + "event_suggestion_create_for_admin_unit", au_short_name=au_short_name + ) + response = utils.get_ok(url) + + data = get_create_data() + data["event_place_id"] = "\u00B2" # unicode for ² + + response = utils.post_form( + url, + response, + data, + ) + assert response.status_code == 302