Merge branch '2.0-maintenance'

This commit is contained in:
Adrian Moennich 2018-11-26 14:57:53 +01:00
commit dea4280576
9 changed files with 1231 additions and 0 deletions

View File

@@ -0,0 +1,28 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from indico.core import signals
from indico.util.i18n import make_bound_gettext
# Translation domain for this plugin's strings.
_ = make_bound_gettext('storage_s3')


@signals.import_tasks.connect
def _import_tasks(sender, **kwargs):
    # Import the plugin's task module when Indico collects periodic tasks,
    # so its tasks get registered as a side effect of the import.
    import indico_storage_s3.task

View File

@@ -0,0 +1,25 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from indico.core.plugins import IndicoPluginBlueprint
from indico_storage_s3.controllers import RHBuckets
# Blueprint exposing the bucket-info API under /api/plugin/s3/buckets.
blueprint = IndicoPluginBlueprint('storage_s3', __name__, url_prefix='/api/plugin/s3')
blueprint.add_url_rule('/buckets', 'buckets', RHBuckets)

View File

@@ -0,0 +1,77 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from flask import current_app, jsonify, request
from werkzeug.exceptions import NotFound, Unauthorized
from indico.core.config import config
from indico.core.db import db
from indico.core.storage import StoredFileMixin
from indico.core.storage.backend import ReadOnlyStorageMixin, get_storage
from indico.web.rh import RH
from indico_storage_s3.storage import DynamicS3Storage, S3Storage, S3StorageBase
class RHBuckets(RH):
    """Provide information on used S3 buckets"""

    def _check_access(self):
        from indico_storage_s3.plugin import S3StoragePlugin
        auth = request.authorization
        # The endpoint is hidden (404) unless explicitly enabled in the
        # plugin settings.
        if not S3StoragePlugin.settings.get('bucket_info_enabled'):
            raise NotFound
        username = S3StoragePlugin.settings.get('username')
        password = S3StoragePlugin.settings.get('password')
        # HTTP basic auth against the credentials stored in the plugin settings.
        if not auth or not auth.password or auth.username != username or auth.password != password:
            response = current_app.response_class('Authorization required', 401,
                                                  {'WWW-Authenticate': 'Basic realm="Indico - S3 Buckets"'})
            raise Unauthorized(response=response)

    def _get_static_info(self, storage):
        # Statically-named backend: exactly one bucket.
        return {
            'bucket': storage.bucket_name,
            'dynamic': False,
        }

    def _get_dynamic_info(self, backend_name, storage):
        # Collect the distinct bucket names actually referenced in the DB.
        # NOTE(review): this assumes storage_file_id looks like
        # '<bucket>//<key>' (split_part on '//') — confirm against the
        # storage backend's file id format.
        buckets = set()
        for model in StoredFileMixin.__subclasses__():
            query = (db.session.query(db.func.split_part(model.storage_file_id, '//', 1).distinct())
                     .filter(model.storage_file_id.isnot(None), model.storage_backend == backend_name))
            buckets.update(bucket for bucket, in query)
        return {
            'buckets': sorted(buckets),
            'dynamic': True,
            'template': storage.bucket_name_template,
        }

    def _process(self):
        # Build one JSON entry per configured S3-based storage backend.
        data = {}
        for key in config.STORAGE_BACKENDS:
            storage = get_storage(key)
            if not isinstance(storage, S3StorageBase):
                continue
            readonly = isinstance(storage, ReadOnlyStorageMixin)
            data[key] = {'readonly': readonly, 'meta': storage.meta}
            if isinstance(storage, S3Storage):
                data[key].update(self._get_static_info(storage))
            elif isinstance(storage, DynamicS3Storage):
                data[key].update(self._get_dynamic_info(key, storage))
        return jsonify(data)

View File

@@ -0,0 +1,513 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import errno
import json
import os
import re
import stat
import subprocess
import tempfile
from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime
from operator import itemgetter
import boto3
import click
from botocore.exceptions import ClientError
from flask.cli import with_appcontext
from jinja2.filters import do_filesizeformat
from sqlalchemy import inspect
from sqlalchemy.orm import joinedload, lazyload, load_only
from sqlalchemy.sql.elements import Tuple
from indico.cli.core import cli_group
from indico.core.config import config
from indico.core.db import db
from indico.core.storage import StoredFileMixin
from indico.core.storage.backend import FileSystemStorage, get_storage
from indico.modules.designer.models.images import DesignerImageFile
from indico.modules.events.abstracts.models.files import AbstractFile
from indico.modules.events.layout.models.images import ImageFile
from indico.modules.events.papers.models.files import PaperFile
from indico.modules.events.papers.models.templates import PaperTemplate
from indico.modules.events.registration.models.registrations import RegistrationData
from indico.modules.events.static.models.static import StaticSite, StaticSiteState
from indico.util.console import cformat
from indico.util.date_time import format_human_timedelta
from indico.util.fs import secure_filename
from indico.util.string import crc32
from indico.util.struct.iterables import committing_iterator
from indico_storage_s3.storage import S3StorageBase
# Python 2/3 compat: silence click's unicode_literals warning on Python 2.
click.disable_unicode_literals_warning = True

# Extra per-model filters applied when selecting rows to migrate
# (e.g. only successfully-built static sites have a file to copy).
SPECIAL_FILTERS = {
    StaticSite: {'state': StaticSiteState.success}
}

# How to obtain a "creation" datetime for models that do not store one
# directly; used to pick the target bucket for date-based bucket names.
FILE_DT_MAP = {
    AbstractFile: lambda obj: obj.abstract.submitted_dt,
    DesignerImageFile: lambda obj: obj.template.event.created_dt if obj.template.event else None,
    ImageFile: lambda obj: obj.event.created_dt,
    PaperFile: lambda obj: obj.paper_revision.submitted_dt,
    PaperTemplate: lambda obj: obj.event.created_dt,
    RegistrationData: lambda obj: obj.registration.submitted_dt,
    StaticSite: lambda obj: obj.event.created_dt
}

