diff --git a/storage_s3/indico_storage_s3/__init__.py b/storage_s3/indico_storage_s3/__init__.py new file mode 100644 index 0000000..d3785c5 --- /dev/null +++ b/storage_s3/indico_storage_s3/__init__.py @@ -0,0 +1,28 @@ +# This file is part of Indico. +# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN). +# +# Indico is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 3 of the +# License, or (at your option) any later version. +# +# Indico is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Indico; if not, see . + +from __future__ import unicode_literals + +from indico.core import signals +from indico.util.i18n import make_bound_gettext + + +_ = make_bound_gettext('storage_s3') + + +@signals.import_tasks.connect +def _import_tasks(sender, **kwargs): + import indico_storage_s3.task diff --git a/storage_s3/indico_storage_s3/blueprint.py b/storage_s3/indico_storage_s3/blueprint.py new file mode 100644 index 0000000..5d3b7ac --- /dev/null +++ b/storage_s3/indico_storage_s3/blueprint.py @@ -0,0 +1,25 @@ +# This file is part of Indico. +# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN). +# +# Indico is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 3 of the +# License, or (at your option) any later version. +# +# Indico is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Indico; if not, see . + +from __future__ import unicode_literals + +from indico.core.plugins import IndicoPluginBlueprint + +from indico_storage_s3.controllers import RHBuckets + + +blueprint = IndicoPluginBlueprint('storage_s3', __name__, url_prefix='/api/plugin/s3') +blueprint.add_url_rule('/buckets', 'buckets', RHBuckets) diff --git a/storage_s3/indico_storage_s3/controllers.py b/storage_s3/indico_storage_s3/controllers.py new file mode 100644 index 0000000..eaea835 --- /dev/null +++ b/storage_s3/indico_storage_s3/controllers.py @@ -0,0 +1,77 @@ +# This file is part of Indico. +# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN). +# +# Indico is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 3 of the +# License, or (at your option) any later version. +# +# Indico is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Indico; if not, see . 
+ +from __future__ import unicode_literals + +from flask import current_app, jsonify, request +from werkzeug.exceptions import NotFound, Unauthorized + +from indico.core.config import config +from indico.core.db import db +from indico.core.storage import StoredFileMixin +from indico.core.storage.backend import ReadOnlyStorageMixin, get_storage +from indico.web.rh import RH + +from indico_storage_s3.storage import DynamicS3Storage, S3Storage, S3StorageBase + + +class RHBuckets(RH): + """Provide information on used S3 buckets""" + + def _check_access(self): + from indico_storage_s3.plugin import S3StoragePlugin + auth = request.authorization + if not S3StoragePlugin.settings.get('bucket_info_enabled'): + raise NotFound + username = S3StoragePlugin.settings.get('username') + password = S3StoragePlugin.settings.get('password') + if not auth or not auth.password or auth.username != username or auth.password != password: + response = current_app.response_class('Authorization required', 401, + {'WWW-Authenticate': 'Basic realm="Indico - S3 Buckets"'}) + raise Unauthorized(response=response) + + def _get_static_info(self, storage): + return { + 'bucket': storage.bucket_name, + 'dynamic': False, + } + + def _get_dynamic_info(self, backend_name, storage): + buckets = set() + for model in StoredFileMixin.__subclasses__(): + query = (db.session.query(db.func.split_part(model.storage_file_id, '//', 1).distinct()) + .filter(model.storage_file_id.isnot(None), model.storage_backend == backend_name)) + buckets.update(bucket for bucket, in query) + + return { + 'buckets': sorted(buckets), + 'dynamic': True, + 'template': storage.bucket_name_template, + } + + def _process(self): + data = {} + for key in config.STORAGE_BACKENDS: + storage = get_storage(key) + if not isinstance(storage, S3StorageBase): + continue + readonly = isinstance(storage, ReadOnlyStorageMixin) + data[key] = {'readonly': readonly, 'meta': storage.meta} + if isinstance(storage, S3Storage): + 
data[key].update(self._get_static_info(storage)) + elif isinstance(storage, DynamicS3Storage): + data[key].update(self._get_dynamic_info(key, storage)) + return jsonify(data) diff --git a/storage_s3/indico_storage_s3/migrate.py b/storage_s3/indico_storage_s3/migrate.py new file mode 100644 index 0000000..b7a0d07 --- /dev/null +++ b/storage_s3/indico_storage_s3/migrate.py @@ -0,0 +1,513 @@ +# This file is part of Indico. +# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN). +# +# Indico is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 3 of the +# License, or (at your option) any later version. +# +# Indico is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Indico; if not, see . 
+ +from __future__ import unicode_literals + +import errno +import json +import os +import re +import stat +import subprocess +import tempfile +from collections import defaultdict +from contextlib import contextmanager +from datetime import datetime +from operator import itemgetter + +import boto3 +import click +from botocore.exceptions import ClientError +from flask.cli import with_appcontext +from jinja2.filters import do_filesizeformat +from sqlalchemy import inspect +from sqlalchemy.orm import joinedload, lazyload, load_only +from sqlalchemy.sql.elements import Tuple + +from indico.cli.core import cli_group +from indico.core.config import config +from indico.core.db import db +from indico.core.storage import StoredFileMixin +from indico.core.storage.backend import FileSystemStorage, get_storage +from indico.modules.designer.models.images import DesignerImageFile +from indico.modules.events.abstracts.models.files import AbstractFile +from indico.modules.events.layout.models.images import ImageFile +from indico.modules.events.papers.models.files import PaperFile +from indico.modules.events.papers.models.templates import PaperTemplate +from indico.modules.events.registration.models.registrations import RegistrationData +from indico.modules.events.static.models.static import StaticSite, StaticSiteState +from indico.util.console import cformat +from indico.util.date_time import format_human_timedelta +from indico.util.fs import secure_filename +from indico.util.string import crc32 +from indico.util.struct.iterables import committing_iterator + +from indico_storage_s3.storage import S3StorageBase + + +click.disable_unicode_literals_warning = True + +SPECIAL_FILTERS = { + StaticSite: {'state': StaticSiteState.success} +} + +FILE_DT_MAP = { + AbstractFile: lambda obj: obj.abstract.submitted_dt, + DesignerImageFile: lambda obj: obj.template.event.created_dt if obj.template.event else None, + ImageFile: lambda obj: obj.event.created_dt, + PaperFile: lambda obj: 
obj.paper_revision.submitted_dt, + PaperTemplate: lambda obj: obj.event.created_dt, + RegistrationData: lambda obj: obj.registration.submitted_dt, + StaticSite: lambda obj: obj.event.created_dt +} + +QUERY_OPTIONS = { + AbstractFile: joinedload('abstract').load_only('submitted_dt'), + DesignerImageFile: joinedload('template').joinedload('event').load_only('created_dt'), + ImageFile: joinedload('event').load_only('created_dt'), + PaperFile: joinedload('paper_revision').load_only('submitted_dt'), + PaperTemplate: joinedload('event').load_only('created_dt'), + RegistrationData: joinedload('registration').load_only('submitted_dt'), + StaticSite: joinedload('event').load_only('created_dt'), +} + + +def rmlinktree(path): + # based on shutil.rmtree but only delete symlinks and directories + names = os.listdir(path) + for name in names: + fullname = os.path.join(path, name) + mode = os.lstat(fullname).st_mode + if stat.S_ISDIR(mode): + rmlinktree(fullname) + elif stat.S_ISLNK(mode): + os.remove(fullname) + else: + raise Exception('Tried to delete {} (not a directory/symlink)'.format(fullname)) + os.rmdir(path) + + +class S3Importer(object): + def __init__(self, get_bucket_name, static_bucket_name, output_file, source_backend_names, rclone_remote, + s3_endpoint, s3_profile, s3_bucket_policy): + self.get_bucket_name = get_bucket_name + self.static_bucket_name = static_bucket_name + self.source_backend_names = source_backend_names + self.output_file = output_file + self.s3_endpoint = s3_endpoint + self.s3_bucket_policy = s3_bucket_policy + self.buckets = {} + self.s3 = boto3.session.Session(profile_name=s3_profile).resource('s3', endpoint_url=s3_endpoint) + self.s3_client = boto3.session.Session(profile_name=s3_profile).client('s3', endpoint_url=s3_endpoint) + self.rclone_remote = rclone_remote + self.rclone_queue = {} + self.used_storage_paths = set() + + def query_chunked(self, model, chunk_size): + pks = inspect(model).primary_key + query = base_query = 
self.make_query(model).order_by(*pks) + while True: + row = None + for row in query.limit(chunk_size): + yield row + if row is None: + # no rows in the query + break + query = base_query.filter(Tuple(*pks) > inspect(row).identity) + + def make_query(self, model): + cols = ['storage_backend', 'storage_file_id', 'md5'] + if model.add_file_date_column: + cols.append('created_dt') + opts = QUERY_OPTIONS.get(model) + return (model.query + .filter(model.storage_file_id.isnot(None), model.storage_backend.in_(self.source_backend_names)) + .filter_by(**SPECIAL_FILTERS.get(model, {})) + .options(*((opts,) if opts else ())) + .options(lazyload('*'), load_only(*cols))) + + def run(self): + models = {model: self.make_query(model).count() for model in StoredFileMixin.__subclasses__()} + models = {model: total for model, total in models.iteritems() if total} + labels = {model: cformat('Processing %{blue!}{}%{reset} (%{cyan}{}%{reset} rows)').format(model.__name__, total) + for model, total in models.iteritems()} + max_length = max(len(x) for x in labels.itervalues()) + labels = {model: label.ljust(max_length) for model, label in labels.iteritems()} + for model, total in sorted(models.items(), key=itemgetter(1)): + with click.progressbar(self.query_chunked(model, 1000), length=total, label=labels[model], + show_percent=True, show_pos=True) as objects: + for obj in self.flush_rclone_iterator(objects, 1000): + try: + self.process_obj(obj) + except Exception as exc: + click.echo(cformat('\n%{red!}Error processing %{reset}%{yellow}{}%{red!}: %{reset}%{yellow!}{}') + .format(obj, exc)) + click.secho('All done!', fg='green') + click.echo('Add the following entries to your STORAGE_BACKENDS:') + for bucket, data in sorted(self.buckets.viewitems(), key=itemgetter(0)): + click.echo("'{}': 's3-readonly:host={},bucket={}',".format( + data['backend'], self.s3_endpoint.replace('https://', ''), bucket)) + + def process_obj(self, obj): + new_storage_path, new_filename = 
self.build_storage_path(obj) + if new_storage_path in self.used_storage_paths: + raise Exception('Non-unique storage path: {}'.format(new_storage_path)) + self.used_storage_paths.add(new_storage_path) + bucket_name, backend = self.get_bucket_name(self.get_object_dt(obj)) + bucket_info = self.get_bucket_info(bucket_name, backend, create=(not self.rclone_remote)) + assert backend == bucket_info['backend'] + if new_storage_path not in bucket_info['initial_keys']: + if self.rclone_remote: + self.queue_for_rclone(obj, bucket_name, new_storage_path) + else: + with obj.open() as f: + content_md5 = obj.md5.decode('hex').encode('base64').strip() + self.s3_client.put_object(Body=f, Bucket=bucket_name, Key=new_storage_path, + ContentType=obj.content_type, ContentMD5=content_md5) + self.emit_update(obj, backend, new_storage_path, new_filename) + + def build_storage_path(self, obj, _new_path_re=re.compile(r'^(event/\d+/registrations/\d+/\d+/\d+-)(\d+)(-.*)$')): + old_filename = obj.filename + new_filename = secure_filename(obj.filename, None) + assert new_filename + obj.filename = new_filename + new_storage_path = obj._build_storage_path()[1] + obj.filename = old_filename + assert new_storage_path + if not isinstance(obj, RegistrationData): + return new_storage_path, new_filename + match = _new_path_re.match(obj.storage_file_id) + if match: + # already in the current format + assert obj.storage_file_id == new_storage_path.replace('-0-', '-{}-'.format(match.group(2))) + return obj.storage_file_id, new_filename + else: + match = _new_path_re.match(new_storage_path) + return '{}{}{}'.format(match.group(1), crc32(obj.storage_file_id), match.group(3)), new_filename + + def queue_for_rclone(self, obj, bucket, key): + # XXX: we assume the file is local so the context manager doesn't create a + # temporary file which becomes invalid afterwards + with obj.get_local_path() as file_path: + pass + while not os.path.exists(file_path): + raw_input(cformat('\n%{red}File not found on disk: 
%{yellow}{}').format(file_path)) + try: + queue_entry = self.rclone_queue[bucket] + except KeyError: + tmpdir = tempfile.mkdtemp(prefix='indico-s3-import-') + self.rclone_queue[bucket] = queue_entry = { + 'path': tmpdir, + 'files': 0, + 'bytes': 0, + } + fskey = os.path.join(queue_entry['path'], key) + fsdir = os.path.dirname(fskey) + try: + os.makedirs(fsdir) + except OSError as exc: + if exc.errno != errno.EEXIST: + raise + os.symlink(file_path, fskey) + queue_entry['files'] += 1 + queue_entry['bytes'] += obj.size + + def flush_rclone_iterator(self, iterable, n=100): + for i, data in enumerate(iterable, 1): + yield data + if i % n == 0: + self.flush_rclone() + db.session.rollback() # reduce memory usage + self.flush_rclone() + + def flush_rclone(self): + if not self.rclone_remote or not self.rclone_queue: + return + click.echo() + for name, data in self.buckets.viewitems(): + if not data['exists']: + self.create_bucket(name) + data['exists'] = True + for bucket, data in self.rclone_queue.viewitems(): + click.echo(cformat('Copying %{cyan}{}%{reset} files (%{cyan}{}%{reset}) to %{cyan}{}%{reset} via rclone') + .format(data['files'], do_filesizeformat(data['bytes']), bucket)) + start = datetime.now() + try: + subprocess.check_call([ + 'rclone', 'copy', '--copy-links', + data['path'], '{}:{}'.format(self.rclone_remote, bucket) + ]) + except subprocess.CalledProcessError: + click.secho('\nError while running rclone', fg='red') + raise + duration = (datetime.now() - start) + click.echo('...finished after {}'.format(format_human_timedelta(duration, 'minutes', narrow=True))) + rmlinktree(data['path']) + self.rclone_queue.clear() + + def bucket_exists(self, name): + try: + self.s3_client.head_bucket(Bucket=name) + return True + except ClientError as exc: + if int(exc.response['Error']['Code']) == 404: + return False + raise + + def create_bucket(self, name): + click.echo(cformat('Creating bucket %{green}{}%{reset}').format(name)) + 
self.s3_client.create_bucket(Bucket=name) + if self.s3_bucket_policy: + self.s3_client.put_bucket_policy(Bucket=name, Policy=self.s3_bucket_policy) + + def get_bucket_info(self, bucket_name, backend, create=False): + try: + return self.buckets[bucket_name] + except KeyError: + click.echo(cformat('\nChecking bucket %{yellow}{}%{reset}').format(bucket_name)) + exists = True + if not self.bucket_exists(bucket_name): + if create: + click.echo() + self.create_bucket(bucket_name) + else: + exists = False + + data = {'backend': backend, 'exists': exists} + if exists: + bucket = self.s3.Bucket(bucket_name) + data['initial_keys'] = {f.key for f in bucket.objects.all()} + else: + data['initial_keys'] = set() + self.buckets[bucket_name] = data + return data + + def get_object_dt(self, obj): + cls = type(obj) + if cls.add_file_date_column: + return obj.created_dt + fn = FILE_DT_MAP.get(cls) or (lambda _: datetime.now()) + return fn(obj) + + def emit_update(self, obj, backend, file_id, filename): + data = {'m': type(obj).__name__, 'pk': inspect(obj).identity} + if obj.storage_backend != backend: + data.update({ + 'ob': obj.storage_backend, + 'nb': backend, + }) + if obj.storage_file_id != file_id: + data.update({ + 'of': obj.storage_file_id, + 'nf': file_id, + }) + if obj.filename != filename: + data.update({ + 'on': obj.filename, + 'nn': filename, + }) + json.dump(data, self.output_file) + self.output_file.write('\n') + + +def apply_changes(data_file, revert=False): + mapping = { + 'pk': 'pk', + 'ob': 'old_storage_backend', + 'nb': 'new_storage_backend', + 'of': 'old_storage_file_id', + 'nf': 'new_storage_file_id', + 'on': 'old_filename', + 'nn': 'new_filename', + } + click.echo('Parsing data...') + data = defaultdict(list) + for line in data_file: + line_data = json.loads(line) + converted = {mapping[k]: v for k, v in line_data.viewitems() if k in mapping} + data[line_data['m']].append(converted) + + models = {model: len(data[model.__name__]) + for model in 
StoredFileMixin.__subclasses__() + if model.__name__ in data and len(data[model.__name__])} + labels = {model: cformat('Processing %{blue!}{}%{reset} (%{cyan}{}%{reset} rows)').format(model.__name__, total) + for model, total in models.iteritems()} + max_length = max(len(x) for x in labels.itervalues()) + labels = {model: label.ljust(max_length) for model, label in labels.iteritems()} + for model, total in sorted(models.items(), key=itemgetter(1)): + pks = inspect(model).primary_key + with click.progressbar(data[model.__name__], length=total, label=labels[model], + show_percent=True, show_pos=True) as entries: + for entry in committing_iterator(entries, 1000): + updates = {} + key = 'old' if revert else 'new' + if key + '_storage_backend' in entry: + updates[model.storage_backend] = entry[key + '_storage_backend'] + if key + '_storage_file_id' in entry: + updates[model.storage_file_id] = entry[key + '_storage_file_id'] + if key + '_filename' in entry: + updates[model.filename] = entry[key + '_filename'] + model.query.filter(Tuple(*pks) == entry['pk']).update(updates, synchronize_session=False) + + +@contextmanager +def monkeypatch_registration_file_time(): + # RegistrationData objects have the current timestamp in their + # storage_file_id to ensure uniqueness in case of re-upload, but + # here we want reliable filenames + from indico.modules.events.registration.models import registrations + + class FakeTime(object): + def time(self): + return 0 + + orig_time = registrations.time + registrations.time = FakeTime() + yield + registrations.time = orig_time + + +@cli_group() +@with_appcontext +def cli(): + """Migrate data to S3. + + Use the `copy` subcommand to copy data to S3. This can be done + safely while Indico is running. At the end it will show you what + you need to add to your `indico.conf`. 
+ + Once you have updated your config with the new storage backends, you + can use the `apply` subcommand to update your database so files + will actually be loaded using the new S3 storage backends. + + In case you ever need to switch back to your previous storage, + you can use `revert` to undo the database changes. + """ + if config.DB_LOG: + click.secho('Warning: The database logger is currently enabled (DB_LOG = True).\n' + 'This will slow down the migration. Unless your database is very small, please disable it.', + fg='yellow') + click.confirm('Continue anyway?', abort=True) + + +@cli.command() +@click.option('-s', '--source-backend', 'source_backend_names', metavar='NAME', multiple=True, + help='Storage backend names from which to copy files. If omitted, all non-S3 backends are used.') +@click.option('-B', '--bucket-name', 'bucket_names', metavar='NAME', required=True, multiple=True, + help='Bucket(s) to copy files to') +@click.option('-S', '--static-bucket-name', metavar='NAME', required=True, + help='Bucket to copy static site packages to') +@click.option('-e', '--s3-endpoint', metavar='URL', help='Custom S3 endpoint, e.g. https://s3.example.com') +@click.option('-p', '--s3-profile', metavar='NAME', help='S3 profile to use when loading credentials') +@click.option('-P', '--s3-bucket-policy', 's3_bucket_policy_file', metavar='PATH', type=click.File(), + help='Bucket policy file to use when creating buckets') +@click.option('-r', '--rclone', metavar='NAME', + help='If set, use rclone to copy files which is usually faster. The value of this option is the name of ' + 'the rclone remote used to copy files to S3.') +@click.argument('output', metavar='PATH', type=click.File('wb')) +def copy(source_backend_names, bucket_names, static_bucket_name, s3_endpoint, s3_profile, s3_bucket_policy_file, + rclone, output): + """Copy files to S3. + + This command copies files to S3 and records the necessary database changes + in a JSONL file. 
+ + Multiple bucket names can be specified; in that case the bucket name can change + based on the year a file was created in. The last bucket name will be the default, + while any other bucket name must include a conditional indicating when to use it: + + \b + -B '<2001:indico-pre-2001' + -B '<2009:indico-<year>' + -B 'indico-<year>-<month>' + + The static bucket name cannot contain any placeholders. + + The indico storage backend will get the same name as the bucket by default, + but this can be overridden, e.g. `-B 'indico-<year>/s3-<year>'` would name + the bucket 'indico-2018' but use a backend named 's3-2018'. It is your + responsibility to ensure that placeholders match between the two names. + + S3 credentials should be specified in the usual places, i.e. + `~/.aws/credentials` for regular S3 access and `~/.config/rclone/rclone.conf` + when using rclone. + """ + bucket_names = [tuple(x.split('/', 1)) if '/' in x else (x, x.split(':', 1)[-1]) for x in bucket_names] + if ':' in bucket_names[-1][0]: + raise click.UsageError('Last bucket name cannot contain criteria') + if not all(':' in x[0] for x in bucket_names[:-1]): + raise click.UsageError('All but the last bucket name need to contain criteria') + matches = [(re.match(r'^(<|>|==|<=|>=)\s*(\d{4}):(.+)$', name), backend) for name, backend in bucket_names[:-1]] + if not all(x[0] for x in matches): + raise click.UsageError("Could not parse '{}'".format(bucket_names[matches.index(None)])) + criteria = [(match.groups(), backend) for match, backend in matches] + # Build and compile a function to get the bucket/backend name to avoid + # processing the criteria for every single file (can be millions for large + # instances) + code = ['def get_bucket_name(dt):'] + if criteria: + for i, ((op, value, bucket), backend) in enumerate(criteria): + code.append(' {}if dt.year {} {}:'.format('el' if i else '', op, value)) + code.append(' bucket, backend = {!r}'.format((bucket, backend))) + code.append(' else:') + code.append(' bucket, backend = 
{!r}'.format(bucket_names[-1])) + else: + code.append(' bucket, backend = {!r}'.format(bucket_names[-1])) + code.append(' bucket = bucket.replace("<year>", dt.strftime("%Y"))') + code.append(' bucket = bucket.replace("<month>", dt.strftime("%m"))') + code.append(' bucket = bucket.replace("<week>", dt.strftime("%W"))') + code.append(' backend = backend.replace("<year>", dt.strftime("%Y"))') + code.append(' backend = backend.replace("<month>", dt.strftime("%m"))') + code.append(' backend = backend.replace("<week>", dt.strftime("%W"))') + code.append(' return bucket, backend') + d = {} + exec '\n'.join(code) in d + if not source_backend_names: + source_backend_names = [x for x in config.STORAGE_BACKENDS if not isinstance(get_storage(x), S3StorageBase)] + if rclone: + invalid = [x for x in source_backend_names if not isinstance(get_storage(x), FileSystemStorage)] + if invalid: + click.secho('Found unsupported storage backends: {}'.format(', '.join(sorted(invalid))), fg='yellow') + click.secho('The backends might not work together with `--rclone`', fg='yellow') + click.confirm('Continue anyway?', abort=True) + s3_bucket_policy = s3_bucket_policy_file.read() if s3_bucket_policy_file else None + imp = S3Importer(d['get_bucket_name'], static_bucket_name, + output, source_backend_names, rclone, + s3_endpoint, s3_profile, s3_bucket_policy) + with monkeypatch_registration_file_time(): + imp.run() + + +@cli.command() +@click.argument('data', metavar='PATH', type=click.File('rb')) +def apply(data): + """Apply DB updates. + + This command updates the DB with the changes recorded while + running the `copy` command. + """ + apply_changes(data) + + +@cli.command() +@click.argument('data', metavar='PATH', type=click.File('rb')) +def revert(data): + """Revert DB updates. + + This command reverts the changes made to the database by the + `apply` command. 
+ """ + apply_changes(data, revert=True) diff --git a/storage_s3/indico_storage_s3/plugin.py b/storage_s3/indico_storage_s3/plugin.py new file mode 100644 index 0000000..b0077e9 --- /dev/null +++ b/storage_s3/indico_storage_s3/plugin.py @@ -0,0 +1,124 @@ +# This file is part of Indico. +# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN). +# +# Indico is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 3 of the +# License, or (at your option) any later version. +# +# Indico is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Indico; if not, see . + +from __future__ import unicode_literals + +import sys + +import click +from markupsafe import Markup +from wtforms.fields import BooleanField, StringField +from wtforms.validators import DataRequired + +from indico.cli.core import cli_group +from indico.core import signals +from indico.core.config import config +from indico.core.plugins import IndicoPlugin, url_for_plugin +from indico.core.storage.backend import ReadOnlyStorageMixin, get_storage +from indico.web.forms.base import IndicoForm +from indico.web.forms.fields import IndicoPasswordField +from indico.web.forms.validators import HiddenUnless +from indico.web.forms.widgets import SwitchWidget + +from indico_storage_s3 import _ +from indico_storage_s3.blueprint import blueprint +from indico_storage_s3.migrate import cli as migrate_cli +from indico_storage_s3.storage import (DynamicS3Storage, ReadOnlyDynamicS3Storage, ReadOnlyS3Storage, S3Storage, + S3StorageBase) + + +class SettingsForm(IndicoForm): + bucket_info_enabled = BooleanField(_("Bucket info API"), 
widget=SwitchWidget()) + username = StringField(_("Username"), [HiddenUnless('bucket_info_enabled', preserve_data=True), DataRequired()], + description=_("The username to access the S3 bucket info endpoint")) + password = IndicoPasswordField(_('Password'), + [HiddenUnless('bucket_info_enabled', preserve_data=True), DataRequired()], + toggle=True, + description=_("The password to access the S3 bucket info endpoint")) + + def __init__(self, *args, **kwargs): + super(SettingsForm, self).__init__(*args, **kwargs) + url = Markup('{}').format(url_for_plugin('storage_s3.buckets')) + self.bucket_info_enabled.description = _("Enables an API on {url} that returns information on all S3 buckets " + "currently in use, including dynamically-named ones.").format(url=url) + + +class S3StoragePlugin(IndicoPlugin): + """S3 Storage + + Provides S3 storage backends. + """ + + configurable = True + settings_form = SettingsForm + default_settings = { + 'bucket_info_enabled': False, + 'username': '', + 'password': '' + } + + def init(self): + super(S3StoragePlugin, self).init() + self.connect(signals.get_storage_backends, self._get_storage_backends) + self.connect(signals.plugin.cli, self._extend_indico_cli) + + def _get_storage_backends(self, sender, **kwargs): + yield S3Storage + yield DynamicS3Storage + yield ReadOnlyS3Storage + yield ReadOnlyDynamicS3Storage + + def get_blueprints(self): + return blueprint + + def _extend_indico_cli(self, sender, **kwargs): + @cli_group() + def s3(): + """Manage S3 storage.""" + + @s3.command() + @click.option('--storage', default=None, metavar='NAME', help='Storage backend to create bucket for') + def create_bucket(storage): + """Create s3 storage bucket.""" + storages = [storage] if storage else config.STORAGE_BACKENDS + for key in storages: + try: + storage_instance = get_storage(key) + except RuntimeError: + if storage: + click.echo('Storage {} does not exist'.format(key)) + sys.exit(1) + continue + + if isinstance(storage_instance, 
ReadOnlyStorageMixin): + if storage: + click.echo('Storage {} is read-only'.format(key)) + sys.exit(1) + continue + + if isinstance(storage_instance, S3StorageBase): + bucket_name = storage_instance._get_current_bucket_name() + if storage_instance._bucket_exists(bucket_name): + click.echo('Storage {}: bucket {} already exists'.format(key, bucket_name)) + continue + storage_instance._create_bucket(bucket_name) + click.echo('Storage {}: bucket {} created'.format(key, bucket_name)) + elif storage: + click.echo('Storage {} is not an s3 storage'.format(key)) + sys.exit(1) + + s3.add_command(migrate_cli, name='migrate') + return s3 diff --git a/storage_s3/indico_storage_s3/storage.py b/storage_s3/indico_storage_s3/storage.py new file mode 100644 index 0000000..bc7ad5e --- /dev/null +++ b/storage_s3/indico_storage_s3/storage.py @@ -0,0 +1,255 @@ +# This file is part of Indico. +# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN). +# +# Indico is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 3 of the +# License, or (at your option) any later version. +# +# Indico is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Indico; if not, see . 
+ +from __future__ import unicode_literals + +import hashlib +import hmac +import sys +import threading +from contextlib import contextmanager +from datetime import date +from io import BytesIO +from tempfile import NamedTemporaryFile + +import boto3 +from botocore.config import Config +from botocore.exceptions import ClientError +from werkzeug.datastructures import Headers +from werkzeug.utils import cached_property, redirect + +from indico.core.config import config +from indico.core.storage import Storage, StorageError +from indico.core.storage.backend import ReadOnlyStorageMixin, StorageReadOnlyError +from indico.util.fs import get_file_checksum +from indico.util.string import return_ascii +from indico.web.flask.util import send_file + + +s3_session_cache = threading.local() + + +class S3StorageBase(Storage): + simple_data = False + + def __init__(self, data): + self.parsed_data = data = self._parse_data(data) + self.endpoint_url = data.get('host') + if self.endpoint_url and '://' not in self.endpoint_url: + self.endpoint_url = 'https://' + self.endpoint_url + self.session_kwargs = {} + self.client_kwargs = {} + if 'profile' in data: + self.session_kwargs['profile_name'] = data['profile'] + if 'access_key' in data: + self.session_kwargs['aws_access_key_id'] = data['access_key'] + if 'secret_key' in data: + self.session_kwargs['aws_secret_access_key'] = data['secret_key'] + if 'addressing_style' in data: + self.client_kwargs['config'] = Config(s3={'addressing_style': data['addressing_style']}) + self.bucket_policy_file = data.get('bucket_policy_file') + self.bucket_versioning = data.get('bucket_versioning') in ('1', 'true', 'yes') + self.proxy_downloads = data.get('proxy') in ('1', 'true', 'yes') + self.meta = data.get('meta') + + @cached_property + def session(self): + key = '__'.join('{}_{}'.format(k, v) for k, v in sorted(self.session_kwargs.viewitems())) + try: + return getattr(s3_session_cache, key) + except AttributeError: + session = 
boto3.session.Session(**self.session_kwargs) + setattr(s3_session_cache, key, session) + return session + + @cached_property + def client(self): + return self.session.client('s3', endpoint_url=self.endpoint_url, **self.client_kwargs) + + def _get_current_bucket_name(self): + raise NotImplementedError + + def _parse_file_id(self, file_id): + raise NotImplementedError + + def open(self, file_id): + bucket, id_ = self._parse_file_id(file_id) + try: + s3_object = self.client.get_object(Bucket=bucket, Key=id_)['Body'] + return BytesIO(s3_object.read()) + except Exception as e: + raise StorageError('Could not open "{}": {}'.format(file_id, e)), None, sys.exc_info()[2] + + @contextmanager + def get_local_path(self, file_id): + with NamedTemporaryFile(suffix='indico.s3', dir=config.TEMP_DIR) as tmpfile: + self._copy_file(self.open(file_id), tmpfile) + tmpfile.flush() + yield tmpfile.name + + def _save(self, bucket, name, content_type, fileobj): + fileobj = self._ensure_fileobj(fileobj) + checksum = get_file_checksum(fileobj) + fileobj.seek(0) + content_md5 = checksum.decode('hex').encode('base64').strip() + self.client.put_object(Body=fileobj, Bucket=bucket, Key=name, + ContentType=content_type, ContentMD5=content_md5) + return checksum + + def delete(self, file_id): + bucket, id_ = self._parse_file_id(file_id) + try: + self.client.delete_object(bucket, id_) + except Exception as e: + raise StorageError('Could not delete "{}": {}'.format(file_id, e)), None, sys.exc_info()[2] + + def getsize(self, file_id): + bucket, id_ = self._parse_file_id(file_id) + try: + return self.client.head_object(Bucket=bucket, Key=id_)['ContentLength'] + except Exception as e: + raise StorageError('Could not get size of "{}": {}'.format(file_id, e)), None, sys.exc_info()[2] + + def send_file(self, file_id, content_type, filename, inline=True): + if self.proxy_downloads: + return send_file(filename, self.open(file_id), content_type, inline=inline) + + try: + bucket, id_ = 
self._parse_file_id(file_id) + content_disp = 'inline' if inline else 'attachment' + h = Headers() + h.add('Content-Disposition', content_disp, filename=filename) + url = self.client.generate_presigned_url('get_object', + Params={'Bucket': bucket, + 'Key': id_, + 'ResponseContentDisposition': h.get('Content-Disposition'), + 'ResponseContentType': content_type}, + ExpiresIn=120) + return redirect(url) + except Exception as e: + raise StorageError('Could not send file "{}": {}'.format(file_id, e)), None, sys.exc_info()[2] + + def _create_bucket(self, name): + from indico_storage_s3.plugin import S3StoragePlugin + + if self._bucket_exists(name): + S3StoragePlugin.logger.info('Bucket %s already exists', name) + return + self.client.create_bucket(Bucket=name) + S3StoragePlugin.logger.info('New bucket created: %s', name) + if self.bucket_versioning: + self.client.put_bucket_versioning(Bucket=name, VersioningConfiguration={'Status': 'Enabled'}) + if self.bucket_policy_file: + with open(self.bucket_policy_file) as f: + policy = f.read() + self.client.put_bucket_policy(Bucket=name, Policy=policy) + + def _bucket_exists(self, name): + try: + self.client.head_bucket(Bucket=name) + return True + except ClientError as exc: + if int(exc.response['Error']['Code']) == 404: + return False + raise + + +class S3Storage(S3StorageBase): + name = 's3' + + def __init__(self, data): + super(S3Storage, self).__init__(data) + self.bucket_name = self.parsed_data['bucket'] + if len(self.bucket_name) > 63: + raise StorageError('Bucket name cannot be longer than 63 chars') + + @return_ascii + def __repr__(self): + return '<{}: {}>'.format(type(self).__name__, self.bucket_name) + + def _get_current_bucket_name(self): + return self.bucket_name + + def _parse_file_id(self, file_id): + return self.bucket_name, file_id + + def save(self, name, content_type, filename, fileobj): + try: + bucket = self._get_current_bucket_name() + checksum = self._save(bucket, name, content_type, fileobj) + return 
name, checksum + except Exception as e: + raise StorageError('Could not save "{}": {}'.format(name, e)), None, sys.exc_info()[2] + + +class DynamicS3Storage(S3StorageBase): + name = 's3-dynamic' + + def __init__(self, data): + super(DynamicS3Storage, self).__init__(data) + self.bucket_name_template = self.parsed_data['bucket_template'] + self.bucket_secret = (self.parsed_data.get('bucket_secret', '') or + self.session._session.get_scoped_config().get('indico_bucket_secret', '')) + if not any(x in self.bucket_name_template for x in ('', '', '')): + raise StorageError('At least one date placeholder is required when using dynamic bucket names') + if not self.bucket_secret: + raise StorageError('A bucket secret is required when using dynamic bucket names') + if len(self._replace_bucket_placeholders(self.bucket_name_template, date.today())) > 46: + raise StorageError('Bucket name cannot be longer than 46 chars (to keep at least 16 hash chars)') + + @return_ascii + def __repr__(self): + return '<{}: {}>'.format(type(self).__name__, self.bucket_name_template) + + def _parse_file_id(self, file_id): + return file_id.split('//', 1) + + def _get_current_bucket_name(self): + return self._get_bucket_name(date.today()) + + def _get_bucket_name(self, date): + name = self._replace_bucket_placeholders(self.bucket_name_template, date) + token = hmac.new(self.bucket_secret.encode('utf-8'), name, hashlib.md5).hexdigest() + return '{}-{}'.format(name, token)[:63] + + def _replace_bucket_placeholders(self, name, date): + name = name.replace('', date.strftime('%Y')) + name = name.replace('', date.strftime('%m')) + name = name.replace('', date.strftime('%W')) + return name + + def save(self, name, content_type, filename, fileobj): + try: + bucket = self._get_current_bucket_name() + checksum = self._save(bucket, name, content_type, fileobj) + file_id = '{}//{}'.format(bucket, name) + return file_id, checksum + except Exception as e: + raise StorageError('Could not save "{}": 
{}'.format(name, e)), None, sys.exc_info()[2] + + +class ReadOnlyS3Storage(ReadOnlyStorageMixin, S3Storage): + name = 's3-readonly' + + def _create_bucket(self, name): + raise StorageReadOnlyError('Cannot write to read-only storage') + + +class ReadOnlyDynamicS3Storage(ReadOnlyStorageMixin, DynamicS3Storage): + name = 's3-dynamic-readonly' + + def _create_bucket(self, name): + raise StorageReadOnlyError('Cannot write to read-only storage') diff --git a/storage_s3/indico_storage_s3/task.py b/storage_s3/indico_storage_s3/task.py new file mode 100644 index 0000000..b4fa416 --- /dev/null +++ b/storage_s3/indico_storage_s3/task.py @@ -0,0 +1,53 @@ +# This file is part of Indico. +# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN). +# +# Indico is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 3 of the +# License, or (at your option) any later version. +# +# Indico is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Indico; if not, see . 
from __future__ import unicode_literals

import re
from datetime import date

from celery.schedules import crontab
from dateutil.relativedelta import relativedelta

from indico.core.celery import celery
from indico.core.config import config
from indico.core.storage.backend import ReadOnlyStorageMixin, get_storage

from indico_storage_s3.plugin import DynamicS3Storage


@celery.periodic_task(run_every=crontab(minute=0, hour=1))
def create_bucket():
    """Pre-create the next period's bucket for dynamic S3 storages.

    Runs nightly.  For weekly templates (``<year>``/``<week>``) it creates
    next week's bucket; for monthly (``<year>``/``<month>``) it creates next
    month's bucket; for yearly (``<year>`` only) it creates next year's
    bucket during December.  Creation is idempotent since ``_create_bucket``
    skips buckets that already exist.

    :raises RuntimeError: if a template uses an unsupported placeholder
        combination (e.g. ``<month>`` without ``<year>``).
    """
    for key in config.STORAGE_BACKENDS:
        storage = get_storage(key)
        # Only writable dynamic storages need buckets created ahead of time.
        if not isinstance(storage, DynamicS3Storage) or isinstance(storage, ReadOnlyStorageMixin):
            continue
        today = date.today()
        placeholders = set(re.findall('<.*?>', storage.bucket_name_template))
        if not placeholders:
            continue
        elif placeholders == {'<year>', '<week>'}:
            bucket_date = today + relativedelta(weeks=1)
            bucket = storage._get_bucket_name(bucket_date)
            storage._create_bucket(bucket)
        elif placeholders == {'<year>', '<month>'} or placeholders == {'<year>'}:
            # Year-only buckets change only at the year boundary, so creating
            # "next month's" bucket is only needed during December.
            if '<month>' in placeholders or today.month == 12:
                bucket_date = today + relativedelta(months=1)
                bucket = storage._get_bucket_name(bucket_date)
                storage._create_bucket(bucket)
        else:
            raise RuntimeError('Invalid placeholder combination in bucket name template: {}'
                               .format(storage.bucket_name_template))
from __future__ import unicode_literals

from setuptools import find_packages, setup


# Trove classifiers for the plugin package.
CLASSIFIERS = [
    'Environment :: Plugins',
    'Environment :: Web Environment',
    'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
    'Programming Language :: Python :: 2.7',
]


setup(
    # Distribution metadata
    name='indico-plugin-storage-s3',
    version='2.0.3',
    description='S3 storage backend for Indico',
    url='https://github.com/indico/indico-plugins',
    license='https://www.gnu.org/licenses/gpl-3.0.txt',
    author='Indico Team',
    author_email='indico-team@cern.ch',
    # Package contents
    packages=find_packages(),
    zip_safe=False,
    platforms='any',
    # Runtime requirements
    install_requires=[
        'indico>=2.1',
        'boto3>=1.9.35,<2.0',
    ],
    classifiers=CLASSIFIERS,
    # Register the plugin with Indico's plugin loader.
    entry_points={'indico.plugins': {'storage_s3 = indico_storage_s3.plugin:S3StoragePlugin'}},
)
from __future__ import unicode_literals

import hashlib
import hmac

import pytest

from indico.core.storage import StorageError

from indico_storage_s3 import plugin
from indico_storage_s3.storage import DynamicS3Storage, S3Storage
from indico_storage_s3.task import create_bucket


@pytest.fixture(autouse=True)
def mock_boto3(mocker):
    # No test should ever talk to real S3.
    mocker.patch('indico_storage_s3.storage.boto3')


def test_resolve_bucket_name_static():
    storage = plugin.S3Storage('bucket=test')
    assert storage._get_current_bucket_name() == 'test'


@pytest.mark.parametrize(('date', 'name_template', 'expected_name'), (
    ('2018-04-11', 'name-<year>', 'name-2018'),
    ('2018-04-11', 'name-<year>-<month>', 'name-2018-04'),
    ('2018-04-11', 'name-<year>-<week>', 'name-2018-15'),
    ('2018-01-01', 'name-<year>-<month>', 'name-2018-01'),
    ('2019-01-01', 'name-<year>-<week>', 'name-2019-00'),
))
def test_resolve_bucket_name_dynamic(freeze_time, date, name_template, expected_name):
    freeze_time(date)
    storage = plugin.DynamicS3Storage('bucket_template={},bucket_secret=secret'.format(name_template))
    name, token = storage._get_current_bucket_name().rsplit('-', 1)
    assert name == expected_name
    # The token is the hex md5-HMAC of the expanded name.
    assert token == hmac.new(b'secret', expected_name, hashlib.md5).hexdigest()


class MockConfig(object):
    """Minimal stand-in for indico's config with a single storage backend."""

    def __init__(self):
        self.STORAGE_BACKENDS = {'s3': None}


