From 2dd972455cf17ec11d463d761e7826f585583ec7 Mon Sep 17 00:00:00 2001 From: Adrian Moennich Date: Tue, 4 Nov 2014 11:47:42 +0100 Subject: [PATCH] Populate the queue on changes --- livesync/indico_livesync/cli.py | 34 ++++ livesync/indico_livesync/handler.py | 155 ++++++++++++++++++ ...201410311223_1c2b0e17447d_create_tables.py | 8 +- livesync/indico_livesync/models/queue.py | 65 ++++++-- livesync/indico_livesync/plugin.py | 3 +- livesync/indico_livesync/util.py | 57 +++++++ 6 files changed, 303 insertions(+), 19 deletions(-) create mode 100644 livesync/indico_livesync/handler.py create mode 100644 livesync/indico_livesync/util.py diff --git a/livesync/indico_livesync/cli.py b/livesync/indico_livesync/cli.py index c6b8296..1b0d1d5 100644 --- a/livesync/indico_livesync/cli.py +++ b/livesync/indico_livesync/cli.py @@ -75,3 +75,37 @@ def initial_export(agent_id, force=False): print 'TODO: run initial export' agent.initial_data_exported = True db.session.commit() + + +@cli_manager.command +def create_agent(agent_type, name=None): + """Creates a new agent""" + update_session_options(db) + try: + agent_class = current_plugin.agent_classes[agent_type] + except KeyError: + print 'No such agent type' + return + # TODO: Prompt for agent type specific settings + agent = LiveSyncAgent(backend_name=agent_type, name=name or agent_class.title) + db.session.add(agent) + db.session.commit() + print agent + + +# TODO: delete_agent, update_agent, create_task + + +@cli_manager.command +def run(agent_id=None): + if agent_id is None: + agents = LiveSyncAgent.find_all() + else: + agent = LiveSyncAgent.find_first(id=int(agent_id)) + if agent is None: + print 'No such agent' + return + agents = [agent] + + for agent in agents: + pass # TODO diff --git a/livesync/indico_livesync/handler.py b/livesync/indico_livesync/handler.py new file mode 100644 index 0000000..7b029bd --- /dev/null +++ b/livesync/indico_livesync/handler.py @@ -0,0 +1,155 @@ +# This file is part of Indico. +# Copyright (C) 2002 - 2014 European Organization for Nuclear Research (CERN). +# +# Indico is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 3 of the +# License, or (at your option) any later version. +# +# Indico is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Indico; if not, see . + +from __future__ import unicode_literals + +from collections import defaultdict + +from flask import g + +from indico.core import signals +from MaKaC.accessControl import AccessController +from MaKaC.conference import Conference, Contribution, SubContribution, Category, Session + +from indico_livesync.models.queue import LiveSyncQueueEntry, ChangeType +from indico_livesync.util import obj_ref + + +class LiveSyncSignalHandler(object): + def __init__(self, plugin): + # request + plugin.connect(signals.after_process, self._apply_changes) + plugin.connect(signals.before_retry, self._clear_changes) + # moved + plugin.connect(signals.category_moved, self._moved) + plugin.connect(signals.event_moved, self._moved) + # created + plugin.connect(signals.category_created, self._created) + plugin.connect(signals.event_created, self._created) + plugin.connect(signals.contribution_created, self._created) + plugin.connect(signals.subcontribution_created, self._created) + # deleted + plugin.connect(signals.category_deleted, self._deleted) + plugin.connect(signals.event_deleted, self._deleted) + plugin.connect(signals.contribution_deleted, self._deleted) + plugin.connect(signals.subcontribution_deleted, self._deleted) + # title + plugin.connect(signals.category_title_changed, self._title_changed) + plugin.connect(signals.event_data_changed, self._title_changed) + plugin.connect(signals.contribution_title_changed, self._title_changed) + plugin.connect(signals.subcontribution_title_changed, self._title_changed) + # data + plugin.connect(signals.category_data_changed, self._data_changed) + plugin.connect(signals.event_data_changed, self._data_changed) + plugin.connect(signals.contribution_data_changed, self._data_changed) + plugin.connect(signals.subcontribution_data_changed, self._data_changed) + # protection + plugin.connect(signals.category_protection_changed, self._protection_changed) + plugin.connect(signals.event_protection_changed, self._protection_changed) + plugin.connect(signals.contribution_protection_changed, self._protection_changed) + # ACLs + plugin.connect(signals.access_granted, self._acl_changed) + plugin.connect(signals.access_revoked, self._acl_changed) + plugin.connect(signals.modification_granted, self._acl_changed) + plugin.connect(signals.modification_revoked, self._acl_changed) + # domain access + plugin.connect(signals.category_domain_access_granted, self._domain_changed) + plugin.connect(signals.category_domain_access_revoked, self._domain_changed) + plugin.connect(signals.event_domain_access_granted, self._domain_changed) + plugin.connect(signals.event_domain_access_revoked, self._domain_changed) + + def _moved(self, obj, old_parent, new_parent, **kwargs): + print '_moved', obj, old_parent, new_parent + self.register_change(obj, 'moved') + category_protection = old_parent.isProtected() + new_category_protection = new_parent.isProtected() + + if category_protection != new_category_protection and obj.getAccessProtectionLevel() == 0: + self.register_change(obj, ChangeType.protection_changed) + + def _created(self, obj, parent, **kwargs): + print '_created', obj, parent + self.register_change(parent, ChangeType.data_changed) + self.register_change(obj, ChangeType.created) + + def _deleted(self, obj, **kwargs): + print '_deleted', obj, kwargs + parent = kwargs.pop('parent', None) + self.register_deletion(obj, parent) + + def _title_changed(self, obj, **kwargs): + if kwargs.pop('attr', 'title') != 'title': + return + print '_title_changed', obj, kwargs + self.register_change(obj, ChangeType.title_changed) + + def _data_changed(self, obj, **kwargs): + print '_data_changed', obj, kwargs + self.register_change(obj, ChangeType.data_changed) + + def _protection_changed(self, obj, old, new, **kwargs): + print '_protection_changed', obj, old, new + if new == 0: # inheriting + new = 1 if obj.isProtected() else -1 + if old != new: + self.register_change(obj, ChangeType.protection_changed) + + def _acl_changed(self, obj, principal, **kwargs): + print '_acl_changed', obj, principal + self._handle_acl_change(obj) + + def _handle_acl_change(self, obj, child=False): + if isinstance(obj, (Conference, Contribution, SubContribution, Category)): + # if it was a child, emit data_changed instead + if child: + self.register_change(obj, ChangeType.data_changed) + else: + self.register_change(obj, ChangeType.protection_changed) + elif isinstance(obj, Session): + owner = obj.getOwner() + if owner: + self.register_change(owner, ChangeType.data_changed) + elif isinstance(obj, AccessController): + self._handle_acl_change(obj.getOwner(), child=False) + else: + self._handle_acl_change(obj.getOwner(), child=True) + + def _domain_changed(self, obj, **kwargs): + print '_domain_changed', obj + self.register_change(obj, ChangeType.protection_changed) + + def register_deletion(self, obj, parent): + self.init_livesync_g() + g.livesync[obj_ref(obj, parent)].add(ChangeType.deleted) + + def register_change(self, obj, action): + self.init_livesync_g() + g.livesync[obj_ref(obj)].add(action) + + def init_livesync_g(self): + if not hasattr(g, 'livesync'): + g.livesync = defaultdict(set) + + def _apply_changes(self, sender, **kwargs): + if not hasattr(g, 'livesync'): + return + for ref, changes in g.livesync.iteritems(): + LiveSyncQueueEntry.create(changes, ref) + + def _clear_changes(self, sender, **kwargs): + if not hasattr(g, 'livesync'): + return + del g.livesync diff --git a/livesync/indico_livesync/migrations/201410311223_1c2b0e17447d_create_tables.py b/livesync/indico_livesync/migrations/201410311223_1c2b0e17447d_create_tables.py index 17c9386..4b2662f 100644 --- a/livesync/indico_livesync/migrations/201410311223_1c2b0e17447d_create_tables.py +++ b/livesync/indico_livesync/migrations/201410311223_1c2b0e17447d_create_tables.py @@ -32,9 +32,13 @@ def upgrade(): sa.Column('id', sa.Integer(), nullable=False), sa.Column('agent_id', sa.Integer(), nullable=False, index=True), sa.Column('timestamp', UTCDateTime(), nullable=False), - sa.Column('change', sa.String(), nullable=False), - sa.Column('data', postgresql.JSON(), nullable=False), + sa.Column('change', sa.SmallInteger(), nullable=False), + sa.Column('category_id', sa.String()), + sa.Column('event_id', sa.String()), + sa.Column('contrib_id', sa.String()), + sa.Column('subcontrib_id', sa.String()), sa.ForeignKeyConstraint(['agent_id'], ['plugin_livesync.agents.id']), + sa.CheckConstraint('change IN (1, 2, 3, 4, 5, 6)'), sa.PrimaryKeyConstraint('id'), schema='plugin_livesync') diff --git a/livesync/indico_livesync/models/queue.py b/livesync/indico_livesync/models/queue.py index b511896..89451b8 100644 --- a/livesync/indico_livesync/models/queue.py +++ b/livesync/indico_livesync/models/queue.py @@ -16,18 +16,27 @@ from __future__ import unicode_literals -from sqlalchemy.dialects.postgresql import JSON - from indico.core.db.sqlalchemy import db, UTCDateTime from indico.util.date_time import now_utc from indico.util.string import return_ascii +from indico.util.struct.enum import IndicoEnum from indico_livesync.models.agents import LiveSyncAgent +class ChangeType(int, IndicoEnum): + created = 1 + deleted = 2 + moved = 3 + data_changed = 4 + title_changed = 5 + protection_changed = 6 + + class LiveSyncQueueEntry(db.Model): __tablename__ = 'queues' - __table_args__ = {'schema': 'plugin_livesync'} + __table_args__ = (db.CheckConstraint('change IN ({})'.format(', '.join(map(str, ChangeType)))), + {'schema': 'plugin_livesync'}) #: Entry ID id = db.Column( @@ -39,6 +48,7 @@ class LiveSyncQueueEntry(db.Model): agent_id = db.Column( db.Integer, db.ForeignKey('plugin_livesync.agents.id'), + nullable=False, index=True ) @@ -49,18 +59,34 @@ class LiveSyncQueueEntry(db.Model): default=now_utc ) - # XXX: maybe use an Enum for this? - # XXX: or should it be an array? old code seems to have records with multiple changes - #: the change type + #: the change type, a :class:`ChangeType` change = db.Column( - db.String, + db.SmallInteger, nullable=False ) - #: Data related to the change - data = db.Column( - JSON, - nullable=False, + #: The ID of the changed category + category_id = db.Column( + db.String, + nullable=True + ) + + #: The event ID of the changed event/contribution/subcontribution + event_id = db.Column( + db.String, + nullable=True + ) + + #: The contribution ID of the changed contribution/subcontribution + contrib_id = db.Column( + db.String, + nullable=True + ) + + #: The subcontribution ID of the changed subcontribution + subcontrib_id = db.Column( + db.String, + nullable=True ) #: The associated :class:LiveSyncAgent @@ -71,15 +97,22 @@ class LiveSyncQueueEntry(db.Model): @return_ascii def __repr__(self): - return ''.format(self.agent, self.id, self.change, self.data) + return ''.format(self.agent, self.id, ChangeType(self.change).name, + self.data) @classmethod - def create(cls, change, data): + def create(cls, changes, obj_ref): """Creates a new change in all queues - :param change: the change type - :param data: the associated data (a json-serializable object) + :param changes: the change types, an iterable containing + :class:`ChangeType` + :param obj_ref: the object reference (returned by `obj_ref`) + of the changed object """ + obj_ref = dict(obj_ref) + obj_ref.pop('type') for agent in LiveSyncAgent.find(): - db.session.add(cls(agent=agent, change=change, data=data)) + for change in changes: + entry = cls(agent=agent, change=change, **obj_ref) + db.session.add(entry) db.session.flush() diff --git a/livesync/indico_livesync/plugin.py b/livesync/indico_livesync/plugin.py index bd6eebc..0cad969 100644 --- a/livesync/indico_livesync/plugin.py +++ b/livesync/indico_livesync/plugin.py @@ -19,6 +19,7 @@ from __future__ import unicode_literals from indico.core.plugins import IndicoPlugin, wrap_cli_manager from indico_livesync.cli import cli_manager +from indico_livesync.handler import LiveSyncSignalHandler class LiveSyncPlugin(IndicoPlugin): @@ -31,8 +32,8 @@ class LiveSyncPlugin(IndicoPlugin): def init(self): super(LiveSyncPlugin, self).init() - # noinspection PyAttributeOutsideInit self.agent_classes = {} + self.signal_handler = LiveSyncSignalHandler(self) def add_cli_command(self, manager): manager.add_command('livesync', wrap_cli_manager(cli_manager, self)) diff --git a/livesync/indico_livesync/util.py b/livesync/indico_livesync/util.py new file mode 100644 index 0000000..cc13d58 --- /dev/null +++ b/livesync/indico_livesync/util.py @@ -0,0 +1,57 @@ +# This file is part of Indico. +# Copyright (C) 2002 - 2014 European Organization for Nuclear Research (CERN). +# +# Indico is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 3 of the +# License, or (at your option) any later version. +# +# Indico is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Indico; if not, see . + +from __future__ import unicode_literals + +from MaKaC.conference import Conference, Contribution, SubContribution, Category, CategoryManager, ConferenceHolder +from werkzeug.datastructures import ImmutableDict + + +def obj_ref(obj, parent=None): + """Returns a tuple identifying a category/event/contrib/subcontrib""" + if isinstance(obj, Category): + ref = {'type': 'category', 'category_id': obj.id} + elif isinstance(obj, Conference): + ref = {'type': 'event', 'event_id': obj.id} + elif isinstance(obj, Contribution): + event = parent or obj.getConference().id + ref = {'type': 'contribution', 'event_id': event, 'contrib_id': obj.id} + elif isinstance(obj, SubContribution): + contrib = parent or obj.getContribution() + ref = {'type': 'subcontribution', + 'event_id': contrib.getConference().id, 'contrib_id': contrib.id, 'subcontrib_id': obj.id} + else: + raise ValueError('Unexpected object: {}'.format(obj.__class__.__name__)) + return ImmutableDict(ref) + + +def obj_deref(ref): + """Returns the object identified by `ref`""" + if ref['type'] == 'category': + try: + return CategoryManager().getById(ref['category_id']) + except KeyError: + return None + elif ref['type'] in {'event', 'contribution', 'subcontribution'}: + event = ConferenceHolder().getById(ref['event_id'], quiet=True) + if ref['type'] == 'event' or not event: + return event + contrib = event.getContributionById(ref['contrib_id']) + if ref['type'] == 'contribution' or not contrib: + return contrib + return contrib.getSubContributionById(ref['subcontrib_id']) + else: + raise ValueError('Unexpected object type: {}'.format(ref['type']))