mirror of
https://github.com/lucaspalomodevelop/indico-plugins.git
synced 2026-03-12 23:27:22 +00:00
LiveSync: Remove chunking
This commit is contained in:
parent
96b4a36480
commit
25af285d6e
@ -9,7 +9,6 @@ import re
|
||||
|
||||
from indico.core.db import db
|
||||
from indico.util.console import verbose_iterator
|
||||
from indico.util.iterables import grouper
|
||||
from indico.util.string import strip_control_chars
|
||||
|
||||
from indico_livesync.simplify import SimpleChange, process_records
|
||||
@ -18,9 +17,6 @@ from indico_livesync.simplify import SimpleChange, process_records
|
||||
class Uploader:
|
||||
"""Handles batch data upload to a remote service."""
|
||||
|
||||
#: Number of queue entries to process at a time
|
||||
BATCH_SIZE = 100
|
||||
|
||||
def __init__(self, backend, verbose=False, from_cli=False):
|
||||
self.backend = backend
|
||||
self.verbose = verbose
|
||||
@ -34,13 +30,11 @@ class Uploader:
|
||||
"""
|
||||
self_name = type(self).__name__
|
||||
simplified = process_records(records).items()
|
||||
chunks = list(grouper(simplified, self.BATCH_SIZE, skip_missing=True))
|
||||
try:
|
||||
for i, batch in enumerate(chunks, 1):
|
||||
self.logger.info(f'{self_name} uploading chunk %d/%d', i, len(chunks))
|
||||
self.upload_records(batch)
|
||||
self.logger.info(f'{self_name} uploading %d changes from %d records', len(simplified), len(records))
|
||||
self.upload_records(simplified)
|
||||
except Exception:
|
||||
self.logger.exception(f'{self_name} could not upload batch')
|
||||
self.logger.exception(f'{self_name} failed')
|
||||
if self.from_cli:
|
||||
raise
|
||||
return
|
||||
@ -64,7 +58,7 @@ class Uploader:
|
||||
return self.upload_records(records, initial=True)
|
||||
|
||||
def upload_records(self, records, initial=False):
|
||||
"""Executed for a batch of up to `BATCH_SIZE` records
|
||||
"""Executed to upload records.
|
||||
|
||||
:param records: an iterator of records to upload (or queue entries)
|
||||
:param initial: whether the upload is part of an initial export
|
||||
|
||||
@ -14,14 +14,15 @@ from indico_livesync.uploader import Uploader
|
||||
|
||||
|
||||
class RecordingUploader(Uploader):
|
||||
"""An uploader which logs each 'upload'"""
|
||||
"""An uploader which logs each 'upload'."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._uploaded = []
|
||||
self.logger = MagicMock()
|
||||
|
||||
def upload_records(self, records, initial=False):
|
||||
self._uploaded.append(list(records))
|
||||
self._uploaded = list(records)
|
||||
|
||||
@property
|
||||
def all_uploaded(self):
|
||||
@ -29,16 +30,10 @@ class RecordingUploader(Uploader):
|
||||
|
||||
|
||||
class FailingUploader(RecordingUploader):
|
||||
"""An uploader where the second batch fails"""
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._n = 0
|
||||
"""An uploader where uploading fails."""
|
||||
|
||||
def upload_records(self, records, initial=False):
|
||||
super().upload_records(records)
|
||||
self._n += 1
|
||||
if self._n == 2:
|
||||
raise Exception('All your data are belong to us!')
|
||||
raise Exception('All your data are belong to us!')
|
||||
|
||||
|
||||
def test_run_initial(mocker):
|
||||
@ -48,7 +43,7 @@ def test_run_initial(mocker):
|
||||
uploader = RecordingUploader(MagicMock())
|
||||
records = tuple(MagicMock(id=evt_id) for evt_id in range(4))
|
||||
uploader.run_initial(records, 4)
|
||||
assert uploader.all_uploaded == [[(record, SimpleChange.created) for record in records]]
|
||||
assert uploader.all_uploaded == [(record, SimpleChange.created) for record in records]
|
||||
# During an initial export there are no records to mark as processed
|
||||
assert not uploader.processed_records.called
|
||||
|
||||
@ -60,7 +55,6 @@ def _sorted_process_cascaded_event_contents(records, additional_events=None, *,
|
||||
def test_run(mocker, monkeypatch, db, create_event, dummy_agent):
|
||||
"""Test uploading queued data"""
|
||||
uploader = RecordingUploader(MagicMock())
|
||||
uploader.BATCH_SIZE = 3
|
||||
|
||||
events = tuple(create_event(id_=evt_id) for evt_id in range(4))
|
||||
records = tuple(LiveSyncQueueEntry(change=ChangeType.created, type=EntryType.event, event_id=evt.id,
|
||||
@ -77,9 +71,7 @@ def test_run(mocker, monkeypatch, db, create_event, dummy_agent):
|
||||
uploader.run(records)
|
||||
|
||||
objs = [(record.object, int(SimpleChange.created)) for record in records]
|
||||
assert uploader.all_uploaded == [objs[:3], objs[3:]]
|
||||
assert len(uploader.all_uploaded[0]) == 3
|
||||
assert len(uploader.all_uploaded[1]) == 1
|
||||
assert uploader.all_uploaded == objs
|
||||
# All records should be marked as processed
|
||||
assert all(record.processed for record in records)
|
||||
# After the queue run the changes should be committed
|
||||
@ -89,7 +81,6 @@ def test_run(mocker, monkeypatch, db, create_event, dummy_agent):
|
||||
def test_run_failing(mocker, monkeypatch, db, create_event, dummy_agent):
|
||||
"""Test a failing queue run"""
|
||||
uploader = FailingUploader(MagicMock())
|
||||
uploader.BATCH_SIZE = 3
|
||||
|
||||
events = tuple(create_event(id_=evt_id) for evt_id in range(10))
|
||||
records = tuple(LiveSyncQueueEntry(change=ChangeType.created, type=EntryType.event, event_id=evt.id,
|
||||
@ -105,10 +96,9 @@ def test_run_failing(mocker, monkeypatch, db, create_event, dummy_agent):
|
||||
_sorted_process_cascaded_event_contents)
|
||||
|
||||
uploader.run(records)
|
||||
objs = [(record.object, int(SimpleChange.created)) for record in records]
|
||||
assert uploader.logger.exception.called
|
||||
# No uploads should happen after a failed batch
|
||||
assert uploader._uploaded == [objs[:3], objs[3:6]]
|
||||
assert not uploader._uploaded
|
||||
# No records should be marked as processed
|
||||
assert not any(record.processed for record in records)
|
||||
# And nothing should have been committed
|
||||
|
||||
@ -37,8 +37,6 @@ def _print_record(obj_type, obj_id, data, changes, *, print_blank, verbose):
|
||||
|
||||
|
||||
class DebugUploader(Uploader):
|
||||
BATCH_SIZE = 5
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._is_queue_run = False
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user