"""Module emobject.core.

Builds EMObjects from the Enable database and saves/loads them as Zarr archives.
"""
import os
import glob

import pandas as pd
import numpy as np
import zarr
import yaml
from typing import Optional, Union
from numcodecs import Blosc, JSON
from logging import warning

from emobject.emobject import EMObject
from emobject.emimage import EMMask, EMImage
from emobject.emlayer import BaseLayer
from emobject.errors import EMObjectException
from emobject.utils import helpers
from emobject.version import __version__

from emoaccess.main import (
    run_base_object_queries,
    run_mask_queries,
    run_image_queries,
    run_segmentation_mask_queries,
    run_metadata_queries,
)
from emoaccess.queries import get_all_biomarkers_for_acquisition_id


class EMObjectConfig:
    """Configuration for an emObject that is built from the Enable database.

    The arguments are stored as attributes and validated (and lightly
    normalized) on construction.

    Args:
        acquisition_id: required, the acquisition ID to query
        study_id: study id, placeholder
        segmentation_version: segmentation version. required to fetch
            single-cell data and segmentation_masks. if None, single-cell
            data will not be fetched.
        biomarker_version: biomarker expression version. required to fetch
            single-cell data. if None, single-cell data will not be fetched.
        biomarkers: optional, a list or name of biomarkers
            to download. If None, gets all. A single name is wrapped in a list.
        annotations: optional, a list or name of annotations to download.
            If None, gets all. A single name is wrapped in a list.
        include_img: optional, if True, fetches the image with channels
            subsetted same as biomarkers.
        include_masks: optional, if True, fetches ROI masks.
        include_seg_mask: optional. If True, gets the segmentation mask
            (segmentation_version must be provided).
        include_metadata: optional. If True, metadata is queried when the
            EMObject is built.
        seg_mask_type: optional. Type of segmentation mask to fetch. Can be
            'nucleus' or 'cell'.
        img_format: img_format - placeholder
        img_res: optional, factor to downsample image by
        img_to_disk: optional, if True writes the zarr store to disk,
            otherwise object held in memory.
        img_path: optional, path to write zarr store to if img_to_disk==True.
        mask_names: optional, a list or name of masks. When include_masks is
            True, a single name is wrapped in a list.
        name: optional, a name for this emObject.
        datatype: optional, describe the datatype used here.

    Raises:
        AssertionError: if an ID/version argument has the wrong type or is
            not positive.
        EMObjectException: if the combination of arguments cannot fetch any
            data, or the segmentation mask options are inconsistent.
    """

    def __init__(
        self,
        acquisition_id: Optional[str] = None,
        study_id: Optional[int] = None,
        segmentation_version: Optional[int] = None,
        biomarker_version: Optional[int] = None,
        biomarkers: Optional[Union[list, np.ndarray, str]] = None,
        annotations: Optional[Union[list, np.ndarray, str]] = None,
        include_img: Optional[bool] = False,
        include_masks: Optional[bool] = False,
        include_seg_mask: Optional[bool] = False,
        include_metadata: Optional[bool] = True,
        seg_mask_type: Optional[str] = "nucleus",
        img_format: Optional[str] = "zarr",
        img_res: Optional[int] = 0,
        img_to_disk: Optional[bool] = False,
        img_path: Optional[str] = None,
        mask_names: Optional[Union[list, np.ndarray, str]] = None,
        name: Optional[str] = None,
        datatype: Optional[str] = None,
    ):
        self.acquisition_id = acquisition_id
        self.study_id = study_id
        self.segmentation_version = segmentation_version
        self.biomarker_version = biomarker_version
        self.biomarkers = biomarkers
        self.annotations = annotations
        self.include_img = include_img
        self.include_masks = include_masks
        self.include_seg_mask = include_seg_mask
        self.include_metadata = include_metadata
        self.seg_mask_type = seg_mask_type
        self.img_format = img_format
        self.img_res = img_res
        self.img_to_disk = img_to_disk
        self.img_path = img_path
        self.mask_names = mask_names
        self.name = name
        self.datatype = datatype
        # Filled in later by the builder functions.
        self.masks = None
        self.img = None

        self._validate_config()

    def _validate_config(self):
        """Validate the arguments of an EMObjectConfig and normalize
        single-name arguments into one-element lists."""

        # acquisition id (np.str_ is a str subclass, so this covers both)
        assert isinstance(
            self.acquisition_id, str
        ), "acquisition_id must be a string."

        # study id, segmentation version and biomarker version share the same
        # rule: exactly an int (bool is deliberately rejected) and positive.
        for field in ("study_id", "segmentation_version", "biomarker_version"):
            value = getattr(self, field)
            if value is not None:
                assert type(value) is int, f"{field} must be an int."
                assert value > 0, f"{field} must be positive."

        # check list formatting
        if isinstance(self.biomarkers, str):
            self.biomarkers = [self.biomarkers]

        if isinstance(self.annotations, str):
            self.annotations = [self.annotations]

        if self.include_masks and isinstance(self.mask_names, str):
            self.mask_names = [self.mask_names]

        if self.include_seg_mask:
            if self.segmentation_version is None:
                raise EMObjectException(
                    "include_seg_mask is True, but segmentation_version is not provided."
                )
            if self.seg_mask_type not in ["nucleus", "cell"]:
                raise EMObjectException(
                    f"Specified segmentation mask type {self.seg_mask_type} unknown."
                )

        # Avoid the issue where user specifies an image but forces an empty biomarker list.
        # np.size handles lists and arrays alike without an ambiguous
        # element-wise comparison.
        if (
            self.include_img
            and self.biomarkers is not None
            and np.size(self.biomarkers) == 0
        ):
            warning(
                "include_img is True, but no channels/biomarkers. Setting include_img to False."
            )
            self.include_img = False

        # Avoid unintended exclusion of single-cell data (warn user if only image will be pulled)
        if self.segmentation_version is None or self.biomarker_version is None:
            if self.include_img:
                warning(
                    "segmentation_version and biomarker_version must both be provided "
                    + "to fetch single-cell data. At least one of these was not specified "
                    + "so only image will be fetched."
                )
            else:
                # In this case, the images and the single-cell data cannot be pulled.
                raise EMObjectException(
                    "Invalid config: neither image nor single-cell data can be fetched. "
                    + "To fetch image, set include_img to True. To fetch single-cell data, "
                    + "provide both segmentation_version and biomarker_version. "
                    + "You may also do both."
                )


def add_layer_from_enable_db(
    E: EMObject = None, config: EMObjectConfig = None
) -> EMObject:
    """Builds a layer from the Enable database and adds it to an existing EMObject.

    The single-cell tables are queried for ``config.acquisition_id`` and
    wrapped in a new BaseLayer named after the acquisition. Any masks that
    were requested are merged into the object's existing masks (or become
    the object's masks when it has none yet).

    Note: when ``config.include_img`` is True the image is fetched and stored
    on ``config.img``, but this function does not attach it to ``E``.

    Args:
        E: EMObject to extend (modified in place).
        config: EMObjectConfig describing what to fetch.

    Returns:
        EMObject with added new BaseLayer
    """

    bm_df, anno_df, coord_df = build_base_object_from_enable_db(
        acquisition_id=config.acquisition_id,
        biomarkers=config.biomarkers,
        annotations=config.annotations,
        segmentation_version=config.segmentation_version,
        biomarker_version=config.biomarker_version,
        config_mode=True,
    )

    if config.include_img:
        config.img = build_emimage_from_enable_db(
            acquisition_id=config.acquisition_id,
            study_id=config.study_id,
            channels=config.biomarkers,
            to_disk=config.img_to_disk,
            resolution=config.img_res,
        )

    segmentation = None
    if config.include_masks or config.include_seg_mask:
        if config.include_seg_mask:
            segmentation = "segmentation_mask"

        config.masks = build_emmask_from_enable_db(
            acquisition_id=config.acquisition_id,
            study_id=config.study_id,
            include_masks=config.include_masks,
            include_seg_masks=config.include_seg_mask,
            seg_mask_type=config.seg_mask_type,
            segmentation_version=config.segmentation_version,
            biomarker_version=config.biomarker_version,
        )

    existing_mask = E.mask

    # Only a real segmentation name can collide, and only with an object
    # that already carries masks.
    if (
        segmentation is not None
        and existing_mask is not None
        and segmentation in existing_mask.mask_names
    ):
        segmentation = f"{segmentation}_1"

    # Need to add all the masks to the object's emmask
    if config.masks is not None:
        if existing_mask is None:
            # Nothing to merge with: the new masks become the object's masks.
            E.mask = config.masks
        else:
            E.mask = helpers.merge_emmasks(existing_mask, config.masks)

    new_layer = BaseLayer(
        data=bm_df,
        obs=anno_df,
        var=None,
        pos=coord_df,
        name=config.acquisition_id,
        segmentation=segmentation,
    )

    E.add(new_layer)
    return E


