LiveSync: Eager-load data in initial exports (#108)

Co-authored-by: Adrian Moennich <adrian.moennich@cern.ch>
This commit is contained in:
Pedro Lourenço 2021-04-07 16:25:47 +01:00 committed by GitHub
parent a6625281d5
commit 775e75f724
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 279 additions and 17 deletions

View File

@ -88,15 +88,16 @@ class LiveSyncBackendBase:
uploader.run(records)
self.update_last_run()
def run_initial_export(self, events):
def run_initial_export(self, events, total=None):
"""Runs the initial export.
This process is expected to take a very long time.
:param events: iterable of all events in this indico instance
:param events: iterable of all records in this indico instance
:param total: (optional) the total of records to be exported
"""
if self.uploader is None: # pragma: no cover
raise NotImplementedError
uploader = self.uploader(self)
uploader.run_initial(events)
uploader.run_initial(events, total)

View File

@ -7,13 +7,17 @@
import click
from flask_pluginengine import current_plugin
from sqlalchemy.orm import subqueryload
from terminaltables import AsciiTable
from indico.cli.core import cli_group
from indico.core.db import db
from indico.modules.events.models.events import Event
from indico.modules.categories import Category
from indico.modules.categories.models.principals import CategoryPrincipal
from indico.util.console import cformat
from indico_livesync.initial import (apply_acl_entry_strategy, query_attachments, query_contributions, query_events,
query_notes)
from indico_livesync.models.agents import LiveSyncAgent
@ -73,7 +77,20 @@ def initial_export(agent_id, force):
print(cformat('To re-run it, use %{yellow!}--force%{reset}'))
return
agent.create_backend().run_initial_export(Event.query.filter_by(is_deleted=False))
Category.allow_relationship_preloading = True
Category.preload_relationships(Category.query, 'acl_entries',
strategy=lambda rel: apply_acl_entry_strategy(subqueryload(rel), CategoryPrincipal))
_category_cache = Category.query.all() # noqa: F841
backend = agent.create_backend()
events = query_events()
backend.run_initial_export(events.yield_per(5000), events.count())
contributions = query_contributions()
backend.run_initial_export(contributions.yield_per(5000), contributions.count())
attachments = query_attachments()
backend.run_initial_export(attachments.yield_per(5000), attachments.count())
notes = query_notes()
backend.run_initial_export(notes.yield_per(5000), notes.count())
agent.initial_data_exported = True
db.session.commit()

View File

@ -0,0 +1,241 @@
# This file is part of the Indico plugins.
# Copyright (C) 2002 - 2021 CERN
#
# The Indico plugins are free software; you can redistribute
# them and/or modify them under the terms of the MIT License;
# see the LICENSE file for more details.
from sqlalchemy.orm import contains_eager, joinedload, load_only, raiseload, selectinload
from indico.core.db import db
from indico.core.db.sqlalchemy.links import LinkType
from indico.modules.attachments import Attachment, AttachmentFolder
from indico.modules.attachments.models.principals import AttachmentFolderPrincipal, AttachmentPrincipal
from indico.modules.events import Event
from indico.modules.events.contributions import Contribution
from indico.modules.events.contributions.models.principals import ContributionPrincipal
from indico.modules.events.contributions.models.subcontributions import SubContribution
from indico.modules.events.models.principals import EventPrincipal
from indico.modules.events.notes.models.notes import EventNote, EventNoteRevision
from indico.modules.events.sessions import Session
from indico.modules.events.sessions.models.blocks import SessionBlock
from indico.modules.events.sessions.models.principals import SessionPrincipal
def apply_acl_entry_strategy(rel, principal):
user_strategy = rel.joinedload('user')
user_strategy.raiseload('*')
user_strategy.load_only('id')
rel.joinedload('local_group').load_only('id')
if principal.allow_networks:
rel.joinedload('ip_network_group').load_only('id')
if principal.allow_category_roles:
rel.joinedload('category_role').load_only('id')
if principal.allow_event_roles:
rel.joinedload('event_role').load_only('id')
if principal.allow_registration_forms:
rel.joinedload('registration_form').load_only('id')
return rel
def query_events():
return (
Event.query
.filter_by(is_deleted=False)
.options(
apply_acl_entry_strategy(selectinload(Event.acl_entries), EventPrincipal),
selectinload(Event.person_links),
joinedload(Event.own_venue),
joinedload(Event.own_room).options(raiseload('*'), joinedload('location')),
)
.order_by(Event.id)
)
def query_contributions():
event_strategy = contains_eager(Contribution.event)
event_strategy.joinedload(Event.own_venue)
event_strategy.joinedload(Event.own_room).options(raiseload('*'), joinedload('location'))
apply_acl_entry_strategy(event_strategy.selectinload(Event.acl_entries), EventPrincipal)
session_strategy = joinedload(Contribution.session)
apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal)
session_strategy.joinedload(Session.own_venue)
session_strategy.joinedload(Session.own_room).options(raiseload('*'), joinedload('location'))
session_block_strategy = joinedload(Contribution.session_block)
session_block_strategy.joinedload(SessionBlock.own_venue)
session_block_strategy.joinedload(SessionBlock.own_room).options(raiseload('*'), joinedload('location'))
session_block_session_strategy = session_block_strategy.joinedload(SessionBlock.session)
session_block_session_strategy.joinedload(Session.own_venue)
session_block_session_strategy.joinedload(Session.own_room).options(raiseload('*'), joinedload('location'))
return (
Contribution.query
.join(Event)
.filter(~Contribution.is_deleted, ~Event.is_deleted)
.options(
selectinload(Contribution.acl_entries),
selectinload(Contribution.person_links),
event_strategy,
session_strategy,
session_block_strategy,
joinedload(Contribution.type),
joinedload(Contribution.own_venue),
joinedload(Contribution.own_room).options(raiseload('*'), joinedload('location')),
joinedload(Contribution.timetable_entry),
)
.order_by(Contribution.id)
)
def query_attachments():
contrib_event = db.aliased(Event)
contrib_session = db.aliased(Session)
subcontrib_contrib = db.aliased(Contribution)
subcontrib_session = db.aliased(Session)
subcontrib_event = db.aliased(Event)
session_event = db.aliased(Event)
attachment_strategy = apply_acl_entry_strategy(selectinload(Attachment.acl_entries), AttachmentPrincipal)
folder_strategy = contains_eager(Attachment.folder)
folder_strategy.load_only('id', 'protection_mode', 'link_type', 'category_id', 'event_id', 'linked_event_id',
'contribution_id', 'subcontribution_id', 'session_id')
apply_acl_entry_strategy(folder_strategy.selectinload(AttachmentFolder.acl_entries), AttachmentFolderPrincipal)
# event
apply_acl_entry_strategy(folder_strategy.contains_eager(AttachmentFolder.linked_event)
.selectinload(Event.acl_entries), EventPrincipal)
# contribution
contrib_strategy = folder_strategy.contains_eager(AttachmentFolder.contribution)
apply_acl_entry_strategy(contrib_strategy.selectinload(Contribution.acl_entries), ContributionPrincipal)
apply_acl_entry_strategy(contrib_strategy.contains_eager(Contribution.event.of_type(contrib_event))
.selectinload(contrib_event.acl_entries), EventPrincipal)
apply_acl_entry_strategy(contrib_strategy.contains_eager(Contribution.session.of_type(contrib_session))
.selectinload(contrib_session.acl_entries), SessionPrincipal)
# subcontribution
subcontrib_strategy = folder_strategy.contains_eager(AttachmentFolder.subcontribution)
subcontrib_contrib_strategy = subcontrib_strategy.contains_eager(
SubContribution.contribution.of_type(subcontrib_contrib)
)
apply_acl_entry_strategy(subcontrib_contrib_strategy
.selectinload(subcontrib_contrib.acl_entries), ContributionPrincipal)
apply_acl_entry_strategy(subcontrib_contrib_strategy
.contains_eager(subcontrib_contrib.event.of_type(subcontrib_event))
.selectinload(subcontrib_event.acl_entries), EventPrincipal)
apply_acl_entry_strategy(subcontrib_contrib_strategy
.contains_eager(subcontrib_contrib.session.of_type(subcontrib_session))
.selectinload(subcontrib_session.acl_entries), SessionPrincipal)
# session
session_strategy = folder_strategy.contains_eager(AttachmentFolder.session)
session_strategy.contains_eager(Session.event.of_type(session_event)).selectinload(session_event.acl_entries)
apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal)
return (
Attachment.query
.join(Attachment.folder)
.options(folder_strategy, attachment_strategy, joinedload(Attachment.user).joinedload('_affiliation'))
.outerjoin(AttachmentFolder.linked_event)
.outerjoin(AttachmentFolder.contribution)
.outerjoin(Contribution.event.of_type(contrib_event))
.outerjoin(Contribution.session.of_type(contrib_session))
.outerjoin(AttachmentFolder.subcontribution)
.outerjoin(SubContribution.contribution.of_type(subcontrib_contrib))
.outerjoin(subcontrib_contrib.event.of_type(subcontrib_event))
.outerjoin(subcontrib_contrib.session.of_type(subcontrib_session))
.outerjoin(AttachmentFolder.session)
.outerjoin(Session.event.of_type(session_event))
.filter(~Attachment.is_deleted, ~AttachmentFolder.is_deleted)
.filter(AttachmentFolder.link_type != LinkType.category)
.filter(db.or_(
AttachmentFolder.link_type != LinkType.event,
~Event.is_deleted
))
.filter(db.or_(
AttachmentFolder.link_type != LinkType.contribution,
~Contribution.is_deleted & ~contrib_event.is_deleted
))
.filter(db.or_(
AttachmentFolder.link_type != LinkType.subcontribution,
~SubContribution.is_deleted & ~subcontrib_contrib.is_deleted & ~subcontrib_event.is_deleted
))
.filter(db.or_(
AttachmentFolder.link_type != LinkType.session,
~Session.is_deleted & ~session_event.is_deleted
))
.order_by(Attachment.id)
)
def query_notes():
contrib_event = db.aliased(Event)
contrib_session = db.aliased(Session)
subcontrib_contrib = db.aliased(Contribution)
subcontrib_session = db.aliased(Session)
subcontrib_event = db.aliased(Event)
session_event = db.aliased(Event)
note_strategy = load_only('id', 'link_type', 'event_id', 'linked_event_id', 'contribution_id',
'subcontribution_id', 'session_id', 'html')
# event
apply_acl_entry_strategy(note_strategy.contains_eager(EventNote.linked_event)
.selectinload(Event.acl_entries), EventPrincipal)
# contribution
contrib_strategy = note_strategy.contains_eager(EventNote.contribution)
apply_acl_entry_strategy(contrib_strategy.selectinload(Contribution.acl_entries), ContributionPrincipal)
apply_acl_entry_strategy(contrib_strategy.contains_eager(Contribution.event.of_type(contrib_event))
.selectinload(contrib_event.acl_entries), EventPrincipal)
apply_acl_entry_strategy(contrib_strategy.contains_eager(Contribution.session.of_type(contrib_session))
.selectinload(contrib_session.acl_entries), SessionPrincipal)
# subcontribution
subcontrib_strategy = note_strategy.contains_eager(EventNote.subcontribution)
subcontrib_contrib_strategy = subcontrib_strategy.contains_eager(
SubContribution.contribution.of_type(subcontrib_contrib)
)
apply_acl_entry_strategy(subcontrib_contrib_strategy
.selectinload(subcontrib_contrib.acl_entries), ContributionPrincipal)
apply_acl_entry_strategy(subcontrib_contrib_strategy
.contains_eager(subcontrib_contrib.event.of_type(subcontrib_event))
.selectinload(subcontrib_event.acl_entries), EventPrincipal)
apply_acl_entry_strategy(subcontrib_contrib_strategy
.contains_eager(subcontrib_contrib.session.of_type(subcontrib_session))
.selectinload(subcontrib_session.acl_entries), SessionPrincipal)
# session
session_strategy = note_strategy.contains_eager(EventNote.session)
session_strategy.contains_eager(Session.event.of_type(session_event)).selectinload(session_event.acl_entries)
apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal)
return (
EventNote.query
.outerjoin(EventNote.linked_event)
.outerjoin(EventNote.contribution)
.outerjoin(Contribution.event.of_type(contrib_event))
.outerjoin(Contribution.session.of_type(contrib_session))
.outerjoin(EventNote.subcontribution)
.outerjoin(SubContribution.contribution.of_type(subcontrib_contrib))
.outerjoin(subcontrib_contrib.event.of_type(subcontrib_event))
.outerjoin(subcontrib_contrib.session.of_type(subcontrib_session))
.outerjoin(EventNote.session)
.outerjoin(Session.event.of_type(session_event))
.filter(~EventNote.is_deleted)
.filter(db.or_(
EventNote.link_type != LinkType.event,
~Event.is_deleted
))
.filter(db.or_(
EventNote.link_type != LinkType.contribution,
~Contribution.is_deleted & ~contrib_event.is_deleted
))
.filter(db.or_(
EventNote.link_type != LinkType.subcontribution,
~SubContribution.is_deleted & ~subcontrib_contrib.is_deleted & ~subcontrib_event.is_deleted
))
.filter(db.or_(
EventNote.link_type != LinkType.session,
~Session.is_deleted & ~session_event.is_deleted
))
.options(
note_strategy,
joinedload(EventNote.current_revision).raiseload(EventNoteRevision.user),
)
.order_by(EventNote.id)
)

