From d2b05cd2c39b3bea8596e2c3ef2ef75d207b4150 Mon Sep 17 00:00:00 2001 From: Pedro Ferreira Date: Tue, 23 Jun 2015 11:18:11 +0200 Subject: [PATCH] Livesync: Upload metadata records in batches Until now we were fetching livesync queue entries in batches but the number of actual resulting records could be much higher --- livesync/indico_livesync/marcxml.py | 7 ++-- livesync/indico_livesync/uploader.py | 17 +++++++--- livesync/tests/uploader_test.py | 49 +++++++++++++++++++++------- 3 files changed, 51 insertions(+), 22 deletions(-) diff --git a/livesync/indico_livesync/marcxml.py b/livesync/indico_livesync/marcxml.py index 7989a1e..2e376fc 100644 --- a/livesync/indico_livesync/marcxml.py +++ b/livesync/indico_livesync/marcxml.py @@ -23,7 +23,7 @@ from MaKaC.common.output import outputGenerator from MaKaC.common.xmlGen import XMLGen from indico.modules.users import User -from indico_livesync import process_records, SimpleChange +from indico_livesync import SimpleChange from indico_livesync.util import make_compound_id, obj_deref, obj_ref @@ -32,11 +32,8 @@ class MARCXMLGenerator: @classmethod def records_to_xml(cls, records): - processed = process_records(records) - if not processed: - return None mg = MARCXMLGenerator() - for ref, change in processed.iteritems(): + for ref, change in records.iteritems(): mg.safe_add_object(ref, bool(change & SimpleChange.deleted)) return mg.get_xml() diff --git a/livesync/indico_livesync/uploader.py b/livesync/indico_livesync/uploader.py index 24ed427..a6569cf 100644 --- a/livesync/indico_livesync/uploader.py +++ b/livesync/indico_livesync/uploader.py @@ -20,8 +20,7 @@ import transaction from indico.core.db import db from indico.util.struct.iterables import grouper - -from indico_livesync import MARCXMLGenerator +from indico_livesync import MARCXMLGenerator, process_records class Uploader(object): @@ -45,7 +44,10 @@ class Uploader(object): for i, batch in enumerate(grouper(records, self.BATCH_SIZE, skip_missing=True), 1): 
self.logger.info('{} processing batch {}'.format(self_name, i)) try: - self.upload_records(batch, from_queue=True) + for j, proc_batch in enumerate(grouper( + process_records(batch).iteritems(), self.BATCH_SIZE, skip_missing=True), 1): + self.logger.info('{} uploading chunk #{} (batch {})'.format(self_name, j, i)) + self.upload_records({k: v for k, v in proc_batch}, from_queue=True) except Exception: self.logger.exception('{} could not upload batch'.format(self_name)) return @@ -58,9 +60,14 @@ class Uploader(object): :param events: an iterable containing events """ + self_name = type(self).__name__ for i, batch in enumerate(grouper(events, self.INITIAL_BATCH_SIZE, skip_missing=True), 1): - self.logger.debug('{} processing initial batch {}'.format(type(self).__name__, i)) - self.upload_records(batch, from_queue=False) + self.logger.debug('{} processing initial batch {}'.format(self_name, i)) + + for j, processed_batch in enumerate(grouper( + batch, self.BATCH_SIZE, skip_missing=True), 1): + self.logger.info('{} uploading initial chunk #{} (batch {})'.format(self_name, j, i)) + self.upload_records(processed_batch, from_queue=False) def upload_records(self, records, from_queue): """Executed for a batch of up to `BATCH_SIZE` records diff --git a/livesync/tests/uploader_test.py b/livesync/tests/uploader_test.py index ed5ee56..3ac857b 100644 --- a/livesync/tests/uploader_test.py +++ b/livesync/tests/uploader_test.py @@ -14,10 +14,20 @@ # You should have received a copy of the GNU General Public License # along with Indico; if not, see . 
-from mock import MagicMock +import pytest +from mock import MagicMock, Mock +from werkzeug.datastructures import ImmutableDict +from indico_livesync import SimpleChange from indico_livesync.models.queue import LiveSyncQueueEntry, ChangeType from indico_livesync.uploader import Uploader, MARCXMLUploader +from indico_livesync.util import obj_ref + +from MaKaC.conference import Conference + + +def _rm_none(dict_): + return ImmutableDict((k, v) for k, v in dict_.iteritems() if v is not None) class RecordingUploader(Uploader): @@ -28,7 +38,14 @@ class RecordingUploader(Uploader): self.logger = MagicMock() def upload_records(self, records, from_queue): - self._uploaded.append((records, from_queue)) + if from_queue: + self._uploaded.append((set((_rm_none(rec), op) for rec, op in records.iteritems()), from_queue)) + else: + self._uploaded.append((set(records), from_queue)) + + @property + def all_uploaded(self): + return self._uploaded class FailingUploader(RecordingUploader): @@ -44,17 +61,21 @@ class FailingUploader(RecordingUploader): raise Exception('All your data are belong to us!') +@pytest.fixture(autouse=True) +def mock_resolved_zodb_objects(mocker): + mocker.patch.object(LiveSyncQueueEntry, 'object', autospec=True) + + def test_run_initial(mocker): """Test the initial upload""" mocker.patch.object(Uploader, 'processed_records', autospec=True) uploader = RecordingUploader(MagicMock()) uploader.INITIAL_BATCH_SIZE = 3 - events = tuple(range(4)) - uploader.run_initial(events) + records = tuple(Mock(spec=Conference, id=evt_id) for evt_id in xrange(4)) + uploader.run_initial(records) # We expect two batches, with the second one being smaller (i.e. 
no None padding, just the events) - batches = events[:3], events[3:] - assert len(batches[0]) > len(batches[1]) - assert uploader._uploaded == [(batches[0], False), (batches[1], False)] + batches = set(records[:3]), set(records[3:]) + assert uploader.all_uploaded == [(batches[0], False), (batches[1], False)] # During an initial export there are no records to mark as processed assert not uploader.processed_records.called @@ -64,10 +85,12 @@ def test_run(mocker): db = mocker.patch('indico_livesync.uploader.db') uploader = RecordingUploader(MagicMock()) uploader.BATCH_SIZE = 3 - records = tuple(LiveSyncQueueEntry(change=ChangeType.created) for _ in xrange(4)) + records = tuple( + LiveSyncQueueEntry(change=ChangeType.created, type='event', event_id=evt_id) for evt_id in xrange(4)) uploader.run(records) - batches = records[:3], records[3:] - assert uploader._uploaded == [(batches[0], True), (batches[1], True)] + refs = tuple((_rm_none(record.object_ref), int(SimpleChange.created)) for record in records) + batches = set(refs[:3]), set(refs[3:]) + assert uploader.all_uploaded == [(batches[0], True), (batches[1], True)] # All records should be marked as processed assert all(record.processed for record in records) # Marking records as processed is committed immediately @@ -79,11 +102,13 @@ def test_run_failing(mocker): db = mocker.patch('indico_livesync.uploader.db') uploader = FailingUploader(MagicMock()) uploader.BATCH_SIZE = 3 - records = tuple(LiveSyncQueueEntry(change=ChangeType.created) for _ in xrange(10)) + records = tuple( + LiveSyncQueueEntry(change=ChangeType.created, type='event', event_id=evt_id) for evt_id in xrange(10)) uploader.run(records) + refs = tuple((_rm_none(record.object_ref), int(SimpleChange.created)) for record in records) assert uploader.logger.exception.called # No uploads should happen after a failed batch - assert uploader._uploaded == [(records[:3], True), (records[3:6], True)] + assert uploader._uploaded == [(set(refs[:3]), True), 
(set(refs[3:6]), True)] # Only successful records should be marked as processed assert all(record.processed for record in records[:3]) assert not any(record.processed for record in records[3:])