# Eager-loading options so the datetime lookups above do not trigger an
# extra query per row.
QUERY_OPTIONS = {
    AbstractFile: joinedload('abstract').load_only('submitted_dt'),
    DesignerImageFile: joinedload('template').joinedload('event').load_only('created_dt'),
    ImageFile: joinedload('event').load_only('created_dt'),
    PaperFile: joinedload('paper_revision').load_only('submitted_dt'),
    PaperTemplate: joinedload('event').load_only('created_dt'),
    RegistrationData: joinedload('registration').load_only('submitted_dt'),
    StaticSite: joinedload('event').load_only('created_dt'),
}
def rmlinktree(path):
    """Recursively delete a tree consisting only of directories and symlinks.

    Modelled after ``shutil.rmtree`` but safer: a regular file anywhere in
    the tree aborts the deletion with an exception instead of being removed.
    """
    for entry in os.listdir(path):
        child = os.path.join(path, entry)
        child_mode = os.lstat(child).st_mode
        if stat.S_ISLNK(child_mode):
            os.remove(child)
        elif stat.S_ISDIR(child_mode):
            rmlinktree(child)
        else:
            raise Exception('Tried to delete {} (not a directory/symlink)'.format(child))
    os.rmdir(path)
class S3Importer(object):
    """Copy files from existing storage backends into S3 buckets.

    Files are uploaded either directly via boto3 or - when ``rclone_remote``
    is set - staged as symlink trees and bulk-copied with rclone.  The
    matching database changes are NOT applied here; they are written to
    ``output_file`` as JSON lines so they can be applied (or reverted) later
    via ``apply_changes``.
    """

    def __init__(self, get_bucket_name, static_bucket_name, output_file, source_backend_names, rclone_remote,
                 s3_endpoint, s3_profile, s3_bucket_policy):
        # `get_bucket_name` maps a file's datetime to a (bucket, backend) pair.
        self.get_bucket_name = get_bucket_name
        self.static_bucket_name = static_bucket_name
        self.source_backend_names = source_backend_names
        self.output_file = output_file
        self.s3_endpoint = s3_endpoint
        self.s3_bucket_policy = s3_bucket_policy
        # bucket name -> {'backend', 'exists', 'initial_keys'}
        self.buckets = {}
        self.s3 = boto3.session.Session(profile_name=s3_profile).resource('s3', endpoint_url=s3_endpoint)
        self.s3_client = boto3.session.Session(profile_name=s3_profile).client('s3', endpoint_url=s3_endpoint)
        self.rclone_remote = rclone_remote
        # bucket name -> {'path': staging dir, 'files': count, 'bytes': total size}
        self.rclone_queue = {}
        # used to detect collisions among the generated storage paths
        self.used_storage_paths = set()

    def query_chunked(self, model, chunk_size):
        """Yield all rows of `model` using keyset pagination on the PK."""
        pks = inspect(model).primary_key
        query = base_query = self.make_query(model).order_by(*pks)
        while True:
            row = None
            for row in query.limit(chunk_size):
                yield row
            if row is None:
                # no rows in the query
                break
            # continue after the last primary key we have seen
            query = base_query.filter(Tuple(*pks) > inspect(row).identity)

    def make_query(self, model):
        """Build the query selecting the rows of `model` to migrate."""
        cols = ['storage_backend', 'storage_file_id', 'md5']
        if model.add_file_date_column:
            cols.append('created_dt')
        opts = QUERY_OPTIONS.get(model)
        return (model.query
                .filter(model.storage_file_id.isnot(None), model.storage_backend.in_(self.source_backend_names))
                .filter_by(**SPECIAL_FILTERS.get(model, {}))
                .options(*((opts,) if opts else ()))
                .options(lazyload('*'), load_only(*cols)))

    def run(self):
        """Process every StoredFileMixin model and copy its files to S3."""
        models = {model: self.make_query(model).count() for model in StoredFileMixin.__subclasses__()}
        models = {model: total for model, total in models.iteritems() if total}
        labels = {model: cformat('Processing %{blue!}{}%{reset} (%{cyan}{}%{reset} rows)').format(model.__name__, total)
                  for model, total in models.iteritems()}
        # pad all labels to the same width so the progress bars line up
        max_length = max(len(x) for x in labels.itervalues())
        labels = {model: label.ljust(max_length) for model, label in labels.iteritems()}
        for model, total in sorted(models.items(), key=itemgetter(1)):
            with click.progressbar(self.query_chunked(model, 1000), length=total, label=labels[model],
                                   show_percent=True, show_pos=True) as objects:
                for obj in self.flush_rclone_iterator(objects, 1000):
                    try:
                        self.process_obj(obj)
                    except Exception as exc:
                        click.echo(cformat('\n%{red!}Error processing %{reset}%{yellow}{}%{red!}: %{reset}%{yellow!}{}')
                                   .format(obj, exc))
        click.secho('All done!', fg='green')
        click.echo('Add the following entries to your STORAGE_BACKENDS:')
        for bucket, data in sorted(self.buckets.viewitems(), key=itemgetter(0)):
            click.echo("'{}': 's3-readonly:host={},bucket={}',".format(
                data['backend'], self.s3_endpoint.replace('https://', ''), bucket))

    def process_obj(self, obj):
        """Copy one file to S3 (or queue it for rclone) and record the DB change."""
        new_storage_path, new_filename = self.build_storage_path(obj)
        if new_storage_path in self.used_storage_paths:
            raise Exception('Non-unique storage path: {}'.format(new_storage_path))
        self.used_storage_paths.add(new_storage_path)
        bucket_name, backend = self.get_bucket_name(self.get_object_dt(obj))
        bucket_info = self.get_bucket_info(bucket_name, backend, create=(not self.rclone_remote))
        assert backend == bucket_info['backend']
        # skip uploads for keys that were already in the bucket at startup
        if new_storage_path not in bucket_info['initial_keys']:
            if self.rclone_remote:
                self.queue_for_rclone(obj, bucket_name, new_storage_path)
            else:
                with obj.open() as f:
                    # Python 2 codecs: hex digest -> raw bytes -> base64, the
                    # format S3 expects in ContentMD5
                    content_md5 = obj.md5.decode('hex').encode('base64').strip()
                    self.s3_client.put_object(Body=f, Bucket=bucket_name, Key=new_storage_path,
                                              ContentType=obj.content_type, ContentMD5=content_md5)
        self.emit_update(obj, backend, new_storage_path, new_filename)

    def build_storage_path(self, obj, _new_path_re=re.compile(r'^(event/\d+/registrations/\d+/\d+/\d+-)(\d+)(-.*)$')):
        """Return the (storage path, filename) the file should get in S3."""
        old_filename = obj.filename
        new_filename = secure_filename(obj.filename, None)
        assert new_filename
        # temporarily swap in the sanitized filename so the model builds the
        # path with it, then restore the original
        obj.filename = new_filename
        new_storage_path = obj._build_storage_path()[1]
        obj.filename = old_filename
        assert new_storage_path
        if not isinstance(obj, RegistrationData):
            return new_storage_path, new_filename
        match = _new_path_re.match(obj.storage_file_id)
        if match:
            # already in the current format
            assert obj.storage_file_id == new_storage_path.replace('-0-', '-{}-'.format(match.group(2)))
            return obj.storage_file_id, new_filename
        else:
            # legacy path: keep uniqueness by mixing in a crc32 of the old id
            match = _new_path_re.match(new_storage_path)
            return '{}{}{}'.format(match.group(1), crc32(obj.storage_file_id), match.group(3)), new_filename

    def queue_for_rclone(self, obj, bucket, key):
        # XXX: we assume the file is local so the context manager doesn't create a
        # temporary file which becomes invalid afterwards
        with obj.get_local_path() as file_path:
            pass
        while not os.path.exists(file_path):
            # let the operator restore the missing file and hit enter to retry
            raw_input(cformat('\n%{red}File not found on disk: %{yellow}{}').format(file_path))
        try:
            queue_entry = self.rclone_queue[bucket]
        except KeyError:
            # first file for this bucket: create a staging dir for symlinks
            tmpdir = tempfile.mkdtemp(prefix='indico-s3-import-')
            self.rclone_queue[bucket] = queue_entry = {
                'path': tmpdir,
                'files': 0,
                'bytes': 0,
            }
        fskey = os.path.join(queue_entry['path'], key)
        fsdir = os.path.dirname(fskey)
        try:
            os.makedirs(fsdir)
        except OSError as exc:
            # the directory existing already is fine
            if exc.errno != errno.EEXIST:
                raise
        os.symlink(file_path, fskey)
        queue_entry['files'] += 1
        queue_entry['bytes'] += obj.size

    def flush_rclone_iterator(self, iterable, n=100):
        """Pass `iterable` through while flushing the rclone queue every `n` items."""
        for i, data in enumerate(iterable, 1):
            yield data
            if i % n == 0:
                self.flush_rclone()
                db.session.rollback()  # reduce memory usage
        self.flush_rclone()

    def flush_rclone(self):
        """Copy all queued symlink trees to their buckets via rclone."""
        if not self.rclone_remote or not self.rclone_queue:
            return
        click.echo()
        # make sure every target bucket exists before rclone runs
        for name, data in self.buckets.viewitems():
            if not data['exists']:
                self.create_bucket(name)
                data['exists'] = True
        for bucket, data in self.rclone_queue.viewitems():
            click.echo(cformat('Copying %{cyan}{}%{reset} files (%{cyan}{}%{reset}) to %{cyan}{}%{reset} via rclone')
                       .format(data['files'], do_filesizeformat(data['bytes']), bucket))
            start = datetime.now()
            try:
                # --copy-links dereferences the staged symlinks
                subprocess.check_call([
                    'rclone', 'copy', '--copy-links',
                    data['path'], '{}:{}'.format(self.rclone_remote, bucket)
                ])
            except subprocess.CalledProcessError:
                click.secho('\nError while running rclone', fg='red')
                raise
            duration = (datetime.now() - start)
            click.echo('...finished after {}'.format(format_human_timedelta(duration, 'minutes', narrow=True)))
            rmlinktree(data['path'])
        self.rclone_queue.clear()

    def bucket_exists(self, name):
        """Check if the bucket exists (HEAD returning 404 means it does not)."""
        try:
            self.s3_client.head_bucket(Bucket=name)
            return True
        except ClientError as exc:
            if int(exc.response['Error']['Code']) == 404:
                return False
            raise

    def create_bucket(self, name):
        """Create the bucket, applying the configured policy if there is one."""
        click.echo(cformat('Creating bucket %{green}{}%{reset}').format(name))
        self.s3_client.create_bucket(Bucket=name)
        if self.s3_bucket_policy:
            self.s3_client.put_bucket_policy(Bucket=name, Policy=self.s3_bucket_policy)

    def get_bucket_info(self, bucket_name, backend, create=False):
        """Return cached info for a bucket, optionally creating the bucket."""
        try:
            return self.buckets[bucket_name]
        except KeyError:
            click.echo(cformat('\nChecking bucket %{yellow}{}%{reset}').format(bucket_name))
            exists = True
            if not self.bucket_exists(bucket_name):
                if create:
                    click.echo()
                    self.create_bucket(bucket_name)
                else:
                    exists = False
            data = {'backend': backend, 'exists': exists}
            if exists:
                # snapshot the keys already present so uploads can be skipped
                bucket = self.s3.Bucket(bucket_name)
                data['initial_keys'] = {f.key for f in bucket.objects.all()}
            else:
                data['initial_keys'] = set()
            self.buckets[bucket_name] = data
            return data

    def get_object_dt(self, obj):
        """Return the datetime used to select the target bucket for `obj`."""
        cls = type(obj)
        if cls.add_file_date_column:
            return obj.created_dt
        # fall back to "now" for models with no usable date at all
        fn = FILE_DT_MAP.get(cls) or (lambda _: datetime.now())
        return fn(obj)

    def emit_update(self, obj, backend, file_id, filename):
        """Write the pending DB change for `obj` to the output file as JSONL.

        Only the fields that actually change are recorded, using short keys
        ('ob'/'nb' = old/new backend, 'of'/'nf' = file id, 'on'/'nn' = name).
        """
        data = {'m': type(obj).__name__, 'pk': inspect(obj).identity}
        if obj.storage_backend != backend:
            data.update({
                'ob': obj.storage_backend,
                'nb': backend,
            })
        if obj.storage_file_id != file_id:
            data.update({
                'of': obj.storage_file_id,
                'nf': file_id,
            })
        if obj.filename != filename:
            data.update({
                'on': obj.filename,
                'nn': filename,
            })
        json.dump(data, self.output_file)
        self.output_file.write('\n')
