LiveSync: add sessions, use proper DB queries

This commit is contained in:
Pedro Ferreira 2016-07-21 15:36:02 +02:00
parent 8929042f2a
commit 230848dbd9
9 changed files with 311 additions and 201 deletions

View File

@ -23,8 +23,8 @@ from terminaltables import AsciiTable
from indico.core.db import db, DBMgr
from indico.core.db.sqlalchemy.util.session import update_session_options
from indico.util.console import cformat, conferenceHolderIterator
from MaKaC.conference import ConferenceHolder
from indico.modules.events.models.events import Event
from indico.util.console import cformat
from indico_livesync.models.agents import LiveSyncAgent
@ -83,22 +83,13 @@ def initial_export(agent_id, force=False):
print cformat('To re-run it, use %{yellow!}--force%{reset}')
return
def _iter_events():
for i, (_, event) in enumerate(conferenceHolderIterator(ConferenceHolder(), deepness='event'), 1):
yield event
if i % 1000 == 0:
# Clean local ZEO cache
transaction.abort()
with DBMgr.getInstance().global_connection():
agent.create_backend().run_initial_export(_iter_events())
agent.create_backend().run_initial_export(Event.find(is_deleted=False))
agent.initial_data_exported = True
db.session.commit()
@cli_manager.option('agent_id')
@cli_manager.option('--force', action='store_true')
@cli_manager.option('--force', action='store_true', help="Run even if initial export was not done")
def run(agent_id, force=False):
"""Runs the livesync agent"""
update_session_options(db)

View File

