Module emobject.emobject

Expand source code
# emObject
# A data abstraction for spatial omics.
from __future__ import annotations
from logging import warning
from typing import Optional, Union

import pandas as pd
from emobject.errors import EMObjectException
from emobject.emimage import EMImage, EMMask
from emobject.emlayer import LayeredData, BaseLayer
from emobject.utils import helpers
from emobject.version import __version__
import numpy as np
from scipy import sparse


class EMObject(LayeredData, BaseLayer):
    """
    An object that represents a single image capture (region).

    Args:
        data: data matrix (n_obs x n_var)
        obs: observation matrix (n_obs x annotations)
        var: variable matrix (n_var x annotations)
        pos: spatial coordinate matrix (n_obs x n_spatial_dimensions)
        sobs: segment observations (might remove)
        mask: ROI or single cell segmentation masks as array
        img: a multiplexed image
        meta: metadata about the entire region
        name: a name for the EMObject
        is_view: toggles whether to treat this as a view of another EMObject
        first_layer_name: the name of the first layer to be added, if None, uses object name.
    """

    def __init__(
        self,
        data: Optional[Union[np.ndarray, sparse.spmatrix, pd.DataFrame]] = None,
        obs: Optional[pd.DataFrame] = None,
        var: Optional[pd.DataFrame] = None,
        pos: Optional[Union[np.ndarray, pd.DataFrame]] = None,
        sobs: Optional[pd.DataFrame] = None,
        mask: Optional[Union[np.ndarray, EMMask]] = None,
        img: Optional[Union[np.ndarray, EMImage]] = None,
        meta: Optional[pd.DataFrame] = None,
        name: Optional[str] = None,
        assay: Optional[str] = None,
        scale_factor: Optional[float] = None,
        segmentation: Optional[str] = None,
        is_view: Optional[bool] = False,
        first_layer_name: Optional[str] = None,
    ) -> EMObject:
        super(EMObject, self).__init__()
        BaseLayer.__init__(self)

        if name is None:
            self._name = "default"
        else:
            self._name = name

        if first_layer_name is None:
            first_layer_name = self._name

        if data is not None:
            self.add(
                BaseLayer(
                    data=data,
                    obs=obs,
                    var=var,
                    name=first_layer_name,
                    segmentation=segmentation,
                    scale_factor=scale_factor,
                    pos=pos,
                    assay=assay,
                )
            )

            self._var_ax = self._layerdict[first_layer_name]._var_ax
            self._obs_ax = self._layerdict[first_layer_name]._obs_ax
        else:
            self._data = None
            self._var_ax = None
            self._obs_ax = None

        try:
            self._obs = self._layerdict[self._name].obs
            self._var = self._layerdict[self._name].var
        except KeyError:
            self._obs = None
            self._var = None

        self._activelayer = self._name
        self._sobs_ax = None
        self._meta = meta
        self._img = img
        self._mask = mask
        self._seg = None
        self._name = name
        self._defaultlayer = self._name

        # TO DO: Graphs and sobs
        self._sobs = None
        self.graph = None

    def __getitem__(self, key: str) -> BaseLayer:
        """
        Returns a layer from the EMObject.

        Args:
            key: the name of the layer to be returned.

        Returns:
            layer: the layer specified by key.
        """
        return self._layerdict[key]

    ##################################
    # ATTRIBUTE CONSTRUCTION
    ##################################

    def _build_meta(self, meta: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """
        Store meta data
        """

        if meta is not None:
            if type(meta) == pd.DataFrame:
                return meta
            else:
                return None

    def _build_img(
        self,
        img: Optional[Union[np.ndarray, EMImage]] = None,
        _var_ax: Optional[Union[np.ndarray, pd.Index]] = None,
    ) -> EMImage:
        if type(img) == np.ndarray:
            img = EMImage(img, channels=_var_ax.to_numpy())
        return img

    def _build_mask(self, mask) -> Optional[EMMask]:
        if type(mask) == np.ndarray:
            return EMMask(masks=mask, mask_idx=None, to_disk=False)
        else:
            return mask

    def build_seg(self, coord_sys: Optional[str]):
        """
        Builds an assignement matrix of cells to segments using a specific coordinate system.

        Args:
            coord_sys: str
                The coordinate system to use for building the segmentation.
        Returns:
            seg: np.ndarray
                seg is a `n_obs` x `n_mask` array. Columns correspond to masks.
                Non-zero integer values assign cells to specific segments.
        """
        assert coord_sys in self._layerdict[self._activelayer]._pos.keys()
        self._layerdict[self._activelayer]._seg = self._build_seg(coord_sys=coord_sys)

    def _build_seg(self, coord_sys=None) -> np.ndarray:
        """
        Builds an assignement matrix of cells to segments.

        n_obs x n_mask tensor, encoded with IDs.

        Args:
            None

        Returns:
            seg: np.ndarray
                seg is a `n_obs` x `n_mask` array. Columns correspond to masks.
                Non-zero integer values assign cells to specific segments.
        """

        if self.mask is None:
            raise EMObjectException("Masks not provided, cannot generate seg.")
        elif self.pos is None:
            raise EMObjectException(
                "Spatial information not stored in pos, \
                cannot generate seg."
            )

        if coord_sys is None:
            # If there are multiple coordinate systems in the layer, warn the user, and use the original one.
            if len(self._layerdict[self._activelayer]._pos.keys()) > 1:
                warning(
                    f"Multiple spatial coordinates detected, using first coordinate set: \
                    {list(self._layerdict[self._activelayer]._pos.keys())[0]}. \
                    If build fails, try specifying a coordinate system with E.build_seg(coord_sys='X')."
                )
            orig_coord_key = list(self._layerdict[self._activelayer]._pos.keys())[0]
        else:
            orig_coord_key = coord_sys

        shape = (self.n_obs, self.mask.n_masks)
        seg = np.zeros(shape, dtype=np.uint16)
        n_seg = self.mask.n_seg

        mask_idx = 0
        for mask_name in n_seg.keys():
            mask = self.mask.mloc(mask_name)
            seg_ids = np.unique(mask)
            try:
                coords = (
                    self._layerdict[self._activelayer]._pos[orig_coord_key].to_numpy()
                )
            except AttributeError:
                if self._layerdict[self._activelayer] == "visium":
                    warning(
                        "Visium data detected, applying scalefactor to segment coordinates.\
                            Be sure the appropriate scalefactor is set (E.scalefactor = X)"
                    )

                    coords = (
                        self._layerdict[self._activelayer]
                        ._pos[orig_coord_key]
                        .to_numpy()
                        * self.scale_factor
                    )
                else:
                    coords = (
                        self._layerdict[self._activelayer]
                        ._pos[orig_coord_key]
                        .to_numpy()
                    )

            coords = coords.astype(int)

            for si in seg_ids:
                if si == 0:
                    continue
                val = mask[coords[:, 1], coords[:, 0]]
                (idx,) = np.where(val == si)
                seg[idx, mask_idx] = si
            mask_idx += 1
        return seg

    def _build_sobs(self) -> dict:
        """
        Builds the observation matrix for segments.

        Annotations are done on a maskwise basis. Therefore,
        this object is structured as a list of length `n_masks`
        where each entry is a `n_seg` x `n_annotation` matrix.
        """

        n_masks = self.mask.n_masks
        mask_names = self.mask.mask_names
        sobs = dict()

        for i in range(0, n_masks):
            sobs_ix = np.unique(self.seg[:, i])
            if sobs_ix[0] == 0:
                sobs_ix = sobs_ix[1:]
            mask_name = mask_names[i]
            sobs[mask_name] = pd.DataFrame(index=sobs_ix)

        return sobs

    ##################################
    # PROPERTIES
    ##################################

    # LAYER SPECIFIC ATTRIBUTES
    # Data
    @property
    def data(self) -> Optional[pd.DataFrame]:
        if len(self.ax) >= 1:
            return self._layerdict[self._activelayer].data
        else:
            return None

    @property
    def X(self) -> Optional[pd.DataFrame]:
        return self.data

    # var
    @property
    def n_var(self) -> int:
        return len(self._layerdict[self._activelayer]._var_ax)

    @property
    def var(self) -> pd.DataFrame:
        if self._var is not None:
            if self._layerdict[self._activelayer].var is None:
                self._layerdict[self._activelayer].var = self._var
        return self._layerdict[self._activelayer].var

    @var.setter
    def var(self, value: Optional[Union[np.array, pd.DataFrame]]) -> None:
        if value.shape[0] != self.var_ax.shape[0]:
            raise EMObjectException(
                "Must be a `n_var` length array of arbitrary\
                 width."
            )
        self._layerdict[self._activelayer].var = value
        if self._activelayer == "raw":
            self._var = value

    # obs
    @property
    def n_obs(self) -> int:
        return len(self._layerdict[self._activelayer]._obs_ax)

    @property
    def obs(self) -> pd.DataFrame:
        self._validate()
        if self._obs is not None:
            if self._layerdict[self._activelayer].obs is None:
                self._layerdict[self._activelayer].obs = self._obs
        return self._layerdict[self._activelayer].obs

    @obs.setter
    def obs(self, value: Optional[Union[np.array, pd.DataFrame]]) -> None:
        if value.shape[0] != self.obs_ax.shape[0]:
            raise EMObjectException(
                "Must be a `n_obs` length array of arbitrary\
                 width."
            )
        self._layerdict[self._activelayer].obs = value
        if self._activelayer == "raw":
            self._obs = value
        self._validate()

    # EMOBJECT ADDITIONAL ATTRIBUTES

    @property
    def pos(self) -> pd.DataFrame:
        return self._layerdict[self._activelayer]._pos

    @property
    def seg(self) -> Optional[list]:
        if self._layerdict[self._activelayer]._seg is None:
            self._layerdict[self._activelayer]._seg = self._build_seg()
        return self._layerdict[self._activelayer]._seg

    @property
    def meta(self) -> Optional[pd.DataFrame]:
        if self._meta is not None:
            self._meta = self._build_meta(self._meta)
        return self._meta

    @property
    def img(self) -> Optional[EMImage]:
        if self._img is not None:
            self._img = self._build_img(self._img, self._var_ax)
        return self._img

    @property
    def mask(self) -> Optional[EMMask]:
        if self._mask is not None:
            self._mask = self._build_mask(self._mask)
        return self._mask

    @property
    def sobs(self):
        if self._sobs is None:
            self._sobs = self._build_sobs()
            # Add to the BaseLayer
            self._layerdict[self._activelayer].sobs = self._sobs
        elif self._layerdict[self._activelayer].sobs is None:
            self._layerdict[self._activelayer].sobs = self._sobs
        return self._sobs

    # EMOBJECT-WIDE PROPERTIES

    # Generic/Informational Properties
    @property
    def name(self) -> str:
        return self._name

    @name.setter
    def name(self, value) -> None:
        if type(value) is not str:
            raise EMObjectException("New EMObject name must be of type str.")
        self._name = value

    @property
    def n_seg(self) -> int:
        if self.mask is not None:
            return self.mask.n_seg
        else:
            return 0

    @property
    def is_view(self) -> bool:
        return self._is_view

    @is_view.setter
    def is_view(self, value) -> None:
        pass  # should be immutable I think

    @property
    def layers(self) -> list:
        return self.ax

    @property
    def summary(self) -> None:
        print(f"EMObject Version {__version__}")
        print(f"EMObject: {self.name}")
        print(f"Layers: {len(self.ax)}")

        for layer in self.layers:
            print(f"\t Layer: {layer}")
            print(f"\t\t Layer segmentation: {self._layerdict[layer]._segmentation}")
            print(f"\t\t Assay: {self._layerdict[layer]._assay}")

            print(f"\t\t n_obs: {len(self._layerdict[layer]._obs_ax)}")
            print(f"\t\t n_var: {len(self._layerdict[layer]._var_ax)}")
            """if len(self.mask.n_seg) > 0:
                print(f"\t\t n_seg: {self.n_seg}")
"""
        if self._mask is not None:
            print(f"masks: {self.mask.n_masks}")

        if self._seg is not None:
            print("Segment Summary:")
            print("Mask \t Segments")
            for i in self.mask.n_seg.keys():
                print(f"{i} \t {self.mask.n_seg[i]}")

    @property
    def version(self) -> float:
        print(f"EMObject Version {__version__}")
        return __version__

    @property
    def layer(self) -> str:
        return self._activelayer

    @layer.setter
    def layer(self, value) -> None:
        assert value in self.layers or value is None
        if value is None:
            self._activelayer = self._defaultlayer
        else:
            self._activelayer = value

    # Axes

    # TO DO: Should there be setters for changing axis? if so, how can the
    # dataframes be kept in alignment/share axes?
    @property
    def var_ax(self) -> pd.Index:
        # return self._var_ax
        return self._layerdict[self._activelayer]._var_ax

    @property
    def obs_ax(self) -> pd.Index:
        # return self._obs_ax
        return self._layerdict[self._activelayer]._obs_ax

    @property
    def scale_factor(self) -> float:
        return self._layerdict[self._activelayer].scale_factor

    @scale_factor.setter
    def scale_factor(self, value: float) -> None:
        self._layerdict[self._activelayer].scale_factor

    @property
    def assay(self) -> str:
        return self._layerdict[self._activelayer].assay

    @assay.setter
    def assay(self, value: str) -> None:
        self._layerdict[self._activelayer].assay = value.lower()

    @property
    def default_layer(self) -> str:
        return self._defaultlayer

    @default_layer.setter
    def default_layer(self, value: str) -> None:
        if value not in self.layers:
            raise EMObjectException(f"Layer {value} does not exist.")
        self._default_layer = value

    ##################################
    # METHODS
    ##################################

    def add_anno(
        self,
        attr: Optional[str] = None,
        value: Optional[Union[np.array, pd.DataFrame, list]] = None,
        name: Optional[Union[np.array, list, str]] = None,
        layer: Optional[str] = None,
        mask: Optional[Union[int, str]] = None,
    ) -> None:
        """
        Adds a new annotation to an attribute (var, obs, sobs).

        Args:
            attr: the attribute to add annotation to. One of
                   'sobs', 'var', 'obs'.
            value: the annotation. Must be array-like of same
                      size as axis.
            name: annotation name
            layer:   the layer to slice within. If None, uses the active layer.
            mask:   the mask to which segment observations are applied.
                Required if attr='sobs'.

        Returns:
            None
        """
        self._validate()

        # Set the layer
        if layer is None:
            layer = self._activelayer
        else:
            assert layer in self.ax

        # Some checks on inputs
        if type(value) == list:
            value = np.array(value)
        if len(value.shape) == 1:
            value = value.reshape(value.shape[0], 1)

        # Deal with annotation names.
        if name is not None:
            # Standardize as ndarray
            if type(name) == str or type(name) == list:
                name = np.array(name).reshape(-1)
            n_names_reqd = value.shape[1]  # n_obs/n_var/n_seg x n_newAnnos

            if name.shape[0] < n_names_reqd:
                # Handle case where too few names are provided
                warning(
                    f"Expected {n_names_reqd} names but received \
                    {name.shape[0]} names. Generating dummy annotation names."
                )
                n_new_names = n_names_reqd - name.shape[0]
                new_names = np.array([f"anno_{i}" for i in range(0, n_new_names)])
                name = np.concatenate([name, new_names])
        else:
            if type(value) != pd.DataFrame:
                #  No names provided, handle this.
                warning(
                    "No annotation names received. Generating dummy annotation\
                    names."
                )
                n_new_names = value.shape[1]
                name = np.array([f"anno_{i}" for i in range(0, n_new_names)])
                assert name.shape[0] == value.shape[1]

        if type(value) == pd.DataFrame:
            name = value.columns

        if attr == "var":
            # add attribute on var.
            # check annotation matches specified attribute
            assert value.shape[0] == self._layerdict[layer]._var_ax.shape[0]

            # Check that new annotation names are unique
            existing_annos = list(self._layerdict[layer].var.index)
            for n in name:
                if n in existing_annos:
                    raise EMObjectException(
                        f"The annotation name {n} already \
                        exists in the {attr} attribute. Please provide a unique \
                        name for the new annotation."
                    )

            if type(value) == np.ndarray:
                _appenddf = pd.DataFrame(
                    data=value, columns=name, index=self._layerdict[layer]._var_ax
                )
            elif type(value) == pd.DataFrame:
                _appenddf = value
                if name is not None:
                    _appenddf.columns = name
                if not _appenddf.index.equals(self._var_ax):
                    warning(
                        "Provided dataframe index does not match the var axis. Attempting to join on index."
                    )

            if (
                self._layerdict[layer]._var is None
                or self._layerdict[layer]._var.shape[1] == 0
            ):
                self._layerdict[layer]._var = _appenddf
            else:
                # left join on index
                self._layerdict[layer]._var = self._layerdict[layer]._var.join(
                    _appenddf, how="left"
                )
                """self._layerdict[layer]._var = pd.concat([self._layerdict[layer]._var, # noqa
                                                         _appenddf],
                                                        axis=1)"""
        elif attr == "obs":
            # add an attribute on obs
            # check annotation matches specified attribute
            assert value.shape[0] == self._layerdict[layer]._obs_ax.shape[0]

            # Check that new annotation names are unique
            existing_annos = self._layerdict[layer]._obs.columns
            for n in name:
                if n in existing_annos:
                    raise EMObjectException(
                        f"The annotation name {n} already \
                        exists in the {attr} attribute. Please provide a unique \
                        name for the new annotation."
                    )

            if type(value) == np.ndarray:
                _appenddf = pd.DataFrame(
                    data=value, columns=name, index=self._layerdict[layer]._obs_ax
                )
            elif type(value) == pd.DataFrame:
                _appenddf = value
                if name is not None:
                    _appenddf.columns = name
                if not _appenddf.index.equals(self._layerdict[layer]._obs_ax):
                    warning(
                        "Index of new annotation does not match obs index. Attempting to join on index value."
                    )

            if (
                self._layerdict[layer]._obs is None
                or self._layerdict[layer]._obs.shape[1] == 0
            ):
                self._layerdict[layer]._obs = _appenddf
            else:
                # join the new annotation horizontally on the index
                self._layerdict[layer]._obs = self._layerdict[layer]._obs.join(
                    _appenddf, how="left"
                )  # noqa
                """self._layerdict[layer]._obs = pd.concat([self._layerdict[layer]._obs, # noqa
                                                         _appenddf],
                                                        axis=1)"""

        elif attr == "sobs":
            # add an attribute on sobs
            if mask is None:
                raise EMObjectException("Mask to annotate is unspecified.")

            # Check that new annotation names are unique
            _ = self.sobs  # build sobs if not already built
            existing_annos = self._layerdict[layer]._sobs[mask].columns
            for n in name:
                if n in existing_annos:
                    raise EMObjectException(
                        f"The annotation name {n} already \
                        exists in the {attr} attribute. Please provide a unique \
                        name for the new annotation."
                    )

            # check that segment is valid.
            if type(mask) == str:
                if mask not in self.mask.mask_names:
                    raise EMObjectException(
                        f"Mask name {mask} is not in the current\
                     object."
                    )
            else:
                raise EMObjectException(
                    f"Mask must be a string. Received {type(mask)}."
                )

            # Get index for sobs
            anno_idx = self.sobs[mask].index  # hopefully calls build_sobs?
            if value.shape[0] != anno_idx.shape[0]:
                if anno_idx.shape[0] - value.shape[0] == 1:
                    # Handle case where one segment is missing
                    raise EMObjectException(
                        "Received one fewer segment annotations than expected.\
                         This is likely due to a missing background segment annotation (seg_id = 0). Try prepending a np.nan dummy."
                    )
                    """new_val = [np.nan]
                    for v in value:
                        new_val.append(v)
                    value = np.array(new_val)
                    assert value.shape[0] == anno_idx.shape[0]"""
                else:
                    raise EMObjectException(
                        f"Annotation length does not match the\
                        number of segments in the mask. Expected {anno_idx.shape[0]}\
                        but received {value.shape[0]}."
                    )

            if type(value) == np.ndarray:
                _appenddf = pd.DataFrame(data=value, columns=name, index=anno_idx)
            elif type(value) == pd.DataFrame:
                _appenddf = value
                if name is not None:
                    _appenddf.columns = name
                _appenddf.index = self._sobs_ax

            if (
                self._layerdict[layer]._obs is None
                or self._layerdict[layer]._obs.shape[1] == 0
            ):
                self._layerdict[layer]._sobs[mask] = _appenddf
            else:
                self._layerdict[layer]._sobs[mask] = pd.concat(
                    [self._layerdict[layer]._sobs[mask], _appenddf], axis=1  # noqa
                )
        else:
            raise EMObjectException(
                f"Unrecognized attribute to annotate. Must be one\
                 of 'sobs', 'var', 'obs', but received {attr}."
            )

        self._validate()

    def del_anno(
        self,
        attr: Optional[str] = None,
        name: Optional[str] = None,
        layer: Optional[str] = None,
        mask: Optional[Union[str, int]] = None,
    ) -> None:
        """Delete an annotation from an annotation matrix.

        Args:
            attr: the attribute to add annotation to. One of
                   'sobs', 'var', 'obs'.
            name: annotation name
            layer:   the layer to slice within. If None, uses the active layer.
            mask:   the mask to which segment observations are applied.
                Required if attr='sobs'.

        Returns:
            None
        """
        self._validate()

        if layer is None:
            layer = self._activelayer

        if attr is not None:
            if attr == "obs":
                assert name in self._layerdict[layer]._obs.columns
                self._layerdict[layer]._obs.drop(columns=name, axis=0, inplace=True)
            elif attr == "var":
                assert name in self._layerdict[layer]._var.columns
                self._layerdict[layer]._var.drop(columns=name, axis=0, inplace=True)
            elif attr == "sobs":
                # check that segment is valid.
                assert mask is not None

                # Validity checks on masks, get correct mask.
                if type(mask) == str:
                    if mask not in self.mask.mask_names:
                        raise EMObjectException(
                            f"Mask name {mask} is not in the\
                             current object."
                        )
                    else:
                        (ix,) = np.where(self.mask.mask_names == mask)
                        mask = ix[0]

                elif type(mask) == int:
                    if mask >= len(self.mask.mask_names) or mask < 0:
                        raise EMObjectException(
                            f"Mask index {mask} is \
                            out of range for the current object."
                        )

                assert name in self._layerdict[layer]._sobs[mask].columns

                # delete the annotation
                self._layerdict[layer]._sobs[mask].drop(
                    columns=name, axis=0, inplace=True
                )
        self._validate()

    def loc(
        self,
        obs_subset: Optional[Union[np.ndarray, list]] = None,
        var_subset: Optional[Union[np.ndarray, list]] = None,
        seg_subset: Optional[Union[np.ndarray, list, int]] = None,
        mask: Optional[str] = None,
        layer: Optional[str] = None,
    ) -> EMObject:
        """
        Allows for slicing of EMObjects to subsets of interest.

        Args:
            obs_subset: subset of observations to include.
                Elements must belong to obs_ax
            var_subset: subset of variables to include.
                Elements must belong to var_ax
            seg_subset: subset of segments to include.
                Elements must belong to sobs_ax
            mask:
            layer:  the layer to slice within.
                If None, uses the active layer.
        Returns:
            Subsetted EMObject view
        """
        self._validate()

        if layer is None:
            layer = self._activelayer
        else:
            assert layer in self.ax

        if obs_subset is None:
            obs_subset = self._layerdict[layer]._obs_ax
        if var_subset is None:
            var_subset = self._layerdict[layer]._var_ax
        if type(obs_subset) == list:
            obs_subset = np.array(obs_subset)
        if type(var_subset) == list:
            var_subset = np.array(var_subset)

        if seg_subset is not None:
            assert seg_subset in np.unique(self.seg)
            all_r = set()
            if type(seg_subset) == int:
                seg_subset = [seg_subset]
            for si in seg_subset:
                rr, cc = np.where(self._layerdict[layer]._seg == si)
                all_r.update(rr)
            obs_subset = np.array(list(all_r))
            obs_subset = self._layerdict[layer]._obs_ax[obs_subset]

        if mask is not None:
            if self._layerdict[layer]._seg is None:
                _ = self.seg
            assert mask in self.mask.mask_names
            (ix,) = np.where(self.mask.mask_names == mask)
            obs_subset, _ = np.where(self._layerdict[layer]._seg[:, ix] != 0)
            obs_subset = self._layerdict[layer]._obs_ax[obs_subset]

        pos_dict = {}
        for coord_sys in self._layerdict[layer]._pos.keys():
            pos_dict[coord_sys] = (
                self._layerdict[layer]._pos[coord_sys].loc[obs_subset, :]
            )

        # a view will return an in-memory EMObject that has been subsetted
        return EMObject(
            data=self._layerdict[layer].data.loc[obs_subset, var_subset],
            obs=self._layerdict[layer].obs.loc[obs_subset, :],
            var=self._layerdict[layer].var.loc[var_subset, :],
            pos=pos_dict,
            mask=self.mask,
            img=self.img,
            name=f"ViewOf{self.name}-{layer}",
            is_view=True,
        )

    def slice(
        self,
        obs_subset: Optional[Union[np.ndarray, list]] = None,
        seg_subset: Optional[Union[np.ndarray, list, int]] = None,
        anchor_layer: Optional[str] = None,
        layers: Optional[Union[str, list, np.ndarray]] = None,
    ) -> EMObject:
        """
        Slices through all emObject layers on the basis of observations, variables, or masks
        and returns a new EMObject with the subsetted data.

        Args:
            obs_subset: subset of observations to include.
                Elements must belong to obs_ax
            seg_subset: subset of segments to include.
                Elements must belong to sobs_ax
            layers: the layers to slice within. If None, uses all.
        """
        self._validate()

        if layers is None:
            raise EMObjectException("Must specify layers to slice.")
        elif type(layers) == str:
            layers = [layers]
        if type(layers) == list:
            layers = np.array(layers)

        if anchor_layer is not None:
            assert anchor_layer in self.layers
        else:
            anchor_layer = self._activelayer

        """
        if anchor_coord_sys is not None:
            assert anchor_coord_sys in E._layerdict[anchor_layer].pos.keys()
        else:
            warning(f"Coordinate system {anchor_coord_sys} not found in layer {anchor_layer}. Using {list(self._layerdict[anchor_layer].pos.keys())[0]}.")
            anchor_coord_sys = list(E._layerdict[anchor_layer].pos.keys())[0]
        """

        # Check that each layer has an assigned segmentation
        for layer in layers:
            # requires finding a spot to store spot_size
            if (
                self._layerdict[layer].segmentation is None
                and self._layerdict[layer]._assay == "visium"
            ):
                try:
                    test_key = list(self._layerdict[layer].obs.keys())[0]
                    visium_mask = helpers.build_visium_segmentation_mask(
                        spot_coords=self._layerdict[layer].pos[test_key],
                        spot_size=self._layerdict[layer]._spot_size,
                        scale_factor=self._layerdict[layer]._scale_factor,
                        shape=self.mask.dim,
                    )

                    self.mask.add_mask(visium_mask, name="visium_segmentation")
                    self._layerdict[layer].segmentation = "visium_segmentation"
                except Exception:
                    raise EMObjectException(
                        "Could not build segmentation mask for Visium data. Be sure to specify a spot_size in the metadata.\
                                            Alternatively, you can manually build a segmentation mask using utils.helpers.build_visium_segmetnation_mask\
                                            and assign it to the layer."
                    )

            assert self._layerdict[layer].segmentation is not None
            # Also check that the data layers have positions
            assert self._layerdict[layer].pos is not None
            test_key = list(self._layerdict[layer].pos.keys())[0]
            assert len(self._layerdict[layer].data) == len(
                self._layerdict[layer].pos[test_key]
            )

        # Based on the current layer's segmentation, slice the data
        # on the basis of the segmentation mask from the other layers.
        # Then, subset to any additional observations specified.
        # First, construct a new EMObject with the data from the current layer
        if obs_subset is None:
            obs_subset = self._layerdict[self._activelayer].data.index

        pos_dict = {}
        for coord_sys in self._layerdict[anchor_layer]._pos.keys():
            pos_dict[coord_sys] = (
                self._layerdict[anchor_layer]._pos[coord_sys].loc[obs_subset, :]
            )

        E = EMObject(
            data=self._layerdict[anchor_layer].data.loc[obs_subset, :],
            obs=self._layerdict[anchor_layer].obs.loc[obs_subset, :],
            var=self._layerdict[anchor_layer].var,
            pos=pos_dict,
            mask=self.mask,
            img=self.img,
            name=f"sliced_{self.name}",
            first_layer_name=anchor_layer,
            segmentation=self._layerdict[anchor_layer].segmentation,
        )

        # Make binary anchor mask
        try:
            sparse_anchor_mask = sparse.coo_matrix(
                self.mask.mloc(self._layerdict[anchor_layer].segmentation)
            )
        except TypeError:
            sparse_anchor_mask = sparse.coo_matrix(
                self.mask.mloc(self._layerdict[anchor_layer].segmentation).squeeze()
            )
        (obs_subset_anchor_mask_ix,) = np.where(
            np.isin(sparse_anchor_mask.data, obs_subset)
        )  # this is the "binarized array"
        obs_subset_anchor_mask_ix_rr = sparse_anchor_mask.row[obs_subset_anchor_mask_ix]
        obs_subset_anchor_mask_ix_cc = sparse_anchor_mask.col[obs_subset_anchor_mask_ix]

        for layer in layers:
            if layer != anchor_layer:
                assert self.mask.mloc(self._layerdict[layer].segmentation) is not None
                layer_segmentation = self.mask.mloc(self._layerdict[layer].segmentation)
                # Now subset the layer segmentatino using the root segmentation
                if len(layer_segmentation.shape) == 3:
                    layer_segmentation = layer_segmentation.squeeze()
                subset_ids = np.unique(
                    layer_segmentation[
                        obs_subset_anchor_mask_ix_rr, obs_subset_anchor_mask_ix_cc
                    ]
                )
                subset_ids = subset_ids[subset_ids != 0]
                # The values in the segmentation mask should be the cell/spot IDs
                # Subset the data and positions
                # for objects generated from the enable database, the index is the cell id.
                # TODO: Ensure that this is the case for all EMObjects
                pos_dict = {}
                for coord_sys in self._layerdict[layer]._pos.keys():
                    pos_dict[coord_sys] = (
                        self._layerdict[layer]._pos[coord_sys].loc[subset_ids, :]
                    )

                new_layer = BaseLayer(
                    data=self._layerdict[layer].data.loc[subset_ids, :],
                    obs=self._layerdict[layer].obs.loc[subset_ids, :],
                    var=self._layerdict[layer].var,
                    pos=pos_dict,
                    segmentation=self._layerdict[layer].segmentation,
                    name=layer,
                )
                E.add(new_layer)

        return E

    def _observations_for_segment(
        self,
        segment_id: Union[int, list, np.ndarray],
        mask_name: str,
        target_layer: str,
    ) -> dict:
        """Returns the observations in target layer for a given segment ID.
        For example, the CODEX cells of a specified visium spot or ROI mask segment.

        Args:
            segment_id: the segment ID to query
            mask_name: the name of the mask that contains segment_id
            target_layer: the layer to query for observations

        Returns:
            np.ndarray: the observations in target layer for a given segment ID
        """
        segment_to_cell_map = {}

        assert mask_name in self.mask.mask_names
        assert target_layer in self.layers

        # Check that a segmentation mask exists for the target layer
        if self._layerdict[target_layer].segmentation is None:
            if "segmentation_mask" in self.mask.mask_names:
                warning(
                    f"Automatically assigning mask `segmentation_mask` to {target_layer}.\
                        To assign a different mask, use `E.set_layer_segmentation()`."
                )
            else:
                raise EMObjectException(
                    f"No segmentation mask exists for {target_layer}.\
                                        Please specify a segmentation mask for {target_layer}\
                                        with `E.set_layer_segmentation().`"
                )

        if isinstance(segment_id, int):
            segment_id = [segment_id]

        # Get the pixels that belong to the segment
        segment_mask = sparse.coo_matrix(self.mask.mloc(mask_name))

        for si in segment_id:
            (segment_ix,) = np.where(segment_mask.data == si)

            if len(segment_ix) == 0:
                segment_to_cell_map[si] = []

            else:
                segment_ix_rr = segment_mask.row[segment_ix]
                segment_ix_cc = segment_mask.col[segment_ix]

                # Get the target layer's segmentation mask
                target_observations = np.unique(
                    self.mask.mloc(self._layerdict[target_layer].segmentation)[
                        segment_ix_rr, segment_ix_cc
                    ]
                )
                target_observations = target_observations[target_observations != 0]
                segment_to_cell_map[si] = target_observations

        return segment_to_cell_map

    def set_layer(self, value: Optional[str] = None) -> None:
        """Sets the active layer.

        Args:
            value: name of an existing layer in the EMObject
        Returns:
            None
        """
        assert value in self.ax or value is None
        self._validate()

        if value is None:
            self._activelayer = self._defaultlayer
        else:
            self._activelayer = value

    def _validate(self) -> None:
        """Validates the integrity of the EMObject
        as a check against unexpected modifications
        to the object."""

        for layer in self.layers:
            # Check that the obs index is consistent
            obs_ix_data = np.array(
                self._layerdict[layer].data.index
            )  # use arrays for consistency
            var_ix_data = np.array(self._layerdict[layer].data.columns)

            # Check the index of everything that is suppossed to be indexed by observations
            if self._layerdict[layer]._obs is not None:
                if not np.array_equal(
                    np.array(self._layerdict[layer]._obs.index), obs_ix_data
                ):
                    # the content of the index is not the same, check if the length is
                    if len(self._layerdict[layer]._obs.index) != len(obs_ix_data):
                        raise EMObjectException(
                            "Observation index mismatch (different lengths!)."
                        )
                    else:
                        # the length is the same, check if the order is the same
                        warning(
                            "Content of observation index is not the same as data. Renaming data index..."
                        )
                        self._layerdict[layer].data.index = self._layerdict[
                            layer
                        ]._obs.index

                    # raise EMObjectException('Observation index mismatch.')

            # Check that var axis is consistent
            if self._layerdict[layer].var is not None:
                if not np.array_equal(
                    np.array(self._layerdict[layer].var.index), var_ix_data
                ):
                    # the content of the index is not the same, check if the length is
                    if len(self._layerdict[layer].var.index) != len(var_ix_data):
                        raise EMObjectException(
                            "Variable index mismatch (different lengths!)."
                        )
                    else:
                        # the length is the same, check if the order is the same
                        warning(
                            "Content of variable index is not the same as data. Renaming data columns..."
                        )
                        self._layerdict[layer].data.columns = self._layerdict[
                            layer
                        ].var.index
                    # raise EMObjectException('Variable index mismatch.')

            # Check that pos axis is consistent
            if self._layerdict[layer]._pos is not None:
                for key in self._layerdict[layer]._pos.keys():
                    if not np.array_equal(
                        np.array(self._layerdict[layer]._pos[key].index),
                        np.array(self._layerdict[layer].data.index)
                    ):
                        raise EMObjectException("Observation index mismatch (pos).")

        if self._seg is not None and len(obs_ix_data) != self._seg.shape[0]:
            raise EMObjectException("Observation index mismatch.")

        # Check that segment axis is consistent
        if (
            self._seg is not None
            and self._mask is not None
            and self._seg.shape[1] != len(self._mask.mask_names)
        ):
            raise EMObjectException("Segment index mismatch.")

    def add_coordinate_system(
        self,
        label: str = None,
        coords: Union[np.ndarray, pd.DataFrame] = None,
        cols: Optional[Union[list, np.ndarray]] = None,
        scale_factor: Optional[float] = 1.0,
        layer: Optional[str] = None,
    ) -> None:
        """Add a new coordinate system to emobject.

        Args:
            label (str): name of new coordinate system
            coords (Union[np.ndarray, pd.DataFrame, None]): the coordinates (n_obs x dimensions)
            cols Optional[Union[list, np.ndarray, None]]: labels for cols e.g. x, y. Defaults to zero indexed ints
            scale_factor (Optional[float]): scale factor to apply to coordinates (common in Visium). Defaults to 1.

        Returns:
            None
        """

        if label is None:
            raise EMObjectException(
                "Must provide a name/label for this new coordinate system."
            )

        if coords is None:
            raise EMObjectException(
                "Must provide coordinate values for this new coordinate system."
            )

        if layer is None:
            layer = self._activelayer
        else:
            if layer not in self.layers:
                raise EMObjectException("Layer does not exist.")

        assert coords.shape[0] == self.n_obs

        if type(coords) == np.ndarray:
            coords = np.multiply(coords.astype(np.float32), scale_factor)
            coords = pd.DataFrame(
                coords, index=self._layerdict[layer].data.index, columns=cols
            )
        _ = self.pos  # make sure pos exists
        self.pos[label] = coords

    def add_mask(
        self,
        mask: Optional[np.ndarray] = None,
        mask_name: Optional[Union[np.ndarray, list]] = None,
    ) -> None:
        """Add a new mask to emobject.

        Args:
            mask (Optional[np.ndarray]): mask array (n_obs x n_masks)
            mask_name (Optional[Union[np.ndarray, list]]): name of mask. Defaults to None.
        """

        if mask is None:
            raise EMObjectException("Must provide a mask array.")

        if mask_name is None:
            raise EMObjectException("Must provide a mask name.")

        if type(mask_name) == str:
            mask_name = np.array([mask_name])

        if type(mask) == list:
            mask = np.array(mask)

        if type(mask_name) == list:
            mask_name = np.array(mask_name)

        if len(mask.shape) == 3:
            if mask.shape[0] != mask_name.shape[0]:
                raise EMObjectException("Mask and mask_name must have the same length.")
        else:
            if mask_name.shape[0] != 1:
                raise EMObjectException("Mask and mask_name must have the same length.")

        if self.mask is None:
            self._mask = EMMask(mask, mask_name)
        else:
            self._mask.add_mask(mask, mask_name)

    def set_layer_segmentation(self, layer: Optional[str], segmentation: str) -> None:
        """Sets the segmentation for a layer.

        Args:
            layer (str): name of layer
            segmentation (str): name of segmentation
        """

        if layer not in self.layers:
            raise EMObjectException(f"Layer {layer} does not exist.")

        if layer is None:
            layer = self._activelayer
            warning(
                f"Layer not specified, setting segmentation for active layer {layer}."
            )

        if segmentation not in self.mask.mask_names:
            raise EMObjectException(f"Mask {segmentation} does not exist.")

        self._layerdict[layer].segmentation = segmentation

    def drop_obs(self, obs: Union[list, np.ndarray]) -> None:
        """Drop observations from the current layer of the emObject.

        Args:
            obs (Union[list, np.ndarray]): list of observations to drop
        """

        if self._activelayer is None:
            raise EMObjectException("No active layer.")

        if type(obs) == list:
            obs = np.array(obs)

        self._layerdict[self._activelayer].data.drop(obs, inplace=True)
        self._layerdict[self._activelayer]._obs.drop(obs, inplace=True)

        if self._layerdict[self._activelayer]._pos is not None:
            for key in self._layerdict[self._activelayer]._pos.keys():
                self._layerdict[self._activelayer]._pos[key].drop(obs, inplace=True)

        self._layerdict[self._activelayer]._obs_ax = np.array(
            self._layerdict[self._activelayer].data.index
        )

        if self._seg is not None:
            warning("Existing `.seg` will be dropped. Attempting to reconstruct...")
            self._seg = None
            try:
                _ = self.seg
            except IndexError:
                warning(
                    "Failed to reconstruct `.seg`. Please pass an explicit coordinate system to `E.build_seg()`."
                )
                pass

        self._validate()

    def drop_var(self, var: Union[list, np.ndarray]) -> None:
        """Drop variables from the current layer of the emObject.

        Args:
            var (Union[list, np.ndarray]): list of variables to drop
        """

        if self._activelayer is None:
            raise EMObjectException("No active layer.")

        if type(var) == list:
            var = np.array(var)

        self._layerdict[self._activelayer].data.drop(var, axis=1, inplace=True)
        self._layerdict[self._activelayer].var.drop(var, inplace=True)
        self._layerdict[self._activelayer]._var_ax = np.array(
            self._layerdict[self._activelayer].data.columns
        )

        self._validate()

    def add_measurements(
        self,
        measurements: Union[np.ndarray, pd.DataFrame],
        var_names: Optional[Union[np.ndarray, list]] = None,
        layer: Optional[str] = None,
    ) -> None:
        """
        Add new variables to the current layer of the emObject.
        This expands the variable axis of the data array.
        """
        if layer is None:
            layer = self._activelayer

        if layer not in self.layers:
            raise EMObjectException("Layer does not exist.")

        if var_names is None and type(measurements) == np.ndarray:
            var_names = np.array([f"new_obs_{i}" for i in range(measurements.shape[1])])

        if type(measurements) == np.ndarray:
            measurements = pd.DataFrame(
                measurements, index=self._layerdict[layer].data.index, columns=var_names
            )

        if measurements.shape[0] != self.n_obs:
            raise EMObjectException(
                f"New measurements must have the same number of rows as existing data objects. Found {measurements.shape[0]} rows, expected {self.n_obs}."
            )

        if not np.all(measurements.index == self._layerdict[layer].data.index):
            raise EMObjectException(
                "New measurements must have the same index as existing observations."
            )

        self._layerdict[layer].data = pd.concat(
            [self._layerdict[layer].data, measurements], axis=1
        )
        self._layerdict[layer]._var_ax = np.array(self._layerdict[layer].data.columns)

        # also need to update var, which is indexed
        if (
            self._layerdict[layer].var is not None
            and self._layerdict[layer].var.shape[1] > 0
        ):
            self._layerdict[layer].var = pd.concat(
                [
                    self._layerdict[layer].var,
                    pd.DataFrame(
                        np.empty(measurements.shape[1], self.var.shape[1]),
                        index=measurements.columns,
                    ),
                ],
                axis=0,
            )
        else:
            self._layerdict[layer].var = pd.DataFrame(
                data=None, index=self._layerdict[layer]._var_ax
            )

        self._validate()

    def slice_on_segment(
        self,
        segment_id: Union[int, list, np.ndarray],
        mask_name: str = None,
        target_layer: Optional[str] = None,
    ) -> dict:
        """Slice the emObject on a segment.

        Args:
            segment_id (Union[int, list, np.ndarray]): segment id(s)
            mask_name (str): name of mask to use
            target_layer (Optional[str]): name of layer to slice on

        Returns:
            dict: dictionary of observations for each segment
        """

        if target_layer is None:
            target_layer = self._activelayer

        return self._observations_for_segment(
            segment_id=segment_id, mask_name=mask_name, target_layer=target_layer
        )

    def cite() -> None:
        """Prints the citation for the emObject package."""
        print(
            "If you use emObject in your research, please cite the following:\n\n \
            @article{Baker2023.06.07.543950, \
                author = {Ethan Alexander Garcia Baker and Meng-Yao Huang and Amy Lam and Maha K. Rahim and Matthew F. Bieniosek and Bobby Wang and Nancy R. Zhang and Aaron T Mayer and Alexandro E Trevino},\
                journal = {bioRxiv}, \
                title = {emObject: domain specific data abstraction for spatial omics},\
                year = {2023}}"  # noqa
        )

Classes

class EMObject (data: Optional[Union[np.ndarray, sparse.spmatrix, pd.DataFrame]] = None, obs: Optional[pd.DataFrame] = None, var: Optional[pd.DataFrame] = None, pos: Optional[Union[np.ndarray, pd.DataFrame]] = None, sobs: Optional[pd.DataFrame] = None, mask: Optional[Union[np.ndarray, EMMask]] = None, img: Optional[Union[np.ndarray, EMImage]] = None, meta: Optional[pd.DataFrame] = None, name: Optional[str] = None, assay: Optional[str] = None, scale_factor: Optional[float] = None, segmentation: Optional[str] = None, is_view: Optional[bool] = False, first_layer_name: Optional[str] = None)

An object that represents a single image capture (region).

Args

data
data matrix (n_obs x n_var)
obs
observation matrix (n_obs x annotations)
var
variable matrix (n_var x annotations)
pos
spatial coordinate matrix (n_obs x n_spatial_dimensions)
sobs
segment observations (might remove)
mask
ROI or single cell segmentation masks as array
img
a multiplexed image
meta
metadata about the entire region
name
a name for the EMObject
is_view
toggles whether to treat this as a view of another EMObject
first_layer_name
the name of the first layer to be added, if None, uses object name.

Contains the all tabular/tensor data.

Args

data : Optional[Union[pd.DataFrame, np.ndarray, sparse.spmatrix]]
data matrix (n_obs x n_var)
obs : pd.DataFrame
observation matrix (n_obs x annotations)
var : Optional[pd.DataFrame]
variable matrix (n_var x annotations)
sobs : Optional[pd.DataFrame]
segment observations
name : Optional[str]
a name for the layer
pos : Optional[Union[pd.DataFrame, dict]]
dictionary of position matricies
segmentation : Optional[str]
name of segmentation mask
scale_factor : Optional[float]
scale factor for spatial data/Visium
assay : Optional[str]
assay type (e.g. Visium, seqFISH, etc.)
spot_size : Optional[float]
spot size for Visium

Returns

BaseLayer instance

Expand source code
class EMObject(LayeredData, BaseLayer):
    """
    An object that represents a single image capture (region).

    Args:
        data: data matrix (n_obs x n_var)
        obs: observation matrix (n_obs x annotations)
        var: variable matrix (n_var x annotations)
        pos: spatial coordinate matrix (n_obs x n_spatial_dimensions)
        sobs: segment observations (might remove)
        mask: ROI or single cell segmentation masks as array
        img: a multiplexed image
        meta: metadata about the entire region
        name: a name for the EMObject
        is_view: toggles whether to treat this as a view of another EMObject
        first_layer_name: the name of the first layer to be added, if None, uses object name.
    """

    def __init__(
        self,
        data: Optional[Union[np.ndarray, sparse.spmatrix, pd.DataFrame]] = None,
        obs: Optional[pd.DataFrame] = None,
        var: Optional[pd.DataFrame] = None,
        pos: Optional[Union[np.ndarray, pd.DataFrame]] = None,
        sobs: Optional[pd.DataFrame] = None,
        mask: Optional[Union[np.ndarray, EMMask]] = None,
        img: Optional[Union[np.ndarray, EMImage]] = None,
        meta: Optional[pd.DataFrame] = None,
        name: Optional[str] = None,
        assay: Optional[str] = None,
        scale_factor: Optional[float] = None,
        segmentation: Optional[str] = None,
        is_view: Optional[bool] = False,
        first_layer_name: Optional[str] = None,
    ) -> EMObject:
        super(EMObject, self).__init__()
        BaseLayer.__init__(self)

        if name is None:
            self._name = "default"
        else:
            self._name = name

        if first_layer_name is None:
            first_layer_name = self._name

        if data is not None:
            self.add(
                BaseLayer(
                    data=data,
                    obs=obs,
                    var=var,
                    name=first_layer_name,
                    segmentation=segmentation,
                    scale_factor=scale_factor,
                    pos=pos,
                    assay=assay,
                )
            )

            self._var_ax = self._layerdict[first_layer_name]._var_ax
            self._obs_ax = self._layerdict[first_layer_name]._obs_ax
        else:
            self._data = None
            self._var_ax = None
            self._obs_ax = None

        try:
            self._obs = self._layerdict[self._name].obs
            self._var = self._layerdict[self._name].var
        except KeyError:
            self._obs = None
            self._var = None

        self._activelayer = self._name
        self._sobs_ax = None
        self._meta = meta
        self._img = img
        self._mask = mask
        self._seg = None
        self._name = name
        self._defaultlayer = self._name

        # TO DO: Graphs and sobs
        self._sobs = None
        self.graph = None

    def __getitem__(self, key: str) -> BaseLayer:
        """
        Returns a layer from the EMObject.

        Args:
            key: the name of the layer to be returned.

        Returns:
            layer: the layer specified by key.
        """
        return self._layerdict[key]

    ##################################
    # ATTRIBUTE CONSTRUCTION
    ##################################

    def _build_meta(self, meta: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        """
        Store meta data
        """

        if meta is not None:
            if type(meta) == pd.DataFrame:
                return meta
            else:
                return None

    def _build_img(
        self,
        img: Optional[Union[np.ndarray, EMImage]] = None,
        _var_ax: Optional[Union[np.ndarray, pd.Index]] = None,
    ) -> EMImage:
        if type(img) == np.ndarray:
            img = EMImage(img, channels=_var_ax.to_numpy())
        return img

    def _build_mask(self, mask) -> Optional[EMMask]:
        if type(mask) == np.ndarray:
            return EMMask(masks=mask, mask_idx=None, to_disk=False)
        else:
            return mask

    def build_seg(self, coord_sys: Optional[str]):
        """
        Builds an assignement matrix of cells to segments using a specific coordinate system.

        Args:
            coord_sys: str
                The coordinate system to use for building the segmentation.
        Returns:
            seg: np.ndarray
                seg is a `n_obs` x `n_mask` array. Columns correspond to masks.
                Non-zero integer values assign cells to specific segments.
        """
        assert coord_sys in self._layerdict[self._activelayer]._pos.keys()
        self._layerdict[self._activelayer]._seg = self._build_seg(coord_sys=coord_sys)

    def _build_seg(self, coord_sys=None) -> np.ndarray:
        """
        Builds an assignement matrix of cells to segments.

        n_obs x n_mask tensor, encoded with IDs.

        Args:
            None

        Returns:
            seg: np.ndarray
                seg is a `n_obs` x `n_mask` array. Columns correspond to masks.
                Non-zero integer values assign cells to specific segments.
        """

        if self.mask is None:
            raise EMObjectException("Masks not provided, cannot generate seg.")
        elif self.pos is None:
            raise EMObjectException(
                "Spatial information not stored in pos, \
                cannot generate seg."
            )

        if coord_sys is None:
            # If there are multiple coordinate systems in the layer, warn the user, and use the original one.
            if len(self._layerdict[self._activelayer]._pos.keys()) > 1:
                warning(
                    f"Multiple spatial coordinates detected, using first coordinate set: \
                    {list(self._layerdict[self._activelayer]._pos.keys())[0]}. \
                    If build fails, try specifying a coordinate system with E.build_seg(coord_sys='X')."
                )
            orig_coord_key = list(self._layerdict[self._activelayer]._pos.keys())[0]
        else:
            orig_coord_key = coord_sys

        shape = (self.n_obs, self.mask.n_masks)
        seg = np.zeros(shape, dtype=np.uint16)
        n_seg = self.mask.n_seg

        mask_idx = 0
        for mask_name in n_seg.keys():
            mask = self.mask.mloc(mask_name)
            seg_ids = np.unique(mask)
            try:
                coords = (
                    self._layerdict[self._activelayer]._pos[orig_coord_key].to_numpy()
                )
            except AttributeError:
                if self._layerdict[self._activelayer] == "visium":
                    warning(
                        "Visium data detected, applying scalefactor to segment coordinates.\
                            Be sure the appropriate scalefactor is set (E.scalefactor = X)"
                    )

                    coords = (
                        self._layerdict[self._activelayer]
                        ._pos[orig_coord_key]
                        .to_numpy()
                        * self.scale_factor
                    )
                else:
                    coords = (
                        self._layerdict[self._activelayer]
                        ._pos[orig_coord_key]
                        .to_numpy()
                    )

            coords = coords.astype(int)

            for si in seg_ids:
                if si == 0:
                    continue
                val = mask[coords[:, 1], coords[:, 0]]
                (idx,) = np.where(val == si)
                seg[idx, mask_idx] = si
            mask_idx += 1
        return seg

    def _build_sobs(self) -> dict:
        """
        Builds the observation matrix for segments.

        Annotations are done on a maskwise basis. Therefore,
        this object is structured as a list of length `n_masks`
        where each entry is a `n_seg` x `n_annotation` matrix.
        """

        n_masks = self.mask.n_masks
        mask_names = self.mask.mask_names
        sobs = dict()

        for i in range(0, n_masks):
            sobs_ix = np.unique(self.seg[:, i])
            if sobs_ix[0] == 0:
                sobs_ix = sobs_ix[1:]
            mask_name = mask_names[i]
            sobs[mask_name] = pd.DataFrame(index=sobs_ix)

        return sobs

    ##################################
    # PROPERTIES
    ##################################

    # LAYER SPECIFIC ATTRIBUTES
    # Data
    @property
    def data(self) -> Optional[pd.DataFrame]:
        if len(self.ax) >= 1:
            return self._layerdict[self._activelayer].data
        else:
            return None

    @property
    def X(self) -> Optional[pd.DataFrame]:
        return self.data

    # var
    @property
    def n_var(self) -> int:
        return len(self._layerdict[self._activelayer]._var_ax)

    @property
    def var(self) -> pd.DataFrame:
        if self._var is not None:
            if self._layerdict[self._activelayer].var is None:
                self._layerdict[self._activelayer].var = self._var
        return self._layerdict[self._activelayer].var

    @var.setter
    def var(self, value: Optional[Union[np.array, pd.DataFrame]]) -> None:
        if value.shape[0] != self.var_ax.shape[0]:
            raise EMObjectException(
                "Must be a `n_var` length array of arbitrary\
                 width."
            )
        self._layerdict[self._activelayer].var = value
        if self._activelayer == "raw":
            self._var = value

    # obs
    @property
    def n_obs(self) -> int:
        return len(self._layerdict[self._activelayer]._obs_ax)

    @property
    def obs(self) -> pd.DataFrame:
        self._validate()
        if self._obs is not None:
            if self._layerdict[self._activelayer].obs is None:
                self._layerdict[self._activelayer].obs = self._obs
        return self._layerdict[self._activelayer].obs

    @obs.setter
    def obs(self, value: Optional[Union[np.array, pd.DataFrame]]) -> None:
        if value.shape[0] != self.obs_ax.shape[0]:
            raise EMObjectException(
                "Must be a `n_obs` length array of arbitrary\
                 width."
            )
        self._layerdict[self._activelayer].obs = value
        if self._activelayer == "raw":
            self._obs = value
        self._validate()

    # EMOBJECT ADDITIONAL ATTRIBUTES

    @property
    def pos(self) -> pd.DataFrame:
        return self._layerdict[self._activelayer]._pos

    @property
    def seg(self) -> Optional[list]:
        if self._layerdict[self._activelayer]._seg is None:
            self._layerdict[self._activelayer]._seg = self._build_seg()
        return self._layerdict[self._activelayer]._seg

    @property
    def meta(self) -> Optional[pd.DataFrame]:
        if self._meta is not None:
            self._meta = self._build_meta(self._meta)
        return self._meta

    @property
    def img(self) -> Optional[EMImage]:
        if self._img is not None:
            self._img = self._build_img(self._img, self._var_ax)
        return self._img

    @property
    def mask(self) -> Optional[EMMask]:
        if self._mask is not None:
            self._mask = self._build_mask(self._mask)
        return self._mask

    @property
    def sobs(self):
        if self._sobs is None:
            self._sobs = self._build_sobs()
            # Add to the BaseLayer
            self._layerdict[self._activelayer].sobs = self._sobs
        elif self._layerdict[self._activelayer].sobs is None:
            self._layerdict[self._activelayer].sobs = self._sobs
        return self._sobs

    # EMOBJECT-WIDE PROPERTIES

    # Generic/Informational Properties
    @property
    def name(self) -> str:
        return self._name

    @name.setter
    def name(self, value) -> None:
        if type(value) is not str:
            raise EMObjectException("New EMObject name must be of type str.")
        self._name = value

    @property
    def n_seg(self) -> int:
        if self.mask is not None:
            return self.mask.n_seg
        else:
            return 0

    @property
    def is_view(self) -> bool:
        return self._is_view

    @is_view.setter
    def is_view(self, value) -> None:
        pass  # should be immutable I think

    @property
    def layers(self) -> list:
        return self.ax

    @property
    def summary(self) -> None:
        print(f"EMObject Version {__version__}")
        print(f"EMObject: {self.name}")
        print(f"Layers: {len(self.ax)}")

        for layer in self.layers:
            print(f"\t Layer: {layer}")
            print(f"\t\t Layer segmentation: {self._layerdict[layer]._segmentation}")
            print(f"\t\t Assay: {self._layerdict[layer]._assay}")

            print(f"\t\t n_obs: {len(self._layerdict[layer]._obs_ax)}")
            print(f"\t\t n_var: {len(self._layerdict[layer]._var_ax)}")
            """if len(self.mask.n_seg) > 0:
                print(f"\t\t n_seg: {self.n_seg}")
"""
        if self._mask is not None:
            print(f"masks: {self.mask.n_masks}")

        if self._seg is not None:
            print("Segment Summary:")
            print("Mask \t Segments")
            for i in self.mask.n_seg.keys():
                print(f"{i} \t {self.mask.n_seg[i]}")

    @property
    def version(self) -> float:
        print(f"EMObject Version {__version__}")
        return __version__

    @property
    def layer(self) -> str:
        return self._activelayer

    @layer.setter
    def layer(self, value) -> None:
        assert value in self.layers or value is None
        if value is None:
            self._activelayer = self._defaultlayer
        else:
            self._activelayer = value

    # Axes

    # TO DO: Should there be setters for changing axis? if so, how can the
    # dataframes be kept in alignment/share axes?
    @property
    def var_ax(self) -> pd.Index:
        # return self._var_ax
        return self._layerdict[self._activelayer]._var_ax

    @property
    def obs_ax(self) -> pd.Index:
        # return self._obs_ax
        return self._layerdict[self._activelayer]._obs_ax

    @property
    def scale_factor(self) -> float:
        return self._layerdict[self._activelayer].scale_factor

    @scale_factor.setter
    def scale_factor(self, value: float) -> None:
        self._layerdict[self._activelayer].scale_factor

    @property
    def assay(self) -> str:
        return self._layerdict[self._activelayer].assay

    @assay.setter
    def assay(self, value: str) -> None:
        self._layerdict[self._activelayer].assay = value.lower()

    @property
    def default_layer(self) -> str:
        return self._defaultlayer

    @default_layer.setter
    def default_layer(self, value: str) -> None:
        if value not in self.layers:
            raise EMObjectException(f"Layer {value} does not exist.")
        self._default_layer = value

    ##################################
    # METHODS
    ##################################

    def add_anno(
        self,
        attr: Optional[str] = None,
        value: Optional[Union[np.array, pd.DataFrame, list]] = None,
        name: Optional[Union[np.array, list, str]] = None,
        layer: Optional[str] = None,
        mask: Optional[Union[int, str]] = None,
    ) -> None:
        """
        Adds a new annotation to an attribute (var, obs, sobs).

        Args:
            attr: the attribute to add annotation to. One of
                   'sobs', 'var', 'obs'.
            value: the annotation. Must be array-like of same
                      size as axis.
            name: annotation name
            layer:   the layer to slice within. If None, uses the active layer.
            mask:   the mask to which segment observations are applied.
                Required if attr='sobs'.

        Returns:
            None
        """
        self._validate()

        # Set the layer
        if layer is None:
            layer = self._activelayer
        else:
            assert layer in self.ax

        # Some checks on inputs
        if type(value) == list:
            value = np.array(value)
        if len(value.shape) == 1:
            value = value.reshape(value.shape[0], 1)

        # Deal with annotation names.
        if name is not None:
            # Standardize as ndarray
            if type(name) == str or type(name) == list:
                name = np.array(name).reshape(-1)
            n_names_reqd = value.shape[1]  # n_obs/n_var/n_seg x n_newAnnos

            if name.shape[0] < n_names_reqd:
                # Handle case where too few names are provided
                warning(
                    f"Expected {n_names_reqd} names but received \
                    {name.shape[0]} names. Generating dummy annotation names."
                )
                n_new_names = n_names_reqd - name.shape[0]
                new_names = np.array([f"anno_{i}" for i in range(0, n_new_names)])
                name = np.concatenate([name, new_names])
        else:
            if type(value) != pd.DataFrame:
                #  No names provided, handle this.
                warning(
                    "No annotation names received. Generating dummy annotation\
                    names."
                )
                n_new_names = value.shape[1]
                name = np.array([f"anno_{i}" for i in range(0, n_new_names)])
                assert name.shape[0] == value.shape[1]

        if type(value) == pd.DataFrame:
            name = value.columns

        if attr == "var":
            # add attribute on var.
            # check annotation matches specified attribute
            assert value.shape[0] == self._layerdict[layer]._var_ax.shape[0]

            # Check that new annotation names are unique
            existing_annos = list(self._layerdict[layer].var.index)
            for n in name:
                if n in existing_annos:
                    raise EMObjectException(
                        f"The annotation name {n} already \
                        exists in the {attr} attribute. Please provide a unique \
                        name for the new annotation."
                    )

            if type(value) == np.ndarray:
                _appenddf = pd.DataFrame(
                    data=value, columns=name, index=self._layerdict[layer]._var_ax
                )
            elif type(value) == pd.DataFrame:
                _appenddf = value
                if name is not None:
                    _appenddf.columns = name
                if not _appenddf.index.equals(self._var_ax):
                    warning(
                        "Provided dataframe index does not match the var axis. Attempting to join on index."
                    )

            if (
                self._layerdict[layer]._var is None
                or self._layerdict[layer]._var.shape[1] == 0
            ):
                self._layerdict[layer]._var = _appenddf
            else:
                # left join on index
                self._layerdict[layer]._var = self._layerdict[layer]._var.join(
                    _appenddf, how="left"
                )
                """self._layerdict[layer]._var = pd.concat([self._layerdict[layer]._var, # noqa
                                                         _appenddf],
                                                        axis=1)"""
        elif attr == "obs":
            # add an attribute on obs
            # check annotation matches specified attribute
            assert value.shape[0] == self._layerdict[layer]._obs_ax.shape[0]

            # Check that new annotation names are unique
            existing_annos = self._layerdict[layer]._obs.columns
            for n in name:
                if n in existing_annos:
                    raise EMObjectException(
                        f"The annotation name {n} already \
                        exists in the {attr} attribute. Please provide a unique \
                        name for the new annotation."
                    )

            if type(value) == np.ndarray:
                _appenddf = pd.DataFrame(
                    data=value, columns=name, index=self._layerdict[layer]._obs_ax
                )
            elif type(value) == pd.DataFrame:
                _appenddf = value
                if name is not None:
                    _appenddf.columns = name
                if not _appenddf.index.equals(self._layerdict[layer]._obs_ax):
                    warning(
                        "Index of new annotation does not match obs index. Attempting to join on index value."
                    )

            if (
                self._layerdict[layer]._obs is None
                or self._layerdict[layer]._obs.shape[1] == 0
            ):
                self._layerdict[layer]._obs = _appenddf
            else:
                # join the new annotation horizontally on the index
                self._layerdict[layer]._obs = self._layerdict[layer]._obs.join(
                    _appenddf, how="left"
                )  # noqa
                """self._layerdict[layer]._obs = pd.concat([self._layerdict[layer]._obs, # noqa
                                                         _appenddf],
                                                        axis=1)"""

        elif attr == "sobs":
            # add an attribute on sobs
            if mask is None:
                raise EMObjectException("Mask to annotate is unspecified.")

            # Check that new annotation names are unique
            _ = self.sobs  # build sobs if not already built
            existing_annos = self._layerdict[layer]._sobs[mask].columns
            for n in name:
                if n in existing_annos:
                    raise EMObjectException(
                        f"The annotation name {n} already \
                        exists in the {attr} attribute. Please provide a unique \
                        name for the new annotation."
                    )

            # check that segment is valid.
            if type(mask) == str:
                if mask not in self.mask.mask_names:
                    raise EMObjectException(
                        f"Mask name {mask} is not in the current\
                     object."
                    )
            else:
                raise EMObjectException(
                    f"Mask must be a string. Received {type(mask)}."
                )

            # Get index for sobs
            anno_idx = self.sobs[mask].index  # hopefully calls build_sobs?
            if value.shape[0] != anno_idx.shape[0]:
                if anno_idx.shape[0] - value.shape[0] == 1:
                    # Handle case where one segment is missing
                    raise EMObjectException(
                        "Received one fewer segment annotations than expected.\
                         This is likely due to a missing background segment annotation (seg_id = 0). Try prepending a np.nan dummy."
                    )
                    """new_val = [np.nan]
                    for v in value:
                        new_val.append(v)
                    value = np.array(new_val)
                    assert value.shape[0] == anno_idx.shape[0]"""
                else:
                    raise EMObjectException(
                        f"Annotation length does not match the\
                        number of segments in the mask. Expected {anno_idx.shape[0]}\
                        but received {value.shape[0]}."
                    )

            if type(value) == np.ndarray:
                _appenddf = pd.DataFrame(data=value, columns=name, index=anno_idx)
            elif type(value) == pd.DataFrame:
                _appenddf = value
                if name is not None:
                    _appenddf.columns = name
                _appenddf.index = self._sobs_ax

            if (
                self._layerdict[layer]._obs is None
                or self._layerdict[layer]._obs.shape[1] == 0
            ):
                self._layerdict[layer]._sobs[mask] = _appenddf
            else:
                self._layerdict[layer]._sobs[mask] = pd.concat(
                    [self._layerdict[layer]._sobs[mask], _appenddf], axis=1  # noqa
                )
        else:
            raise EMObjectException(
                f"Unrecognized attribute to annotate. Must be one\
                 of 'sobs', 'var', 'obs', but received {attr}."
            )

        self._validate()

    def del_anno(
        self,
        attr: Optional[str] = None,
        name: Optional[str] = None,
        layer: Optional[str] = None,
        mask: Optional[Union[str, int]] = None,
    ) -> None:
        """Delete an annotation from an annotation matrix.

        Args:
            attr: the attribute to add annotation to. One of
                   'sobs', 'var', 'obs'.
            name: annotation name
            layer:   the layer to slice within. If None, uses the active layer.
            mask:   the mask to which segment observations are applied.
                Required if attr='sobs'.

        Returns:
            None
        """
        self._validate()

        if layer is None:
            layer = self._activelayer

        if attr is not None:
            if attr == "obs":
                assert name in self._layerdict[layer]._obs.columns
                self._layerdict[layer]._obs.drop(columns=name, axis=0, inplace=True)
            elif attr == "var":
                assert name in self._layerdict[layer]._var.columns
                self._layerdict[layer]._var.drop(columns=name, axis=0, inplace=True)
            elif attr == "sobs":
                # check that segment is valid.
                assert mask is not None

                # Validity checks on masks, get correct mask.
                if type(mask) == str:
                    if mask not in self.mask.mask_names:
                        raise EMObjectException(
                            f"Mask name {mask} is not in the\
                             current object."
                        )
                    else:
                        (ix,) = np.where(self.mask.mask_names == mask)
                        mask = ix[0]

                elif type(mask) == int:
                    if mask >= len(self.mask.mask_names) or mask < 0:
                        raise EMObjectException(
                            f"Mask index {mask} is \
                            out of range for the current object."
                        )

                assert name in self._layerdict[layer]._sobs[mask].columns

                # delete the annotation
                self._layerdict[layer]._sobs[mask].drop(
                    columns=name, axis=0, inplace=True
                )
        self._validate()

    def loc(
        self,
        obs_subset: Optional[Union[np.ndarray, list]] = None,
        var_subset: Optional[Union[np.ndarray, list]] = None,
        seg_subset: Optional[Union[np.ndarray, list, int]] = None,
        mask: Optional[str] = None,
        layer: Optional[str] = None,
    ) -> EMObject:
        """
        Allows for slicing of EMObjects to subsets of interest.

        Args:
            obs_subset: subset of observations to include.
                Elements must belong to obs_ax
            var_subset: subset of variables to include.
                Elements must belong to var_ax
            seg_subset: subset of segments to include.
                Elements must belong to sobs_ax
            mask:
            layer:  the layer to slice within.
                If None, uses the active layer.
        Returns:
            Subsetted EMObject view
        """
        self._validate()

        if layer is None:
            layer = self._activelayer
        else:
            assert layer in self.ax

        if obs_subset is None:
            obs_subset = self._layerdict[layer]._obs_ax
        if var_subset is None:
            var_subset = self._layerdict[layer]._var_ax
        if type(obs_subset) == list:
            obs_subset = np.array(obs_subset)
        if type(var_subset) == list:
            var_subset = np.array(var_subset)

        if seg_subset is not None:
            assert seg_subset in np.unique(self.seg)
            all_r = set()
            if type(seg_subset) == int:
                seg_subset = [seg_subset]
            for si in seg_subset:
                rr, cc = np.where(self._layerdict[layer]._seg == si)
                all_r.update(rr)
            obs_subset = np.array(list(all_r))
            obs_subset = self._layerdict[layer]._obs_ax[obs_subset]

        if mask is not None:
            if self._layerdict[layer]._seg is None:
                _ = self.seg
            assert mask in self.mask.mask_names
            (ix,) = np.where(self.mask.mask_names == mask)
            obs_subset, _ = np.where(self._layerdict[layer]._seg[:, ix] != 0)
            obs_subset = self._layerdict[layer]._obs_ax[obs_subset]

        pos_dict = {}
        for coord_sys in self._layerdict[layer]._pos.keys():
            pos_dict[coord_sys] = (
                self._layerdict[layer]._pos[coord_sys].loc[obs_subset, :]
            )

        # a view will return an in-memory EMObject that has been subsetted
        return EMObject(
            data=self._layerdict[layer].data.loc[obs_subset, var_subset],
            obs=self._layerdict[layer].obs.loc[obs_subset, :],
            var=self._layerdict[layer].var.loc[var_subset, :],
            pos=pos_dict,
            mask=self.mask,
            img=self.img,
            name=f"ViewOf{self.name}-{layer}",
            is_view=True,
        )

    def slice(
        self,
        obs_subset: Optional[Union[np.ndarray, list]] = None,
        seg_subset: Optional[Union[np.ndarray, list, int]] = None,
        anchor_layer: Optional[str] = None,
        layers: Optional[Union[str, list, np.ndarray]] = None,
    ) -> EMObject:
        """
        Slices through all emObject layers on the basis of observations, variables, or masks
        and returns a new EMObject with the subsetted data.

        Args:
            obs_subset: subset of observations to include.
                Elements must belong to obs_ax
            seg_subset: subset of segments to include.
                Elements must belong to sobs_ax
            layers: the layers to slice within. If None, uses all.
        """
        self._validate()

        if layers is None:
            raise EMObjectException("Must specify layers to slice.")
        elif type(layers) == str:
            layers = [layers]
        if type(layers) == list:
            layers = np.array(layers)

        if anchor_layer is not None:
            assert anchor_layer in self.layers
        else:
            anchor_layer = self._activelayer

        """
        if anchor_coord_sys is not None:
            assert anchor_coord_sys in E._layerdict[anchor_layer].pos.keys()
        else:
            warning(f"Coordinate system {anchor_coord_sys} not found in layer {anchor_layer}. Using {list(self._layerdict[anchor_layer].pos.keys())[0]}.")
            anchor_coord_sys = list(E._layerdict[anchor_layer].pos.keys())[0]
        """

        # Check that each layer has an assigned segmentation
        for layer in layers:
            # requires finding a spot to store spot_size
            if (
                self._layerdict[layer].segmentation is None
                and self._layerdict[layer]._assay == "visium"
            ):
                try:
                    test_key = list(self._layerdict[layer].obs.keys())[0]
                    visium_mask = helpers.build_visium_segmentation_mask(
                        spot_coords=self._layerdict[layer].pos[test_key],
                        spot_size=self._layerdict[layer]._spot_size,
                        scale_factor=self._layerdict[layer]._scale_factor,
                        shape=self.mask.dim,
                    )

                    self.mask.add_mask(visium_mask, name="visium_segmentation")
                    self._layerdict[layer].segmentation = "visium_segmentation"
                except Exception:
                    raise EMObjectException(
                        "Could not build segmentation mask for Visium data. Be sure to specify a spot_size in the metadata.\
                                            Alternatively, you can manually build a segmentation mask using utils.helpers.build_visium_segmetnation_mask\
                                            and assign it to the layer."
                    )

            assert self._layerdict[layer].segmentation is not None
            # Also check that the data layers have positions
            assert self._layerdict[layer].pos is not None
            test_key = list(self._layerdict[layer].pos.keys())[0]
            assert len(self._layerdict[layer].data) == len(
                self._layerdict[layer].pos[test_key]
            )

        # Based on the current layer's segmentation, slice the data
        # on the basis of the segmentation mask from the other layers.
        # Then, subset to any additional observations specified.
        # First, construct a new EMObject with the data from the current layer
        if obs_subset is None:
            obs_subset = self._layerdict[self._activelayer].data.index

        pos_dict = {}
        for coord_sys in self._layerdict[anchor_layer]._pos.keys():
            pos_dict[coord_sys] = (
                self._layerdict[anchor_layer]._pos[coord_sys].loc[obs_subset, :]
            )

        E = EMObject(
            data=self._layerdict[anchor_layer].data.loc[obs_subset, :],
            obs=self._layerdict[anchor_layer].obs.loc[obs_subset, :],
            var=self._layerdict[anchor_layer].var,
            pos=pos_dict,
            mask=self.mask,
            img=self.img,
            name=f"sliced_{self.name}",
            first_layer_name=anchor_layer,
            segmentation=self._layerdict[anchor_layer].segmentation,
        )

        # Make binary anchor mask
        try:
            sparse_anchor_mask = sparse.coo_matrix(
                self.mask.mloc(self._layerdict[anchor_layer].segmentation)
            )
        except TypeError:
            sparse_anchor_mask = sparse.coo_matrix(
                self.mask.mloc(self._layerdict[anchor_layer].segmentation).squeeze()
            )
        (obs_subset_anchor_mask_ix,) = np.where(
            np.isin(sparse_anchor_mask.data, obs_subset)
        )  # this is the "binarized array"
        obs_subset_anchor_mask_ix_rr = sparse_anchor_mask.row[obs_subset_anchor_mask_ix]
        obs_subset_anchor_mask_ix_cc = sparse_anchor_mask.col[obs_subset_anchor_mask_ix]

        for layer in layers:
            if layer != anchor_layer:
                assert self.mask.mloc(self._layerdict[layer].segmentation) is not None
                layer_segmentation = self.mask.mloc(self._layerdict[layer].segmentation)
                # Now subset the layer segmentatino using the root segmentation
                if len(layer_segmentation.shape) == 3:
                    layer_segmentation = layer_segmentation.squeeze()
                subset_ids = np.unique(
                    layer_segmentation[
                        obs_subset_anchor_mask_ix_rr, obs_subset_anchor_mask_ix_cc
                    ]
                )
                subset_ids = subset_ids[subset_ids != 0]
                # The values in the segmentation mask should be the cell/spot IDs
                # Subset the data and positions
                # for objects generated from the enable database, the index is the cell id.
                # TODO: Ensure that this is the case for all EMObjects
                pos_dict = {}
                for coord_sys in self._layerdict[layer]._pos.keys():
                    pos_dict[coord_sys] = (
                        self._layerdict[layer]._pos[coord_sys].loc[subset_ids, :]
                    )

                new_layer = BaseLayer(
                    data=self._layerdict[layer].data.loc[subset_ids, :],
                    obs=self._layerdict[layer].obs.loc[subset_ids, :],
                    var=self._layerdict[layer].var,
                    pos=pos_dict,
                    segmentation=self._layerdict[layer].segmentation,
                    name=layer,
                )
                E.add(new_layer)

        return E

    def _observations_for_segment(
        self,
        segment_id: Union[int, list, np.ndarray],
        mask_name: str,
        target_layer: str,
    ) -> dict:
        """Returns the observations in target layer for a given segment ID.
        For example, the CODEX cells of a specified visium spot or ROI mask segment.

        Args:
            segment_id: the segment ID to query
            mask_name: the name of the mask that contains segment_id
            target_layer: the layer to query for observations

        Returns:
            np.ndarray: the observations in target layer for a given segment ID
        """
        segment_to_cell_map = {}

        assert mask_name in self.mask.mask_names
        assert target_layer in self.layers

        # Check that a segmentation mask exists for the target layer
        if self._layerdict[target_layer].segmentation is None:
            if "segmentation_mask" in self.mask.mask_names:
                warning(
                    f"Automatically assigning mask `segmentation_mask` to {target_layer}.\
                        To assign a different mask, use `E.set_layer_segmentation()`."
                )
            else:
                raise EMObjectException(
                    f"No segmentation mask exists for {target_layer}.\
                                        Please specify a segmentation mask for {target_layer}\
                                        with `E.set_layer_segmentation().`"
                )

        if isinstance(segment_id, int):
            segment_id = [segment_id]

        # Get the pixels that belong to the segment
        segment_mask = sparse.coo_matrix(self.mask.mloc(mask_name))

        for si in segment_id:
            (segment_ix,) = np.where(segment_mask.data == si)

            if len(segment_ix) == 0:
                segment_to_cell_map[si] = []

            else:
                segment_ix_rr = segment_mask.row[segment_ix]
                segment_ix_cc = segment_mask.col[segment_ix]

                # Get the target layer's segmentation mask
                target_observations = np.unique(
                    self.mask.mloc(self._layerdict[target_layer].segmentation)[
                        segment_ix_rr, segment_ix_cc
                    ]
                )
                target_observations = target_observations[target_observations != 0]
                segment_to_cell_map[si] = target_observations

        return segment_to_cell_map

    def set_layer(self, value: Optional[str] = None) -> None:
        """Sets the active layer.

        Args:
            value: name of an existing layer in the EMObject
        Returns:
            None
        """
        assert value in self.ax or value is None
        self._validate()

        if value is None:
            self._activelayer = self._defaultlayer
        else:
            self._activelayer = value

    def _validate(self) -> None:
        """Validates the integrity of the EMObject
        as a check against unexpected modifications
        to the object."""

        for layer in self.layers:
            # Check that the obs index is consistent
            obs_ix_data = np.array(
                self._layerdict[layer].data.index
            )  # use arrays for consistency
            var_ix_data = np.array(self._layerdict[layer].data.columns)

            # Check the index of everything that is suppossed to be indexed by observations
            if self._layerdict[layer]._obs is not None:
                if not np.array_equal(
                    np.array(self._layerdict[layer]._obs.index), obs_ix_data
                ):
                    # the content of the index is not the same, check if the length is
                    if len(self._layerdict[layer]._obs.index) != len(obs_ix_data):
                        raise EMObjectException(
                            "Observation index mismatch (different lengths!)."
                        )
                    else:
                        # the length is the same, check if the order is the same
                        warning(
                            "Content of observation index is not the same as data. Renaming data index..."
                        )
                        self._layerdict[layer].data.index = self._layerdict[
                            layer
                        ]._obs.index

                    # raise EMObjectException('Observation index mismatch.')

            # Check that var axis is consistent
            if self._layerdict[layer].var is not None:
                if not np.array_equal(
                    np.array(self._layerdict[layer].var.index), var_ix_data
                ):
                    # the content of the index is not the same, check if the length is
                    if len(self._layerdict[layer].var.index) != len(var_ix_data):
                        raise EMObjectException(
                            "Variable index mismatch (different lengths!)."
                        )
                    else:
                        # the length is the same, check if the order is the same
                        warning(
                            "Content of variable index is not the same as data. Renaming data columns..."
                        )
                        self._layerdict[layer].data.columns = self._layerdict[
                            layer
                        ].var.index
                    # raise EMObjectException('Variable index mismatch.')

            # Check that pos axis is consistent
            if self._layerdict[layer]._pos is not None:
                for key in self._layerdict[layer]._pos.keys():
                    if not np.array_equal(
                        np.array(self._layerdict[layer]._pos[key].index),
                        np.array(self._layerdict[layer].data.index)
                    ):
                        raise EMObjectException("Observation index mismatch (pos).")

        if self._seg is not None and len(obs_ix_data) != self._seg.shape[0]:
            raise EMObjectException("Observation index mismatch.")

        # Check that segment axis is consistent
        if (
            self._seg is not None
            and self._mask is not None
            and self._seg.shape[1] != len(self._mask.mask_names)
        ):
            raise EMObjectException("Segment index mismatch.")

    def add_coordinate_system(
        self,
        label: str = None,
        coords: Union[np.ndarray, pd.DataFrame] = None,
        cols: Optional[Union[list, np.ndarray]] = None,
        scale_factor: Optional[float] = 1.0,
        layer: Optional[str] = None,
    ) -> None:
        """Add a new coordinate system to emobject.

        Args:
            label (str): name of new coordinate system
            coords (Union[np.ndarray, pd.DataFrame, None]): the coordinates (n_obs x dimensions)
            cols Optional[Union[list, np.ndarray, None]]: labels for cols e.g. x, y. Defaults to zero indexed ints
            scale_factor (Optional[float]): scale factor to apply to coordinates (common in Visium). Defaults to 1.

        Returns:
            None
        """

        if label is None:
            raise EMObjectException(
                "Must provide a name/label for this new coordinate system."
            )

        if coords is None:
            raise EMObjectException(
                "Must provide coordinate values for this new coordinate system."
            )

        if layer is None:
            layer = self._activelayer
        else:
            if layer not in self.layers:
                raise EMObjectException("Layer does not exist.")

        assert coords.shape[0] == self.n_obs

        if type(coords) == np.ndarray:
            coords = np.multiply(coords.astype(np.float32), scale_factor)
            coords = pd.DataFrame(
                coords, index=self._layerdict[layer].data.index, columns=cols
            )
        _ = self.pos  # make sure pos exists
        self.pos[label] = coords

    def add_mask(
        self,
        mask: Optional[np.ndarray] = None,
        mask_name: Optional[Union[np.ndarray, list]] = None,
    ) -> None:
        """Add a new mask to emobject.

        Args:
            mask (Optional[np.ndarray]): mask array (n_obs x n_masks)
            mask_name (Optional[Union[np.ndarray, list]]): name of mask. Defaults to None.
        """

        if mask is None:
            raise EMObjectException("Must provide a mask array.")

        if mask_name is None:
            raise EMObjectException("Must provide a mask name.")

        if type(mask_name) == str:
            mask_name = np.array([mask_name])

        if type(mask) == list:
            mask = np.array(mask)

        if type(mask_name) == list:
            mask_name = np.array(mask_name)

        if len(mask.shape) == 3:
            if mask.shape[0] != mask_name.shape[0]:
                raise EMObjectException("Mask and mask_name must have the same length.")
        else:
            if mask_name.shape[0] != 1:
                raise EMObjectException("Mask and mask_name must have the same length.")

        if self.mask is None:
            self._mask = EMMask(mask, mask_name)
        else:
            self._mask.add_mask(mask, mask_name)

    def set_layer_segmentation(self, layer: Optional[str], segmentation: str) -> None:
        """Sets the segmentation for a layer.

        Args:
            layer (str): name of layer
            segmentation (str): name of segmentation
        """

        if layer not in self.layers:
            raise EMObjectException(f"Layer {layer} does not exist.")

        if layer is None:
            layer = self._activelayer
            warning(
                f"Layer not specified, setting segmentation for active layer {layer}."
            )

        if segmentation not in self.mask.mask_names:
            raise EMObjectException(f"Mask {segmentation} does not exist.")

        self._layerdict[layer].segmentation = segmentation

    def drop_obs(self, obs: Union[list, np.ndarray]) -> None:
        """Drop observations from the current layer of the emObject.

        Args:
            obs (Union[list, np.ndarray]): list of observations to drop
        """

        if self._activelayer is None:
            raise EMObjectException("No active layer.")

        if type(obs) == list:
            obs = np.array(obs)

        self._layerdict[self._activelayer].data.drop(obs, inplace=True)
        self._layerdict[self._activelayer]._obs.drop(obs, inplace=True)

        if self._layerdict[self._activelayer]._pos is not None:
            for key in self._layerdict[self._activelayer]._pos.keys():
                self._layerdict[self._activelayer]._pos[key].drop(obs, inplace=True)

        self._layerdict[self._activelayer]._obs_ax = np.array(
            self._layerdict[self._activelayer].data.index
        )

        if self._seg is not None:
            warning("Existing `.seg` will be dropped. Attempting to reconstruct...")
            self._seg = None
            try:
                _ = self.seg
            except IndexError:
                warning(
                    "Failed to reconstruct `.seg`. Please pass an explicit coordinate system to `E.build_seg()`."
                )
                pass

        self._validate()

    def drop_var(self, var: Union[list, np.ndarray]) -> None:
        """Drop variables from the current layer of the emObject.

        Args:
            var (Union[list, np.ndarray]): list of variables to drop
        """

        if self._activelayer is None:
            raise EMObjectException("No active layer.")

        if type(var) == list:
            var = np.array(var)

        self._layerdict[self._activelayer].data.drop(var, axis=1, inplace=True)
        self._layerdict[self._activelayer].var.drop(var, inplace=True)
        self._layerdict[self._activelayer]._var_ax = np.array(
            self._layerdict[self._activelayer].data.columns
        )

        self._validate()

    def add_measurements(
        self,
        measurements: Union[np.ndarray, pd.DataFrame],
        var_names: Optional[Union[np.ndarray, list]] = None,
        layer: Optional[str] = None,
    ) -> None:
        """
        Add new variables to the current layer of the emObject.
        This expands the variable axis of the data array.
        """
        if layer is None:
            layer = self._activelayer

        if layer not in self.layers:
            raise EMObjectException("Layer does not exist.")

        if var_names is None and type(measurements) == np.ndarray:
            var_names = np.array([f"new_obs_{i}" for i in range(measurements.shape[1])])

        if type(measurements) == np.ndarray:
            measurements = pd.DataFrame(
                measurements, index=self._layerdict[layer].data.index, columns=var_names
            )

        if measurements.shape[0] != self.n_obs:
            raise EMObjectException(
                f"New measurements must have the same number of rows as existing data objects. Found {measurements.shape[0]} rows, expected {self.n_obs}."
            )

        if not np.all(measurements.index == self._layerdict[layer].data.index):
            raise EMObjectException(
                "New measurements must have the same index as existing observations."
            )

        self._layerdict[layer].data = pd.concat(
            [self._layerdict[layer].data, measurements], axis=1
        )
        self._layerdict[layer]._var_ax = np.array(self._layerdict[layer].data.columns)

        # also need to update var, which is indexed
        if (
            self._layerdict[layer].var is not None
            and self._layerdict[layer].var.shape[1] > 0
        ):
            self._layerdict[layer].var = pd.concat(
                [
                    self._layerdict[layer].var,
                    pd.DataFrame(
                        np.empty(measurements.shape[1], self.var.shape[1]),
                        index=measurements.columns,
                    ),
                ],
                axis=0,
            )
        else:
            self._layerdict[layer].var = pd.DataFrame(
                data=None, index=self._layerdict[layer]._var_ax
            )

        self._validate()

    def slice_on_segment(
        self,
        segment_id: Union[int, list, np.ndarray],
        mask_name: str = None,
        target_layer: Optional[str] = None,
    ) -> dict:
        """Slice the emObject on a segment.

        Args:
            segment_id (Union[int, list, np.ndarray]): segment id(s)
            mask_name (str): name of mask to use
            target_layer (Optional[str]): name of layer to slice on

        Returns:
            dict: dictionary of observations for each segment
        """

        if target_layer is None:
            target_layer = self._activelayer

        return self._observations_for_segment(
            segment_id=segment_id, mask_name=mask_name, target_layer=target_layer
        )

    def cite() -> None:
        """Prints the citation for the emObject package."""
        print(
            "If you use emObject in your research, please cite the following:\n\n \
            @article{Baker2023.06.07.543950, \
                author = {Ethan Alexander Garcia Baker and Meng-Yao Huang and Amy Lam and Maha K. Rahim and Matthew F. Bieniosek and Bobby Wang and Nancy R. Zhang and Aaron T Mayer and Alexandro E Trevino},\
                journal = {bioRxiv}, \
                title = {emObject: domain specific data abstraction for spatial omics},\
                year = {2023}}"  # noqa
        )

Ancestors

Instance variables

var X : Optional[pandas.core.frame.DataFrame]
Expand source code
@property
def X(self) -> Optional[pd.DataFrame]:
    return self.data
var assay : str
Expand source code
@property
def assay(self) -> str:
    return self._layerdict[self._activelayer].assay
var data : Optional[pandas.core.frame.DataFrame]
Expand source code
@property
def data(self) -> Optional[pd.DataFrame]:
    if len(self.ax) >= 1:
        return self._layerdict[self._activelayer].data
    else:
        return None
var default_layer : str
Expand source code
@property
def default_layer(self) -> str:
    return self._defaultlayer
var img : Optional[EMImage]
Expand source code
@property
def img(self) -> Optional[EMImage]:
    if self._img is not None:
        self._img = self._build_img(self._img, self._var_ax)
    return self._img
var is_view : bool
Expand source code
@property
def is_view(self) -> bool:
    return self._is_view
var layer : str
Expand source code
@property
def layer(self) -> str:
    return self._activelayer
var layers : list
Expand source code
@property
def layers(self) -> list:
    return self.ax
var mask : Optional[EMMask]
Expand source code
@property
def mask(self) -> Optional[EMMask]:
    if self._mask is not None:
        self._mask = self._build_mask(self._mask)
    return self._mask
var meta : Optional[pandas.core.frame.DataFrame]
Expand source code
@property
def meta(self) -> Optional[pd.DataFrame]:
    if self._meta is not None:
        self._meta = self._build_meta(self._meta)
    return self._meta
var n_obs : int
Expand source code
@property
def n_obs(self) -> int:
    return len(self._layerdict[self._activelayer]._obs_ax)
var n_seg : int
Expand source code
@property
def n_seg(self) -> int:
    if self.mask is not None:
        return self.mask.n_seg
    else:
        return 0
var n_var : int
Expand source code
@property
def n_var(self) -> int:
    return len(self._layerdict[self._activelayer]._var_ax)
var name : str
Expand source code
@property
def name(self) -> str:
    return self._name
var obs : pandas.core.frame.DataFrame
Expand source code
@property
def obs(self) -> pd.DataFrame:
    self._validate()
    if self._obs is not None:
        if self._layerdict[self._activelayer].obs is None:
            self._layerdict[self._activelayer].obs = self._obs
    return self._layerdict[self._activelayer].obs
var obs_ax : pandas.core.indexes.base.Index
Expand source code
@property
def obs_ax(self) -> pd.Index:
    # return self._obs_ax
    return self._layerdict[self._activelayer]._obs_ax
var pos : pandas.core.frame.DataFrame
Expand source code
@property
def pos(self) -> pd.DataFrame:
    return self._layerdict[self._activelayer]._pos
var scale_factor : float
Expand source code
@property
def scale_factor(self) -> float:
    return self._layerdict[self._activelayer].scale_factor
var seg : Optional[list]
Expand source code
@property
def seg(self) -> Optional[list]:
    if self._layerdict[self._activelayer]._seg is None:
        self._layerdict[self._activelayer]._seg = self._build_seg()
    return self._layerdict[self._activelayer]._seg
var sobs
Expand source code
@property
def sobs(self):
    if self._sobs is None:
        self._sobs = self._build_sobs()
        # Add to the BaseLayer
        self._layerdict[self._activelayer].sobs = self._sobs
    elif self._layerdict[self._activelayer].sobs is None:
        self._layerdict[self._activelayer].sobs = self._sobs
    return self._sobs
var summary : None
Expand source code
    @property
    def summary(self) -> None:
        print(f"EMObject Version {__version__}")
        print(f"EMObject: {self.name}")
        print(f"Layers: {len(self.ax)}")

        for layer in self.layers:
            print(f"\t Layer: {layer}")
            print(f"\t\t Layer segmentation: {self._layerdict[layer]._segmentation}")
            print(f"\t\t Assay: {self._layerdict[layer]._assay}")

            print(f"\t\t n_obs: {len(self._layerdict[layer]._obs_ax)}")
            print(f"\t\t n_var: {len(self._layerdict[layer]._var_ax)}")
            """if len(self.mask.n_seg) > 0:
                print(f"\t\t n_seg: {self.n_seg}")
"""
        if self._mask is not None:
            print(f"masks: {self.mask.n_masks}")

        if self._seg is not None:
            print("Segment Summary:")
            print("Mask \t Segments")
            for i in self.mask.n_seg.keys():
                print(f"{i} \t {self.mask.n_seg[i]}")
var var_ax : pandas.core.indexes.base.Index
Expand source code
@property
def var_ax(self) -> pd.Index:
    # return self._var_ax
    return self._layerdict[self._activelayer]._var_ax
var version : float
Expand source code
@property
def version(self) -> float:
    print(f"EMObject Version {__version__}")
    return __version__

Methods

def add_anno(self, attr: Optional[str] = None, value: Optional[Union[np.array, pd.DataFrame, list]] = None, name: Optional[Union[np.array, list, str]] = None, layer: Optional[str] = None, mask: Optional[Union[int, str]] = None) ‑> None

Adds a new annotation to an attribute (var, obs, sobs).

Args

attr
the attribute to add annotation to. One of 'sobs', 'var', 'obs'.
value
the annotation. Must be array-like of same size as axis.
name
annotation name
layer
the layer to slice within. If None, uses the active layer.
mask
the mask to which segment observations are applied. Required if attr='sobs'.

Returns

None

Expand source code
def add_anno(
    self,
    attr: Optional[str] = None,
    value: Optional[Union[np.array, pd.DataFrame, list]] = None,
    name: Optional[Union[np.array, list, str]] = None,
    layer: Optional[str] = None,
    mask: Optional[Union[int, str]] = None,
) -> None:
    """
    Adds a new annotation to an attribute (var, obs, sobs).

    Args:
        attr: the attribute to add annotation to. One of
               'sobs', 'var', 'obs'.
        value: the annotation. Must be array-like of same
                  size as axis.
        name: annotation name
        layer:   the layer to slice within. If None, uses the active layer.
        mask:   the mask to which segment observations are applied.
            Required if attr='sobs'.

    Returns:
        None
    """
    self._validate()

    # Set the layer
    if layer is None:
        layer = self._activelayer
    else:
        assert layer in self.ax

    # Some checks on inputs
    if type(value) == list:
        value = np.array(value)
    if len(value.shape) == 1:
        value = value.reshape(value.shape[0], 1)

    # Deal with annotation names.
    if name is not None:
        # Standardize as ndarray
        if type(name) == str or type(name) == list:
            name = np.array(name).reshape(-1)
        n_names_reqd = value.shape[1]  # n_obs/n_var/n_seg x n_newAnnos

        if name.shape[0] < n_names_reqd:
            # Handle case where too few names are provided
            warning(
                f"Expected {n_names_reqd} names but received \
                {name.shape[0]} names. Generating dummy annotation names."
            )
            n_new_names = n_names_reqd - name.shape[0]
            new_names = np.array([f"anno_{i}" for i in range(0, n_new_names)])
            name = np.concatenate([name, new_names])
    else:
        if type(value) != pd.DataFrame:
            #  No names provided, handle this.
            warning(
                "No annotation names received. Generating dummy annotation\
                names."
            )
            n_new_names = value.shape[1]
            name = np.array([f"anno_{i}" for i in range(0, n_new_names)])
            assert name.shape[0] == value.shape[1]

    if type(value) == pd.DataFrame:
        name = value.columns

    if attr == "var":
        # add attribute on var.
        # check annotation matches specified attribute
        assert value.shape[0] == self._layerdict[layer]._var_ax.shape[0]

        # Check that new annotation names are unique
        existing_annos = list(self._layerdict[layer].var.index)
        for n in name:
            if n in existing_annos:
                raise EMObjectException(
                    f"The annotation name {n} already \
                    exists in the {attr} attribute. Please provide a unique \
                    name for the new annotation."
                )

        if type(value) == np.ndarray:
            _appenddf = pd.DataFrame(
                data=value, columns=name, index=self._layerdict[layer]._var_ax
            )
        elif type(value) == pd.DataFrame:
            _appenddf = value
            if name is not None:
                _appenddf.columns = name
            if not _appenddf.index.equals(self._var_ax):
                warning(
                    "Provided dataframe index does not match the var axis. Attempting to join on index."
                )

        if (
            self._layerdict[layer]._var is None
            or self._layerdict[layer]._var.shape[1] == 0
        ):
            self._layerdict[layer]._var = _appenddf
        else:
            # left join on index
            self._layerdict[layer]._var = self._layerdict[layer]._var.join(
                _appenddf, how="left"
            )
            """self._layerdict[layer]._var = pd.concat([self._layerdict[layer]._var, # noqa
                                                     _appenddf],
                                                    axis=1)"""
    elif attr == "obs":
        # add an attribute on obs
        # check annotation matches specified attribute
        assert value.shape[0] == self._layerdict[layer]._obs_ax.shape[0]

        # Check that new annotation names are unique
        existing_annos = self._layerdict[layer]._obs.columns
        for n in name:
            if n in existing_annos:
                raise EMObjectException(
                    f"The annotation name {n} already \
                    exists in the {attr} attribute. Please provide a unique \
                    name for the new annotation."
                )

        if type(value) == np.ndarray:
            _appenddf = pd.DataFrame(
                data=value, columns=name, index=self._layerdict[layer]._obs_ax
            )
        elif type(value) == pd.DataFrame:
            _appenddf = value
            if name is not None:
                _appenddf.columns = name
            if not _appenddf.index.equals(self._layerdict[layer]._obs_ax):
                warning(
                    "Index of new annotation does not match obs index. Attempting to join on index value."
                )

        if (
            self._layerdict[layer]._obs is None
            or self._layerdict[layer]._obs.shape[1] == 0
        ):
            self._layerdict[layer]._obs = _appenddf
        else:
            # join the new annotation horizontally on the index
            self._layerdict[layer]._obs = self._layerdict[layer]._obs.join(
                _appenddf, how="left"
            )  # noqa
            """self._layerdict[layer]._obs = pd.concat([self._layerdict[layer]._obs, # noqa
                                                     _appenddf],
                                                    axis=1)"""

    elif attr == "sobs":
        # add an attribute on sobs
        if mask is None:
            raise EMObjectException("Mask to annotate is unspecified.")

        # Check that new annotation names are unique
        _ = self.sobs  # build sobs if not already built
        existing_annos = self._layerdict[layer]._sobs[mask].columns
        for n in name:
            if n in existing_annos:
                raise EMObjectException(
                    f"The annotation name {n} already \
                    exists in the {attr} attribute. Please provide a unique \
                    name for the new annotation."
                )

        # check that segment is valid.
        if type(mask) == str:
            if mask not in self.mask.mask_names:
                raise EMObjectException(
                    f"Mask name {mask} is not in the current\
                 object."
                )
        else:
            raise EMObjectException(
                f"Mask must be a string. Received {type(mask)}."
            )

        # Get index for sobs
        anno_idx = self.sobs[mask].index  # hopefully calls build_sobs?
        if value.shape[0] != anno_idx.shape[0]:
            if anno_idx.shape[0] - value.shape[0] == 1:
                # Handle case where one segment is missing
                raise EMObjectException(
                    "Received one fewer segment annotations than expected.\
                     This is likely due to a missing background segment annotation (seg_id = 0). Try prepending a np.nan dummy."
                )
                """new_val = [np.nan]
                for v in value:
                    new_val.append(v)
                value = np.array(new_val)
                assert value.shape[0] == anno_idx.shape[0]"""
            else:
                raise EMObjectException(
                    f"Annotation length does not match the\
                    number of segments in the mask. Expected {anno_idx.shape[0]}\
                    but received {value.shape[0]}."
                )

        if type(value) == np.ndarray:
            _appenddf = pd.DataFrame(data=value, columns=name, index=anno_idx)
        elif type(value) == pd.DataFrame:
            _appenddf = value
            if name is not None:
                _appenddf.columns = name
            _appenddf.index = self._sobs_ax

        if (
            self._layerdict[layer]._obs is None
            or self._layerdict[layer]._obs.shape[1] == 0
        ):
            self._layerdict[layer]._sobs[mask] = _appenddf
        else:
            self._layerdict[layer]._sobs[mask] = pd.concat(
                [self._layerdict[layer]._sobs[mask], _appenddf], axis=1  # noqa
            )
    else:
        raise EMObjectException(
            f"Unrecognized attribute to annotate. Must be one\
             of 'sobs', 'var', 'obs', but received {attr}."
        )

    self._validate()
def add_coordinate_system(self, label: str = None, coords: Union[np.ndarray, pd.DataFrame] = None, cols: Optional[Union[list, np.ndarray]] = None, scale_factor: Optional[float] = 1.0, layer: Optional[str] = None) ‑> None

Add a new coordinate system to emobject.

Args

label : str
name of new coordinate system
coords : Union[np.ndarray, pd.DataFrame, None]
the coordinates (n_obs x dimensions)
cols Optional[Union[list, np.ndarray, None]]: labels for cols e.g. x, y. Defaults to zero indexed ints
scale_factor : Optional[float]
scale factor to apply to coordinates (common in Visium). Defaults to 1.

Returns

None

Expand source code
def add_coordinate_system(
    self,
    label: str = None,
    coords: Union[np.ndarray, pd.DataFrame] = None,
    cols: Optional[Union[list, np.ndarray]] = None,
    scale_factor: Optional[float] = 1.0,
    layer: Optional[str] = None,
) -> None:
    """Add a new coordinate system to emobject.

    Args:
        label (str): name of new coordinate system
        coords (Union[np.ndarray, pd.DataFrame, None]): the coordinates (n_obs x dimensions)
        cols Optional[Union[list, np.ndarray, None]]: labels for cols e.g. x, y. Defaults to zero indexed ints
        scale_factor (Optional[float]): scale factor to apply to coordinates (common in Visium). Defaults to 1.

    Returns:
        None
    """

    if label is None:
        raise EMObjectException(
            "Must provide a name/label for this new coordinate system."
        )

    if coords is None:
        raise EMObjectException(
            "Must provide coordinate values for this new coordinate system."
        )

    if layer is None:
        layer = self._activelayer
    else:
        if layer not in self.layers:
            raise EMObjectException("Layer does not exist.")

    assert coords.shape[0] == self.n_obs

    if type(coords) == np.ndarray:
        coords = np.multiply(coords.astype(np.float32), scale_factor)
        coords = pd.DataFrame(
            coords, index=self._layerdict[layer].data.index, columns=cols
        )
    _ = self.pos  # make sure pos exists
    self.pos[label] = coords
def add_mask(self, mask: Optional[np.ndarray] = None, mask_name: Optional[Union[np.ndarray, list]] = None) ‑> None

Add a new mask to emobject.

Args

mask : Optional[np.ndarray]
mask array (n_obs x n_masks)
mask_name : Optional[Union[np.ndarray, list]]
name of mask. Defaults to None.
Expand source code
def add_mask(
    self,
    mask: Optional[np.ndarray] = None,
    mask_name: Optional[Union[np.ndarray, list]] = None,
) -> None:
    """Add a new mask to emobject.

    Args:
        mask (Optional[np.ndarray]): mask array (n_obs x n_masks)
        mask_name (Optional[Union[np.ndarray, list]]): name of mask. Defaults to None.
    """

    if mask is None:
        raise EMObjectException("Must provide a mask array.")

    if mask_name is None:
        raise EMObjectException("Must provide a mask name.")

    if type(mask_name) == str:
        mask_name = np.array([mask_name])

    if type(mask) == list:
        mask = np.array(mask)

    if type(mask_name) == list:
        mask_name = np.array(mask_name)

    if len(mask.shape) == 3:
        if mask.shape[0] != mask_name.shape[0]:
            raise EMObjectException("Mask and mask_name must have the same length.")
    else:
        if mask_name.shape[0] != 1:
            raise EMObjectException("Mask and mask_name must have the same length.")

    if self.mask is None:
        self._mask = EMMask(mask, mask_name)
    else:
        self._mask.add_mask(mask, mask_name)
def add_measurements(self, measurements: Union[np.ndarray, pd.DataFrame], var_names: Optional[Union[np.ndarray, list]] = None, layer: Optional[str] = None) ‑> None

Add new variables to the current layer of the emObject. This expands the variable axis of the data array.

Expand source code
def add_measurements(
    self,
    measurements: Union[np.ndarray, pd.DataFrame],
    var_names: Optional[Union[np.ndarray, list]] = None,
    layer: Optional[str] = None,
) -> None:
    """
    Add new variables to the current layer of the emObject.
    This expands the variable axis of the data array.
    """
    if layer is None:
        layer = self._activelayer

    if layer not in self.layers:
        raise EMObjectException("Layer does not exist.")

    if var_names is None and type(measurements) == np.ndarray:
        var_names = np.array([f"new_obs_{i}" for i in range(measurements.shape[1])])

    if type(measurements) == np.ndarray:
        measurements = pd.DataFrame(
            measurements, index=self._layerdict[layer].data.index, columns=var_names
        )

    if measurements.shape[0] != self.n_obs:
        raise EMObjectException(
            f"New measurements must have the same number of rows as existing data objects. Found {measurements.shape[0]} rows, expected {self.n_obs}."
        )

    if not np.all(measurements.index == self._layerdict[layer].data.index):
        raise EMObjectException(
            "New measurements must have the same index as existing observations."
        )

    self._layerdict[layer].data = pd.concat(
        [self._layerdict[layer].data, measurements], axis=1
    )
    self._layerdict[layer]._var_ax = np.array(self._layerdict[layer].data.columns)

    # also need to update var, which is indexed
    if (
        self._layerdict[layer].var is not None
        and self._layerdict[layer].var.shape[1] > 0
    ):
        self._layerdict[layer].var = pd.concat(
            [
                self._layerdict[layer].var,
                pd.DataFrame(
                    np.empty(measurements.shape[1], self.var.shape[1]),
                    index=measurements.columns,
                ),
            ],
            axis=0,
        )
    else:
        self._layerdict[layer].var = pd.DataFrame(
            data=None, index=self._layerdict[layer]._var_ax
        )

    self._validate()
def build_seg(self, coord_sys: Optional[str])

Builds an assignement matrix of cells to segments using a specific coordinate system.

Args

coord_sys
str The coordinate system to use for building the segmentation.

Returns

seg
np.ndarray seg is a n_obs x n_mask array. Columns correspond to masks. Non-zero integer values assign cells to specific segments.
Expand source code
def build_seg(self, coord_sys: Optional[str]):
    """
    Builds an assignement matrix of cells to segments using a specific coordinate system.

    Args:
        coord_sys: str
            The coordinate system to use for building the segmentation.
    Returns:
        seg: np.ndarray
            seg is a `n_obs` x `n_mask` array. Columns correspond to masks.
            Non-zero integer values assign cells to specific segments.
    """
    assert coord_sys in self._layerdict[self._activelayer]._pos.keys()
    self._layerdict[self._activelayer]._seg = self._build_seg(coord_sys=coord_sys)
def cite() ‑> None

Prints the citation for the emObject package.

Expand source code
def cite() -> None:
    """Prints the citation for the emObject package."""
    print(
        "If you use emObject in your research, please cite the following:\n\n \
        @article{Baker2023.06.07.543950, \
            author = {Ethan Alexander Garcia Baker and Meng-Yao Huang and Amy Lam and Maha K. Rahim and Matthew F. Bieniosek and Bobby Wang and Nancy R. Zhang and Aaron T Mayer and Alexandro E Trevino},\
            journal = {bioRxiv}, \
            title = {emObject: domain specific data abstraction for spatial omics},\
            year = {2023}}"  # noqa
    )
def del_anno(self, attr: Optional[str] = None, name: Optional[str] = None, layer: Optional[str] = None, mask: Optional[Union[str, int]] = None) ‑> None

Delete an annotation from an annotation matrix.

Args

attr
the attribute to add annotation to. One of 'sobs', 'var', 'obs'.
name
annotation name
layer
the layer to slice within. If None, uses the active layer.
mask
the mask to which segment observations are applied. Required if attr='sobs'.

Returns

None

Expand source code
def del_anno(
    self,
    attr: Optional[str] = None,
    name: Optional[str] = None,
    layer: Optional[str] = None,
    mask: Optional[Union[str, int]] = None,
) -> None:
    """Delete an annotation from an annotation matrix.

    Args:
        attr: the attribute to add annotation to. One of
               'sobs', 'var', 'obs'.
        name: annotation name
        layer:   the layer to slice within. If None, uses the active layer.
        mask:   the mask to which segment observations are applied.
            Required if attr='sobs'.

    Returns:
        None
    """
    self._validate()

    if layer is None:
        layer = self._activelayer

    if attr is not None:
        if attr == "obs":
            assert name in self._layerdict[layer]._obs.columns
            self._layerdict[layer]._obs.drop(columns=name, axis=0, inplace=True)
        elif attr == "var":
            assert name in self._layerdict[layer]._var.columns
            self._layerdict[layer]._var.drop(columns=name, axis=0, inplace=True)
        elif attr == "sobs":
            # check that segment is valid.
            assert mask is not None

            # Validity checks on masks, get correct mask.
            if type(mask) == str:
                if mask not in self.mask.mask_names:
                    raise EMObjectException(
                        f"Mask name {mask} is not in the\
                         current object."
                    )
                else:
                    (ix,) = np.where(self.mask.mask_names == mask)
                    mask = ix[0]

            elif type(mask) == int:
                if mask >= len(self.mask.mask_names) or mask < 0:
                    raise EMObjectException(
                        f"Mask index {mask} is \
                        out of range for the current object."
                    )

            assert name in self._layerdict[layer]._sobs[mask].columns

            # delete the annotation
            self._layerdict[layer]._sobs[mask].drop(
                columns=name, axis=0, inplace=True
            )
    self._validate()
def drop_obs(self, obs: Union[list, np.ndarray]) ‑> None

Drop observations from the current layer of the emObject.

Args

obs : Union[list, np.ndarray]
list of observations to drop
Expand source code
def drop_obs(self, obs: Union[list, np.ndarray]) -> None:
    """Drop observations from the current layer of the emObject.

    Args:
        obs (Union[list, np.ndarray]): list of observations to drop
    """

    if self._activelayer is None:
        raise EMObjectException("No active layer.")

    if type(obs) == list:
        obs = np.array(obs)

    self._layerdict[self._activelayer].data.drop(obs, inplace=True)
    self._layerdict[self._activelayer]._obs.drop(obs, inplace=True)

    if self._layerdict[self._activelayer]._pos is not None:
        for key in self._layerdict[self._activelayer]._pos.keys():
            self._layerdict[self._activelayer]._pos[key].drop(obs, inplace=True)

    self._layerdict[self._activelayer]._obs_ax = np.array(
        self._layerdict[self._activelayer].data.index
    )

    if self._seg is not None:
        warning("Existing `.seg` will be dropped. Attempting to reconstruct...")
        self._seg = None
        try:
            _ = self.seg
        except IndexError:
            warning(
                "Failed to reconstruct `.seg`. Please pass an explicit coordinate system to `E.build_seg()`."
            )
            pass

    self._validate()
def drop_var(self, var: Union[list, np.ndarray]) ‑> None

Drop variables from the current layer of the emObject.

Args

var : Union[list, np.ndarray]
list of variables to drop
Expand source code
def drop_var(self, var: Union[list, np.ndarray]) -> None:
    """Drop variables from the current layer of the emObject.

    Args:
        var (Union[list, np.ndarray]): list of variables to drop
    """

    if self._activelayer is None:
        raise EMObjectException("No active layer.")

    if type(var) == list:
        var = np.array(var)

    self._layerdict[self._activelayer].data.drop(var, axis=1, inplace=True)
    self._layerdict[self._activelayer].var.drop(var, inplace=True)
    self._layerdict[self._activelayer]._var_ax = np.array(
        self._layerdict[self._activelayer].data.columns
    )

    self._validate()
def loc(self, obs_subset: Optional[Union[np.ndarray, list]] = None, var_subset: Optional[Union[np.ndarray, list]] = None, seg_subset: Optional[Union[np.ndarray, list, int]] = None, mask: Optional[str] = None, layer: Optional[str] = None) ‑> EMObject

Allows for slicing of EMObjects to subsets of interest.

Args

obs_subset
subset of observations to include. Elements must belong to obs_ax
var_subset
subset of variables to include. Elements must belong to var_ax
seg_subset
subset of segments to include. Elements must belong to sobs_ax
mask:
layer
the layer to slice within. If None, uses the active layer.

Returns

Subsetted EMObject view

Expand source code
def loc(
    self,
    obs_subset: Optional[Union[np.ndarray, list]] = None,
    var_subset: Optional[Union[np.ndarray, list]] = None,
    seg_subset: Optional[Union[np.ndarray, list, int]] = None,
    mask: Optional[str] = None,
    layer: Optional[str] = None,
) -> EMObject:
    """
    Allows for slicing of EMObjects to subsets of interest.

    Args:
        obs_subset: subset of observations to include.
            Elements must belong to obs_ax
        var_subset: subset of variables to include.
            Elements must belong to var_ax
        seg_subset: subset of segments to include.
            Elements must belong to sobs_ax
        mask:
        layer:  the layer to slice within.
            If None, uses the active layer.
    Returns:
        Subsetted EMObject view
    """
    self._validate()

    if layer is None:
        layer = self._activelayer
    else:
        assert layer in self.ax

    if obs_subset is None:
        obs_subset = self._layerdict[layer]._obs_ax
    if var_subset is None:
        var_subset = self._layerdict[layer]._var_ax
    if type(obs_subset) == list:
        obs_subset = np.array(obs_subset)
    if type(var_subset) == list:
        var_subset = np.array(var_subset)

    if seg_subset is not None:
        assert seg_subset in np.unique(self.seg)
        all_r = set()
        if type(seg_subset) == int:
            seg_subset = [seg_subset]
        for si in seg_subset:
            rr, cc = np.where(self._layerdict[layer]._seg == si)
            all_r.update(rr)
        obs_subset = np.array(list(all_r))
        obs_subset = self._layerdict[layer]._obs_ax[obs_subset]

    if mask is not None:
        if self._layerdict[layer]._seg is None:
            _ = self.seg
        assert mask in self.mask.mask_names
        (ix,) = np.where(self.mask.mask_names == mask)
        obs_subset, _ = np.where(self._layerdict[layer]._seg[:, ix] != 0)
        obs_subset = self._layerdict[layer]._obs_ax[obs_subset]

    pos_dict = {}
    for coord_sys in self._layerdict[layer]._pos.keys():
        pos_dict[coord_sys] = (
            self._layerdict[layer]._pos[coord_sys].loc[obs_subset, :]
        )

    # a view will return an in-memory EMObject that has been subsetted
    return EMObject(
        data=self._layerdict[layer].data.loc[obs_subset, var_subset],
        obs=self._layerdict[layer].obs.loc[obs_subset, :],
        var=self._layerdict[layer].var.loc[var_subset, :],
        pos=pos_dict,
        mask=self.mask,
        img=self.img,
        name=f"ViewOf{self.name}-{layer}",
        is_view=True,
    )
def set_layer(self, value: Optional[str] = None) ‑> None

Sets the active layer.

Args

value
name of an existing layer in the EMObject

Returns

None

Expand source code
def set_layer(self, value: Optional[str] = None) -> None:
    """Sets the active layer.

    Args:
        value: name of an existing layer in the EMObject
    Returns:
        None
    """
    assert value in self.ax or value is None
    self._validate()

    if value is None:
        self._activelayer = self._defaultlayer
    else:
        self._activelayer = value
def set_layer_segmentation(self, layer: Optional[str], segmentation: str) ‑> None

Sets the segmentation for a layer.

Args

layer : str
name of layer
segmentation : str
name of segmentation
Expand source code
def set_layer_segmentation(self, layer: Optional[str], segmentation: str) -> None:
    """Sets the segmentation for a layer.

    Args:
        layer (str): name of layer
        segmentation (str): name of segmentation
    """

    if layer not in self.layers:
        raise EMObjectException(f"Layer {layer} does not exist.")

    if layer is None:
        layer = self._activelayer
        warning(
            f"Layer not specified, setting segmentation for active layer {layer}."
        )

    if segmentation not in self.mask.mask_names:
        raise EMObjectException(f"Mask {segmentation} does not exist.")

    self._layerdict[layer].segmentation = segmentation
def slice(self, obs_subset: Optional[Union[np.ndarray, list]] = None, seg_subset: Optional[Union[np.ndarray, list, int]] = None, anchor_layer: Optional[str] = None, layers: Optional[Union[str, list, np.ndarray]] = None) ‑> EMObject

Slices through all emObject layers on the basis of observations, variables, or masks and returns a new EMObject with the subsetted data.

Args

obs_subset
subset of observations to include. Elements must belong to obs_ax
seg_subset
subset of segments to include. Elements must belong to sobs_ax
layers
the layers to slice within. If None, uses all.
Expand source code
def slice(
    self,
    obs_subset: Optional[Union[np.ndarray, list]] = None,
    seg_subset: Optional[Union[np.ndarray, list, int]] = None,
    anchor_layer: Optional[str] = None,
    layers: Optional[Union[str, list, np.ndarray]] = None,
) -> EMObject:
    """
    Slices through all emObject layers on the basis of observations, variables, or masks
    and returns a new EMObject with the subsetted data.

    Args:
        obs_subset: subset of observations to include.
            Elements must belong to obs_ax
        seg_subset: subset of segments to include.
            Elements must belong to sobs_ax
        layers: the layers to slice within. If None, uses all.
    """
    self._validate()

    if layers is None:
        raise EMObjectException("Must specify layers to slice.")
    elif type(layers) == str:
        layers = [layers]
    if type(layers) == list:
        layers = np.array(layers)

    if anchor_layer is not None:
        assert anchor_layer in self.layers
    else:
        anchor_layer = self._activelayer

    """
    if anchor_coord_sys is not None:
        assert anchor_coord_sys in E._layerdict[anchor_layer].pos.keys()
    else:
        warning(f"Coordinate system {anchor_coord_sys} not found in layer {anchor_layer}. Using {list(self._layerdict[anchor_layer].pos.keys())[0]}.")
        anchor_coord_sys = list(E._layerdict[anchor_layer].pos.keys())[0]
    """

    # Check that each layer has an assigned segmentation
    for layer in layers:
        # requires finding a spot to store spot_size
        if (
            self._layerdict[layer].segmentation is None
            and self._layerdict[layer]._assay == "visium"
        ):
            try:
                test_key = list(self._layerdict[layer].obs.keys())[0]
                visium_mask = helpers.build_visium_segmentation_mask(
                    spot_coords=self._layerdict[layer].pos[test_key],
                    spot_size=self._layerdict[layer]._spot_size,
                    scale_factor=self._layerdict[layer]._scale_factor,
                    shape=self.mask.dim,
                )

                self.mask.add_mask(visium_mask, name="visium_segmentation")
                self._layerdict[layer].segmentation = "visium_segmentation"
            except Exception:
                raise EMObjectException(
                    "Could not build segmentation mask for Visium data. Be sure to specify a spot_size in the metadata.\
                                        Alternatively, you can manually build a segmentation mask using utils.helpers.build_visium_segmetnation_mask\
                                        and assign it to the layer."
                )

        assert self._layerdict[layer].segmentation is not None
        # Also check that the data layers have positions
        assert self._layerdict[layer].pos is not None
        test_key = list(self._layerdict[layer].pos.keys())[0]
        assert len(self._layerdict[layer].data) == len(
            self._layerdict[layer].pos[test_key]
        )

    # Based on the current layer's segmentation, slice the data
    # on the basis of the segmentation mask from the other layers.
    # Then, subset to any additional observations specified.
    # First, construct a new EMObject with the data from the current layer
    if obs_subset is None:
        obs_subset = self._layerdict[self._activelayer].data.index

    pos_dict = {}
    for coord_sys in self._layerdict[anchor_layer]._pos.keys():
        pos_dict[coord_sys] = (
            self._layerdict[anchor_layer]._pos[coord_sys].loc[obs_subset, :]
        )

    E = EMObject(
        data=self._layerdict[anchor_layer].data.loc[obs_subset, :],
        obs=self._layerdict[anchor_layer].obs.loc[obs_subset, :],
        var=self._layerdict[anchor_layer].var,
        pos=pos_dict,
        mask=self.mask,
        img=self.img,
        name=f"sliced_{self.name}",
        first_layer_name=anchor_layer,
        segmentation=self._layerdict[anchor_layer].segmentation,
    )

    # Make binary anchor mask
    try:
        sparse_anchor_mask = sparse.coo_matrix(
            self.mask.mloc(self._layerdict[anchor_layer].segmentation)
        )
    except TypeError:
        sparse_anchor_mask = sparse.coo_matrix(
            self.mask.mloc(self._layerdict[anchor_layer].segmentation).squeeze()
        )
    (obs_subset_anchor_mask_ix,) = np.where(
        np.isin(sparse_anchor_mask.data, obs_subset)
    )  # this is the "binarized array"
    obs_subset_anchor_mask_ix_rr = sparse_anchor_mask.row[obs_subset_anchor_mask_ix]
    obs_subset_anchor_mask_ix_cc = sparse_anchor_mask.col[obs_subset_anchor_mask_ix]

    for layer in layers:
        if layer != anchor_layer:
            assert self.mask.mloc(self._layerdict[layer].segmentation) is not None
            layer_segmentation = self.mask.mloc(self._layerdict[layer].segmentation)
            # Now subset the layer segmentatino using the root segmentation
            if len(layer_segmentation.shape) == 3:
                layer_segmentation = layer_segmentation.squeeze()
            subset_ids = np.unique(
                layer_segmentation[
                    obs_subset_anchor_mask_ix_rr, obs_subset_anchor_mask_ix_cc
                ]
            )
            subset_ids = subset_ids[subset_ids != 0]
            # The values in the segmentation mask should be the cell/spot IDs
            # Subset the data and positions
            # for objects generated from the enable database, the index is the cell id.
            # TODO: Ensure that this is the case for all EMObjects
            pos_dict = {}
            for coord_sys in self._layerdict[layer]._pos.keys():
                pos_dict[coord_sys] = (
                    self._layerdict[layer]._pos[coord_sys].loc[subset_ids, :]
                )

            new_layer = BaseLayer(
                data=self._layerdict[layer].data.loc[subset_ids, :],
                obs=self._layerdict[layer].obs.loc[subset_ids, :],
                var=self._layerdict[layer].var,
                pos=pos_dict,
                segmentation=self._layerdict[layer].segmentation,
                name=layer,
            )
            E.add(new_layer)

    return E
def slice_on_segment(self, segment_id: Union[int, list, np.ndarray], mask_name: str = None, target_layer: Optional[str] = None) ‑> dict

Slice the emObject on a segment.

Args

segment_id : Union[int, list, np.ndarray]
segment id(s)
mask_name : str
name of mask to use
target_layer : Optional[str]
name of layer to slice on

Returns

dict
dictionary of observations for each segment
Expand source code
def slice_on_segment(
    self,
    segment_id: Union[int, list, np.ndarray],
    mask_name: str = None,
    target_layer: Optional[str] = None,
) -> dict:
    """Slice the emObject on a segment.

    Args:
        segment_id (Union[int, list, np.ndarray]): segment id(s)
        mask_name (str): name of mask to use
        target_layer (Optional[str]): name of layer to slice on

    Returns:
        dict: dictionary of observations for each segment
    """

    if target_layer is None:
        target_layer = self._activelayer

    return self._observations_for_segment(
        segment_id=segment_id, mask_name=mask_name, target_layer=target_layer
    )

Inherited members