def apply_changes(data_file, revert=False):
    """Apply (or revert) the DB changes recorded by the `copy` command.

    `data_file` contains one JSON object per line using the short keys
    written by ``S3Importer.emit_update``; when `revert` is true the old
    values are restored instead of the new ones being applied.
    """
    # short JSONL keys -> readable names
    mapping = {
        'pk': 'pk',
        'ob': 'old_storage_backend',
        'nb': 'new_storage_backend',
        'of': 'old_storage_file_id',
        'nf': 'new_storage_file_id',
        'on': 'old_filename',
        'nn': 'new_filename',
    }
    click.echo('Parsing data...')
    data = defaultdict(list)
    for line in data_file:
        line_data = json.loads(line)
        converted = {mapping[k]: v for k, v in line_data.viewitems() if k in mapping}
        data[line_data['m']].append(converted)
    models = {model: len(data[model.__name__])
              for model in StoredFileMixin.__subclasses__()
              if model.__name__ in data and len(data[model.__name__])}
    labels = {model: cformat('Processing %{blue!}{}%{reset} (%{cyan}{}%{reset} rows)').format(model.__name__, total)
              for model, total in models.iteritems()}
    # pad all labels to the same width so the progress bars line up
    max_length = max(len(x) for x in labels.itervalues())
    labels = {model: label.ljust(max_length) for model, label in labels.iteritems()}
    for model, total in sorted(models.items(), key=itemgetter(1)):
        pks = inspect(model).primary_key
        with click.progressbar(data[model.__name__], length=total, label=labels[model],
                               show_percent=True, show_pos=True) as entries:
            for entry in committing_iterator(entries, 1000):
                updates = {}
                # pick the old values when reverting, the new ones otherwise
                key = 'old' if revert else 'new'
                if key + '_storage_backend' in entry:
                    updates[model.storage_backend] = entry[key + '_storage_backend']
                if key + '_storage_file_id' in entry:
                    updates[model.storage_file_id] = entry[key + '_storage_file_id']
                if key + '_filename' in entry:
                    updates[model.filename] = entry[key + '_filename']
                model.query.filter(Tuple(*pks) == entry['pk']).update(updates, synchronize_session=False)