@ -24,15 +24,15 @@ from sqlalchemy import inspect
from indico.core import signals
from indico.core.db.sqlalchemy.links import LinkType
from indico.core.db.sqlalchemy.protection import ProtectionMode
from indico.modules.categories.models.categories import Category
from indico.modules.events import Event
from indico.modules.events.contributions.models.contributions import Contribution
from indico.modules.events.contributions.models.subcontributions import SubContribution
from indico.modules.events.sessions import Session
from indico.util.event import unify_event_args
from MaKaC.conference import Conference
from indico_livesync.models.queue import LiveSyncQueueEntry, ChangeType
from indico_livesync.util import obj_ref, is_ref_excluded
from indico_livesync.util import obj_ref, get_excluded_categories
def connect_signals(plugin):
@ -40,8 +40,8 @@ def connect_signals(plugin):
plugin.connect(signals.after_process, _apply_changes)
plugin.connect(signals.before_retry, _clear_changes)
# moved
# plugin.connect(signals.category.moved, _moved)
# plugin.connect(signals.event.moved, _moved)
plugin.connect(signals.category.moved, _moved)
plugin.connect(signals.event.moved, _moved)
# created
plugin.connect(signals.event.created, _created)
plugin.connect(signals.event.contribution_created, _created)
@ -59,23 +59,15 @@ def connect_signals(plugin):
plugin.connect(signals.event.timetable_entry_updated, _timetable_changed)
plugin.connect(signals.event.timetable_entry_deleted, _timetable_changed)
# protection
# plugin.connect(signals.category.protection_changed, _protection_changed_legacy)
# plugin.connect(signals.event.protection_changed, _protection_changed_legacy)
plugin.connect(signals.acl.protection_changed, _category_protection_changed, sender=Category)
plugin.connect(signals.acl.protection_changed, _protection_changed, sender=Event)
plugin.connect(signals.acl.protection_changed, _protection_changed, sender=Session)
plugin.connect(signals.acl.protection_changed, _protection_changed, sender=Contribution)
# ACLs
# plugin.connect(signals.acl.access_granted, _acl_changed_legacy)
# plugin.connect(signals.acl.access_revoked, _acl_changed_legacy)
# plugin.connect(signals.acl.modification_granted, _acl_changed_legacy)
# plugin.connect(signals.acl.modification_revoked, _acl_changed_legacy)
plugin.connect(signals.acl.entry_changed, _acl_entry_changed, sender=Event)
plugin.connect(signals.acl.entry_changed, _acl_entry_changed, sender=Session)
plugin.connect(signals.acl.entry_changed, _acl_entry_changed, sender=Contribution)
# domain access
# plugin.connect(signals.category.domain_access_granted, _domain_changed)
# plugin.connect(signals.category.domain_access_revoked, _domain_changed)
# plugin.connect(signals.event.domain_access_granted, _domain_changed)
# plugin.connect(signals.event.domain_access_revoked, _domain_changed)
plugin.connect(signals.acl.entry_changed, _protection_changed, sender=Category)
plugin.connect(signals.acl.entry_changed, _protection_changed, sender=Event)
plugin.connect(signals.acl.entry_changed, _protection_changed, sender=Session)
plugin.connect(signals.acl.entry_changed, _protection_changed, sender=Contribution)
# notes
plugin.connect(signals.event.notes.note_added, _note_changed)
plugin.connect(signals.event.notes.note_deleted, _note_changed)
@ -87,12 +79,13 @@ def connect_signals(plugin):
plugin.connect(signals.attachments.attachment_updated, _attachment_changed)
def _moved(obj, old_parent, new_parent, **kwargs):
def _moved(obj, old_parent, **kwargs):
_register_change(obj, ChangeType.moved)
category_protection = old_parent.isProtected()
new_category_protection = new_parent.isProtected()
category_protection = old_parent.effective_protection_mode
new_category_protection = obj.protection_parent.effective_protection_mode
if category_protection != new_category_protection and obj.getAccessProtectionLevel() == 0:
# Event is inheriting and protection of new parent is different
if category_protection != new_category_protection and obj.is_inheriting:
_register_change(obj, ChangeType.protection_changed)
@ -110,6 +103,7 @@ def _created(obj, **kwargs):
_register_change(obj, ChangeType.created)
@unify_event_args
def _deleted(obj, **kwargs):
_register_deletion(obj)
@ -122,38 +116,17 @@ def _timetable_changed(entry, **kwargs):
_register_change(entry.event_new, ChangeType.data_changed)
def _protection_changed_legacy(obj, old, new, **kwargs):
if new == 0: # inheriting
new = 1 if obj.isProtected() else -1
if old != new:
_register_change(obj, ChangeType.protection_changed)
def _category_protection_changed(sender, obj, mode, old_mode, **kwargs):
parent_mode = obj.protection_parent.effective_protection_mode
if ((old_mode == ProtectionMode.inheriting and parent_mode == mode) or
(old_mode == parent_mode and mode == ProtectionMode.inheriting)):
return
_protection_changed(sender, obj, mode=mode, old_mode=old_mode, **kwargs)
def _protection_changed(sender, obj, **kwargs):
if isinstance(obj, Session):
_register_change(obj.event_new, ChangeType.protection_changed)
else:
_register_change(obj, ChangeType.protection_changed)
def _acl_changed_legacy(obj, **kwargs):
_handle_acl_change(obj)
def _acl_entry_changed(sender, obj, **kwargs):
if not inspect(obj).persistent:
return
if isinstance(obj, Session):
# if a session acl is changed we need to update all inheriting
# contributions in that session
for contrib in obj.contributions:
if contrib.protection_mode == ProtectionMode.inheriting:
_register_change(contrib, ChangeType.protection_changed)
else:
_register_change(obj, ChangeType.protection_changed)
def _domain_changed(obj, **kwargs):
_register_change(obj, ChangeType.protection_changed)
@ -164,17 +137,17 @@ def _note_changed(note, **kwargs):
def _attachment_changed(attachment_or_folder, **kwargs):
folder = getattr(attachment_or_folder, 'folder', attachment_or_folder)
if folder.link_type not in (LinkType.category, LinkType.session):
if not isinstance(folder.object, Category) and not isinstance(folder.object, Session):
_register_change(folder.object.event_new, ChangeType.data_changed)
def _apply_changes(sender, **kwargs):
excluded_categories = get_excluded_categories()
if not hasattr(g, 'livesync_changes'):
return
for ref, changes in g.livesync_changes.iteritems():
if is_ref_excluded(ref):
continue
LiveSyncQueueEntry.create(changes, ref)
LiveSyncQueueEntry.create(changes, ref, excluded_categories=excluded_categories)
def _clear_changes(sender, **kwargs):
@ -183,18 +156,6 @@ def _clear_changes(sender, **kwargs):
del g.livesync_changes
def _handle_acl_change(obj):
if isinstance(obj, Category):
_register_change(obj, ChangeType.protection_changed)
elif isinstance(obj, Conference):
if obj.getOwner():
_register_change(obj, ChangeType.data_changed)
elif isinstance(obj, AccessController):
_handle_acl_change(obj.getOwner())
else:
raise TypeError('Unexpected object: {}'.format(type(obj).__name__))
def _register_deletion(obj):
_init_livesync_g()
g.livesync_changes[obj_ref(obj)].add(ChangeType.deleted)

