# Copyright (c) 2018 Forschungszentrum Juelich GmbH
# Author: Yann Leprince <y.leprince@fz-juelich.de>
#
# This software is made available under the MIT licence, see LICENCE.txt.
"""Encoding / decoding of Neuroglancer precomputed chunks.
The central component here is the :class:`ChunkEncoder` base class. Use
:func:`get_encoder` for instantiating a concrete encoder object.
"""
import numpy as np
__all__ = [
"get_encoder",
"add_argparse_options",
"IncompatibleEncoderError",
"InvalidInfoError",
"InvalidFormatError",
"ChunkEncoder",
"RawChunkEncoder",
"CompressedSegmentationEncoder",
"JpegChunkEncoder",
]
# TODO move to a data_type module
"""List of possible values for ``data_type``."""
NEUROGLANCER_DATA_TYPES = ("uint8", "uint16", "uint32", "uint64", "float32")
[docs]
def get_encoder(info, scale_info, encoder_options={}):
"""Create an Encoder object for the provided scale.
:param dict info: a Neuroglancer *info* dictionary (:ref:`info`) containing
general encoding parameters (``data_type`` and
``num_channels``)
:param dict scale_info: an element of (``info["scales"]``) containing
scale-specific encoding parameters
(``encoding`` and encoding-specific parameters)
:param dict encoder_options: extrinsic encoder parameters
:returns: an instance of a chunk encoder
:rtype: ChunkEncoder
:raises InvalidInfoError: if the provided *info* dict is invalid
"""
try:
data_type = info["data_type"]
num_channels = info["num_channels"]
encoding = scale_info["encoding"]
except KeyError as exc:
raise InvalidInfoError("The info dict is missing an essential key "
f"{exc}") from exc
if not isinstance(num_channels, int) or not num_channels > 0:
raise InvalidInfoError(f"Invalid value {num_channels} for "
"num_channels (must be a positive integer)")
if data_type not in NEUROGLANCER_DATA_TYPES:
raise InvalidInfoError(f"Invalid data_type {data_type} (should be one "
f"of {NEUROGLANCER_DATA_TYPES})")
try:
if encoding == "raw":
return RawChunkEncoder(data_type, num_channels)
elif encoding == "compressed_segmentation":
try:
block_size = scale_info["compressed_segmentation_block_size"]
except KeyError:
raise InvalidInfoError(
'Encoding is set to "compressed_segmentation" but '
'"compressed_segmentation_block_size" is missing')
return CompressedSegmentationEncoder(data_type, num_channels,
block_size)
elif encoding == "jpeg":
jpeg_plane = encoder_options.get("jpeg_plane", "xy")
jpeg_quality = encoder_options.get("jpeg_quality", 95)
return JpegChunkEncoder(data_type, num_channels,
jpeg_plane=jpeg_plane,
jpeg_quality=jpeg_quality)
else:
raise InvalidInfoError(f"Invalid encoding {encoding}")
except IncompatibleEncoderError as exc:
raise InvalidInfoError(str(exc)) from exc
[docs]
def add_argparse_options(parser, allow_lossy=False):
"""Add command-line options for chunk encoding.
:param parser: an instance of :class:`argparse.ArgumentParser`
:param bool allow_lossy: show parameters for lossy encodings (i.e. jpeg)
The extrinsic encoder parameters can be obtained from command-line
arguments with :func:`add_argparse_options` and passed to
:func:`get_encoder`::
import argparse
parser = argparse.ArgumentParser()
add_argparse_options(parser)
args = parser.parse_args()
get_encoder(info, scale_info, vars(args))
"""
import argparse
if allow_lossy:
def jpeg_quality(arg):
q = int(arg)
if not 1 <= q <= 100:
raise argparse.ArgumentTypeError(
"JPEG quality must be between 1 and 100")
return q
group = parser.add_argument_group("Options for JPEG compression")
group.add_argument("--jpeg-quality", type=jpeg_quality,
default=95, metavar="Q",
help="JPEG quality factor (1 is worst, 100 is "
"best, values above 95 increase file size but "
"provide hardly any extra quality)")
group.add_argument("--jpeg-plane", choices=("xy", "xz"), default="xy",
help='plane of JPEG compression (default: xy)')
[docs]
class IncompatibleEncoderError(Exception):
"""Raised when an Encoder cannot handle the requested data type."""
pass
# TODO inherit from a new DataError class?
[docs]
class InvalidInfoError(Exception):
"""Raised when an *info* dict is invalid or inconsistent."""
pass
[docs]
class ChunkEncoder:
"""Encode/decode chunks from NumPy arrays to byte buffers.
:param str data_type: data type supported by Neuroglancer
:param int num_channels: number of image channels
"""
lossy = False
"""True if this encoder is lossy."""
mime_type = "application/octet-stream"
"""MIME type of the encoded chunk."""
def __init__(self, data_type, num_channels):
self.num_channels = num_channels
self.dtype = np.dtype(data_type).newbyteorder("<")
[docs]
def encode(self, chunk):
"""Encode a chunk from a NumPy array into bytes.
:param numpy.ndarray chunk: array with four dimensions (C, Z, Y, X)
:returns: encoded chunk
:rtype: bytes
"""
raise NotImplementedError
[docs]
def decode(self, buf, chunk_size):
"""Decode a chunk from bytes into a NumPy array.
:param bytes buf: encoded chunk
:param tuple chunk_size: the 3-D size of the chunk (X, Y, Z)
:returns: chunk contained in a 4-D NumPy array (C, Z, Y, X)
:rtype: numpy.ndarray
:raises InvalidFormatError: if there the chunk data cannot be decoded
properly
"""
raise NotImplementedError
[docs]
class RawChunkEncoder(ChunkEncoder):
"""Codec for to the Neuroglancer raw chunk format.
:param str data_type: data type supported by Neuroglancer
:param int num_channels: number of image channels
"""
lossy = False
[docs]
def encode(self, chunk):
chunk = np.asarray(chunk).astype(self.dtype, casting="safe")
assert chunk.ndim == 4
assert chunk.shape[0] == self.num_channels
buf = chunk.tobytes()
return buf
[docs]
def decode(self, buf, chunk_size):
try:
return np.frombuffer(buf, dtype=self.dtype).reshape(
(self.num_channels,
chunk_size[2], chunk_size[1], chunk_size[0]))
except Exception as exc:
raise InvalidFormatError(f"Cannot decode raw-encoded chunk: {exc}"
) from exc
[docs]
class CompressedSegmentationEncoder(ChunkEncoder):
"""Codec for to the Neuroglancer precomputed chunk format.
:param str data_type: data type supported by Neuroglancer
:param int num_channels: number of image channels
:param list block_size: ``block_size`` for the compressed segmentation
compression algorithm
:raises IncompatibleEncoderError: if data_type or num_channels are
unsupported
"""
lossy = False
def __init__(self, data_type, num_channels, block_size):
if data_type not in ("uint32", "uint64"):
raise IncompatibleEncoderError(
"The compressed_segmentation encoding can only handle uint32 "
"or uint64 data_type")
super().__init__(data_type, num_channels)
self.block_size = block_size
[docs]
def encode(self, chunk):
from neuroglancer_scripts import _compressed_segmentation
chunk = np.asarray(chunk).astype(self.dtype, casting="safe")
assert chunk.ndim == 4
assert chunk.shape[0] == self.num_channels
buf = _compressed_segmentation.encode_chunk(chunk, self.block_size)
return buf
[docs]
def decode(self, buf, chunk_size):
from neuroglancer_scripts import _compressed_segmentation
chunk = np.empty(
(self.num_channels, chunk_size[2], chunk_size[1], chunk_size[0]),
dtype=self.dtype
)
_compressed_segmentation.decode_chunk_into(chunk, buf, self.block_size)
return chunk
[docs]
class JpegChunkEncoder(ChunkEncoder):
"""Codec for to the Neuroglancer raw chunk format.
:param str data_type: data type supported by Neuroglancer
:param int num_channels: number of image channels
:param int jpeg_quality: quality factor for JPEG compression
:param str jpeg_plane: plane of JPEG compression (``"xy"`` or ``"xz"``)
:raises IncompatibleEncoderError: if data_type or num_channels are
unsupported
"""
lossy = True
mime_type = "image/jpeg"
def __init__(self, data_type, num_channels,
jpeg_quality=95, jpeg_plane="xy"):
if data_type != "uint8" or num_channels not in (1, 3):
raise IncompatibleEncoderError(
"The JPEG encoding can only handle uint8 data_type with 1 or "
"3 channels")
super().__init__(data_type, num_channels)
assert 1 <= jpeg_quality <= 100
assert jpeg_plane in ("xy", "xz")
self.jpeg_quality = jpeg_quality
self.jpeg_plane = jpeg_plane
[docs]
def encode(self, chunk):
from neuroglancer_scripts import _jpeg
assert np.can_cast(chunk.dtype, self.dtype, casting="safe")
assert chunk.ndim == 4
assert chunk.shape[0] == self.num_channels
buf = _jpeg.encode_chunk(chunk, self.jpeg_quality, self.jpeg_plane)
return buf
[docs]
def decode(self, buf, chunk_size):
from neuroglancer_scripts import _jpeg
return _jpeg.decode_chunk(buf, chunk_size, self.num_channels)