@contextmanager
def monkeypatch_registration_file_time():
    """Make RegistrationData storage paths deterministic while active.

    RegistrationData objects have the current timestamp in their
    storage_file_id to ensure uniqueness in case of re-upload, but here we
    want reliable filenames, so the module-level `time` is replaced with a
    stub that always returns 0.
    """
    from indico.modules.events.registration.models import registrations

    class FakeTime(object):
        # drop-in replacement for the `time` module used by the model
        def time(self):
            return 0

    orig_time = registrations.time
    registrations.time = FakeTime()
    try:
        yield
    finally:
        # restore the real module even if the wrapped block raises,
        # otherwise the monkeypatch would leak into unrelated code
        registrations.time = orig_time
@cli_group()
@with_appcontext
def cli():
    """Migrate data to S3.

    Use the `copy` subcommand to copy data to S3. This can be done
    safely while Indico is running. At the end it will show you what
    you need to add to your `indico.conf`.

    Once you updated your config with the new storage backends, you
    can use the `apply` subcommand to update your database so files
    will actually be loaded using the new S3 storage backends.

    In case you ever need to switch back to your previous storage,
    you can use `revert` to undo the database changes.
    """
    if config.DB_LOG:
        # warn early: the DB logger makes this query-heavy migration very slow
        # (typo fixed: "Unless you database" -> "Unless your database")
        click.secho('Warning: The database logger is currently enabled (DB_LOG = True).\n'
                    'This will slow down the migration. Unless your database is very small, please disable it.',
                    fg='yellow')
        click.confirm('Continue anyway?', abort=True)