View File

@ -23,19 +23,23 @@ from MaKaC.accessControl import AccessWrapper
from MaKaC.common.output import outputGenerator
from MaKaC.common.xmlGen import XMLGen
from indico.modules.categories.models.categories import Category
from indico.modules.events.contributions.models.contributions import Contribution
from indico.modules.events.contributions.models.subcontributions import SubContribution
from indico.modules.events.models.events import Event
from indico_livesync import SimpleChange
from indico_livesync.models.queue import EntryType
from indico_livesync.util import make_compound_id, obj_deref, obj_ref
from indico_livesync.util import compound_id, obj_ref
class MARCXMLGenerator:
"""Generates MARCXML based on Indico objects"""
"""Generate MARCXML based on Indico objects."""
@classmethod
def records_to_xml(cls, records):
mg = MARCXMLGenerator()
for ref, change in records.iteritems():
mg.safe_add_object(ref, bool(change & SimpleChange.deleted))
for entry, change in records.iteritems():
mg.safe_add_object(entry, bool(change & SimpleChange.deleted))
return mg.get_xml()
@classmethod
@ -55,44 +59,39 @@ class MARCXMLGenerator:
aw.setUser(User.find_first(is_admin=True).as_avatar)
self.output_generator = outputGenerator(aw, self.xml_generator)
def safe_add_object(self, ref, deleted=False):
def safe_add_object(self, obj, deleted=False):
try:
self.add_object(ref, deleted)
self.add_object(obj, deleted)
except Exception:
current_plugin.logger.exception('Could not process %s', ref)
current_plugin.logger.exception('Could not process %s', obj)
def add_object(self, ref, deleted=False):
def add_object(self, obj, deleted=False):
if self.closed:
raise RuntimeError('Cannot add object to closed xml generator')
if deleted:
xg = XMLGen(init=False)
xg.openTag(b'record')
xg.openTag(b'datafield', [[b'tag', b'970'], [b'ind1', b' '], [b'ind2', b' ']])
xg.writeTag(b'subfield', b'INDICO.{}'.format(make_compound_id(ref)), [[b'code', b'a']])
xg.writeTag(b'subfield', b'INDICO.{}'.format(compound_id(obj)), [[b'code', b'a']])
xg.closeTag(b'datafield')
xg.openTag(b'datafield', [[b'tag', b'980'], [b'ind1', b' '], [b'ind2', b' ']])
xg.writeTag(b'subfield', b'DELETED', [[b'code', b'c']])
xg.closeTag(b'datafield')
xg.closeTag(b'record')
self.xml_generator.xml += xg.xml
elif ref['type'] in {EntryType.event, EntryType.contribution, EntryType.subcontribution}:
obj = obj_deref(ref)
if obj is None:
raise ValueError('Cannot add deleted object')
elif isinstance(obj, Category) and not obj.getOwner():
raise ValueError('Cannot add object without owner: {}'.format(obj))
elif isinstance(obj, (Event, Contribution, SubContribution)):
if obj.is_deleted or obj.event_new.is_deleted:
pass
elif ref['type'] == EntryType.event:
elif isinstance(obj, Event):
self.xml_generator.xml += self._event_to_marcxml(obj)
elif ref['type'] == EntryType.contribution:
elif isinstance(obj, Contribution):
self.xml_generator.xml += self._contrib_to_marcxml(obj)
elif ref['type'] == EntryType.subcontribution:
elif isinstance(obj, SubContribution):
self.xml_generator.xml += self._subcontrib_to_marcxml(obj)
elif ref['type'] == EntryType.category:
elif isinstance(obj, Category):
pass # we don't send category updates
else:
raise ValueError('unknown object ref: {}'.format(ref['type']))
raise ValueError('unknown object ref: {}'.format(obj))
return self.xml_generator.getXml()
def get_xml(self):

