LiveSync: Use efficient queries in queue runs

While simplify is still quite query-heavy for large queues, at least
the actual data dumping for the export itself no longer spams queries.
This is especially crucial in the Citadel plugin, where the massive
upload parallelism can easily exhaust the DB connection pool if
queries are issued there.

For the same reason, the citadel ID mapping is now queried eagerly:
instead of attempting to create a record and then doing a delete+update
once we notice that a citadel mapping already exists, we can simply
check for it before sending anything and convert the creation into an
update.
Author: Adrian Moennich, 2021-05-28 11:28:54 +02:00
parent 22d5dc00a7
commit a5595615a4
7 changed files with 204 additions and 75 deletions
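
The gist of the uploader change, as a minimal sketch: the queue entry now
carries the eagerly loaded citadel ID, so the create/update decision happens
before anything is sent. The names (`SimpleChange`, the `_citadel_*` helpers)
are taken from the diffs below; this is not the exact plugin code.

    def upload_record(self, entry, session):
        # entry now includes the citadel ID (None if no mapping exists yet)
        object_type, object_id, citadel_id, data, change_type = entry
        if change_type == SimpleChange.deleted:
            if citadel_id is not None:
                self._citadel_delete(session, citadel_id, delete_mapping=True)
        elif change_type == SimpleChange.updated or citadel_id is not None:
            # a "created" entry whose mapping already exists becomes an
            # update instead of create -> IntegrityError -> delete+update
            self._citadel_update(session, citadel_id, data)
        else:
            self._citadel_create(session, object_type, object_id, data)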


@@ -20,7 +20,7 @@ from requests.adapters import HTTPAdapter
from requests.exceptions import RequestException
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import contains_eager, joinedload
from sqlalchemy.orm.attributes import flag_modified
from urllib3 import Retry
from werkzeug.urls import url_join
@@ -48,9 +48,9 @@ def _format_change_str(change):
def _print_record(record):
obj_type, obj_id, data, changes = record
obj_type, obj_id, citadel_id, data, changes = record
print() # verbose_iterator during initial exports doesn't end its line
print(f'{_format_change_str(changes)}: {obj_type.name} {obj_id}')
print(f'{_format_change_str(changes)}: {obj_type.name} {obj_id} [{citadel_id}]')
if data is not None:
print(highlight(pformat(data), lexer, formatter))
return record
@@ -118,12 +118,14 @@ class LiveSyncCitadelUploader(Uploader):
CitadelIdMap.create(object_type, object_id, new_citadel_id)
db.session.commit()
except IntegrityError:
# XXX this should no longer happen under any circumstances!
# if we already have a mapping entry, delete the newly created record and
# update the existing one in case something changed in the meantime
self.logger.error(f'{object_type.name.title()} %d already in citadel; deleting+updating', object_id)
db.session.rollback()
self._citadel_delete(session, new_citadel_id, delete_mapping=False)
existing_citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id)
db.session.close()
assert existing_citadel_id is not None
self._citadel_update(session, existing_citadel_id, data)
resp.close()
@@ -147,7 +149,11 @@ class LiveSyncCitadelUploader(Uploader):
try:
resp = session.delete(url_join(self.search_app, f'api/record/{citadel_id}'))
self.logger.debug('Deleted %d from citadel', citadel_id)
resp.raise_for_status()
if resp.status_code == 410:
# gone - record was probably already deleted
self.logger.warning('Record %d was already deleted on citadel', citadel_id)
else:
resp.raise_for_status()
resp.close()
except RequestException as exc:
if resp := exc.response:
@@ -158,21 +164,25 @@ class LiveSyncCitadelUploader(Uploader):
db.session.commit()
def upload_record(self, entry, session):
object_type, object_id, data, change_type = entry
object_type, object_id, citadel_id, data, change_type = entry
if change_type & SimpleChange.created:
self._citadel_create(session, object_type, object_id, data)
elif change_type & SimpleChange.deleted:
citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id)
if change_type == SimpleChange.deleted:
if citadel_id is None:
self.logger.warning('Cannot delete %s %s: No citadel ID found', object_type.name, object_id)
return
self._citadel_delete(session, citadel_id, delete_mapping=True)
elif change_type & SimpleChange.updated:
citadel_id = CitadelIdMap.get_citadel_id(object_type, object_id)
elif change_type == SimpleChange.updated or (change_type == SimpleChange.created and citadel_id is not None):
if citadel_id is None:
raise Exception(f'Cannot update {object_type.name} {object_id}: No citadel ID found')
if change_type == SimpleChange.created:
self.logger.warning('Citadel ID exists for %s %s (%s); updating instead',
object_type.name, object_id, citadel_id)
self._citadel_update(session, citadel_id, data)
elif change_type == SimpleChange.created:
self._citadel_create(session, object_type, object_id, data)
else:
# we shouldn't get any combined change type bitmasks here
raise Exception(f'invalid change type: {change_type}')
def upload_file(self, entry, session):
self.logger.debug('Uploading attachment %d (%s) [%s]', entry.attachment.file.id,
@@ -202,9 +212,16 @@ class LiveSyncCitadelUploader(Uploader):
resp.close()
return False
def run_initial(self, records, total):
def _precache_categories(self):
cte = Category.get_tree_cte(lambda cat: db.func.json_build_object('id', cat.id, 'title', cat.title))
self.categories = dict(db.session.execute(select([cte.c.id, cte.c.path])).fetchall())
def run(self, records):
self._precache_categories()
return super().run(records)
def run_initial(self, records, total):
self._precache_categories()
return super().run_initial(records, total)
def _get_retry_config(self, initial):
@@ -235,7 +252,9 @@ class LiveSyncCitadelUploader(Uploader):
session.headers = self.headers
dumped_records = (
(
get_entry_type(rec), rec.id,
get_entry_type(rec),
rec.id,
rec.citadel_id_mapping.citadel_id if rec.citadel_id_mapping else None,
self.dump_record(rec) if not (change_type & SimpleChange.deleted) else None,
change_type
) for rec, change_type in records
@@ -293,7 +312,11 @@ class LiveSyncCitadelBackend(LiveSyncBackendBase):
query = super().get_initial_query(model_cls, force)
if not force:
query = query.filter(~model_cls.citadel_id_mapping.has())
return query
return query.options(joinedload(model_cls.citadel_id_mapping))
def get_data_query(self, model_cls, ids):
query = super().get_data_query(model_cls, ids)
return query.options(joinedload(model_cls.citadel_id_mapping))
def process_queue(self, uploader):
super().process_queue(uploader)
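
Side note on the `joinedload(model_cls.citadel_id_mapping)` options above:
they are what make the per-record mapping lookups disappear, since the
mapping row arrives in the same SELECT as the record itself. A hedged
illustration, assuming the `citadel_id_mapping` relationship used above:

    from sqlalchemy.orm import joinedload

    # Lazy (old): each mapping access below fires one extra SELECT per row.
    # Eager (new): the mapping comes back together with the main query.
    query = Event.query.options(joinedload(Event.citadel_id_mapping))
    for rec in query.yield_per(1000):
        cid = rec.citadel_id_mapping.citadel_id if rec.citadel_id_mapping else None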


@@ -123,6 +123,7 @@ class LiveSyncBackendBase:
raise NotImplementedError
uploader = self.uploader(self, verbose=verbose, from_cli=from_cli)
self._precache_categories()
self.process_queue(uploader)
self.update_last_run()
@@ -144,6 +145,27 @@ class LiveSyncBackendBase:
}[model_cls]
return fn()
def get_data_query(self, model_cls, ids):
"""Get the export query for a given model and set of ids.
Supported models are `Event`, `Contribution`, `SubContribution`,
`Attachment` and `EventNote`.
This is very similar to `get_initial_query` except that it will only
export the specified records.
:param model_cls: The model class to query
:param ids: A collection of ids to query
"""
fn = {
Event: query_events,
Contribution: query_contributions,
SubContribution: query_subcontributions,
Attachment: query_attachments,
EventNote: query_notes,
}[model_cls]
return fn(ids=ids)
def run_initial_export(self, batch_size, force=False, verbose=False):
"""Runs the initial export.
@@ -154,12 +176,7 @@ class LiveSyncBackendBase:
raise NotImplementedError
uploader = self.uploader(self, verbose=verbose, from_cli=True)
Category.allow_relationship_preloading = True
Category.preload_relationships(Category.query, 'acl_entries',
strategy=lambda rel: apply_acl_entry_strategy(subqueryload(rel),
CategoryPrincipal))
_category_cache = Category.query.all() # noqa: F841
self._precache_categories()
events = self.get_initial_query(Event, force)
contributions = self.get_initial_query(Contribution, force)
@@ -212,3 +229,10 @@ class LiveSyncBackendBase:
"""
self.agent.initial_data_exported = False
self.agent.queue.delete()
def _precache_categories(self):
Category.allow_relationship_preloading = True
Category.preload_relationships(Category.query, 'acl_entries',
strategy=lambda rel: apply_acl_entry_strategy(subqueryload(rel),
CategoryPrincipal))
self._category_cache = Category.query.all()
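
A hypothetical caller of the new `get_data_query` hook, fetching exactly the
queued records in one batched, fully preloaded query per model (the ids and
the dump call are illustrative only):

    event_ids = {42, 43, 44}  # collected from the queue
    for event in backend.get_data_query(Event, event_ids).yield_per(5000):
        payload = uploader.dump_record(event)  # no lazy loads fire here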


@@ -46,11 +46,14 @@ def _get_excluded_category_filter(event_model=Event):
return True
def query_events():
def query_events(ids=None):
if ids is None:
export_filter = ~Event.is_deleted & _get_excluded_category_filter()
else:
export_filter = Event.id.in_(ids)
return (
Event.query
.filter_by(is_deleted=False)
.filter(_get_excluded_category_filter())
.filter(export_filter)
.options(
apply_acl_entry_strategy(selectinload(Event.acl_entries), EventPrincipal),
selectinload(Event.person_links).joinedload('person').joinedload('user').load_only('is_system'),
@@ -61,7 +64,7 @@ def query_events():
)
def query_contributions():
def query_contributions(ids=None):
event_strategy = contains_eager(Contribution.event)
event_strategy.joinedload(Event.own_venue)
event_strategy.joinedload(Event.own_room).options(raiseload('*'), joinedload('location'))
@@ -79,10 +82,15 @@ def query_contributions():
session_block_session_strategy.joinedload(Session.own_venue)
session_block_session_strategy.joinedload(Session.own_room).options(raiseload('*'), joinedload('location'))
if ids is None:
export_filter = ~Contribution.is_deleted & ~Event.is_deleted & _get_excluded_category_filter()
else:
export_filter = Contribution.id.in_(ids)
return (
Contribution.query
.join(Event)
.filter(~Contribution.is_deleted, ~Event.is_deleted, _get_excluded_category_filter())
.filter(export_filter)
.options(
selectinload(Contribution.acl_entries),
selectinload(Contribution.person_links).joinedload('person').joinedload('user').load_only('is_system'),
@@ -98,7 +106,7 @@ def query_contributions():
)
def query_subcontributions():
def query_subcontributions(ids=None):
contrib_event = db.aliased(Event)
contrib_session = db.aliased(Session)
contrib_block = db.aliased(SessionBlock)
@@ -123,14 +131,21 @@ def query_subcontributions():
session_block_strategy.joinedload(contrib_block.own_venue)
session_block_strategy.joinedload(contrib_block.own_room).options(raiseload('*'), joinedload('location'))
if ids is None:
export_filter = db.and_(~SubContribution.is_deleted,
~Contribution.is_deleted,
~contrib_event.is_deleted,
_get_excluded_category_filter(contrib_event))
else:
export_filter = SubContribution.id.in_(ids)
return (
SubContribution.query
.join(Contribution)
.join(Contribution.event.of_type(contrib_event))
.outerjoin(Contribution.session.of_type(contrib_session))
.outerjoin(Contribution.session_block.of_type(contrib_block))
.filter(~SubContribution.is_deleted, ~Contribution.is_deleted, ~contrib_event.is_deleted,
_get_excluded_category_filter(contrib_event))
.filter(export_filter)
.options(
selectinload(SubContribution.person_links).joinedload('person').joinedload('user').load_only('is_system'),
contrib_strategy,
@@ -142,7 +157,7 @@ def query_subcontributions():
)
def query_attachments():
def query_attachments(ids=None):
contrib_event = db.aliased(Event)
contrib_session = db.aliased(Session)
subcontrib_contrib = db.aliased(Contribution)
@@ -183,6 +198,35 @@ def query_attachments():
session_strategy.contains_eager(Session.event.of_type(session_event)).selectinload(session_event.acl_entries)
apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal)
if ids is None:
export_filter = db.and_(
~Attachment.is_deleted,
~AttachmentFolder.is_deleted,
db.or_(
AttachmentFolder.link_type != LinkType.event,
~Event.is_deleted & _get_excluded_category_filter(),
),
db.or_(
AttachmentFolder.link_type != LinkType.contribution,
~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event)
),
db.or_(
AttachmentFolder.link_type != LinkType.subcontribution,
db.and_(
~SubContribution.is_deleted,
~subcontrib_contrib.is_deleted,
~subcontrib_event.is_deleted,
_get_excluded_category_filter(subcontrib_event)
)
),
db.or_(
AttachmentFolder.link_type != LinkType.session,
~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event)
)
)
else:
export_filter = Attachment.id.in_(ids)
return (
Attachment.query
.join(Attachment.folder)
@@ -197,32 +241,13 @@ def query_attachments():
.outerjoin(subcontrib_contrib.session.of_type(subcontrib_session))
.outerjoin(AttachmentFolder.session)
.outerjoin(Session.event.of_type(session_event))
.filter(~Attachment.is_deleted, ~AttachmentFolder.is_deleted)
.filter(export_filter)
.filter(AttachmentFolder.link_type != LinkType.category)
.filter(db.or_(
AttachmentFolder.link_type != LinkType.event,
~Event.is_deleted & _get_excluded_category_filter(),
))
.filter(db.or_(
AttachmentFolder.link_type != LinkType.contribution,
~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event)
))
.filter(db.or_(
AttachmentFolder.link_type != LinkType.subcontribution,
db.and_(~SubContribution.is_deleted,
~subcontrib_contrib.is_deleted,
~subcontrib_event.is_deleted,
_get_excluded_category_filter(subcontrib_event))
))
.filter(db.or_(
AttachmentFolder.link_type != LinkType.session,
~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event)
))
.order_by(Attachment.id)
)
def query_notes():
def query_notes(ids=None):
contrib_event = db.aliased(Event)
contrib_session = db.aliased(Session)
subcontrib_contrib = db.aliased(Contribution)
@@ -260,6 +285,34 @@ def query_notes():
session_strategy.contains_eager(Session.event.of_type(session_event)).selectinload(session_event.acl_entries)
apply_acl_entry_strategy(session_strategy.selectinload(Session.acl_entries), SessionPrincipal)
if ids is None:
export_filter = db.and_(
~EventNote.is_deleted,
db.or_(
EventNote.link_type != LinkType.event,
~Event.is_deleted & _get_excluded_category_filter()
),
db.or_(
EventNote.link_type != LinkType.contribution,
~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event)
),
db.or_(
EventNote.link_type != LinkType.subcontribution,
db.and_(
~SubContribution.is_deleted,
~subcontrib_contrib.is_deleted,
~subcontrib_event.is_deleted,
_get_excluded_category_filter(subcontrib_event)
)
),
db.or_(
EventNote.link_type != LinkType.session,
~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event)
)
)
else:
export_filter = EventNote.id.in_(ids)
return (
EventNote.query
.outerjoin(EventNote.linked_event)
@@ -272,26 +325,7 @@ def query_notes():
.outerjoin(subcontrib_contrib.session.of_type(subcontrib_session))
.outerjoin(EventNote.session)
.outerjoin(Session.event.of_type(session_event))
.filter(~EventNote.is_deleted)
.filter(db.or_(
EventNote.link_type != LinkType.event,
~Event.is_deleted & _get_excluded_category_filter()
))
.filter(db.or_(
EventNote.link_type != LinkType.contribution,
~Contribution.is_deleted & ~contrib_event.is_deleted & _get_excluded_category_filter(contrib_event)
))
.filter(db.or_(
EventNote.link_type != LinkType.subcontribution,
db.and_(~SubContribution.is_deleted,
~subcontrib_contrib.is_deleted,
~subcontrib_event.is_deleted,
_get_excluded_category_filter(subcontrib_event))
))
.filter(db.or_(
EventNote.link_type != LinkType.session,
~Session.is_deleted & ~session_event.is_deleted & _get_excluded_category_filter(session_event)
))
.filter(export_filter)
.options(
note_strategy,
joinedload(EventNote.current_revision).joinedload(EventNoteRevision.user).joinedload('_affiliation'),
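
All five query functions now share the same shape: the heavy export filter
(deleted flags, excluded categories) applies only to full exports, while a
queue run passes explicit ids and gets a plain IN filter with the exact same
eager-loading options. A condensed sketch of the pattern, using a
hypothetical Widget model:

    def query_widgets(ids=None):
        if ids is None:
            # initial export: skip deleted rows and excluded categories
            export_filter = ~Widget.is_deleted & _get_excluded_category_filter()
        else:
            # queue run: fetch exactly the requested rows
            export_filter = Widget.id.in_(ids)
        return (
            Widget.query
            .filter(export_filter)
            .options(selectinload(Widget.acl_entries))  # identical in both modes
        )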


@@ -33,6 +33,17 @@ class SimpleChange(int, IndicoEnum):
CREATED_DELETED = SimpleChange.created | SimpleChange.deleted
def _get_final_change(change_flags):
if change_flags & SimpleChange.deleted:
assert not (change_flags & SimpleChange.created) # filtered out earlier
return SimpleChange.deleted
elif change_flags & SimpleChange.created:
return SimpleChange.created
elif change_flags & SimpleChange.updated:
return SimpleChange.updated
raise Exception(f'Invalid change flags: {change_flags}')
def process_records(records):
"""Converts queue entries into object changes.
@@ -87,7 +98,7 @@ def process_records(records):
# discard any change where the object was both created and deleted
del changes[obj]
return changes
return {obj: _get_final_change(flags) for obj, flags in changes.items()}
def _process_cascaded_category_contents(records):
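
`_get_final_change` collapses the accumulated bitmask into exactly one
change: deletions beat creations, creations beat updates, and created+deleted
combinations were already discarded earlier. For instance:

    flags = SimpleChange.created | SimpleChange.updated
    assert _get_final_change(flags) == SimpleChange.created  # created wins over updated
    flags = SimpleChange.updated | SimpleChange.deleted
    assert _get_final_change(flags) == SimpleChange.deleted  # deleted wins over everything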


@@ -6,6 +6,7 @@
# see the LICENSE file for more details.
import re
from collections import defaultdict
from indico.core.db import db
from indico.util.console import verbose_iterator
@@ -29,13 +30,15 @@ class Uploader:
:param records: an iterable containing queue entries
"""
self_name = type(self).__name__
simplified = process_records(records).items()
simplified = self.query_data(process_records(records)).items()
total = len(simplified)
if self.from_cli:
simplified = self._make_verbose(simplified, total)
try:
self.logger.info(f'{self_name} uploading %d changes from %d records', total, len(records))
self.upload_records(simplified)
if not self.upload_records(simplified):
self.logger.warning('uploader indicated a failure')
return
except Exception:
self.logger.exception(f'{self_name} failed')
if self.from_cli:
@@ -64,6 +67,26 @@ class Uploader:
print_total_time=True
)
def query_data(self, records):
"""Query the data needed to dump records efficiently.
This function queries the verbose data for the given records.
:param records: a dict mapping objects to changes
:return: a dict of the same structure
"""
by_model = defaultdict(set)
change_by_obj = {}
for obj, change in records.items():
by_model[type(obj)].add(obj.id)
change_by_obj[type(obj), obj.id] = change
rv = {}
for model, ids in by_model.items():
for obj in self.backend.get_data_query(model, ids).yield_per(5000):
rv[obj] = change_by_obj[model, obj.id]
assert len(records) == len(rv)
return rv
def upload_records(self, records, initial=False):
"""Executed to upload records.


@@ -92,6 +92,12 @@ def test_process_records_simplify(changes, db, create_event, dummy_agent):
# created+deleted items are discarded
if expected[i] & CREATED_DELETED == CREATED_DELETED:
expected[i] = 0
elif expected[i] & SimpleChange.deleted:
expected[i] = SimpleChange.deleted
elif expected[i] & SimpleChange.created:
expected[i] = SimpleChange.created
elif expected[i] & SimpleChange.updated:
expected[i] = SimpleChange.updated
db.session.flush()


@@ -8,6 +8,7 @@
from operator import attrgetter
from unittest.mock import MagicMock
from indico_livesync.base import LiveSyncBackendBase
from indico_livesync.models.queue import ChangeType, EntryType, LiveSyncQueueEntry
from indico_livesync.simplify import SimpleChange, _process_cascaded_event_contents
from indico_livesync.uploader import Uploader
@@ -23,6 +24,7 @@ class RecordingUploader(Uploader):
def upload_records(self, records, initial=False):
self._uploaded = list(records)
return True
@property
def all_uploaded(self):
@@ -36,6 +38,12 @@ class FailingUploader(RecordingUploader):
raise Exception('All your data are belong to us!')
class TestBackend(LiveSyncBackendBase):
def __init__(self):
super().__init__(MagicMock())
self.plugin = MagicMock()
def test_run_initial(mocker):
"""Test the initial upload"""
mocker.patch.object(Uploader, 'processed_records', autospec=True)
@@ -54,7 +62,7 @@ def _sorted_process_cascaded_event_contents(records, additional_events=None, *,
def test_run(mocker, monkeypatch, db, create_event, dummy_agent):
"""Test uploading queued data"""
uploader = RecordingUploader(MagicMock())
uploader = RecordingUploader(TestBackend())
events = tuple(create_event(id_=evt_id) for evt_id in range(4))
records = tuple(LiveSyncQueueEntry(change=ChangeType.created, type=EntryType.event, event_id=evt.id,
@@ -80,7 +88,7 @@ def test_run(mocker, monkeypatch, db, create_event, dummy_agent):
def test_run_failing(mocker, monkeypatch, db, create_event, dummy_agent):
"""Test a failing queue run"""
uploader = FailingUploader(MagicMock())
uploader = FailingUploader(TestBackend())
events = tuple(create_event(id_=evt_id) for evt_id in range(10))
records = tuple(LiveSyncQueueEntry(change=ChangeType.created, type=EntryType.event, event_id=evt.id,