@cli.command()
@click.option('-s', '--source-backend', 'source_backend_names', metavar='NAME', multiple=True,
help='Storage backend names from which to copy files. If omitted, all non-S3 backends are used.')
@click.option('-B', '--bucket-name', 'bucket_names', metavar='NAME', required=True, multiple=True,
help='Bucket(s) to copy files to')
@click.option('-S', '--static-bucket-name', metavar='NAME', required=True,
help='Bucket to copy static site packages to')
@click.option('-e', '--s3-endpoint', metavar='URL', help='Custom S3 endpoint, e.g. https://s3.example.com')
@click.option('-p', '--s3-profile', metavar='NAME', help='S3 profile to use when loading credentials')
@click.option('-P', '--s3-bucket-policy', 's3_bucket_policy_file', metavar='PATH', type=click.File(),
help='Bucket policy file to use when creating buckets')
@click.option('-r', '--rclone', metavar='NAME',
help='If set, use rclone to copy files which is usually faster. The value of this option is the name of '
'the rclone remote used to copy files to S3.')
@click.argument('output', metavar='PATH', type=click.File('wb'))
def copy(source_backend_names, bucket_names, static_bucket_name, s3_endpoint, s3_profile, s3_bucket_policy_file,
rclone, output):
"""Copy files to S3.
This command copies files to S3 and records the necessary database changes
in a JSONL file.
Multiple bucket names can be specified; in that case the bucket name can change
based on the year a file was created in. The last bucket name will be the default,
while any other bucket name must include a conditional indicating when to use it:
\b
-B '<2001:indico-pre-2001'
-B '<2009:indico-<year>'
-B 'indico-<year>-<month>'
The static bucket name cannot contain any placeholders.
The indico storage backend will get the same name as the bucket by default,
but this can be overridden, e.g. `-B 'indico-<year>/s3-<year>'` would name
the bucket 'indico-2018' but use a backend named 's3-2018'. It is your
responsibility to ensure that placeholders match between the two names.
S3 credentials should be specified in the usual places, i.e.
`~/.aws/credentials` for regular S3 access and `~/.config/rclone/rclone.conf`
when using rclone.
"""
bucket_names = [tuple(x.split('/', 1)) if '/' in x else (x, x.split(':', 1)[-1]) for x in bucket_names]
if ':' in bucket_names[-1][0]:
raise click.UsageError('Last bucket name cannot contain criteria')
if not all(':' in x[0] for x in bucket_names[:-1]):
raise click.UsageError('All but the last bucket name need to contain criteria')
matches = [(re.match(r'^(<|>|==|<=|>=)\s*(\d{4}):(.+)$', name), backend) for name, backend in bucket_names[:-1]]
if not all(x[0] for x in matches):
raise click.UsageError("Could not parse '{}'".format(bucket_names[matches.index(None)]))
criteria = [(match.groups(), backend) for match, backend in matches]
# Build and compile a function to get the bucket/backend name to avoid
# processing the criteria for every single file (can be millions for large
# instances)
code = ['def get_bucket_name(dt):']
if criteria:
for i, ((op, value, bucket), backend) in enumerate(criteria):
code.append(' {}if dt.year {} {}:'.format('el' if i else '', op, value))
code.append(' bucket, backend = {!r}'.format((bucket, backend)))
code.append(' else:')
code.append(' bucket, backend = {!r}'.format(bucket_names[-1]))
else:
code.append(' bucket, backend = {!r}'.format(bucket_names[-1]))
code.append(' bucket = bucket.replace("<year>", dt.strftime("%Y"))')
code.append(' bucket = bucket.replace("<month>", dt.strftime("%m"))')
code.append(' bucket = bucket.replace("<week>", dt.strftime("%W"))')
code.append(' backend = backend.replace("<year>", dt.strftime("%Y"))')
code.append(' backend = backend.replace("<month>", dt.strftime("%m"))')
code.append(' backend = backend.replace("<week>", dt.strftime("%W"))')
code.append(' return bucket, backend')
d = {}
exec '\n'.join(code) in d
if not source_backend_names:
source_backend_names = [x for x in config.STORAGE_BACKENDS if not isinstance(get_storage(x), S3StorageBase)]
if rclone:
invalid = [x for x in source_backend_names if not isinstance(get_storage(x), FileSystemStorage)]
if invalid:
click.secho('Found unsupported storage backends: {}'.format(', '.join(sorted(invalid))), fg='yellow')
click.secho('The backends might not work together with `--rclone`', fg='yellow')
click.confirm('Continue anyway?', abort=True)
s3_bucket_policy = s3_bucket_policy_file.read() if s3_bucket_policy_file else None
imp = S3Importer(d['get_bucket_name'], static_bucket_name,
output, source_backend_names, rclone,
s3_endpoint, s3_profile, s3_bucket_policy)
with monkeypatch_registration_file_time():
imp.run()
@cli.command()
@click.argument('data', metavar='PATH', type=click.File('rb'))
def apply(data):
    """Apply DB updates.

    This command updates the DB with the changes recorded while
    running the `copy` command.
    """
    apply_changes(data)
@cli.command()
@click.argument('data', metavar='PATH', type=click.File('rb'))
def revert(data):
    """Revert DB updates.

    This command reverts the changes made to the database by the
    `apply` command.
    """
    apply_changes(data, revert=True)

View File

@@ -0,0 +1,124 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import sys
import click
from markupsafe import Markup
from wtforms.fields import BooleanField, StringField
from wtforms.validators import DataRequired
from indico.cli.core import cli_group
from indico.core import signals
from indico.core.config import config
from indico.core.plugins import IndicoPlugin, url_for_plugin
from indico.core.storage.backend import ReadOnlyStorageMixin, get_storage
from indico.web.forms.base import IndicoForm
from indico.web.forms.fields import IndicoPasswordField
from indico.web.forms.validators import HiddenUnless
from indico.web.forms.widgets import SwitchWidget
from indico_storage_s3 import _
from indico_storage_s3.blueprint import blueprint
from indico_storage_s3.migrate import cli as migrate_cli
from indico_storage_s3.storage import (DynamicS3Storage, ReadOnlyDynamicS3Storage, ReadOnlyS3Storage, S3Storage,
S3StorageBase)
class SettingsForm(IndicoForm):
    """Plugin settings: toggle and credentials for the bucket-info API."""
    bucket_info_enabled = BooleanField(_("Bucket info API"), widget=SwitchWidget())
    # credentials are only shown/required while the API is enabled
    username = StringField(_("Username"), [HiddenUnless('bucket_info_enabled', preserve_data=True), DataRequired()],
                           description=_("The username to access the S3 bucket info endpoint"))
    password = IndicoPasswordField(_('Password'),
                                   [HiddenUnless('bucket_info_enabled', preserve_data=True), DataRequired()],
                                   toggle=True,
                                   description=_("The password to access the S3 bucket info endpoint"))

    def __init__(self, *args, **kwargs):
        super(SettingsForm, self).__init__(*args, **kwargs)
        # the description needs url_for (request context), so it is built here
        # instead of at class definition time
        url = Markup('<strong><code>{}</code></strong>').format(url_for_plugin('storage_s3.buckets'))
        self.bucket_info_enabled.description = _("Enables an API on {url} that returns information on all S3 buckets "
                                                 "currently in use, including dynamically-named ones.").format(url=url)
