diff --git a/.flake8 b/.flake8 index 6745d6a..bc74fd8 100644 --- a/.flake8 +++ b/.flake8 @@ -33,3 +33,5 @@ ignore = per-file-ignores = # allow nicely aligned parametrizations **/*_test.py:E241 + # allow long lines in migrations (only do that for raw SQL please) + **/migrations/*.py:E501 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 469f2e7..bf62974 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,7 +30,7 @@ jobs: echo "unsupported event: $GITHUB_EVENT_NAME" exit 1 fi - if [[ $upstream_branch != master && $upstream_branch != *-maintenance ]]; then + if [[ $upstream_branch != master && $upstream_branch != *-maintenance && $upstream_branch != search ]]; then echo "assuming there is no branch named ${upstream_branch} in indico; defaulting to master" upstream_branch=master else @@ -190,6 +190,7 @@ jobs: strategy: matrix: include: + - plugin: citadel - plugin: livesync - plugin: payment_paypal - plugin: vc_zoom @@ -221,6 +222,11 @@ jobs: echo "VIRTUAL_ENV=$(pwd)/.venv" >> $GITHUB_ENV echo "$(pwd)/.venv/bin" >> $GITHUB_PATH + - name: Install extra dependencies + if: matrix.plugin == 'citadel' + run: | + pip install -e "${GITHUB_WORKSPACE}/livesync/" + - name: Install plugin run: | cd "${GITHUB_WORKSPACE}/${{ matrix.plugin }}" diff --git a/citadel/MANIFEST.in b/citadel/MANIFEST.in new file mode 100644 index 0000000..e6902c6 --- /dev/null +++ b/citadel/MANIFEST.in @@ -0,0 +1,4 @@ +graft indico_citadel/migrations +graft indico_citadel/translations + +global-exclude *.pyc __pycache__ .keep diff --git a/citadel/indico_citadel/__init__.py b/citadel/indico_citadel/__init__.py new file mode 100644 index 0000000..daa263b --- /dev/null +++ b/citadel/indico_citadel/__init__.py @@ -0,0 +1,11 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +from indico.util.i18n import make_bound_gettext + + +_ = make_bound_gettext('citadel') diff --git a/citadel/indico_citadel/backend.py b/citadel/indico_citadel/backend.py new file mode 100644 index 0000000..511fbec --- /dev/null +++ b/citadel/indico_citadel/backend.py @@ -0,0 +1,355 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +import re +import time +from functools import cached_property +from operator import attrgetter +from pprint import pformat + +import requests +from jinja2.filters import do_filesizeformat +from pygments import highlight +from pygments.formatters.terminal256 import Terminal256Formatter +from pygments.lexers.agile import Python3Lexer +from requests.adapters import HTTPAdapter +from requests.exceptions import RequestException +from sqlalchemy import select +from sqlalchemy.exc import IntegrityError +from sqlalchemy.orm import contains_eager +from sqlalchemy.orm.attributes import flag_modified +from urllib3 import Retry +from werkzeug.urls import url_join + +from indico.core.db import db +from indico.modules.attachments import Attachment +from indico.modules.attachments.models.attachments import AttachmentFile, AttachmentType +from indico.modules.categories import Category +from indico.util.console import cformat, verbose_iterator +from indico.util.string import strip_control_chars + +from indico_citadel.models.id_map import CitadelIdMap, get_entry_type +from indico_citadel.schemas import (AttachmentRecordSchema, ContributionRecordSchema, EventNoteRecordSchema, + EventRecordSchema, SubContributionRecordSchema) +from indico_citadel.util import parallelize +from indico_livesync import LiveSyncBackendBase, SimpleChange, Uploader + + +lexer = Python3Lexer() +formatter = Terminal256Formatter(style='native') + + +def _format_change_str(change): + return ','.join(flag.name for flag in SimpleChange if change & flag) + + +def _print_record(record): + obj_type, obj_id, data, changes = record + print() # verbose_iterator during initial exports doesn't end its line + print(f'{_format_change_str(changes)}: {obj_type.name} {obj_id}') + if data is not None: + print(highlight(pformat(data), lexer, formatter)) + return record + + +class LiveSyncCitadelUploader(Uploader): + PARALLELISM_RECORDS = 250 + PARALLELISM_FILES = 200 + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.categories = None + self.search_app = self.backend.plugin.settings.get('search_backend_url') + self.endpoint_url = url_join(self.search_app, 'api/records/') + self.headers = { + 'Authorization': 'Bearer {}'.format(self.backend.plugin.settings.get('search_backend_token')) + } + + @cached_property + def schemas(self): + # this is a property because `self.categories` is only set to the cached data during an initial + # export, and if we create the schemas earlier the context won't get the new data. and using the + # property is cleaner than mutating `self.categories` after having passed it to the schemas... + return [ + EventRecordSchema(context={ + 'categories': self.categories, + 'schema': url_join(self.search_app, 'schemas/indico/events_v1.0.0.json'), + }), + ContributionRecordSchema(context={ + 'categories': self.categories, + 'schema': url_join(self.search_app, 'schemas/indico/contributions_v1.0.0.json'), + }), + SubContributionRecordSchema(context={ + 'categories': self.categories, + 'schema': url_join(self.search_app, 'schemas/indico/subcontributions_v1.0.0.json'), + }), + AttachmentRecordSchema(context={ + 'categories': self.categories, + 'schema': url_join(self.search_app, 'schemas/indico/attachments_v1.0.0.json'), + }), + EventNoteRecordSchema(context={ + 'categories': self.categories, + 'schema': url_join(self.search_app, 'schemas/indico/notes_v1.0.0.json'), + }) + ] + + def dump_record(self, obj): + for schema in self.schemas: + if isinstance(obj, schema.Meta.model): + return schema.dump(obj) + raise ValueError(f'unknown object ref: {obj}') + + def _citadel_create(self, session, object_type, object_id, data): + self.logger.debug('Creating %s %d on citadel', object_type.name, object_id) + try: + resp = session.post(self.endpoint_url, json=data) + resp.raise_for_status() + except RequestException as exc: + if resp := exc.response: + raise Exception(f'Could not create record on citadel: {resp.status_code}; {resp.text}; {data}') + raise Exception(f'Could not create record on citadel: {exc}; {data}') + response_data = resp.json() + new_citadel_id = int(response_data['metadata']['control_number']) + try: + self.logger.debug('Created mapping for %s %d -> citadel id %d', object_type.name, object_id, new_citadel_id) + CitadelIdMap.create(object_type, object_id, new_citadel_id) + db.session.commit() + except IntegrityError: + # if we already have a mapping entry, delete the newly created record and + # update the existing one in case something changed in the meantime + self.logger.error(f'{object_type.name.title()} %d already in citadel; deleting+updating', object_id) + db.session.rollback() + self._citadel_delete(session, new_citadel_id, delete_mapping=False) + existing_citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id) + assert existing_citadel_id is not None + self._citadel_update(session, existing_citadel_id, data) + resp.close() + + def _citadel_update(self, session, citadel_id, data): + self.logger.debug('Updating record %d on citadel', citadel_id) + try: + resp = session.put(url_join(self.search_app, f'api/record/{citadel_id}'), json=data) + self.logger.debug('Updated %d on citadel', citadel_id) + resp.raise_for_status() + resp.close() + except RequestException as exc: + if resp := exc.response: + raise Exception(f'Could not update record {citadel_id} on citadel: ' + f'{resp.status_code}; {resp.text}; {data}') + raise Exception(f'Could not update record {citadel_id} on citadel: {exc}; {data}') + + def _citadel_delete(self, session, citadel_id, *, delete_mapping): + self.logger.debug('Deleting record %d from citadel', citadel_id) + try: + resp = session.delete(url_join(self.search_app, f'api/record/{citadel_id}')) + self.logger.debug('Deleted %d from citadel', citadel_id) + resp.raise_for_status() + resp.close() + except RequestException as exc: + if resp := exc.response: + raise Exception(f'Could not delete record {citadel_id} from citadel: {resp.status_code}; {resp.text}') + raise Exception(f'Could not delete record {citadel_id} from citadel: {exc}') + if delete_mapping: + CitadelIdMap.query.filter_by(citadel_id=citadel_id).delete() + db.session.commit() + + def upload_record(self, entry, session): + object_type, object_id, data, change_type = entry + + if change_type & SimpleChange.created: + self._citadel_create(session, object_type, object_id, data) + elif change_type & SimpleChange.updated: + citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id) + if citadel_id is None: + raise Exception(f'Cannot update {object_type.name} {object_id}: No citadel ID found') + self._citadel_update(session, citadel_id, data) + elif change_type & SimpleChange.deleted: + citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id) + if citadel_id is None: + raise Exception(f'Cannot delete {object_type.name} {object_id}: No citadel ID found') + self._citadel_delete(session, citadel_id, delete_mapping=True) + + def upload_file(self, entry, session): + self.logger.debug('Uploading attachment %d (%s) [%s]', entry.attachment.file.id, + entry.attachment.file.filename, do_filesizeformat(entry.attachment.file.size)) + ts = time.time() + with entry.attachment.file.open() as file: + delta = time.time() - ts + self.logger.debug('File opened: %d (%s) [%.03fs]', entry.attachment.file.id, + entry.attachment.file.filename, delta) + ts = time.time() + resp = session.put( + url_join(self.search_app, f'api/record/{entry.citadel_id}/files/attachment'), + data=file + ) + delta = time.time() - ts + self.logger.debug('Upload finished: %d (%s) [%.03fs]', entry.attachment.file.id, + entry.attachment.file.filename, delta) + if resp.ok: + entry.attachment_file_id = entry.attachment.file.id + db.session.merge(entry) + db.session.commit() + resp.close() + return True + else: + self.logger.error('Failed uploading attachment %d: [%d] %s', + entry.attachment.id, resp.status_code, resp.text) + resp.close() + return False + + def run_initial(self, records, total): + cte = Category.get_tree_cte(lambda cat: db.func.json_build_object('id', cat.id, 'title', cat.title)) + self.categories = dict(db.session.execute(select([cte.c.id, cte.c.path])).fetchall()) + return super().run_initial(records, total) + + def upload_records(self, records): + session = requests.Session() + retry = Retry( + total=10, + backoff_factor=3, + status_forcelist=[502, 503, 504], + allowed_methods=frozenset(['POST', 'PUT', 'DELETE']) + ) + session.mount(self.search_app, HTTPAdapter(max_retries=retry, pool_maxsize=self.PARALLELISM_RECORDS)) + session.headers = self.headers + dumped_records = ( + ( + get_entry_type(rec), rec.id, + self.dump_record(rec) if not change_type & SimpleChange.deleted else None, + change_type + ) for rec, change_type in records + ) + + if self.verbose: + dumped_records = (_print_record(x) for x in dumped_records) + + uploader = parallelize(self.upload_record, entries=dumped_records, batch_size=self.PARALLELISM_RECORDS) + __, aborted = uploader(session) + return not aborted + + def upload_files(self, files): + session = requests.Session() + retry = Retry( + total=10, + backoff_factor=3, + status_forcelist=[502, 503, 504], + allowed_methods=frozenset(['PUT']) + ) + session.mount(self.search_app, HTTPAdapter(max_retries=retry, pool_maxsize=self.PARALLELISM_FILES)) + session.headers = self.headers + uploader = parallelize(self.upload_file, entries=files, batch_size=self.PARALLELISM_FILES) + results, aborted = uploader(session) + return len(results), sum(1 for success in results if not success), aborted + + +class LiveSyncCitadelBackend(LiveSyncBackendBase): + """Citadel + + This backend uploads data to Citadel. + """ + + uploader = LiveSyncCitadelUploader + unique = True + reset_deletes_indexed_data = False + + def check_queue_status(self): + allowed, reason = super().check_queue_status() + if not allowed: + return False, reason + if not self.agent.settings.get('file_upload_done'): + return False, 'file upload pending' + return True, None + + def is_configured(self): + return bool(self.plugin.settings.get('search_backend_url') and self.plugin.settings.get('search_backend_token')) + + def set_initial_file_upload_state(self, state): + if self.agent.settings.get('file_upload_done') == state: + return + self.plugin.logger.info('Initial file upload flag set to %s', state) + self.agent.settings['file_upload_done'] = state + flag_modified(self.agent, 'settings') + + def get_initial_query(self, model_cls, force): + query = super().get_initial_query(model_cls, force) + if not force: + query = query.filter(~model_cls.citadel_id_mapping.has()) + return query + + def process_queue(self, uploader): + super().process_queue(uploader) + uploader_name = type(uploader).__name__ + self.plugin.logger.info(f'{uploader_name} starting file upload') + total, errors, aborted = self.run_export_files(verbose=False) + if aborted: + self.plugin.logger.info(f'{uploader_name} aborted after uploading %d files (%d failed)', total, errors) + else: + self.plugin.logger.info(f'{uploader_name} finished uploading %d files (%d failed)', total, errors) + + def run_initial_export(self, batch_size, force=False, verbose=False): + if not super().run_initial_export(batch_size, force, verbose): + print('Initial export failed') + return False + + print('Initial export finished') + + if self.get_initial_query(Attachment, force=True).has_rows(): + print('You need to export attachment contents as well') + print(cformat('To do so, run %{yellow!}indico citadel upload%{reset}')) + else: + # no files -> mark file upload as done so queue runs are possible + self.set_initial_file_upload_state(True) + return True + + def run_export_files(self, batch=1000, force=False, max_size=None, verbose=True): + from indico_citadel.plugin import CitadelPlugin + + if max_size is None: + max_size = CitadelPlugin.settings.get('max_file_size') + + attachments = ( + CitadelIdMap.query + .join(Attachment) + .join(AttachmentFile, Attachment.file_id == AttachmentFile.id) + .filter(Attachment.type == AttachmentType.file) + .filter(AttachmentFile.size > 0, AttachmentFile.size <= max_size * 1024 * 1024) + .filter(db.func.lower(AttachmentFile.extension).in_( + [s.lower() for s in CitadelPlugin.settings.get('file_extensions')] + )) + .options(contains_eager(CitadelIdMap.attachment).contains_eager(Attachment.file)) + ) + if not force: + attachments = attachments.filter(db.or_(CitadelIdMap.attachment_file_id.is_(None), + CitadelIdMap.attachment_file_id != Attachment.file_id)) + uploader = self.uploader(self) + attachments = attachments.yield_per(batch) + total = attachments.count() + if verbose: + attachments = verbose_iterator(attachments, total, attrgetter('id'), + lambda obj: re.sub(r'\s+', ' ', strip_control_chars(obj.attachment.title)), + print_total_time=True) + else: + self.plugin.logger.info(f'{total} files need to be uploaded') + total, errors, aborted = uploader.upload_files(attachments) + return total, errors, aborted + + def check_reset_status(self): + if not self.is_configured(): + return False, 'Citadel is not properly configured.' + if ( + not CitadelIdMap.query.has_rows() and + not self.agent.queue.has_rows() and + not self.agent.initial_data_exported + ): + return False, 'It looks like you did not export any data to Citadel yet so there is nothing to reset.' + return True, None + + def reset(self): + super().reset() + self.set_initial_file_upload_state(False) + CitadelIdMap.query.delete() diff --git a/citadel/indico_citadel/cli.py b/citadel/indico_citadel/cli.py new file mode 100644 index 0000000..0e21690 --- /dev/null +++ b/citadel/indico_citadel/cli.py @@ -0,0 +1,57 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +import click + +from indico.cli.core import cli_group +from indico.core.db import db +from indico.util.console import cformat + +from indico_citadel.models.id_map import CitadelIdMap +from indico_livesync.models.agents import LiveSyncAgent + + +@cli_group(name='citadel') +def cli(): + """Manage the Citadel plugin.""" + + +@cli.command() +@click.option('--force', '-f', is_flag=True, help="Upload even if it has already been done once.") +@click.option('--batch', type=int, default=1000, show_default=True, metavar='N', + help="The amount of records yielded per upload batch.") +@click.option('--max-size', type=int, metavar='SIZE', + help="The max size (in MB) of files to upload. Defaults to the size from the plugin settings.") +def upload(batch, force, max_size): + """Upload file contents for full text search.""" + agent = LiveSyncAgent.query.filter(LiveSyncAgent.backend_name == 'citadel').first() + if agent is None: + print('No citadel livesync agent found') + return + if not CitadelIdMap.query.has_rows(): + print('It looks like you did not export any data to Citadel yet.') + print(cformat('To do so, run %{yellow!}indico livesync initial-export {}%{reset}').format(agent.id)) + return + + backend = agent.create_backend() + if not backend.is_configured(): + print('Citadel is not properly configured.') + return + + total, errors, aborted = backend.run_export_files(batch, force, max_size=max_size) + if not errors and not aborted: + print(f'{total} files uploaded') + if max_size is None: + backend.set_initial_file_upload_state(True) + db.session.commit() + else: + print('Max size was set; not enabling queue runs.') + else: + if aborted: + print('Upload aborted') + print(f'{total} files processed, {errors} failed') + print('Please re-run this script; queue runs will remain disabled for now') diff --git a/citadel/indico_citadel/migrations/.no-headers b/citadel/indico_citadel/migrations/.no-headers new file mode 100644 index 0000000..e69de29 diff --git a/citadel/indico_citadel/migrations/20210330_1742_0cf18be7ade1_add_mapping_table.py b/citadel/indico_citadel/migrations/20210330_1742_0cf18be7ade1_add_mapping_table.py new file mode 100644 index 0000000..9d7abb3 --- /dev/null +++ b/citadel/indico_citadel/migrations/20210330_1742_0cf18be7ade1_add_mapping_table.py @@ -0,0 +1,63 @@ +"""Add mapping table + +Revision ID: 0cf18be7ade1 +Revises: +Create Date: 2021-03-30 17:42:59.493830 +""" + +from enum import Enum + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.sql.ddl import CreateSchema, DropSchema + +from indico.core.db.sqlalchemy import PyIntEnum + + +# revision identifiers, used by Alembic. +revision = '0cf18be7ade1' +down_revision = None +branch_labels = None +depends_on = None + + +class _EntryType(int, Enum): + event = 1 + contribution = 2 + subcontribution = 3 + attachment = 4 + note = 5 + + +def upgrade(): + op.execute(CreateSchema('plugin_citadel')) + op.create_table( + 'id_map', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('citadel_id', sa.Integer(), nullable=False, index=True, unique=True), + sa.Column('entry_type', PyIntEnum(_EntryType), nullable=False), + sa.Column('event_id', sa.Integer(), nullable=True, index=True, unique=True), + sa.Column('contrib_id', sa.Integer(), nullable=True, index=True, unique=True), + sa.Column('subcontrib_id', sa.Integer(), nullable=True, index=True, unique=True), + sa.Column('attachment_id', sa.Integer(), nullable=True, index=True, unique=True), + sa.Column('note_id', sa.Integer(), nullable=True, index=True, unique=True), + sa.Column('attachment_file_id', sa.Integer(), nullable=True, index=True, unique=True), + sa.CheckConstraint('entry_type != 1 OR (event_id IS NOT NULL AND attachment_id IS NULL AND contrib_id IS NULL AND note_id IS NULL AND subcontrib_id IS NULL)', name='valid_event_entry'), + sa.CheckConstraint('entry_type != 2 OR (contrib_id IS NOT NULL AND attachment_id IS NULL AND event_id IS NULL AND note_id IS NULL AND subcontrib_id IS NULL)', name='valid_contribution_entry'), + sa.CheckConstraint('entry_type != 3 OR (subcontrib_id IS NOT NULL AND attachment_id IS NULL AND contrib_id IS NULL AND event_id IS NULL AND note_id IS NULL)', name='valid_subcontribution_entry'), + sa.CheckConstraint('entry_type != 4 OR (attachment_id IS NOT NULL AND contrib_id IS NULL AND event_id IS NULL AND note_id IS NULL AND subcontrib_id IS NULL)', name='valid_attachment_entry'), + sa.CheckConstraint('entry_type != 5 OR (note_id IS NOT NULL AND attachment_id IS NULL AND contrib_id IS NULL AND event_id IS NULL AND subcontrib_id IS NULL)', name='valid_note_entry'), + sa.ForeignKeyConstraint(['attachment_id'], ['attachments.attachments.id']), + sa.ForeignKeyConstraint(['contrib_id'], ['events.contributions.id']), + sa.ForeignKeyConstraint(['event_id'], ['events.events.id']), + sa.ForeignKeyConstraint(['note_id'], ['events.notes.id']), + sa.ForeignKeyConstraint(['subcontrib_id'], ['events.subcontributions.id']), + sa.ForeignKeyConstraint(['attachment_file_id'], ['attachments.files.id']), + sa.PrimaryKeyConstraint('id'), + schema='plugin_citadel' + ) + + +def downgrade(): + op.drop_table('id_map', schema='plugin_citadel') + op.execute(DropSchema('plugin_citadel')) diff --git a/citadel/indico_citadel/models/__init__.py b/citadel/indico_citadel/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/citadel/indico_citadel/models/id_map.py b/citadel/indico_citadel/models/id_map.py new file mode 100644 index 0000000..a208d1c --- /dev/null +++ b/citadel/indico_citadel/models/id_map.py @@ -0,0 +1,201 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +from indico.core.db import db +from indico.core.db.sqlalchemy import PyIntEnum +from indico.modules.attachments import Attachment +from indico.modules.events import Event +from indico.modules.events.contributions import Contribution +from indico.modules.events.contributions.models.subcontributions import SubContribution +from indico.modules.events.notes.models.notes import EventNote +from indico.util.enum import IndicoEnum + + +class EntryType(int, IndicoEnum): + event = 1 + contribution = 2 + subcontribution = 3 + attachment = 4 + note = 5 + + +_types_for_model = { + Event: EntryType.event, + Contribution: EntryType.contribution, + SubContribution: EntryType.subcontribution, + Attachment: EntryType.attachment, + EventNote: EntryType.note, +} + +_column_for_types = { + EntryType.event: 'event_id', + EntryType.contribution: 'contrib_id', + EntryType.subcontribution: 'subcontrib_id', + EntryType.attachment: 'attachment_id', + EntryType.note: 'note_id' +} + + +def get_entry_type(entry): + for model, entry_type in _types_for_model.items(): + if isinstance(entry, model): + return entry_type + + +def _make_checks(): + available_columns = set(_column_for_types.values()) + for link_type in EntryType: + required_col = _column_for_types[link_type] + forbidden_cols = available_columns - {required_col} + criteria = [f'{required_col} IS NOT NULL'] + criteria += [f'{col} IS NULL' for col in sorted(forbidden_cols)] + condition = 'entry_type != {} OR ({})'.format(link_type, ' AND '.join(criteria)) + yield db.CheckConstraint(condition, f'valid_{link_type.name}_entry') + + +class CitadelIdMap(db.Model): + __tablename__ = 'id_map' + __table_args__ = tuple(_make_checks()) + ({'schema': 'plugin_citadel'},) + + id = db.Column( + db.Integer, + primary_key=True + ) + citadel_id = db.Column( + db.Integer, + nullable=False, + index=True, + unique=True + ) + entry_type = db.Column( + PyIntEnum(EntryType), + nullable=False + ) + event_id = db.Column( + db.Integer, + db.ForeignKey('events.events.id'), + index=True, + nullable=True, + unique=True + ) + contrib_id = db.Column( + db.Integer, + db.ForeignKey('events.contributions.id'), + index=True, + nullable=True, + unique=True + ) + subcontrib_id = db.Column( + db.Integer, + db.ForeignKey('events.subcontributions.id'), + index=True, + nullable=True, + unique=True + ) + attachment_id = db.Column( + db.Integer, + db.ForeignKey('attachments.attachments.id'), + index=True, + nullable=True, + unique=True + ) + note_id = db.Column( + db.Integer, + db.ForeignKey('events.notes.id'), + index=True, + nullable=True, + unique=True + ) + attachment_file_id = db.Column( + db.Integer, + db.ForeignKey('attachments.files.id'), + index=True, + nullable=True, + unique=True + ) + + event = db.relationship( + 'Event', + lazy=True, + backref=db.backref( + 'citadel_id_mapping', + uselist=False, + lazy=True + ) + ) + contribution = db.relationship( + 'Contribution', + lazy=True, + backref=db.backref( + 'citadel_id_mapping', + uselist=False, + lazy=True + ) + ) + subcontribution = db.relationship( + 'SubContribution', + lazy=True, + backref=db.backref( + 'citadel_id_mapping', + uselist=False, + lazy=True + ) + ) + attachment = db.relationship( + 'Attachment', + lazy=True, + backref=db.backref( + 'citadel_id_mapping', + uselist=False, + lazy=True + ) + ) + note = db.relationship( + 'EventNote', + lazy=True, + backref=db.backref( + 'citadel_id_mapping', + uselist=False, + lazy=True + ) + ) + attachment_file = db.relationship( + 'AttachmentFile', + lazy=True, + backref=db.backref( + 'citadel_id_mapping', + uselist=False, + lazy=True + ) + ) + + @classmethod + def get_citadel_id(cls, obj_type, obj_id): + """Get the citadel_id for a given object type and id. + + :param obj_type: The EntryType of the object + :param obj_id: The id of the object + """ + query = db.session.query(cls.citadel_id).filter_by(entry_type=obj_type) + attr = _column_for_types.get(obj_type) + if not attr: + raise Exception(f'Unsupported object type {obj_type}') + return query.filter(getattr(cls, attr) == obj_id).scalar() + + @classmethod + def create(cls, obj_type, obj_id, citadel_id): + """Create a new mapping. + + :param obj_type: The EntryType of the object + :param obj_id: The id of the object + :param citadel_id: The citadel entry ID + """ + attr = _column_for_types.get(obj_type) + if not attr: + raise Exception(f'Unsupported object type {obj_type}') + entry = cls(citadel_id=citadel_id, entry_type=obj_type, **{attr: obj_id}) + db.session.add(entry) diff --git a/citadel/indico_citadel/plugin.py b/citadel/indico_citadel/plugin.py new file mode 100644 index 0000000..0b80c4f --- /dev/null +++ b/citadel/indico_citadel/plugin.py @@ -0,0 +1,68 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +from wtforms.fields.html5 import IntegerField, URLField +from wtforms.validators import URL, DataRequired, NumberRange + +from indico.core import signals +from indico.core.plugins import PluginCategory +from indico.web.forms.base import IndicoForm +from indico.web.forms.fields import IndicoPasswordField, TextListField + +from indico_citadel import _ +from indico_citadel.backend import LiveSyncCitadelBackend +from indico_citadel.cli import cli +from indico_livesync import LiveSyncPluginBase + + +class CitadelSettingsForm(IndicoForm): + search_backend_url = URLField(_('Citadel URL'), [DataRequired(), URL(require_tld=False)], + description=_('The URL of the Citadel server')) + search_backend_token = IndicoPasswordField(_('Citadel API token'), [DataRequired()], toggle=True, + description=_('The authentication token to access Citadel')) + file_extensions = TextListField(_('File extensions'), + description=_('File extensions to upload for full-text search')) + max_file_size = IntegerField(_('Max. file size'), + [DataRequired(), NumberRange(min=1)], + description=_('Maximum size (in MB) to upload for full-text search. Note that ' + 'increasing this after the initial export will upload all files ' + 'for indexing that have not been uploaded before during the next queue ' + 'run, which may take a long time on larger instances. You may want ' + 'to run a manual upload for the new file size first!')) + + +class CitadelPlugin(LiveSyncPluginBase): + """Citadel + + Provides the search/livesync integration with Citadel + """ + + category = PluginCategory.search + configurable = True + settings_form = CitadelSettingsForm + default_settings = { + 'search_backend_url': '', + 'search_backend_token': '', + 'file_extensions': [ + 'key', 'odp', 'pps', 'ppt', 'pptx', 'ods', 'xls', 'xlsm', 'xlsx', 'doc', 'docx', 'odt', 'pdf', 'rtf', + 'tex', 'txt', 'wdp' + ], + 'max_file_size': 10, + } + backend_classes = {'citadel': LiveSyncCitadelBackend} + + def init(self): + super().init() + self.connect(signals.get_search_providers, self.get_search_providers) + self.connect(signals.plugin.cli, self._extend_indico_cli) + + def get_search_providers(self, sender, **kwargs): + from indico_citadel.search import CitadelProvider + return CitadelProvider + + def _extend_indico_cli(self, sender, **kwargs): + return cli diff --git a/citadel/indico_citadel/result_schemas.py b/citadel/indico_citadel/result_schemas.py new file mode 100644 index 0000000..dc4e013 --- /dev/null +++ b/citadel/indico_citadel/result_schemas.py @@ -0,0 +1,94 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +import math + +from marshmallow import fields, pre_load + +from indico.modules.search.base import SearchTarget +from indico.modules.search.result_schemas import (AggregationSchema, AttachmentResultSchema, BucketSchema, + EventResultSchema, ResultItemSchema, ResultSchema) + + +class CitadelEventResultSchema(EventResultSchema): + @pre_load + def _translate_keys(self, data, **kwargs): + data = data.copy() + data['event_type'] = data.pop('type_format') + return data + + +class CitadelAttachmentResultSchema(AttachmentResultSchema): + @pre_load + def _translate_keys(self, data, **kwargs): + data = data.copy() + data['attachment_type'] = data.pop('type_format') + return data + + +class _CitadelBucketSchema(BucketSchema): + @pre_load + def _make_filter(self, data, **kwargs): + data = data.copy() + range_from = data.pop('from_as_string', None) + range_to = data.pop('to_as_string', None) + if range_from or range_to: + data['filter'] = f'[{range_from or "*"} TO {range_to or "*"}]' + else: + data['filter'] = data['key'] + return data + + +class CitadelAggregationSchema(AggregationSchema): + buckets = fields.List(fields.Nested(_CitadelBucketSchema)) + + +class CitadelResultItemSchema(ResultItemSchema): + type_schemas = { + **ResultItemSchema.type_schemas, + SearchTarget.event.name: CitadelEventResultSchema, + SearchTarget.attachment.name: CitadelAttachmentResultSchema, + } + + +class CitadelResultSchema(ResultSchema): + results = fields.List(fields.Nested(CitadelResultItemSchema), required=True) + aggregations = fields.Dict(fields.String(), fields.Nested(CitadelAggregationSchema)) + + @pre_load + def _extract_data(self, data, **kwargs): + from .search import filters + + total = data['hits']['total'] + pages = min(1000, math.ceil(total / self.context['results_per_page'])) + # The citadel service stores every indexable/queryable attribute in a _data + # This extraction should ensure Indico is abstracted from that complexity + results = [ + { + **item['metadata'].pop('_data'), + **item['metadata'], + 'highlight': { + key.removeprefix('_data.'): value for key, value in item['highlight'].items() + } + } + for item in data['hits']['hits'] + ] + aggregations = { + key: { + 'label': str(filters[key]), # resolve lazy strings + 'buckets': value['buckets'] + } + for key, value in data['aggregations'].items() + if key in filters + } + + return { + 'aggregations': aggregations, + 'results': results, + 'total': total, + 'pages': pages, + } diff --git a/citadel/indico_citadel/schemas.py b/citadel/indico_citadel/schemas.py new file mode 100644 index 0000000..80ea46a --- /dev/null +++ b/citadel/indico_citadel/schemas.py @@ -0,0 +1,223 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +from marshmallow import post_dump +from webargs import fields + +from indico.core.config import config +from indico.core.db.sqlalchemy.principals import PrincipalType +from indico.core.marshmallow import mm +from indico.modules.attachments.models.attachments import Attachment +from indico.modules.events import Event +from indico.modules.events.contributions.models.contributions import Contribution +from indico.modules.events.contributions.models.subcontributions import SubContribution +from indico.modules.events.notes.models.notes import EventNote +from indico.modules.search.schemas import EventSchema +from indico.util.string import strip_tags +from indico.web.flask.util import url_for + +from indico_citadel.util import remove_none_entries +from indico_livesync.export_schemas import (AttachmentSchema, CategorySchema, ContributionSchema, EventNoteSchema, + SubContributionSchema) + + +PRINCIPAL_TYPES = { + PrincipalType.user, + PrincipalType.local_group, PrincipalType.multipass_group, + PrincipalType.event_role, PrincipalType.category_role, +} + + +def _get_identifiers(access_list): + return sorted(p.identifier for p in access_list if p.principal_type in PRINCIPAL_TYPES) + + +def _get_category_chain(event, categories): + if not event: + return None + if categories is not None: + return categories[event.category_id] + return CategorySchema(many=True).dump(event.detailed_category_chain) + + +class ACLSchema: + _access = fields.Method('_get_object_acl') + + def _get_acl(self, obj): + if isinstance(obj, SubContribution): + obj = obj.contribution + + if obj.is_public or not obj.is_protected: + # is_public is a cheaper check, while is_protected climbs up the chain + # until the first non-inheriting protection parent, so we can short-circuit + # the check + return None + return _get_identifiers(obj.get_access_list()) + + def _get_attachment_acl(self, attachment): + linked_object = attachment.folder.object + manager_list = set(linked_object.get_manager_list(recursive=True)) + + if attachment.is_self_protected: + return _get_identifiers({e for e in attachment.acl} | manager_list) + elif attachment.is_inheriting and attachment.folder.is_self_protected: + return _get_identifiers({e for e in attachment.folder.acl} | manager_list) + else: + return self._get_acl(linked_object) + + def _get_object_acl(self, object): + """Return the object ACLs. + + More information here https://cern-search.docs.cern.ch/usage/permissions/ + """ + default_acl = 'IndicoAdmin' + + if isinstance(object, (Event, Contribution)): + obj_acl = self._get_acl(object) + elif isinstance(object, SubContribution): + obj_acl = self._get_acl(object.contribution) + elif isinstance(object, Attachment): + obj_acl = self._get_attachment_acl(object) + elif isinstance(object, EventNote): + obj_acl = self._get_acl(object.object) + else: + raise ValueError(f'unknown object {object}') + + acl = { + 'owner': [default_acl], + 'update': [default_acl], + 'delete': [default_acl] + } + if obj_acl is not None: + acl['read'] = [default_acl] + obj_acl + + return acl + + +class RecordSchema(ACLSchema): + class Meta: + fields = ('_data', '_access', 'schema') + + schema = fields.Function(lambda _, ctx: ctx.get('schema'), data_key='$schema') + + @post_dump + def remove_none_fields(self, data, **kwargs): + """Remove fields that are None to avoid json schema validation errors.""" + return remove_none_entries(data) + + @post_dump + def site(self, data, **kwargs): + if data['_data']: + data['_data']['site'] = config.BASE_URL + return data + + +class EventRecordSchema(RecordSchema, EventSchema): + class Meta: + model = Event + indexable = ('title', 'description', 'keywords', 'location', 'persons') + non_indexable = ('type', 'event_type', 'event_id', 'url', 'category_id', 'category_path', 'start_dt', 'end_dt') + fields = RecordSchema.Meta.fields + non_indexable + + _data = fields.Function(lambda event: EventSchema(only=EventRecordSchema.Meta.indexable).dump(event)) + category_path = fields.Function(lambda e, ctx: _get_category_chain(e, ctx.get('categories'))) + # By default, CERNs global indexing requires external URLs + url = mm.String(attribute='external_url') + + @post_dump + def _transform(self, data, **kwargs): + data['type_format'] = data.pop('event_type') + if desc := data['_data'].get('description'): + data['_data']['description'] = strip_tags(desc).strip() + return data + + +class AttachmentRecordSchema(RecordSchema, AttachmentSchema): + class Meta: + model = Attachment + indexable = ('title', 'filename', 'user') + non_indexable = ('attachment_id', 'folder_id', 'type', 'attachment_type', 'event_id', 'contribution_id', + 'category_id', 'category_path', 'url', 'subcontribution_id', 'modified_dt') + fields = RecordSchema.Meta.fields + non_indexable + + _data = fields.Function(lambda at: AttachmentSchema(only=AttachmentRecordSchema.Meta.indexable).dump(at)) + category_path = fields.Function(lambda a, ctx: _get_category_chain(a.folder.event, ctx.get('categories'))) + url = mm.String(attribute='absolute_download_url') + + @post_dump + def _translate_keys(self, data, **kwargs): + data['type_format'] = data.pop('attachment_type') + return data + + +class ContributionRecordSchema(RecordSchema, ContributionSchema): + class Meta: + model = Contribution + indexable = ('title', 'description', 'location', 'persons') + non_indexable = ('contribution_id', 'type', 'contribution_type', 'event_id', 'url', 'category_id', + 'category_path', 'start_dt', 'end_dt', 'duration') + fields = RecordSchema.Meta.fields + non_indexable + + _data = fields.Function(lambda contrib: ContributionSchema( + only=ContributionRecordSchema.Meta.indexable + ).dump(contrib)) + category_path = fields.Function(lambda c, ctx: _get_category_chain(c.event, ctx.get('categories'))) + url = mm.Function(lambda contrib: url_for('contributions.display_contribution', contrib, _external=True)) + + @post_dump + def _transform(self, data, **kwargs): + if contribution_type := data.pop('contribution_type', None): + data['type_format'] = contribution_type + if desc := data['_data'].get('description'): + data['_data']['description'] = strip_tags(desc).strip() + return data + + +class SubContributionRecordSchema(RecordSchema, SubContributionSchema): + class Meta: + model = SubContribution + indexable = ('title', 'description', 'persons', 'location') + non_indexable = ('subcontribution_id', 'type', 'event_id', 'contribution_id', 'category_id', 'category_path', + 'url', 'start_dt', 'end_dt', 'duration') + fields = RecordSchema.Meta.fields + non_indexable + + _data = fields.Function(lambda subc: SubContributionSchema( + only=SubContributionRecordSchema.Meta.indexable + ).dump(subc)) + category_path = fields.Function(lambda subc, ctx: _get_category_chain(subc.event, ctx.get('categories'))) + url = mm.Function(lambda subc: url_for('contributions.display_subcontribution', subc, _external=True)) + + @post_dump + def _transform(self, data, **kwargs): + if desc := data['_data'].get('description'): + data['_data']['description'] = strip_tags(desc).strip() + return data + + +class _EventNoteDataSchema(EventNoteSchema): + class Meta: + fields = ('title', 'content', 'user') + + title = mm.Function(lambda note: f'{note.object.title} - Notes/Minutes') + + @post_dump + def _transform(self, data, **kwargs): + if desc := data.get('content'): + data['content'] = strip_tags(desc).strip() + return data + + +class EventNoteRecordSchema(RecordSchema, EventNoteSchema): + class Meta: + model = EventNote + non_indexable = ('note_id', 'type', 'event_id', 'contribution_id', 'subcontribution_id', 'category_id', + 'category_path', 'url', 'modified_dt') + fields = RecordSchema.Meta.fields + non_indexable + + _data = fields.Function(lambda note: _EventNoteDataSchema().dump(note)) + category_path = fields.Function(lambda note, ctx: _get_category_chain(note.event, ctx.get('categories'))) + url = mm.Function(lambda note: url_for('event_notes.view', note, _external=True)) diff --git a/citadel/indico_citadel/schemas_test.py b/citadel/indico_citadel/schemas_test.py new file mode 100644 index 0000000..3188c1f --- /dev/null +++ b/citadel/indico_citadel/schemas_test.py @@ -0,0 +1,441 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +from datetime import datetime, timedelta +from io import BytesIO + +import pytest +from pytz import utc + +from indico.core.db.sqlalchemy.protection import ProtectionMode +from indico.core.marshmallow import mm +from indico.modules.attachments.models.attachments import Attachment, AttachmentFile, AttachmentType +from indico.modules.attachments.models.folders import AttachmentFolder +from indico.modules.events.contributions.models.persons import ContributionPersonLink, SubContributionPersonLink +from indico.modules.events.contributions.models.subcontributions import SubContribution +from indico.modules.events.models.persons import EventPerson, EventPersonLink +from indico.modules.events.notes.models.notes import EventNote, RenderMode + + +pytest_plugins = 'indico.modules.events.timetable.testing.fixtures' + + +def test_dump_event(db, dummy_user, dummy_event): + from .schemas import EventRecordSchema + + schema = EventRecordSchema(context={'schema': 'test-events'}) + dummy_event.description = 'A dummy event' + dummy_event.keywords = ['foo', 'bar'] + person = EventPerson.create_from_user(dummy_user, dummy_event) + person2 = EventPerson(event=dummy_event, first_name='Admin', last_name='Saurus', affiliation='Indico') + dummy_event.person_links.append(EventPersonLink(person=person)) + dummy_event.person_links.append(EventPersonLink(person=person2)) + db.session.flush() + category_id = dummy_event.category_id + assert schema.dump(dummy_event) == { + '$schema': 'test-events', + '_access': { + 'delete': ['IndicoAdmin'], + 'owner': ['IndicoAdmin'], + 'update': ['IndicoAdmin'], + }, + '_data': { + 'description': 'A dummy event', + 'keywords': ['foo', 'bar'], + 'location': {'address': '', 'room_name': '', 'venue_name': ''}, + 'persons': [{'name': 'Guinea Pig'}, + {'affiliation': 'Indico', 'name': 'Admin Saurus'}], + 'site': 'http://localhost', + 'title': 'dummy#0' + }, + 'category_id': 1, + 'category_path': [ + {'id': 0, 'title': 'Home', 'url': '/'}, + {'id': category_id, 'title': 'dummy', 'url': f'/category/{category_id}/'}, + ], + 'end_dt': dummy_event.end_dt.isoformat(), + 'event_id': 0, + 'start_dt': dummy_event.start_dt.isoformat(), + 'type': 'event', + 'type_format': 'meeting', + 'url': 'http://localhost/event/0/', + } + + +@pytest.mark.parametrize('scheduled', (False, True)) +def test_dump_contribution(db, dummy_user, dummy_event, dummy_contribution, create_entry, scheduled): + from .schemas import ContributionRecordSchema + + person = EventPerson.create_from_user(dummy_user, dummy_event) + dummy_contribution.person_links.append(ContributionPersonLink(person=person)) + dummy_contribution.description = 'A dummy contribution' + + extra = {} + if scheduled: + create_entry(dummy_contribution, utc.localize(datetime(2020, 4, 20, 4, 20))) + extra = { + 'start_dt': dummy_contribution.start_dt.isoformat(), + 'end_dt': dummy_contribution.end_dt.isoformat(), + } + + db.session.flush() + category_id = dummy_contribution.event.category_id + schema = ContributionRecordSchema(context={'schema': 'test-contribs'}) + assert schema.dump(dummy_contribution) == { + '$schema': 'test-contribs', + '_access': { + 'delete': ['IndicoAdmin'], + 'owner': ['IndicoAdmin'], + 'update': ['IndicoAdmin'], + }, + '_data': { + 'description': 'A dummy contribution', + 'location': {'address': '', 'room_name': '', 'venue_name': ''}, + 'persons': [{'name': 'Guinea Pig'}], + 'site': 'http://localhost', + 'title': 'Dummy Contribution', + }, + 'category_id': category_id, + 'category_path': [ + {'id': 0, 'title': 'Home', 'url': '/'}, + {'id': category_id, 'title': 'dummy', 'url': f'/category/{category_id}/'}, + ], + 'contribution_id': dummy_contribution.id, + 'duration': 20, + 'event_id': 0, + 'type': 'contribution', + 'url': f'http://localhost/event/0/contributions/{dummy_contribution.id}/', + **extra + } + + +@pytest.mark.parametrize('scheduled', (False, True)) +def test_dump_subcontribution(db, dummy_user, dummy_event, dummy_contribution, create_entry, scheduled): + from .schemas import SubContributionRecordSchema + + extra = {} + if scheduled: + create_entry(dummy_contribution, utc.localize(datetime(2020, 4, 20, 4, 20))) + extra = { + 'start_dt': dummy_contribution.start_dt.isoformat(), + 'end_dt': dummy_contribution.end_dt.isoformat(), + } + + subcontribution = SubContribution(contribution=dummy_contribution, title='Dummy Subcontribution', + description='A dummy subcontribution', + duration=timedelta(minutes=10)) + + person = EventPerson.create_from_user(dummy_user, dummy_event) + subcontribution.person_links.append(SubContributionPersonLink(person=person)) + + db.session.flush() + category_id = dummy_contribution.event.category_id + schema = SubContributionRecordSchema(context={'schema': 'test-subcontribs'}) + assert schema.dump(subcontribution) == { + '$schema': 'test-subcontribs', + '_access': { + 'delete': ['IndicoAdmin'], + 'owner': ['IndicoAdmin'], + 'update': ['IndicoAdmin'], + }, + '_data': { + 'description': 'A dummy subcontribution', + 'location': {'address': '', 'room_name': '', 'venue_name': ''}, + 'persons': [{'name': 'Guinea Pig'}], + 'site': 'http://localhost', + 'title': 'Dummy Subcontribution', + }, + 'category_id': category_id, + 'category_path': [ + {'id': 0, 'title': 'Home', 'url': '/'}, + {'id': category_id, 'title': 'dummy', 'url': f'/category/{category_id}/'}, + ], + 'contribution_id': dummy_contribution.id, + 'duration': 10, + 'event_id': 0, + 'subcontribution_id': subcontribution.id, + 'type': 'subcontribution', + 'url': f'http://localhost/event/0/contributions/{dummy_contribution.id}/subcontributions/{subcontribution.id}', + **extra + } + + +def test_dump_attachment(db, dummy_user, dummy_contribution): + from .schemas import AttachmentRecordSchema + + folder = AttachmentFolder(title='Dummy Folder', description='a dummy folder') + file = AttachmentFile(user=dummy_user, filename='dummy_file.txt', content_type='text/plain') + attachment = Attachment(folder=folder, user=dummy_user, title='Dummy Attachment', type=AttachmentType.file, + file=file) + attachment.folder.object = dummy_contribution + attachment.file.save(BytesIO(b'hello world')) + db.session.flush() + + category_id = dummy_contribution.event.category_id + schema = AttachmentRecordSchema(context={'schema': 'test-attachment'}) + assert schema.dump(attachment) == { + '$schema': 'test-attachment', + '_access': { + 'delete': ['IndicoAdmin'], + 'owner': ['IndicoAdmin'], + 'update': ['IndicoAdmin'], + }, + '_data': { + 'filename': 'dummy_file.txt', + 'site': 'http://localhost', + 'title': 'Dummy Attachment', + 'user': {'name': 'Guinea Pig'}, + }, + 'attachment_id': attachment.id, + 'category_id': category_id, + 'category_path': [ + {'id': 0, 'title': 'Home', 'url': '/'}, + {'id': category_id, 'title': 'dummy', 'url': f'/category/{category_id}/'}, + ], + 'contribution_id': dummy_contribution.id, + 'event_id': 0, + 'folder_id': folder.id, + 'modified_dt': attachment.modified_dt.isoformat(), + 'type': 'attachment', + 'type_format': 'file', + 'url': ( + f'http://localhost/event/0/contributions/' + f'{dummy_contribution.id}/attachments/{folder.id}/{attachment.id}/dummy_file.txt' + ), + } + + +@pytest.mark.parametrize('link_type', ('event', 'contrib', 'subcontrib')) +def test_dump_event_note(db, dummy_user, dummy_event, dummy_contribution, link_type): + from .schemas import EventNoteRecordSchema + + if link_type == 'event': + ids = {} + note = EventNote(object=dummy_event) + url = '/event/0/note/' + elif link_type == 'contrib': + ids = {'contribution_id': dummy_contribution.id} + note = EventNote(object=dummy_contribution) + url = f'/event/0/contributions/{dummy_contribution.id}/note/' + elif link_type == 'subcontrib': + subcontribution = SubContribution(contribution=dummy_contribution, title='Dummy Subcontribution', + duration=timedelta(minutes=10)) + db.session.flush() + ids = { + 'contribution_id': subcontribution.contribution_id, + 'subcontribution_id': subcontribution.id, + } + note = EventNote(object=subcontribution) + url = f'/event/0/contributions/{dummy_contribution.id}/subcontributions/{subcontribution.id}/note/' + + note.create_revision(RenderMode.html, 'this is a dummy note', dummy_user) + db.session.flush() + category_id = dummy_event.category_id + schema = EventNoteRecordSchema(context={'schema': 'test-notes'}) + assert schema.dump(note) == { + '$schema': 'test-notes', + '_access': { + 'delete': ['IndicoAdmin'], + 'owner': ['IndicoAdmin'], + 'update': ['IndicoAdmin'], + }, + '_data': { + 'content': 'this is a dummy note', + 'site': 'http://localhost', + 'title': f'{note.object.title} - Notes/Minutes', + 'user': {'name': 'Guinea Pig'} + }, + 'category_id': category_id, + 'category_path': [ + {'id': 0, 'title': 'Home', 'url': '/'}, + {'id': category_id, 'title': 'dummy', 'url': f'/category/{category_id}/'}, + ], + 'modified_dt': note.current_revision.created_dt.isoformat(), + 'event_id': 0, + 'note_id': note.id, + 'type': 'event_note', + 'url': f'http://localhost{url}', + **ids + } + + +def test_event_acls(dummy_event, create_user): + from .schemas import ACLSchema + + class TestSchema(ACLSchema, mm.Schema): + pass + + def assert_acl(expected_read_acl): + __tracebackhide__ = True + data = schema.dump(dummy_event) + read_acl = data['_access'].pop('read', None) + assert data == {'_access': {'delete': ['IndicoAdmin'], 'owner': ['IndicoAdmin'], 'update': ['IndicoAdmin']}} + if read_acl is not None: + read_acl = set(read_acl) + assert read_acl == expected_read_acl + + schema = TestSchema() + u1 = create_user(1, email='user1@example.com') + u2 = create_user(2, email='user2@example.com') + u3 = create_user(3, email='user3@example.com') + + # event is inheriting public, so no acl + assert_acl(None) + + # event is protected and the acl is empty (nobody has regular access) + dummy_event.protection_mode = ProtectionMode.protected + assert_acl({'IndicoAdmin'}) + + dummy_event.update_principal(u1, read_access=True) + dummy_event.category.update_principal(u2, read_access=True) + dummy_event.category.parent.update_principal(u3, read_access=True) + + # self-protected, so no acl inherited + assert_acl({'IndicoAdmin', 'User:1'}) + + # event is inheriting from public categories, so there is no acl + dummy_event.protection_mode = ProtectionMode.inheriting + assert_acl(None) + + # event it itself public, so no acl here as well + dummy_event.protection_mode = ProtectionMode.public + assert_acl(None) + + # inheriting, so all parent acl entries + dummy_event.protection_mode = ProtectionMode.inheriting + dummy_event.category.parent.protection_mode = ProtectionMode.protected + assert_acl({'IndicoAdmin', 'User:1', 'User:2', 'User:3'}) + + # category protected, so no parent category acl inherited + dummy_event.category.protection_mode = ProtectionMode.protected + assert_acl({'IndicoAdmin', 'User:1', 'User:2'}) + + # parent category acl entry is a manager, that one is inherited + dummy_event.category.parent.update_principal(u3, full_access=True) + assert_acl({'IndicoAdmin', 'User:1', 'User:2', 'User:3'}) + + +def test_attachment_acls(dummy_event, dummy_user, create_user): + from .schemas import ACLSchema + + class TestSchema(ACLSchema, mm.Schema): + pass + + folder = AttachmentFolder(title='Dummy Folder', description='a dummy folder') + attachment = Attachment(folder=folder, user=dummy_user, title='Dummy Attachment', type=AttachmentType.link, + link_url='https://example.com') + attachment.folder.object = dummy_event + + def assert_acl(expected_read_acl): + __tracebackhide__ = True + data = schema.dump(attachment) + read_acl = data['_access'].pop('read', None) + assert data == {'_access': {'delete': ['IndicoAdmin'], 'owner': ['IndicoAdmin'], 'update': ['IndicoAdmin']}} + if read_acl is not None: + read_acl = set(read_acl) + assert read_acl == expected_read_acl + + schema = TestSchema() + u1 = create_user(1, email='user1@example.com') + u2 = create_user(2, email='user2@example.com') + u3 = create_user(3, email='user3@example.com') + + # event is inheriting public, so no acl + assert_acl(None) + + # event is protected and the acl is empty (nobody has regular access) + dummy_event.protection_mode = ProtectionMode.protected + assert_acl({'IndicoAdmin'}) + + dummy_event.update_principal(u1, read_access=True) + dummy_event.category.update_principal(u2, read_access=True) + dummy_event.category.parent.update_principal(u3, read_access=True) + + # self-protected, so no acl inherited + assert_acl({'IndicoAdmin', 'User:1'}) + + # event is inheriting from public categories, so there is no acl + dummy_event.protection_mode = ProtectionMode.inheriting + assert_acl(None) + + # event it itself public, so no acl here as well + dummy_event.protection_mode = ProtectionMode.public + assert_acl(None) + + # inheriting, so all parent acl entries + dummy_event.protection_mode = ProtectionMode.inheriting + dummy_event.category.parent.protection_mode = ProtectionMode.protected + assert_acl({'IndicoAdmin', 'User:1', 'User:2', 'User:3'}) + + # category protected, so no parent category acl inherited + dummy_event.category.protection_mode = ProtectionMode.protected + assert_acl({'IndicoAdmin', 'User:1', 'User:2'}) + + # parent category acl entry is a manager, that one is inherited + dummy_event.category.parent.update_principal(u3, full_access=True) + assert_acl({'IndicoAdmin', 'User:1', 'User:2', 'User:3'}) + + # attachment self-protected, only the category/event manager has access + folder.update_principal(u2, read_access=True) + attachment.protection_mode = ProtectionMode.protected + assert_acl({'IndicoAdmin', 'User:3'}) + + # the user in the attachment acl has access as well + attachment.update_principal(u1, read_access=True) + attachment.protection_mode = ProtectionMode.protected + assert_acl({'IndicoAdmin', 'User:3', 'User:1'}) + + # attachment inheriting from self-protected folder - only the folder acl is used + attachment.protection_mode = ProtectionMode.inheriting + folder.protection_mode = ProtectionMode.protected + assert_acl({'IndicoAdmin', 'User:3', 'User:2'}) + + +@pytest.mark.parametrize('obj_type', ('event', 'contrib', 'subcontrib', 'attachment', 'note')) +def test_acls(dummy_event, dummy_contribution, dummy_user, create_user, obj_type): + from .schemas import ACLSchema + + class TestSchema(ACLSchema, mm.Schema): + pass + + if obj_type == 'event': + obj = dummy_event + elif obj_type == 'contrib': + obj = dummy_contribution + elif obj_type == 'subcontrib': + obj = SubContribution(contribution=dummy_contribution, title='Test', duration=timedelta(minutes=10)) + elif obj_type == 'attachment': + folder = AttachmentFolder(title='Dummy Folder', description='a dummy folder') + obj = Attachment(folder=folder, user=dummy_user, title='Dummy Attachment', type=AttachmentType.link, + link_url='https://example.com') + obj.folder.object = dummy_event + elif obj_type == 'note': + obj = EventNote(object=dummy_event) + obj.create_revision(RenderMode.html, 'this is a dummy note', dummy_user) + + def assert_acl(expected_read_acl): + __tracebackhide__ = True + data = schema.dump(obj) + read_acl = data['_access'].pop('read', None) + assert data == {'_access': {'delete': ['IndicoAdmin'], 'owner': ['IndicoAdmin'], 'update': ['IndicoAdmin']}} + if read_acl is not None: + read_acl = set(read_acl) + assert read_acl == expected_read_acl + + schema = TestSchema() + user = create_user(1, email='user1@example.com') + + # everything is public + assert_acl(None) + + # event is protected and the acl is empty (nobody has regular access) + dummy_event.protection_mode = ProtectionMode.protected + assert_acl({'IndicoAdmin'}) + + # user on the acl has access + dummy_event.update_principal(user, read_access=True) + assert_acl({'IndicoAdmin', 'User:1'}) diff --git a/citadel/indico_citadel/search.py b/citadel/indico_citadel/search.py new file mode 100644 index 0000000..bb49392 --- /dev/null +++ b/citadel/indico_citadel/search.py @@ -0,0 +1,101 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +import requests +from requests.exceptions import RequestException +from werkzeug.urls import url_join + +from indico.modules.search.base import IndicoSearchProvider, SearchOption + +from indico_citadel import _ +from indico_citadel.result_schemas import CitadelResultSchema +from indico_citadel.util import format_filters, format_query + + +class CitadelProvider(IndicoSearchProvider): + def __init__(self, *args, **kwargs): + from indico_citadel.plugin import CitadelPlugin + + super().__init__(*args, **kwargs) + self.token = CitadelPlugin.settings.get('search_backend_token') + self.backend_url = CitadelPlugin.settings.get('search_backend_url') + self.records_url = url_join(self.backend_url, 'api/records/') + + def search(self, query, access, page=1, object_types=(), **params): + # https://cern-search.docs.cern.ch/usage/operations/#query-documents + # this token is used by the backend to authenticate and also to filter + # the objects that we can actually read + headers = { + 'Authorization': f'Bearer {self.token}' + } + + operator = params.pop('default_operator', 'AND') + sort = params.pop('sort', None) + filter_query, ranges = format_filters(params, filters, range_filters) + # Look for objects matching the `query` and schema, make sure the query is properly escaped + # https://cern-search.docs.cern.ch/usage/operations/#advanced-queries + parts = [format_query(query, {k: field for k, (field, _) in placeholders.items()})] + if ranges: + parts.append(ranges) + search_params = {'page': page, 'size': self.RESULTS_PER_PAGE, 'q': ' '.join(parts), 'highlight': '_data.*', + 'type': [x.name for x in object_types], 'sort': sort, 'default_operator': operator, + **filter_query} + # Filter by the objects that can be viewed by users/groups in the `access` argument + if access: + search_params['access'] = ','.join(access) + + try: + resp = requests.get(self.records_url, params=search_params, headers=headers) + resp.raise_for_status() + except RequestException: + raise Exception('Failed contacting the search service') + + data = resp.json() + return CitadelResultSchema(context={'results_per_page': self.RESULTS_PER_PAGE}).load(data) + + def get_placeholders(self): + return [SearchOption(key, label) for key, (_, label) in placeholders.items()] + + def get_filters(self): + return [SearchOption(key, label) for key, label in filters.items()] + + def get_sort_options(self): + return [SearchOption(key, label) for key, label in sort_options.items()] + + +placeholders = { + 'title': ('_data.title', _('The title an event, contribution, etc.)')), + 'person': ('_data.persons.name', _("A speaker, author or event chair's name")), + 'affiliation': ('_data.persons.affiliation', _("A speaker, author or event chair's affiliation")), + 'type': ('type', _('An entry type (such as conference, meeting, file, etc.)')), + 'venue': ('_data.location.venue_name', _("Name of the venue")), + 'room': ('_data.location.room_name', _("Name of the room")), + 'address': ('_data.location.address', _("Address of the venue")), + 'file': ('_data.filename', _("Name of the attached file")), + 'keyword': ('_data.keywords', _('A keyword associated with an event')), + 'category': ('category_path.title', _('The category of an event')), +} + +range_filters = { + 'start_range': 'start_dt' +} + +sort_options = { + 'bestmatch': _('Best match'), + 'mostrecent': _('Newest first'), + '-mostrecent': _('Oldest first') +} + +filters = { + 'affiliation': _('Affiliation'), + 'person': _('Person'), + 'type_format': _('Type'), + 'venue': _('Location'), + 'start_range': _('Date'), + 'category': _('Category'), + 'category_id': _('Category ID'), +} diff --git a/citadel/indico_citadel/translations/messages.pot b/citadel/indico_citadel/translations/messages.pot new file mode 100644 index 0000000..825861f --- /dev/null +++ b/citadel/indico_citadel/translations/messages.pot @@ -0,0 +1,38 @@ +# Translations template for PROJECT. +# Copyright (C) 2020 ORGANIZATION +# This file is distributed under the same license as the PROJECT project. +# FIRST AUTHOR , 2020. +# +#, fuzzy +msgid "" +msgstr "" +"Project-Id-Version: PROJECT VERSION\n" +"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" +"POT-Creation-Date: 2020-11-27 11:55+0100\n" +"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n" +"Last-Translator: FULL NAME \n" +"Language-Team: LANGUAGE \n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=utf-8\n" +"Content-Transfer-Encoding: 8bit\n" +"Generated-By: Babel 2.8.0\n" + +#: indico_citadel/plugin.py:23 +msgid "Search backend URL" +msgstr "" + +#: indico_citadel/plugin.py:24 +msgid "Search backend token" +msgstr "" + +#: indico_citadel/plugin.py:25 +msgid "Authentication token for the Search backend" +msgstr "" + +#: indico_citadel/plugin.py:26 +msgid "Search owner role" +msgstr "" + +#: indico_citadel/plugin.py:27 +msgid "Tika server URL" +msgstr "" diff --git a/citadel/indico_citadel/util.py b/citadel/indico_citadel/util.py new file mode 100644 index 0000000..d000fed --- /dev/null +++ b/citadel/indico_citadel/util.py @@ -0,0 +1,138 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +import re +import sys +import threading +from functools import wraps + +from flask import current_app +from flask.globals import _app_ctx_stack + + +def parallelize(func, entries, batch_size=200): + @wraps(func) + def wrapper(*args, **kwargs): + iterable_lock = threading.Lock() + result_lock = threading.Lock() + abort = threading.Event() + finished = threading.Event() + results = [] + app = current_app._get_current_object() + main_app_context = _app_ctx_stack.top + worker_exc_info = None + + def worker(iterator): + nonlocal worker_exc_info + while not abort.is_set() and not finished.is_set(): + try: + with iterable_lock: + with main_app_context: + item = next(iterator) + except StopIteration: + finished.set() + break + + with app.app_context(): + try: + res = func(item, *args, **kwargs) + except BaseException: + worker_exc_info = sys.exc_info() + finished.set() + return + with result_lock: + results.append(res) + + it = iter(entries) + threads = [threading.Thread(target=worker, name=f'worker/{i}', args=(it,)) + for i in enumerate(range(batch_size))] + + for t in threads: + t.start() + + try: + finished.wait() + except KeyboardInterrupt: + print('\nFinishing pending jobs before aborting') + abort.set() + + for t in threads: + t.join() + + if worker_exc_info: + raise worker_exc_info[1].with_traceback(worker_exc_info[2]) + + return results, abort.is_set() + + return wrapper + + +def format_query(query, placeholders): + """Format and split the query into keywords and placeholders. + + https://cern-search.docs.cern.ch/usage/operations/#advanced-queries + + :param query: search query + :param placeholders: placeholder whitelist + :returns escaped query + """ + patt = r'(?:^|\s)({}):([^:"\s]+|"[^"]+")(?:$|\s)'.format('|'.join(map(re.escape, placeholders))) + idx = 0 + keys = [] + for match in re.finditer(patt, query): + placeholder = f'{placeholders[match.group(1)]}:{escape(match.group(2))}' + if idx != match.start(): + keys.append(escape(query[idx:match.start()])) + keys.append(placeholder) + idx = match.end() + + if idx != len(query): + keys.append(escape(query[idx:len(query)])) + + return ' '.join(keys).strip() + + +def format_filters(params, filters, range_filters): + """Extract any special placeholder filter, such as ranges, from the query params. + + https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_ranges + + :param params: The filter query params + :param filters: The filter whitelist + :param range_filters: The range filter whitelist + :returns: filters, extracted placeholders + """ + _filters = {} + query = [] + for k, v in params.items(): + if k not in filters: + continue + if k in range_filters: + match = re.match(r'[[{].+ TO .+[]}]', v) + if match: + query.append(f'+{range_filters[k]}:{v}') + continue + _filters[k] = v + return _filters, ' '.join(query) + + +def escape(query): + """Prepend all special ElasticSearch characters with a backslash.""" + patt = r'([+\-=>=3.0.dev0 + indico-plugin-livesync>=3.0.dev0 + +[options.entry_points] +indico.plugins = + citadel = indico_citadel.plugin:CitadelPlugin + + + +[pydocstyle] +ignore = D100,D101,D102,D103,D104,D105,D107,D203,D213 diff --git a/citadel/setup.py b/citadel/setup.py new file mode 100644 index 0000000..03c7604 --- /dev/null +++ b/citadel/setup.py @@ -0,0 +1,11 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +from setuptools import setup + + +setup() diff --git a/livesync/indico_livesync/base.py b/livesync/indico_livesync/base.py index 89b8639..95cce69 100644 --- a/livesync/indico_livesync/base.py +++ b/livesync/indico_livesync/base.py @@ -9,8 +9,13 @@ from flask_pluginengine import depends, trim_docstring from sqlalchemy.orm import subqueryload from indico.core.plugins import IndicoPlugin, PluginCategory +from indico.modules.attachments.models.attachments import Attachment from indico.modules.categories import Category from indico.modules.categories.models.principals import CategoryPrincipal +from indico.modules.events.contributions.models.contributions import Contribution +from indico.modules.events.contributions.models.subcontributions import SubContribution +from indico.modules.events.models.events import Event +from indico.modules.events.notes.models.notes import EventNote from indico.util.date_time import now_utc from indico.util.decorators import classproperty @@ -48,6 +53,9 @@ class LiveSyncBackendBase: form = AgentForm #: whether only one agent with this backend is allowed unique = False + #: whether a reset can delete data on whatever backend is used as well or the user + #: needs to do it themself after doing a reset + reset_deletes_indexed_data = False @classproperty @classmethod @@ -70,6 +78,25 @@ class LiveSyncBackendBase: """ self.agent = agent + def is_configured(self): + """Check whether the backend is properly configured. + + If this returns False, running the initial export or queue + will not be possible. + """ + return True + + def check_queue_status(self): + """Return whether queue runs are allowed (or why not). + + :return: ``allowed, reason`` tuple; the reason is None if runs are allowed. + """ + if not self.is_configured(): + return False, 'not configured' + if self.agent.initial_data_exported: + return True, None + return False, 'initial export not performed' + def fetch_records(self, count=None): query = (self.agent.queue .filter_by(processed=False) @@ -84,26 +111,49 @@ class LiveSyncBackendBase: """ self.agent.last_run = now_utc() - def run(self): + def process_queue(self, uploader): + """Process queued entries during an export run.""" + records = self.fetch_records() + LiveSyncPlugin.logger.info(f'Uploading %d records via {self.uploader.__name__}', len(records)) + uploader.run(records) + + def run(self, verbose=False, from_cli=False): """Runs the livesync export""" if self.uploader is None: # pragma: no cover raise NotImplementedError - records = self.fetch_records() - uploader = self.uploader(self) - LiveSyncPlugin.logger.info('Uploading %d records', len(records)) - uploader.run(records) + uploader = self.uploader(self, verbose=verbose, from_cli=from_cli) + self.process_queue(uploader) self.update_last_run() - def run_initial_export(self): + def get_initial_query(self, model_cls, force): + """Get the initial export query for a given model. + + Supported models are `Event`, `Contribution`, `SubContribution`, + `Attachment` and `EventNote`. + + :param model_cls: The model class to query + :param force: Whether the initial export was started with ``--force`` + """ + fn = { + Event: query_events, + Contribution: query_contributions, + SubContribution: query_subcontributions, + Attachment: query_attachments, + EventNote: query_notes, + }[model_cls] + return fn() + + def run_initial_export(self, batch_size, force=False, verbose=False): """Runs the initial export. This process is expected to take a very long time. + :return: True if everything was successful, False if not """ if self.uploader is None: # pragma: no cover raise NotImplementedError - uploader = self.uploader(self) + uploader = self.uploader(self, verbose=verbose, from_cli=True) Category.allow_relationship_preloading = True Category.preload_relationships(Category.query, 'acl_entries', @@ -111,13 +161,54 @@ class LiveSyncBackendBase: CategoryPrincipal)) _category_cache = Category.query.all() # noqa: F841 - events = query_events() - uploader.run_initial(events.yield_per(5000), events.count()) - contributions = query_contributions() - uploader.run_initial(contributions.yield_per(5000), contributions.count()) - subcontributions = query_subcontributions() - uploader.run_initial(subcontributions.yield_per(5000), subcontributions.count()) - attachments = query_attachments() - uploader.run_initial(attachments.yield_per(5000), attachments.count()) - notes = query_notes() - uploader.run_initial(notes.yield_per(5000), notes.count()) + events = self.get_initial_query(Event, force) + contributions = self.get_initial_query(Contribution, force) + subcontributions = self.get_initial_query(SubContribution, force) + attachments = self.get_initial_query(Attachment, force) + notes = self.get_initial_query(EventNote, force) + + print('Exporting events') + if not uploader.run_initial(events.yield_per(batch_size), events.count()): + print('Initial export of events failed') + return False + print('Exporting contributions') + if not uploader.run_initial(contributions.yield_per(batch_size), contributions.count()): + print('Initial export of contributions failed') + return False + print('Exporting subcontributions') + if not uploader.run_initial(subcontributions.yield_per(batch_size), subcontributions.count()): + print('Initial export of subcontributions failed') + return False + print('Exporting attachments') + if not uploader.run_initial(attachments.yield_per(batch_size), attachments.count()): + print('Initial export of attachments failed') + return False + print('Exporting notes') + if not uploader.run_initial(notes.yield_per(batch_size), notes.count()): + print('Initial export of notes failed') + return False + return True + + def check_reset_status(self): + """Return whether a reset is allowed (or why not). + + When resetting is not allowed, the message indicates why this is the case. + + :return: ``allowed, reason`` tuple; the reason is None if resetting is allowed. + """ + if not self.agent.queue.has_rows() and not self.agent.initial_data_exported: + return False, 'There is nothing to reset' + return True, None + + def reset(self): + """Perform a full reset of all data related to the backend. + + This deletes all queued changes, resets the initial export state back + to pending and do any other backend-specific tasks that may be required. + + It is not necessary to delete the actual search indexes (which are possibly + on a remote service), but if your backend has the ability to do it you may + want to do it and display a message to the user indicating this. + """ + self.agent.initial_data_exported = False + self.agent.queue.delete() diff --git a/livesync/indico_livesync/cli.py b/livesync/indico_livesync/cli.py index f2dff1c..2c9ddce 100644 --- a/livesync/indico_livesync/cli.py +++ b/livesync/indico_livesync/cli.py @@ -5,11 +5,14 @@ # them and/or modify them under the terms of the MIT License; # see the LICENSE file for more details. +import time + import click from flask_pluginengine import current_plugin from terminaltables import AsciiTable from indico.cli.core import cli_group +from indico.core.config import config from indico.core.db import db from indico.util.console import cformat @@ -34,18 +37,23 @@ def agents(): """Lists the currently active agents""" print('The following LiveSync agents are active:') agent_list = LiveSyncAgent.query.order_by(LiveSyncAgent.backend_name, db.func.lower(LiveSyncAgent.name)).all() - table_data = [['ID', 'Name', 'Backend', 'Initial Export', 'Queue']] + table_data = [['ID', 'Name', 'Backend', 'Queue', 'Status']] for agent in agent_list: - initial = (cformat('%{green!}done%{reset}') if agent.initial_data_exported else - cformat('%{yellow!}pending%{reset}')) if agent.backend is None: backend_title = cformat('%{red!}invalid backend ({})%{reset}').format(agent.backend_name) + queue_status = 'n/a' else: backend_title = agent.backend.title - table_data.append([str(agent.id), agent.name, backend_title, initial, - str(agent.queue.filter_by(processed=False).count())]) + backend = agent.create_backend() + queue_allowed, reason = backend.check_queue_status() + if queue_allowed: + queue_status = cformat('%{green!}ready%{reset}') + else: + queue_status = cformat('%{yellow!}{}%{reset}').format(reason) + table_data.append([str(agent.id), agent.name, backend_title, + str(agent.queue.filter_by(processed=False).count()), queue_status]) table = AsciiTable(table_data) - table.justify_columns[4] = 'right' + table.justify_columns[3] = 'right' print(table.table) if not all(a.initial_data_exported for a in agent_list): print() @@ -56,33 +64,52 @@ def agents(): @cli.command() @click.argument('agent_id', type=int) -@click.option('--force', is_flag=True, help="Perform export even if it has already been done once.") -def initial_export(agent_id, force): +@click.option('--force', '-f', is_flag=True, help="Perform export even if it has already been done once.") +@click.option('--verbose', '-v', is_flag=True, help="Be more verbose (what this does is up to the backend)") +@click.option('--batch', type=int, default=5000, help="The amount of records yielded per export batch.", + show_default=True, metavar='N') +def initial_export(agent_id, batch, force, verbose): """Performs the initial data export for an agent""" agent = LiveSyncAgent.get(agent_id) if agent is None: print('No such agent') return + if agent.backend is None: print(cformat('Cannot run agent %{red!}{}%{reset} (backend not found)').format(agent.name)) return + print(cformat('Selected agent: %{white!}{}%{reset} ({})').format(agent.name, agent.backend.title)) + + backend = agent.create_backend() + if not backend.is_configured(): + print(cformat('Agent %{red!}{}%{reset} is not properly configured').format(agent.name)) + return + if agent.initial_data_exported and not force: print('The initial export has already been performed for this agent.') print(cformat('To re-run it, use %{yellow!}--force%{reset}')) return - backend = agent.create_backend() - backend.run_initial_export() + if not backend.run_initial_export(batch, force, verbose): + print('The initial export failed; not marking it as done') + return + agent.initial_data_exported = True db.session.commit() @cli.command() @click.argument('agent_id', type=int, required=False) -@click.option('--force', is_flag=True, help="Run even if initial export was not done") -def run(agent_id, force=False): +@click.option('--force', '-f', is_flag=True, help="Run even if initial export was not done") +@click.option('--verbose', '-v', is_flag=True, help="Be more verbose (what this does is up to the backend)") +def run(agent_id, force, verbose): """Runs the livesync agent""" + from indico_livesync.plugin import LiveSyncPlugin + if LiveSyncPlugin.settings.get('disable_queue_runs'): + print(cformat('%{yellow!}Queue runs are disabled%{reset}')) + return + if agent_id is None: agent_list = LiveSyncAgent.query.all() else: @@ -96,13 +123,60 @@ def run(agent_id, force=False): if agent.backend is None: print(cformat('Skipping agent: %{red!}{}%{reset} (backend not found)').format(agent.name)) continue - if not agent.initial_data_exported and not force: - print(cformat('Skipping agent: %{red!}{}%{reset} (initial export not performed)').format(agent.name)) + backend = agent.create_backend() + queue_allowed, reason = backend.check_queue_status() + if not queue_allowed and not force: + print(cformat('Skipping agent: %{red!}{}%{reset} ({})').format(agent.name, reason)) continue print(cformat('Running agent: %{white!}{}%{reset}').format(agent.name)) try: - agent.create_backend().run() + backend.run(verbose, from_cli=True) db.session.commit() except Exception: db.session.rollback() raise + + +@cli.command() +@click.argument('agent_id', type=int) +def reset(agent_id): + """Performs the initial data export for an agent""" + agent = LiveSyncAgent.get(agent_id) + if agent is None: + print('No such agent') + return + + if agent.backend is None: + print(cformat('Cannot run agent %{red!}{}%{reset} (backend not found)').format(agent.name)) + return + + backend = agent.create_backend() + reset_allowed, message = backend.check_reset_status() + + if not reset_allowed: + print(f'Resetting is not possible: {message}') + return + + print(cformat('Selected agent: %{white!}{}%{reset} ({})').format(agent.name, backend.title)) + print(cformat('%{yellow!}!!! %{red!}DANGER %{yellow!}!!!%{reset}')) + if backend.reset_deletes_indexed_data: + print(cformat('%{yellow!}This command will delete all indexed data on this backend.%{reset}') + .format(backend.title)) + else: + print(cformat('%{yellow!}This command should only be used if the data on this backend ' + 'has been deleted.%{reset}') + .format(backend.title)) + print(cformat('%{yellow!}After resetting you need to perform a new initial export.%{reset}')) + click.confirm(click.style('Do you really want to perform the reset?', fg='red', bold=True), + default=False, abort=True) + if not config.DEBUG: + click.confirm(click.style('Are you absolutely sure?', fg='red', bold=True), default=False, abort=True) + for i in range(5): + print(cformat('\rResetting in %{white!}{}%{reset}s (CTRL+C to abort)').format(5 - i), end='') + time.sleep(1) + print('') + + backend.reset() + db.session.commit() + print(cformat('Reset complete; run %{green!}indico livesync initial-export {}%{reset} for a new export') + .format(agent.id)) diff --git a/livesync/indico_livesync/controllers.py b/livesync/indico_livesync/controllers.py index ecd76de..8428aa1 100644 --- a/livesync/indico_livesync/controllers.py +++ b/livesync/indico_livesync/controllers.py @@ -7,6 +7,7 @@ from flask import flash, redirect, request from flask_pluginengine import current_plugin, render_plugin_template +from sqlalchemy.orm.attributes import flag_modified from werkzeug.exceptions import NotFound from indico.core.db import db @@ -81,7 +82,9 @@ class RHEditAgent(RHAdminBase): if form.validate_on_submit(): data = form.data self.agent.name = data.pop('name') - self.agent.settings = data + if data: + self.agent.settings.update(data) + flag_modified(self.agent, 'settings') flash(_('Agent updated'), 'success') return jsonify_data(flash=False) diff --git a/livesync/indico_livesync/export_schemas.py b/livesync/indico_livesync/export_schemas.py new file mode 100644 index 0000000..e04c5cb --- /dev/null +++ b/livesync/indico_livesync/export_schemas.py @@ -0,0 +1,116 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +from marshmallow import fields + +from indico.core.db.sqlalchemy.links import LinkType +from indico.core.marshmallow import mm +from indico.modules.attachments import Attachment +from indico.modules.categories import Category +from indico.modules.events.contributions import Contribution +from indico.modules.events.contributions.models.subcontributions import SubContribution +from indico.modules.events.notes.models.notes import EventNote +from indico.modules.search.base import SearchTarget +from indico.modules.search.schemas import LocationSchema, PersonSchema +from indico.util.marshmallow import NoneRemovingList +from indico.web.flask.util import url_for + + +class CategorySchema(mm.SQLAlchemyAutoSchema): + class Meta: + model = Category + fields = ('id', 'title', 'url') + + url = fields.Function(lambda c: url_for('categories.display', category_id=c['id'])) + + +class AttachmentSchema(mm.SQLAlchemyAutoSchema): + class Meta: + model = Attachment + fields = ('attachment_id', 'folder_id', 'type', 'attachment_type', 'title', 'filename', 'event_id', + 'contribution_id', 'subcontribution_id', 'user', 'url', 'category_id', 'category_path', + 'modified_dt') + + attachment_id = fields.Int(attribute='id') + folder_id = fields.Int(attribute='folder_id') + type = fields.Constant(SearchTarget.attachment.name) + attachment_type = fields.String(attribute='type.name') + filename = fields.String(attribute='file.filename') + event_id = fields.Int(attribute='folder.event.id') + contribution_id = fields.Method('_contribution_id') + subcontribution_id = fields.Int(attribute='folder.subcontribution_id') + user = fields.Nested(PersonSchema) + category_id = fields.Int(attribute='folder.event.category_id') + category_path = fields.List(fields.Nested(CategorySchema), attribute='folder.event.detailed_category_chain') + url = fields.String(attribute='download_url') + + def _contribution_id(self, attachment): + if attachment.folder.link_type == LinkType.contribution: + return attachment.folder.contribution_id + elif attachment.folder.link_type == LinkType.subcontribution: + return attachment.folder.subcontribution.contribution_id + return None + + +class ContributionSchema(mm.SQLAlchemyAutoSchema): + class Meta: + model = Contribution + fields = ('contribution_id', 'type', 'contribution_type', 'event_id', 'title', 'description', 'location', + 'persons', 'url', 'category_id', 'category_path', 'start_dt', 'end_dt', 'duration') + + contribution_id = fields.Int(attribute='id') + type = fields.Constant(SearchTarget.contribution.name) + contribution_type = fields.String(attribute='type.name') + location = fields.Function(lambda contrib: LocationSchema().dump(contrib)) + persons = NoneRemovingList(fields.Nested(PersonSchema), attribute='person_links') + category_id = fields.Int(attribute='event.category_id') + category_path = fields.List(fields.Nested(CategorySchema), attribute='event.detailed_category_chain') + url = fields.Function(lambda contrib: url_for('contributions.display_contribution', contrib, _external=False)) + duration = fields.TimeDelta(precision=fields.TimeDelta.MINUTES) + + +class SubContributionSchema(mm.SQLAlchemyAutoSchema): + class Meta: + model = SubContribution + fields = ('subcontribution_id', 'type', 'title', 'description', 'event_id', 'contribution_id', 'persons', + 'location', 'url', 'category_id', 'category_path', 'start_dt', 'end_dt', 'duration') + + subcontribution_id = fields.Int(attribute='id') + type = fields.Constant(SearchTarget.subcontribution.name) + event_id = fields.Int(attribute='contribution.event_id') + persons = NoneRemovingList(fields.Nested(PersonSchema), attribute='person_links') + location = fields.Function(lambda subc: LocationSchema().dump(subc.contribution)) + category_id = fields.Int(attribute='event.category_id') + category_path = fields.List(fields.Nested(CategorySchema), attribute='event.detailed_category_chain') + url = fields.Function(lambda subc: url_for('contributions.display_subcontribution', subc, _external=False)) + start_dt = fields.DateTime(attribute='contribution.start_dt') + end_dt = fields.DateTime(attribute='contribution.end_dt') + duration = fields.TimeDelta(precision=fields.TimeDelta.MINUTES) + + +class EventNoteSchema(mm.SQLAlchemyAutoSchema): + class Meta: + model = EventNote + fields = ('note_id', 'type', 'content', 'event_id', 'contribution_id', 'subcontribution_id', 'url', + 'category_id', 'category_path', 'modified_dt', 'user') + + note_id = fields.Int(attribute='id') + type = fields.Constant(SearchTarget.event_note.name) + content = fields.Str(attribute='current_revision.source') + contribution_id = fields.Method('_contribution_id') + subcontribution_id = fields.Int() + category_id = fields.Int(attribute='event.category_id') + category_path = fields.List(fields.Nested(CategorySchema), attribute='event.detailed_category_chain') + url = fields.Function(lambda note: url_for('event_notes.view', note, _external=False)) + modified_dt = fields.DateTime(attribute='current_revision.created_dt') + user = fields.Nested(PersonSchema, attribute='current_revision.user') + + def _contribution_id(self, note): + if note.link_type == LinkType.contribution: + return note.contribution_id + elif note.link_type == LinkType.subcontribution: + return note.subcontribution.contribution_id diff --git a/livesync/indico_livesync/export_schemas_test.py b/livesync/indico_livesync/export_schemas_test.py new file mode 100644 index 0000000..9d0a135 --- /dev/null +++ b/livesync/indico_livesync/export_schemas_test.py @@ -0,0 +1,183 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +from datetime import datetime, timedelta +from io import BytesIO + +import pytest +from pytz import utc + +from indico.modules.attachments.models.attachments import Attachment, AttachmentFile, AttachmentType +from indico.modules.attachments.models.folders import AttachmentFolder +from indico.modules.events.contributions.models.persons import ContributionPersonLink, SubContributionPersonLink +from indico.modules.events.contributions.models.subcontributions import SubContribution +from indico.modules.events.models.persons import EventPerson +from indico.modules.events.notes.models.notes import EventNote, RenderMode + + +pytest_plugins = 'indico.modules.events.timetable.testing.fixtures' + + +@pytest.mark.parametrize('scheduled', (False, True)) +def test_dump_contribution(db, dummy_user, dummy_event, dummy_contribution, create_entry, scheduled): + from .export_schemas import ContributionSchema + + person = EventPerson.create_from_user(dummy_user, dummy_event) + dummy_contribution.person_links.append(ContributionPersonLink(person=person)) + dummy_contribution.description = 'A dummy contribution' + + extra = {'start_dt': None, 'end_dt': None} + if scheduled: + create_entry(dummy_contribution, utc.localize(datetime(2020, 4, 20, 4, 20))) + extra = { + 'start_dt': dummy_contribution.start_dt.isoformat(), + 'end_dt': dummy_contribution.end_dt.isoformat(), + } + + db.session.flush() + category_id = dummy_contribution.event.category_id + schema = ContributionSchema() + assert schema.dump(dummy_contribution) == { + 'description': 'A dummy contribution', + 'location': {'address': '', 'room_name': '', 'venue_name': ''}, + 'persons': [{'affiliation': None, 'name': 'Guinea Pig'}], + 'title': 'Dummy Contribution', + 'category_id': category_id, + 'category_path': [ + {'id': 0, 'title': 'Home', 'url': '/'}, + {'id': category_id, 'title': 'dummy', 'url': f'/category/{category_id}/'}, + ], + 'contribution_id': dummy_contribution.id, + 'duration': 20, + 'event_id': 0, + 'type': 'contribution', + 'url': f'/event/0/contributions/{dummy_contribution.id}/', + **extra + } + + +@pytest.mark.parametrize('scheduled', (False, True)) +def test_dump_subcontribution(db, dummy_user, dummy_event, dummy_contribution, create_entry, scheduled): + from .export_schemas import SubContributionSchema + + extra = {'start_dt': None, 'end_dt': None} + if scheduled: + create_entry(dummy_contribution, utc.localize(datetime(2020, 4, 20, 4, 20))) + extra = { + 'start_dt': dummy_contribution.start_dt.isoformat(), + 'end_dt': dummy_contribution.end_dt.isoformat(), + } + + subcontribution = SubContribution(contribution=dummy_contribution, title='Dummy Subcontribution', + description='A dummy subcontribution', + duration=timedelta(minutes=10)) + + person = EventPerson.create_from_user(dummy_user, dummy_event) + subcontribution.person_links.append(SubContributionPersonLink(person=person)) + + db.session.flush() + category_id = dummy_contribution.event.category_id + schema = SubContributionSchema() + assert schema.dump(subcontribution) == { + 'description': 'A dummy subcontribution', + 'location': {'address': '', 'room_name': '', 'venue_name': ''}, + 'persons': [{'affiliation': None, 'name': 'Guinea Pig'}], + 'title': 'Dummy Subcontribution', + 'category_id': category_id, + 'category_path': [ + {'id': 0, 'title': 'Home', 'url': '/'}, + {'id': category_id, 'title': 'dummy', 'url': f'/category/{category_id}/'}, + ], + 'contribution_id': dummy_contribution.id, + 'duration': 10, + 'event_id': 0, + 'subcontribution_id': subcontribution.id, + 'type': 'subcontribution', + 'url': f'/event/0/contributions/{dummy_contribution.id}/subcontributions/{subcontribution.id}', + **extra + } + + +def test_dump_attachment(db, dummy_user, dummy_contribution): + from .export_schemas import AttachmentSchema + + folder = AttachmentFolder(title='Dummy Folder', description='a dummy folder') + file = AttachmentFile(user=dummy_user, filename='dummy_file.txt', content_type='text/plain') + attachment = Attachment(folder=folder, user=dummy_user, title='Dummy Attachment', type=AttachmentType.file, + file=file) + attachment.folder.object = dummy_contribution + attachment.file.save(BytesIO(b'hello world')) + db.session.flush() + + category_id = dummy_contribution.event.category_id + schema = AttachmentSchema() + assert schema.dump(attachment) == { + 'filename': 'dummy_file.txt', + 'title': 'Dummy Attachment', + 'user': {'affiliation': None, 'name': 'Guinea Pig'}, + 'attachment_id': attachment.id, + 'attachment_type': 'file', + 'category_id': category_id, + 'category_path': [ + {'id': 0, 'title': 'Home', 'url': '/'}, + {'id': category_id, 'title': 'dummy', 'url': f'/category/{category_id}/'}, + ], + 'contribution_id': dummy_contribution.id, + 'subcontribution_id': None, + 'event_id': 0, + 'folder_id': folder.id, + 'modified_dt': attachment.modified_dt.isoformat(), + 'type': 'attachment', + 'url': ( + f'/event/0/contributions/' + f'{dummy_contribution.id}/attachments/{folder.id}/{attachment.id}/dummy_file.txt' + ), + } + + +@pytest.mark.parametrize('link_type', ('event', 'contrib', 'subcontrib')) +def test_dump_event_note(db, dummy_user, dummy_event, dummy_contribution, link_type): + from .export_schemas import EventNoteSchema + + if link_type == 'event': + ids = {'contribution_id': None, 'subcontribution_id': None} + note = EventNote(object=dummy_event) + url = '/event/0/note/' + elif link_type == 'contrib': + ids = {'contribution_id': dummy_contribution.id, 'subcontribution_id': None} + note = EventNote(object=dummy_contribution) + url = f'/event/0/contributions/{dummy_contribution.id}/note/' + elif link_type == 'subcontrib': + subcontribution = SubContribution(contribution=dummy_contribution, title='Dummy Subcontribution', + duration=timedelta(minutes=10)) + db.session.flush() + ids = { + 'contribution_id': subcontribution.contribution_id, + 'subcontribution_id': subcontribution.id, + } + note = EventNote(object=subcontribution) + url = f'/event/0/contributions/{dummy_contribution.id}/subcontributions/{subcontribution.id}/note/' + + note.create_revision(RenderMode.html, 'this is a dummy note', dummy_user) + db.session.flush() + category_id = dummy_event.category_id + schema = EventNoteSchema() + assert schema.dump(note) == { + 'content': 'this is a dummy note', + 'user': {'affiliation': None, 'name': 'Guinea Pig'}, + 'category_id': category_id, + 'category_path': [ + {'id': 0, 'title': 'Home', 'url': '/'}, + {'id': category_id, 'title': 'dummy', 'url': f'/category/{category_id}/'}, + ], + 'modified_dt': note.current_revision.created_dt.isoformat(), + 'event_id': 0, + 'note_id': note.id, + 'type': 'event_note', + 'url': url, + **ids + } diff --git a/livesync/indico_livesync/handler.py b/livesync/indico_livesync/handler.py index d27e7a3..5a4cb93 100644 --- a/livesync/indico_livesync/handler.py +++ b/livesync/indico_livesync/handler.py @@ -11,12 +11,17 @@ from flask import g from sqlalchemy import inspect from indico.core import signals +from indico.core.db.sqlalchemy.links import LinkType from indico.core.db.sqlalchemy.protection import ProtectionMode +from indico.modules.attachments.models.attachments import Attachment +from indico.modules.attachments.models.folders import AttachmentFolder from indico.modules.categories.models.categories import Category from indico.modules.events import Event from indico.modules.events.contributions.models.contributions import Contribution from indico.modules.events.contributions.models.subcontributions import SubContribution +from indico.modules.events.notes.models.notes import EventNote from indico.modules.events.sessions import Session +from indico.modules.events.sessions.models.blocks import SessionBlock from indico_livesync.models.queue import ChangeType, LiveSyncQueueEntry from indico_livesync.util import get_excluded_categories, obj_ref @@ -42,6 +47,12 @@ def connect_signals(plugin): plugin.connect(signals.event.subcontribution_updated, _updated) # event times plugin.connect(signals.event.times_changed, _event_times_changed, sender=Event) + plugin.connect(signals.event.times_changed, _event_times_changed, sender=Contribution) + # location + plugin.connect(signals.event.location_changed, _location_changed, sender=Event) + plugin.connect(signals.event.location_changed, _location_changed, sender=Contribution) + plugin.connect(signals.event.location_changed, _location_changed, sender=Session) + plugin.connect(signals.event.location_changed, _session_block_location_changed, sender=SessionBlock) # timetable plugin.connect(signals.event.timetable_entry_created, _timetable_changed) plugin.connect(signals.event.timetable_entry_updated, _timetable_changed) @@ -57,14 +68,18 @@ def connect_signals(plugin): plugin.connect(signals.acl.entry_changed, _acl_entry_changed, sender=Session) plugin.connect(signals.acl.entry_changed, _acl_entry_changed, sender=Contribution) # notes - plugin.connect(signals.event.notes.note_added, _note_changed) - plugin.connect(signals.event.notes.note_deleted, _note_changed) - plugin.connect(signals.event.notes.note_modified, _note_changed) + plugin.connect(signals.event.notes.note_added, _created) + plugin.connect(signals.event.notes.note_deleted, _deleted) + plugin.connect(signals.event.notes.note_modified, _updated) # attachments - plugin.connect(signals.attachments.folder_deleted, _attachment_changed) - plugin.connect(signals.attachments.attachment_created, _attachment_changed) - plugin.connect(signals.attachments.attachment_deleted, _attachment_changed) - plugin.connect(signals.attachments.attachment_updated, _attachment_changed) + plugin.connect(signals.attachments.folder_deleted, _attachment_folder_deleted) + plugin.connect(signals.attachments.attachment_created, _created) + plugin.connect(signals.attachments.attachment_deleted, _deleted) + plugin.connect(signals.attachments.attachment_updated, _updated) + plugin.connect(signals.acl.protection_changed, _attachment_folder_protection_changed, sender=AttachmentFolder) + plugin.connect(signals.acl.protection_changed, _protection_changed, sender=Attachment) + plugin.connect(signals.acl.entry_changed, _attachment_folder_acl_entry_changed, sender=AttachmentFolder) + plugin.connect(signals.acl.entry_changed, _acl_entry_changed, sender=Attachment) def _moved(obj, old_parent, **kwargs): @@ -80,7 +95,7 @@ def _moved(obj, old_parent, **kwargs): def _created(obj, **kwargs): - if isinstance(obj, Event): + if isinstance(obj, (Event, EventNote, Attachment)): parent = None elif isinstance(obj, Contribution): parent = obj.event @@ -105,12 +120,21 @@ def _event_times_changed(sender, obj, **kwargs): _register_change(obj, ChangeType.data_changed) +def _session_block_location_changed(sender, obj, **kwargs): + for contrib in obj.contributions: + _register_change(contrib, ChangeType.location_changed) + + +def _location_changed(sender, obj, **kwargs): + _register_change(obj, ChangeType.location_changed) + + def _timetable_changed(entry, **kwargs): _register_change(entry.event, ChangeType.data_changed) def _category_protection_changed(sender, obj, mode, old_mode, **kwargs): - parent_mode = obj.protection_parent.effective_protection_mode + parent_mode = obj.protection_parent.effective_protection_mode if obj.protection_parent else None if ((old_mode == ProtectionMode.inheriting and parent_mode == mode) or (old_mode == parent_mode and mode == ProtectionMode.inheriting)): return @@ -142,15 +166,29 @@ def _acl_entry_changed(sender, obj, entry, old_data, **kwargs): _register_change(obj, ChangeType.protection_changed) -def _note_changed(note, **kwargs): - obj = note.event if isinstance(note.object, Session) else note.object - _register_change(obj, ChangeType.data_changed) +def _attachment_folder_deleted(folder, **kwargs): + if folder.link_type not in (LinkType.event, LinkType.contribution, LinkType.subcontribution): + return + for attachment in folder.attachments: + _register_deletion(attachment) -def _attachment_changed(attachment_or_folder, **kwargs): - folder = getattr(attachment_or_folder, 'folder', attachment_or_folder) - if not isinstance(folder.object, Category) and not isinstance(folder.object, Session): - _register_change(folder.object.event, ChangeType.data_changed) +def _attachment_folder_protection_changed(sender, obj, **kwargs): + if not inspect(obj).persistent: + return + if obj.link_type not in (LinkType.event, LinkType.contribution, LinkType.subcontribution): + return + for attachment in obj.attachments: + _register_change(attachment, ChangeType.protection_changed) + + +def _attachment_folder_acl_entry_changed(sender, obj, entry, old_data, **kwargs): + if not inspect(obj).persistent: + return + if obj.link_type not in (LinkType.event, LinkType.contribution, LinkType.subcontribution): + return + for attachment in obj.attachments: + _acl_entry_changed(type(attachment), attachment, entry, old_data) def _apply_changes(sender, **kwargs): @@ -168,7 +206,7 @@ def _register_deletion(obj): def _register_change(obj, action): if not isinstance(obj, Category): - event = obj.event + event = obj.folder.event if isinstance(obj, Attachment) else obj.event if event is None or event.is_deleted: # When deleting an event we get data change signals afterwards. We can simple ignore them. # Also, ACL changes during user merges might involve deleted objects which we also don't care about diff --git a/livesync/indico_livesync/initial.py b/livesync/indico_livesync/initial.py index d540822..bb73c85 100644 --- a/livesync/indico_livesync/initial.py +++ b/livesync/indico_livesync/initial.py @@ -21,6 +21,8 @@ from indico.modules.events.sessions import Session from indico.modules.events.sessions.models.blocks import SessionBlock from indico.modules.events.sessions.models.principals import SessionPrincipal +from indico_livesync.util import get_excluded_categories + def apply_acl_entry_strategy(rel, principal): user_strategy = rel.joinedload('user') @@ -38,13 +40,20 @@ def apply_acl_entry_strategy(rel, principal): return rel +def _get_excluded_category_filter(event_model=Event): + if excluded_category_ids := get_excluded_categories(): + return event_model.category_id.notin_(excluded_category_ids) + return True + + def query_events(): return ( Event.query .filter_by(is_deleted=False) + .filter(_get_excluded_category_filter()) .options( apply_acl_entry_strategy(selectinload(Event.acl_entries), EventPrincipal), - selectinload(Event.person_links), + selectinload(Event.person_links).joinedload('person').joinedload('user').load_only('is_system'), joinedload(Event.own_venue), joinedload(Event.own_room).options(raiseload('*'), joinedload('location')), ) @@ -73,10 +82,10 @@ def query_contributions(): return ( Contribution.query .join(Event) - .filter(~Contribution.is_deleted, ~Event.is_deleted) + .filter(~Contribution.is_deleted, ~Event.is_deleted, _get_excluded_category_filter()) .options( selectinload(Contribution.acl_entries), - selectinload(Contribution.person_links), + selectinload(Contribution.person_links).joinedload('person').joinedload('user').load_only('is_system'), event_strategy, session_strategy, session_block_strategy, @@ -97,6 +106,7 @@ def query_subcontributions(): contrib_strategy = contains_eager(SubContribution.contribution) contrib_strategy.joinedload(Contribution.own_venue) contrib_strategy.joinedload(Contribution.own_room).options(raiseload('*'), joinedload('location')) + contrib_strategy.joinedload(Contribution.timetable_entry) apply_acl_entry_strategy(contrib_strategy.selectinload(Contribution.acl_entries), ContributionPrincipal) event_strategy = contrib_strategy.contains_eager(Contribution.event.of_type(contrib_event)) @@ -119,9 +129,10 @@ def query_subcontributions(): .join(Contribution.event.of_type(contrib_event)) .outerjoin(Contribution.session.of_type(contrib_session)) .outerjoin(Contribution.session_block.of_type(contrib_block)) - .filter(~SubContribution.is_deleted, ~Contribution.is_deleted, ~contrib_event.is_deleted) + .filter(~SubContribution.is_deleted, ~Contribution.is_deleted, ~contrib_event.is_deleted, + _get_excluded_category_filter(contrib_event)) .options( - selectinload(SubContribution.person_links), + selectinload(SubContribution.person_links).joinedload('person').joinedload('user').load_only('is_system'), contrib_strategy, event_strategy, session_strategy, @@ -190,19 +201,22 @@ def query_attachments(): .filter(AttachmentFolder.link_type != LinkType.category) .filter(db.or_( AttachmentFolder.link_type != LinkType.event, - ~Event.is_deleted + ~Event.is_deleted & _get_excluded_category_filter(), )) .filter(db.or_( AttachmentFolder.link_type != LinkType.contribution, - ~Contribution.is_deleted & ~contrib_event.is_deleted + ~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event) )) .filter(db.or_( AttachmentFolder.link_type != LinkType.subcontribution, - ~SubContribution.is_deleted & ~subcontrib_contrib.is_deleted & ~subcontrib_event.is_deleted + db.and_(~SubContribution.is_deleted, + ~subcontrib_contrib.is_deleted, + ~subcontrib_event.is_deleted, + _get_excluded_category_filter(subcontrib_event)) )) .filter(db.or_( AttachmentFolder.link_type != LinkType.session, - ~Session.is_deleted & ~session_event.is_deleted + ~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event) )) .order_by(Attachment.id) ) @@ -261,23 +275,26 @@ def query_notes(): .filter(~EventNote.is_deleted) .filter(db.or_( EventNote.link_type != LinkType.event, - ~Event.is_deleted + ~Event.is_deleted & _get_excluded_category_filter() )) .filter(db.or_( EventNote.link_type != LinkType.contribution, - ~Contribution.is_deleted & ~contrib_event.is_deleted + ~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event) )) .filter(db.or_( EventNote.link_type != LinkType.subcontribution, - ~SubContribution.is_deleted & ~subcontrib_contrib.is_deleted & ~subcontrib_event.is_deleted + db.and_(~SubContribution.is_deleted, + ~subcontrib_contrib.is_deleted, + ~subcontrib_event.is_deleted, + _get_excluded_category_filter(subcontrib_event)) )) .filter(db.or_( EventNote.link_type != LinkType.session, - ~Session.is_deleted & ~session_event.is_deleted + ~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event) )) .options( note_strategy, - joinedload(EventNote.current_revision).raiseload(EventNoteRevision.user), + joinedload(EventNote.current_revision).joinedload(EventNoteRevision.user).joinedload('_affiliation'), ) .order_by(EventNote.id) ) diff --git a/livesync/indico_livesync/migrations/20201023_1224_6ef9616e57cb_add_note_id_and_update_constraints.py b/livesync/indico_livesync/migrations/20201023_1224_6ef9616e57cb_add_note_id_and_update_constraints.py new file mode 100644 index 0000000..756845b --- /dev/null +++ b/livesync/indico_livesync/migrations/20201023_1224_6ef9616e57cb_add_note_id_and_update_constraints.py @@ -0,0 +1,60 @@ +"""Add note_id and update constraints + +Revision ID: 6ef9616e57cb +Revises: aa0dbc6c14aa +Create Date: 2020-10-23 12:24:51.648130 +""" + +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision = '6ef9616e57cb' +down_revision = 'aa0dbc6c14aa' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column('queues', sa.Column('note_id', sa.Integer(), nullable=True, index=True), schema='plugin_livesync') + op.create_foreign_key(None, 'queues', 'notes', ['note_id'], ['id'], source_schema='plugin_livesync', + referent_schema='events') + op.drop_constraint('ck_queues_valid_enum_type', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_category_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_event_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_contribution_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_subcontribution_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_session_entry', 'queues', schema='plugin_livesync') + op.execute(''' + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_enum_type CHECK ((type = ANY (ARRAY[1, 2, 3, 4, 5, 6]))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_category_entry CHECK (((type <> 1) OR ((contribution_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (category_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_event_entry CHECK (((type <> 2) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (event_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_contribution_entry CHECK (((type <> 3) OR ((category_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (contribution_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_subcontribution_entry CHECK (((type <> 4) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_session_entry CHECK (((type <> 5) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (subcontribution_id IS NULL) AND (session_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_note_entry CHECK (((type <> 6) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (note_id IS NOT NULL)))); + ''') + + +def downgrade(): + op.execute('DELETE FROM plugin_livesync.queues WHERE type = 6') + + op.drop_constraint('ck_queues_valid_enum_type', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_category_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_event_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_contribution_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_subcontribution_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_session_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_note_entry', 'queues', schema='plugin_livesync') + + op.drop_column('queues', 'note_id', schema='plugin_livesync') + + op.execute(''' + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_enum_type CHECK ((type = ANY (ARRAY[1, 2, 3, 4, 5]))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_category_entry CHECK (((type <> 1) OR ((contribution_id IS NULL) AND (event_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (category_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_event_entry CHECK (((type <> 2) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (event_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_contribution_entry CHECK (((type <> 3) OR ((category_id IS NULL) AND (event_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (contribution_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_subcontribution_entry CHECK (((type <> 4) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_session_entry CHECK (((type <> 5) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (subcontribution_id IS NULL) AND (session_id IS NOT NULL)))); + ''') diff --git a/livesync/indico_livesync/migrations/20210427_1359_d8e65cb6160d_add_attachment_id_to_queue.py b/livesync/indico_livesync/migrations/20210427_1359_d8e65cb6160d_add_attachment_id_to_queue.py new file mode 100644 index 0000000..dd1fce9 --- /dev/null +++ b/livesync/indico_livesync/migrations/20210427_1359_d8e65cb6160d_add_attachment_id_to_queue.py @@ -0,0 +1,64 @@ +"""Add attachment_id to queue + +Revision ID: d8e65cb6160d +Revises: 6ef9616e57cb +Create Date: 2021-04-27 13:59:11.538263 +""" + +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision = 'd8e65cb6160d' +down_revision = '6ef9616e57cb' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column('queues', sa.Column('attachment_id', sa.Integer(), nullable=True, index=True), schema='plugin_livesync') + op.create_foreign_key(None, 'queues', 'attachments', ['attachment_id'], ['id'], source_schema='plugin_livesync', + referent_schema='attachments') + op.drop_constraint('ck_queues_valid_enum_type', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_category_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_event_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_contribution_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_subcontribution_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_session_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_note_entry', 'queues', schema='plugin_livesync') + op.execute(''' + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_enum_type CHECK ((type = ANY (ARRAY[1, 2, 3, 4, 5, 6, 7]))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_attachment_entry CHECK (((type <> 7) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (attachment_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_category_entry CHECK (((type <> 1) OR ((attachment_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (category_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_contribution_entry CHECK (((type <> 3) OR ((attachment_id IS NULL) AND (category_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (contribution_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_event_entry CHECK (((type <> 2) OR ((attachment_id IS NULL) AND (category_id IS NULL) AND (contribution_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (event_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_note_entry CHECK (((type <> 6) OR ((attachment_id IS NULL) AND (category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (note_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_session_entry CHECK (((type <> 5) OR ((attachment_id IS NULL) AND (category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (subcontribution_id IS NULL) AND (session_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_subcontribution_entry CHECK (((type <> 4) OR ((attachment_id IS NULL) AND (category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NOT NULL)))); + ''') + + +def downgrade(): + op.execute('DELETE FROM plugin_livesync.queues WHERE type = 7') + + op.drop_constraint('ck_queues_valid_enum_type', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_category_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_event_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_contribution_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_subcontribution_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_session_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_note_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_attachment_entry', 'queues', schema='plugin_livesync') + + op.drop_column('queues', 'attachment_id', schema='plugin_livesync') + + op.execute(''' + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_enum_type CHECK ((type = ANY (ARRAY[1, 2, 3, 4, 5, 6]))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_category_entry CHECK (((type <> 1) OR ((contribution_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (category_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_contribution_entry CHECK (((type <> 3) OR ((category_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (contribution_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_event_entry CHECK (((type <> 2) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (event_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_note_entry CHECK (((type <> 6) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NULL) AND (note_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_session_entry CHECK (((type <> 5) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (subcontribution_id IS NULL) AND (session_id IS NOT NULL)))); + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT ck_queues_valid_subcontribution_entry CHECK (((type <> 4) OR ((category_id IS NULL) AND (contribution_id IS NULL) AND (event_id IS NULL) AND (note_id IS NULL) AND (session_id IS NULL) AND (subcontribution_id IS NOT NULL)))); + ''') diff --git a/livesync/indico_livesync/migrations/20210506_1917_02a78555cdcb_add_location_changed_change_type.py b/livesync/indico_livesync/migrations/20210506_1917_02a78555cdcb_add_location_changed_change_type.py new file mode 100644 index 0000000..720f478 --- /dev/null +++ b/livesync/indico_livesync/migrations/20210506_1917_02a78555cdcb_add_location_changed_change_type.py @@ -0,0 +1,30 @@ +"""Add location_changed change type + +Revision ID: 02a78555cdcb +Revises: d8e65cb6160d +Create Date: 2021-05-06 19:17:41.256096 +""" + +from alembic import op + + +# revision identifiers, used by Alembic. +revision = '02a78555cdcb' +down_revision = 'd8e65cb6160d' +branch_labels = None +depends_on = None + + +def upgrade(): + op.execute(''' + ALTER TABLE plugin_livesync.queues DROP CONSTRAINT "ck_queues_valid_enum_change"; + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT "ck_queues_valid_enum_change" CHECK ((change = ANY (ARRAY[1, 2, 3, 4, 5, 6]))); + ''') + + +def downgrade(): + op.execute('DELETE FROM plugin_livesync.queues WHERE change = 6') + op.execute(''' + ALTER TABLE plugin_livesync.queues DROP CONSTRAINT "ck_queues_valid_enum_change"; + ALTER TABLE plugin_livesync.queues ADD CONSTRAINT "ck_queues_valid_enum_change" CHECK ((change = ANY (ARRAY[1, 2, 3, 4, 5]))); + ''') diff --git a/livesync/indico_livesync/models/queue.py b/livesync/indico_livesync/models/queue.py index 5bedade..707bfdf 100644 --- a/livesync/indico_livesync/models/queue.py +++ b/livesync/indico_livesync/models/queue.py @@ -6,11 +6,10 @@ # see the LICENSE file for more details. from flask import g -from werkzeug.datastructures import ImmutableDict from indico.core.db.sqlalchemy import PyIntEnum, UTCDateTime, db +from indico.modules.attachments.models.attachments import Attachment from indico.modules.categories.models.categories import Category -from indico.modules.events.models.events import Event from indico.util.date_time import now_utc from indico.util.enum import IndicoEnum from indico.util.string import format_repr @@ -25,6 +24,7 @@ class ChangeType(int, IndicoEnum): moved = 3 data_changed = 4 protection_changed = 5 + location_changed = 6 class EntryType(int, IndicoEnum): @@ -33,6 +33,8 @@ class EntryType(int, IndicoEnum): contribution = 3 subcontribution = 4 session = 5 + note = 6 + attachment = 7 _column_for_types = { @@ -40,7 +42,9 @@ _column_for_types = { EntryType.event: 'event_id', EntryType.contribution: 'contribution_id', EntryType.subcontribution: 'subcontribution_id', - EntryType.session: 'session_id' + EntryType.session: 'session_id', + EntryType.note: 'note_id', + EntryType.attachment: 'attachment_id', } @@ -142,6 +146,24 @@ class LiveSyncQueueEntry(db.Model): nullable=True ) + #: ID of the changed note + note_id = db.Column( + 'note_id', + db.Integer, + db.ForeignKey('events.notes.id'), + index=True, + nullable=True + ) + + #: ID of the changed attachment + attachment_id = db.Column( + 'attachment_id', + db.Integer, + db.ForeignKey('attachments.attachments.id'), + index=True, + nullable=True + ) + #: The associated :class:LiveSyncAgent agent = db.relationship( 'LiveSyncAgent', @@ -198,6 +220,26 @@ class LiveSyncQueueEntry(db.Model): ) ) + note = db.relationship( + 'EventNote', + lazy=False, + backref=db.backref( + 'livesync_queue_entries', + cascade='all, delete-orphan', + lazy='dynamic' + ) + ) + + attachment = db.relationship( + 'Attachment', + lazy=False, + backref=db.backref( + 'livesync_queue_entries', + cascade='all, delete-orphan', + lazy='dynamic' + ) + ) + @property def object(self): """Return the changed object.""" @@ -211,16 +253,15 @@ class LiveSyncQueueEntry(db.Model): return self.contribution elif self.type == EntryType.subcontribution: return self.subcontribution - - @property - def object_ref(self): - """Return the reference of the changed object.""" - return ImmutableDict(type=self.type, category_id=self.category_id, event_id=self.event_id, - session_id=self.session_id, contrib_id=self.contrib_id, subcontrib_id=self.subcontrib_id) + elif self.type == EntryType.note: + return self.note + elif self.type == EntryType.attachment: + return self.attachment def __repr__(self): return format_repr(self, 'id', 'agent_id', 'change', 'type', - category_id=None, event_id=None, session_id=None, contrib_id=None, subcontrib_id=None) + category_id=None, event_id=None, session_id=None, contrib_id=None, subcontrib_id=None, + note_id=None, attachment_id=None) @classmethod def create(cls, changes, ref, excluded_categories=set()): @@ -240,7 +281,7 @@ class LiveSyncQueueEntry(db.Model): if any(c.id in excluded_categories for c in obj.chain_query): return else: - event = obj if isinstance(obj, Event) else obj.event + event = obj.folder.event if isinstance(obj, Attachment) else obj.event if event.category not in g.setdefault('livesync_excluded_categories_checked', {}): g.livesync_excluded_categories_checked[event.category] = excluded_categories & set(event.category_chain) if g.livesync_excluded_categories_checked[event.category]: diff --git a/livesync/indico_livesync/plugin.py b/livesync/indico_livesync/plugin.py index 394eea5..b2b8e82 100644 --- a/livesync/indico_livesync/plugin.py +++ b/livesync/indico_livesync/plugin.py @@ -5,6 +5,7 @@ # them and/or modify them under the terms of the MIT License; # see the LICENSE file for more details. +from wtforms.fields.core import BooleanField from wtforms.fields.html5 import IntegerField from wtforms.validators import NumberRange @@ -12,6 +13,7 @@ from indico.core import signals from indico.core.plugins import IndicoPlugin, PluginCategory from indico.web.forms.base import IndicoForm from indico.web.forms.fields import MultipleItemsField +from indico.web.forms.widgets import SwitchWidget from indico_livesync import _ from indico_livesync.blueprint import blueprint @@ -31,6 +33,8 @@ class SettingsForm(IndicoForm): fields=[{'id': 'id', 'caption': _("Category ID"), 'required': True}], description=_("Changes to objects inside these categories or any of their " "subcategories are excluded.")) + disable_queue_runs = BooleanField(_('Disable queue runs'), widget=SwitchWidget(), + description=_('Disable all scheduled queue runs.')) class LiveSyncPlugin(IndicoPlugin): @@ -42,7 +46,8 @@ class LiveSyncPlugin(IndicoPlugin): configurable = True settings_form = SettingsForm default_settings = {'excluded_categories': [], - 'queue_entry_ttl': 0} + 'queue_entry_ttl': 0, + 'disable_queue_runs': False} category = PluginCategory.synchronization def init(self): diff --git a/livesync/indico_livesync/simplify.py b/livesync/indico_livesync/simplify.py index 61dca60..4a9241e 100644 --- a/livesync/indico_livesync/simplify.py +++ b/livesync/indico_livesync/simplify.py @@ -11,10 +11,14 @@ from collections import defaultdict from sqlalchemy.orm import joinedload from indico.core.db import db +from indico.modules.attachments.models.attachments import Attachment +from indico.modules.attachments.models.folders import AttachmentFolder from indico.modules.categories.models.categories import Category from indico.modules.events.contributions.models.contributions import Contribution from indico.modules.events.contributions.models.subcontributions import SubContribution from indico.modules.events.models.events import Event +from indico.modules.events.notes.models.notes import EventNote +from indico.modules.events.sessions import Session from indico.util.enum import IndicoEnum from indico_livesync.models.queue import ChangeType, EntryType @@ -26,6 +30,9 @@ class SimpleChange(int, IndicoEnum): updated = 4 +CREATED_DELETED = SimpleChange.created | SimpleChange.deleted + + def process_records(records): """Converts queue entries into object changes. @@ -33,8 +40,10 @@ def process_records(records): :return: a dict mapping object references to `SimpleChange` bitsets """ changes = defaultdict(int) + cascaded_create_records = set() cascaded_update_records = set() cascaded_delete_records = set() + cascaded_location_changes = set() for record in records: if record.change != ChangeType.deleted and record.object is None: @@ -43,7 +52,7 @@ def process_records(records): continue if record.change == ChangeType.created: assert record.type != EntryType.category - changes[record.object] |= SimpleChange.created + cascaded_create_records.add(record) elif record.change == ChangeType.deleted: assert record.type != EntryType.category cascaded_delete_records.add(record) @@ -52,6 +61,14 @@ def process_records(records): elif record.change == ChangeType.data_changed: assert record.type != EntryType.category changes[record.object] |= SimpleChange.updated + # subcontributions have their parent's time information, so we need to + # cascade contribution updates to them + if record.type == EntryType.contribution: + for subcontrib in record.object.subcontributions: + changes[subcontrib] |= SimpleChange.updated + elif record.change == ChangeType.location_changed: + assert record.type in (EntryType.event, EntryType.contribution, EntryType.session) + cascaded_location_changes.add(record) for obj in _process_cascaded_category_contents(cascaded_update_records): changes[obj] |= SimpleChange.updated @@ -59,6 +76,17 @@ def process_records(records): for obj in _process_cascaded_event_contents(cascaded_delete_records): changes[obj] |= SimpleChange.deleted + for obj in _process_cascaded_event_contents(cascaded_create_records): + changes[obj] |= SimpleChange.created + + for obj in _process_cascaded_locations(cascaded_location_changes): + changes[obj] |= SimpleChange.updated + + created_and_deleted = {obj for obj, flags in changes.items() if (flags & CREATED_DELETED) == CREATED_DELETED} + for obj in created_and_deleted: + # discard any change where the object was both created and deleted + del changes[obj] + return changes @@ -110,37 +138,120 @@ def _process_cascaded_event_contents(records, additional_events=None): found in records """ changed_events = additional_events or set() + changed_sessions = set() changed_contributions = set() changed_subcontributions = set() + changed_attachments = set() + changed_notes = set() + note_records = {rec.note_id for rec in records if rec.type == EntryType.note} + attachment_records = {rec.attachment_id for rec in records if rec.type == EntryType.attachment} session_records = {rec.session_id for rec in records if rec.type == EntryType.session} contribution_records = {rec.contrib_id for rec in records if rec.type == EntryType.contribution} subcontribution_records = {rec.subcontrib_id for rec in records if rec.type == EntryType.subcontribution} event_records = {rec.event_id for rec in records if rec.type == EntryType.event} + if attachment_records: + changed_attachments.update(Attachment.query.filter(Attachment.id.in_(attachment_records))) + + if note_records: + changed_notes.update(EventNote.query.filter(EventNote.id.in_(note_records))) + if event_records: changed_events.update(Event.query.filter(Event.id.in_(event_records))) - yield from changed_events - - # Sessions are added (explicitly changed only, since they don't need to be sent anywhere) - if session_records: - changed_contributions.update(Contribution.query - .filter(Contribution.session_id.in_(session_records), ~Contribution.is_deleted)) - - # Contributions are added (implictly + explicitly changed) changed_event_ids = {ev.id for ev in changed_events} - condition = Contribution.event_id.in_(changed_event_ids) & ~Contribution.is_deleted - if contribution_records: - condition = db.or_(condition, Contribution.id.in_(contribution_records)) - contrib_query = Contribution.query.filter(condition).options(joinedload('subcontributions')) + if changed_event_ids: + changed_attachments.update( + Attachment.query.filter( + Attachment.folder.has(AttachmentFolder.linked_event_id.in_(changed_event_ids)) + ) + ) + changed_notes.update(EventNote.query.filter(EventNote.linked_event_id.in_(changed_event_ids))) - for contribution in contrib_query: + yield from changed_events + + # Sessions are added (implictly + explicitly changed) + if changed_event_ids or session_records: + condition = Session.event_id.in_(changed_event_ids) & ~Session.is_deleted + if session_records: + condition = db.or_(condition, Session.id.in_(session_records)) + changed_sessions.update(Session.query.filter(Session.event_id.in_(changed_event_ids), ~Session.is_deleted)) + + if changed_sessions: + # XXX I kept this very similar to the structure of the code for contributions below, + # but why aren't we just merging this into the block right above?! + changed_session_ids = {s.id for s in changed_sessions} + changed_contributions.update(Contribution.query + .filter(Contribution.session_id.in_(changed_session_ids), + ~Contribution.is_deleted)) + changed_attachments.update( + Attachment.query.filter( + ~Attachment.is_deleted, + Attachment.folder.has(db.and_(AttachmentFolder.session_id.in_(changed_session_ids), + ~AttachmentFolder.is_deleted)) + ) + ) + changed_notes.update(EventNote.query.filter(EventNote.session_id.in_(changed_session_ids), + ~EventNote.is_deleted)) + + # Contributions are added (implictly + explicitly changed) + if changed_event_ids or contribution_records: + condition = Contribution.event_id.in_(changed_event_ids) & ~Contribution.is_deleted + if contribution_records: + condition = db.or_(condition, Contribution.id.in_(contribution_records)) + changed_contributions.update(Contribution.query.filter(condition).options(joinedload('subcontributions'))) + + for contribution in changed_contributions: yield contribution changed_subcontributions.update(contribution.subcontributions) + if changed_contributions: + changed_contribution_ids = {c.id for c in changed_contributions} + changed_attachments.update( + Attachment.query.filter( + ~Attachment.is_deleted, + Attachment.folder.has(db.and_(AttachmentFolder.contribution_id.in_(changed_contribution_ids), + ~AttachmentFolder.is_deleted)) + ) + ) + changed_notes.update(EventNote.query.filter(EventNote.contribution_id.in_(changed_contribution_ids), + ~EventNote.is_deleted)) + # Same for subcontributions if subcontribution_records: changed_subcontributions.update(SubContribution.query.filter(SubContribution.id.in_(subcontribution_records))) + + if changed_subcontributions: + changed_subcontribution_ids = {sc.id for sc in changed_subcontributions} + changed_attachments.update( + Attachment.query.filter( + ~Attachment.is_deleted, + Attachment.folder.has(db.and_(AttachmentFolder.subcontribution_id.in_(changed_subcontribution_ids), + ~AttachmentFolder.is_deleted)) + ) + ) + changed_notes.update(EventNote.query.filter(EventNote.subcontribution_id.in_(changed_subcontribution_ids), + ~EventNote.is_deleted)) + yield from changed_subcontributions + yield from changed_attachments + yield from changed_notes + + +def _process_cascaded_locations(records): + contributions = {rec.contribution for rec in records if rec.type == EntryType.contribution} + events = {rec.event for rec in records if rec.type == EntryType.event} + event_ids = {e.id for e in events} + session_ids = {rec.session_id for rec in records if rec.type == EntryType.session} + + # location of the event changed + yield from events + # location of the contribution changed + yield from contributions + # location of contributions inside an event may be inherited + # we don't check the inheritance since we're lazy and the chain is non-trivial + yield from Contribution.query.filter(Contribution.event_id.in_(event_ids), ~Contribution.is_deleted) + # location of a contribution inside a session may be inherited as well + yield from Contribution.query.filter(Contribution.session_id.in_(session_ids), ~Contribution.is_deleted) diff --git a/livesync/indico_livesync/task.py b/livesync/indico_livesync/task.py index 7a2f254..a68cfab 100644 --- a/livesync/indico_livesync/task.py +++ b/livesync/indico_livesync/task.py @@ -17,14 +17,19 @@ from indico_livesync.util import clean_old_entries @celery.periodic_task(run_every=crontab(minute='*/15'), plugin='livesync') def scheduled_update(): from indico_livesync.plugin import LiveSyncPlugin + if LiveSyncPlugin.settings.get('disable_queue_runs'): + LiveSyncPlugin.logger.warning('Queue runs are disabled') + return clean_old_entries() for agent in LiveSyncAgent.query.all(): if agent.backend is None: LiveSyncPlugin.logger.warning('Skipping agent %s; backend not found', agent.name) continue - if not agent.initial_data_exported: - LiveSyncPlugin.logger.warning('Skipping agent %s; initial export not performed yet', agent.name) + backend = agent.create_backend() + queue_allowed, reason = backend.check_queue_status() + if not queue_allowed: + LiveSyncPlugin.logger.warning('Skipping agent %s; queue runs disabled: %s', agent.name, reason) continue LiveSyncPlugin.logger.info('Running agent %s', agent.name) - agent.create_backend().run() + backend.run() db.session.commit() diff --git a/livesync/indico_livesync/templates/plugin_details_extra.html b/livesync/indico_livesync/templates/plugin_details_extra.html index 307cfed..78f85f0 100644 --- a/livesync/indico_livesync/templates/plugin_details_extra.html +++ b/livesync/indico_livesync/templates/plugin_details_extra.html @@ -26,8 +26,8 @@ {% trans %}Name{% endtrans %} {% trans %}Backend{% endtrans %} {% trans %}Last Run{% endtrans %} - {% trans %}Initial Export{% endtrans %} {% trans %}Queue{% endtrans %} + {% trans %}Status{% endtrans %} {% trans %}Actions{% endtrans %} @@ -50,14 +50,19 @@ {% trans %}Never{% endtrans %} {%- endif -%} + {{ agent.queue.filter_by(processed=false).count() }} - {% if agent.initial_data_exported %} - {% trans %}Done{% endtrans %} + {% if agent.backend %} + {% set queue_ready, reason = agent.create_backend().check_queue_status() %} + {% if queue_ready %} + {% trans %}Ready{% endtrans %} + {% else %} + {{ reason }} + {% endif %} {% else %} - {% trans %}Pending{% endtrans %} + {% trans %}n/a{% endtrans %} {% endif %} - {{ agent.queue.filter_by(processed=false).count() }}