Citadel: Make thread counts configurable

This commit is contained in:
Adrian Moennich 2021-05-20 11:18:35 +02:00
parent 3ef0b988ba
commit ca2e860415
6 changed files with 56 additions and 29 deletions

View File

@@ -57,9 +57,6 @@ def _print_record(record):
class LiveSyncCitadelUploader(Uploader):
PARALLELISM_RECORDS = 250
PARALLELISM_FILES = 200
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -207,15 +204,29 @@ class LiveSyncCitadelUploader(Uploader):
self.categories = dict(db.session.execute(select([cte.c.id, cte.c.path])).fetchall())
return super().run_initial(records, total)
def upload_records(self, records):
def _get_retry_config(self, initial):
if initial:
return Retry(
total=10,
backoff_factor=3,
status_forcelist=[502, 503, 504],
allowed_methods=frozenset(['POST', 'PUT', 'DELETE'])
)
else:
return Retry(
total=2,
backoff_factor=3,
status_forcelist=[502, 503, 504],
allowed_methods=frozenset(['POST', 'PUT', 'DELETE'])
)
def upload_records(self, records, initial=False):
setting = 'num_threads_records_initial' if initial else 'num_threads_records'
num_threads = self.backend.plugin.settings.get(setting)
self.logger.debug('Using %d parallel threads', num_threads)
session = requests.Session()
retry = Retry(
total=10,
backoff_factor=3,
status_forcelist=[502, 503, 504],
allowed_methods=frozenset(['POST', 'PUT', 'DELETE'])
)
session.mount(self.search_app, HTTPAdapter(max_retries=retry, pool_maxsize=self.PARALLELISM_RECORDS))
session.mount(self.search_app, HTTPAdapter(max_retries=self._get_retry_config(initial),
pool_maxsize=num_threads))
session.headers = self.headers
dumped_records = (
(
@@ -228,21 +239,19 @@ class LiveSyncCitadelUploader(Uploader):
if self.verbose:
dumped_records = (_print_record(x) for x in dumped_records)
uploader = parallelize(self.upload_record, entries=dumped_records, batch_size=self.PARALLELISM_RECORDS)
uploader = parallelize(self.upload_record, entries=dumped_records, batch_size=num_threads)
__, aborted = uploader(session)
return not aborted
def upload_files(self, files):
def upload_files(self, files, initial=False):
setting = 'num_threads_files_initial' if initial else 'num_threads_files'
num_threads = self.backend.plugin.settings.get(setting)
self.logger.debug('Using %d parallel threads', num_threads)
session = requests.Session()
retry = Retry(
total=10,
backoff_factor=3,
status_forcelist=[502, 503, 504],
allowed_methods=frozenset(['PUT'])
)
session.mount(self.search_app, HTTPAdapter(max_retries=retry, pool_maxsize=self.PARALLELISM_FILES))
session.mount(self.search_app, HTTPAdapter(max_retries=self._get_retry_config(initial),
pool_maxsize=num_threads))
session.headers = self.headers
uploader = parallelize(self.upload_file, entries=files, batch_size=self.PARALLELISM_FILES)
uploader = parallelize(self.upload_file, entries=files, batch_size=num_threads)
results, aborted = uploader(session)
return len(results), sum(1 for success in results if not success), aborted
@@ -306,7 +315,7 @@ class LiveSyncCitadelBackend(LiveSyncBackendBase):
self.set_initial_file_upload_state(True)
return True
def run_export_files(self, batch=1000, force=False, max_size=None, verbose=True):
def run_export_files(self, batch=1000, force=False, max_size=None, verbose=True, initial=False):
from indico_citadel.plugin import CitadelPlugin
if max_size is None:
@@ -335,7 +344,7 @@ class LiveSyncCitadelBackend(LiveSyncBackendBase):
print_total_time=True)
else:
self.plugin.logger.info(f'{total} files need to be uploaded')
total, errors, aborted = uploader.upload_files(attachments)
total, errors, aborted = uploader.upload_files(attachments, initial=initial)
return total, errors, aborted
def check_reset_status(self):

View File

@@ -42,7 +42,8 @@ def upload(batch, force, max_size):
print('Citadel is not properly configured.')
return
total, errors, aborted = backend.run_export_files(batch, force, max_size=max_size)
initial = not agent.settings.get('file_upload_done')
total, errors, aborted = backend.run_export_files(batch, force, max_size=max_size, initial=initial)
if not errors and not aborted:
print(f'{total} files uploaded')
if max_size is None:

View File

@@ -36,6 +36,18 @@ class CitadelSettingsForm(IndicoForm):
'for indexing that have not been uploaded before during the next queue '
'run, which may take a long time on larger instances. You may want '
'to run a manual upload for the new file size first!'))
num_threads_records = IntegerField(_('Parallel threads (records)'), [NumberRange(min=1, max=500)],
description=_('Number of threads to use when uploading records.'))
num_threads_records_initial = IntegerField(_('Parallel threads (records, initial export)'),
[NumberRange(min=1, max=500)],
description=_('Number of threads to use when uploading records during '
'the initial export.'))
num_threads_files = IntegerField(_('Parallel threads (files)'), [NumberRange(min=1, max=500)],
description=_('Number of threads to use when uploading files.'))
num_threads_files_initial = IntegerField(_('Parallel threads (files, initial export)'),
[NumberRange(min=1, max=500)],
description=_('Number of threads to use when uploading files during '
'the initial export.'))
disable_search = BooleanField(_('Disable search'), widget=SwitchWidget(),
description=_('This disables the search integration of the plugin. When this option '
'is used, the internal Indico search interface will be used. This may '
@@ -60,6 +72,10 @@ class CitadelPlugin(LiveSyncPluginBase):
'tex', 'txt', 'wdp'
],
'max_file_size': 10,
'num_threads_records': 5,
'num_threads_records_initial': 25,
'num_threads_files': 5,
'num_threads_files_initial': 25,
'disable_search': False,
}
backend_classes = {'citadel': LiveSyncCitadelBackend}

View File

@@ -61,12 +61,13 @@ class Uploader:
lambda entry: re.sub(r'\s+', ' ', strip_control_chars(getattr(entry[0], 'title', ''))),
print_total_time=True
)
return self.upload_records(records)
return self.upload_records(records, initial=True)
def upload_records(self, records):
def upload_records(self, records, initial=False):
"""Executed for a batch of up to `BATCH_SIZE` records
:param records: an iterator of records to upload (or queue entries)
:param initial: whether the upload is part of an initial export
:return: True if everything was successful, False if not
"""
raise NotImplementedError # pragma: no cover

View File

@@ -20,7 +20,7 @@ class RecordingUploader(Uploader):
self._uploaded = []
self.logger = MagicMock()
def upload_records(self, records):
def upload_records(self, records, initial=False):
self._uploaded.append(list(records))
@property
@@ -34,7 +34,7 @@ class FailingUploader(RecordingUploader):
super().__init__(*args, **kwargs)
self._n = 0
def upload_records(self, records):
def upload_records(self, records, initial=False):
super().upload_records(records)
self._n += 1
if self._n == 2:

View File

@@ -56,7 +56,7 @@ class DebugUploader(Uploader):
return schema.dump(obj)
raise ValueError(f'unknown object ref: {obj}')
def upload_records(self, records):
def upload_records(self, records, initial=False):
dumped_records = (
(
type(rec).__name__, rec.id,