View File

@ -0,0 +1,84 @@
"""Add session to LiveSyncQueueEntry
Revision ID: 205f944640f6
Revises: 230c086da074
Create Date: 2016-07-15 17:36:12.702497
"""
import sqlalchemy as sa
from alembic import op
revision = '205f944640f6'
down_revision = '230c086da074'
def upgrade():
op.add_column('queues', sa.Column('session_id', sa.Integer(), nullable=True), schema='plugin_livesync')
op.create_foreign_key(None,
'queues', 'sessions',
['session_id'], ['id'],
source_schema='plugin_livesync', referent_schema='events')
op.create_index(None, 'queues', ['session_id'], unique=False, schema='plugin_livesync')
op.drop_constraint('ck_queues_valid_enum_type', 'queues', schema='plugin_livesync')
op.drop_constraint('ck_queues_valid_category_entry', 'queues', schema='plugin_livesync')
op.drop_constraint('ck_queues_valid_event_entry', 'queues', schema='plugin_livesync')
op.drop_constraint('ck_queues_valid_contribution_entry', 'queues', schema='plugin_livesync')
op.drop_constraint('ck_queues_valid_subcontribution_entry', 'queues', schema='plugin_livesync')
op.create_check_constraint('valid_enum_type', 'queues',
'type IN (1, 2, 3, 4, 5)',
schema='plugin_livesync')
op.create_check_constraint('valid_category_entry', 'queues',
'type != 1 OR (contribution_id IS NULL AND event_id IS NULL AND session_id IS NULL AND '
'subcontribution_id IS NULL AND category_id IS NOT NULL)',
schema='plugin_livesync')
op.create_check_constraint('valid_event_entry', 'queues',
'type != 2 OR (category_id IS NULL AND contribution_id IS NULL AND session_id IS NULL '
'AND subcontribution_id IS NULL AND event_id IS NOT NULL)',
schema='plugin_livesync')
op.create_check_constraint('valid_session_entry', 'queues',
'type != 5 OR (category_id IS NULL AND event_id IS NULL AND contribution_id IS NULL '
'AND subcontribution_id IS NULL AND session_id IS NOT NULL)',
schema='plugin_livesync')
op.create_check_constraint('valid_contribution_entry', 'queues',
'type != 3 OR (category_id IS NULL AND event_id IS NULL AND session_id IS NULL AND '
'subcontribution_id IS NULL AND contribution_id IS NOT NULL)',
schema='plugin_livesync')
op.create_check_constraint('valid_subcontribution_entry', 'queues',
'type != 4 OR (category_id IS NULL AND contribution_id IS NULL AND session_id IS NULL '
'AND event_id IS NULL AND subcontribution_id IS NOT NULL)',
schema='plugin_livesync')
def downgrade():
op.drop_constraint('ck_queues_valid_category_entry', 'queues', schema='plugin_livesync')
op.drop_constraint('ck_queues_valid_event_entry', 'queues', schema='plugin_livesync')
op.drop_constraint('ck_queues_valid_session_entry', 'queues', schema='plugin_livesync')
op.drop_constraint('ck_queues_valid_contribution_entry', 'queues', schema='plugin_livesync')
op.drop_constraint('ck_queues_valid_subcontribution_entry', 'queues', schema='plugin_livesync')
op.drop_column('queues', 'session_id', schema='plugin_livesync')
op.create_check_constraint('valid_category_entry', 'queues',
'type != 1 OR (contribution_id IS NULL AND event_id IS NULL AND '
'subcontribution_id IS NULL AND category_id IS NOT NULL)',
schema='plugin_livesync')
op.create_check_constraint('valid_event_entry', 'queues',
'type != 2 OR (category_id IS NULL AND contribution_id IS NULL AND '
'subcontribution_id IS NULL AND event_id IS NOT NULL)',
schema='plugin_livesync')
op.create_check_constraint('valid_contribution_entry', 'queues',
'type != 3 OR (category_id IS NULL AND event_id IS NULL AND '
'subcontribution_id IS NULL AND contribution_id IS NOT NULL)',
schema='plugin_livesync')
op.create_check_constraint('valid_subcontribution_entry', 'queues',
'type != 4 OR (category_id IS NULL AND contribution_id IS NULL AND '
'event_id IS NULL AND subcontribution_id IS NOT NULL)',
schema='plugin_livesync')
op.drop_constraint('ck_queues_valid_enum_type', 'queues', schema='plugin_livesync')
op.create_check_constraint('valid_enum_type', 'queues',
'type IN (1, 2, 3, 4)',
schema='plugin_livesync')

