From a5595615a4be7c5b3babc6cb19dadf39fafa1506 Mon Sep 17 00:00:00 2001 From: Adrian Moennich Date: Fri, 28 May 2021 11:28:54 +0200 Subject: [PATCH] LiveSync: Use efficient queries in queue runs While simplify is still very spammy in case of large queues, at least the actual data dumping for the export itself no longer spams queries. This is especially crucial in the Citadel plugin where we have massive parallelism which can easily exhaust the DB connection pool if we get queries there. For the same reason, the citadel ID mapping is now queried eagerly so we don't need to try creating and then delete+update if we realize there's already an existing citadel mapping, as we can simply check before even sending something and then convert the creation to an update. --- citadel/indico_citadel/backend.py | 51 +++++++--- livesync/indico_livesync/base.py | 36 +++++-- livesync/indico_livesync/initial.py | 134 +++++++++++++++++---------- livesync/indico_livesync/simplify.py | 13 ++- livesync/indico_livesync/uploader.py | 27 +++++- livesync/tests/simplify_test.py | 6 ++ livesync/tests/uploader_test.py | 12 ++- 7 files changed, 204 insertions(+), 75 deletions(-) diff --git a/citadel/indico_citadel/backend.py b/citadel/indico_citadel/backend.py index 3a8e316..b09e7aa 100644 --- a/citadel/indico_citadel/backend.py +++ b/citadel/indico_citadel/backend.py @@ -20,7 +20,7 @@ from requests.adapters import HTTPAdapter from requests.exceptions import RequestException from sqlalchemy import select from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import contains_eager +from sqlalchemy.orm import contains_eager, joinedload from sqlalchemy.orm.attributes import flag_modified from urllib3 import Retry from werkzeug.urls import url_join @@ -48,9 +48,9 @@ def _format_change_str(change): def _print_record(record): - obj_type, obj_id, data, changes = record + obj_type, obj_id, citadel_id, data, changes = record print() # verbose_iterator during initial exports doesn't end its line - 
print(f'{_format_change_str(changes)}: {obj_type.name} {obj_id}') + print(f'{_format_change_str(changes)}: {obj_type.name} {obj_id} [{citadel_id}]') if data is not None: print(highlight(pformat(data), lexer, formatter)) return record @@ -118,12 +118,14 @@ class LiveSyncCitadelUploader(Uploader): CitadelIdMap.create(object_type, object_id, new_citadel_id) db.session.commit() except IntegrityError: + # XXX this should no longer happen under any circumstances! # if we already have a mapping entry, delete the newly created record and # update the existing one in case something changed in the meantime self.logger.error(f'{object_type.name.title()} %d already in citadel; deleting+updating', object_id) db.session.rollback() self._citadel_delete(session, new_citadel_id, delete_mapping=False) existing_citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id) + db.session.close() assert existing_citadel_id is not None self._citadel_update(session, existing_citadel_id, data) resp.close() @@ -147,7 +149,11 @@ class LiveSyncCitadelUploader(Uploader): try: resp = session.delete(url_join(self.search_app, f'api/record/{citadel_id}')) self.logger.debug('Deleted %d from citadel', citadel_id) - resp.raise_for_status() + if resp.status_code == 410: + # gone - record was probably already deleted + self.logger.warning('Record %d was already deleted on citadel', citadel_id) + else: + resp.raise_for_status() resp.close() except RequestException as exc: if resp := exc.response: @@ -158,21 +164,25 @@ class LiveSyncCitadelUploader(Uploader): db.session.commit() def upload_record(self, entry, session): - object_type, object_id, data, change_type = entry + object_type, object_id, citadel_id, data, change_type = entry - if change_type & SimpleChange.created: - self._citadel_create(session, object_type, object_id, data) - elif change_type & SimpleChange.deleted: - citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id) + if change_type == SimpleChange.deleted: if citadel_id is 
None: self.logger.warning('Cannot delete %s %s: No citadel ID found', object_type.name, object_id) return self._citadel_delete(session, citadel_id, delete_mapping=True) - elif change_type & SimpleChange.updated: - citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id) + elif change_type == SimpleChange.updated or (change_type == SimpleChange.created and citadel_id is not None): if citadel_id is None: raise Exception(f'Cannot update {object_type.name} {object_id}: No citadel ID found') + if change_type == SimpleChange.created: + self.logger.warning('Citadel ID exists for %s %s (%s); updating instead', + object_type.name, object_id, citadel_id) self._citadel_update(session, citadel_id, data) + elif change_type == SimpleChange.created: + self._citadel_create(session, object_type, object_id, data) + else: + # we shouldn't get any combined change type bitmasks here + raise Exception(f'invalid change type: {change_type}') def upload_file(self, entry, session): self.logger.debug('Uploading attachment %d (%s) [%s]', entry.attachment.file.id, @@ -202,9 +212,16 @@ class LiveSyncCitadelUploader(Uploader): resp.close() return False - def run_initial(self, records, total): + def _precache_categories(self): cte = Category.get_tree_cte(lambda cat: db.func.json_build_object('id', cat.id, 'title', cat.title)) self.categories = dict(db.session.execute(select([cte.c.id, cte.c.path])).fetchall()) + + def run(self, records): + self._precache_categories() + return super().run(records) + + def run_initial(self, records, total): + self._precache_categories() return super().run_initial(records, total) def _get_retry_config(self, initial): @@ -235,7 +252,9 @@ class LiveSyncCitadelUploader(Uploader): session.headers = self.headers dumped_records = ( ( - get_entry_type(rec), rec.id, + get_entry_type(rec), + rec.id, + rec.citadel_id_mapping.citadel_id if rec.citadel_id_mapping else None, self.dump_record(rec) if not (change_type & SimpleChange.deleted) else None, change_type ) for 
rec, change_type in records @@ -293,7 +312,11 @@ class LiveSyncCitadelBackend(LiveSyncBackendBase): query = super().get_initial_query(model_cls, force) if not force: query = query.filter(~model_cls.citadel_id_mapping.has()) - return query + return query.options(joinedload(model_cls.citadel_id_mapping)) + + def get_data_query(self, model_cls, ids): + query = super().get_data_query(model_cls, ids) + return query.options(joinedload(model_cls.citadel_id_mapping)) def process_queue(self, uploader): super().process_queue(uploader) diff --git a/livesync/indico_livesync/base.py b/livesync/indico_livesync/base.py index 95cce69..fbd45bb 100644 --- a/livesync/indico_livesync/base.py +++ b/livesync/indico_livesync/base.py @@ -123,6 +123,7 @@ class LiveSyncBackendBase: raise NotImplementedError uploader = self.uploader(self, verbose=verbose, from_cli=from_cli) + self._precache_categories() self.process_queue(uploader) self.update_last_run() @@ -144,6 +145,27 @@ class LiveSyncBackendBase: }[model_cls] return fn() + def get_data_query(self, model_cls, ids): + """Get the export query for a given model and set of ids. + + Supported models are `Event`, `Contribution`, `SubContribution`, + `Attachment` and `EventNote`. + + This is very similar to `get_initial_query` except that it will only + export the specified records. + + :param model_cls: The model class to query + :param ids: A collection of ids to query + """ + fn = { + Event: query_events, + Contribution: query_contributions, + SubContribution: query_subcontributions, + Attachment: query_attachments, + EventNote: query_notes, + }[model_cls] + return fn(ids=ids) + def run_initial_export(self, batch_size, force=False, verbose=False): """Runs the initial export. 
@@ -154,12 +176,7 @@ class LiveSyncBackendBase: raise NotImplementedError uploader = self.uploader(self, verbose=verbose, from_cli=True) - - Category.allow_relationship_preloading = True - Category.preload_relationships(Category.query, 'acl_entries', - strategy=lambda rel: apply_acl_entry_strategy(subqueryload(rel), - CategoryPrincipal)) - _category_cache = Category.query.all() # noqa: F841 + self._precache_categories() events = self.get_initial_query(Event, force) contributions = self.get_initial_query(Contribution, force) @@ -212,3 +229,10 @@ class LiveSyncBackendBase: """ self.agent.initial_data_exported = False self.agent.queue.delete() + + def _precache_categories(self): + Category.allow_relationship_preloading = True + Category.preload_relationships(Category.query, 'acl_entries', + strategy=lambda rel: apply_acl_entry_strategy(subqueryload(rel), + CategoryPrincipal)) + self._category_cache = Category.query.all() diff --git a/livesync/indico_livesync/initial.py b/livesync/indico_livesync/initial.py index bb73c85..536d236 100644 --- a/livesync/indico_livesync/initial.py +++ b/livesync/indico_livesync/initial.py @@ -46,11 +46,14 @@ def _get_excluded_category_filter(event_model=Event): return True -def query_events(): +def query_events(ids=None): + if ids is None: + export_filter = ~Event.is_deleted & _get_excluded_category_filter() + else: + export_filter = Event.id.in_(ids) return ( Event.query - .filter_by(is_deleted=False) - .filter(_get_excluded_category_filter()) + .filter(export_filter) .options( apply_acl_entry_strategy(selectinload(Event.acl_entries), EventPrincipal), selectinload(Event.person_links).joinedload('person').joinedload('user').load_only('is_system'), @@ -61,7 +64,7 @@ def query_events(): ) -def query_contributions(): +def query_contributions(ids=None): event_strategy = contains_eager(Contribution.event) event_strategy.joinedload(Event.own_venue) event_strategy.joinedload(Event.own_room).options(raiseload('*'), joinedload('location')) @@ 
-79,10 +82,15 @@ def query_contributions(): session_block_session_strategy.joinedload(Session.own_venue) session_block_session_strategy.joinedload(Session.own_room).options(raiseload('*'), joinedload('location')) + if ids is None: + export_filter = ~Contribution.is_deleted & ~Event.is_deleted & _get_excluded_category_filter() + else: + export_filter = Contribution.id.in_(ids) + return ( Contribution.query .join(Event) - .filter(~Contribution.is_deleted, ~Event.is_deleted, _get_excluded_category_filter()) + .filter(export_filter) .options( selectinload(Contribution.acl_entries), selectinload(Contribution.person_links).joinedload('person').joinedload('user').load_only('is_system'), @@ -98,7 +106,7 @@ def query_contributions(): ) -def query_subcontributions(): +def query_subcontributions(ids=None): contrib_event = db.aliased(Event) contrib_session = db.aliased(Session) contrib_block = db.aliased(SessionBlock) @@ -123,14 +131,21 @@ def query_subcontributions(): session_block_strategy.joinedload(contrib_block.own_venue) session_block_strategy.joinedload(contrib_block.own_room).options(raiseload('*'), joinedload('location')) + if ids is None: + export_filter = db.and_(~SubContribution.is_deleted, + ~Contribution.is_deleted, + ~contrib_event.is_deleted, + _get_excluded_category_filter(contrib_event)) + else: + export_filter = SubContribution.id.in_(ids) + return ( SubContribution.query .join(Contribution) .join(Contribution.event.of_type(contrib_event)) .outerjoin(Contribution.session.of_type(contrib_session)) .outerjoin(Contribution.session_block.of_type(contrib_block)) - .filter(~SubContribution.is_deleted, ~Contribution.is_deleted, ~contrib_event.is_deleted, - _get_excluded_category_filter(contrib_event)) + .filter(export_filter) .options( selectinload(SubContribution.person_links).joinedload('person').joinedload('user').load_only('is_system'), contrib_strategy, @@ -142,7 +157,7 @@ def query_subcontributions(): ) -def query_attachments(): +def 
query_attachments(ids=None): contrib_event = db.aliased(Event) contrib_session = db.aliased(Session) subcontrib_contrib = db.aliased(Contribution) @@ -183,6 +198,35 @@ def query_attachments(): session_strategy.contains_eager(Session.event.of_type(session_event)).selectinload(session_event.acl_entries) apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal) + if ids is None: + export_filter = db.and_( + ~Attachment.is_deleted, + ~AttachmentFolder.is_deleted, + db.or_( + AttachmentFolder.link_type != LinkType.event, + ~Event.is_deleted & _get_excluded_category_filter(), + ), + db.or_( + AttachmentFolder.link_type != LinkType.contribution, + ~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event) + ), + db.or_( + AttachmentFolder.link_type != LinkType.subcontribution, + db.and_( + ~SubContribution.is_deleted, + ~subcontrib_contrib.is_deleted, + ~subcontrib_event.is_deleted, + _get_excluded_category_filter(subcontrib_event) + ) + ), + db.or_( + AttachmentFolder.link_type != LinkType.session, + ~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event) + ) + ) + else: + export_filter = Attachment.id.in_(ids) + return ( Attachment.query .join(Attachment.folder) @@ -197,32 +241,13 @@ def query_attachments(): .outerjoin(subcontrib_contrib.session.of_type(subcontrib_session)) .outerjoin(AttachmentFolder.session) .outerjoin(Session.event.of_type(session_event)) - .filter(~Attachment.is_deleted, ~AttachmentFolder.is_deleted) + .filter(export_filter) .filter(AttachmentFolder.link_type != LinkType.category) - .filter(db.or_( - AttachmentFolder.link_type != LinkType.event, - ~Event.is_deleted & _get_excluded_category_filter(), - )) - .filter(db.or_( - AttachmentFolder.link_type != LinkType.contribution, - ~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event) - )) - .filter(db.or_( - AttachmentFolder.link_type != 
LinkType.subcontribution, - db.and_(~SubContribution.is_deleted, - ~subcontrib_contrib.is_deleted, - ~subcontrib_event.is_deleted, - _get_excluded_category_filter(subcontrib_event)) - )) - .filter(db.or_( - AttachmentFolder.link_type != LinkType.session, - ~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event) - )) .order_by(Attachment.id) ) -def query_notes(): +def query_notes(ids=None): contrib_event = db.aliased(Event) contrib_session = db.aliased(Session) subcontrib_contrib = db.aliased(Contribution) @@ -260,6 +285,34 @@ def query_notes(): session_strategy.contains_eager(Session.event.of_type(session_event)).selectinload(session_event.acl_entries) apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal) + if ids is None: + export_filter = db.and_( + ~EventNote.is_deleted, + db.or_( + EventNote.link_type != LinkType.event, + ~Event.is_deleted & _get_excluded_category_filter() + ), + db.or_( + EventNote.link_type != LinkType.contribution, + ~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event) + ), + db.or_( + EventNote.link_type != LinkType.subcontribution, + db.and_( + ~SubContribution.is_deleted, + ~subcontrib_contrib.is_deleted, + ~subcontrib_event.is_deleted, + _get_excluded_category_filter(subcontrib_event) + ) + ), + db.or_( + EventNote.link_type != LinkType.session, + ~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event) + ) + ) + else: + export_filter = EventNote.id.in_(ids) + return ( EventNote.query .outerjoin(EventNote.linked_event) @@ -272,26 +325,7 @@ def query_notes(): .outerjoin(subcontrib_contrib.session.of_type(subcontrib_session)) .outerjoin(EventNote.session) .outerjoin(Session.event.of_type(session_event)) - .filter(~EventNote.is_deleted) - .filter(db.or_( - EventNote.link_type != LinkType.event, - ~Event.is_deleted & _get_excluded_category_filter() - )) - .filter(db.or_( - 
EventNote.link_type != LinkType.contribution, - ~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event) - )) - .filter(db.or_( - EventNote.link_type != LinkType.subcontribution, - db.and_(~SubContribution.is_deleted, - ~subcontrib_contrib.is_deleted, - ~subcontrib_event.is_deleted, - _get_excluded_category_filter(subcontrib_event)) - )) - .filter(db.or_( - EventNote.link_type != LinkType.session, - ~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event) - )) + .filter(export_filter) .options( note_strategy, joinedload(EventNote.current_revision).joinedload(EventNoteRevision.user).joinedload('_affiliation'), diff --git a/livesync/indico_livesync/simplify.py b/livesync/indico_livesync/simplify.py index d4c7032..008df18 100644 --- a/livesync/indico_livesync/simplify.py +++ b/livesync/indico_livesync/simplify.py @@ -33,6 +33,17 @@ class SimpleChange(int, IndicoEnum): CREATED_DELETED = SimpleChange.created | SimpleChange.deleted +def _get_final_change(change_flags): + if change_flags & SimpleChange.deleted: + assert not (change_flags & SimpleChange.created) # filtered out earlier + return SimpleChange.deleted + elif change_flags & SimpleChange.created: + return SimpleChange.created + elif change_flags & SimpleChange.updated: + return SimpleChange.updated + raise Exception(f'Invalid change flags: {change_flags}') + + def process_records(records): """Converts queue entries into object changes. 
@@ -87,7 +98,7 @@ def process_records(records): # discard any change where the object was both created and deleted del changes[obj] - return changes + return {obj: _get_final_change(flags) for obj, flags in changes.items()} def _process_cascaded_category_contents(records): diff --git a/livesync/indico_livesync/uploader.py b/livesync/indico_livesync/uploader.py index c313695..bcaf627 100644 --- a/livesync/indico_livesync/uploader.py +++ b/livesync/indico_livesync/uploader.py @@ -6,6 +6,7 @@ # see the LICENSE file for more details. import re +from collections import defaultdict from indico.core.db import db from indico.util.console import verbose_iterator @@ -29,13 +30,15 @@ class Uploader: :param records: an iterable containing queue entries """ self_name = type(self).__name__ - simplified = process_records(records).items() + simplified = self.query_data(process_records(records)).items() total = len(simplified) if self.from_cli: simplified = self._make_verbose(simplified, total) try: self.logger.info(f'{self_name} uploading %d changes from %d records', total, len(records)) - self.upload_records(simplified) + if not self.upload_records(simplified): + self.logger.warning('uploader indicated a failure') + return except Exception: self.logger.exception(f'{self_name} failed') if self.from_cli: @@ -64,6 +67,26 @@ class Uploader: print_total_time=True ) + def query_data(self, records): + """Query the data needed to dump records efficiently. + + This function queries the verbose data for the given records. 
+ + :param records: a dict mapping objects to changes + :return: a dict of the same structure + """ + by_model = defaultdict(set) + change_by_obj = {} + for obj, change in records.items(): + by_model[type(obj)].add(obj.id) + change_by_obj[type(obj), obj.id] = change + rv = {} + for model, ids in by_model.items(): + for obj in self.backend.get_data_query(model, ids).yield_per(5000): + rv[obj] = change_by_obj[model, obj.id] + assert len(records) == len(rv) + return rv + def upload_records(self, records, initial=False): """Executed to upload records. diff --git a/livesync/tests/simplify_test.py b/livesync/tests/simplify_test.py index c9959ee..c27617b 100644 --- a/livesync/tests/simplify_test.py +++ b/livesync/tests/simplify_test.py @@ -92,6 +92,12 @@ def test_process_records_simplify(changes, db, create_event, dummy_agent): # created+deleted items are discarded if expected[i] & CREATED_DELETED == CREATED_DELETED: expected[i] = 0 + elif expected[i] & SimpleChange.deleted: + expected[i] = SimpleChange.deleted + elif expected[i] & SimpleChange.created: + expected[i] = SimpleChange.created + elif expected[i] & SimpleChange.updated: + expected[i] = SimpleChange.updated db.session.flush() diff --git a/livesync/tests/uploader_test.py b/livesync/tests/uploader_test.py index 9249f48..dd10f34 100644 --- a/livesync/tests/uploader_test.py +++ b/livesync/tests/uploader_test.py @@ -8,6 +8,7 @@ from operator import attrgetter from unittest.mock import MagicMock +from indico_livesync.base import LiveSyncBackendBase from indico_livesync.models.queue import ChangeType, EntryType, LiveSyncQueueEntry from indico_livesync.simplify import SimpleChange, _process_cascaded_event_contents from indico_livesync.uploader import Uploader @@ -23,6 +24,7 @@ class RecordingUploader(Uploader): def upload_records(self, records, initial=False): self._uploaded = list(records) + return True @property def all_uploaded(self): @@ -36,6 +38,12 @@ class FailingUploader(RecordingUploader): raise Exception('All 
your data are belong to us!') +class TestBackend(LiveSyncBackendBase): + def __init__(self): + super().__init__(MagicMock()) + self.plugin = MagicMock() + + def test_run_initial(mocker): """Test the initial upload""" mocker.patch.object(Uploader, 'processed_records', autospec=True) @@ -54,7 +62,7 @@ def _sorted_process_cascaded_event_contents(records, additional_events=None, *, def test_run(mocker, monkeypatch, db, create_event, dummy_agent): """Test uploading queued data""" - uploader = RecordingUploader(MagicMock()) + uploader = RecordingUploader(TestBackend()) events = tuple(create_event(id_=evt_id) for evt_id in range(4)) records = tuple(LiveSyncQueueEntry(change=ChangeType.created, type=EntryType.event, event_id=evt.id, @@ -80,7 +88,7 @@ def test_run(mocker, monkeypatch, db, create_event, dummy_agent): def test_run_failing(mocker, monkeypatch, db, create_event, dummy_agent): """Test a failing queue run""" - uploader = FailingUploader(MagicMock()) + uploader = FailingUploader(TestBackend()) events = tuple(create_event(id_=evt_id) for evt_id in range(10)) records = tuple(LiveSyncQueueEntry(change=ChangeType.created, type=EntryType.event, event_id=evt.id,