diff --git a/citadel/indico_citadel/backend.py b/citadel/indico_citadel/backend.py index 3a8e316..b09e7aa 100644 --- a/citadel/indico_citadel/backend.py +++ b/citadel/indico_citadel/backend.py @@ -20,7 +20,7 @@ from requests.adapters import HTTPAdapter from requests.exceptions import RequestException from sqlalchemy import select from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import contains_eager +from sqlalchemy.orm import contains_eager, joinedload from sqlalchemy.orm.attributes import flag_modified from urllib3 import Retry from werkzeug.urls import url_join @@ -48,9 +48,9 @@ def _format_change_str(change): def _print_record(record): - obj_type, obj_id, data, changes = record + obj_type, obj_id, citadel_id, data, changes = record print() # verbose_iterator during initial exports doesn't end its line - print(f'{_format_change_str(changes)}: {obj_type.name} {obj_id}') + print(f'{_format_change_str(changes)}: {obj_type.name} {obj_id} [{citadel_id}]') if data is not None: print(highlight(pformat(data), lexer, formatter)) return record @@ -118,12 +118,14 @@ class LiveSyncCitadelUploader(Uploader): CitadelIdMap.create(object_type, object_id, new_citadel_id) db.session.commit() except IntegrityError: + # XXX this should no longer happen under any circumstances! # if we already have a mapping entry, delete the newly created record and # update the existing one in case something changed in the meantime self.logger.error(f'{object_type.name.title()} %d already in citadel; deleting+updating', object_id) db.session.rollback() self._citadel_delete(session, new_citadel_id, delete_mapping=False) existing_citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id) + db.session.close() assert existing_citadel_id is not None self._citadel_update(session, existing_citadel_id, data) resp.close() @@ -147,7 +149,11 @@ class LiveSyncCitadelUploader(Uploader): try: resp = session.delete(url_join(self.search_app, f'api/record/{citadel_id}')) self.logger.debug('Deleted %d from citadel', citadel_id) - resp.raise_for_status() + if resp.status_code == 410: + # gone - record was probably already deleted + self.logger.warning('Record %d was already deleted on citadel', citadel_id) + else: + resp.raise_for_status() resp.close() except RequestException as exc: if resp := exc.response: @@ -158,21 +164,25 @@ class LiveSyncCitadelUploader(Uploader): db.session.commit() def upload_record(self, entry, session): - object_type, object_id, data, change_type = entry + object_type, object_id, citadel_id, data, change_type = entry - if change_type & SimpleChange.created: - self._citadel_create(session, object_type, object_id, data) - elif change_type & SimpleChange.deleted: - citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id) + if change_type == SimpleChange.deleted: if citadel_id is None: self.logger.warning('Cannot delete %s %s: No citadel ID found', object_type.name, object_id) return self._citadel_delete(session, citadel_id, delete_mapping=True) - elif change_type & SimpleChange.updated: - citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id) + elif change_type == SimpleChange.updated or (change_type == SimpleChange.created and citadel_id is not None): if citadel_id is None: raise Exception(f'Cannot update {object_type.name} {object_id}: No citadel ID found') + if change_type == SimpleChange.created: + self.logger.warning('Citadel ID exists for %s %s (%s); updating instead', + object_type.name, object_id, citadel_id) self._citadel_update(session, citadel_id, data) + elif change_type == SimpleChange.created: + self._citadel_create(session, object_type, object_id, data) + else: + # we shouldn't get any combined change type bitmasks here + raise Exception(f'invalid change type: {change_type}') def upload_file(self, entry, session): self.logger.debug('Uploading attachment %d (%s) [%s]', entry.attachment.file.id, @@ -202,9 +212,16 @@ class LiveSyncCitadelUploader(Uploader): resp.close() return False - def run_initial(self, records, total): + def _precache_categories(self): cte = Category.get_tree_cte(lambda cat: db.func.json_build_object('id', cat.id, 'title', cat.title)) self.categories = dict(db.session.execute(select([cte.c.id, cte.c.path])).fetchall()) + + def run(self, records): + self._precache_categories() + return super().run(records) + + def run_initial(self, records, total): + self._precache_categories() return super().run_initial(records, total) def _get_retry_config(self, initial): @@ -235,7 +252,9 @@ class LiveSyncCitadelUploader(Uploader): session.headers = self.headers dumped_records = ( ( - get_entry_type(rec), rec.id, + get_entry_type(rec), + rec.id, + rec.citadel_id_mapping.citadel_id if rec.citadel_id_mapping else None, self.dump_record(rec) if not (change_type & SimpleChange.deleted) else None, change_type ) for rec, change_type in records @@ -293,7 +312,11 @@ class LiveSyncCitadelBackend(LiveSyncBackendBase): query = super().get_initial_query(model_cls, force) if not force: query = query.filter(~model_cls.citadel_id_mapping.has()) - return query + return query.options(joinedload(model_cls.citadel_id_mapping)) + + def get_data_query(self, model_cls, ids): + query = super().get_data_query(model_cls, ids) + return query.options(joinedload(model_cls.citadel_id_mapping)) def process_queue(self, uploader): super().process_queue(uploader) diff --git a/livesync/indico_livesync/base.py b/livesync/indico_livesync/base.py index 95cce69..fbd45bb 100644 --- a/livesync/indico_livesync/base.py +++ b/livesync/indico_livesync/base.py @@ -123,6 +123,7 @@ class LiveSyncBackendBase: raise NotImplementedError uploader = self.uploader(self, verbose=verbose, from_cli=from_cli) + self._precache_categories() self.process_queue(uploader) self.update_last_run() @@ -144,6 +145,27 @@ class LiveSyncBackendBase: }[model_cls] return fn() + def get_data_query(self, model_cls, ids): + """Get the export query for a given model and set of ids. + + Supported models are `Event`, `Contribution`, `SubContribution`, + `Attachment` and `EventNote`. + + This is very similar to `get_initial_query` except that it will only + export the specified records. + + :param model_cls: The model class to query + :param ids: A collection of ids to query + """ + fn = { + Event: query_events, + Contribution: query_contributions, + SubContribution: query_subcontributions, + Attachment: query_attachments, + EventNote: query_notes, + }[model_cls] + return fn(ids=ids) + def run_initial_export(self, batch_size, force=False, verbose=False): """Runs the initial export. @@ -154,12 +176,7 @@ class LiveSyncBackendBase: raise NotImplementedError uploader = self.uploader(self, verbose=verbose, from_cli=True) - - Category.allow_relationship_preloading = True - Category.preload_relationships(Category.query, 'acl_entries', - strategy=lambda rel: apply_acl_entry_strategy(subqueryload(rel), - CategoryPrincipal)) - _category_cache = Category.query.all() # noqa: F841 + self._precache_categories() events = self.get_initial_query(Event, force) contributions = self.get_initial_query(Contribution, force) @@ -212,3 +229,10 @@ class LiveSyncBackendBase: """ self.agent.initial_data_exported = False self.agent.queue.delete() + + def _precache_categories(self): + Category.allow_relationship_preloading = True + Category.preload_relationships(Category.query, 'acl_entries', + strategy=lambda rel: apply_acl_entry_strategy(subqueryload(rel), + CategoryPrincipal)) + self._category_cache = Category.query.all() diff --git a/livesync/indico_livesync/initial.py b/livesync/indico_livesync/initial.py index bb73c85..536d236 100644 --- a/livesync/indico_livesync/initial.py +++ b/livesync/indico_livesync/initial.py @@ -46,11 +46,14 @@ def _get_excluded_category_filter(event_model=Event): return True -def query_events(): +def query_events(ids=None): + if ids is None: + export_filter = ~Event.is_deleted & _get_excluded_category_filter() + else: + export_filter = Event.id.in_(ids) return ( Event.query - .filter_by(is_deleted=False) - .filter(_get_excluded_category_filter()) + .filter(export_filter) .options( apply_acl_entry_strategy(selectinload(Event.acl_entries), EventPrincipal), selectinload(Event.person_links).joinedload('person').joinedload('user').load_only('is_system'), @@ -61,7 +64,7 @@ def query_events(): ) -def query_contributions(): +def query_contributions(ids=None): event_strategy = contains_eager(Contribution.event) event_strategy.joinedload(Event.own_venue) event_strategy.joinedload(Event.own_room).options(raiseload('*'), joinedload('location')) @@ -79,10 +82,15 @@ def query_contributions(): session_block_session_strategy.joinedload(Session.own_venue) session_block_session_strategy.joinedload(Session.own_room).options(raiseload('*'), joinedload('location')) + if ids is None: + export_filter = ~Contribution.is_deleted & ~Event.is_deleted & _get_excluded_category_filter() + else: + export_filter = Contribution.id.in_(ids) + return ( Contribution.query .join(Event) - .filter(~Contribution.is_deleted, ~Event.is_deleted, _get_excluded_category_filter()) + .filter(export_filter) .options( selectinload(Contribution.acl_entries), selectinload(Contribution.person_links).joinedload('person').joinedload('user').load_only('is_system'), @@ -98,7 +106,7 @@ def query_contributions(): ) -def query_subcontributions(): +def query_subcontributions(ids=None): contrib_event = db.aliased(Event) contrib_session = db.aliased(Session) contrib_block = db.aliased(SessionBlock) @@ -123,14 +131,21 @@ def query_subcontributions(): session_block_strategy.joinedload(contrib_block.own_venue) session_block_strategy.joinedload(contrib_block.own_room).options(raiseload('*'), joinedload('location')) + if ids is None: + export_filter = db.and_(~SubContribution.is_deleted, + ~Contribution.is_deleted, + ~contrib_event.is_deleted, + _get_excluded_category_filter(contrib_event)) + else: + export_filter = SubContribution.id.in_(ids) + return ( SubContribution.query .join(Contribution) .join(Contribution.event.of_type(contrib_event)) .outerjoin(Contribution.session.of_type(contrib_session)) .outerjoin(Contribution.session_block.of_type(contrib_block)) - .filter(~SubContribution.is_deleted, ~Contribution.is_deleted, ~contrib_event.is_deleted, - _get_excluded_category_filter(contrib_event)) + .filter(export_filter) .options( selectinload(SubContribution.person_links).joinedload('person').joinedload('user').load_only('is_system'), contrib_strategy, @@ -142,7 +157,7 @@ def query_subcontributions(): ) -def query_attachments(): +def query_attachments(ids=None): contrib_event = db.aliased(Event) contrib_session = db.aliased(Session) subcontrib_contrib = db.aliased(Contribution) @@ -183,6 +198,35 @@ def query_attachments(): session_strategy.contains_eager(Session.event.of_type(session_event)).selectinload(session_event.acl_entries) apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal) + if ids is None: + export_filter = db.and_( + ~Attachment.is_deleted, + ~AttachmentFolder.is_deleted, + db.or_( + AttachmentFolder.link_type != LinkType.event, + ~Event.is_deleted & _get_excluded_category_filter(), + ), + db.or_( + AttachmentFolder.link_type != LinkType.contribution, + ~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event) + ), + db.or_( + AttachmentFolder.link_type != LinkType.subcontribution, + db.and_( + ~SubContribution.is_deleted, + ~subcontrib_contrib.is_deleted, + ~subcontrib_event.is_deleted, + _get_excluded_category_filter(subcontrib_event) + ) + ), + db.or_( + AttachmentFolder.link_type != LinkType.session, + ~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event) + ) + ) + else: + export_filter = Attachment.id.in_(ids) + return ( Attachment.query .join(Attachment.folder) @@ -197,32 +241,13 @@ def query_attachments(): .outerjoin(subcontrib_contrib.session.of_type(subcontrib_session)) .outerjoin(AttachmentFolder.session) .outerjoin(Session.event.of_type(session_event)) - .filter(~Attachment.is_deleted, ~AttachmentFolder.is_deleted) + .filter(export_filter) .filter(AttachmentFolder.link_type != LinkType.category) - .filter(db.or_( - AttachmentFolder.link_type != LinkType.event, - ~Event.is_deleted & _get_excluded_category_filter(), - )) - .filter(db.or_( - AttachmentFolder.link_type != LinkType.contribution, - ~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event) - )) - .filter(db.or_( - AttachmentFolder.link_type != LinkType.subcontribution, - db.and_(~SubContribution.is_deleted, - ~subcontrib_contrib.is_deleted, - ~subcontrib_event.is_deleted, - _get_excluded_category_filter(subcontrib_event)) - )) - .filter(db.or_( - AttachmentFolder.link_type != LinkType.session, - ~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event) - )) .order_by(Attachment.id) ) -def query_notes(): +def query_notes(ids=None): contrib_event = db.aliased(Event) contrib_session = db.aliased(Session) subcontrib_contrib = db.aliased(Contribution) @@ -260,6 +285,34 @@ def query_notes(): session_strategy.contains_eager(Session.event.of_type(session_event)).selectinload(session_event.acl_entries) apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal) + if ids is None: + export_filter = db.and_( + ~EventNote.is_deleted, + db.or_( + EventNote.link_type != LinkType.event, + ~Event.is_deleted & _get_excluded_category_filter() + ), + db.or_( + EventNote.link_type != LinkType.contribution, + ~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event) + ), + db.or_( + EventNote.link_type != LinkType.subcontribution, + db.and_( + ~SubContribution.is_deleted, + ~subcontrib_contrib.is_deleted, + ~subcontrib_event.is_deleted, + _get_excluded_category_filter(subcontrib_event) + ) + ), + db.or_( + EventNote.link_type != LinkType.session, + ~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event) + ) + ) + else: + export_filter = EventNote.id.in_(ids) + return ( EventNote.query .outerjoin(EventNote.linked_event) @@ -272,26 +325,7 @@ def query_notes(): .outerjoin(subcontrib_contrib.session.of_type(subcontrib_session)) .outerjoin(EventNote.session) .outerjoin(Session.event.of_type(session_event)) - .filter(~EventNote.is_deleted) - .filter(db.or_( - EventNote.link_type != LinkType.event, - ~Event.is_deleted & _get_excluded_category_filter() - )) - .filter(db.or_( - EventNote.link_type != LinkType.contribution, - ~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event) - )) - .filter(db.or_( - EventNote.link_type != LinkType.subcontribution, - db.and_(~SubContribution.is_deleted, - ~subcontrib_contrib.is_deleted, - ~subcontrib_event.is_deleted, - _get_excluded_category_filter(subcontrib_event)) - )) - .filter(db.or_( - EventNote.link_type != LinkType.session, - ~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event) - )) + .filter(export_filter) .options( note_strategy, joinedload(EventNote.current_revision).joinedload(EventNoteRevision.user).joinedload('_affiliation'), diff --git a/livesync/indico_livesync/simplify.py b/livesync/indico_livesync/simplify.py index d4c7032..008df18 100644 --- a/livesync/indico_livesync/simplify.py +++ b/livesync/indico_livesync/simplify.py @@ -33,6 +33,17 @@ class SimpleChange(int, IndicoEnum): CREATED_DELETED = SimpleChange.created | SimpleChange.deleted +def _get_final_change(change_flags): + if change_flags & SimpleChange.deleted: + assert not (change_flags & SimpleChange.created) # filtered out earlier + return SimpleChange.deleted + elif change_flags & SimpleChange.created: + return SimpleChange.created + elif change_flags & SimpleChange.updated: + return SimpleChange.updated + raise Exception(f'Invalid change flags: {change_flags}') + + def process_records(records): """Converts queue entries into object changes. @@ -87,7 +98,7 @@ def process_records(records): # discard any change where the object was both created and deleted del changes[obj] - return changes + return {obj: _get_final_change(flags) for obj, flags in changes.items()} def _process_cascaded_category_contents(records): diff --git a/livesync/indico_livesync/uploader.py b/livesync/indico_livesync/uploader.py index c313695..bcaf627 100644 --- a/livesync/indico_livesync/uploader.py +++ b/livesync/indico_livesync/uploader.py @@ -6,6 +6,7 @@ # see the LICENSE file for more details. import re +from collections import defaultdict from indico.core.db import db from indico.util.console import verbose_iterator @@ -29,13 +30,15 @@ class Uploader: :param records: an iterable containing queue entries """ self_name = type(self).__name__ - simplified = process_records(records).items() + simplified = self.query_data(process_records(records)).items() total = len(simplified) if self.from_cli: simplified = self._make_verbose(simplified, total) try: self.logger.info(f'{self_name} uploading %d changes from %d records', total, len(records)) - self.upload_records(simplified) + if not self.upload_records(simplified): + self.logger.warning('uploader indicated a failure') + return except Exception: self.logger.exception(f'{self_name} failed') if self.from_cli: @@ -64,6 +67,26 @@ class Uploader: print_total_time=True ) + def query_data(self, records): + """Query the data needed to dump records efficiently. + + This function queries the verbose data for the given records. + + :param records: an dict mapping objects to changes + :return: a dict of the same structure + """ + by_model = defaultdict(set) + change_by_obj = {} + for obj, change in records.items(): + by_model[type(obj)].add(obj.id) + change_by_obj[type(obj), obj.id] = change + rv = {} + for model, ids in by_model.items(): + for obj in self.backend.get_data_query(model, ids).yield_per(5000): + rv[obj] = change_by_obj[model, obj.id] + assert len(records) == len(rv) + return rv + def upload_records(self, records, initial=False): """Executed to upload records. diff --git a/livesync/tests/simplify_test.py b/livesync/tests/simplify_test.py index c9959ee..c27617b 100644 --- a/livesync/tests/simplify_test.py +++ b/livesync/tests/simplify_test.py @@ -92,6 +92,12 @@ def test_process_records_simplify(changes, db, create_event, dummy_agent): # created+deleted items are discarded if expected[i] & CREATED_DELETED == CREATED_DELETED: expected[i] = 0 + elif expected[i] & SimpleChange.deleted: + expected[i] = SimpleChange.deleted + elif expected[i] & SimpleChange.created: + expected[i] = SimpleChange.created + elif expected[i] & SimpleChange.updated: + expected[i] = SimpleChange.updated db.session.flush() diff --git a/livesync/tests/uploader_test.py b/livesync/tests/uploader_test.py index 9249f48..dd10f34 100644 --- a/livesync/tests/uploader_test.py +++ b/livesync/tests/uploader_test.py @@ -8,6 +8,7 @@ from operator import attrgetter from unittest.mock import MagicMock +from indico_livesync.base import LiveSyncBackendBase from indico_livesync.models.queue import ChangeType, EntryType, LiveSyncQueueEntry from indico_livesync.simplify import SimpleChange, _process_cascaded_event_contents from indico_livesync.uploader import Uploader @@ -23,6 +24,7 @@ class RecordingUploader(Uploader): def upload_records(self, records, initial=False): self._uploaded = list(records) + return True @property def all_uploaded(self): @@ -36,6 +38,12 @@ class FailingUploader(RecordingUploader): raise Exception('All your data are belong to us!') +class TestBackend(LiveSyncBackendBase): + def __init__(self): + super().__init__(MagicMock()) + self.plugin = MagicMock() + + def test_run_initial(mocker): """Test the initial upload""" mocker.patch.object(Uploader, 'processed_records', autospec=True) @@ -54,7 +62,7 @@ def _sorted_process_cascaded_event_contents(records, additional_events=None, *, def test_run(mocker, monkeypatch, db, create_event, dummy_agent): """Test uploading queued data""" - uploader = RecordingUploader(MagicMock()) + uploader = RecordingUploader(TestBackend()) events = tuple(create_event(id_=evt_id) for evt_id in range(4)) records = tuple(LiveSyncQueueEntry(change=ChangeType.created, type=EntryType.event, event_id=evt.id, @@ -80,7 +88,7 @@ def test_run(mocker, monkeypatch, db, create_event, dummy_agent): def test_run_failing(mocker, monkeypatch, db, create_event, dummy_agent): """Test a failing queue run""" - uploader = FailingUploader(MagicMock()) + uploader = FailingUploader(TestBackend()) events = tuple(create_event(id_=evt_id) for evt_id in range(10)) records = tuple(LiveSyncQueueEntry(change=ChangeType.created, type=EntryType.event, event_id=evt.id,