@pytest.mark.usefixtures('app_context')
@pytest.mark.parametrize(('date', 'name_template', 'bucket_created', 'expected_name', 'expected_error'), (
    ('2018-04-11', 'name', False, None, None),
    ('2018-12-01', 'name', False, None, None),
    ('2018-04-11', 'name-<year>', False, None, None),
    ('2018-01-01', 'name-<year>', False, None, None),
    ('2018-12-01', 'name-<month>', False, None, RuntimeError),
    ('2018-12-01', 'name-<week>', False, None, RuntimeError),
    ('2018-12-01', 'name-<month>-<week>', False, None, RuntimeError),
    ('2018-12-01', 'name-<year>', True, 'name-2019', None),
    ('2018-12-01', 'name-<year>-<month>', True, 'name-2019-01', None),
    ('2018-01-01', 'name-<year>-<month>', True, 'name-2018-02', None),
    ('2018-12-03', 'name-<year>-<week>', True, 'name-2018-50', None),
))
def test_dynamic_bucket_creation_task(freeze_time, mocker, date, name_template, bucket_created, expected_name,
                                      expected_error):
    freeze_time(date)
    # Templates without placeholders are only valid for the static backend.
    if '<' in name_template:
        storage = plugin.DynamicS3Storage('bucket_template={},bucket_secret=secret'.format(name_template))
    else:
        storage = plugin.S3Storage('bucket={}'.format(name_template))
    mocker.patch('indico_storage_s3.task.config', MockConfig())
    mocker.patch('indico_storage_s3.task.get_storage', return_value=storage)
    create_bucket_call = mocker.patch.object(plugin.DynamicS3Storage, '_create_bucket')
    if expected_error:
        with pytest.raises(expected_error):
            create_bucket()
    else:
        create_bucket()
        if bucket_created:
            token = hmac.new(b'secret', expected_name, hashlib.md5).hexdigest()
            create_bucket_call.assert_called_with('{}-{}'.format(expected_name, token))
        else:
            assert not create_bucket_call.called


def test_static_bucket_name_too_long():
    # 63 chars is fine, 64 is rejected.
    S3Storage('bucket=test' + 'x' * 59)
    with pytest.raises(StorageError):
        S3Storage('bucket=test' + 'x' * 60)


def test_dynamic_bucket_name_too_long():
    # Expanded templates may be at most 46 chars (63 - dash - 16 hash chars).
    s = DynamicS3Storage('bucket_secret=secret,bucket_template=test-<year>' + 'x' * 37)
    assert len(s._get_current_bucket_name()) == 63
    s = DynamicS3Storage('bucket_secret=secret,bucket_template=test-<year>-<month>' + 'x' * 34)
    assert len(s._get_current_bucket_name()) == 63
    with pytest.raises(StorageError):
        DynamicS3Storage('bucket_secret=secret,bucket_template=test-<year>' + 'x' * 38)
    with pytest.raises(StorageError):
        DynamicS3Storage('bucket_secret=secret,bucket_template=test-<year>-<month>' + 'x' * 35)