From 0ba1673b73301fbfae9e88149b5abb794022e6f2 Mon Sep 17 00:00:00 2001 From: Adrian Moennich Date: Tue, 18 Nov 2014 11:33:31 +0100 Subject: [PATCH] Cleanup old queue entries fixes #5 --- livesync/indico_livesync/plugin.py | 12 +++++++++++- livesync/indico_livesync/task.py | 3 +++ livesync/indico_livesync/util.py | 16 +++++++++++++++ livesync/tests/util_test.py | 31 +++++++++++++++++++++++++++++- 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/livesync/indico_livesync/plugin.py b/livesync/indico_livesync/plugin.py index 4401463..1a98747 100644 --- a/livesync/indico_livesync/plugin.py +++ b/livesync/indico_livesync/plugin.py @@ -18,6 +18,9 @@ from __future__ import unicode_literals from flask import request +from wtforms.validators import NumberRange +from wtforms.fields.html5 import IntegerField + from indico.core.plugins import IndicoPlugin, wrap_cli_manager from indico.core.plugins.views import WPPlugins from indico.util.i18n import _ @@ -31,6 +34,12 @@ from indico_livesync.handler import connect_signals class SettingsForm(IndicoForm): + queue_entry_ttl = IntegerField(_('Queue entry TTL'), [NumberRange(min=0)], + description=_("How many days should processed entries be kept in the queue. " + "The time counts from the creation of the queue entries, so if the " + "LiveSync task is not running for some time, queue entries may be " + "deleted during the next run after processing them. Setting it to 0 " + "disables automatic deletion.")) excluded_categories = MultipleItemsField(_('Excluded categories'), fields=(('id', _('Category ID')),), description=_("Changes to objects inside these categories or any of their " "subcategories are excluded.")) @@ -44,7 +53,8 @@ class LiveSyncPlugin(IndicoPlugin): """ settings_form = SettingsForm - default_settings = {'excluded_categories': []} + default_settings = {'excluded_categories': [], + 'queue_entry_ttl': 0} def init(self): super(LiveSyncPlugin, self).init() diff --git a/livesync/indico_livesync/task.py b/livesync/indico_livesync/task.py index d0c527b..fb4a2a2 100644 --- a/livesync/indico_livesync/task.py +++ b/livesync/indico_livesync/task.py @@ -21,6 +21,7 @@ from indico.modules.scheduler.tasks.periodic import PeriodicUniqueTask from indico.util.date_time import now_utc from indico_livesync.models.agents import LiveSyncAgent +from indico_livesync.util import clean_old_entries class LiveSyncTask(PeriodicUniqueTask): @@ -40,6 +41,8 @@ class LiveSyncTask(PeriodicUniqueTask): plugin = LiveSyncPlugin.instance # RuntimeError if not active with plugin.plugin_context(): + clean_old_entries() + for agent in LiveSyncAgent.find_all(): if agent.backend is None: self.logger.warning('Skipping agent {}; backend not found'.format(agent.name)) diff --git a/livesync/indico_livesync/util.py b/livesync/indico_livesync/util.py index 72fd1e9..7599df1 100644 --- a/livesync/indico_livesync/util.py +++ b/livesync/indico_livesync/util.py @@ -16,8 +16,11 @@ from __future__ import unicode_literals +from datetime import timedelta + from werkzeug.datastructures import ImmutableDict +from indico.util.date_time import now_utc from MaKaC.conference import Conference, Contribution, SubContribution, Category, CategoryManager, ConferenceHolder @@ -70,3 +73,16 @@ def make_compound_id(ref): return '{}.{}.{}'.format(ref['event_id'], ref['contrib_id'], ref['subcontrib_id']) else: raise ValueError('Unexpected object type: {}'.format(ref['type'])) + + +def clean_old_entries(): + """Deletes obsolete entries from the queues""" + from indico_livesync.plugin import LiveSyncPlugin + from indico_livesync.models.queue import LiveSyncQueueEntry + + queue_entry_ttl = LiveSyncPlugin.settings.get('queue_entry_ttl') + if not queue_entry_ttl: + return + expire_threshold = now_utc() - timedelta(days=queue_entry_ttl) + LiveSyncQueueEntry.find(LiveSyncQueueEntry.processed, + LiveSyncQueueEntry.timestamp < expire_threshold).delete(synchronize_session='fetch') diff --git a/livesync/tests/util_test.py b/livesync/tests/util_test.py index 38dbb4e..b56840f 100644 --- a/livesync/tests/util_test.py +++ b/livesync/tests/util_test.py @@ -14,9 +14,16 @@ # You should have received a copy of the GNU General Public License # along with Indico; if not, see . +from datetime import timedelta + import pytest -from indico_livesync.util import make_compound_id +from indico.util.date_time import now_utc + +from indico_livesync.models.agents import LiveSyncAgent +from indico_livesync.models.queue import LiveSyncQueueEntry, ChangeType +from indico_livesync.plugin import LiveSyncPlugin +from indico_livesync.util import make_compound_id, clean_old_entries @pytest.mark.parametrize(('ref', 'expected'), ( @@ -32,3 +39,25 @@ def test_make_compound_id(ref, expected): def test_make_compound_id_errors(ref_type): with pytest.raises(ValueError): make_compound_id({'type': ref_type}) + + +def test_clean_old_entries(db): + now = now_utc() + agent = LiveSyncAgent(name='dummy', backend_name='dummy') + for processed in (True, False): + for day in range(10): + db.session.add(LiveSyncQueueEntry(agent=agent, change=ChangeType.created, type='dummy', processed=processed, + timestamp=now - timedelta(days=day, hours=12))) + db.session.flush() + # Nothing deleted with the setting's default value + clean_old_entries() + assert LiveSyncQueueEntry.find().count() == 20 + # Nothing deleted when explicitly set to 0 (which is the default) + LiveSyncPlugin.settings.set('queue_entry_ttl', 0) + clean_old_entries() + assert LiveSyncQueueEntry.find().count() == 20 + # Only the correct entries deleted, and no unprocessed ones + LiveSyncPlugin.settings.set('queue_entry_ttl', 3) + clean_old_entries() + assert LiveSyncQueueEntry.find(processed=False).count() == 10 + assert LiveSyncQueueEntry.find(processed=True).count() == 3