LiveSync: Use Celery

This commit is contained in:
Adrian Moennich 2015-05-20 18:02:03 +02:00
parent 381278fcc1
commit db8227162f
8 changed files with 37 additions and 88 deletions

View File

@ -16,8 +16,10 @@
from __future__ import unicode_literals
from indico.core import signals
from indico.util.i18n import make_bound_gettext
_ = make_bound_gettext('livesync')
__all__ = ('LiveSyncPluginBase', 'LiveSyncBackendBase', 'AgentForm', 'SimpleChange', 'process_records',
'MARCXMLGenerator', 'Uploader', 'MARCXMLUploader')
@ -27,3 +29,8 @@ from .forms import AgentForm
from .simplify import SimpleChange, process_records
from .marcxml import MARCXMLGenerator
from .uploader import Uploader, MARCXMLUploader
@signals.import_tasks.connect
def _import_tasks(sender, **kwargs):
import indico_livesync.task

View File

@ -69,13 +69,11 @@ class LiveSyncBackendBase(object):
except IndexError:
return 'no description available'
def __init__(self, agent, task=None):
def __init__(self, agent):
"""
:param agent: a `LiveSyncAgent` instance
:param task: a `LiveSyncTask` instance if running as a task
"""
self.agent = agent
self.task = task
def fetch_records(self, count=None):
query = (self.agent.queue

View File

@ -16,22 +16,17 @@
from __future__ import unicode_literals
import sys
import transaction
from dateutil import rrule
from flask_pluginengine import current_plugin
from flask_script import Manager
from terminaltables import AsciiTable
from indico.core.db import db, DBMgr
from indico.core.db.sqlalchemy.util.session import update_session_options
from indico.modules.scheduler import Client
from indico.util.console import cformat, conferenceHolderIterator
from MaKaC.conference import ConferenceHolder
from indico_livesync.models.agents import LiveSyncAgent
from indico_livesync.task import LiveSyncTask
cli_manager = Manager(usage="Manages LiveSync")
@ -130,19 +125,3 @@ def run(agent_id, force=False):
db.session.commit()
finally:
transaction.abort()
@cli_manager.command
def create_task(interval):
"""Creates a livesync task running every N minutes"""
update_session_options(db)
try:
interval = int(interval)
if interval < 1:
raise ValueError
except ValueError:
print 'Invalid interval, must be a number >=1'
sys.exit(1)
with DBMgr.getInstance().global_connection(commit=True):
Client().enqueue(LiveSyncTask(rrule.MINUTELY, interval=interval))
print 'Task created'

View File

@ -76,9 +76,9 @@ class LiveSyncAgent(db.Model):
from indico_livesync.plugin import LiveSyncPlugin
return LiveSyncPlugin.instance.backend_classes.get(self.backend_name)
def create_backend(self, task=None):
def create_backend(self):
"""Creates a new backend instance"""
return self.backend(self, task)
return self.backend(self)
@return_ascii
def __repr__(self):

View File

@ -16,41 +16,27 @@
from __future__ import unicode_literals
from indico.core.db import DBMgr, db
from indico.modules.scheduler.tasks.periodic import PeriodicUniqueTask
from indico.util.date_time import now_utc
from celery.schedules import crontab
from indico.core.celery import celery
from indico.core.db import db
from indico_livesync.models.agents import LiveSyncAgent
from indico_livesync.util import clean_old_entries
class LiveSyncTask(PeriodicUniqueTask):
DISABLE_ZODB_HOOK = True
@property
def logger(self):
return self.getLogger()
def extend_runtime(self):
# Make the task manager believe the task is running since a much shorter time
self.setOnRunningListSince(now_utc())
DBMgr.getInstance().commit()
def run(self):
from indico_livesync.plugin import LiveSyncPlugin
plugin = LiveSyncPlugin.instance # RuntimeError if not active
with plugin.plugin_context():
clean_old_entries()
for agent in LiveSyncAgent.find_all():
if agent.backend is None:
self.logger.warning('Skipping agent {}; backend not found'.format(agent.name))
continue
if not agent.initial_data_exported:
self.logger.warning('Skipping agent {}; initial export not performed yet'.format(agent.name))
continue
with DBMgr.getInstance().global_connection():
self.logger.info('Running agent {}'.format(agent.name))
agent.create_backend(self).run()
db.session.commit()
@celery.periodic_task(run_every=crontab(minute='*/15'))
def scheduled_update():
from indico_livesync.plugin import LiveSyncPlugin
with LiveSyncPlugin.instance.plugin_context():
clean_old_entries()
for agent in LiveSyncAgent.find_all():
if agent.backend is None:
LiveSyncPlugin.logger.warning('Skipping agent {}; backend not found'.format(agent.name))
continue
if not agent.initial_data_exported:
LiveSyncPlugin.logger.warning('Skipping agent {}; initial export not performed yet'.format(agent.name))
continue
LiveSyncPlugin.logger.info('Running agent {}'.format(agent.name))
agent.create_backend().run()
db.session.commit()

View File

@ -34,8 +34,7 @@ class Uploader(object):
def __init__(self, backend):
self.backend = backend
self.task = backend.task
self.logger = backend.task.logger if backend.task else backend.plugin.logger
self.logger = backend.plugin.logger
def run(self, records):
"""Runs the batch upload
@ -51,8 +50,6 @@ class Uploader(object):
self.logger.exception('{} could not upload batch'.format(self_name))
return
self.processed_records(batch)
if self.task:
self.task.extend_runtime()
def run_initial(self, events):
"""Runs the initial batch upload

View File

@ -21,7 +21,7 @@ from setuptools import setup, find_packages
setup(
name='indico_livesync',
version='0.2',
version='0.3',
url='https://github.com/indico/indico-plugins',
license='https://www.gnu.org/licenses/gpl-3.0.txt',
author='Indico Team',
@ -32,7 +32,7 @@ setup(
platforms='any',
install_requires=[
'terminaltables==1.1.1',
'indico>=1.9.2'
'indico>=1.9.3'
],
classifiers=[
'Environment :: Plugins',

View File

@ -44,18 +44,10 @@ class FailingUploader(RecordingUploader):
raise Exception('All your data are belong to us!')
def create_mock_backend(has_task=False):
agent = MagicMock()
if not has_task:
agent.task = None
# else we use the default mock attribute
return agent
def test_run_initial(mocker):
"""Test the initial upload"""
mocker.patch.object(Uploader, 'processed_records', autospec=True)
uploader = RecordingUploader(create_mock_backend())
uploader = RecordingUploader(MagicMock())
uploader.INITIAL_BATCH_SIZE = 3
events = tuple(range(4))
uploader.run_initial(events)
@ -70,7 +62,7 @@ def test_run_initial(mocker):
def test_run(mocker):
"""Test uploading queued data"""
db = mocker.patch('indico_livesync.uploader.db')
uploader = RecordingUploader(create_mock_backend())
uploader = RecordingUploader(MagicMock())
uploader.BATCH_SIZE = 3
records = tuple(LiveSyncQueueEntry(change=ChangeType.created) for _ in xrange(4))
uploader.run(records)
@ -82,20 +74,10 @@ def test_run(mocker):
assert db.session.commit.call_count == 2
def test_run_extend_task(mocker):
"""Test if the task is extended"""
mocker.patch('indico_livesync.uploader.db')
uploader = RecordingUploader(create_mock_backend(has_task=True))
uploader.BATCH_SIZE = 3
records = tuple(LiveSyncQueueEntry(change=ChangeType.created) for _ in xrange(4))
uploader.run(records)
assert uploader.backend.task.extend_runtime.call_count == 2
def test_run_failing(mocker):
"""Test a failing queue run"""
db = mocker.patch('indico_livesync.uploader.db')
uploader = FailingUploader(create_mock_backend())
uploader = FailingUploader(MagicMock())
uploader.BATCH_SIZE = 3
records = tuple(LiveSyncQueueEntry(change=ChangeType.created) for _ in xrange(10))
uploader.run(records)
@ -114,7 +96,7 @@ def test_marcxml_run(mocker):
mocker.patch('indico_livesync.uploader.db')
mocker.patch.object(MARCXMLUploader, 'upload_xml', autospec=True)
mxg = mocker.patch('indico_livesync.uploader.MARCXMLGenerator')
uploader = MARCXMLUploader(create_mock_backend())
uploader = MARCXMLUploader(MagicMock())
uploader.run([LiveSyncQueueEntry(change=ChangeType.created)])
assert mxg.records_to_xml.called
assert not mxg.objects_to_xml.called
@ -130,6 +112,6 @@ def test_marcxml_empty_result(mocker):
"""Test if the MARCXML uploader doesn't upload empty records"""
mocker.patch('indico_livesync.uploader.MARCXMLGenerator.objects_to_xml', return_value=None)
mocker.patch.object(MARCXMLUploader, 'upload_xml', autospec=True)
uploader = MARCXMLUploader(create_mock_backend())
uploader = MARCXMLUploader(MagicMock())
uploader.run_initial([1])
assert not uploader.upload_xml.called