View File

@ -19,12 +19,14 @@ from __future__ import unicode_literals
from werkzeug.datastructures import ImmutableDict
from indico.core.db.sqlalchemy import db, UTCDateTime, PyIntEnum
from indico.modules.categories.models.categories import Category
from indico.modules.events.models.events import Event
from indico.util.date_time import now_utc
from indico.util.string import return_ascii, format_repr
from indico.util.struct.enum import IndicoEnum
from indico_livesync.models.agents import LiveSyncAgent
from indico_livesync.util import obj_deref, obj_ref
from indico_livesync.util import obj_deref
class ChangeType(int, IndicoEnum):
@ -40,13 +42,15 @@ class EntryType(int, IndicoEnum):
event = 2
contribution = 3
subcontribution = 4
session = 5
_column_for_types = {
EntryType.category: 'category_id',
EntryType.event: 'event_id',
EntryType.contribution: 'contribution_id',
EntryType.subcontribution: 'subcontribution_id'
EntryType.subcontribution: 'subcontribution_id',
EntryType.session: 'session_id'
}
@ -113,7 +117,7 @@ class LiveSyncQueueEntry(db.Model):
nullable=True
)
#: The event ID of the changed event/contribution/subcontribution
#: ID of the changed event
event_id = db.Column(
db.Integer,
db.ForeignKey('events.events.id'),
@ -121,7 +125,7 @@ class LiveSyncQueueEntry(db.Model):
nullable=True
)
#: The contribution ID of the changed contribution/subcontribution
#: ID of the changed contribution
contrib_id = db.Column(
'contribution_id',
db.Integer,
@ -130,7 +134,16 @@ class LiveSyncQueueEntry(db.Model):
nullable=True
)
#: The subcontribution ID of the changed subcontribution
#: ID of the changed session
session_id = db.Column(
'session_id',
db.Integer,
db.ForeignKey('events.sessions.id'),
index=True,
nullable=True
)
#: ID of the changed subcontribution
subcontrib_id = db.Column(
'subcontribution_id',
db.Integer,
@ -145,8 +158,28 @@ class LiveSyncQueueEntry(db.Model):
backref=db.backref('queue', cascade='all, delete-orphan', lazy='dynamic')
)
event = db.relationship(
category = db.relationship(
'Category',
lazy=True,
backref=db.backref(
'livesync_queue_entries',
cascade='all, delete-orphan',
lazy=True
)
)
event_new = db.relationship(
'Event',
lazy=True,
backref=db.backref(
'livesync_queue_entries',
cascade='all, delete-orphan',
lazy=True
)
)
session = db.relationship(
'Session',
lazy=False,
backref=db.backref(
'livesync_queue_entries',
@ -175,63 +208,60 @@ class LiveSyncQueueEntry(db.Model):
)
)
@property
def category(self):
return CategoryManager().getById(str(self.category_id), True)
@category.setter
def category(self, value):
self.category_id = int(value.id) if value is not None else None
@property
def object(self):
"""Returns the changed object"""
return obj_deref(self.object_ref)
"""Return the changed object."""
if self.type == EntryType.category:
return self.category
elif self.type == EntryType.event:
return self.event_new
elif self.type == EntryType.session:
return self.session
elif self.type == EntryType.contribution:
return self.contribution
elif self.type == EntryType.subcontribution:
return self.subcontribution
@property
def object_ref(self):
"""Returns the reference of the changed object"""
"""Return the reference of the changed object."""
return ImmutableDict(type=self.type, category_id=self.category_id, event_id=self.event_id,
contrib_id=self.contrib_id, subcontrib_id=self.subcontrib_id)
session_id=self.session_id, contrib_id=self.contrib_id, subcontrib_id=self.subcontrib_id)
@return_ascii
def __repr__(self):
ref_repr = '{}.{}.{}.{}'.format(self.category_id if self.category_id else 'x',
self.event_id if self.event_id else 'x',
self.contrib_id if self.contrib_id else 'x',
self.subcontrib_id if self.subcontrib_id else 'x')
ref_repr = '{}.{}.{}.{}.{}'.format(self.category_id if self.category_id else 'x',
self.event_id if self.event_id else 'x',
self.session_id if self.session_id else 'x',
self.contrib_id if self.contrib_id else 'x',
self.subcontrib_id if self.subcontrib_id else 'x')
return format_repr(self, 'agent', 'id', 'type', 'change', _text=ref_repr)
@classmethod
def create(cls, changes, ref):
"""Creates a new change in all queues
def create(cls, changes, ref, excluded_categories=set()):
"""Create a new change in all queues.
:param changes: the change types, an iterable containing
:class:`ChangeType`
:param ref: the object reference (returned by `obj_ref`)
of the changed object
:param excluded_categories: set of categories (IDs) whose items
will not be tracked
"""
ref = dict(ref)
for agent in LiveSyncAgent.find():
for change in changes:
obj = obj_deref(ref)
if isinstance(obj, Category):
if any(c.id in excluded_categories for c in obj.chain_query):
return
else:
event = obj if isinstance(obj, Event) else obj.event_new
if excluded_categories & set(event.category_chain):
return
for change in changes:
for agent in LiveSyncAgent.find():
entry = cls(agent=agent, change=change, **ref)
db.session.add(entry)
db.session.flush()
def iter_subentries(self):
"""Iterates through all children
The only field of the yielded items that should be used are
`type`, `object` and `object_ref`.
"""
if self.type not in {EntryType.category, EntryType.event, EntryType.contribution}:
return
if self.type == EntryType.category:
for event in self.object.iterAllConferences():
yield LiveSyncQueueEntry(change=self.change, **obj_ref(event))
elif self.type == EntryType.event:
for contrib in self.object.contributions:
yield LiveSyncQueueEntry(change=self.change, **obj_ref(contrib))
elif self.type == EntryType.contribution:
for subcontrib in self.object.subcontributions:
yield LiveSyncQueueEntry(change=self.change, **obj_ref(subcontrib))

View File

@ -16,10 +16,18 @@
from __future__ import unicode_literals
import itertools
from collections import defaultdict
from indico.util.struct.enum import IndicoEnum
from sqlalchemy.orm import joinedload
from indico.core.db import db
from indico.modules.categories.models.categories import Category
from indico.modules.events.models.events import Event
from indico.modules.events.contributions.models.contributions import Contribution
from indico.modules.events.contributions.models.subcontributions import SubContribution
from indico.util.struct.enum import IndicoEnum
from indico_livesync.models.queue import ChangeType, EntryType
@ -36,6 +44,7 @@ def process_records(records):
:return: a dict mapping object references to `SimpleChange` bitsets
"""
changes = defaultdict(int)
cascaded_records = set()
for record in records:
if record.change != ChangeType.deleted and record.object is None:
@ -44,23 +53,80 @@ def process_records(records):
continue
if record.change == ChangeType.created:
assert record.type != EntryType.category
changes[record.object_ref] |= SimpleChange.created
changes[record.object] |= SimpleChange.created
elif record.change == ChangeType.deleted:
assert record.type != EntryType.category
changes[record.object_ref] |= SimpleChange.deleted
changes[record.object] |= SimpleChange.deleted
elif record.change in {ChangeType.moved, ChangeType.protection_changed}:
for ref in _cascade(record):
changes[ref] |= SimpleChange.updated
cascaded_records.add(record)
elif record.change == ChangeType.data_changed:
assert record.type != EntryType.category
changes[record.object_ref] |= SimpleChange.updated
changes[record.object] |= SimpleChange.updated
for obj in _process_cascaded(cascaded_records):
changes[obj] |= SimpleChange.updated
return changes
def _cascade(record):
yield record.object_ref
for subrecord in record.iter_subentries():
if subrecord.object:
for item in _cascade(subrecord):
yield item
def _process_cascaded(records):
category_prot_records = {rec.category_id for rec in records if rec.type == EntryType.category
and rec.change == ChangeType.protection_changed}
category_move_records = {rec.category_id for rec in records if rec.type == EntryType.category
and rec.change == ChangeType.moved}
event_records = {rec.event_id for rec in records if rec.type == EntryType.event}
session_records = {rec.session_id for rec in records if rec.type == EntryType.session}
contribution_records = {rec.contribution_id for rec in records if rec.type == EntryType.contribution}
subcontribution_records = {rec.subcontribution_id for rec in records if rec.type == EntryType.subcontribution}
changed_events = set()
changed_contributions = set()
changed_subcontributions = set()
category_prot_records -= category_move_records # A move already implies sending the whole record
# Protection changes are handled differently, as there may not be the need to re-generate the record
if category_prot_records:
for categ in Category.find(Category.id.in_(category_prot_records)):
cte = categ.get_protection_parent_cte()
# Update only children that inherit
inheriting_categ_children = (Event.query
.join(cte, db.and_((Event.category_id == cte.c.id),
(cte.c.protection_parent == categ.id))))
inheriting_direct_children = Event.find((Event.category_id == categ.id) & Event.is_inheriting)
changed_events.update(itertools.chain(inheriting_direct_children, inheriting_categ_children))
# Add move operations and explicitly-passed event records
if category_move_records:
changed_events.update(Event.find(Event.category_chain_overlaps(category_move_records)))
if event_records:
changed_events.update(Event.find(Event.id.in_(event_records)))
for event in changed_events:
yield event
# Sessions are added (explicitly changed only, since they don't need to be sent anywhere)
if session_records:
changed_contributions.update(Contribution
.find(Contribution.session_id.in_(session_records), ~Contribution.is_deleted))
# Contributions are added (implictly + explicitly changed)
changed_event_ids = {ev.id for ev in changed_events}
condition = Contribution.event_id.in_(changed_event_ids) & ~Contribution.is_deleted
if contribution_records:
condition = db.or_(condition, Contribution.id.in_(contribution_records))
contrib_query = Contribution.find(condition).options(joinedload('subcontributions'))
for contribution in contrib_query:
yield contribution
changed_subcontributions.update(contribution.subcontributions)
# Same for subcontributions
if subcontribution_records:
changed_subcontributions.update(SubContribution
.find(SubContribution.contribution_id.in_(subcontribution_records)))
for subcontrib in changed_subcontributions:
yield subcontrib