def build_emobject(config: EMObjectConfig) -> EMObject:
    """Assemble a complete EMObject from the Enable database.

    Depending on the config, this fetches the single-cell tables, the image,
    the ROI/segmentation masks and the metadata, in that order. The fetched
    image and masks are also stored on ``config.img`` / ``config.masks``.

    Args:
        config: EMObjectConfig describing what to fetch.

    Returns:
        EMObject named after ``config.acquisition_id``.
    """

    # Single-cell tables are only available when both versions are known.
    bm_df = anno_df = coord_df = None
    versions_known = (
        config.segmentation_version is not None
        and config.biomarker_version is not None
    )
    if versions_known:
        bm_df, anno_df, coord_df = build_base_object_from_enable_db(
            acquisition_id=config.acquisition_id,
            biomarkers=config.biomarkers,
            annotations=config.annotations,
            segmentation_version=config.segmentation_version,
            biomarker_version=config.biomarker_version,
            config_mode=True,
        )

    # Image, restricted to the same channels as the biomarkers.
    if config.include_img:
        config.img = build_emimage_from_enable_db(
            acquisition_id=config.acquisition_id,
            study_id=config.study_id,
            channels=config.biomarkers,
            to_disk=config.img_to_disk,
            resolution=config.img_res,
        )

    # ROI masks and/or the segmentation mask.
    if config.include_masks or config.include_seg_mask:
        config.masks = build_emmask_from_enable_db(
            acquisition_id=config.acquisition_id,
            study_id=config.study_id,
            include_masks=config.include_masks,
            include_seg_masks=config.include_seg_mask,
            seg_mask_type=config.seg_mask_type,
            segmentation_version=config.segmentation_version,
            biomarker_version=config.biomarker_version,
        )
    segmentation = "segmentation_mask" if config.include_seg_mask else None

    meta = (
        run_metadata_queries(config.acquisition_id)
        if config.include_metadata
        else None
    )

    return EMObject(
        data=bm_df,
        obs=anno_df,
        var=None,
        pos=coord_df,
        mask=config.masks,
        img=config.img,
        meta=meta,
        name=config.acquisition_id,
        segmentation=segmentation,
    )


def build_base_object_from_enable_db(
    acquisition_id: str = None,
    biomarkers: Optional[list] = None,
    annotations: Optional[list] = None,
    segmentation_version: int = None,
    biomarker_version: int = None,
    config_mode: Optional[bool] = False,
) -> EMObject:
    """Query the single-cell tables for an acquisition via emoaccess.

    Kept as its own function to allow for future open-sourcing.

    Args:
        acquisition_id: acquisition ID for the region.
        biomarkers: A subset of biomarkers to include. If None, returns all.
        annotations: A subset of annotations to include.
        segmentation_version: Segmentation version.
        biomarker_version: Biomarker expression version.
        config_mode: If False, returns an EMObject.
            If True, returns pd.DataFrames corresponding to the data, obs, and pos attributes.

    Returns:
        An EMObject without image, mask or metadata when ``config_mode`` is
        False; otherwise the tuple ``(data, obs, pos)`` as returned by the
        query (note the return annotation only reflects the first case).
    """

    expression, observations, coordinates = run_base_object_queries(
        acquisition_id=acquisition_id,
        biomarkers=biomarkers,
        annotations=annotations,
        segmentation_version=segmentation_version,
        biomarker_version=biomarker_version,
    )

    if not config_mode:
        return EMObject(
            data=expression,
            obs=observations,
            var=None,
            pos=coordinates,
            mask=None,
            img=None,
            meta=None,
            name=acquisition_id,
        )

    # Raw tables for callers that assemble the object themselves.
    return expression, observations, coordinates


def build_emmask_from_enable_db(
    acquisition_id: Optional[str] = None,
    study_id: Optional[int] = None,
    include_masks: Optional[bool] = None,
    include_seg_masks: Optional[bool] = None,
    seg_mask_type: Optional[str] = None,
    segmentation_version: Optional[int] = None,
    biomarker_version: Optional[int] = None,
) -> EMMask:
    """Builds a EMMask object from the Enable database
    using the acquisition ID to obtain ROI masks.

    This needs to follow these steps:
        1. Obtain all masks associated with this acquisition
        2. If segmentation masks are included, adjust segment IDs to avoid collision with cell IDs
        3. Stack the masks into an object that can be passed to EMMask

    ROI segment IDs are shifted to follow the largest cell ID of the
    segmentation mask: the ID at sorted position ``j`` among the unique ROI
    values becomes ``max_cell_id + j + 1``. The value 0 is treated as
    background and is never changed. The segmentation mask itself is left
    untouched.

    Args:
        acquisition_id: acquisition to query.
        study_id: passed through to the segmentation mask query.
        include_masks: if truthy, fetch the ROI masks.
        include_seg_masks: if truthy, fetch the segmentation mask; it is
            appended last under the name "segmentation_mask".
        seg_mask_type: passed through to the segmentation mask query.
        segmentation_version: passed through to the segmentation mask query.
        biomarker_version: passed through to the segmentation mask query.

    Returns:
        EMMask, or None when no mask data was obtained.
    """

    # Need to get the data for each story this is going to be VERY slow
    acquisition_masks = []
    mask_names = []

    if include_masks:
        mask_dict = run_mask_queries(acquisition_id=acquisition_id)

        # TO DO: remove this logic in the future, didn't want to change to test new SQL
        for roi_name, roi_mask in mask_dict.items():
            acquisition_masks.append(roi_mask)
            mask_names.append(roi_name)

    # Sorted unique segment IDs of the ROI masks (exclusive of cell segments).
    unique_seg_ids = np.unique(np.array(acquisition_masks))

    if include_seg_masks:
        seg_mask = run_segmentation_mask_queries(
            acquisition_id=acquisition_id,
            study_id=study_id,
            seg_mask_type=seg_mask_type,
            segmentation_version=segmentation_version,
            biomarker_version=biomarker_version,
        )

        acquisition_masks.append(seg_mask)
        mask_names.append("segmentation_mask")
        acquisition_masks = np.array(acquisition_masks)

        if include_masks and unique_seg_ids.size > 0:
            # To avoid collisions, make new seg IDs for ROIs, sequentially following the
            # max cell ID.
            max_cell_segment_id = np.max(seg_mask)

            # Remap only the ROI planes (everything before the segmentation
            # mask appended above) so cell IDs are never rewritten. All new
            # values are computed from the original ones in a single pass, so
            # an already-remapped ID can never be remapped a second time.
            roi_masks = acquisition_masks[:-1]
            rank = np.searchsorted(unique_seg_ids, roi_masks)
            shifted = max_cell_segment_id + rank + 1
            # 0 is the special background value and stays as it is.
            acquisition_masks[:-1] = np.where(roi_masks == 0, roi_masks, shifted)
    else:
        acquisition_masks = np.array(acquisition_masks)

    if acquisition_masks.size == 0:
        # this case should be relatively rare, but it is possible
        # we would hit this if run_mask_queries returned an empty dict and include_seg_masks is False
        return None

    return EMMask(
        masks=np.stack(acquisition_masks, axis=0), mask_idx=np.array(mask_names)
    )


def build_emimage_from_enable_db(
    acquisition_id: Optional[str] = None,
    study_id: Optional[int] = None,
    channels: Optional[Union[list, np.ndarray]] = None,
    to_disk: Optional[bool] = False,
    resolution: Optional[int] = 0,
) -> EMImage:
    """Builds an EMImage object from an acquisition_id.

    Args:
        acquisition_id (str): desired acquisition
        study_id (int): study, placeholder for future functionality
            (currently not used by this function)
        channels (Union[list, np.ndarray]]): the channels to include, if None includes all.
        to_disk (bool): forwarded to EMImage.
        resolution (int): forwarded to the image query.

    Returns:
        image (EMImage)
    """

    if channels is None:
        # No subset requested: use every biomarker known for the acquisition.
        channels = get_all_biomarkers_for_acquisition_id(acquisition_id=acquisition_id)

    channel_images = run_image_queries(
        acquisition_id=acquisition_id, channels=channels, resolution=resolution
    )

    return EMImage(img=channel_images, channels=channels, to_disk=to_disk)


