Source code for kaiju_images.images

import asyncio
import uuid
from enum import Enum
from typing import List, Optional, Union

import sqlalchemy as sa

from kaiju_tools.rpc import AbstractRPCCompatible
from kaiju_db.services import DatabaseService, SQLService
from kaiju_files.services import FileService, FileConverterService
from kaiju_tools.exceptions import ValidationError

from kaiju_images.tables import images

__all__ = ['ImageService']


[docs]class ImageService(SQLService, AbstractRPCCompatible): """ This service manages images including storage / conversion and galleries. The concept is that you have an image file linked both to a file service and to an image service. Each image file may have different versions (sizes, colors etc.) derived from image converters. Such versions are interlinked using the `original_id` attribute present in each row (except original itself). Also there's an option to create a "gallery" - an arbitrary user collection of images. It is managed by the `correlation_id` attribute. .. code-block:: python { "id": "", # image unique id "version", # version string (as in converters, for the original it's `None`) "correlation_id": "", # arbitrary correlation id to link different images in a gallery "original_id": "", # link to an original image id (for the original it's always `None`) "file_id": "", # link to a file object id in file service table "metadata": {} # additional metadata from converters / user / image service } To create an image use a default `SQLService.create` method. If you want to use a converter instead to create multiple version, you can use a provided method. .. code-block:: python images = await image_service.convert_from_file( file_id=file_id, converter_id=converter_id, metadata={'test': True}) You can access galleries by correlation id using such method: .. code-block:: python gallery = await image_service.get_gallery(correlation_id) """
[docs] class Versions(Enum): original = None
service_name = 'images' table = images file_service = FileService converter_classes = ['ImageConverter']
[docs] def __init__( self, app, database_service: DatabaseService, file_service: FileService = None, file_converter_service: FileConverterService = None, permissions=None, logger=None ): """ :param app: :param database_service: :param file_service: :param file_converter_service: :param logger: """ super().__init__(app=app, database_service=database_service, logger=logger) AbstractRPCCompatible.__init__(self, permissions=permissions) self.file_service = self.discover_service(file_service) self.converters_service = self.discover_service(file_converter_service, required=False)
@property def routes(self) -> dict: return { **super().routes, 'convert': self.convert_image, 'convert_from_file': self.convert_from_file, 'gallery': self.get_gallery, 'versions.delete': self.delete_versions, 'converters.list': self.list_converters }
[docs] async def list_converters(self, *args, conditions=None, **kws): cond = { 'cls': self.converter_classes } if not conditions: conditions = cond else: conditions.update(cond) return await self.converters_service.list_converters(conditions=conditions, *args, **kws)
async def _convert_file( self, original_id, converter_id, file_id, converter_settings, metadata, correlation_id, columns: Optional[Union[str, List[str]]] = '*', file_path=None): if self.converters_service is None: raise RuntimeError('Converter service is not available.') original, converter_data = await asyncio.gather( self.get(id=original_id, columns=columns), self.converters_service.convert( id=converter_id, file_id=file_id, file_path=file_path, settings=converter_settings, metadata=metadata) ) versions = converter_data['versions'] data = [ { 'file_id': version['file']['id'], 'original_id': original_id, 'version': version['meta']['version'], 'meta': version['meta'], 'correlation_id': correlation_id } for version in versions ] data = await self.m_create(data, columns=columns) return [original, *data]
[docs] async def convert_image( self, id: uuid.UUID, converter_id: uuid.UUID, converter_settings: dict = None, metadata: dict = None, columns: Optional[Union[str, List[str]]] = '*'): """Converts an image with a converter and saves all its versions. The image must be of original version. """ original = await self.get(id=id, columns=['file_id', 'version', 'correlation_id']) if original['version'] != self.Versions.original.value: raise ValidationError('Image version must be "%s".' % self.Versions.original.value) file_id = original['file_id'] correlation_id = original['correlation_id'] return await self._convert_file( original_id=id, file_id=file_id, converter_id=converter_id, converter_settings=converter_settings, metadata=metadata, correlation_id=correlation_id, columns=columns)
[docs] async def convert_from_file( self, file_id: uuid.UUID, converter_id: uuid.UUID, converter_settings: dict = None, metadata: dict = None, correlation_id: uuid.UUID = None, columns: Optional[Union[str, List[str]]] = '*'): """Converts file with a converter. This function will create an original version record as well. """ original = await self.create({ 'file_id': file_id, 'original_id': None, 'version': None, 'meta': metadata, 'correlation_id': correlation_id }) original_id = original['id'] if correlation_id is None: correlation_id = original_id sql = self.table.update().where( self.table.c.id == original_id ).values( correlation_id=correlation_id ) await self._wrap_update(self._db.execute(sql)) return await self._convert_file( original_id=original_id, file_id=file_id, converter_id=converter_id, converter_settings=converter_settings, metadata=metadata, correlation_id=correlation_id, columns=columns)
[docs] async def delete_versions( self, id: Union[uuid.UUID, List[uuid.UUID]], versions: Union[str, List[str]], columns: Optional[Union[str, List[str]]] = '*'): """Remove particular versions of images. :param id: list of original image ids :param versions: list of image versions (excl. "original") :param columns: columns to return """ if isinstance(id, uuid.UUID): ids = [id] else: ids = id if isinstance(versions, str): versions = [versions] if self.Versions.original.value in versions: raise ValidationError( 'Deleting image originals is prohibited.' ' Use "delete" method instead to remove the whole version tree.') sql = self.table.delete().where( sa.and_( self.table.c.original_id.in_(ids), self.table.c.version.in_(versions) ) ) if columns: columns = self._sql_get_columns(columns) sql = sql.returning(*columns) if isinstance(id, uuid.UUID): data = await self._wrap_delete(self._db.fetchrow(sql)) else: data = await self._wrap_delete(self._db.fetch(sql)) return data else: await self._wrap_delete(self._db.execute(sql))
@staticmethod def _get_correlation_id(): return uuid.uuid4()
[docs] def prepare_insert_data(self, session, data: dict): file_id = data['file_id'] original_id = data.get('original_id', None) correlation_id = data.get('correlation_id', None) version = data.get('version', None) meta = data.get('meta', None) data = self._create_image( file_id=file_id, original_id=original_id, version=version, correlation_id=correlation_id, meta=meta) return data
def _create_image( self, file_id: uuid.UUID, original_id: uuid.UUID = None, version: str = None, correlation_id: uuid.UUID = None, meta: dict = None, **_): if version is None: version = self.Versions.original.value if version != self.Versions.original.value: if not original_id: raise ValueError('Image of version "%s" must have "original_id".' % version) if correlation_id is None: correlation_id = self._get_correlation_id() data = { 'version': version, 'file_id': file_id, 'original_id': original_id, 'correlation_id': correlation_id, 'meta': meta if meta else {} } return data