Cleanup old queue entries

fixes #5
This commit is contained in:
Adrian Moennich 2014-11-18 11:33:31 +01:00
parent de796e8d3d
commit 0ba1673b73
4 changed files with 60 additions and 2 deletions

View File

@ -18,6 +18,9 @@ from __future__ import unicode_literals
from flask import request
from wtforms.validators import NumberRange
from wtforms.fields.html5 import IntegerField
from indico.core.plugins import IndicoPlugin, wrap_cli_manager
from indico.core.plugins.views import WPPlugins
from indico.util.i18n import _
@ -31,6 +34,12 @@ from indico_livesync.handler import connect_signals
class SettingsForm(IndicoForm):
queue_entry_ttl = IntegerField(_('Queue entry TTL'), [NumberRange(min=0)],
description=_("How many days should processed entries be kept in the queue. "
"The time counts from the creation of the queue entries, so if the "
"LiveSync task is not running for some time, queue entries may be "
"deleted during the next run after processing them. Setting it to 0 "
"disables automatic deletion."))
excluded_categories = MultipleItemsField(_('Excluded categories'), fields=(('id', _('Category ID')),),
description=_("Changes to objects inside these categories or any of their "
"subcategories are excluded."))
@ -44,7 +53,8 @@ class LiveSyncPlugin(IndicoPlugin):
"""
settings_form = SettingsForm
default_settings = {'excluded_categories': []}
default_settings = {'excluded_categories': [],
'queue_entry_ttl': 0}
def init(self):
super(LiveSyncPlugin, self).init()

View File

@ -21,6 +21,7 @@ from indico.modules.scheduler.tasks.periodic import PeriodicUniqueTask
from indico.util.date_time import now_utc
from indico_livesync.models.agents import LiveSyncAgent
from indico_livesync.util import clean_old_entries
class LiveSyncTask(PeriodicUniqueTask):
@ -40,6 +41,8 @@ class LiveSyncTask(PeriodicUniqueTask):
plugin = LiveSyncPlugin.instance # RuntimeError if not active
with plugin.plugin_context():
clean_old_entries()
for agent in LiveSyncAgent.find_all():
if agent.backend is None:
self.logger.warning('Skipping agent {}; backend not found'.format(agent.name))

View File

@ -16,8 +16,11 @@
from __future__ import unicode_literals
from datetime import timedelta
from werkzeug.datastructures import ImmutableDict
from indico.util.date_time import now_utc
from MaKaC.conference import Conference, Contribution, SubContribution, Category, CategoryManager, ConferenceHolder
@ -70,3 +73,16 @@ def make_compound_id(ref):
return '{}.{}.{}'.format(ref['event_id'], ref['contrib_id'], ref['subcontrib_id'])
else:
raise ValueError('Unexpected object type: {}'.format(ref['type']))
def clean_old_entries():
"""Deletes obsolete entries from the queues"""
from indico_livesync.plugin import LiveSyncPlugin
from indico_livesync.models.queue import LiveSyncQueueEntry
queue_entry_ttl = LiveSyncPlugin.settings.get('queue_entry_ttl')
if not queue_entry_ttl:
return
expire_threshold = now_utc() - timedelta(days=queue_entry_ttl)
LiveSyncQueueEntry.find(LiveSyncQueueEntry.processed,
LiveSyncQueueEntry.timestamp < expire_threshold).delete(synchronize_session='fetch')

View File

@ -14,9 +14,16 @@
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from datetime import timedelta
import pytest
from indico_livesync.util import make_compound_id
from indico.util.date_time import now_utc
from indico_livesync.models.agents import LiveSyncAgent
from indico_livesync.models.queue import LiveSyncQueueEntry, ChangeType
from indico_livesync.plugin import LiveSyncPlugin
from indico_livesync.util import make_compound_id, clean_old_entries
@pytest.mark.parametrize(('ref', 'expected'), (
@ -32,3 +39,25 @@ def test_make_compound_id(ref, expected):
def test_make_compound_id_errors(ref_type):
with pytest.raises(ValueError):
make_compound_id({'type': ref_type})
def test_clean_old_entries(db):
now = now_utc()
agent = LiveSyncAgent(name='dummy', backend_name='dummy')
for processed in (True, False):
for day in range(10):
db.session.add(LiveSyncQueueEntry(agent=agent, change=ChangeType.created, type='dummy', processed=processed,
timestamp=now - timedelta(days=day, hours=12)))
db.session.flush()
# Nothing deleted with the setting's default value
clean_old_entries()
assert LiveSyncQueueEntry.find().count() == 20
# Nothing deleted when explicitly set to 0 (which is the default)
LiveSyncPlugin.settings.set('queue_entry_ttl', 0)
clean_old_entries()
assert LiveSyncQueueEntry.find().count() == 20
# Only the correct entries deleted, and no unprocessed ones
LiveSyncPlugin.settings.set('queue_entry_ttl', 3)
clean_old_entries()
assert LiveSyncQueueEntry.find(processed=False).count() == 10
assert LiveSyncQueueEntry.find(processed=True).count() == 3