def save(
    E: EMObject = None, out_dir: Optional[str] = None, name: Optional[str] = None
) -> None:
    """Write an EMObject to disk as a Zarr directory store.

    Layout of the archive ``<out_dir>/<name>.zarr``:
        img/   image tensor ("CODEX") and its channel axis ("CODEX_ax")
        mask/  mask tensor or per-mask arrays plus their index information
        core/  object metadata ("meta") and one group per layer holding
               data, obs, var, sobs, pos and the axis indices ("ix")
        state.yml  emobject version, default layer and per-layer settings

    The store is created with ``normalize_keys=True`` (zarr documents this as
    lower-casing store keys), and any existing archive at the same location
    is overwritten.

    Args:
        E (EMObject): EMObject to write to disk
        out_dir (str): Path to write file. If None, writes to current directory.
            Missing directories (including parents) are created.
        name (str): Name for the Zarr archive. If None, accesses E.name.

    Returns:
        None
    """

    # Resolve the output directory; None means the current working directory.
    if out_dir is None:
        out_dir = os.getcwd()
    os.makedirs(out_dir, exist_ok=True)

    # Create an empty zarr store
    archive_name = f"{E.name}.zarr" if name is None else f"{name}.zarr"

    zarr_path = os.path.join(out_dir, archive_name)
    store = zarr.DirectoryStore(zarr_path, normalize_keys=True)
    zarr_root = zarr.group(store=store, overwrite=True)
    compressor = Blosc(cname="zstd", clevel=3, shuffle=2)

    def _write_json(group, key, values):
        """Store ``values`` as a compressed, JSON-encoded object array."""
        group.create_dataset(
            name=key,
            data=values,
            compressor=compressor,
            dtype=object,
            object_codec=JSON(),
        )

    # Establish the top level
    core = zarr_root.create_group(name="core")

    # Build image group
    # Add a new dataset for each image.
    if E.img is not None:
        img = zarr_root.create_group(name="img")
        img.create_dataset(name="CODEX", data=E.img.img, compressor=compressor)
        _write_json(img, "CODEX_ax", E.img.channels)

    # Build mask group
    if E.mask is not None:
        mask = zarr_root.create_group(name="mask")
        if E.mask._style == "tensor":
            mask.create_dataset(
                name="masktensor", data=E.mask.mask, compressor=compressor
            )
            mask.create_dataset(name="maskix", data=E.mask.mask_names)
        else:
            mask.create_dataset(
                name="maskix", data=E.mask.mask_names, dtype=object, object_codec=JSON()
            )
            mask.create_dataset(
                name="maskdims", data=E.mask.dims, dtype=object, object_codec=JSON()
            )
            mask.create_dataset(
                name="maskpos", data=np.array(list(E.mask.pos.values()))
            )
            # One dataset per individual mask (the loop variable must not
            # reuse the ``name`` parameter).
            for mask_name in E.mask.mask.keys():
                mask.create_dataset(
                    name=mask_name, data=E.mask.mask[mask_name], compressor=compressor
                )

    if E._meta is not None:
        _write_json(core, "meta", E.meta.to_numpy())

    # Build core data group
    # add layers as hierarchy:
    for layer in E.layers:
        layer_group = core.create_group(layer)

        E.set_layer(layer)
        current = E._layerdict[layer]

        if current.data is not None:
            _write_json(layer_group, "data", current.data)

        # Each table is stored as its values plus a "<table>_y" array holding
        # the column names.
        if current._obs is not None:
            _write_json(layer_group, "obs", current._obs.to_numpy())
            _write_json(layer_group, "obs_y", current._obs.columns.to_numpy())

        # Note, in version 0.6.0, all of this now is layer-specific.
        if current._var is not None:
            _write_json(layer_group, "var", current._var.to_numpy())
            _write_json(layer_group, "var_y", current._var.columns.to_numpy())

        if current._sobs is not None:
            _write_json(layer_group, "sobs", current._sobs.to_numpy())
            _write_json(layer_group, "sobs_y", current._sobs.columns.to_numpy())

        if current._pos is not None:
            pos = layer_group.create_group("pos")
            _ = E.pos  # in case it wasn't ever made you need to call this.

            for coord_sys in list(E._layerdict[layer]._pos.keys()):
                coords = E._layerdict[layer]._pos[coord_sys]
                pos.create_dataset(
                    name=coord_sys,
                    data=coords,
                    compressor=compressor,
                )
                _write_json(pos, f"{coord_sys}_y", np.array(coords.columns))

        # add the index information
        ix = layer_group.create_group("ix")
        ix.create_dataset(name="obsax", data=current._obs_ax.to_numpy())
        ix.create_dataset(
            name="varax",
            data=current._var_ax.to_numpy(),
            dtype=object,
            object_codec=JSON(),
        )

        if E._sobs_ax is not None:
            # NOTE(review): this stores the layer's obs axis under "sobsax";
            # it looks like the segment-observation axis was intended.
            # Left unchanged pending confirmation of the right attribute.
            ix.create_dataset(name="sobsax", data=current._obs_ax.to_numpy())

    # write a file with overall object state
    # for each layer, write the assay, scalefactor, spot_size
    # for entire object, write the emobject version, and default layer
    state_dict = {"version": __version__, "defaultlayer": E._defaultlayer, "layers": {}}

    for layer in E.layers:
        layer_obj = E._layerdict[layer]
        state_dict["layers"][layer] = {
            "assay": layer_obj._assay,
            "scale_factor": layer_obj._scale_factor,
            "spot_size": layer_obj._spot_size,
        }

    with open(os.path.join(zarr_path, "state.yml"), "w+") as f:
        yaml.dump(state_dict, f)


def load(path: str = None) -> EMObject:
    """Read an EMObject from disk

    Archives without a ``state.yml`` file are treated as pre-0.6.0 archives
    and handed to the deprecated loader. Otherwise the layers under ``core``,
    the image, the mask and the metadata are loaded, and the per-layer state
    saved in ``state.yml`` is restored.

    Args:
        path (str): Path to EMObject Zarr store.

    Returns:
        EMObject

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """

    # Placeholders, so an archive without a "core" group (image/mask only)
    # still loads.
    emimg_to_add = [None]
    mask = None
    layer = None
    layer_name = None
    obj_layers = []

    # Check that the path is valid
    if not os.path.exists(path):
        raise FileNotFoundError(f"Path {path} does not exist.")

    # Check if this is before/after version 0.6.0
    if not os.path.exists(os.path.join(path, "state.yml")):
        return __load_deprecated(path)

    core_path = os.path.join(path, "core")
    if os.path.exists(core_path):
        # Everything under "core" that is not one of these names is a layer.
        core_level_elements = ["seg", "meta", "pos"]
        obj_layers = [
            os.path.basename(entry)
            for entry in glob.glob(os.path.join(core_path, "*"))
        ]
        obj_layers = [s for s in obj_layers if s not in core_level_elements]

        # load the first layer
        # it may not exist as a path if the object only has images and/or metadata
        if obj_layers:
            layer_name = obj_layers[0]
            layer = __load_layer(os.path.join(core_path, layer_name))

    # Load img
    img_group_path = os.path.join(path, "img")
    if os.path.exists(img_group_path):
        emimg_to_add = __load_img(img_group_path)

    # Load mask
    mask_group_path = os.path.join(path, "mask")
    if os.path.exists(mask_group_path):
        mask = __load_mask(mask_group_path)

    # Now it's time to build the EMObject

    # Load the meta attributes
    meta = None
    meta_path = os.path.join(core_path, "meta")
    if os.path.exists(meta_path):
        meta = zarr.convenience.load(meta_path)
        if meta.shape[1] == 2:
            col_names = ["FEATURE_NAME", "FEATURE_VALUE"]
        else:
            col_names = None
        meta = pd.DataFrame(meta, columns=col_names)

    # normpath drops a trailing separator, which would otherwise produce an
    # empty object name.
    object_name = os.path.basename(os.path.normpath(path)).split(".")[0]

    E = EMObject(
        data=None,
        obs=None,
        var=None,
        sobs=None,
        meta=meta,
        pos=None,
        mask=mask,
        img=emimg_to_add[0],
        first_layer_name=layer_name,
        name=object_name,
    )

    if layer_name:
        # Add the first layer
        E.add(layer)

        # Load the rest of the layers
        for other_name in obj_layers[1:]:
            E.add(__load_layer(os.path.join(core_path, other_name)))

    # Load the state
    with open(os.path.join(path, "state.yml"), "r") as stream:
        state_dict = yaml.load(stream, Loader=yaml.SafeLoader)

    default_layer = state_dict["defaultlayer"]
    if default_layer is not None:
        # Layer names on disk are matched case-insensitively (see below), so
        # the default layer is stored lower-cased.
        E._defaultlayer = default_layer.lower()

    for layer_key in E.layers:
        matching_key = __find_matching_key(state_dict["layers"], layer_key)
        if matching_key:
            saved = state_dict["layers"][matching_key]
            E._layerdict[layer_key]._assay = saved["assay"]
            E._layerdict[layer_key]._scale_factor = saved["scale_factor"]
            E._layerdict[layer_key]._spot_size = saved["spot_size"]

    E.set_layer()
    return E


