Source code for neuroglancer_scripts.file_accessor

# Copyright (c) 2016, 2017, 2018 Forschungszentrum Juelich GmbH
# Author: Yann Leprince <y.leprince@fz-juelich.de>
#
# This software is made available under the MIT licence, see LICENCE.txt.

"""Access to a Neuroglancer pre-computed dataset on the local filesystem.

See the :mod:`~neuroglancer_scripts.accessor` module for a description of the
API.
"""

import gzip
import os
import pathlib

import neuroglancer_scripts.accessor
from neuroglancer_scripts.accessor import _CHUNK_PATTERN_FLAT, DataAccessError

__all__ = [
    "FileAccessor",
]


_CHUNK_PATTERN_SUBDIR = "{key}/{0}-{1}/{2}-{3}/{4}-{5}"

NO_COMPRESS_MIME_TYPES = {
    "application/json",
    "image/jpeg",
    "image/png",
}


[docs] class FileAccessor(neuroglancer_scripts.accessor.Accessor): """Access a Neuroglancer pre-computed pyramid on the local file system. :param str base_dir: path to the directory containing the pyramid :param bool flat: use a flat file layout (see :ref:`layouts`) :param bool gzip: compress chunks losslessly with gzip """ can_read = True can_write = True def __init__(self, base_dir, flat=False, gzip=True, compresslevel=9): self.base_path = pathlib.Path(base_dir) if flat: self.chunk_pattern = _CHUNK_PATTERN_FLAT else: self.chunk_pattern = _CHUNK_PATTERN_SUBDIR self.gzip = gzip self.compresslevel = compresslevel
[docs] def file_exists(self, relative_path): relative_path = pathlib.Path(relative_path) file_path = self.base_path / relative_path if ".." in file_path.relative_to(self.base_path).parts: raise ValueError("only relative paths pointing under base_path " "are accepted") try: if file_path.is_file(): return True elif file_path.with_name(file_path.name + ".gz").is_file(): return True except OSError as exc: raise DataAccessError( f"Error fetching {file_path}: {exc}") from exc return False
[docs] def fetch_file(self, relative_path): relative_path = pathlib.Path(relative_path) file_path = self.base_path / relative_path if ".." in file_path.relative_to(self.base_path).parts: raise ValueError("only relative paths pointing under base_path " "are accepted") try: if file_path.is_file(): f = file_path.open("rb") elif file_path.with_name(file_path.name + ".gz").is_file(): f = gzip.open(str(file_path.with_name(file_path.name + ".gz")), "rb") else: raise DataAccessError(f"Cannot find {relative_path} in " f"{self.base_path}") with f: return f.read() except OSError as exc: raise DataAccessError( f"Error fetching {file_path}: {exc}") from exc
[docs] def store_file(self, relative_path, buf, mime_type="application/octet-stream", overwrite=False): relative_path = pathlib.Path(relative_path) file_path = self.base_path / relative_path if ".." in file_path.relative_to(self.base_path).parts: raise ValueError("only relative paths pointing under base_path " "are accepted") mode = "wb" if overwrite else "xb" try: os.makedirs(str(file_path.parent), exist_ok=True) if self.gzip and mime_type not in NO_COMPRESS_MIME_TYPES: with gzip.open( str(file_path.with_name(file_path.name + ".gz")), mode, compresslevel=self.compresslevel) as f: f.write(buf) else: with file_path.open(mode) as f: f.write(buf) except OSError as exc: raise DataAccessError(f"Error storing {file_path}: {exc}" ) from exc
[docs] def fetch_chunk(self, key, chunk_coords): f = None try: for pattern in _CHUNK_PATTERN_FLAT, _CHUNK_PATTERN_SUBDIR: chunk_path = self._chunk_path(key, chunk_coords, pattern) if chunk_path.is_file(): f = chunk_path.open("rb") elif chunk_path.with_name(chunk_path.name + ".gz").is_file(): f = gzip.open( str(chunk_path.with_name(chunk_path.name + ".gz")), "rb" ) if f is None: raise DataAccessError( "Cannot find chunk " f"{self._flat_chunk_basename(key, chunk_coords)} in " f"{self.base_path}" ) with f: return f.read() except OSError as exc: raise DataAccessError( "Error accessing chunk " f"{self._flat_chunk_basename(key, chunk_coords)} in " f"{self.base_path}: {exc}" ) from exc
[docs] def store_chunk(self, buf, key, chunk_coords, mime_type="application/octet-stream", overwrite=True): chunk_path = self._chunk_path(key, chunk_coords) mode = "wb" if overwrite else "xb" try: os.makedirs(str(chunk_path.parent), exist_ok=True) if self.gzip and mime_type not in NO_COMPRESS_MIME_TYPES: with gzip.open( str(chunk_path.with_name(chunk_path.name + ".gz")), mode, compresslevel=self.compresslevel) as f: f.write(buf) else: with chunk_path.open(mode) as f: f.write(buf) except OSError as exc: raise DataAccessError( "Error storing chunk " f"{self._flat_chunk_basename(key, chunk_coords)} in " f"{self.base_path}: {exc}" ) from exc
def _chunk_path(self, key, chunk_coords, pattern=None): if pattern is None: pattern = self.chunk_pattern xmin, xmax, ymin, ymax, zmin, zmax = chunk_coords chunk_filename = pattern.format( xmin, xmax, ymin, ymax, zmin, zmax, key=key) return self.base_path / chunk_filename def _flat_chunk_basename(self, key, chunk_coords): xmin, xmax, ymin, ymax, zmin, zmax = chunk_coords chunk_filename = _CHUNK_PATTERN_FLAT.format( xmin, xmax, ymin, ymax, zmin, zmax, key=key) return chunk_filename