Adrian Moennich a5595615a4 LiveSync: Use efficient queries in queue runs
While simplify is still very spammy in case of large queues, at least
the actual data dumping for the export itself no longer spams queries.
This is especially crucial in the Citadel plugin where we have massive
parallelism which can easily exhaust the DB connection pool if we get
queries there.

For the same reason, the Citadel ID mapping is now queried eagerly,
so we no longer need to try creating a record and then fall back to
delete+update when we realize there's already an existing Citadel
mapping; instead we can simply check before sending anything and
convert the creation to an update.
2021-05-28 11:28:54 +02:00

108 lines
3.8 KiB
Python

# This file is part of the Indico plugins.
# Copyright (C) 2002 - 2021 CERN
#
# The Indico plugins are free software; you can redistribute
# them and/or modify them under the terms of the MIT License;
# see the LICENSE file for more details.
import re
from collections import defaultdict
from indico.core.db import db
from indico.util.console import verbose_iterator
from indico.util.string import strip_control_chars
from indico_livesync.simplify import SimpleChange, process_records
class Uploader:
    """Handles batch data upload to a remote service.

    Subclasses implement :meth:`upload_records`. This base class simplifies
    queue entries, bulk-queries the data to dump, logs progress/failures and
    marks queue entries as processed after a successful upload.
    """

    def __init__(self, backend, verbose=False, from_cli=False):
        """
        :param backend: the livesync backend this uploader belongs to; its
                        ``plugin.logger`` is used for logging and its
                        ``get_data_query`` for loading the data to dump
        :param verbose: stored as ``self.verbose`` (not used by this base class)
        :param from_cli: whether the upload runs from the CLI; enables progress
                         output in :meth:`run` and makes upload errors re-raise
        """
        self.backend = backend
        self.verbose = verbose
        self.from_cli = from_cli
        self.logger = backend.plugin.logger

    def run(self, records):
        """Runs the batch upload

        The queue entries are simplified into a mapping of objects to changes,
        the data for those objects is queried in bulk (see :meth:`query_data`)
        and the resulting ``(obj, change)`` pairs are passed to
        :meth:`upload_records`. The queue entries are marked as processed only
        if the upload succeeds.

        Upload errors are logged and swallowed, except when running from the
        CLI where they are re-raised. Errors raised while simplifying or
        querying the data are not caught here.

        :param records: a list of queue entries; ``len()`` is taken on it and
                        it is iterated more than once, so a one-shot iterator
                        does not work
        """
        self_name = type(self).__name__
        simplified = self.query_data(process_records(records)).items()
        total = len(simplified)
        if self.from_cli:
            simplified = self._make_verbose(simplified, total)
        try:
            self.logger.info(f'{self_name} uploading %d changes from %d records', total, len(records))
            if not self.upload_records(simplified):
                self.logger.warning('uploader indicated a failure')
                return
        except Exception:
            self.logger.exception(f'{self_name} failed')
            if self.from_cli:
                raise
            return
        self.processed_records(records)
        self.logger.info(f'{self_name} finished (%d total changes from %d records)', total, len(records))

    def run_initial(self, records, total):
        """Runs the initial batch upload

        Every record is uploaded as a ``SimpleChange.created`` change, wrapped
        with progress output (see :meth:`_make_verbose`).

        :param records: an iterable containing records
        :param total: the total of records to be exported
        :return: True if everything was successful, False if not
        """
        records = ((rec, SimpleChange.created) for rec in records)
        records = self._make_verbose(records, total)
        return self.upload_records(records, initial=True)

    def _make_verbose(self, iterator, total):
        """Wrap an iterable of ``(obj, change)`` pairs in ``verbose_iterator``.

        Each entry is labelled with its object's ``id`` and its ``title`` (empty
        if the object has none), with control characters stripped and runs of
        whitespace collapsed into a single space.

        :param iterator: an iterable of ``(obj, change)`` tuples
        :param total: the number of entries, passed on to ``verbose_iterator``
        :return: whatever ``verbose_iterator`` returns for `iterator`
        """
        return verbose_iterator(
            iterator,
            total,
            lambda entry: entry[0].id,
            # a missing *or* ``None`` title becomes an empty label instead of
            # making ``strip_control_chars`` fail on ``None``
            lambda entry: re.sub(r'\s+', ' ', strip_control_chars(getattr(entry[0], 'title', None) or '')),
            print_total_time=True
        )

    def query_data(self, records):
        """Query the data needed to dump records efficiently.

        The objects are grouped by model, so the backend's ``get_data_query``
        is called once per model instead of once per object.

        :param records: a dict mapping objects to changes
        :return: a dict of the same structure, whose keys are the objects
                 returned by the data queries
        :raise RuntimeError: if the number of objects returned by the queries
                             differs from the number of entries in `records`
        """
        by_model = defaultdict(set)
        change_by_obj = {}
        for obj, change in records.items():
            by_model[type(obj)].add(obj.id)
            change_by_obj[type(obj), obj.id] = change
        rv = {}
        for model, ids in by_model.items():
            # presumably a SQLAlchemy query: yield_per fetches rows in batches
            # instead of loading the whole result at once
            for obj in self.backend.get_data_query(model, ids).yield_per(5000):
                rv[obj] = change_by_obj[model, obj.id]
        # Explicit check rather than ``assert`` (which ``python -O`` strips):
        # objects silently dropped here would never be uploaded, yet ``run``
        # would still mark their queue entries as processed.
        if len(rv) != len(records):
            raise RuntimeError(f'Data queries returned {len(rv)} objects for {len(records)} records')
        return rv

    def upload_records(self, records, initial=False):
        """Executed to upload records.

        Must be implemented by subclasses.

        :param records: an iterable of ``(obj, change)`` tuples to upload
        :param initial: whether the upload is part of an initial export
        :return: True if everything was successful, False if not
        """
        raise NotImplementedError  # pragma: no cover

    def processed_records(self, records):
        """Executed after successfully uploading a batch of records from the queue.

        Sets ``processed = True`` on every queue entry, then commits the DB
        session once for the whole batch.

        :param records: a list of queue entries
        """
        for record in records:
            self.logger.debug('Marking as processed: %s', record)
            record.processed = True
        db.session.commit()