def __find_matching_key(dictionary, key):
    """Return the first key of ``dictionary`` equal to ``key`` ignoring case,
    or None when there is no such key."""
    return next(
        (candidate for candidate in dictionary if candidate.lower() == key.lower()),
        None,
    )


def __load_pos(pos_path: str = None, obs_ix=None) -> dict:
    """Load the position dictionary from disk

    Every entry in ``pos_path`` whose name does not end in "_y" is taken as a
    coordinate system; its column names are read from the sibling
    "<coordinate system>_y" array.

    Args:
        pos_path (str): Path to the "pos" group of a layer in the Zarr store.
        obs_ix: index to use for every coordinate DataFrame.

    Returns:
        dict mapping coordinate system name to a pd.DataFrame.

    Raises:
        EMObjectException: if ``pos_path`` does not exist.
    """
    if not os.path.exists(pos_path):
        raise EMObjectException("No position data found.")

    # basename keeps this working regardless of the platform's path separator.
    entries = {
        os.path.basename(entry) for entry in glob.glob(os.path.join(pos_path, "*"))
    }
    coord_systems = sorted(entry for entry in entries if not entry.endswith("_y"))

    posdict = dict()
    for coord_sys in coord_systems:
        data = zarr.convenience.load(os.path.join(pos_path, coord_sys))
        cols = zarr.convenience.load(os.path.join(pos_path, f"{coord_sys}_y"))
        posdict[coord_sys] = pd.DataFrame(data=data, columns=cols, index=obs_ix)

    return posdict


def __load_layer(layer_path: str = None) -> BaseLayer:
    """
    Helper function to load a layer from disk

    Each of data, var, obs and sobs (with their "_y" column arrays), the axis
    indices under "ix" and the "pos" group is optional; whatever is missing
    is passed on as None.

    Args:
        layer_path (str): Path to layer group in the Zarr store.

    Returns:
        BaseLayer named after the last component of ``layer_path``.
    """

    def _load_if_present(*parts):
        """Load the array at ``layer_path/parts`` or return None if absent."""
        target = os.path.join(layer_path, *parts)
        if os.path.exists(target):
            return zarr.convenience.load(target)
        return None

    # Tables and their column names
    data = _load_if_present("data")
    var = _load_if_present("var")
    var_y = _load_if_present("var_y")
    obs = _load_if_present("obs")
    obs_y = _load_if_present("obs_y")
    sobs = _load_if_present("sobs")
    sobs_y = _load_if_present("sobs_y")

    # Axis indices
    obs_ax = _load_if_present("ix", "obsax")
    var_ax = _load_if_present("ix", "varax")
    sobs_ax = _load_if_present("ix", "sobsax")

    # Load the pos
    pos_path = os.path.join(layer_path, "pos")
    pos = __load_pos(pos_path, obs_ax) if os.path.exists(pos_path) else None

    # parse the layer name; normpath/basename cope with trailing separators
    # and with platforms that do not use "/".
    layer_name = os.path.basename(os.path.normpath(layer_path))

    # Construct the data frames
    data = pd.DataFrame(data=data, columns=var_ax, index=obs_ax)

    if obs is not None:
        obs = pd.DataFrame(data=obs, columns=obs_y, index=obs_ax)
    if var is not None:
        var = pd.DataFrame(data=var, columns=var_y, index=var_ax)
    if sobs is not None:
        sobs = pd.DataFrame(data=sobs, columns=sobs_y, index=sobs_ax)

    return BaseLayer(data=data, var=var, obs=obs, sobs=sobs, pos=pos, name=layer_name)


def __load_img(path: str = None) -> list:
    """
    Helper function to load an image from disk

    The image type is the part of each entry name before the first "_"
    (so "codex" and "codex_ax" both belong to the type "codex"). Only the
    "codex" type is loaded; any other type is skipped with a warning.

    Args:
        path (str): Path to image group in the Zarr store.

    Returns:
        A one-element list holding the EMImage, or ``[None]`` when the group
        does not exist or holds no supported image.

    Raises:
        EMObjectException: if the group holds more than one image type.
    """
    if not os.path.exists(path):
        return [None]

    # we have images to load
    image_types = {
        os.path.basename(entry).split("_")[0]
        for entry in glob.glob(os.path.join(path, "*"))
    }

    # Remove this check when multiple image types are supported.
    if len(image_types) > 1:
        raise EMObjectException(
            "Attempted to add multiple images to single EMObject. This is not yet implemented."
        )

    # this is written like this to future-proof for holding multiple image types.
    emimg_to_add = list()
    for img in sorted(image_types):
        if img == "codex":
            img_tensor = zarr.convenience.load(os.path.join(path, img))
            img_ix = zarr.convenience.load(os.path.join(path, f"{img}_ax"))
            emimg_to_add.append(
                EMImage(img=img_tensor, channels=img_ix, img_name=img)
            )
        else:
            warning(f"Skipping unsupported image type '{img}' found in {path}.")

    # Callers index element 0, so always hand back at least one entry.
    return emimg_to_add or [None]


def __load_mask(path: str = None) -> EMMask:
    """
    Helper function to load a mask from disk

    Two layouts are handled: a single "masktensor" array with its "maskix"
    labels, or one array per mask accompanied by the bookkeeping arrays
    "maskix", "maskdims" and "maskpos".

    Args:
        path (str): Path to mask group in the Zarr store.

    Returns:
        EMMask, or None if ``path`` does not exist.
    """
    if not os.path.exists(path):
        return None

    tensor_path = os.path.join(path, "masktensor")
    if os.path.exists(tensor_path):
        mask = zarr.convenience.load(tensor_path)
        labels = zarr.convenience.load(os.path.join(path, "maskix"))
        return EMMask(mask, mask_idx=labels)

    # Per-mask layout: open the group to enumerate its arrays.
    group = zarr.open_group(path, mode="r")

    mask_idx = zarr.convenience.load(os.path.join(path, "maskix"))
    dims = zarr.convenience.load(os.path.join(path, "maskdims"))

    # The position array is looked up as "maskpos" first, falling back to
    # the "mask_pos" spelling.
    pos_path = os.path.join(path, "maskpos")
    if not os.path.exists(pos_path):
        pos_path = os.path.join(path, "mask_pos")
    mask_pos = zarr.convenience.load(pos_path)
    pos = {key: mask_pos[i] for i, key in enumerate(mask_idx)}

    # Every array that is not bookkeeping is an individual mask.
    # NOTE(review): keys are the names as stored on disk; confirm whether
    # EMMask needs them to match the case used in mask_idx.
    bookkeeping = {"maskix", "maskdims", "maskpos", "mask_pos"}
    mask = {
        array_name: zarr.convenience.load(os.path.join(path, array_name))
        for array_name in group.array_keys()
        if array_name not in bookkeeping
    }

    return EMMask(mask, mask_idx=mask_idx, dims=dims, pos=pos, to_disk=True)


