From 775e75f724efcb9fca3fd3402f5237aaf9fe0449 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Louren=C3=A7o?= Date: Wed, 7 Apr 2021 16:25:47 +0100 Subject: [PATCH] LiveSync: Eager-load data in initial exports (#108) Co-authored-by: Adrian Moennich --- livesync/indico_livesync/base.py | 7 +- livesync/indico_livesync/cli.py | 21 ++- livesync/indico_livesync/initial.py | 241 +++++++++++++++++++++++++++ livesync/indico_livesync/uploader.py | 23 +-- livesync/tests/agent_test.py | 4 +- 5 files changed, 279 insertions(+), 17 deletions(-) create mode 100644 livesync/indico_livesync/initial.py diff --git a/livesync/indico_livesync/base.py b/livesync/indico_livesync/base.py index 0b4d2ab..ee8d7ed 100644 --- a/livesync/indico_livesync/base.py +++ b/livesync/indico_livesync/base.py @@ -88,15 +88,16 @@ class LiveSyncBackendBase: uploader.run(records) self.update_last_run() - def run_initial_export(self, events): + def run_initial_export(self, events, total=None): """Runs the initial export. This process is expected to take a very long time. - :param events: iterable of all events in this indico instance + :param events: iterable of all records in this indico instance + :param total: (optional) the total of records to be exported """ if self.uploader is None: # pragma: no cover raise NotImplementedError uploader = self.uploader(self) - uploader.run_initial(events) + uploader.run_initial(events, total) diff --git a/livesync/indico_livesync/cli.py b/livesync/indico_livesync/cli.py index 012cf54..b2d0a53 100644 --- a/livesync/indico_livesync/cli.py +++ b/livesync/indico_livesync/cli.py @@ -7,13 +7,17 @@ import click from flask_pluginengine import current_plugin +from sqlalchemy.orm import subqueryload from terminaltables import AsciiTable from indico.cli.core import cli_group from indico.core.db import db -from indico.modules.events.models.events import Event +from indico.modules.categories import Category +from indico.modules.categories.models.principals import CategoryPrincipal from indico.util.console import cformat +from indico_livesync.initial import (apply_acl_entry_strategy, query_attachments, query_contributions, query_events, + query_notes) from indico_livesync.models.agents import LiveSyncAgent @@ -73,7 +77,20 @@ def initial_export(agent_id, force): print(cformat('To re-run it, use %{yellow!}--force%{reset}')) return - agent.create_backend().run_initial_export(Event.query.filter_by(is_deleted=False)) + Category.allow_relationship_preloading = True + Category.preload_relationships(Category.query, 'acl_entries', + strategy=lambda rel: apply_acl_entry_strategy(subqueryload(rel), CategoryPrincipal)) + _category_cache = Category.query.all() # noqa: F841 + + backend = agent.create_backend() + events = query_events() + backend.run_initial_export(events.yield_per(5000), events.count()) + contributions = query_contributions() + backend.run_initial_export(contributions.yield_per(5000), contributions.count()) + attachments = query_attachments() + backend.run_initial_export(attachments.yield_per(5000), attachments.count()) + notes = query_notes() + backend.run_initial_export(notes.yield_per(5000), notes.count()) agent.initial_data_exported = True db.session.commit() diff --git a/livesync/indico_livesync/initial.py b/livesync/indico_livesync/initial.py new file mode 100644 index 0000000..e602068 --- /dev/null +++ b/livesync/indico_livesync/initial.py @@ -0,0 +1,241 @@ +# This file is part of the Indico plugins. +# Copyright (C) 2002 - 2021 CERN +# +# The Indico plugins are free software; you can redistribute +# them and/or modify them under the terms of the MIT License; +# see the LICENSE file for more details. + +from sqlalchemy.orm import contains_eager, joinedload, load_only, raiseload, selectinload + +from indico.core.db import db +from indico.core.db.sqlalchemy.links import LinkType +from indico.modules.attachments import Attachment, AttachmentFolder +from indico.modules.attachments.models.principals import AttachmentFolderPrincipal, AttachmentPrincipal +from indico.modules.events import Event +from indico.modules.events.contributions import Contribution +from indico.modules.events.contributions.models.principals import ContributionPrincipal +from indico.modules.events.contributions.models.subcontributions import SubContribution +from indico.modules.events.models.principals import EventPrincipal +from indico.modules.events.notes.models.notes import EventNote, EventNoteRevision +from indico.modules.events.sessions import Session +from indico.modules.events.sessions.models.blocks import SessionBlock +from indico.modules.events.sessions.models.principals import SessionPrincipal + + +def apply_acl_entry_strategy(rel, principal): + user_strategy = rel.joinedload('user') + user_strategy.raiseload('*') + user_strategy.load_only('id') + rel.joinedload('local_group').load_only('id') + if principal.allow_networks: + rel.joinedload('ip_network_group').load_only('id') + if principal.allow_category_roles: + rel.joinedload('category_role').load_only('id') + if principal.allow_event_roles: + rel.joinedload('event_role').load_only('id') + if principal.allow_registration_forms: + rel.joinedload('registration_form').load_only('id') + return rel + + +def query_events(): + return ( + Event.query + .filter_by(is_deleted=False) + .options( + apply_acl_entry_strategy(selectinload(Event.acl_entries), EventPrincipal), + selectinload(Event.person_links), + joinedload(Event.own_venue), + joinedload(Event.own_room).options(raiseload('*'), joinedload('location')), + ) + .order_by(Event.id) + ) + + +def query_contributions(): + event_strategy = contains_eager(Contribution.event) + event_strategy.joinedload(Event.own_venue) + event_strategy.joinedload(Event.own_room).options(raiseload('*'), joinedload('location')) + apply_acl_entry_strategy(event_strategy.selectinload(Event.acl_entries), EventPrincipal) + + session_strategy = joinedload(Contribution.session) + apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal) + session_strategy.joinedload(Session.own_venue) + session_strategy.joinedload(Session.own_room).options(raiseload('*'), joinedload('location')) + + session_block_strategy = joinedload(Contribution.session_block) + session_block_strategy.joinedload(SessionBlock.own_venue) + session_block_strategy.joinedload(SessionBlock.own_room).options(raiseload('*'), joinedload('location')) + session_block_session_strategy = session_block_strategy.joinedload(SessionBlock.session) + session_block_session_strategy.joinedload(Session.own_venue) + session_block_session_strategy.joinedload(Session.own_room).options(raiseload('*'), joinedload('location')) + + return ( + Contribution.query + .join(Event) + .filter(~Contribution.is_deleted, ~Event.is_deleted) + .options( + selectinload(Contribution.acl_entries), + selectinload(Contribution.person_links), + event_strategy, + session_strategy, + session_block_strategy, + joinedload(Contribution.type), + joinedload(Contribution.own_venue), + joinedload(Contribution.own_room).options(raiseload('*'), joinedload('location')), + joinedload(Contribution.timetable_entry), + ) + .order_by(Contribution.id) + ) + + +def query_attachments(): + contrib_event = db.aliased(Event) + contrib_session = db.aliased(Session) + subcontrib_contrib = db.aliased(Contribution) + subcontrib_session = db.aliased(Session) + subcontrib_event = db.aliased(Event) + session_event = db.aliased(Event) + + attachment_strategy = apply_acl_entry_strategy(selectinload(Attachment.acl_entries), AttachmentPrincipal) + folder_strategy = contains_eager(Attachment.folder) + folder_strategy.load_only('id', 'protection_mode', 'link_type', 'category_id', 'event_id', 'linked_event_id', + 'contribution_id', 'subcontribution_id', 'session_id') + apply_acl_entry_strategy(folder_strategy.selectinload(AttachmentFolder.acl_entries), AttachmentFolderPrincipal) + # event + apply_acl_entry_strategy(folder_strategy.contains_eager(AttachmentFolder.linked_event) + .selectinload(Event.acl_entries), EventPrincipal) + # contribution + contrib_strategy = folder_strategy.contains_eager(AttachmentFolder.contribution) + apply_acl_entry_strategy(contrib_strategy.selectinload(Contribution.acl_entries), ContributionPrincipal) + apply_acl_entry_strategy(contrib_strategy.contains_eager(Contribution.event.of_type(contrib_event)) + .selectinload(contrib_event.acl_entries), EventPrincipal) + apply_acl_entry_strategy(contrib_strategy.contains_eager(Contribution.session.of_type(contrib_session)) + .selectinload(contrib_session.acl_entries), SessionPrincipal) + # subcontribution + subcontrib_strategy = folder_strategy.contains_eager(AttachmentFolder.subcontribution) + subcontrib_contrib_strategy = subcontrib_strategy.contains_eager( + SubContribution.contribution.of_type(subcontrib_contrib) + ) + apply_acl_entry_strategy(subcontrib_contrib_strategy + .selectinload(subcontrib_contrib.acl_entries), ContributionPrincipal) + apply_acl_entry_strategy(subcontrib_contrib_strategy + .contains_eager(subcontrib_contrib.event.of_type(subcontrib_event)) + .selectinload(subcontrib_event.acl_entries), EventPrincipal) + apply_acl_entry_strategy(subcontrib_contrib_strategy + .contains_eager(subcontrib_contrib.session.of_type(subcontrib_session)) + .selectinload(subcontrib_session.acl_entries), SessionPrincipal) + # session + session_strategy = folder_strategy.contains_eager(AttachmentFolder.session) + session_strategy.contains_eager(Session.event.of_type(session_event)).selectinload(session_event.acl_entries) + apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal) + + return ( + Attachment.query + .join(Attachment.folder) + .options(folder_strategy, attachment_strategy, joinedload(Attachment.user).joinedload('_affiliation')) + .outerjoin(AttachmentFolder.linked_event) + .outerjoin(AttachmentFolder.contribution) + .outerjoin(Contribution.event.of_type(contrib_event)) + .outerjoin(Contribution.session.of_type(contrib_session)) + .outerjoin(AttachmentFolder.subcontribution) + .outerjoin(SubContribution.contribution.of_type(subcontrib_contrib)) + .outerjoin(subcontrib_contrib.event.of_type(subcontrib_event)) + .outerjoin(subcontrib_contrib.session.of_type(subcontrib_session)) + .outerjoin(AttachmentFolder.session) + .outerjoin(Session.event.of_type(session_event)) + .filter(~Attachment.is_deleted, ~AttachmentFolder.is_deleted) + .filter(AttachmentFolder.link_type != LinkType.category) + .filter(db.or_( + AttachmentFolder.link_type != LinkType.event, + ~Event.is_deleted + )) + .filter(db.or_( + AttachmentFolder.link_type != LinkType.contribution, + ~Contribution.is_deleted & ~contrib_event.is_deleted + )) + .filter(db.or_( + AttachmentFolder.link_type != LinkType.subcontribution, + ~SubContribution.is_deleted & ~subcontrib_contrib.is_deleted & ~subcontrib_event.is_deleted + )) + .filter(db.or_( + AttachmentFolder.link_type != LinkType.session, + ~Session.is_deleted & ~session_event.is_deleted + )) + .order_by(Attachment.id) + ) + + +def query_notes(): + contrib_event = db.aliased(Event) + contrib_session = db.aliased(Session) + subcontrib_contrib = db.aliased(Contribution) + subcontrib_session = db.aliased(Session) + subcontrib_event = db.aliased(Event) + session_event = db.aliased(Event) + + note_strategy = load_only('id', 'link_type', 'event_id', 'linked_event_id', 'contribution_id', + 'subcontribution_id', 'session_id', 'html') + # event + apply_acl_entry_strategy(note_strategy.contains_eager(EventNote.linked_event) + .selectinload(Event.acl_entries), EventPrincipal) + # contribution + contrib_strategy = note_strategy.contains_eager(EventNote.contribution) + apply_acl_entry_strategy(contrib_strategy.selectinload(Contribution.acl_entries), ContributionPrincipal) + apply_acl_entry_strategy(contrib_strategy.contains_eager(Contribution.event.of_type(contrib_event)) + .selectinload(contrib_event.acl_entries), EventPrincipal) + apply_acl_entry_strategy(contrib_strategy.contains_eager(Contribution.session.of_type(contrib_session)) + .selectinload(contrib_session.acl_entries), SessionPrincipal) + # subcontribution + subcontrib_strategy = note_strategy.contains_eager(EventNote.subcontribution) + subcontrib_contrib_strategy = subcontrib_strategy.contains_eager( + SubContribution.contribution.of_type(subcontrib_contrib) + ) + apply_acl_entry_strategy(subcontrib_contrib_strategy + .selectinload(subcontrib_contrib.acl_entries), ContributionPrincipal) + apply_acl_entry_strategy(subcontrib_contrib_strategy + .contains_eager(subcontrib_contrib.event.of_type(subcontrib_event)) + .selectinload(subcontrib_event.acl_entries), EventPrincipal) + apply_acl_entry_strategy(subcontrib_contrib_strategy + .contains_eager(subcontrib_contrib.session.of_type(subcontrib_session)) + .selectinload(subcontrib_session.acl_entries), SessionPrincipal) + # session + session_strategy = note_strategy.contains_eager(EventNote.session) + session_strategy.contains_eager(Session.event.of_type(session_event)).selectinload(session_event.acl_entries) + apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal) + + return ( + EventNote.query + .outerjoin(EventNote.linked_event) + .outerjoin(EventNote.contribution) + .outerjoin(Contribution.event.of_type(contrib_event)) + .outerjoin(Contribution.session.of_type(contrib_session)) + .outerjoin(EventNote.subcontribution) + .outerjoin(SubContribution.contribution.of_type(subcontrib_contrib)) + .outerjoin(subcontrib_contrib.event.of_type(subcontrib_event)) + .outerjoin(subcontrib_contrib.session.of_type(subcontrib_session)) + .outerjoin(EventNote.session) + .outerjoin(Session.event.of_type(session_event)) + .filter(~EventNote.is_deleted) + .filter(db.or_( + EventNote.link_type != LinkType.event, + ~Event.is_deleted + )) + .filter(db.or_( + EventNote.link_type != LinkType.contribution, + ~Contribution.is_deleted & ~contrib_event.is_deleted + )) + .filter(db.or_( + EventNote.link_type != LinkType.subcontribution, + ~SubContribution.is_deleted & ~subcontrib_contrib.is_deleted & ~subcontrib_event.is_deleted + )) + .filter(db.or_( + EventNote.link_type != LinkType.session, + ~Session.is_deleted & ~session_event.is_deleted + )) + .options( + note_strategy, + joinedload(EventNote.current_revision).raiseload(EventNoteRevision.user), + ) + .order_by(EventNote.id) + ) diff --git a/livesync/indico_livesync/uploader.py b/livesync/indico_livesync/uploader.py index 4f49ebb..7096b2c 100644 --- a/livesync/indico_livesync/uploader.py +++ b/livesync/indico_livesync/uploader.py @@ -5,8 +5,12 @@ # them and/or modify them under the terms of the MIT License; # see the LICENSE file for more details. +from operator import attrgetter + from indico.core.db import db +from indico.util.console import verbose_iterator from indico.util.iterables import grouper +from indico.util.string import str_to_ascii from indico_livesync.marcxml import MARCXMLGenerator from indico_livesync.simplify import process_records @@ -18,7 +22,7 @@ class Uploader: #: Number of queue entries to process at a time BATCH_SIZE = 100 #: Number of events to process at a time during initial export - INITIAL_BATCH_SIZE = 100 + INITIAL_BATCH_SIZE = 500 def __init__(self, backend): self.backend = backend @@ -44,19 +48,18 @@ class Uploader: self.processed_records(batch) self.logger.info('%s finished', self_name) - def run_initial(self, events): + def run_initial(self, events, total=None): """Runs the initial batch upload :param events: an iterable containing events + :param total: (optional) the total of records to be exported """ - self_name = type(self).__name__ - for i, batch in enumerate(grouper(events, self.INITIAL_BATCH_SIZE, skip_missing=True), 1): - self.logger.debug('%s processing initial batch %d', self_name, i) - - for j, processed_batch in enumerate(grouper( - batch, self.BATCH_SIZE, skip_missing=True), 1): - self.logger.info('%s uploading initial chunk #%d (batch %d)', self_name, j, i) - self.upload_records(processed_batch, from_queue=False) + if total is not None: + events = verbose_iterator(events, total, attrgetter('id'), + lambda obj: str_to_ascii(getattr(obj, 'title', '')), + print_total_time=True) + for batch in grouper(events, self.INITIAL_BATCH_SIZE, skip_missing=True): + self.upload_records(batch, from_queue=False) def upload_records(self, records, from_queue): """Executed for a batch of up to `BATCH_SIZE` records diff --git a/livesync/tests/agent_test.py b/livesync/tests/agent_test.py index 19ade01..68b0cf8 100644 --- a/livesync/tests/agent_test.py +++ b/livesync/tests/agent_test.py @@ -36,8 +36,8 @@ def test_run_initial(): mock_uploader = MagicMock() backend.uploader = lambda x: mock_uploader events = object() - backend.run_initial_export(events) - mock_uploader.run_initial.assert_called_with(events) + backend.run_initial_export(events, 1) + mock_uploader.run_initial.assert_called_with(events, 1) def test_run(mocker):