class S3StoragePlugin(IndicoPlugin):
    """S3 Storage

    Provides S3 storage backends.
    """
    configurable = True
    settings_form = SettingsForm
    default_settings = {
        'bucket_info_enabled': False,
        'username': '',
        'password': ''
    }

    def init(self):
        super(S3StoragePlugin, self).init()
        # register the storage backends and the `s3` CLI group
        self.connect(signals.get_storage_backends, self._get_storage_backends)
        self.connect(signals.plugin.cli, self._extend_indico_cli)

    def _get_storage_backends(self, sender, **kwargs):
        # the four backend flavors provided by this plugin
        yield S3Storage
        yield DynamicS3Storage
        yield ReadOnlyS3Storage
        yield ReadOnlyDynamicS3Storage

    def get_blueprints(self):
        return blueprint

    def _extend_indico_cli(self, sender, **kwargs):
        @cli_group()
        def s3():
            """Manage S3 storage."""

        @s3.command()
        @click.option('--storage', default=None, metavar='NAME', help='Storage backend to create bucket for')
        def create_bucket(storage):
            """Create s3 storage bucket."""
            # With --storage only that backend is handled and problems are
            # fatal (exit 1); without it all configured backends are tried and
            # non-S3/read-only/missing ones are silently skipped.
            storages = [storage] if storage else config.STORAGE_BACKENDS
            for key in storages:
                try:
                    storage_instance = get_storage(key)
                except RuntimeError:
                    if storage:
                        click.echo('Storage {} does not exist'.format(key))
                        sys.exit(1)
                    continue
                if isinstance(storage_instance, ReadOnlyStorageMixin):
                    if storage:
                        click.echo('Storage {} is read-only'.format(key))
                        sys.exit(1)
                    continue
                if isinstance(storage_instance, S3StorageBase):
                    bucket_name = storage_instance._get_current_bucket_name()
                    if storage_instance._bucket_exists(bucket_name):
                        click.echo('Storage {}: bucket {} already exists'.format(key, bucket_name))
                        continue
                    storage_instance._create_bucket(bucket_name)
                    click.echo('Storage {}: bucket {} created'.format(key, bucket_name))
                elif storage:
                    click.echo('Storage {} is not an s3 storage'.format(key))
                    sys.exit(1)

        s3.add_command(migrate_cli, name='migrate')
        return s3

View File

@@ -0,0 +1,255 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import hashlib
import hmac
import sys
import threading
from contextlib import contextmanager
from datetime import date
from io import BytesIO
from tempfile import NamedTemporaryFile
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from werkzeug.datastructures import Headers
from werkzeug.utils import cached_property, redirect
from indico.core.config import config
from indico.core.storage import Storage, StorageError
from indico.core.storage.backend import ReadOnlyStorageMixin, StorageReadOnlyError
from indico.util.fs import get_file_checksum
from indico.util.string import return_ascii
from indico.web.flask.util import send_file
# Per-thread cache of boto3 sessions, keyed by credential set
# (see S3StorageBase.session); avoids sharing sessions across threads.
s3_session_cache = threading.local()
class S3StorageBase(Storage):
simple_data = False
def __init__(self, data):
self.parsed_data = data = self._parse_data(data)
self.endpoint_url = data.get('host')
if self.endpoint_url and '://' not in self.endpoint_url:
self.endpoint_url = 'https://' + self.endpoint_url
self.session_kwargs = {}
self.client_kwargs = {}
if 'profile' in data:
self.session_kwargs['profile_name'] = data['profile']
if 'access_key' in data:
self.session_kwargs['aws_access_key_id'] = data['access_key']
if 'secret_key' in data:
self.session_kwargs['aws_secret_access_key'] = data['secret_key']
if 'addressing_style' in data:
self.client_kwargs['config'] = Config(s3={'addressing_style': data['addressing_style']})
self.bucket_policy_file = data.get('bucket_policy_file')
self.bucket_versioning = data.get('bucket_versioning') in ('1', 'true', 'yes')
self.proxy_downloads = data.get('proxy') in ('1', 'true', 'yes')
self.meta = data.get('meta')
@cached_property
def session(self):
key = '__'.join('{}_{}'.format(k, v) for k, v in sorted(self.session_kwargs.viewitems()))
try:
return getattr(s3_session_cache, key)
except AttributeError:
session = boto3.session.Session(**self.session_kwargs)
setattr(s3_session_cache, key, session)
return session
    @cached_property
    def client(self):
        # low-level S3 client bound to the configured endpoint
        return self.session.client('s3', endpoint_url=self.endpoint_url, **self.client_kwargs)
    def _get_current_bucket_name(self):
        """Return the bucket new files go to (implemented by subclasses)."""
        raise NotImplementedError
    def _parse_file_id(self, file_id):
        """Split a stored file id into (bucket, key) (implemented by subclasses)."""
        raise NotImplementedError
    def open(self, file_id):
        """Return a file-like object with the file's content.

        The whole object is read into memory, so the returned BytesIO is
        seekable and independent of the S3 connection.
        """
        bucket, id_ = self._parse_file_id(file_id)
        try:
            s3_object = self.client.get_object(Bucket=bucket, Key=id_)['Body']
            return BytesIO(s3_object.read())
        except Exception as e:
            # Python 2 three-expression raise: re-raise with the original traceback
            raise StorageError('Could not open "{}": {}'.format(file_id, e)), None, sys.exc_info()[2]
    @contextmanager
    def get_local_path(self, file_id):
        """Provide the file on the local filesystem via a temporary copy.

        The temporary file is deleted when the context exits.
        """
        with NamedTemporaryFile(suffix='indico.s3', dir=config.TEMP_DIR) as tmpfile:
            self._copy_file(self.open(file_id), tmpfile)
            tmpfile.flush()
            yield tmpfile.name
