From 25af285d6efdd3bf2a4862b5bd0c43e424a61542 Mon Sep 17 00:00:00 2001 From: Adrian Moennich Date: Thu, 27 May 2021 19:30:18 +0200 Subject: [PATCH] LiveSync: Remove chunking --- livesync/indico_livesync/uploader.py | 14 +++------- livesync/tests/uploader_test.py | 26 ++++++------------- .../indico_livesync_debug/backend.py | 2 -- 3 files changed, 12 insertions(+), 30 deletions(-) diff --git a/livesync/indico_livesync/uploader.py b/livesync/indico_livesync/uploader.py index c6c8b86..17adb80 100644 --- a/livesync/indico_livesync/uploader.py +++ b/livesync/indico_livesync/uploader.py @@ -9,7 +9,6 @@ import re from indico.core.db import db from indico.util.console import verbose_iterator -from indico.util.iterables import grouper from indico.util.string import strip_control_chars from indico_livesync.simplify import SimpleChange, process_records @@ -18,9 +17,6 @@ from indico_livesync.simplify import SimpleChange, process_records class Uploader: """Handles batch data upload to a remote service.""" - #: Number of queue entries to process at a time - BATCH_SIZE = 100 - def __init__(self, backend, verbose=False, from_cli=False): self.backend = backend self.verbose = verbose @@ -34,13 +30,11 @@ class Uploader: """ self_name = type(self).__name__ simplified = process_records(records).items() - chunks = list(grouper(simplified, self.BATCH_SIZE, skip_missing=True)) try: - for i, batch in enumerate(chunks, 1): - self.logger.info(f'{self_name} uploading chunk %d/%d', i, len(chunks)) - self.upload_records(batch) + self.logger.info(f'{self_name} uploading %d changes from %d records', len(simplified), len(records)) + self.upload_records(simplified) except Exception: - self.logger.exception(f'{self_name} could not upload batch') + self.logger.exception(f'{self_name} failed') if self.from_cli: raise return @@ -64,7 +58,7 @@ class Uploader: return self.upload_records(records, initial=True) def upload_records(self, records, initial=False): - """Executed for a batch of up to `BATCH_SIZE` records + """Executed to upload records. :param records: an iterator of records to upload (or queue entries) :param initial: whether the upload is part of an initial export diff --git a/livesync/tests/uploader_test.py b/livesync/tests/uploader_test.py index f146386..9249f48 100644 --- a/livesync/tests/uploader_test.py +++ b/livesync/tests/uploader_test.py @@ -14,14 +14,15 @@ from indico_livesync.uploader import Uploader class RecordingUploader(Uploader): - """An uploader which logs each 'upload'""" + """An uploader which logs each 'upload'.""" + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._uploaded = [] self.logger = MagicMock() def upload_records(self, records, initial=False): - self._uploaded.append(list(records)) + self._uploaded = list(records) @property def all_uploaded(self): @@ -29,16 +30,10 @@ class RecordingUploader(Uploader): class FailingUploader(RecordingUploader): - """An uploader where the second batch fails""" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._n = 0 + """An uploader where uploading fails.""" def upload_records(self, records, initial=False): - super().upload_records(records) - self._n += 1 - if self._n == 2: - raise Exception('All your data are belong to us!') + raise Exception('All your data are belong to us!') def test_run_initial(mocker): @@ -48,7 +43,7 @@ def test_run_initial(mocker): uploader = RecordingUploader(MagicMock()) records = tuple(MagicMock(id=evt_id) for evt_id in range(4)) uploader.run_initial(records, 4) - assert uploader.all_uploaded == [[(record, SimpleChange.created) for record in records]] + assert uploader.all_uploaded == [(record, SimpleChange.created) for record in records] # During an initial export there are no records to mark as processed assert not uploader.processed_records.called @@ -60,7 +55,6 @@ def _sorted_process_cascaded_event_contents(records, additional_events=None, *, def test_run(mocker, monkeypatch, db, create_event, dummy_agent): """Test uploading queued data""" uploader = RecordingUploader(MagicMock()) - uploader.BATCH_SIZE = 3 events = tuple(create_event(id_=evt_id) for evt_id in range(4)) records = tuple(LiveSyncQueueEntry(change=ChangeType.created, type=EntryType.event, event_id=evt.id, @@ -77,9 +71,7 @@ def test_run(mocker, monkeypatch, db, create_event, dummy_agent): uploader.run(records) objs = [(record.object, int(SimpleChange.created)) for record in records] - assert uploader.all_uploaded == [objs[:3], objs[3:]] - assert len(uploader.all_uploaded[0]) == 3 - assert len(uploader.all_uploaded[1]) == 1 + assert uploader.all_uploaded == objs # All records should be marked as processed assert all(record.processed for record in records) # After the queue run the changes should be committed @@ -89,7 +81,6 @@ def test_run(mocker, monkeypatch, db, create_event, dummy_agent): def test_run_failing(mocker, monkeypatch, db, create_event, dummy_agent): """Test a failing queue run""" uploader = FailingUploader(MagicMock()) - uploader.BATCH_SIZE = 3 events = tuple(create_event(id_=evt_id) for evt_id in range(10)) records = tuple(LiveSyncQueueEntry(change=ChangeType.created, type=EntryType.event, event_id=evt.id, @@ -105,10 +96,9 @@ def test_run_failing(mocker, monkeypatch, db, create_event, dummy_agent): _sorted_process_cascaded_event_contents) uploader.run(records) - objs = [(record.object, int(SimpleChange.created)) for record in records] assert uploader.logger.exception.called # No uploads should happen after a failed batch - assert uploader._uploaded == [objs[:3], objs[3:6]] + assert not uploader._uploaded # No records should be marked as processed assert not any(record.processed for record in records) # And nothing should have been committed diff --git a/livesync_debug/indico_livesync_debug/backend.py b/livesync_debug/indico_livesync_debug/backend.py index d6ffc0b..904f185 100644 --- a/livesync_debug/indico_livesync_debug/backend.py +++ b/livesync_debug/indico_livesync_debug/backend.py @@ -37,8 +37,6 @@ def _print_record(obj_type, obj_id, data, changes, *, print_blank, verbose): class DebugUploader(Uploader): - BATCH_SIZE = 5 - def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._is_queue_run = False