diff --git a/livesync/indico_livesync/cli.py b/livesync/indico_livesync/cli.py index 86fa043..5443c90 100644 --- a/livesync/indico_livesync/cli.py +++ b/livesync/indico_livesync/cli.py @@ -23,8 +23,8 @@ from terminaltables import AsciiTable from indico.core.db import db, DBMgr from indico.core.db.sqlalchemy.util.session import update_session_options -from indico.util.console import cformat, conferenceHolderIterator -from MaKaC.conference import ConferenceHolder +from indico.modules.events.models.events import Event +from indico.util.console import cformat from indico_livesync.models.agents import LiveSyncAgent @@ -83,22 +83,13 @@ def initial_export(agent_id, force=False): print cformat('To re-run it, use %{yellow!}--force%{reset}') return - def _iter_events(): - for i, (_, event) in enumerate(conferenceHolderIterator(ConferenceHolder(), deepness='event'), 1): - yield event - if i % 1000 == 0: - # Clean local ZEO cache - transaction.abort() - - with DBMgr.getInstance().global_connection(): - agent.create_backend().run_initial_export(_iter_events()) - + agent.create_backend().run_initial_export(Event.find(is_deleted=False)) agent.initial_data_exported = True db.session.commit() @cli_manager.option('agent_id') -@cli_manager.option('--force', action='store_true') +@cli_manager.option('--force', action='store_true', help="Run even if initial export was not done") def run(agent_id, force=False): """Runs the livesync agent""" update_session_options(db) diff --git a/livesync/indico_livesync/handler.py b/livesync/indico_livesync/handler.py index 18d35b4..59018f7 100644 --- a/livesync/indico_livesync/handler.py +++ b/livesync/indico_livesync/handler.py @@ -24,15 +24,15 @@ from sqlalchemy import inspect from indico.core import signals from indico.core.db.sqlalchemy.links import LinkType from indico.core.db.sqlalchemy.protection import ProtectionMode +from indico.modules.categories.models.categories import Category from indico.modules.events import Event from indico.modules.events.contributions.models.contributions import Contribution from indico.modules.events.contributions.models.subcontributions import SubContribution from indico.modules.events.sessions import Session from indico.util.event import unify_event_args -from MaKaC.conference import Conference from indico_livesync.models.queue import LiveSyncQueueEntry, ChangeType -from indico_livesync.util import obj_ref, is_ref_excluded +from indico_livesync.util import obj_ref, get_excluded_categories def connect_signals(plugin): @@ -40,8 +40,8 @@ def connect_signals(plugin): plugin.connect(signals.after_process, _apply_changes) plugin.connect(signals.before_retry, _clear_changes) # moved - # plugin.connect(signals.category.moved, _moved) - # plugin.connect(signals.event.moved, _moved) + plugin.connect(signals.category.moved, _moved) + plugin.connect(signals.event.moved, _moved) # created plugin.connect(signals.event.created, _created) plugin.connect(signals.event.contribution_created, _created) @@ -59,23 +59,15 @@ def connect_signals(plugin): plugin.connect(signals.event.timetable_entry_updated, _timetable_changed) plugin.connect(signals.event.timetable_entry_deleted, _timetable_changed) # protection - # plugin.connect(signals.category.protection_changed, _protection_changed_legacy) - # plugin.connect(signals.event.protection_changed, _protection_changed_legacy) + plugin.connect(signals.acl.protection_changed, _category_protection_changed, sender=Category) + plugin.connect(signals.acl.protection_changed, _protection_changed, sender=Event) plugin.connect(signals.acl.protection_changed, _protection_changed, sender=Session) plugin.connect(signals.acl.protection_changed, _protection_changed, sender=Contribution) # ACLs - # plugin.connect(signals.acl.access_granted, _acl_changed_legacy) - # plugin.connect(signals.acl.access_revoked, _acl_changed_legacy) - # plugin.connect(signals.acl.modification_granted, _acl_changed_legacy) - # plugin.connect(signals.acl.modification_revoked, _acl_changed_legacy) - plugin.connect(signals.acl.entry_changed, _acl_entry_changed, sender=Event) - plugin.connect(signals.acl.entry_changed, _acl_entry_changed, sender=Session) - plugin.connect(signals.acl.entry_changed, _acl_entry_changed, sender=Contribution) - # domain access - # plugin.connect(signals.category.domain_access_granted, _domain_changed) - # plugin.connect(signals.category.domain_access_revoked, _domain_changed) - # plugin.connect(signals.event.domain_access_granted, _domain_changed) - # plugin.connect(signals.event.domain_access_revoked, _domain_changed) + plugin.connect(signals.acl.entry_changed, _protection_changed, sender=Category) + plugin.connect(signals.acl.entry_changed, _protection_changed, sender=Event) + plugin.connect(signals.acl.entry_changed, _protection_changed, sender=Session) + plugin.connect(signals.acl.entry_changed, _protection_changed, sender=Contribution) # notes plugin.connect(signals.event.notes.note_added, _note_changed) plugin.connect(signals.event.notes.note_deleted, _note_changed) @@ -87,12 +79,13 @@ def connect_signals(plugin): plugin.connect(signals.attachments.attachment_updated, _attachment_changed) -def _moved(obj, old_parent, new_parent, **kwargs): +def _moved(obj, old_parent, **kwargs): _register_change(obj, ChangeType.moved) - category_protection = old_parent.isProtected() - new_category_protection = new_parent.isProtected() + category_protection = old_parent.effective_protection_mode + new_category_protection = obj.protection_parent.effective_protection_mode - if category_protection != new_category_protection and obj.getAccessProtectionLevel() == 0: + # Event is inheriting and protection of new parent is different + if category_protection != new_category_protection and obj.is_inheriting: _register_change(obj, ChangeType.protection_changed) @@ -110,6 +103,7 @@ def _created(obj, **kwargs): _register_change(obj, ChangeType.created) +@unify_event_args def _deleted(obj, **kwargs): _register_deletion(obj) @@ -122,38 +116,17 @@ def _timetable_changed(entry, **kwargs): _register_change(entry.event_new, ChangeType.data_changed) -def _protection_changed_legacy(obj, old, new, **kwargs): - if new == 0: # inheriting - new = 1 if obj.isProtected() else -1 - if old != new: - _register_change(obj, ChangeType.protection_changed) +def _category_protection_changed(sender, obj, mode, old_mode, **kwargs): + parent_mode = obj.protection_parent.effective_protection_mode + if ((old_mode == ProtectionMode.inheriting and parent_mode == mode) or + (old_mode == parent_mode and mode == ProtectionMode.inheriting)): + return + _protection_changed(sender, obj, mode=mode, old_mode=old_mode, **kwargs) def _protection_changed(sender, obj, **kwargs): - if isinstance(obj, Session): - _register_change(obj.event_new, ChangeType.protection_changed) - else: - _register_change(obj, ChangeType.protection_changed) - - -def _acl_changed_legacy(obj, **kwargs): - _handle_acl_change(obj) - - -def _acl_entry_changed(sender, obj, **kwargs): if not inspect(obj).persistent: return - if isinstance(obj, Session): - # if a session acl is changed we need to update all inheriting - # contributions in that session - for contrib in obj.contributions: - if contrib.protection_mode == ProtectionMode.inheriting: - _register_change(contrib, ChangeType.protection_changed) - else: - _register_change(obj, ChangeType.protection_changed) - - -def _domain_changed(obj, **kwargs): _register_change(obj, ChangeType.protection_changed) @@ -164,17 +137,17 @@ def _note_changed(note, **kwargs): def _attachment_changed(attachment_or_folder, **kwargs): folder = getattr(attachment_or_folder, 'folder', attachment_or_folder) - if folder.link_type not in (LinkType.category, LinkType.session): + if not isinstance(folder.object, Category) and not isinstance(folder.object, Session): _register_change(folder.object.event_new, ChangeType.data_changed) def _apply_changes(sender, **kwargs): + excluded_categories = get_excluded_categories() + if not hasattr(g, 'livesync_changes'): return for ref, changes in g.livesync_changes.iteritems(): - if is_ref_excluded(ref): - continue - LiveSyncQueueEntry.create(changes, ref) + LiveSyncQueueEntry.create(changes, ref, excluded_categories=excluded_categories) def _clear_changes(sender, **kwargs): @@ -183,18 +156,6 @@ def _clear_changes(sender, **kwargs): del g.livesync_changes -def _handle_acl_change(obj): - if isinstance(obj, Category): - _register_change(obj, ChangeType.protection_changed) - elif isinstance(obj, Conference): - if obj.getOwner(): - _register_change(obj, ChangeType.data_changed) - elif isinstance(obj, AccessController): - _handle_acl_change(obj.getOwner()) - else: - raise TypeError('Unexpected object: {}'.format(type(obj).__name__)) - - def _register_deletion(obj): _init_livesync_g() g.livesync_changes[obj_ref(obj)].add(ChangeType.deleted) diff --git a/livesync/indico_livesync/marcxml.py b/livesync/indico_livesync/marcxml.py index d0b57a7..5539361 100644 --- a/livesync/indico_livesync/marcxml.py +++ b/livesync/indico_livesync/marcxml.py @@ -23,19 +23,23 @@ from MaKaC.accessControl import AccessWrapper from MaKaC.common.output import outputGenerator from MaKaC.common.xmlGen import XMLGen +from indico.modules.categories.models.categories import Category +from indico.modules.events.contributions.models.contributions import Contribution +from indico.modules.events.contributions.models.subcontributions import SubContribution +from indico.modules.events.models.events import Event + from indico_livesync import SimpleChange -from indico_livesync.models.queue import EntryType -from indico_livesync.util import make_compound_id, obj_deref, obj_ref +from indico_livesync.util import compound_id, obj_ref class MARCXMLGenerator: - """Generates MARCXML based on Indico objects""" + """Generate MARCXML based on Indico objects.""" @classmethod def records_to_xml(cls, records): mg = MARCXMLGenerator() - for ref, change in records.iteritems(): - mg.safe_add_object(ref, bool(change & SimpleChange.deleted)) + for entry, change in records.iteritems(): + mg.safe_add_object(entry, bool(change & SimpleChange.deleted)) return mg.get_xml() @classmethod @@ -55,44 +59,39 @@ class MARCXMLGenerator: aw.setUser(User.find_first(is_admin=True).as_avatar) self.output_generator = outputGenerator(aw, self.xml_generator) - def safe_add_object(self, ref, deleted=False): + def safe_add_object(self, obj, deleted=False): try: - self.add_object(ref, deleted) + self.add_object(obj, deleted) except Exception: - current_plugin.logger.exception('Could not process %s', ref) + current_plugin.logger.exception('Could not process %s', obj) - def add_object(self, ref, deleted=False): + def add_object(self, obj, deleted=False): if self.closed: raise RuntimeError('Cannot add object to closed xml generator') if deleted: xg = XMLGen(init=False) xg.openTag(b'record') xg.openTag(b'datafield', [[b'tag', b'970'], [b'ind1', b' '], [b'ind2', b' ']]) - xg.writeTag(b'subfield', b'INDICO.{}'.format(make_compound_id(ref)), [[b'code', b'a']]) + xg.writeTag(b'subfield', b'INDICO.{}'.format(compound_id(obj)), [[b'code', b'a']]) xg.closeTag(b'datafield') xg.openTag(b'datafield', [[b'tag', b'980'], [b'ind1', b' '], [b'ind2', b' ']]) xg.writeTag(b'subfield', b'DELETED', [[b'code', b'c']]) xg.closeTag(b'datafield') xg.closeTag(b'record') self.xml_generator.xml += xg.xml - elif ref['type'] in {EntryType.event, EntryType.contribution, EntryType.subcontribution}: - obj = obj_deref(ref) - if obj is None: - raise ValueError('Cannot add deleted object') - elif isinstance(obj, Category) and not obj.getOwner(): - raise ValueError('Cannot add object without owner: {}'.format(obj)) + elif isinstance(obj, (Event, Contribution, SubContribution)): if obj.is_deleted or obj.event_new.is_deleted: pass - elif ref['type'] == EntryType.event: + elif isinstance(obj, Event): self.xml_generator.xml += self._event_to_marcxml(obj) - elif ref['type'] == EntryType.contribution: + elif isinstance(obj, Contribution): self.xml_generator.xml += self._contrib_to_marcxml(obj) - elif ref['type'] == EntryType.subcontribution: + elif isinstance(obj, SubContribution): self.xml_generator.xml += self._subcontrib_to_marcxml(obj) - elif ref['type'] == EntryType.category: + elif isinstance(obj, Category): pass # we don't send category updates else: - raise ValueError('unknown object ref: {}'.format(ref['type'])) + raise ValueError('unknown object ref: {}'.format(obj)) return self.xml_generator.getXml() def get_xml(self): diff --git a/livesync/indico_livesync/migrations/201607151736_205f944640f6_add_session_to_livesyncqueueentry.py b/livesync/indico_livesync/migrations/201607151736_205f944640f6_add_session_to_livesyncqueueentry.py new file mode 100644 index 0000000..97f8596 --- /dev/null +++ b/livesync/indico_livesync/migrations/201607151736_205f944640f6_add_session_to_livesyncqueueentry.py @@ -0,0 +1,84 @@ +"""Add session to LiveSyncQueueEntry + +Revision ID: 205f944640f6 +Revises: 230c086da074 +Create Date: 2016-07-15 17:36:12.702497 +""" + +import sqlalchemy as sa +from alembic import op + + +revision = '205f944640f6' +down_revision = '230c086da074' + + +def upgrade(): + op.add_column('queues', sa.Column('session_id', sa.Integer(), nullable=True), schema='plugin_livesync') + op.create_foreign_key(None, + 'queues', 'sessions', + ['session_id'], ['id'], + source_schema='plugin_livesync', referent_schema='events') + op.create_index(None, 'queues', ['session_id'], unique=False, schema='plugin_livesync') + + op.drop_constraint('ck_queues_valid_enum_type', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_category_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_event_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_contribution_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_subcontribution_entry', 'queues', schema='plugin_livesync') + + op.create_check_constraint('valid_enum_type', 'queues', + 'type IN (1, 2, 3, 4, 5)', + schema='plugin_livesync') + op.create_check_constraint('valid_category_entry', 'queues', + 'type != 1 OR (contribution_id IS NULL AND event_id IS NULL AND session_id IS NULL AND ' + 'subcontribution_id IS NULL AND category_id IS NOT NULL)', + schema='plugin_livesync') + op.create_check_constraint('valid_event_entry', 'queues', + 'type != 2 OR (category_id IS NULL AND contribution_id IS NULL AND session_id IS NULL ' + 'AND subcontribution_id IS NULL AND event_id IS NOT NULL)', + schema='plugin_livesync') + op.create_check_constraint('valid_session_entry', 'queues', + 'type != 5 OR (category_id IS NULL AND event_id IS NULL AND contribution_id IS NULL ' + 'AND subcontribution_id IS NULL AND session_id IS NOT NULL)', + schema='plugin_livesync') + op.create_check_constraint('valid_contribution_entry', 'queues', + 'type != 3 OR (category_id IS NULL AND event_id IS NULL AND session_id IS NULL AND ' + 'subcontribution_id IS NULL AND contribution_id IS NOT NULL)', + schema='plugin_livesync') + op.create_check_constraint('valid_subcontribution_entry', 'queues', + 'type != 4 OR (category_id IS NULL AND contribution_id IS NULL AND session_id IS NULL ' + 'AND event_id IS NULL AND subcontribution_id IS NOT NULL)', + schema='plugin_livesync') + + +def downgrade(): + op.drop_constraint('ck_queues_valid_category_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_event_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_session_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_contribution_entry', 'queues', schema='plugin_livesync') + op.drop_constraint('ck_queues_valid_subcontribution_entry', 'queues', schema='plugin_livesync') + + op.drop_column('queues', 'session_id', schema='plugin_livesync') + + op.create_check_constraint('valid_category_entry', 'queues', + 'type != 1 OR (contribution_id IS NULL AND event_id IS NULL AND ' + 'subcontribution_id IS NULL AND category_id IS NOT NULL)', + schema='plugin_livesync') + op.create_check_constraint('valid_event_entry', 'queues', + 'type != 2 OR (category_id IS NULL AND contribution_id IS NULL AND ' + 'subcontribution_id IS NULL AND event_id IS NOT NULL)', + schema='plugin_livesync') + op.create_check_constraint('valid_contribution_entry', 'queues', + 'type != 3 OR (category_id IS NULL AND event_id IS NULL AND ' + 'subcontribution_id IS NULL AND contribution_id IS NOT NULL)', + schema='plugin_livesync') + op.create_check_constraint('valid_subcontribution_entry', 'queues', + 'type != 4 OR (category_id IS NULL AND contribution_id IS NULL AND ' + 'event_id IS NULL AND subcontribution_id IS NOT NULL)', + schema='plugin_livesync') + + op.drop_constraint('ck_queues_valid_enum_type', 'queues', schema='plugin_livesync') + op.create_check_constraint('valid_enum_type', 'queues', + 'type IN (1, 2, 3, 4)', + schema='plugin_livesync') diff --git a/livesync/indico_livesync/models/queue.py b/livesync/indico_livesync/models/queue.py index 36b7cd3..a51fa1b 100644 --- a/livesync/indico_livesync/models/queue.py +++ b/livesync/indico_livesync/models/queue.py @@ -19,12 +19,14 @@ from __future__ import unicode_literals from werkzeug.datastructures import ImmutableDict from indico.core.db.sqlalchemy import db, UTCDateTime, PyIntEnum +from indico.modules.categories.models.categories import Category +from indico.modules.events.models.events import Event from indico.util.date_time import now_utc from indico.util.string import return_ascii, format_repr from indico.util.struct.enum import IndicoEnum from indico_livesync.models.agents import LiveSyncAgent -from indico_livesync.util import obj_deref, obj_ref +from indico_livesync.util import obj_deref class ChangeType(int, IndicoEnum): @@ -40,13 +42,15 @@ class EntryType(int, IndicoEnum): event = 2 contribution = 3 subcontribution = 4 + session = 5 _column_for_types = { EntryType.category: 'category_id', EntryType.event: 'event_id', EntryType.contribution: 'contribution_id', - EntryType.subcontribution: 'subcontribution_id' + EntryType.subcontribution: 'subcontribution_id', + EntryType.session: 'session_id' } @@ -113,7 +117,7 @@ class LiveSyncQueueEntry(db.Model): nullable=True ) - #: The event ID of the changed event/contribution/subcontribution + #: ID of the changed event event_id = db.Column( db.Integer, db.ForeignKey('events.events.id'), @@ -121,7 +125,7 @@ class LiveSyncQueueEntry(db.Model): nullable=True ) - #: The contribution ID of the changed contribution/subcontribution + #: ID of the changed contribution contrib_id = db.Column( 'contribution_id', db.Integer, @@ -130,7 +134,16 @@ class LiveSyncQueueEntry(db.Model): nullable=True ) - #: The subcontribution ID of the changed subcontribution + #: ID of the changed session + session_id = db.Column( + 'session_id', + db.Integer, + db.ForeignKey('events.sessions.id'), + index=True, + nullable=True + ) + + #: ID of the changed subcontribution subcontrib_id = db.Column( 'subcontribution_id', db.Integer, @@ -145,8 +158,28 @@ class LiveSyncQueueEntry(db.Model): backref=db.backref('queue', cascade='all, delete-orphan', lazy='dynamic') ) - event = db.relationship( + category = db.relationship( + 'Category', + lazy=True, + backref=db.backref( + 'livesync_queue_entries', + cascade='all, delete-orphan', + lazy=True + ) + ) + + event_new = db.relationship( 'Event', + lazy=True, + backref=db.backref( + 'livesync_queue_entries', + cascade='all, delete-orphan', + lazy=True + ) + ) + + session = db.relationship( + 'Session', lazy=False, backref=db.backref( 'livesync_queue_entries', @@ -175,63 +208,60 @@ class LiveSyncQueueEntry(db.Model): ) ) - @property - def category(self): - return CategoryManager().getById(str(self.category_id), True) - - @category.setter - def category(self, value): - self.category_id = int(value.id) if value is not None else None - @property def object(self): - """Returns the changed object""" - return obj_deref(self.object_ref) + """Return the changed object.""" + if self.type == EntryType.category: + return self.category + elif self.type == EntryType.event: + return self.event_new + elif self.type == EntryType.session: + return self.session + elif self.type == EntryType.contribution: + return self.contribution + elif self.type == EntryType.subcontribution: + return self.subcontribution @property def object_ref(self): - """Returns the reference of the changed object""" + """Return the reference of the changed object.""" return ImmutableDict(type=self.type, category_id=self.category_id, event_id=self.event_id, - contrib_id=self.contrib_id, subcontrib_id=self.subcontrib_id) + session_id=self.session_id, contrib_id=self.contrib_id, subcontrib_id=self.subcontrib_id) @return_ascii def __repr__(self): - ref_repr = '{}.{}.{}.{}'.format(self.category_id if self.category_id else 'x', - self.event_id if self.event_id else 'x', - self.contrib_id if self.contrib_id else 'x', - self.subcontrib_id if self.subcontrib_id else 'x') + ref_repr = '{}.{}.{}.{}.{}'.format(self.category_id if self.category_id else 'x', + self.event_id if self.event_id else 'x', + self.session_id if self.session_id else 'x', + self.contrib_id if self.contrib_id else 'x', + self.subcontrib_id if self.subcontrib_id else 'x') return format_repr(self, 'agent', 'id', 'type', 'change', _text=ref_repr) @classmethod - def create(cls, changes, ref): - """Creates a new change in all queues + def create(cls, changes, ref, excluded_categories=set()): + """Create a new change in all queues. :param changes: the change types, an iterable containing :class:`ChangeType` :param ref: the object reference (returned by `obj_ref`) of the changed object + :param excluded_categories: set of categories (IDs) whose items + will not be tracked """ ref = dict(ref) - for agent in LiveSyncAgent.find(): - for change in changes: + obj = obj_deref(ref) + + if isinstance(obj, Category): + if any(c.id in excluded_categories for c in obj.chain_query): + return + else: + event = obj if isinstance(obj, Event) else obj.event_new + if excluded_categories & set(event.category_chain): + return + + for change in changes: + for agent in LiveSyncAgent.find(): entry = cls(agent=agent, change=change, **ref) db.session.add(entry) + db.session.flush() - - def iter_subentries(self): - """Iterates through all children - - The only field of the yielded items that should be used are - `type`, `object` and `object_ref`. - """ - if self.type not in {EntryType.category, EntryType.event, EntryType.contribution}: - return - if self.type == EntryType.category: - for event in self.object.iterAllConferences(): - yield LiveSyncQueueEntry(change=self.change, **obj_ref(event)) - elif self.type == EntryType.event: - for contrib in self.object.contributions: - yield LiveSyncQueueEntry(change=self.change, **obj_ref(contrib)) - elif self.type == EntryType.contribution: - for subcontrib in self.object.subcontributions: - yield LiveSyncQueueEntry(change=self.change, **obj_ref(subcontrib)) diff --git a/livesync/indico_livesync/simplify.py b/livesync/indico_livesync/simplify.py index 4c7c4ee..bab97a6 100644 --- a/livesync/indico_livesync/simplify.py +++ b/livesync/indico_livesync/simplify.py @@ -16,10 +16,18 @@ from __future__ import unicode_literals +import itertools from collections import defaultdict -from indico.util.struct.enum import IndicoEnum +from sqlalchemy.orm import joinedload +from indico.core.db import db +from indico.modules.categories.models.categories import Category +from indico.modules.events.models.events import Event +from indico.modules.events.contributions.models.contributions import Contribution +from indico.modules.events.contributions.models.subcontributions import SubContribution + +from indico.util.struct.enum import IndicoEnum from indico_livesync.models.queue import ChangeType, EntryType @@ -36,6 +44,7 @@ def process_records(records): :return: a dict mapping object references to `SimpleChange` bitsets """ changes = defaultdict(int) + cascaded_records = set() for record in records: if record.change != ChangeType.deleted and record.object is None: @@ -44,23 +53,80 @@ def process_records(records): continue if record.change == ChangeType.created: assert record.type != EntryType.category - changes[record.object_ref] |= SimpleChange.created + changes[record.object] |= SimpleChange.created elif record.change == ChangeType.deleted: assert record.type != EntryType.category - changes[record.object_ref] |= SimpleChange.deleted + changes[record.object] |= SimpleChange.deleted elif record.change in {ChangeType.moved, ChangeType.protection_changed}: - for ref in _cascade(record): - changes[ref] |= SimpleChange.updated + cascaded_records.add(record) elif record.change == ChangeType.data_changed: assert record.type != EntryType.category - changes[record.object_ref] |= SimpleChange.updated + changes[record.object] |= SimpleChange.updated + + for obj in _process_cascaded(cascaded_records): + changes[obj] |= SimpleChange.updated return changes -def _cascade(record): - yield record.object_ref - for subrecord in record.iter_subentries(): - if subrecord.object: - for item in _cascade(subrecord): - yield item +def _process_cascaded(records): + category_prot_records = {rec.category_id for rec in records if rec.type == EntryType.category + and rec.change == ChangeType.protection_changed} + category_move_records = {rec.category_id for rec in records if rec.type == EntryType.category + and rec.change == ChangeType.moved} + + event_records = {rec.event_id for rec in records if rec.type == EntryType.event} + session_records = {rec.session_id for rec in records if rec.type == EntryType.session} + contribution_records = {rec.contribution_id for rec in records if rec.type == EntryType.contribution} + subcontribution_records = {rec.subcontribution_id for rec in records if rec.type == EntryType.subcontribution} + + changed_events = set() + changed_contributions = set() + changed_subcontributions = set() + + category_prot_records -= category_move_records # A move already implies sending the whole record + + # Protection changes are handled differently, as there may not be the need to re-generate the record + if category_prot_records: + for categ in Category.find(Category.id.in_(category_prot_records)): + cte = categ.get_protection_parent_cte() + # Update only children that inherit + inheriting_categ_children = (Event.query + .join(cte, db.and_((Event.category_id == cte.c.id), + (cte.c.protection_parent == categ.id)))) + inheriting_direct_children = Event.find((Event.category_id == categ.id) & Event.is_inheriting) + + changed_events.update(itertools.chain(inheriting_direct_children, inheriting_categ_children)) + + # Add move operations and explicitly-passed event records + if category_move_records: + changed_events.update(Event.find(Event.category_chain_overlaps(category_move_records))) + if event_records: + changed_events.update(Event.find(Event.id.in_(event_records))) + + for event in changed_events: + yield event + + # Sessions are added (explicitly changed only, since they don't need to be sent anywhere) + if session_records: + changed_contributions.update(Contribution + .find(Contribution.session_id.in_(session_records), ~Contribution.is_deleted)) + + # Contributions are added (implictly + explicitly changed) + changed_event_ids = {ev.id for ev in changed_events} + + condition = Contribution.event_id.in_(changed_event_ids) & ~Contribution.is_deleted + if contribution_records: + condition = db.or_(condition, Contribution.id.in_(contribution_records)) + contrib_query = Contribution.find(condition).options(joinedload('subcontributions')) + + for contribution in contrib_query: + yield contribution + changed_subcontributions.update(contribution.subcontributions) + + # Same for subcontributions + if subcontribution_records: + changed_subcontributions.update(SubContribution + .find(SubContribution.contribution_id.in_(subcontribution_records))) + for subcontrib in changed_subcontributions: + yield subcontrib diff --git a/livesync/indico_livesync/uploader.py b/livesync/indico_livesync/uploader.py index 4262e7b..471e7cf 100644 --- a/livesync/indico_livesync/uploader.py +++ b/livesync/indico_livesync/uploader.py @@ -87,7 +87,6 @@ class Uploader(object): self.logger.debug('Marking as processed: %s', record) record.processed = True db.session.commit() - transaction.abort() # clear ZEO cache class MARCXMLUploader(Uploader): diff --git a/livesync/indico_livesync/util.py b/livesync/indico_livesync/util.py index b60ba80..71897ce 100644 --- a/livesync/indico_livesync/util.py +++ b/livesync/indico_livesync/util.py @@ -20,12 +20,13 @@ from datetime import timedelta from werkzeug.datastructures import ImmutableDict +from indico.modules.categories.models.categories import Category from indico.modules.events import Event from indico.modules.events.contributions.models.contributions import Contribution +from indico.modules.events.sessions.models.sessions import Session from indico.modules.events.contributions.models.subcontributions import SubContribution from indico.util.caching import memoize_request from indico.util.date_time import now_utc -from MaKaC.conference import Conference def obj_ref(obj): @@ -35,8 +36,8 @@ def obj_ref(obj): ref = {'type': EntryType.category, 'category_id': obj.id} elif isinstance(obj, Event): ref = {'type': EntryType.event, 'event_id': obj.id} - elif isinstance(obj, Conference): - ref = {'type': EntryType.event, 'event_id': int(obj.id)} + elif isinstance(obj, Session): + ref = {'type': EntryType.session, 'session_id': obj.id} elif isinstance(obj, Contribution): ref = {'type': EntryType.contribution, 'contrib_id': obj.id} elif isinstance(obj, SubContribution): @@ -46,36 +47,24 @@ def obj_ref(obj): return ImmutableDict(ref) +@memoize_request def obj_deref(ref): """Returns the object identified by `ref`""" from indico_livesync.models.queue import EntryType if ref['type'] == EntryType.category: - return CategoryManager().getById(ref['category_id'], True) + return Category.get_one(ref['category_id']) elif ref['type'] == EntryType.event: - return Event.get(ref['event_id']) + return Event.get_one(ref['event_id']) + elif ref['type'] == EntryType.session: + return Session.get_one(ref['session_id']) elif ref['type'] == EntryType.contribution: - return Contribution.get(ref['contrib_id']) + return Contribution.get_one(ref['contrib_id']) elif ref['type'] == EntryType.subcontribution: - return SubContribution.get(ref['subcontrib_id']) + return SubContribution.get_one(ref['subcontrib_id']) else: raise ValueError('Unexpected object type: {}'.format(ref['type'])) -def make_compound_id(ref): - """Returns the compound ID for the referenced object""" - from indico_livesync.models.queue import EntryType - if ref['type'] == EntryType.category: - raise ValueError('Compound IDs are not supported for categories') - obj = obj_deref(ref) - if isinstance(obj, Event): - return unicode(obj.id) - elif isinstance(obj, Contribution): - return '{}.{}'.format(obj.event_id, obj.id) - elif isinstance(obj, SubContribution): - return '{}.{}.{}'.format(obj.contribution.event_id, obj.contribution_id, obj.id) - raise ValueError('Unexpected object type: {}'.format(ref['type'])) - - def clean_old_entries(): """Deletes obsolete entries from the queues""" from indico_livesync.plugin import LiveSyncPlugin @@ -91,25 +80,18 @@ def clean_old_entries(): @memoize_request def get_excluded_categories(): - """Get all excluded category IDs""" + """Get excluded category IDs.""" from indico_livesync.plugin import LiveSyncPlugin - todo = {x['id'] for x in LiveSyncPlugin.settings.get('excluded_categories')} - excluded = set() - while todo: - category_id = todo.pop() - try: - category = CategoryManager().getById(category_id) - except KeyError: - continue - excluded.add(category.getId()) - todo.update(category.subcategories) - return excluded + return {int(x['id']) for x in LiveSyncPlugin.settings.get('excluded_categories')} -def is_ref_excluded(ref): - from indico_livesync.models.queue import EntryType - if ref['type'] == EntryType.category: - return ref['category_id'] in get_excluded_categories() - else: - obj = obj_deref(ref) - return unicode(obj.event_new.category_id) in {unicode(x) for x in get_excluded_categories()} +def compound_id(obj): + """Generate a hierarchical compound ID, separated by dots.""" + if isinstance(obj, (Category, Session)): + raise TypeError('Compound IDs are not supported for this entry type') + elif isinstance(obj, Event): + return unicode(obj.id) + elif isinstance(obj, Contribution): + return '{}.{}'.format(obj.event_id, obj.id) + elif isinstance(obj, SubContribution): + return '{}.{}.{}'.format(obj.contribution.event_id, obj.contribution_id, obj.id) diff --git a/livesync_debug/indico_livesync_debug/backend.py b/livesync_debug/indico_livesync_debug/backend.py index 39d55b7..d25c244 100644 --- a/livesync_debug/indico_livesync_debug/backend.py +++ b/livesync_debug/indico_livesync_debug/backend.py @@ -20,7 +20,6 @@ from indico.util.console import cformat from indico.util.struct.iterables import grouper from indico_livesync import LiveSyncBackendBase, SimpleChange, MARCXMLGenerator, process_records, Uploader -from indico_livesync.util import obj_deref def _change_str(change): @@ -48,9 +47,8 @@ class LiveSyncDebugBackend(LiveSyncBackendBase): self._print() self._print(cformat('%{white!}Simplified/cascaded changes:%{reset}')) - for ref, change in process_records(records).iteritems(): - obj = obj_deref(ref) - self._print(cformat('%{white!}{}%{reset}: {}').format(_change_str(change), obj or ref)) + for obj, change in process_records(records).iteritems(): + self._print(cformat('%{white!}{}%{reset}: {}').format(_change_str(change), obj)) self._print() self._print(cformat('%{white!}Resulting MarcXML:%{reset}'))