def _save(self, bucket, name, content_type, fileobj):
fileobj = self._ensure_fileobj(fileobj)
checksum = get_file_checksum(fileobj)
fileobj.seek(0)
content_md5 = checksum.decode('hex').encode('base64').strip()
self.client.put_object(Body=fileobj, Bucket=bucket, Key=name,
ContentType=content_type, ContentMD5=content_md5)
return checksum
def delete(self, file_id):
bucket, id_ = self._parse_file_id(file_id)
try:
self.client.delete_object(bucket, id_)
except Exception as e:
raise StorageError('Could not delete "{}": {}'.format(file_id, e)), None, sys.exc_info()[2]
def getsize(self, file_id):
bucket, id_ = self._parse_file_id(file_id)
try:
return self.client.head_object(Bucket=bucket, Key=id_)['ContentLength']
except Exception as e:
raise StorageError('Could not get size of "{}": {}'.format(file_id, e)), None, sys.exc_info()[2]
def send_file(self, file_id, content_type, filename, inline=True):
if self.proxy_downloads:
return send_file(filename, self.open(file_id), content_type, inline=inline)
try:
bucket, id_ = self._parse_file_id(file_id)
content_disp = 'inline' if inline else 'attachment'
h = Headers()
h.add('Content-Disposition', content_disp, filename=filename)
url = self.client.generate_presigned_url('get_object',
Params={'Bucket': bucket,
'Key': id_,
'ResponseContentDisposition': h.get('Content-Disposition'),
'ResponseContentType': content_type},
ExpiresIn=120)
return redirect(url)
except Exception as e:
raise StorageError('Could not send file "{}": {}'.format(file_id, e)), None, sys.exc_info()[2]
def _create_bucket(self, name):
from indico_storage_s3.plugin import S3StoragePlugin
if self._bucket_exists(name):
S3StoragePlugin.logger.info('Bucket %s already exists', name)
return
self.client.create_bucket(Bucket=name)
S3StoragePlugin.logger.info('New bucket created: %s', name)
if self.bucket_versioning:
self.client.put_bucket_versioning(Bucket=name, VersioningConfiguration={'Status': 'Enabled'})
if self.bucket_policy_file:
with open(self.bucket_policy_file) as f:
policy = f.read()
self.client.put_bucket_policy(Bucket=name, Policy=policy)
def _bucket_exists(self, name):
try:
self.client.head_bucket(Bucket=name)
return True
except ClientError as exc:
if int(exc.response['Error']['Code']) == 404:
return False
raise
class S3Storage(S3StorageBase):
name = 's3'
def __init__(self, data):
super(S3Storage, self).__init__(data)
self.bucket_name = self.parsed_data['bucket']
if len(self.bucket_name) > 63:
raise StorageError('Bucket name cannot be longer than 63 chars')
@return_ascii
def __repr__(self):
return '<{}: {}>'.format(type(self).__name__, self.bucket_name)
def _get_current_bucket_name(self):
return self.bucket_name
def _parse_file_id(self, file_id):
return self.bucket_name, file_id
def save(self, name, content_type, filename, fileobj):
try:
bucket = self._get_current_bucket_name()
checksum = self._save(bucket, name, content_type, fileobj)
return name, checksum
except Exception as e:
raise StorageError('Could not save "{}": {}'.format(name, e)), None, sys.exc_info()[2]
class DynamicS3Storage(S3StorageBase):
name = 's3-dynamic'
def __init__(self, data):
super(DynamicS3Storage, self).__init__(data)
self.bucket_name_template = self.parsed_data['bucket_template']
self.bucket_secret = (self.parsed_data.get('bucket_secret', '') or
self.session._session.get_scoped_config().get('indico_bucket_secret', ''))
if not any(x in self.bucket_name_template for x in ('<year>', '<month>', '<week>')):
raise StorageError('At least one date placeholder is required when using dynamic bucket names')
if not self.bucket_secret:
raise StorageError('A bucket secret is required when using dynamic bucket names')
if len(self._replace_bucket_placeholders(self.bucket_name_template, date.today())) > 46:
raise StorageError('Bucket name cannot be longer than 46 chars (to keep at least 16 hash chars)')
@return_ascii
def __repr__(self):
return '<{}: {}>'.format(type(self).__name__, self.bucket_name_template)
def _parse_file_id(self, file_id):
return file_id.split('//', 1)
def _get_current_bucket_name(self):
return self._get_bucket_name(date.today())
def _get_bucket_name(self, date):
name = self._replace_bucket_placeholders(self.bucket_name_template, date)
token = hmac.new(self.bucket_secret.encode('utf-8'), name, hashlib.md5).hexdigest()
return '{}-{}'.format(name, token)[:63]
def _replace_bucket_placeholders(self, name, date):
name = name.replace('<year>', date.strftime('%Y'))
name = name.replace('<month>', date.strftime('%m'))
name = name.replace('<week>', date.strftime('%W'))
return name
def save(self, name, content_type, filename, fileobj):
try:
bucket = self._get_current_bucket_name()
checksum = self._save(bucket, name, content_type, fileobj)
file_id = '{}//{}'.format(bucket, name)
return file_id, checksum
except Exception as e:
raise StorageError('Could not save "{}": {}'.format(name, e)), None, sys.exc_info()[2]
class ReadOnlyS3Storage(ReadOnlyStorageMixin, S3Storage):
    """Read-only variant of the static S3 storage backend."""

    name = 's3-readonly'

    def _create_bucket(self, name):
        # creating a bucket is a write operation, which read-only forbids
        raise StorageReadOnlyError('Cannot write to read-only storage')
class ReadOnlyDynamicS3Storage(ReadOnlyStorageMixin, DynamicS3Storage):
    """Read-only variant of the dynamic S3 storage backend."""

    name = 's3-dynamic-readonly'

    def _create_bucket(self, name):
        # creating a bucket is a write operation, which read-only forbids
        raise StorageReadOnlyError('Cannot write to read-only storage')

View File

@ -0,0 +1,53 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import re
from datetime import date
from celery.schedules import crontab
from dateutil.relativedelta import relativedelta
from indico.core.celery import celery
from indico.core.config import config
from indico.core.storage.backend import ReadOnlyStorageMixin, get_storage
from indico_storage_s3.plugin import DynamicS3Storage
@celery.periodic_task(run_every=crontab(minute=0, hour=1))
def create_bucket():
    """Pre-create the upcoming bucket for every dynamic S3 storage.

    Runs nightly; for weekly templates the bucket for next week is created,
    for monthly (and yearly, in December) templates the one for next month.
    Read-only backends are skipped.
    """
    for backend_key in config.STORAGE_BACKENDS:
        storage = get_storage(backend_key)
        if not isinstance(storage, DynamicS3Storage) or isinstance(storage, ReadOnlyStorageMixin):
            continue
        today = date.today()
        placeholders = set(re.findall('<.*?>', storage.bucket_name_template))
        if not placeholders:
            continue
        if placeholders == {'<year>', '<week>'}:
            bucket_date = today + relativedelta(weeks=1)
        elif placeholders in ({'<year>', '<month>'}, {'<year>'}):
            # yearly buckets only need to be created ahead of the new year
            if '<month>' not in placeholders and today.month != 12:
                continue
            bucket_date = today + relativedelta(months=1)
        else:
            raise RuntimeError('Invalid placeholder combination in bucket name template: {}'
                               .format(storage.bucket_name_template))
        storage._create_bucket(storage._get_bucket_name(bucket_date))

44
storage_s3/setup.py Normal file
View File