View File

@ -5,8 +5,12 @@
# them and/or modify them under the terms of the MIT License;
# see the LICENSE file for more details.
from operator import attrgetter
from indico.core.db import db
from indico.util.console import verbose_iterator
from indico.util.iterables import grouper
from indico.util.string import str_to_ascii
from indico_livesync.marcxml import MARCXMLGenerator
from indico_livesync.simplify import process_records
@ -18,7 +22,7 @@ class Uploader:
#: Number of queue entries to process at a time
BATCH_SIZE = 100
#: Number of events to process at a time during initial export
INITIAL_BATCH_SIZE = 100
INITIAL_BATCH_SIZE = 500
def __init__(self, backend):
self.backend = backend
@ -44,19 +48,18 @@ class Uploader:
self.processed_records(batch)
self.logger.info('%s finished', self_name)
def run_initial(self, events):
def run_initial(self, events, total=None):
"""Runs the initial batch upload
:param events: an iterable containing events
:param total: (optional) the total of records to be exported
"""
self_name = type(self).__name__
for i, batch in enumerate(grouper(events, self.INITIAL_BATCH_SIZE, skip_missing=True), 1):
self.logger.debug('%s processing initial batch %d', self_name, i)
for j, processed_batch in enumerate(grouper(
batch, self.BATCH_SIZE, skip_missing=True), 1):
self.logger.info('%s uploading initial chunk #%d (batch %d)', self_name, j, i)
self.upload_records(processed_batch, from_queue=False)
if total is not None:
events = verbose_iterator(events, total, attrgetter('id'),
lambda obj: str_to_ascii(getattr(obj, 'title', '')),
print_total_time=True)
for batch in grouper(events, self.INITIAL_BATCH_SIZE, skip_missing=True):
self.upload_records(batch, from_queue=False)
def upload_records(self, records, from_queue):
"""Executed for a batch of up to `BATCH_SIZE` records

View File

@ -36,8 +36,8 @@ def test_run_initial():
mock_uploader = MagicMock()
backend.uploader = lambda x: mock_uploader
events = object()
backend.run_initial_export(events)
mock_uploader.run_initial.assert_called_with(events)
backend.run_initial_export(events, 1)
mock_uploader.run_initial.assert_called_with(events, 1)
def test_run(mocker):