View File

@ -87,7 +87,6 @@ class Uploader(object):
self.logger.debug('Marking as processed: %s', record)
record.processed = True
db.session.commit()
transaction.abort() # clear ZEO cache
class MARCXMLUploader(Uploader):

View File

@ -20,12 +20,13 @@ from datetime import timedelta
from werkzeug.datastructures import ImmutableDict
from indico.modules.categories.models.categories import Category
from indico.modules.events import Event
from indico.modules.events.contributions.models.contributions import Contribution
from indico.modules.events.sessions.models.sessions import Session
from indico.modules.events.contributions.models.subcontributions import SubContribution
from indico.util.caching import memoize_request
from indico.util.date_time import now_utc
from MaKaC.conference import Conference
def obj_ref(obj):
@ -35,8 +36,8 @@ def obj_ref(obj):
ref = {'type': EntryType.category, 'category_id': obj.id}
elif isinstance(obj, Event):
ref = {'type': EntryType.event, 'event_id': obj.id}
elif isinstance(obj, Conference):
ref = {'type': EntryType.event, 'event_id': int(obj.id)}
elif isinstance(obj, Session):
ref = {'type': EntryType.session, 'session_id': obj.id}
elif isinstance(obj, Contribution):
ref = {'type': EntryType.contribution, 'contrib_id': obj.id}
elif isinstance(obj, SubContribution):
@ -46,36 +47,24 @@ def obj_ref(obj):
return ImmutableDict(ref)
@memoize_request
def obj_deref(ref):
"""Returns the object identified by `ref`"""
from indico_livesync.models.queue import EntryType
if ref['type'] == EntryType.category:
return CategoryManager().getById(ref['category_id'], True)
return Category.get_one(ref['category_id'])
elif ref['type'] == EntryType.event:
return Event.get(ref['event_id'])
return Event.get_one(ref['event_id'])
elif ref['type'] == EntryType.session:
return Session.get_one(ref['session_id'])
elif ref['type'] == EntryType.contribution:
return Contribution.get(ref['contrib_id'])
return Contribution.get_one(ref['contrib_id'])
elif ref['type'] == EntryType.subcontribution:
return SubContribution.get(ref['subcontrib_id'])
return SubContribution.get_one(ref['subcontrib_id'])
else:
raise ValueError('Unexpected object type: {}'.format(ref['type']))
def make_compound_id(ref):
"""Returns the compound ID for the referenced object"""
from indico_livesync.models.queue import EntryType
if ref['type'] == EntryType.category:
raise ValueError('Compound IDs are not supported for categories')
obj = obj_deref(ref)
if isinstance(obj, Event):
return unicode(obj.id)
elif isinstance(obj, Contribution):
return '{}.{}'.format(obj.event_id, obj.id)
elif isinstance(obj, SubContribution):
return '{}.{}.{}'.format(obj.contribution.event_id, obj.contribution_id, obj.id)
raise ValueError('Unexpected object type: {}'.format(ref['type']))
def clean_old_entries():
"""Deletes obsolete entries from the queues"""
from indico_livesync.plugin import LiveSyncPlugin
@ -91,25 +80,18 @@ def clean_old_entries():
@memoize_request
def get_excluded_categories():
"""Get all excluded category IDs"""
"""Get excluded category IDs."""
from indico_livesync.plugin import LiveSyncPlugin
todo = {x['id'] for x in LiveSyncPlugin.settings.get('excluded_categories')}
excluded = set()
while todo:
category_id = todo.pop()
try:
category = CategoryManager().getById(category_id)
except KeyError:
continue
excluded.add(category.getId())
todo.update(category.subcategories)
return excluded
return {int(x['id']) for x in LiveSyncPlugin.settings.get('excluded_categories')}
def is_ref_excluded(ref):
from indico_livesync.models.queue import EntryType
if ref['type'] == EntryType.category:
return ref['category_id'] in get_excluded_categories()
else:
obj = obj_deref(ref)
return unicode(obj.event_new.category_id) in {unicode(x) for x in get_excluded_categories()}
def compound_id(obj):
"""Generate a hierarchical compound ID, separated by dots."""
if isinstance(obj, (Category, Session)):
raise TypeError('Compound IDs are not supported for this entry type')
elif isinstance(obj, Event):
return unicode(obj.id)
elif isinstance(obj, Contribution):
return '{}.{}'.format(obj.event_id, obj.id)
elif isinstance(obj, SubContribution):
return '{}.{}.{}'.format(obj.contribution.event_id, obj.contribution_id, obj.id)

View File

@ -20,7 +20,6 @@ from indico.util.console import cformat
from indico.util.struct.iterables import grouper
from indico_livesync import LiveSyncBackendBase, SimpleChange, MARCXMLGenerator, process_records, Uploader
from indico_livesync.util import obj_deref
def _change_str(change):
@ -48,9 +47,8 @@ class LiveSyncDebugBackend(LiveSyncBackendBase):
self._print()
self._print(cformat('%{white!}Simplified/cascaded changes:%{reset}'))
for ref, change in process_records(records).iteritems():
obj = obj_deref(ref)
self._print(cformat('%{white!}{}%{reset}: {}').format(_change_str(change), obj or ref))
for obj, change in process_records(records).iteritems():
self._print(cformat('%{white!}{}%{reset}: {}').format(_change_str(change), obj))
self._print()
self._print(cformat('%{white!}Resulting MarcXML:%{reset}'))