diff --git a/citadel/indico_citadel/backend.py b/citadel/indico_citadel/backend.py index 511fbec..3c62071 100644 --- a/citadel/indico_citadel/backend.py +++ b/citadel/indico_citadel/backend.py @@ -57,9 +57,6 @@ def _print_record(record): class LiveSyncCitadelUploader(Uploader): - PARALLELISM_RECORDS = 250 - PARALLELISM_FILES = 200 - def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -207,15 +204,29 @@ class LiveSyncCitadelUploader(Uploader): self.categories = dict(db.session.execute(select([cte.c.id, cte.c.path])).fetchall()) return super().run_initial(records, total) - def upload_records(self, records): + def _get_retry_config(self, initial): + if initial: + return Retry( + total=10, + backoff_factor=3, + status_forcelist=[502, 503, 504], + allowed_methods=frozenset(['POST', 'PUT', 'DELETE']) + ) + else: + return Retry( + total=2, + backoff_factor=3, + status_forcelist=[502, 503, 504], + allowed_methods=frozenset(['POST', 'PUT', 'DELETE']) + ) + + def upload_records(self, records, initial=False): + setting = 'num_threads_records_initial' if initial else 'num_threads_records' + num_threads = self.backend.plugin.settings.get(setting) + self.logger.debug('Using %d parallel threads', num_threads) session = requests.Session() - retry = Retry( - total=10, - backoff_factor=3, - status_forcelist=[502, 503, 504], - allowed_methods=frozenset(['POST', 'PUT', 'DELETE']) - ) - session.mount(self.search_app, HTTPAdapter(max_retries=retry, pool_maxsize=self.PARALLELISM_RECORDS)) + session.mount(self.search_app, HTTPAdapter(max_retries=self._get_retry_config(initial), + pool_maxsize=num_threads)) session.headers = self.headers dumped_records = ( ( @@ -228,21 +239,19 @@ class LiveSyncCitadelUploader(Uploader): if self.verbose: dumped_records = (_print_record(x) for x in dumped_records) - uploader = parallelize(self.upload_record, entries=dumped_records, batch_size=self.PARALLELISM_RECORDS) + uploader = parallelize(self.upload_record, entries=dumped_records, batch_size=num_threads) __, aborted = uploader(session) return not aborted - def upload_files(self, files): + def upload_files(self, files, initial=False): + setting = 'num_threads_files_initial' if initial else 'num_threads_files' + num_threads = self.backend.plugin.settings.get(setting) + self.logger.debug('Using %d parallel threads', num_threads) session = requests.Session() - retry = Retry( - total=10, - backoff_factor=3, - status_forcelist=[502, 503, 504], - allowed_methods=frozenset(['PUT']) - ) - session.mount(self.search_app, HTTPAdapter(max_retries=retry, pool_maxsize=self.PARALLELISM_FILES)) + session.mount(self.search_app, HTTPAdapter(max_retries=self._get_retry_config(initial), + pool_maxsize=num_threads)) session.headers = self.headers - uploader = parallelize(self.upload_file, entries=files, batch_size=self.PARALLELISM_FILES) + uploader = parallelize(self.upload_file, entries=files, batch_size=num_threads) results, aborted = uploader(session) return len(results), sum(1 for success in results if not success), aborted @@ -306,7 +315,7 @@ class LiveSyncCitadelBackend(LiveSyncBackendBase): self.set_initial_file_upload_state(True) return True - def run_export_files(self, batch=1000, force=False, max_size=None, verbose=True): + def run_export_files(self, batch=1000, force=False, max_size=None, verbose=True, initial=False): from indico_citadel.plugin import CitadelPlugin if max_size is None: @@ -335,7 +344,7 @@ class LiveSyncCitadelBackend(LiveSyncBackendBase): print_total_time=True) else: self.plugin.logger.info(f'{total} files need to be uploaded') - total, errors, aborted = uploader.upload_files(attachments) + total, errors, aborted = uploader.upload_files(attachments, initial=initial) return total, errors, aborted def check_reset_status(self): diff --git a/citadel/indico_citadel/cli.py b/citadel/indico_citadel/cli.py index 0e21690..e01aefc 100644 --- a/citadel/indico_citadel/cli.py +++ b/citadel/indico_citadel/cli.py @@ -42,7 +42,8 @@ def upload(batch, force, max_size): print('Citadel is not properly configured.') return - total, errors, aborted = backend.run_export_files(batch, force, max_size=max_size) + initial = not agent.settings.get('file_upload_done') + total, errors, aborted = backend.run_export_files(batch, force, max_size=max_size, initial=initial) if not errors and not aborted: print(f'{total} files uploaded') if max_size is None: diff --git a/citadel/indico_citadel/plugin.py b/citadel/indico_citadel/plugin.py index 8af2e19..e025459 100644 --- a/citadel/indico_citadel/plugin.py +++ b/citadel/indico_citadel/plugin.py @@ -36,6 +36,18 @@ class CitadelSettingsForm(IndicoForm): 'for indexing that have not been uploaded before during the next queue ' 'run, which may take a long time on larger instances. You may want ' 'to run a manual upload for the new file size first!')) + num_threads_records = IntegerField(_('Parallel threads (records)'), [NumberRange(min=1, max=500)], + description=_('Number of threads to use when uploading records.')) + num_threads_records_initial = IntegerField(_('Parallel threads (records, initial export)'), + [NumberRange(min=1, max=500)], + description=_('Number of threads to use when uploading records during ' + 'the initial export.')) + num_threads_files = IntegerField(_('Parallel threads (files)'), [NumberRange(min=1, max=500)], + description=_('Number of threads to use when uploading files.')) + num_threads_files_initial = IntegerField(_('Parallel threads (files, initial export)'), + [NumberRange(min=1, max=500)], + description=_('Number of threads to use when uploading files during ' + 'the initial export.')) disable_search = BooleanField(_('Disable search'), widget=SwitchWidget(), description=_('This disables the search integration of the plugin. When this option ' 'is used, the internal Indico search interface will be used. This may ' @@ -60,6 +72,10 @@ class CitadelPlugin(LiveSyncPluginBase): 'tex', 'txt', 'wdp' ], 'max_file_size': 10, + 'num_threads_records': 5, + 'num_threads_records_initial': 25, + 'num_threads_files': 5, + 'num_threads_files_initial': 25, 'disable_search': False, } backend_classes = {'citadel': LiveSyncCitadelBackend} diff --git a/livesync/indico_livesync/uploader.py b/livesync/indico_livesync/uploader.py index 2d6fc58..c6c8b86 100644 --- a/livesync/indico_livesync/uploader.py +++ b/livesync/indico_livesync/uploader.py @@ -61,12 +61,13 @@ class Uploader: lambda entry: re.sub(r'\s+', ' ', strip_control_chars(getattr(entry[0], 'title', ''))), print_total_time=True ) - return self.upload_records(records) + return self.upload_records(records, initial=True) - def upload_records(self, records): + def upload_records(self, records, initial=False): """Executed for a batch of up to `BATCH_SIZE` records :param records: an iterator of records to upload (or queue entries) + :param initial: whether the upload is part of an initial export :return: True if everything was successful, False if not """ raise NotImplementedError # pragma: no cover diff --git a/livesync/tests/uploader_test.py b/livesync/tests/uploader_test.py index bc92210..3792f90 100644 --- a/livesync/tests/uploader_test.py +++ b/livesync/tests/uploader_test.py @@ -20,7 +20,7 @@ class RecordingUploader(Uploader): self._uploaded = [] self.logger = MagicMock() - def upload_records(self, records): + def upload_records(self, records, initial=False): self._uploaded.append(list(records)) @property @@ -34,7 +34,7 @@ class FailingUploader(RecordingUploader): super().__init__(*args, **kwargs) self._n = 0 - def upload_records(self, records): + def upload_records(self, records, initial=False): super().upload_records(records) self._n += 1 if self._n == 2: diff --git a/livesync_debug/indico_livesync_debug/backend.py b/livesync_debug/indico_livesync_debug/backend.py index eebe87f..d6ffc0b 100644 --- a/livesync_debug/indico_livesync_debug/backend.py +++ b/livesync_debug/indico_livesync_debug/backend.py @@ -56,7 +56,7 @@ class DebugUploader(Uploader): return schema.dump(obj) raise ValueError(f'unknown object ref: {obj}') - def upload_records(self, records): + def upload_records(self, records, initial=False): dumped_records = ( ( type(rec).__name__, rec.id,