diff --git a/livesync/indico_livesync/__init__.py b/livesync/indico_livesync/__init__.py index 6601262..d1e2c7e 100644 --- a/livesync/indico_livesync/__init__.py +++ b/livesync/indico_livesync/__init__.py @@ -16,8 +16,10 @@ from __future__ import unicode_literals +from indico.core import signals from indico.util.i18n import make_bound_gettext + _ = make_bound_gettext('livesync') __all__ = ('LiveSyncPluginBase', 'LiveSyncBackendBase', 'AgentForm', 'SimpleChange', 'process_records', 'MARCXMLGenerator', 'Uploader', 'MARCXMLUploader') @@ -27,3 +29,8 @@ from .forms import AgentForm from .simplify import SimpleChange, process_records from .marcxml import MARCXMLGenerator from .uploader import Uploader, MARCXMLUploader + + +@signals.import_tasks.connect +def _import_tasks(sender, **kwargs): + import indico_livesync.task diff --git a/livesync/indico_livesync/base.py b/livesync/indico_livesync/base.py index 4500fb1..5bfd09d 100644 --- a/livesync/indico_livesync/base.py +++ b/livesync/indico_livesync/base.py @@ -69,13 +69,11 @@ class LiveSyncBackendBase(object): except IndexError: return 'no description available' - def __init__(self, agent, task=None): + def __init__(self, agent): """ :param agent: a `LiveSyncAgent` instance - :param task: a `LiveSyncTask` instance if running as a task """ self.agent = agent - self.task = task def fetch_records(self, count=None): query = (self.agent.queue diff --git a/livesync/indico_livesync/cli.py b/livesync/indico_livesync/cli.py index a44c96e..1b8332c 100644 --- a/livesync/indico_livesync/cli.py +++ b/livesync/indico_livesync/cli.py @@ -16,22 +16,17 @@ from __future__ import unicode_literals -import sys - import transaction -from dateutil import rrule from flask_pluginengine import current_plugin from flask_script import Manager from terminaltables import AsciiTable from indico.core.db import db, DBMgr from indico.core.db.sqlalchemy.util.session import update_session_options -from indico.modules.scheduler import Client from indico.util.console import cformat, conferenceHolderIterator from MaKaC.conference import ConferenceHolder from indico_livesync.models.agents import LiveSyncAgent -from indico_livesync.task import LiveSyncTask cli_manager = Manager(usage="Manages LiveSync") @@ -130,19 +125,3 @@ def run(agent_id, force=False): db.session.commit() finally: transaction.abort() - - -@cli_manager.command -def create_task(interval): - """Creates a livesync task running every N minutes""" - update_session_options(db) - try: - interval = int(interval) - if interval < 1: - raise ValueError - except ValueError: - print 'Invalid interval, must be a number >=1' - sys.exit(1) - with DBMgr.getInstance().global_connection(commit=True): - Client().enqueue(LiveSyncTask(rrule.MINUTELY, interval=interval)) - print 'Task created' diff --git a/livesync/indico_livesync/models/agents.py b/livesync/indico_livesync/models/agents.py index 68f9256..05b5038 100644 --- a/livesync/indico_livesync/models/agents.py +++ b/livesync/indico_livesync/models/agents.py @@ -76,9 +76,9 @@ class LiveSyncAgent(db.Model): from indico_livesync.plugin import LiveSyncPlugin return LiveSyncPlugin.instance.backend_classes.get(self.backend_name) - def create_backend(self, task=None): + def create_backend(self): """Creates a new backend instance""" - return self.backend(self, task) + return self.backend(self) @return_ascii def __repr__(self): diff --git a/livesync/indico_livesync/task.py b/livesync/indico_livesync/task.py index 0656adb..e8006cb 100644 --- a/livesync/indico_livesync/task.py +++ b/livesync/indico_livesync/task.py @@ -16,41 +16,27 @@ from __future__ import unicode_literals -from indico.core.db import DBMgr, db -from indico.modules.scheduler.tasks.periodic import PeriodicUniqueTask -from indico.util.date_time import now_utc +from celery.schedules import crontab + +from indico.core.celery import celery +from indico.core.db import db from indico_livesync.models.agents import LiveSyncAgent from indico_livesync.util import clean_old_entries -class LiveSyncTask(PeriodicUniqueTask): - DISABLE_ZODB_HOOK = True - - @property - def logger(self): - return self.getLogger() - - def extend_runtime(self): - # Make the task manager believe the task is running since a much shorter time - self.setOnRunningListSince(now_utc()) - DBMgr.getInstance().commit() - - def run(self): - from indico_livesync.plugin import LiveSyncPlugin - - plugin = LiveSyncPlugin.instance # RuntimeError if not active - with plugin.plugin_context(): - clean_old_entries() - - for agent in LiveSyncAgent.find_all(): - if agent.backend is None: - self.logger.warning('Skipping agent {}; backend not found'.format(agent.name)) - continue - if not agent.initial_data_exported: - self.logger.warning('Skipping agent {}; initial export not performed yet'.format(agent.name)) - continue - with DBMgr.getInstance().global_connection(): - self.logger.info('Running agent {}'.format(agent.name)) - agent.create_backend(self).run() - db.session.commit() +@celery.periodic_task(run_every=crontab(minute='*/15')) +def scheduled_update(): + from indico_livesync.plugin import LiveSyncPlugin + with LiveSyncPlugin.instance.plugin_context(): + clean_old_entries() + for agent in LiveSyncAgent.find_all(): + if agent.backend is None: + LiveSyncPlugin.logger.warning('Skipping agent {}; backend not found'.format(agent.name)) + continue + if not agent.initial_data_exported: + LiveSyncPlugin.logger.warning('Skipping agent {}; initial export not performed yet'.format(agent.name)) + continue + LiveSyncPlugin.logger.info('Running agent {}'.format(agent.name)) + agent.create_backend().run() + db.session.commit() diff --git a/livesync/indico_livesync/uploader.py b/livesync/indico_livesync/uploader.py index f056f14..1723258 100644 --- a/livesync/indico_livesync/uploader.py +++ b/livesync/indico_livesync/uploader.py @@ -34,8 +34,7 @@ class Uploader(object): def __init__(self, backend): self.backend = backend - self.task = backend.task - self.logger = backend.task.logger if backend.task else backend.plugin.logger + self.logger = backend.plugin.logger def run(self, records): """Runs the batch upload @@ -51,8 +50,6 @@ class Uploader(object): self.logger.exception('{} could not upload batch'.format(self_name)) return self.processed_records(batch) - if self.task: - self.task.extend_runtime() def run_initial(self, events): """Runs the initial batch upload diff --git a/livesync/setup.py b/livesync/setup.py index 3ac1fd0..f1cd979 100644 --- a/livesync/setup.py +++ b/livesync/setup.py @@ -21,7 +21,7 @@ from setuptools import setup, find_packages setup( name='indico_livesync', - version='0.2', + version='0.3', url='https://github.com/indico/indico-plugins', license='https://www.gnu.org/licenses/gpl-3.0.txt', author='Indico Team', @@ -32,7 +32,7 @@ setup( platforms='any', install_requires=[ 'terminaltables==1.1.1', - 'indico>=1.9.2' + 'indico>=1.9.3' ], classifiers=[ 'Environment :: Plugins', diff --git a/livesync/tests/uploader_test.py b/livesync/tests/uploader_test.py index efa35d0..ed5ee56 100644 --- a/livesync/tests/uploader_test.py +++ b/livesync/tests/uploader_test.py @@ -44,18 +44,10 @@ class FailingUploader(RecordingUploader): raise Exception('All your data are belong to us!') -def create_mock_backend(has_task=False): - agent = MagicMock() - if not has_task: - agent.task = None - # else we use the default mock attribute - return agent - - def test_run_initial(mocker): """Test the initial upload""" mocker.patch.object(Uploader, 'processed_records', autospec=True) - uploader = RecordingUploader(create_mock_backend()) + uploader = RecordingUploader(MagicMock()) uploader.INITIAL_BATCH_SIZE = 3 events = tuple(range(4)) uploader.run_initial(events) @@ -70,7 +62,7 @@ def test_run_initial(mocker): def test_run(mocker): """Test uploading queued data""" db = mocker.patch('indico_livesync.uploader.db') - uploader = RecordingUploader(create_mock_backend()) + uploader = RecordingUploader(MagicMock()) uploader.BATCH_SIZE = 3 records = tuple(LiveSyncQueueEntry(change=ChangeType.created) for _ in xrange(4)) uploader.run(records) @@ -82,20 +74,10 @@ def test_run(mocker): assert db.session.commit.call_count == 2 -def test_run_extend_task(mocker): - """Test if the task is extended""" - mocker.patch('indico_livesync.uploader.db') - uploader = RecordingUploader(create_mock_backend(has_task=True)) - uploader.BATCH_SIZE = 3 - records = tuple(LiveSyncQueueEntry(change=ChangeType.created) for _ in xrange(4)) - uploader.run(records) - assert uploader.backend.task.extend_runtime.call_count == 2 - - def test_run_failing(mocker): """Test a failing queue run""" db = mocker.patch('indico_livesync.uploader.db') - uploader = FailingUploader(create_mock_backend()) + uploader = FailingUploader(MagicMock()) uploader.BATCH_SIZE = 3 records = tuple(LiveSyncQueueEntry(change=ChangeType.created) for _ in xrange(10)) uploader.run(records) @@ -114,7 +96,7 @@ def test_marcxml_run(mocker): mocker.patch('indico_livesync.uploader.db') mocker.patch.object(MARCXMLUploader, 'upload_xml', autospec=True) mxg = mocker.patch('indico_livesync.uploader.MARCXMLGenerator') - uploader = MARCXMLUploader(create_mock_backend()) + uploader = MARCXMLUploader(MagicMock()) uploader.run([LiveSyncQueueEntry(change=ChangeType.created)]) assert mxg.records_to_xml.called assert not mxg.objects_to_xml.called @@ -130,6 +112,6 @@ def test_marcxml_empty_result(mocker): """Test if the MARCXML uploader doesn't upload empty records""" mocker.patch('indico_livesync.uploader.MARCXMLGenerator.objects_to_xml', return_value=None) mocker.patch.object(MARCXMLUploader, 'upload_xml', autospec=True) - uploader = MARCXMLUploader(create_mock_backend()) + uploader = MARCXMLUploader(MagicMock()) uploader.run_initial([1]) assert not uploader.upload_xml.called