def __load_deprecated(path: str = None) -> EMObject:
    """Read an EMObject from disk using the legacy store layout.

    ``load`` dispatches here when the store has no ``state.yml`` file. In this
    layout the axes live in a top-level ``ix`` group, positions in
    ``core/pos``, and every other entry of ``core`` (except ``seg``, ``meta``
    and ``pos``) is a layer. The ``raw`` layer seeds the EMObject; remaining
    layers are attached as BaseLayers.

    Args:
        path (str): Path to EMObject Zarr store.

    Returns:
        EMObject

    Raises:
        EMObjectException: if the store has no ``core`` group or no ``raw``
            layer, if mask names and mask tensor disagree in length, or if
            the image group does not hold exactly one supported image.
    """

    def _load_if_present(*parts):
        """Load the array at os.path.join(*parts); None when the path is absent."""
        target = os.path.join(*parts)
        return zarr.convenience.load(target) if os.path.exists(target) else None

    def _as_frame(values, columns, index):
        """Wrap values in a DataFrame, passing None through untouched."""
        if values is None:
            return None
        return pd.DataFrame(data=values, columns=columns, index=index)

    core_path = os.path.join(path, "core")
    if not os.path.exists(core_path):
        raise EMObjectException("Not a valid EMObject store.")

    # Axis labels. Everything defaults to None so a store lacking optional
    # pieces no longer fails with an unbound-variable error further down.
    obs_ix = var_ax = sobs_ax = None
    if os.path.exists(os.path.join(path, "ix")):
        obs_ix = zarr.convenience.load(os.path.join(path, "ix", "obsax"))
        var_ax = zarr.convenience.load(os.path.join(path, "ix", "varax"))
        sobs_ax = _load_if_present(path, "ix", "sobsax")

    def _load_layer(layer_name):
        """Load one layer's data plus optional obs/var/sobs as DataFrames."""
        layer_path = os.path.join(core_path, layer_name)
        values = zarr.convenience.load(os.path.join(layer_path, "data"))
        frames = {}
        for key, axis in (("obs", obs_ix), ("var", var_ax), ("sobs", sobs_ax)):
            frames[key] = _as_frame(
                _load_if_present(layer_path, key),
                _load_if_present(layer_path, f"{key}_y"),
                axis,
            )
        frames["data"] = pd.DataFrame(data=values, columns=var_ax, index=obs_ix)
        return frames

    core_level_elements = ["seg", "meta", "pos"]
    obj_layers = [
        os.path.basename(entry) for entry in glob.glob(os.path.join(core_path, "*"))
    ]
    obj_layers = [s for s in obj_layers if s not in core_level_elements]

    meta = _load_if_present(core_path, "meta")

    # Position tables: each coordinate system is stored as "<name>" (values)
    # and "<name>_y" (column labels).
    _posdict = None
    pos_path = os.path.join(core_path, "pos")
    if os.path.exists(pos_path):
        coord_sys_to_add = np.unique(
            np.array(
                [
                    os.path.basename(c).split("_")[0]
                    for c in glob.glob(os.path.join(pos_path, "*"))
                ]
            )
        )
        _posdict = dict()
        for coord_sys in coord_sys_to_add:
            coords = zarr.convenience.load(os.path.join(pos_path, coord_sys))
            cols = zarr.convenience.load(os.path.join(pos_path, f"{coord_sys}_y"))
            _posdict[coord_sys] = pd.DataFrame(
                data=coords, columns=cols, index=obs_ix
            )

    if "raw" not in obj_layers:
        # Without the raw layer there is no data matrix to seed the object.
        raise EMObjectException(
            "Not a valid EMObject store: no 'raw' layer found under 'core'."
        )
    raw = _load_layer("raw")

    # Now figure out masks
    mask = None
    if os.path.exists(os.path.join(path, "mask")):
        mask_tensor = zarr.convenience.load(os.path.join(path, "mask", "masktensor"))
        labels = zarr.convenience.load(os.path.join(path, "mask", "maskix"))
        # Explicit check instead of assert so it survives `python -O`.
        if mask_tensor.shape[0] != labels.shape[0]:
            raise EMObjectException(
                "Mask tensor and mask names have mismatched lengths."
            )
        mask = EMMask(mask_tensor, mask_idx=labels)

    emimg_to_add = [None]
    if os.path.exists(os.path.join(path, "img")):
        # Image arrays are stored as "<name>" and "<name>_ax"; collapse to names.
        imgs_to_load = np.unique(
            np.array(
                [
                    os.path.basename(i).split("_")[0]
                    for i in glob.glob(os.path.join(path, "img", "*"))
                ]
            )
        )

        # this is written like this to future-proof for holding multiple image types.
        emimg_to_add = list()
        for img in imgs_to_load:
            if img != "codex":
                # Previously this reused a stale (or unbound) image object.
                raise EMObjectException(f"Unsupported image type '{img}' in store.")
            img_tensor = zarr.convenience.load(os.path.join(path, "img", img))
            img_ix = zarr.convenience.load(os.path.join(path, "img", f"{img}_ax"))
            emimg_to_add.append(
                EMImage(img=img_tensor, channels=img_ix, img_name=img)
            )

        # Remove this check when multiple image types are supported.
        if not len(emimg_to_add) == 1:
            raise EMObjectException(
                "Attempted to add multiple images to single EMObject. This is not yet implemented."
            )

    # Build the initial EMObject from the raw layer
    name = os.path.basename(path).split(".")[0]
    E = EMObject(
        data=raw["data"],
        obs=raw["obs"],
        var=raw["var"],
        pos=_posdict,
        sobs=raw["sobs"],
        mask=mask,
        img=emimg_to_add[0],  # this needs to change when multiple images are supported
        meta=meta,
        name=name,
        is_view=False,
    )

    # Now need to add the rest of the layers.
    for layer in obj_layers:
        if layer == "raw":
            continue
        extra = _load_layer(layer)
        E.add(
            BaseLayer(
                data=extra["data"],
                var=extra["var"],
                obs=extra["obs"],
                sobs=extra["sobs"],
                name=layer,
            )
        )
    return E

Functions

def add_layer_from_enable_db(E: EMObject = None, config: EMObjectConfig = None) ‑> EMObject

Builds a layer from the Enable database and adds it to an existing EMObject.

Args

E
EMObject
config
EMObjectConfig

Returns

EMObject with added new BaseLayer

Expand source code
def add_layer_from_enable_db(
    E: EMObject = None, config: EMObjectConfig = None
) -> EMObject:
    """Builds a layer from the Enable database and adds it to an existing EMObject.

    The biomarker, annotation and coordinate tables are always fetched. The
    image is fetched (and stored on ``config.img``) when ``config.include_img``
    is set, and masks are fetched (and stored on ``config.masks``) when
    ``config.include_masks`` or ``config.include_seg_mask`` is set. Fetched
    masks are merged into ``E.mask``; the new layer is named after the
    acquisition ID.

    Args:
        E: EMObject
        config: EMObjectConfig

    Returns:
        EMObject with added new BaseLayer
    """

    bm_df, anno_df, coord_df = build_base_object_from_enable_db(
        acquisition_id=config.acquisition_id,
        biomarkers=config.biomarkers,
        annotations=config.annotations,
        segmentation_version=config.segmentation_version,
        biomarker_version=config.biomarker_version,
        config_mode=True,
    )

    if config.include_img:
        config.img = build_emimage_from_enable_db(
            acquisition_id=config.acquisition_id,
            study_id=config.study_id,
            channels=config.biomarkers,
            to_disk=config.img_to_disk,
            resolution=config.img_res,
        )

    segmentation = None
    if config.include_masks or config.include_seg_mask:
        if config.include_seg_mask:
            segmentation = "segmentation_mask"

        config.masks = build_emmask_from_enable_db(
            acquisition_id=config.acquisition_id,
            study_id=config.study_id,
            include_masks=config.include_masks,
            include_seg_masks=config.include_seg_mask,
            seg_mask_type=config.seg_mask_type,
            segmentation_version=config.segmentation_version,
            biomarker_version=config.biomarker_version,
        )

    existing_mask = E.mask

    # If the object already holds a mask with this name, the layer points at a
    # suffixed name instead. Guarded so an object without masks (or a layer
    # without a segmentation mask) does not fail on the membership test.
    if (
        segmentation is not None
        and existing_mask is not None
        and segmentation in existing_mask.mask_names
    ):
        segmentation = f"{segmentation}_1"

    # Need to add all the masks to the object's emmask
    if config.masks is not None:
        if existing_mask is None:
            # Nothing to merge with: the fetched masks become the object's mask.
            E.mask = config.masks
        else:
            E.mask = helpers.merge_emmasks(existing_mask, config.masks)

    new_layer = BaseLayer(
        data=bm_df,
        obs=anno_df,
        var=None,
        pos=coord_df,
        name=config.acquisition_id,
        segmentation=segmentation,
    )

    E.add(new_layer)
    return E
def build_base_object_from_enable_db(acquisition_id: str = None, biomarkers: Optional[list] = None, annotations: Optional[list] = None, segmentation_version: int = None, biomarker_version: int = None, config_mode: Optional[bool] = False) ‑> EMObject

Builds an EMObject from the Enable Database via emoaccess.

This is a separate function to allow for future open-sourcing

Args

acquisition_id
acquisition ID for the region.
biomarkers
A subset of biomarkers to include. If None, returns all.
annotations
A subset of annotations to include.
segmentation_version
Segmentation version.
biomarker_version
Biomarker expression version.
config_mode
If False, returns an EMObject. If True, returns pd.DataFrames corresponding to the data, obs, and pos attributes.
Expand source code
def build_base_object_from_enable_db(
    acquisition_id: str = None,
    biomarkers: Optional[list] = None,
    annotations: Optional[list] = None,
    segmentation_version: int = None,
    biomarker_version: int = None,
    config_mode: Optional[bool] = False,
) -> EMObject:
    """Query the Enable Database (via emoaccess) for one acquisition.

    Kept as a standalone function to allow for future open-sourcing.

    Args:
        acquisition_id: acquisition ID for the region.
        biomarkers: A subset of biomarkers to include. If None, returns all.
        annotations: A subset of annotations to include.
        segmentation_version: Segmentation version.
        biomarker_version: Biomarker expression version.
        config_mode: If False, returns an EMObject.
            If True, returns the three pd.DataFrames that would populate the
            data, obs, and pos attributes, as a tuple in that order.

    Returns:
        An EMObject named after ``acquisition_id``, or the
        ``(data, obs, pos)`` tuple when ``config_mode`` is truthy.
    """

    query_args = {
        "acquisition_id": acquisition_id,
        "biomarkers": biomarkers,
        "annotations": annotations,
        "segmentation_version": segmentation_version,
        "biomarker_version": biomarker_version,
    }
    expression, annotation_table, coordinates = run_base_object_queries(**query_args)

    # Callers assembling their own object only want the raw tables.
    if config_mode:
        return expression, annotation_table, coordinates

    return EMObject(
        data=expression,
        obs=annotation_table,
        var=None,
        pos=coordinates,
        mask=None,
        img=None,
        meta=None,
        name=acquisition_id,
    )
