Populate the queue on changes

This commit is contained in:
Adrian Moennich 2014-11-04 11:47:42 +01:00
parent 6b6c5200e3
commit 2dd972455c
6 changed files with 303 additions and 19 deletions

View File

@ -75,3 +75,37 @@ def initial_export(agent_id, force=False):
print 'TODO: run initial export'
agent.initial_data_exported = True
db.session.commit()
@cli_manager.command
def create_agent(agent_type, name=None):
"""Creates a new agent"""
update_session_options(db)
try:
agent_class = current_plugin.agent_classes[agent_type]
except KeyError:
print 'No such agent type'
return
# TODO: Prompt for agent type specific settings
agent = LiveSyncAgent(backend_name=agent_type, name=name or agent_class.title)
db.session.add(agent)
db.session.commit()
print agent
# TODO: delete_agent, update_agent, create_task
@cli_manager.command
def run(agent_id=None):
if agent_id is None:
agents = LiveSyncAgent.find_all()
else:
agent = LiveSyncAgent.find_first(id=int(agent_id))
if agent is None:
print 'No such agent'
return
agents = [agent]
for agent in agents:
pass # TODO

View File

@ -0,0 +1,155 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2014 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from collections import defaultdict
from flask import g
from indico.core import signals
from MaKaC.accessControl import AccessController
from MaKaC.conference import Conference, Contribution, SubContribution, Category, Session
from indico_livesync.models.queue import LiveSyncQueueEntry, ChangeType
from indico_livesync.util import obj_ref
class LiveSyncSignalHandler(object):
def __init__(self, plugin):
# request
plugin.connect(signals.after_process, self._apply_changes)
plugin.connect(signals.before_retry, self._clear_changes)
# moved
plugin.connect(signals.category_moved, self._moved)
plugin.connect(signals.event_moved, self._moved)
# created
plugin.connect(signals.category_created, self._created)
plugin.connect(signals.event_created, self._created)
plugin.connect(signals.contribution_created, self._created)
plugin.connect(signals.subcontribution_created, self._created)
# deleted
plugin.connect(signals.category_deleted, self._deleted)
plugin.connect(signals.event_deleted, self._deleted)
plugin.connect(signals.contribution_deleted, self._deleted)
plugin.connect(signals.subcontribution_deleted, self._deleted)
# title
plugin.connect(signals.category_title_changed, self._title_changed)
plugin.connect(signals.event_data_changed, self._title_changed)
plugin.connect(signals.contribution_title_changed, self._title_changed)
plugin.connect(signals.subcontribution_title_changed, self._title_changed)
# data
plugin.connect(signals.category_data_changed, self._data_changed)
plugin.connect(signals.event_data_changed, self._data_changed)
plugin.connect(signals.contribution_data_changed, self._data_changed)
plugin.connect(signals.subcontribution_data_changed, self._data_changed)
# protection
plugin.connect(signals.category_protection_changed, self._protection_changed)
plugin.connect(signals.event_protection_changed, self._protection_changed)
plugin.connect(signals.contribution_protection_changed, self._protection_changed)
# ACLs
plugin.connect(signals.access_granted, self._acl_changed)
plugin.connect(signals.access_revoked, self._acl_changed)
plugin.connect(signals.modification_granted, self._acl_changed)
plugin.connect(signals.modification_revoked, self._acl_changed)
# domain access
plugin.connect(signals.category_domain_access_granted, self._domain_changed)
plugin.connect(signals.category_domain_access_revoked, self._domain_changed)
plugin.connect(signals.event_domain_access_granted, self._domain_changed)
plugin.connect(signals.event_domain_access_revoked, self._domain_changed)
def _moved(self, obj, old_parent, new_parent, **kwargs):
print '_moved', obj, old_parent, new_parent
self.register_change(obj, 'moved')
category_protection = old_parent.isProtected()
new_category_protection = new_parent.isProtected()
if category_protection != new_category_protection and obj.getAccessProtectionLevel() == 0:
self.register_change(obj, ChangeType.protection_changed)
def _created(self, obj, parent, **kwargs):
print '_created', obj, parent
self.register_change(parent, ChangeType.data_changed)
self.register_change(obj, ChangeType.created)
def _deleted(self, obj, **kwargs):
print '_deleted', obj, kwargs
parent = kwargs.pop('parent', None)
self.register_deletion(obj, parent)
def _title_changed(self, obj, **kwargs):
if kwargs.pop('attr', 'title') != 'title':
return
print '_title_changed', obj, kwargs
self.register_change(obj, ChangeType.title_changed)
def _data_changed(self, obj, **kwargs):
print '_data_changed', obj, kwargs
self.register_change(obj, ChangeType.data_changed)
def _protection_changed(self, obj, old, new, **kwargs):
print '_protection_changed', obj, old, new
if new == 0: # inheriting
new = 1 if obj.isProtected() else -1
if old != new:
self.register_change(obj, ChangeType.protection_changed)
def _acl_changed(self, obj, principal, **kwargs):
print '_acl_changed', obj, principal
self._handle_acl_change(obj)
def _handle_acl_change(self, obj, child=False):
if isinstance(obj, (Conference, Contribution, SubContribution, Category)):
# if it was a child, emit data_changed instead
if child:
self.register_change(obj, ChangeType.data_changed)
else:
self.register_change(obj, ChangeType.protection_changed)
elif isinstance(obj, Session):
owner = obj.getOwner()
if owner:
self.register_change(owner, ChangeType.data_changed)
elif isinstance(obj, AccessController):
self._handle_acl_change(obj.getOwner(), child=False)
else:
self._handle_acl_change(obj.getOwner(), child=True)
def _domain_changed(self, obj, **kwargs):
print '_domain_changed', obj
self.register_change(obj, ChangeType.protection_changed)
def register_deletion(self, obj, parent):
self.init_livesync_g()
g.livesync[obj_ref(obj, parent)].add(ChangeType.deleted)
def register_change(self, obj, action):
self.init_livesync_g()
g.livesync[obj_ref(obj)].add(action)
def init_livesync_g(self):
if not hasattr(g, 'livesync'):
g.livesync = defaultdict(set)
def _apply_changes(self, sender, **kwargs):
if not hasattr(g, 'livesync'):
return
for ref, changes in g.livesync.iteritems():
LiveSyncQueueEntry.create(changes, ref)
def _clear_changes(self, sender, **kwargs):
if not hasattr(g, 'livesync'):
return
del g.livesync