@ -0,0 +1,44 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from setuptools import find_packages, setup
# Runtime dependencies: the Indico core and the AWS SDK.
_install_requires = [
    'indico>=2.1',
    'boto3>=1.9.35,<2.0',
]

setup(
    name='indico-plugin-storage-s3',
    version='2.0.3',
    description='S3 storage backend for Indico',
    url='https://github.com/indico/indico-plugins',
    license='https://www.gnu.org/licenses/gpl-3.0.txt',
    author='Indico Team',
    author_email='indico-team@cern.ch',
    packages=find_packages(),
    zip_safe=False,
    platforms='any',
    install_requires=_install_requires,
    classifiers=[
        'Environment :: Plugins',
        'Environment :: Web Environment',
        'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
        'Programming Language :: Python :: 2.7',
    ],
    # register the plugin with Indico's plugin system
    entry_points={'indico.plugins': {'storage_s3 = indico_storage_s3.plugin:S3StoragePlugin'}},
)

View File

@ -0,0 +1,112 @@
# This file is part of Indico.
# Copyright (C) 2002 - 2018 European Organization for Nuclear Research (CERN).
#
# Indico is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Indico is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Indico; if not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import hashlib
import hmac
import pytest
from indico.core.storage import StorageError
from indico_storage_s3 import plugin
from indico_storage_s3.storage import DynamicS3Storage, S3Storage
from indico_storage_s3.task import create_bucket
@pytest.fixture(autouse=True)
def mock_boto3(mocker):
    """Replace boto3 in the storage module so no test talks to AWS."""
    mocker.patch('indico_storage_s3.storage.boto3')
def test_resolve_bucket_name_static():
    """A static backend always resolves to its configured bucket."""
    static_storage = plugin.S3Storage('bucket=test')
    assert static_storage._get_current_bucket_name() == 'test'
@pytest.mark.parametrize(('date', 'name_template', 'expected_name'), (
    ('2018-04-11', 'name-<year>', 'name-2018'),
    ('2018-04-11', 'name-<year>-<month>', 'name-2018-04'),
    ('2018-04-11', 'name-<year>-<week>', 'name-2018-15'),
    ('2018-01-01', 'name-<year>-<week>', 'name-2018-01'),
    ('2019-01-01', 'name-<year>-<week>', 'name-2019-00'),
))
def test_resolve_bucket_name_dynamic(freeze_time, date, name_template, expected_name):
    """Placeholders resolve from the (frozen) date, plus an HMAC token."""
    freeze_time(date)
    storage = plugin.DynamicS3Storage('bucket_template={},bucket_secret=secret'.format(name_template))
    bucket = storage._get_current_bucket_name()
    expected_token = hmac.new(b'secret', expected_name, hashlib.md5).hexdigest()
    assert bucket == '{}-{}'.format(expected_name, expected_token)
class MockConfig(object):
    """Minimal stand-in for the Indico config used by the bucket task."""

    def __init__(self):
        # a single backend entry is enough; the storage itself is mocked
        self.STORAGE_BACKENDS = {'s3': None}
@pytest.mark.usefixtures('app_context')
@pytest.mark.parametrize(('date', 'name_template', 'bucket_created', 'expected_name', 'expected_error'), (
    ('2018-04-11', 'name', False, None, None),
    ('2018-12-01', 'name', False, None, None),
    ('2018-04-11', 'name-<year>', False, None, None),
    ('2018-01-01', 'name-<year>', False, None, None),
    ('2018-12-01', 'name-<month>', False, None, RuntimeError),
    ('2018-12-01', 'name-<week>', False, None, RuntimeError),
    ('2018-12-01', 'name-<month>-<week>', False, None, RuntimeError),
    ('2018-12-01', 'name-<year>', True, 'name-2019', None),
    ('2018-12-01', 'name-<year>-<month>', True, 'name-2019-01', None),
    ('2018-01-01', 'name-<year>-<month>', True, 'name-2018-02', None),
    ('2018-12-03', 'name-<year>-<week>', True, 'name-2018-50', None),
))
def test_dynamic_bucket_creation_task(freeze_time, mocker, date, name_template, bucket_created, expected_name,
                                      expected_error):
    """The nightly task creates the next bucket only when it is due."""
    freeze_time(date)
    # a template without placeholders means a static (non-dynamic) backend
    if '<' in name_template:
        storage = plugin.DynamicS3Storage('bucket_template={},bucket_secret=secret'.format(name_template))
    else:
        storage = plugin.S3Storage('bucket={}'.format(name_template))
    mocker.patch('indico_storage_s3.task.config', MockConfig())
    mocker.patch('indico_storage_s3.task.get_storage', return_value=storage)
    create_bucket_mock = mocker.patch.object(plugin.DynamicS3Storage, '_create_bucket')
    if expected_error:
        with pytest.raises(expected_error):
            create_bucket()
        return
    create_bucket()
    if not bucket_created:
        assert not create_bucket_mock.called
        return
    token = hmac.new(b'secret', expected_name, hashlib.md5).hexdigest()
    create_bucket_mock.assert_called_with('{}-{}'.format(expected_name, token))
def test_static_bucket_name_too_long():
    """63 characters is accepted; 64 must be rejected."""
    max_name = 'test' + 'x' * 59  # exactly 63 chars
    S3Storage('bucket=' + max_name)
    with pytest.raises(StorageError):
        S3Storage('bucket=' + max_name + 'x')
def test_dynamic_bucket_name_too_long():
    """Resolved names must leave room for at least 16 hash characters."""
    storage = DynamicS3Storage('bucket_secret=secret,bucket_template=test-<year>' + 'x' * 37)
    assert len(storage._get_current_bucket_name()) == 63
    storage = DynamicS3Storage('bucket_secret=secret,bucket_template=test-<year>-<month>' + 'x' * 34)
    assert len(storage._get_current_bucket_name()) == 63
    # one more character pushes the resolved name past the 46-char limit
    with pytest.raises(StorageError):
        DynamicS3Storage('bucket_secret=secret,bucket_template=test-<year>' + 'x' * 38)
    with pytest.raises(StorageError):
        DynamicS3Storage('bucket_secret=secret,bucket_template=test-<year>-<month>' + 'x' * 35)