def build_emimage_from_enable_db(acquisition_id: Optional[str] = None, study_id: Optional[int] = None, channels: Union[list, numpy.ndarray, None] = None, to_disk: Optional[bool] = False, resolution: Optional[int] = 0) ‑> EMObject

Builds an EMImage object from an acquisition_id.

Args

acquisition_id : str
desired acquisition
study_id : int
study, placeholder for future functionality
channels : Union[list, np.ndarray]
the channels to include, if None includes all.

Returns

image (EMImage)

Expand source code
def build_emimage_from_enable_db(
    acquisition_id: Optional[str] = None,
    study_id: Optional[int] = None,
    channels: Optional[Union[list, np.ndarray]] = None,
    to_disk: Optional[bool] = False,
    resolution: Optional[int] = 0,
) -> EMObject:
    """Fetch the image for an acquisition and wrap it in an EMImage.

    Args:
        acquisition_id (str): desired acquisition
        study_id (int): study, placeholder for future functionality
            (not used by this function)
        channels (Union[list, np.ndarray]): the channels to include, if None
            every biomarker of the acquisition is requested.
        to_disk (bool): forwarded unchanged to the EMImage constructor.
        resolution (int): forwarded unchanged to the image query.

    Returns:
        image (EMImage)
    """

    selected_channels = channels
    if selected_channels is None:
        # No explicit subset: ask the database which biomarkers exist.
        selected_channels = get_all_biomarkers_for_acquisition_id(
            acquisition_id=acquisition_id
        )

    pixels = run_image_queries(
        acquisition_id=acquisition_id,
        channels=selected_channels,
        resolution=resolution,
    )

    return EMImage(img=pixels, channels=selected_channels, to_disk=to_disk)
def build_emmask_from_enable_db(acquisition_id: Optional[str] = None, study_id: Optional[int] = None, include_masks: Optional[bool] = None, include_seg_masks: Optional[bool] = None, seg_mask_type: Optional[str] = None, segmentation_version: Optional[int] = None, biomarker_version: Optional[int] = None) ‑> EMMask

Builds a EMMask object from the Enable database using the acquisition ID to obtain ROI masks.

This needs to follow these steps: 1. Obtain all masks associated with this acquisition 2. If segmentation masks are included, adjust segment IDs to avoid collision with cell IDs 3. Stack the masks into an object that can be passed to EMMask

Returns

EMMask

Expand source code
def build_emmask_from_enable_db(
    acquisition_id: Optional[str] = None,
    study_id: Optional[int] = None,
    include_masks: Optional[bool] = None,
    include_seg_masks: Optional[bool] = None,
    seg_mask_type: Optional[str] = None,
    segmentation_version: Optional[int] = None,
    biomarker_version: Optional[int] = None,
) -> EMMask:
    """Builds a EMMask object from the Enable database
    using the acquisition ID to obtain ROI masks.

    Steps:
        1. Obtain all ROI masks associated with this acquisition
           (when ``include_masks`` is set).
        2. If the segmentation mask is included, give the ROI segment IDs new
           values above the largest cell ID so they cannot collide with cell
           IDs. Only the ROI masks are rewritten; the segmentation mask keeps
           its cell IDs, and 0 is preserved as background.
        3. Stack the masks into an array that can be passed to EMMask. The
           segmentation mask, when present, is the last entry and is named
           ``"segmentation_mask"``.

    Args:
        acquisition_id: acquisition to fetch masks for.
        study_id: forwarded to the segmentation mask query.
        include_masks: if truthy, fetch ROI masks.
        include_seg_masks: if truthy, fetch the segmentation mask.
        seg_mask_type: forwarded to the segmentation mask query.
        segmentation_version: forwarded to the segmentation mask query.
        biomarker_version: forwarded to the segmentation mask query.

    Returns:
        EMMask, or None when no mask data was obtained.
    """

    acquisition_masks = []
    mask_names = []

    if include_masks:
        mask_dict = run_mask_queries(acquisition_id=acquisition_id)
        for mask_name, mask_array in mask_dict.items():
            acquisition_masks.append(mask_array)
            mask_names.append(mask_name)

    if include_seg_masks:
        seg_mask = run_segmentation_mask_queries(
            acquisition_id=acquisition_id,
            study_id=study_id,
            seg_mask_type=seg_mask_type,
            segmentation_version=segmentation_version,
            biomarker_version=biomarker_version,
        )

        if acquisition_masks:
            roi_stack = np.array(acquisition_masks)
            # np.unique returns the IDs sorted, so searchsorted yields each
            # pixel's rank among the ROI IDs.
            roi_ids = np.unique(roi_stack)
            max_cell_segment_id = int(np.max(seg_mask))
            rank = np.searchsorted(roi_ids, roi_stack)
            # Every ID is mapped in one pass from the original values, so a
            # freshly assigned ID can never be mistaken for an old one and
            # remapped a second time. 0 is the background value and is kept.
            roi_stack = np.where(roi_stack == 0, 0, max_cell_segment_id + 1 + rank)
            acquisition_masks = list(roi_stack)

        acquisition_masks.append(seg_mask)
        mask_names.append("segmentation_mask")

    acquisition_masks = np.array(acquisition_masks)

    if acquisition_masks.size == 0:
        # this case should be relatively rare, but it is possible
        # we would hit this if run_mask_queries returned an empty dict and include_seg_masks is False
        return None

    return EMMask(
        masks=np.stack(acquisition_masks, axis=0), mask_idx=np.array(mask_names)
    )
def build_emobject(config: EMObjectConfig) ‑> EMObject

Builds a complete EMObject from the Enable database.

Args

config
EMObjectConfig

Returns

EMObject

Expand source code
def build_emobject(config: EMObjectConfig) -> EMObject:
    """Assemble a complete EMObject from the Enable database.

    Single-cell tables are fetched only when both ``segmentation_version`` and
    ``biomarker_version`` are set on the config. The image, masks and metadata
    are fetched according to the ``include_*`` flags; a fetched image or mask
    is also stored back on ``config.img`` / ``config.masks``.

    Args:
        config: EMObjectConfig

    Returns:
        EMObject
    """

    versions_missing = (
        config.segmentation_version is None or config.biomarker_version is None
    )
    if versions_missing:
        bm_df = anno_df = coord_df = None
    else:
        bm_df, anno_df, coord_df = build_base_object_from_enable_db(
            acquisition_id=config.acquisition_id,
            biomarkers=config.biomarkers,
            annotations=config.annotations,
            segmentation_version=config.segmentation_version,
            biomarker_version=config.biomarker_version,
            config_mode=True,
        )

    if config.include_img:
        config.img = build_emimage_from_enable_db(
            acquisition_id=config.acquisition_id,
            study_id=config.study_id,
            channels=config.biomarkers,
            to_disk=config.img_to_disk,
            resolution=config.img_res,
        )

    # Name of the segmentation mask the object should reference, if any.
    segmentation = None
    if config.include_masks or config.include_seg_mask:
        if config.include_seg_mask:
            segmentation = "segmentation_mask"

        config.masks = build_emmask_from_enable_db(
            acquisition_id=config.acquisition_id,
            study_id=config.study_id,
            include_masks=config.include_masks,
            include_seg_masks=config.include_seg_mask,
            seg_mask_type=config.seg_mask_type,
            segmentation_version=config.segmentation_version,
            biomarker_version=config.biomarker_version,
        )

    meta = (
        run_metadata_queries(config.acquisition_id)
        if config.include_metadata
        else None
    )

    return EMObject(
        data=bm_df,
        obs=anno_df,
        var=None,
        pos=coord_df,
        mask=config.masks,
        img=config.img,
        meta=meta,
        name=config.acquisition_id,
        segmentation=segmentation,
    )
def load(path: str = None) ‑> EMObject

Read an EMObject from disk

Args

path : str
Path to EMObject Zarr store.

Returns

EMObject

