Livesync: Upload metadata records in batches

Until now we were fetching livesync queue entries in batches, but the number of actual records resulting from each batch could be much higher. This commit re-chunks the processed records so that uploads stay bounded by the batch size.
Pedro Ferreira 2015-06-23 11:18:11 +02:00
parent 7a1046a22b
commit d2b05cd2c3
3 changed files with 51 additions and 22 deletions
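
Why a fixed-size queue batch is not enough: processing fans each queue entry out into potentially many records (for example, an event-level change cascading to its child objects), so the number of records handed to the backend per upload was effectively unbounded. A purely illustrative sketch of the expansion (the real logic lives in indico_livesync's process_records; all names below are stand-ins):

    # Hypothetical stand-in for process_records: each queue entry fans out
    # into one record per affected child object, so output size >> input size.
    def fake_process_records(entries):
        records = {}
        for entry in entries:
            for child_id in range(entry['children']):
                records[(entry['event'], child_id)] = 1  # 1 ~ "created" bitmask
        return records

    queue_batch = [{'event': i, 'children': 50} for i in range(100)]
    print(len(queue_batch), '->', len(fake_process_records(queue_batch)))  # 100 -> 5000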

View File

@@ -23,7 +23,7 @@ from MaKaC.common.output import outputGenerator
 from MaKaC.common.xmlGen import XMLGen
 from indico.modules.users import User
-from indico_livesync import process_records, SimpleChange
+from indico_livesync import SimpleChange
 from indico_livesync.util import make_compound_id, obj_deref, obj_ref
@@ -32,11 +32,8 @@ class MARCXMLGenerator:
     @classmethod
     def records_to_xml(cls, records):
-        processed = process_records(records)
-        if not processed:
-            return None
         mg = MARCXMLGenerator()
-        for ref, change in processed.iteritems():
+        for ref, change in records.iteritems():
             mg.safe_add_object(ref, bool(change & SimpleChange.deleted))
         return mg.get_xml()
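
With the processing moved out, records_to_xml now expects an already-processed mapping from object references to change bitmasks instead of raw queue entries. A hedged sketch of the new calling convention (the flag values below are assumed for illustration, not taken from SimpleChange's actual definition):

    CREATED, CHANGED, DELETED = 1, 2, 4  # assumed bit-flag layout

    records = {
        ('event', 123): CREATED,
        ('contribution', 7): CHANGED | DELETED,
    }

    for ref, change in records.items():
        # mirrors the bool(change & SimpleChange.deleted) check in the diff above
        print(ref, 'deleted' if change & DELETED else 'alive')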

View File

@@ -20,8 +20,7 @@ import transaction
 from indico.core.db import db
 from indico.util.struct.iterables import grouper
-from indico_livesync import MARCXMLGenerator
+from indico_livesync import MARCXMLGenerator, process_records


 class Uploader(object):
@@ -45,7 +44,10 @@ class Uploader(object):
         for i, batch in enumerate(grouper(records, self.BATCH_SIZE, skip_missing=True), 1):
             self.logger.info('{} processing batch {}'.format(self_name, i))
             try:
-                self.upload_records(batch, from_queue=True)
+                for j, proc_batch in enumerate(grouper(
+                        process_records(batch).iteritems(), self.BATCH_SIZE, skip_missing=True), 1):
+                    self.logger.info('{} uploading chunk #{} (batch {})'.format(self_name, j, i))
+                    self.upload_records({k: v for k, v in proc_batch}, from_queue=True)
             except Exception:
                 self.logger.exception('{} could not upload batch'.format(self_name))
                 return
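
The inner loop re-chunks the processed mapping so each upload_records call sees at most BATCH_SIZE records, while keeping the existing abort-on-failure semantics: an exception in any chunk logs the error and stops the run, leaving later queue batches unprocessed. A self-contained sketch of that control flow, with grouper and process_records replaced by simple stand-ins:

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger('livesync.sketch')
    BATCH_SIZE = 3

    def chunks(seq, n):
        # list-based stand-in for grouper(..., skip_missing=True)
        return [seq[i:i + n] for i in range(0, len(seq), n)]

    def upload(records):
        if 7 in records:  # simulate a backend failure partway through
            raise Exception('upload failed')

    def run(queue_entries):
        for i, batch in enumerate(chunks(queue_entries, BATCH_SIZE), 1):
            log.info('processing batch %s', i)
            try:
                processed = list(batch)  # stand-in for process_records(batch)
                for j, chunk in enumerate(chunks(processed, BATCH_SIZE), 1):
                    log.info('uploading chunk #%s (batch %s)', j, i)
                    upload(chunk)
            except Exception:
                log.exception('could not upload batch')
                return  # abort; remaining batches are never marked processed
            # in the real uploader, `batch` is marked as processed here

    run(list(range(10)))  # batches 1-2 upload, batch 3 fails and aborts the run
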
@@ -58,9 +60,14 @@ class Uploader(object):
         :param events: an iterable containing events
         """
+        self_name = type(self).__name__
         for i, batch in enumerate(grouper(events, self.INITIAL_BATCH_SIZE, skip_missing=True), 1):
-            self.logger.debug('{} processing initial batch {}'.format(type(self).__name__, i))
-            self.upload_records(batch, from_queue=False)
+            self.logger.debug('{} processing initial batch {}'.format(self_name, i))
+            for j, processed_batch in enumerate(grouper(
+                    batch, self.BATCH_SIZE, skip_missing=True), 1):
+                self.logger.info('{} uploading initial chunk #{} (batch {})'.format(self_name, j, i))
+                self.upload_records(processed_batch, from_queue=False)

     def upload_records(self, records, from_queue):
         """Executed for a batch of up to `BATCH_SIZE` records
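
Concrete backends only implement upload_records; after this commit they can rely on never receiving more than BATCH_SIZE records per call. A rough sketch of what a subclass could look like, written against the real Uploader and MARCXMLGenerator names from this commit but with invented _serialize_events and _post helpers:

    from indico_livesync import MARCXMLGenerator
    from indico_livesync.uploader import Uploader

    class ExampleUploader(Uploader):
        """Illustrative subclass; not the plugin's actual MARCXMLUploader."""

        def upload_records(self, records, from_queue):
            # from_queue=True: records is a {ref: change_bitmask} dict of at
            # most BATCH_SIZE entries; from_queue=False: a plain batch of events
            if from_queue:
                payload = MARCXMLGenerator.records_to_xml(records)
            else:
                payload = self._serialize_events(records)  # hypothetical helper
            self._post(payload)  # hypothetical transport, e.g. HTTP POST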

View File

@@ -14,10 +14,20 @@
 # You should have received a copy of the GNU General Public License
 # along with Indico; if not, see <http://www.gnu.org/licenses/>.

-from mock import MagicMock
 import pytest
+from mock import MagicMock, Mock
+from werkzeug.datastructures import ImmutableDict

+from indico_livesync import SimpleChange
 from indico_livesync.models.queue import LiveSyncQueueEntry, ChangeType
 from indico_livesync.uploader import Uploader, MARCXMLUploader
+from indico_livesync.util import obj_ref
+from MaKaC.conference import Conference
+
+
+def _rm_none(dict_):
+    return ImmutableDict((k, v) for k, v in dict_.iteritems() if v is not None)


 class RecordingUploader(Uploader):
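
_rm_none drops None values from a reference dict and freezes the result into a hashable ImmutableDict, which is what lets the assertions below collect uploaded references into sets and compare them regardless of upload order. For example (keys illustrative):

    from werkzeug.datastructures import ImmutableDict

    def _rm_none(dict_):
        return ImmutableDict((k, v) for k, v in dict_.items() if v is not None)

    a = _rm_none({'type': 'event', 'event_id': 1, 'contrib_id': None})
    b = _rm_none({'event_id': 1, 'type': 'event'})
    assert a == b      # None-valued keys are ignored
    assert {a} == {b}  # hashable, so usable in set-based comparisons
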
@@ -28,7 +38,14 @@ class RecordingUploader(Uploader):
         self.logger = MagicMock()

     def upload_records(self, records, from_queue):
-        self._uploaded.append((records, from_queue))
+        if from_queue:
+            self._uploaded.append((set((_rm_none(rec), op) for rec, op in records.iteritems()), from_queue))
+        else:
+            self._uploaded.append((set(records), from_queue))
+
+    @property
+    def all_uploaded(self):
+        return self._uploaded


 class FailingUploader(RecordingUploader):
@@ -44,17 +61,21 @@ class FailingUploader(RecordingUploader):
             raise Exception('All your data are belong to us!')


+@pytest.fixture(autouse=True)
+def mock_resolved_zodb_objects(mocker):
+    mocker.patch.object(LiveSyncQueueEntry, 'object', autospec=True)
+
+
 def test_run_initial(mocker):
     """Test the initial upload"""
     mocker.patch.object(Uploader, 'processed_records', autospec=True)
     uploader = RecordingUploader(MagicMock())
     uploader.INITIAL_BATCH_SIZE = 3
-    events = tuple(range(4))
-    uploader.run_initial(events)
+    records = tuple(Mock(spec=Conference, id=evt_id) for evt_id in xrange(4))
+    uploader.run_initial(records)

     # We expect two batches, with the second one being smaller (i.e. no None padding, just the events)
-    batches = events[:3], events[3:]
-    assert len(batches[0]) > len(batches[1])
-    assert uploader._uploaded == [(batches[0], False), (batches[1], False)]
+    batches = set(records[:3]), set(records[3:])
+    assert uploader.all_uploaded == [(batches[0], False), (batches[1], False)]

     # During an initial export there are no records to mark as processed
     assert not uploader.processed_records.called
@@ -64,10 +85,12 @@ def test_run(mocker):
     db = mocker.patch('indico_livesync.uploader.db')
     uploader = RecordingUploader(MagicMock())
     uploader.BATCH_SIZE = 3
-    records = tuple(LiveSyncQueueEntry(change=ChangeType.created) for _ in xrange(4))
+    records = tuple(
+        LiveSyncQueueEntry(change=ChangeType.created, type='event', event_id=evt_id) for evt_id in xrange(4))
     uploader.run(records)
-    batches = records[:3], records[3:]
-    assert uploader._uploaded == [(batches[0], True), (batches[1], True)]
+    refs = tuple((_rm_none(record.object_ref), int(SimpleChange.created)) for record in records)
+    batches = set(refs[:3]), set(refs[3:])
+    assert uploader.all_uploaded == [(batches[0], True), (batches[1], True)]
     # All records should be marked as processed
     assert all(record.processed for record in records)
     # Marking records as processed is committed immediately
@@ -79,11 +102,13 @@ def test_run_failing(mocker):
     db = mocker.patch('indico_livesync.uploader.db')
     uploader = FailingUploader(MagicMock())
     uploader.BATCH_SIZE = 3
-    records = tuple(LiveSyncQueueEntry(change=ChangeType.created) for _ in xrange(10))
+    records = tuple(
+        LiveSyncQueueEntry(change=ChangeType.created, type='event', event_id=evt_id) for evt_id in xrange(10))
     uploader.run(records)
+    refs = tuple((_rm_none(record.object_ref), int(SimpleChange.created)) for record in records)
     assert uploader.logger.exception.called
     # No uploads should happen after a failed batch
-    assert uploader._uploaded == [(records[:3], True), (records[3:6], True)]
+    assert uploader._uploaded == [(set(refs[:3]), True), (set(refs[3:6]), True)]
     # Only successful records should be marked as processed
     assert all(record.processed for record in records[:3])
     assert not any(record.processed for record in records[3:])
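
A note on why the assertions switched from tuples to sets: the upload path now iterates a dict produced by process_records, and dict iteration order is arbitrary on Python 2, so the order of records within a chunk is not deterministic; only chunk membership is. Comparing sets of (ref, change) pairs keeps the tests stable, e.g.:

    # order within a chunk may vary between runs, membership may not
    chunk = {('event', 1): 1, ('event', 2): 1}
    uploaded = set(chunk.items())
    assert uploaded == {(('event', 2), 1), (('event', 1), 1)}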