View File

@ -32,9 +32,13 @@ def upgrade():
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('agent_id', sa.Integer(), nullable=False, index=True),
sa.Column('timestamp', UTCDateTime(), nullable=False),
sa.Column('change', sa.String(), nullable=False),
sa.Column('data', postgresql.JSON(), nullable=False),
sa.Column('change', sa.SmallInteger(), nullable=False),
sa.Column('category_id', sa.String()),
sa.Column('event_id', sa.String()),
sa.Column('contrib_id', sa.String()),
sa.Column('subcontrib_id', sa.String()),
sa.ForeignKeyConstraint(['agent_id'], ['plugin_livesync.agents.id']),
sa.CheckConstraint('change IN (1, 2, 3, 4, 5, 6)'),
sa.PrimaryKeyConstraint('id'),
schema='plugin_livesync')

View File

@ -16,18 +16,27 @@
from __future__ import unicode_literals
from sqlalchemy.dialects.postgresql import JSON
from indico.core.db.sqlalchemy import db, UTCDateTime
from indico.util.date_time import now_utc
from indico.util.string import return_ascii
from indico.util.struct.enum import IndicoEnum
from indico_livesync.models.agents import LiveSyncAgent
class ChangeType(int, IndicoEnum):
created = 1
deleted = 2
moved = 3
data_changed = 4
title_changed = 5
protection_changed = 6
class LiveSyncQueueEntry(db.Model):
__tablename__ = 'queues'
__table_args__ = {'schema': 'plugin_livesync'}
__table_args__ = (db.CheckConstraint('change IN ({})'.format(', '.join(map(str, ChangeType)))),
{'schema': 'plugin_livesync'})
#: Entry ID
id = db.Column(
@ -39,6 +48,7 @@ class LiveSyncQueueEntry(db.Model):
agent_id = db.Column(
db.Integer,
db.ForeignKey('plugin_livesync.agents.id'),
nullable=False,
index=True
)
@ -49,18 +59,34 @@ class LiveSyncQueueEntry(db.Model):
default=now_utc
)
# XXX: maybe use an Enum for this?
# XXX: or should it be an array? old code seems to have records with multiple changes
#: the change type
#: the change type, a :class:`ChangeType`
change = db.Column(
db.String,
db.SmallInteger,
nullable=False
)
#: Data related to the change
data = db.Column(
JSON,
nullable=False,
#: The ID of the changed category
category_id = db.Column(
db.String,
nullable=True
)
#: The event ID of the changed event/contribution/subcontribution
event_id = db.Column(
db.String,
nullable=True
)
#: The contribution ID of the changed contribution/subcontribution
contrib_id = db.Column(
db.String,
nullable=True
)
#: The subcontribution ID of the changed subcontribution
subcontrib_id = db.Column(
db.String,
nullable=True
)
#: The associated :class:LiveSyncAgent
@ -71,15 +97,22 @@ class LiveSyncQueueEntry(db.Model):
@return_ascii
def __repr__(self):
return '<LiveSyncQueueEntry({}, {}, {}, {})>'.format(self.agent, self.id, self.change, self.data)
return '<LiveSyncQueueEntry({}, {}, {}, {})>'.format(self.agent, self.id, ChangeType(self.change).name,
self.data)
@classmethod
def create(cls, change, data):
def create(cls, changes, obj_ref):
"""Creates a new change in all queues
:param change: the change type
:param data: the associated data (a json-serializable object)
:param changes: the change types, an iterable containing
:class:`ChangeType`
:param obj_ref: the object reference (returned by `obj_ref`)
of the changed object
"""
obj_ref = dict(obj_ref)
obj_ref.pop('type')
for agent in LiveSyncAgent.find():
db.session.add(cls(agent=agent, change=change, data=data))
for change in changes:
entry = cls(agent=agent, change=change, **obj_ref)
db.session.add(entry)
db.session.flush()

View File

@ -19,6 +19,7 @@ from __future__ import unicode_literals
from indico.core.plugins import IndicoPlugin, wrap_cli_manager
from indico_livesync.cli import cli_manager
from indico_livesync.handler import LiveSyncSignalHandler
class LiveSyncPlugin(IndicoPlugin):
@ -31,8 +32,8 @@ class LiveSyncPlugin(IndicoPlugin):
def init(self):
super(LiveSyncPlugin, self).init()
# noinspection PyAttributeOutsideInit
self.agent_classes = {}
self.signal_handler = LiveSyncSignalHandler(self)
def add_cli_command(self, manager):
manager.add_command('livesync', wrap_cli_manager(cli_manager, self))

View File

@ -0,0 +1,57 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2014 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from MaKaC.conference import Conference, Contribution, SubContribution, Category, CategoryManager, ConferenceHolder
from werkzeug.datastructures import ImmutableDict
def obj_ref(obj, parent=None):
"""Returns a tuple identifying a category/event/contrib/subcontrib"""
if isinstance(obj, Category):
ref = {'type': 'category', 'category_id': obj.id}
elif isinstance(obj, Conference):
ref = {'type': 'event', 'event_id': obj.id}
elif isinstance(obj, Contribution):
event = parent or obj.getConference().id
ref = {'type': 'contribution', 'event_id': event, 'contrib_id': obj.id}
elif isinstance(obj, SubContribution):
contrib = parent or obj.getContribution()
ref = {'type': 'subcontribution',
'event_id': contrib.getConference().id, 'contrib_id': contrib.id, 'subcontrib_id': obj.id}
else:
raise ValueError('Unexpected object: {}'.format(obj.__class__.__name__))
return ImmutableDict(ref)
def obj_deref(ref):
"""Returns the object identified by `ref`"""
if ref['type'] == 'category':
try:
return CategoryManager().getById(ref['category_id'])
except KeyError:
return None
elif ref['type'] in {'event', 'contribution', 'subcontribution'}:
event = ConferenceHolder().getById(ref['event_id'], quiet=True)
if ref['type'] == 'event' or not event:
return event
contrib = event.getContributionById(ref['contrib_id'])
if ref['type'] == 'contribution' or not contrib:
return contrib
return contrib.getSubContributionById(ref['subcontrib_id'])
else:
raise ValueError('Unexpected object type: {}'.format(ref['type']))