Expand source code
def load(path: str = None) -> EMObject:
    """Read an EMObject from disk

    Stores without a ``state.yml`` file are treated as predating version
    0.6.0 and are handed to the legacy loader. Otherwise the layers under
    ``core``, the optional ``img`` and ``mask`` groups and ``core/meta`` are
    loaded, and per-layer state (assay, scale factor, spot size) plus the
    default layer are restored from ``state.yml``.

    Args:
        path (str): Path to EMObject Zarr store.

    Returns:
        EMObject

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """

    # Placeholders. layer/layer_name/obj_layers are initialised here so that a
    # store without a "core" group does not hit an unbound name below.
    emimg_to_add = [None]
    mask = None
    layer = None
    layer_name = None
    obj_layers = []

    # Check that the path is valid
    if not os.path.exists(path):
        raise FileNotFoundError(f"Path {path} does not exist.")

    # Check if this is before/after version 0.6.0
    if not os.path.exists(os.path.join(path, "state.yml")):
        return __load_deprecated(path)

    core_path = os.path.join(path, "core")
    if os.path.exists(core_path):
        core_level_elements = ["seg", "meta", "pos"]
        # basename instead of split("/") so this also works with "\\" separators.
        obj_layers = [
            os.path.basename(entry)
            for entry in glob.glob(os.path.join(core_path, "*"))
        ]
        obj_layers = [s for s in obj_layers if s not in core_level_elements]

        # load the first layer
        # it may not exist as a path if the object only has images and/or metadata
        if obj_layers:
            layer_name = obj_layers[0]
            layer = __load_layer(os.path.join(core_path, layer_name))

    # Load img
    img_group_path = os.path.join(path, "img")
    if os.path.exists(img_group_path):
        emimg_to_add = __load_img(img_group_path)

    # Load mask
    mask_group_path = os.path.join(path, "mask")
    if os.path.exists(mask_group_path):
        mask = __load_mask(mask_group_path)

    # Load the meta attributes
    meta = None
    meta_path = os.path.join(core_path, "meta")
    if os.path.exists(meta_path):
        meta = zarr.convenience.load(meta_path)
        # Column names are only applied to two-column metadata tables.
        if meta.shape[1] == 2:
            col_names = ["FEATURE_NAME", "FEATURE_VALUE"]
        else:
            col_names = None
        meta = pd.DataFrame(meta, columns=col_names)

    object_name = os.path.basename(path).split(".")[0]

    # Build an empty shell first; layers are attached afterwards.
    E = EMObject(
        data=None,
        obs=None,
        var=None,
        sobs=None,
        meta=meta,
        pos=None,
        mask=mask,
        img=emimg_to_add[0],
        first_layer_name=layer_name,
        name=object_name,
    )

    if layer_name:
        # Add the first layer
        E.add(layer)

        # Load the rest of the layers
        for other_layer_name in obj_layers[1:]:
            E.add(__load_layer(os.path.join(core_path, other_layer_name)))

    # Load the state
    with open(os.path.join(path, "state.yml"), "r") as stream:
        state_dict = yaml.load(stream, Loader=yaml.SafeLoader)

    E._defaultlayer = state_dict["defaultlayer"].lower()
    for layer_key in E.layers:
        matching_key = __find_matching_key(state_dict["layers"], layer_key)
        if matching_key:
            layer_state = state_dict["layers"][matching_key]
            E._layerdict[layer_key]._assay = layer_state["assay"]
            E._layerdict[layer_key]._scale_factor = layer_state["scale_factor"]
            E._layerdict[layer_key]._spot_size = layer_state["spot_size"]

    E.set_layer()
    return E
def save(E: EMObject = None, out_dir: Optional[str] = None, name: Optional[str] = None) ‑> None

Write an EMObject to disk.

Args

E : EMObject
EMObject to write to disk
out_dir : str
Path to write file. If None, writes to current directory.
name : str
Name for the Zarr archive. If None, accesses E.name.

Returns

None

Expand source code
def save(
    E: EMObject = None, out_dir: Optional[str] = None, name: Optional[str] = None
) -> None:
    """Write an EMObject to disk.

    The archive is a Zarr directory store ``<out_dir>/<name>.zarr`` holding a
    ``core`` group (metadata plus one sub-group per layer), optional ``img``
    and ``mask`` groups, and a ``state.yml`` file recording the package
    version, the default layer and per-layer assay / scale factor / spot size.
    An existing archive at that location is overwritten.

    Each layer is made active with ``E.set_layer(layer)`` while it is written.

    Args:
        E (EMObject): EMObject to write to disk
        out_dir (str): Path to write file. If None, writes to current directory.
        name (str): Name for the Zarr archive. If None, accesses E.name.

    Returns:
        None
    """

    def _object_dataset(group, dataset_name, values, **extra):
        """Create an object-dtype dataset in ``group`` encoded with the JSON codec."""
        return group.create_dataset(
            name=dataset_name,
            data=values,
            dtype=object,
            object_codec=JSON(),
            **extra,
        )

    # Honour the documented default: None means the current directory.
    if out_dir is None:
        out_dir = os.getcwd()
    if not os.path.exists(out_dir):
        # makedirs so that a nested, not-yet-existing path also works.
        os.makedirs(out_dir)

    # Create an empty zarr store
    archive_name = f"{E.name}.zarr" if name is None else f"{name}.zarr"
    zarr_path = os.path.join(out_dir, archive_name)
    store = zarr.DirectoryStore(zarr_path, normalize_keys=True)
    zarr_root = zarr.group(store=store, overwrite=True)
    compressor = Blosc(cname="zstd", clevel=3, shuffle=2)

    # Establish the top level
    core = zarr_root.create_group(name="core")

    # Build image group
    if E.img is not None:
        img = zarr_root.create_group(name="img")
        img.create_dataset(name="CODEX", data=E.img.img, compressor=compressor)
        _object_dataset(img, "CODEX_ax", E.img.channels, compressor=compressor)

    # Build mask group
    if E.mask is not None:
        mask = zarr_root.create_group(name="mask")
        if E.mask._style == "tensor":
            mask.create_dataset(
                name="masktensor", data=E.mask.mask, compressor=compressor
            )
            mask.create_dataset(name="maskix", data=E.mask.mask_names)
        else:
            # One dataset per mask, next to the arrays that describe them.
            _object_dataset(mask, "maskix", E.mask.mask_names)
            _object_dataset(mask, "maskdims", E.mask.dims)
            mask.create_dataset(
                name="maskpos", data=np.array(list(E.mask.pos.values()))
            )
            for mask_name in E.mask.mask.keys():
                mask.create_dataset(
                    name=mask_name,
                    data=E.mask.mask[mask_name],
                    compressor=compressor,
                )

    if E._meta is not None:
        _object_dataset(core, "meta", E.meta.to_numpy(), compressor=compressor)

    # Build core data group
    # add layers as hierarchy:
    for layer in E.layers:
        layer_group = core.create_group(layer)

        E.set_layer(layer)
        layer_obj = E._layerdict[layer]

        if layer_obj.data is not None:
            _object_dataset(layer_group, "data", layer_obj.data, compressor=compressor)

        # Note, in version 0.6.0, all of this now is layer-specific.
        # Each annotation table is stored as "<key>" (values) and "<key>_y"
        # (column labels).
        for key, table in (
            ("obs", layer_obj._obs),
            ("var", layer_obj._var),
            ("sobs", layer_obj._sobs),
        ):
            if table is None:
                continue
            _object_dataset(
                layer_group, key, table.to_numpy(), compressor=compressor
            )
            _object_dataset(
                layer_group,
                f"{key}_y",
                table.columns.to_numpy(),
                compressor=compressor,
            )

        if layer_obj._pos is not None:
            pos = layer_group.create_group("pos")
            _ = E.pos  # in case it wasn't ever made you need to call this.

            for coord_sys in list(layer_obj._pos.keys()):
                pos.create_dataset(
                    name=coord_sys,
                    data=layer_obj._pos[coord_sys],
                    compressor=compressor,
                )
                _object_dataset(
                    pos,
                    f"{coord_sys}_y",
                    np.array(layer_obj._pos[coord_sys].columns),
                    compressor=compressor,
                )

        # add the index information
        ix = layer_group.create_group("ix")
        ix.create_dataset(name="obsax", data=layer_obj._obs_ax.to_numpy())
        _object_dataset(ix, "varax", layer_obj._var_ax.to_numpy())

        if E._sobs_ax is not None:
            # Write the sobs axis itself; this used to store the obs axis
            # under the "sobsax" name.
            # NOTE(review): assumes E._sobs_ax reflects the layer activated by
            # set_layer(layer) above — confirm against EMObject.
            sobs_axis = np.asarray(E._sobs_ax)
            if sobs_axis.dtype == object:
                _object_dataset(ix, "sobsax", sobs_axis)
            else:
                ix.create_dataset(name="sobsax", data=sobs_axis)

    # write a file with overall object state
    # for each layer, write the assay, scalefactor, spot_size
    # for entire object, write the emobject version, and default layer
    state_dict = {"version": __version__, "defaultlayer": E._defaultlayer, "layers": {}}

    for layer in E.layers:
        state_dict["layers"][layer] = {
            "assay": E._layerdict[layer]._assay,
            "scale_factor": E._layerdict[layer]._scale_factor,
            "spot_size": E._layerdict[layer]._spot_size,
        }

    with open(os.path.join(zarr_path, "state.yml"), "w+") as f:
        yaml.dump(state_dict, f)

Classes

class EMObjectConfig (acquisition_id: Optional[str] = None, study_id: Optional[int] = None, segmentation_version: Optional[int] = None, biomarker_version: Optional[int] = None, biomarkers: Union[list, numpy.ndarray, str, None] = None, annotations: Union[list, numpy.ndarray, str, None] = None, include_img: Optional[bool] = False, include_masks: Optional[bool] = False, include_seg_mask: Optional[bool] = False, include_metadata: Optional[bool] = True, seg_mask_type: Optional[str] = 'nucleus', img_format: Optional[str] = 'zarr', img_res: Optional[int] = 0, img_to_disk: Optional[bool] = False, img_path: Optional[str] = None, mask_names: Union[list, numpy.ndarray, str, None] = None, name: Optional[str] = None, datatype: Optional[str] = None)

Object that defines the config for an emObject that is interacting with the Enable database.

Args

acquisition_id
required, the acquisition ID to query
study_id
study id, placeholder
segmentation_version
segmentation version. required to fetch single-cell data and segmentation_masks. if None, single-cell data will not be fetched.
biomarker_version
biomarker expression version. required to fetch single-cell data. if None, single-cell data will not be fetched.
biomarkers
optional, a list or name of biomarkers to download. If None, gets all.
annotations
optional, a list or name of annotations to download. If None, gets all.
include_img
optional, if True, fetches the image with channels subsetted same as biomarkers.
include_masks
optional, if True, fetches ROI masks.
include_seg_mask
optional. If True, gets the segmentation mask (segmentation_version must be provided).
seg_mask_type
optional. Type of segmentation mask to fetch. Can be 'nucleus' or 'cell'.
img_format
img_format - placeholder
img_res
optional, factor to downsample image by
img_to_disk
optional, if True writes the zarr store to disk, otherwise object held in memory.
img_path
optional, path to write zarr store to if img_to_disk==True.
name
optional, a name for this emObject.
datatype
optional, describe the datatype used here.
Expand source code
class EMObjectConfig:
    """Object that defines the config
    for an emObject that is interacting with the
    Enable database.

    Args:
        acquisition_id: required, the acquisition ID to query
        study_id: study id, placeholder
        segmentation_version: segmentation version. required to fetch
            single-cell data and segmentation_masks. if None, single-cell
            data will not be fetched.
        biomarker_version: biomarker expression version. required to fetch
            single-cell data. if None, single-cell data will not be fetched.
        biomarkers: optional, a list or name of biomarkers
            to download. If None, gets all. A single name is wrapped
            into a one-element list.
        annotations: optional, a list or name of annotations to download.
            If None, gets all. A single name is wrapped into a
            one-element list.
        include_img: optional, if True, fetches the image with channels
            subsetted same as biomarkers.
        include_masks: optional, if True, fetches ROI masks.
        include_seg_mask: optional. If True, gets the segmentation mask
            (segmentation_version must be provided).
        include_metadata: optional, flag stored on the config (defaults to
            True). NOTE(review): presumably controls whether metadata is
            fetched - confirm against the code that consumes this config.
        seg_mask_type: optional. Type of segmentation mask to fetch. Can be
            'nucleus' or 'cell'.
        img_format: img_format - placeholder
        img_res: optional, factor to downsample image by
        img_to_disk: optional, if True writes the zarr store to disk,
            otherwise object held in memory.
        img_path: optional, path to write zarr store to if img_to_disk==True.
        mask_names: optional, a name or list of names of masks. A single
            name is wrapped into a one-element list when include_masks
            is True.
        name: optional, a name for this emObject.
        datatype: optional, describe the datatype used here.

    Raises:
        AssertionError: if acquisition_id is not a string, or if study_id,
            segmentation_version or biomarker_version is provided but is
            not a positive int.
        EMObjectException: if include_seg_mask is True without a
            segmentation_version or with an unknown seg_mask_type, or if
            the config can fetch neither an image nor single-cell data.
    """

    def __init__(
        self,
        acquisition_id: Optional[str] = None,
        study_id: Optional[int] = None,
        segmentation_version: Optional[int] = None,
        biomarker_version: Optional[int] = None,
        biomarkers: Optional[Union[list, np.ndarray, str]] = None,
        annotations: Optional[Union[list, np.ndarray, str]] = None,
        include_img: Optional[bool] = False,
        include_masks: Optional[bool] = False,
        include_seg_mask: Optional[bool] = False,
        include_metadata: Optional[bool] = True,
        seg_mask_type: Optional[str] = "nucleus",
        img_format: Optional[str] = "zarr",
        img_res: Optional[int] = 0,
        img_to_disk: Optional[bool] = False,
        img_path: Optional[str] = None,
        mask_names: Optional[Union[list, np.ndarray, str]] = None,
        name: Optional[str] = None,
        datatype: Optional[str] = None,
    ):
        self.acquisition_id = acquisition_id
        self.study_id = study_id
        self.segmentation_version = segmentation_version
        self.biomarker_version = biomarker_version
        self.biomarkers = biomarkers
        self.annotations = annotations
        self.include_img = include_img
        self.include_masks = include_masks
        self.include_seg_mask = include_seg_mask
        self.include_metadata = include_metadata
        self.seg_mask_type = seg_mask_type
        self.img_format = img_format
        self.img_res = img_res
        self.img_to_disk = img_to_disk
        self.img_path = img_path
        self.mask_names = mask_names
        self.name = name
        self.datatype = datatype
        # Not constructor arguments; initialised empty here.
        self.masks = None
        self.img = None

        self._validate_config()

    def _check_positive_int(self, attr_name):
        """Ensure the named attribute is None or a positive built-in int.

        Raises AssertionError explicitly (rather than via ``assert``) so the
        check still runs under ``python -O`` while callers keep seeing the
        same exception type as before.
        """
        value = getattr(self, attr_name)
        if value is None:
            return
        # Exact type match on purpose: bool and numpy integers were
        # rejected by the original check and still are.
        if type(value) is not int or value <= 0:
            raise AssertionError(
                f"{attr_name} must be a positive int, got {value!r}."
            )

    def _validate_config(self):
        """Validates the arguments of
        an EMConfig to ensure user has provided
        logical args.

        Besides validating, this normalises single-string values of
        biomarkers, annotations and mask_names into one-element lists and
        switches include_img off when an empty biomarker list is given.
        """

        # acquisition ids (np.str_ is a subclass of str, so it is accepted)
        if not isinstance(self.acquisition_id, str):
            raise AssertionError(
                f"acquisition_id must be a string, got {self.acquisition_id!r}."
            )

        # study id, seg version, biomarker version
        self._check_positive_int("study_id")
        self._check_positive_int("segmentation_version")
        self._check_positive_int("biomarker_version")

        # check list formatting
        if isinstance(self.biomarkers, str):
            self.biomarkers = [self.biomarkers]

        if isinstance(self.annotations, str):
            self.annotations = [self.annotations]

        if self.include_masks and isinstance(self.mask_names, str):
            # Assignment, not comparison: wrap the single name in a list.
            self.mask_names = [self.mask_names]

        if self.include_seg_mask:
            if self.segmentation_version is None:
                raise EMObjectException(
                    "include_seg_mask is True, but segmentation_version is not provided."
                )
            if self.seg_mask_type not in ["nucleus", "cell"]:
                raise EMObjectException(
                    f"Specified segmentation mask type {self.seg_mask_type} unknown."
                )

        # Avoid the issue where user specifies an image but forces an empty biomarker list.
        # A length check works for both lists and numpy arrays; comparing against
        # np.array([]) with == is ambiguous or fails to broadcast.
        # None is left alone since it means "get all".
        no_biomarkers = self.biomarkers is not None and len(self.biomarkers) == 0
        if self.include_img and no_biomarkers:
            warning(
                "include_img is True, but no channels/biomarkers. Setting include_img to False."
            )
            self.include_img = False

        # Avoid unintended exclusion of single-cell data (warn user if only image will be pulled)
        if self.segmentation_version is None or self.biomarker_version is None:
            if self.include_img:
                warning(
                    "segmentation_version and biomarker_version must both be provided "
                    + "to fetch single-cell data. At least one of these was not specified "
                    + "so only image will be fetched."
                )
            else:
                # In this case, the images and the single-cell data cannot be pulled.
                raise EMObjectException(
                    "Invalid config: neither image nor single-cell data can be fetched. "
                    + "To fetch image, set include_img to True. To fetch single-cell data, "
                    + "provide both segmentation_version and biomarker_version. "
                    + "You may also do both."
                )