Module emobject.emexperiment

Expand source code
from __future__ import annotations
from logging import warning
from typing import Optional, Union, List, Callable
from emobject.errors import EMExperimentException
import emoaccess.queries as queries
import emoaccess.emexp as emexp_helpers
import emobject.core as core
from emobject.core import EMObjectConfig, build_emobject, save, EMObject
from emobject.version import __version__
import numpy as np
import pandas as pd
import os
import glob


class EMExperimentConfig:
    """Object that defines the config
    for an emExperiment that is interacting with the
    Enable database.

    Args:
        acquisition_id: required, the acquisition ID to query
        study_id: study id, placeholder
        segmentation_versions: required, segmentation version
        biomarker_versions: required, biomarker expression version
        biomarkers: optional, a list or name of biomarkers
            to download. If None, gets all.
        annotations: optional, a list or name of annotations to download.
            If None, gets all.
        include_img: optional, if True, fetches the image with channels
            subsetted same as biomarkers.
        include_masks: optional, if True, fetches ROI masks.
        include_seg_masks: optional. If True, gets the segmentation mask.
        seg_mask_type: optional. Type of segmentation mask to fetch. Can be
            'nucleus' or 'cell'.
        img_format: img_format - placeholder
        img_res: optional, factor to downsample image by
        img_to_disk: optional, if True writes the zarr store to disk,
            otherwise object held in memory.
        root_dir: optional, path where experiment is built.
        name: optional, a name for this emObject.
        datatype: optional, describe the datatype used here.
    """

    def __init__(
        self,
        acquisition_ids: Optional[list] = None,
        study_id: Optional[int] = None,
        segmentation_versions: Optional[list] = None,
        biomarker_versions: Optional[list] = None,
        biomarkers: Optional[Union[list, np.ndarray, str]] = None,
        annotations: Optional[Union[list, np.ndarray, str]] = None,
        include_img: Optional[bool] = False,
        include_masks: Optional[bool] = False,
        include_seg_mask: Optional[bool] = False,
        seg_mask_type: Optional[str] = "nucleus",
        img_res: Optional[int] = 0,
        mask_names: Optional[Union[list, np.ndarray, str]] = None,
        name: Optional[str] = None,
        root_dir: Optional[str] = None,
        datatype: Optional[str] = None,
    ):
        # simple case, zarr directory is provided
        if root_dir is not None:
            zarrs = glob.glob(os.path.join(root_dir, "*.zarr"))
            if len(zarrs) < 1:
                raise EMExperimentException(
                    "Provided root_dir does not contain any .zarr files"
                )

        # otherwise validate DB call
        else:
            if acquisition_ids is None and study_id is None:
                raise EMExperimentException(
                    "Must provide either acquisition_ids or study_id."
                )

            if segmentation_versions is not None and biomarker_versions is not None:
                if len(segmentation_versions) != len(biomarker_versions):
                    raise EMExperimentException(
                        "Must provide same number of segmentation and biomarker versions."
                    )

            # note may need to fix this for flex studies
            if acquisition_ids is not None and study_id is not None:
                warning(
                    "Both acquisition_ids and study_id provided. \
                        Note: acquisition_ids overrides study_id"
                )

        self.acquisition_ids = acquisition_ids
        self.study_id = study_id
        self.segmentation_versions = segmentation_versions
        self.biomarker_versions = biomarker_versions
        self.biomarkers = biomarkers
        self.annotations = annotations
        self.include_img = include_img
        self.include_masks = include_masks
        self.include_seg_mask = include_seg_mask
        self.seg_mask_type = seg_mask_type
        self.img_res = img_res
        self.mask_names = mask_names
        self.name = name
        self.datatype = datatype
        self.masks = None
        self.img = None
        self.root_dir = root_dir

    def _validate_config_for_enable_db(self) -> None:
        # validation above. Skip the rest if zarr dir is provided
        # Later we will validate each zarr.
        if self.root_dir is not None:
            return

        if (
            self.segmentation_versions is not None
            and self.biomarker_versions is not None
        ):
            if len(self.segmentation_versions) != len(self.biomarker_versions):
                raise EMExperimentException(
                    "Must provide same number of segmentation and biomarker versions."
                )
            if len(self.segmentation_versions) != len(self.acquisition_ids):
                raise EMExperimentException(
                    "Must provide same number of segmentation versions and acquisition ids."
                )
            if len(self.biomarker_versions) != len(self.acquisition_ids):
                raise EMExperimentException(
                    "Must provide same number of biomarker versions and acquisition ids."
                )


class EMExperiment:
    """
    Holds the data for a single experiment, composed of multiple emObjects.

    This could be thought of as representing the study level, cohorts of studies,
    or acquistions, and is useful for cohort level comparisons.

    There are three ways to construct an EMExperiment object:
    1. Pass a list of emObjects
    2. Pass a directory containing emObject Zarr stores
    3. Pass a database configuration object, which will retrieve remote data
    """

    def __init__(
        self,
        experiment_name: Optional[str] = None,
        root_dir: Optional[str] = None,
        emobjects: Optional[list] = None,
        config: Optional[EMExperimentConfig] = None,
    ) -> EMExperiment:
        """
        Args:
            experiment_name: a name for the experiment (optional)
            root_dir: a directory containing emObject zarr stores.
            emobjects: a list of emObjects
            config: An EMExperiment config file
        """

        # It's ok to only pass a config object.

        self._groupnames = ["ungrouped"]  # list of groups
        self._groups = dict()
        self._groups["ungrouped"] = list()
        self._zarrpaths = dict()
        self._meta = None
        self.config = None
        self._acquisition_ids = list()
        self._segmentation_versions = list()
        self._biomarker_versions = list()
        self._study_id = None

        if config is None:
            if experiment_name is not None:
                self._experiment_name = experiment_name  # name of the experiment
            else:
                warning(
                    "No experiment name provided. Using default name 'emExperiment'."
                )
                self._experiment_name = "emExperiment"
            # dict of zarr paths is created by `build` using the root_dir
            if root_dir is None:
                self._rootdir = f"./{self._experiment_name}"
            else:
                self._rootdir = root_dir

            # if a list of emobjects was passed, collect some info?
            if emobjects is not None:
                # assume their names match their acquisition_ids
                for E in emobjects:
                    self._acquisition_ids.append(E.name)
                self._groups["ungrouped"] = self._acquisition_ids

        else:
            self.config = config
            self._experiment_name = config.name
            self._acquisition_ids = config.acquisition_ids
            self._rootdir = config.root_dir
            self._groups["ungrouped"] = self._acquisition_ids
            self._biomarker_versions = config.biomarker_versions
            self._segmentation_versions = config.segmentation_versions
            self._study_id = config.study_id

    def __iter__(self):
        return self._groups.__iter__()  # make the iterator the dictionary's iterator.

    def __next__(self):
        return self._groups.__next__()

    @property
    def summary(self):
        """
        Returns a summary of the emExperiment
        """
        print(f"EMObject Version {__version__}")
        print(f"EMExperiment: {self._experiment_name}")
        print("Groups:")
        for group in self._groups.keys():
            if group == "ungrouped":
                # there's an exception that ungrouped is not a dict
                print(f"\t{group}: {len(self._groups[group])} emObjects")
                for ai in self._groups[group]:
                    print(f"\t\t{ai}")
            else:
                print(f"\t{group}: {len(self._groups[group])} subgroups")
                for subgroup in self._groups[group].keys():
                    print(
                        f"\t\t{subgroup}: {len(self._groups[group][subgroup])} emObjects"
                    )

    @property
    def group_names(self):
        """
        Returns a list of the groups that exist in the emExperiment.
        """
        if len(self._groupnames) == 0:
            self._groupnames.append("ungrouped")
        return self._groupnames

    @property
    def acquisitions(self):
        """
        Returns a list of the acquisitions that exist in the emExperiment.
        """
        self._acquisition_ids = list()

        for group in self._groups:
            if group == "ungrouped":
                for ai in self._groups[group]:
                    self._acquisition_ids.append(ai)

            else:
                for subgroup in self._groups[group].keys():
                    for ai in self._groups[group][subgroup]:
                        if ai not in self._acquisition_ids:
                            self._acquisition_ids.append(ai)

        return self._acquisition_ids

    def add_group(self, groupname: str, acquisition_ids: Optional[dict] = None) -> None:
        """
        Adds a top-level group to the emExperiment.

        Args:
            groupname: the name of the group to add. typically this is the metadata covariate.
            acquisition_ids: a dictionary of covariate feature value to acquisition ids to add to the group

        Returns:
            None
        """
        if groupname in self._groupnames:
            raise EMExperimentException(f"Group {groupname} already exists.")

        self._groupnames.append(groupname)
        if acquisition_ids is None:
            self._groups[groupname] = dict()
        else:
            self._groups[groupname] = acquisition_ids

            # remove the acquisition ids from the ungrouped group
            for subgroup in acquisition_ids.keys():
                for ai in acquisition_ids[subgroup]:
                    self._groups["ungrouped"].remove(ai)

    def remove_group(self, groupname: str) -> None:
        """
        Removes a group from the emExperiment.

        Args:
            groupname: the name of the group to remove

        Returns:
            None
        """
        if groupname not in self._groups.keys:
            raise EMExperimentException(f"Group {groupname} does not exist.")

        # move the acquisition ids back to the ungrouped group
        for subgroup in self._groups[groupname].keys():
            for ai in self._groups[groupname][subgroup]:
                self._groups["ungrouped"].append(ai)
        self._groupnames.remove(groupname)
        self._groups.pop(groupname)

    def add_subgroup(
        self, groupname: str, subgroupname: str, acquisition_ids: list
    ) -> None:
        """
        Adds a subgroup to a group in the emExperiment.

        Args:
            groupname: the name of the group to add the subgroup to
            subgroupname: the name of the subgroup to add
            acquisition_ids: a list of acquisition ids to add to the subgroup

        Returns:
            None
        """
        if groupname not in self._groups.keys():
            raise EMExperimentException(
                f"Group {groupname} does not exist. Use add_group to add a group."
            )

        if subgroupname in self._groups[groupname].keys():
            raise EMExperimentException(f"Subgroup {subgroupname} already exists.")

        self._groups[groupname][subgroupname] = acquisition_ids

        # remove the acquisition ids from the ungrouped group
        for ai in acquisition_ids:
            self._groups["ungrouped"].remove(ai)

    def remove_subgroup(self, groupname: str, subgroupname: str) -> None:
        """
        Removes a subgroup from a group in the emExperiment.

        Args:
            groupname: the name of the group to remove the subgroup from
            subgroupname: the name of the subgroup to remove

        Returns:
            None
        """
        if groupname not in self._groups.keys:
            raise EMExperimentException(f"Group {groupname} does not exist.")

        if subgroupname not in self._groups[groupname].keys():
            raise EMExperimentException(f"Subgroup {subgroupname} does not exist.")

        # move the acquisition ids back to the ungrouped group
        for ai in self._groups[groupname][subgroupname]:
            self._groups["ungrouped"].append(ai)
        self._groups[groupname].pop(subgroupname)

    def autogroup(self, covariate: str, acquisition_ids: Optional[list] = None) -> None:
        """
        Automatically groups acquisitions based on a covariate.

        Args:
            covariate: the covariate to group by
            acquisition_ids: a list of acquisition ids to group. If None, all acquisitions are grouped.

        Returns:
            None
        """
        _ = self.acquisitions

        if acquisition_ids is None:
            acquisition_ids = self.acquisitions

        if self._meta is None:
            _ = self.get_experiment_metadata()
            assert self._meta is not None

        possible_covariates = self._meta["FEATURE_NAME"].unique()
        possible_covariates = [
            cov.lower() for cov in possible_covariates if cov is not None
        ]

        if covariate.lower() not in possible_covariates:
            raise EMExperimentException(f"Covariate {covariate} does not exist.")

        # subset metdata to only the covariate of interest
        covariate_meta = self._meta[self._meta["FEATURE_NAME"] == covariate]

        # Now check that each acquisition id is in the metadata subset
        missing_ais = set(acquisition_ids) - set(
            covariate_meta["ACQUISITION_ID"].unique()
        )
        if len(missing_ais) > 0:
            raise EMExperimentException(
                f"Acquisition ids {missing_ais} do not have metadata covariate {covariate} assigned."
            )

        # Now get the unique values of the covariate
        covariate_values = [c.lower() for c in covariate_meta["FEATURE_VALUE"]]
        covariate_values = list(set(covariate_values))  # remove duplicates
        self.add_group(covariate)
        for value in covariate_values:
            # get the acquisition ids for this covariate value
            value_ais = (
                covariate_meta[covariate_meta["FEATURE_VALUE"].str.lower() == value][
                    "ACQUISITION_ID"
                ]
                .unique()
                .tolist()
            )
            self.add_subgroup(covariate, value, value_ais)

    def add_acquisition(
        self,
        acq_id: Union[str, list],
        groupname: Optional[str] = None,
        subgroup: Optional[str] = None,
    ) -> None:
        """
        Adds an acquisition to the emExperiment.

        Args:
            acq_id: the acquisition id to add. Multiple acquisition ids can be added at once by passing a list.
            groupname: the metadata-level group to add the acquisition to. If no group is specified, the acquisition is added to the 'ungrouped' group.
            subgroup: the subgroup (e.g. metadata feature) to add acquisition to.

        Returns:
            None
        """
        if type(acq_id) is str:
            acq_id = [acq_id]

        if groupname is None:
            for ai in acq_id:
                if ai not in self._groups["ungrouped"]:
                    self._groups["ungrouped"].append(ai)

        if groupname is not None and subgroup is None:
            raise EMExperimentException(
                "Must specify subgroup if groupname is specified."
            )

        if groupname not in self._groupnames:
            # group doesen't exist, so create it and add all the acq_ids.
            warning(f"Group {groupname} does not exist. Creating a new group...")
            ai_dict = {subgroup: acq_id}
            self.add_group(groupname=groupname, acquisition_ids=ai_dict)

        else:
            # group exists, see if subgroup exists.
            if subgroup not in self._groups[groupname].keys():
                # subgroup doesen't exist, so create it and add all the acq_ids.
                warning(
                    f"Subgroup {subgroup} does not exist. Creating a new subgroup..."
                )
                self._groups[groupname][subgroup] = acq_id
            else:
                # subgroup exists, so add the acq_ids.
                for ai in acq_id:
                    if ai not in self._groups[groupname][subgroup]:
                        self._groups[groupname][subgroup].append(ai)
                    else:
                        warning(
                            f"Acquisition {acq_id} already exists in group {groupname}. Skipping..."
                        )

    def remove_acquisition(self, acq_id: Union[str, list]) -> None:
        """
        Removes an acquisition from the emExperiment.

        Args:
            acq_id: the acquisition id to remove. Multiple acquisition ids can be removed at once by passing a list.

        Returns:
            None
        """
        if type(acq_id) is str:
            acq_id = [acq_id]

        # what's the best way to do this?
        # iterate through the groups and remove the acquisition from each group?
        # TODO: revist this for a more efficient implementation.
        for group in self._groups.keys():
            for subgroup in self._groups[group].keys():
                for ai in acq_id:
                    if ai in self._groups[group][subgroup]:
                        self._groups[group][subgroup].remove(ai)

    def get_experiment_metadata(self) -> pd.DataFrame:
        """
        Assembles the metadata for each acquisition in the emExperiment.

        Args:
            None

        Returns:
            A pandas dataframe containing the metadata for each acquisition in the emExperiment.
        """
        if self._acquisition_ids is not None:
            if type(self._acquisition_ids) is str:
                self._acquisition_ids = [self._acquisition_ids]
            else:
                assert (
                    type(self._acquisition_ids) is list
                ), "acquisition_ids must be a list of acquisition ids"

            meta = queries.get_all_metadata_for_acquisition_id(self._acquisition_ids)
            self._meta = meta

        else:
            raise EMExperimentException("No acquisition ids provided.")

        return meta

    def build(
        self,
        local_mode: Optional[bool] = False,
        enable_internal_mode: Optional[bool] = False,
    ) -> None:
        """
        Builds the emExperiment as an on-disk Zarr store.

        Args:
            local_mode: if True, the emExperiment will be built in local mode, which requires emObjects Zarr
                stores to be present in the root directory of the emExperiment.
            enable_internal_mode: if True, the emExperiment will be built by populating emObjects from the
                internal database, which requires correct Enable credentials.

        Returns:
            None
        """
        if not local_mode and not enable_internal_mode:
            raise EMExperimentException(
                "Must specify either local_mode or enable_internal_mode."
            )

        if local_mode and enable_internal_mode:
            raise EMExperimentException(
                "Cannot specify both local_mode and enable_internal_mode."
            )

        # Perform checks to make sure the emExperiment is valid
        self._validate()

        # Create the root directory
        if not os.path.exists(self._rootdir):
            os.mkdir(self._rootdir)

        if local_mode:
            # in this case, emobjects are already present in the root directory
            if len(self._acquisition_ids) > 0:
                # check that these objects exist in the root directory
                # CHECK: Unclear where _acquisition_ids comes from
                # I don't think it can really be defined for local...
                for acq_id in self._acquisition_ids:
                    if not os.path.exists(
                        os.path.join(self._rootdir, f"{acq_id}.zarr")
                    ):
                        raise EMExperimentException(
                            f"Acquisition {acq_id} not found in root directory."
                        )
                    else:
                        self._zarrpaths[acq_id] = os.path.join(
                            self._rootdir, f"{acq_id}.zarr"
                        )

            # This will be an empty list if nothing is provided in the config (config == None).
            # It should be able to generate acquisition IDs from .zarr path names, I think this is how.
            else:
                zarrs = glob.glob(os.path.join(self._rootdir, "*.zarr"))
                # CHECK: Need this test in this class, as well as in the config class?
                if len(zarrs) < 1:
                    raise EMExperimentException(
                        "Provided zarr_dir does not contain any .zarr files"
                    )
                self._acquisition_ids = [
                    os.path.basename(zarr).split(".", 1)[0] for zarr in zarrs
                ]

                for acq_id in self._acquisition_ids:
                    self._zarrpaths[acq_id] = os.path.join(
                        self._rootdir, f"{acq_id}.zarr"
                    )

                # also put the acquisition_ids into the "ungrouped" group (default initial group)
                # otherwise self.acquisitions will come up empty
                # (this step is done for database config during initialization of the EMExperiment)
                self._groups["ungrouped"] = self._acquisition_ids

        else:
            # in this case, we need to populate the emobjects from the internal database
            # To do this, we need to have the correct biomarker and segmentation versions
            # for each acquisition in the emExperiment.

            if self.config is None:
                raise EMExperimentException(
                    "Cannot build emExperiment without providing an EMExperimentConfig."
                )

            # get the biomarker and segmentation versions for each acquisition
            # this is all wrapped in the config class so we can use that to check the versions.
            if (
                (
                    self.config.biomarker_versions is None
                    or self.config.biomarker_versions is None
                )
                and self._biomarker_versions is None
                and self._segmentation_versions is None
            ):
                self.config._validate_config_for_enable_db()

            # for each acqusition in the experiment, construct an emobject.
            # In the future, this is going to need to be parallelized.

            # Check for study_id
            if self.config.study_id is not None and self.config.acquisition_ids is None:
                self._acquisition_ids = queries.get_all_acquisition_ids_for_study_id(
                    self.config.study_id
                )

            # First build all the emObjectConfigs
            emobject_configs = list()
            self.config.acquisition_ids = self._acquisition_ids
            for i in range(0, len(self._acquisition_ids)):
                emobject_configs.append(
                    EMObjectConfig(
                        acquisition_id=self.config.acquisition_ids[i],
                        study_id=None,
                        segmentation_version=self._segmentation_versions[i],
                        biomarker_version=self._biomarker_versions[i],
                        biomarkers=self.config.biomarkers,
                        annotations=self.config.annotations,
                        include_img=self.config.include_img,
                        include_masks=self.config.include_masks,
                        include_seg_mask=self.config.include_seg_mask,
                        seg_mask_type=self.config.seg_mask_type,
                        img_res=self.config.img_res,
                        name=self._acquisition_ids[i],
                    )
                )

            # Now build the emObjects
            for emo_config in emobject_configs:
                E = build_emobject(emo_config)
                save(E, out_dir=self._rootdir)

            # Add groups to the experiment
            # This should all now be handled within the add_group method, leaving this here for now.

            """for group_name in groups.keys():
                acquisitions_to_assign = groups[group_name]

                if type(acquisitions_to_assign) == str:
                    acquisitions_to_assign = [acquisitions_to_assign]

                for ai in acquisitions_to_assign:
                    if ai in self._groups['ungrouped']:
                        self._groups['ungrouped'].remove(ai)

                self.add_group(group_name, acquisitions_to_assign)"""

            # Build the dictionary of zarrs, this also essentially checks that everything was built correctly
            for (
                acq_id
            ) in (
                self.acquisitions
            ):  # for some reason self._acquisition_ids is empty here? seems wrong.
                if not os.path.exists(os.path.join(self._rootdir, f"{acq_id}.zarr")):
                    raise EMExperimentException(
                        f"Acquisition {acq_id} not found in root directory."
                    )
                else:
                    self._zarrpaths[acq_id] = os.path.join(
                        self._rootdir, f"{acq_id}.zarr"
                    )

    def _validate(self) -> None:
        """
        Validates the emExperiment.
        """

        # Make sure that there are groups
        if len(self._groupnames) == 0:
            raise EMExperimentException("No groups exist in the emExperiment.")

        for group in self._groupnames:
            if len(self._groups[group]) == 0:
                if group != "ungrouped":
                    raise EMExperimentException(f"Group {group} is empty.")

        if self._experiment_name is None:
            raise EMExperimentException("No experiment name provided.")

        if self._rootdir is None:
            self._rootdir = f"./{self._experiment_name}"

    def get_available_versions(self) -> pd.DataFrame:
        """
        Based on the acquisition_ids provided in the experiment, gets all of the usable versions of both
        biomarker expression and segmentation.
        """
        _ = self.acquisitions
        return emexp_helpers.get_available_versions(self._acquisition_ids)

    @property
    def segmentation_versions(self) -> List[str]:
        return self._segmentation_versions

    @segmentation_versions.setter
    def segmentation_versions(
        self, segmentation_versions: List[Union[int, str]]
    ) -> None:
        if len(segmentation_versions) != len(self._acquisition_ids):
            raise EMExperimentException(
                "Number of segmentation versions provided does not match number of acquisitions."
            )

        self._segmentation_versions = segmentation_versions

    @property
    def biomarker_versions(self) -> List[str]:
        return self._biomarker_versions

    @biomarker_versions.setter
    def biomarker_versions(self, biomarker_versions: List[Union[int, str]]) -> None:
        if len(biomarker_versions) != len(self._acquisition_ids):
            raise EMExperimentException(
                "Number of biomarker versions provided does not match number of acquisitions."
            )

        self._biomarker_versions = biomarker_versions

    def load_object(
        self, emo_names: Optional[Union[str, List[str]]] = None
    ) -> Union[EMObject, list[EMObject]]:
        """
        Loads the emObject(s) from the emExperiment into memory.

        Args:
            emo_names: The name(s) of the emObject(s) to load. If None, all emObjects are loaded.

        Returns:
            The emObject(s) loaded from the emExperiment. If emo_names is a list, a dictionary of emObjects is returned.
        """

        if emo_names is None:
            emo_names = self._acquisition_ids

        if len(emo_names) == 1:
            E = core.load(self._zarrpaths[emo_names[0]])

        else:
            E = dict()
            for emo_name in emo_names:
                E[emo_name] = core.load(self._zarrpaths[emo_name])

        return E

    def apply(
        self,
        emo_names: Optional[Union[str, List[str]]] = None,
        func: Callable = None,
        **kwargs,
    ) -> None:
        """
        Applies a function to the emObject(s) in the emExperiment.

        Args:
            emo_names: The name(s) of the emObject(s) to apply the function to. If None, the function is applied to all emObjects.
            func: The function to apply to the emObject(s).
            kwargs: Keyword arguments to pass to the function.

        Returns:
            modified emObject or None if func applies changes in place.
        """

        if emo_names is None:
            emo_names = self._acquisition_ids

        if len(emo_names) == 1:
            E = core.load(self._zarrpaths[emo_names[0]])
            E2 = func(E, **kwargs)
            yield E2

        else:
            for emo_name in emo_names:
                E = core.load(self._zarrpaths[emo_name])
                E2 = func(E, **kwargs)
                yield E2

Classes

class EMExperiment (experiment_name: Optional[str] = None, root_dir: Optional[str] = None, emobjects: Optional[list] = None, config: Optional[EMExperimentConfig] = None)

Holds the data for a single experiment, composed of multiple emObjects.

This could be thought of as representing the study level, cohorts of studies, or acquistions, and is useful for cohort level comparisons.

There are three ways to construct an EMExperiment object: 1. Pass a list of emObjects 2. Pass a directory containing emObject Zarr stores 3. Pass a database configuration object, which will retrieve remote data

Args

experiment_name
a name for the experiment (optional)
root_dir
a directory containing emObject zarr stores.
emobjects
a list of emObjects
config
An EMExperiment config file
Expand source code
class EMExperiment:
    """
    Holds the data for a single experiment, composed of multiple emObjects.

    This could be thought of as representing the study level, cohorts of studies,
    or acquistions, and is useful for cohort level comparisons.

    There are three ways to construct an EMExperiment object:
    1. Pass a list of emObjects
    2. Pass a directory containing emObject Zarr stores
    3. Pass a database configuration object, which will retrieve remote data
    """

    def __init__(
        self,
        experiment_name: Optional[str] = None,
        root_dir: Optional[str] = None,
        emobjects: Optional[list] = None,
        config: Optional[EMExperimentConfig] = None,
    ) -> EMExperiment:
        """
        Args:
            experiment_name: a name for the experiment (optional)
            root_dir: a directory containing emObject zarr stores.
            emobjects: a list of emObjects
            config: An EMExperiment config file
        """

        # It's ok to only pass a config object.

        self._groupnames = ["ungrouped"]  # list of groups
        self._groups = dict()
        self._groups["ungrouped"] = list()
        self._zarrpaths = dict()
        self._meta = None
        self.config = None
        self._acquisition_ids = list()
        self._segmentation_versions = list()
        self._biomarker_versions = list()
        self._study_id = None

        if config is None:
            if experiment_name is not None:
                self._experiment_name = experiment_name  # name of the experiment
            else:
                warning(
                    "No experiment name provided. Using default name 'emExperiment'."
                )
                self._experiment_name = "emExperiment"
            # dict of zarr paths is created by `build` using the root_dir
            if root_dir is None:
                self._rootdir = f"./{self._experiment_name}"
            else:
                self._rootdir = root_dir

            # if a list of emobjects was passed, collect some info?
            if emobjects is not None:
                # assume their names match their acquisition_ids
                for E in emobjects:
                    self._acquisition_ids.append(E.name)
                self._groups["ungrouped"] = self._acquisition_ids

        else:
            self.config = config
            self._experiment_name = config.name
            self._acquisition_ids = config.acquisition_ids
            self._rootdir = config.root_dir
            self._groups["ungrouped"] = self._acquisition_ids
            self._biomarker_versions = config.biomarker_versions
            self._segmentation_versions = config.segmentation_versions
            self._study_id = config.study_id

    def __iter__(self):
        return self._groups.__iter__()  # make the iterator the dictionary's iterator.

    def __next__(self):
        return self._groups.__next__()

    @property
    def summary(self):
        """
        Returns a summary of the emExperiment
        """
        print(f"EMObject Version {__version__}")
        print(f"EMExperiment: {self._experiment_name}")
        print("Groups:")
        for group in self._groups.keys():
            if group == "ungrouped":
                # there's an exception that ungrouped is not a dict
                print(f"\t{group}: {len(self._groups[group])} emObjects")
                for ai in self._groups[group]:
                    print(f"\t\t{ai}")
            else:
                print(f"\t{group}: {len(self._groups[group])} subgroups")
                for subgroup in self._groups[group].keys():
                    print(
                        f"\t\t{subgroup}: {len(self._groups[group][subgroup])} emObjects"
                    )

    @property
    def group_names(self):
        """
        Returns a list of the groups that exist in the emExperiment.
        """
        if len(self._groupnames) == 0:
            self._groupnames.append("ungrouped")
        return self._groupnames

    @property
    def acquisitions(self):
        """
        Returns a list of the acquisitions that exist in the emExperiment.
        """
        self._acquisition_ids = list()

        for group in self._groups:
            if group == "ungrouped":
                for ai in self._groups[group]:
                    self._acquisition_ids.append(ai)

            else:
                for subgroup in self._groups[group].keys():
                    for ai in self._groups[group][subgroup]:
                        if ai not in self._acquisition_ids:
                            self._acquisition_ids.append(ai)

        return self._acquisition_ids

    def add_group(self, groupname: str, acquisition_ids: Optional[dict] = None) -> None:
        """
        Adds a top-level group to the emExperiment.

        Args:
            groupname: the name of the group to add. typically this is the metadata covariate.
            acquisition_ids: a dictionary of covariate feature value to acquisition ids to add to the group

        Returns:
            None
        """
        if groupname in self._groupnames:
            raise EMExperimentException(f"Group {groupname} already exists.")

        self._groupnames.append(groupname)
        if acquisition_ids is None:
            self._groups[groupname] = dict()
        else:
            self._groups[groupname] = acquisition_ids

            # remove the acquisition ids from the ungrouped group
            for subgroup in acquisition_ids.keys():
                for ai in acquisition_ids[subgroup]:
                    self._groups["ungrouped"].remove(ai)

    def remove_group(self, groupname: str) -> None:
        """
        Removes a group from the emExperiment.

        Args:
            groupname: the name of the group to remove

        Returns:
            None
        """
        if groupname not in self._groups.keys:
            raise EMExperimentException(f"Group {groupname} does not exist.")

        # move the acquisition ids back to the ungrouped group
        for subgroup in self._groups[groupname].keys():
            for ai in self._groups[groupname][subgroup]:
                self._groups["ungrouped"].append(ai)
        self._groupnames.remove(groupname)
        self._groups.pop(groupname)

    def add_subgroup(
        self, groupname: str, subgroupname: str, acquisition_ids: list
    ) -> None:
        """
        Adds a subgroup to a group in the emExperiment.

        Args:
            groupname: the name of the group to add the subgroup to
            subgroupname: the name of the subgroup to add
            acquisition_ids: a list of acquisition ids to add to the subgroup

        Returns:
            None
        """
        if groupname not in self._groups.keys():
            raise EMExperimentException(
                f"Group {groupname} does not exist. Use add_group to add a group."
            )

        if subgroupname in self._groups[groupname].keys():
            raise EMExperimentException(f"Subgroup {subgroupname} already exists.")

        self._groups[groupname][subgroupname] = acquisition_ids

        # remove the acquisition ids from the ungrouped group
        for ai in acquisition_ids:
            self._groups["ungrouped"].remove(ai)

    def remove_subgroup(self, groupname: str, subgroupname: str) -> None:
        """
        Removes a subgroup from a group in the emExperiment.

        Args:
            groupname: the name of the group to remove the subgroup from
            subgroupname: the name of the subgroup to remove

        Returns:
            None
        """
        if groupname not in self._groups.keys:
            raise EMExperimentException(f"Group {groupname} does not exist.")

        if subgroupname not in self._groups[groupname].keys():
            raise EMExperimentException(f"Subgroup {subgroupname} does not exist.")

        # move the acquisition ids back to the ungrouped group
        for ai in self._groups[groupname][subgroupname]:
            self._groups["ungrouped"].append(ai)
        self._groups[groupname].pop(subgroupname)

    def autogroup(self, covariate: str, acquisition_ids: Optional[list] = None) -> None:
        """
        Automatically groups acquisitions based on a covariate.

        Args:
            covariate: the covariate to group by
            acquisition_ids: a list of acquisition ids to group. If None, all acquisitions are grouped.

        Returns:
            None
        """
        _ = self.acquisitions

        if acquisition_ids is None:
            acquisition_ids = self.acquisitions

        if self._meta is None:
            _ = self.get_experiment_metadata()
            assert self._meta is not None

        possible_covariates = self._meta["FEATURE_NAME"].unique()
        possible_covariates = [
            cov.lower() for cov in possible_covariates if cov is not None
        ]

        if covariate.lower() not in possible_covariates:
            raise EMExperimentException(f"Covariate {covariate} does not exist.")

        # subset metdata to only the covariate of interest
        covariate_meta = self._meta[self._meta["FEATURE_NAME"] == covariate]

        # Now check that each acquisition id is in the metadata subset
        missing_ais = set(acquisition_ids) - set(
            covariate_meta["ACQUISITION_ID"].unique()
        )
        if len(missing_ais) > 0:
            raise EMExperimentException(
                f"Acquisition ids {missing_ais} do not have metadata covariate {covariate} assigned."
            )

        # Now get the unique values of the covariate
        covariate_values = [c.lower() for c in covariate_meta["FEATURE_VALUE"]]
        covariate_values = list(set(covariate_values))  # remove duplicates
        self.add_group(covariate)
        for value in covariate_values:
            # get the acquisition ids for this covariate value
            value_ais = (
                covariate_meta[covariate_meta["FEATURE_VALUE"].str.lower() == value][
                    "ACQUISITION_ID"
                ]
                .unique()
                .tolist()
            )
            self.add_subgroup(covariate, value, value_ais)

    def add_acquisition(
        self,
        acq_id: Union[str, list],
        groupname: Optional[str] = None,
        subgroup: Optional[str] = None,
    ) -> None:
        """
        Adds an acquisition to the emExperiment.

        Args:
            acq_id: the acquisition id to add. Multiple acquisition ids can be added at once by passing a list.
            groupname: the metadata-level group to add the acquisition to. If no group is specified, the acquisition is added to the 'ungrouped' group.
            subgroup: the subgroup (e.g. metadata feature) to add acquisition to.

        Returns:
            None
        """
        if type(acq_id) is str:
            acq_id = [acq_id]

        if groupname is None:
            for ai in acq_id:
                if ai not in self._groups["ungrouped"]:
                    self._groups["ungrouped"].append(ai)

        if groupname is not None and subgroup is None:
            raise EMExperimentException(
                "Must specify subgroup if groupname is specified."
            )

        if groupname not in self._groupnames:
            # group doesen't exist, so create it and add all the acq_ids.
            warning(f"Group {groupname} does not exist. Creating a new group...")
            ai_dict = {subgroup: acq_id}
            self.add_group(groupname=groupname, acquisition_ids=ai_dict)

        else:
            # group exists, see if subgroup exists.
            if subgroup not in self._groups[groupname].keys():
                # subgroup doesen't exist, so create it and add all the acq_ids.
                warning(
                    f"Subgroup {subgroup} does not exist. Creating a new subgroup..."
                )
                self._groups[groupname][subgroup] = acq_id
            else:
                # subgroup exists, so add the acq_ids.
                for ai in acq_id:
                    if ai not in self._groups[groupname][subgroup]:
                        self._groups[groupname][subgroup].append(ai)
                    else:
                        warning(
                            f"Acquisition {acq_id} already exists in group {groupname}. Skipping..."
                        )

    def remove_acquisition(self, acq_id: Union[str, list]) -> None:
        """
        Removes an acquisition from the emExperiment.

        Args:
            acq_id: the acquisition id to remove. Multiple acquisition ids can be removed at once by passing a list.

        Returns:
            None
        """
        if type(acq_id) is str:
            acq_id = [acq_id]

        # what's the best way to do this?
        # iterate through the groups and remove the acquisition from each group?
        # TODO: revist this for a more efficient implementation.
        for group in self._groups.keys():
            for subgroup in self._groups[group].keys():
                for ai in acq_id:
                    if ai in self._groups[group][subgroup]:
                        self._groups[group][subgroup].remove(ai)

    def get_experiment_metadata(self) -> pd.DataFrame:
        """
        Assembles the metadata for each acquisition in the emExperiment.

        Args:
            None

        Returns:
            A pandas dataframe containing the metadata for each acquisition in the emExperiment.
        """
        if self._acquisition_ids is not None:
            if type(self._acquisition_ids) is str:
                self._acquisition_ids = [self._acquisition_ids]
            else:
                assert (
                    type(self._acquisition_ids) is list
                ), "acquisition_ids must be a list of acquisition ids"

            meta = queries.get_all_metadata_for_acquisition_id(self._acquisition_ids)
            self._meta = meta

        else:
            raise EMExperimentException("No acquisition ids provided.")

        return meta

    def build(
        self,
        local_mode: Optional[bool] = False,
        enable_internal_mode: Optional[bool] = False,
    ) -> None:
        """
        Builds the emExperiment as an on-disk Zarr store.

        Args:
            local_mode: if True, the emExperiment will be built in local mode, which requires emObjects Zarr
                stores to be present in the root directory of the emExperiment.
            enable_internal_mode: if True, the emExperiment will be built by populating emObjects from the
                internal database, which requires correct Enable credentials.

        Returns:
            None
        """
        if not local_mode and not enable_internal_mode:
            raise EMExperimentException(
                "Must specify either local_mode or enable_internal_mode."
            )

        if local_mode and enable_internal_mode:
            raise EMExperimentException(
                "Cannot specify both local_mode and enable_internal_mode."
            )

        # Perform checks to make sure the emExperiment is valid
        self._validate()

        # Create the root directory
        if not os.path.exists(self._rootdir):
            os.mkdir(self._rootdir)

        if local_mode:
            # in this case, emobjects are already present in the root directory
            if len(self._acquisition_ids) > 0:
                # check that these objects exist in the root directory
                # CHECK: Unclear where _acquisition_ids comes from
                # I don't think it can really be defined for local...
                for acq_id in self._acquisition_ids:
                    if not os.path.exists(
                        os.path.join(self._rootdir, f"{acq_id}.zarr")
                    ):
                        raise EMExperimentException(
                            f"Acquisition {acq_id} not found in root directory."
                        )
                    else:
                        self._zarrpaths[acq_id] = os.path.join(
                            self._rootdir, f"{acq_id}.zarr"
                        )

            # This will be an empty list if nothing is provided in the config (config == None).
            # It should be able to generate acquisition IDs from .zarr path names, I think this is how.
            else:
                zarrs = glob.glob(os.path.join(self._rootdir, "*.zarr"))
                # CHECK: Need this test in this class, as well as in the config class?
                if len(zarrs) < 1:
                    raise EMExperimentException(
                        "Provided zarr_dir does not contain any .zarr files"
                    )
                self._acquisition_ids = [
                    os.path.basename(zarr).split(".", 1)[0] for zarr in zarrs
                ]

                for acq_id in self._acquisition_ids:
                    self._zarrpaths[acq_id] = os.path.join(
                        self._rootdir, f"{acq_id}.zarr"
                    )

                # also put the acquisition_ids into the "ungrouped" group (default initial group)
                # otherwise self.acquisitions will come up empty
                # (this step is done for database config during initialization of the EMExperiment)
                self._groups["ungrouped"] = self._acquisition_ids

        else:
            # in this case, we need to populate the emobjects from the internal database
            # To do this, we need to have the correct biomarker and segmentation versions
            # for each acquisition in the emExperiment.

            if self.config is None:
                raise EMExperimentException(
                    "Cannot build emExperiment without providing an EMExperimentConfig."
                )

            # get the biomarker and segmentation versions for each acquisition
            # this is all wrapped in the config class so we can use that to check the versions.
            if (
                (
                    self.config.biomarker_versions is None
                    or self.config.biomarker_versions is None
                )
                and self._biomarker_versions is None
                and self._segmentation_versions is None
            ):
                self.config._validate_config_for_enable_db()

            # for each acqusition in the experiment, construct an emobject.
            # In the future, this is going to need to be parallelized.

            # Check for study_id
            if self.config.study_id is not None and self.config.acquisition_ids is None:
                self._acquisition_ids = queries.get_all_acquisition_ids_for_study_id(
                    self.config.study_id
                )

            # First build all the emObjectConfigs
            emobject_configs = list()
            self.config.acquisition_ids = self._acquisition_ids
            for i in range(0, len(self._acquisition_ids)):
                emobject_configs.append(
                    EMObjectConfig(
                        acquisition_id=self.config.acquisition_ids[i],
                        study_id=None,
                        segmentation_version=self._segmentation_versions[i],
                        biomarker_version=self._biomarker_versions[i],
                        biomarkers=self.config.biomarkers,
                        annotations=self.config.annotations,
                        include_img=self.config.include_img,
                        include_masks=self.config.include_masks,
                        include_seg_mask=self.config.include_seg_mask,
                        seg_mask_type=self.config.seg_mask_type,
                        img_res=self.config.img_res,
                        name=self._acquisition_ids[i],
                    )
                )

            # Now build the emObjects
            for emo_config in emobject_configs:
                E = build_emobject(emo_config)
                save(E, out_dir=self._rootdir)

            # Add groups to the experiment
            # This should all now be handled within the add_group method, leaving this here for now.

            """for group_name in groups.keys():
                acquisitions_to_assign = groups[group_name]

                if type(acquisitions_to_assign) == str:
                    acquisitions_to_assign = [acquisitions_to_assign]

                for ai in acquisitions_to_assign:
                    if ai in self._groups['ungrouped']:
                        self._groups['ungrouped'].remove(ai)

                self.add_group(group_name, acquisitions_to_assign)"""

            # Build the dictionary of zarrs, this also essentially checks that everything was built correctly
            for (
                acq_id
            ) in (
                self.acquisitions
            ):  # for some reason self._acquisition_ids is empty here? seems wrong.
                if not os.path.exists(os.path.join(self._rootdir, f"{acq_id}.zarr")):
                    raise EMExperimentException(
                        f"Acquisition {acq_id} not found in root directory."
                    )
                else:
                    self._zarrpaths[acq_id] = os.path.join(
                        self._rootdir, f"{acq_id}.zarr"
                    )

    def _validate(self) -> None:
        """
        Validates the emExperiment.
        """

        # Make sure that there are groups
        if len(self._groupnames) == 0:
            raise EMExperimentException("No groups exist in the emExperiment.")

        for group in self._groupnames:
            if len(self._groups[group]) == 0:
                if group != "ungrouped":
                    raise EMExperimentException(f"Group {group} is empty.")

        if self._experiment_name is None:
            raise EMExperimentException("No experiment name provided.")

        if self._rootdir is None:
            self._rootdir = f"./{self._experiment_name}"

    def get_available_versions(self) -> pd.DataFrame:
        """
        Based on the acquisition_ids provided in the experiment, gets all of the usable versions of both
        biomarker expression and segmentation.
        """
        _ = self.acquisitions
        return emexp_helpers.get_available_versions(self._acquisition_ids)

    @property
    def segmentation_versions(self) -> List[str]:
        return self._segmentation_versions

    @segmentation_versions.setter
    def segmentation_versions(
        self, segmentation_versions: List[Union[int, str]]
    ) -> None:
        if len(segmentation_versions) != len(self._acquisition_ids):
            raise EMExperimentException(
                "Number of segmentation versions provided does not match number of acquisitions."
            )

        self._segmentation_versions = segmentation_versions

    @property
    def biomarker_versions(self) -> List[str]:
        return self._biomarker_versions

    @biomarker_versions.setter
    def biomarker_versions(self, biomarker_versions: List[Union[int, str]]) -> None:
        if len(biomarker_versions) != len(self._acquisition_ids):
            raise EMExperimentException(
                "Number of biomarker versions provided does not match number of acquisitions."
            )

        self._biomarker_versions = biomarker_versions

    def load_object(
        self, emo_names: Optional[Union[str, List[str]]] = None
    ) -> Union[EMObject, list[EMObject]]:
        """
        Loads the emObject(s) from the emExperiment into memory.

        Args:
            emo_names: The name(s) of the emObject(s) to load. If None, all emObjects are loaded.

        Returns:
            The emObject(s) loaded from the emExperiment. If emo_names is a list, a dictionary of emObjects is returned.
        """

        if emo_names is None:
            emo_names = self._acquisition_ids

        if len(emo_names) == 1:
            E = core.load(self._zarrpaths[emo_names[0]])

        else:
            E = dict()
            for emo_name in emo_names:
                E[emo_name] = core.load(self._zarrpaths[emo_name])

        return E

    def apply(
        self,
        emo_names: Optional[Union[str, List[str]]] = None,
        func: Callable = None,
        **kwargs,
    ) -> None:
        """
        Applies a function to the emObject(s) in the emExperiment.

        Args:
            emo_names: The name(s) of the emObject(s) to apply the function to. If None, the function is applied to all emObjects.
            func: The function to apply to the emObject(s).
            kwargs: Keyword arguments to pass to the function.

        Returns:
            modified emObject or None if func applies changes in place.
        """

        if emo_names is None:
            emo_names = self._acquisition_ids

        if len(emo_names) == 1:
            E = core.load(self._zarrpaths[emo_names[0]])
            E2 = func(E, **kwargs)
            yield E2

        else:
            for emo_name in emo_names:
                E = core.load(self._zarrpaths[emo_name])
                E2 = func(E, **kwargs)
                yield E2

Instance variables

var acquisitions

Returns a list of the acquisitions that exist in the emExperiment.

Expand source code
@property
def acquisitions(self):
    """
    Returns a list of the acquisitions that exist in the emExperiment.
    """
    self._acquisition_ids = list()

    for group in self._groups:
        if group == "ungrouped":
            for ai in self._groups[group]:
                self._acquisition_ids.append(ai)

        else:
            for subgroup in self._groups[group].keys():
                for ai in self._groups[group][subgroup]:
                    if ai not in self._acquisition_ids:
                        self._acquisition_ids.append(ai)

    return self._acquisition_ids
var biomarker_versions : List[str]
Expand source code
@property
def biomarker_versions(self) -> List[str]:
    return self._biomarker_versions
var group_names

Returns a list of the groups that exist in the emExperiment.

Expand source code
@property
def group_names(self):
    """
    Returns a list of the groups that exist in the emExperiment.
    """
    if len(self._groupnames) == 0:
        self._groupnames.append("ungrouped")
    return self._groupnames
var segmentation_versions : List[str]
Expand source code
@property
def segmentation_versions(self) -> List[str]:
    return self._segmentation_versions
var summary

Returns a summary of the emExperiment

Expand source code
@property
def summary(self):
    """
    Returns a summary of the emExperiment
    """
    print(f"EMObject Version {__version__}")
    print(f"EMExperiment: {self._experiment_name}")
    print("Groups:")
    for group in self._groups.keys():
        if group == "ungrouped":
            # there's an exception that ungrouped is not a dict
            print(f"\t{group}: {len(self._groups[group])} emObjects")
            for ai in self._groups[group]:
                print(f"\t\t{ai}")
        else:
            print(f"\t{group}: {len(self._groups[group])} subgroups")
            for subgroup in self._groups[group].keys():
                print(
                    f"\t\t{subgroup}: {len(self._groups[group][subgroup])} emObjects"
                )

Methods

def add_acquisition(self, acq_id: Union[str, list], groupname: Optional[str] = None, subgroup: Optional[str] = None) ‑> None

Adds an acquisition to the emExperiment.

Args

acq_id
the acquisition id to add. Multiple acquisition ids can be added at once by passing a list.
groupname
the metadata-level group to add the acquisition to. If no group is specified, the acquisition is added to the 'ungrouped' group.
subgroup
the subgroup (e.g. metadata feature) to add acquisition to.

Returns

None

Expand source code
def add_acquisition(
    self,
    acq_id: Union[str, list],
    groupname: Optional[str] = None,
    subgroup: Optional[str] = None,
) -> None:
    """
    Adds an acquisition to the emExperiment.

    Args:
        acq_id: the acquisition id to add. Multiple acquisition ids can be added at once by passing a list.
        groupname: the metadata-level group to add the acquisition to. If no group is specified, the acquisition is added to the 'ungrouped' group.
        subgroup: the subgroup (e.g. metadata feature) to add acquisition to.

    Returns:
        None
    """
    if type(acq_id) is str:
        acq_id = [acq_id]

    if groupname is None:
        for ai in acq_id:
            if ai not in self._groups["ungrouped"]:
                self._groups["ungrouped"].append(ai)

    if groupname is not None and subgroup is None:
        raise EMExperimentException(
            "Must specify subgroup if groupname is specified."
        )

    if groupname not in self._groupnames:
        # group doesen't exist, so create it and add all the acq_ids.
        warning(f"Group {groupname} does not exist. Creating a new group...")
        ai_dict = {subgroup: acq_id}
        self.add_group(groupname=groupname, acquisition_ids=ai_dict)

    else:
        # group exists, see if subgroup exists.
        if subgroup not in self._groups[groupname].keys():
            # subgroup doesen't exist, so create it and add all the acq_ids.
            warning(
                f"Subgroup {subgroup} does not exist. Creating a new subgroup..."
            )
            self._groups[groupname][subgroup] = acq_id
        else:
            # subgroup exists, so add the acq_ids.
            for ai in acq_id:
                if ai not in self._groups[groupname][subgroup]:
                    self._groups[groupname][subgroup].append(ai)
                else:
                    warning(
                        f"Acquisition {acq_id} already exists in group {groupname}. Skipping..."
                    )
def add_group(self, groupname: str, acquisition_ids: Optional[dict] = None) ‑> None

Adds a top-level group to the emExperiment.

Args

groupname
the name of the group to add. typically this is the metadata covariate.
acquisition_ids
a dictionary of covariate feature value to acquisition ids to add to the group

Returns

None

Expand source code
def add_group(self, groupname: str, acquisition_ids: Optional[dict] = None) -> None:
    """
    Adds a top-level group to the emExperiment.

    Args:
        groupname: the name of the group to add. typically this is the metadata covariate.
        acquisition_ids: a dictionary of covariate feature value to acquisition ids to add to the group

    Returns:
        None
    """
    if groupname in self._groupnames:
        raise EMExperimentException(f"Group {groupname} already exists.")

    self._groupnames.append(groupname)
    if acquisition_ids is None:
        self._groups[groupname] = dict()
    else:
        self._groups[groupname] = acquisition_ids

        # remove the acquisition ids from the ungrouped group
        for subgroup in acquisition_ids.keys():
            for ai in acquisition_ids[subgroup]:
                self._groups["ungrouped"].remove(ai)
def add_subgroup(self, groupname: str, subgroupname: str, acquisition_ids: list) ‑> None

Adds a subgroup to a group in the emExperiment.

Args

groupname
the name of the group to add the subgroup to
subgroupname
the name of the subgroup to add
acquisition_ids
a list of acquisition ids to add to the subgroup

Returns

None

Expand source code
def add_subgroup(
    self, groupname: str, subgroupname: str, acquisition_ids: list
) -> None:
    """
    Adds a subgroup to a group in the emExperiment.

    Args:
        groupname: the name of the group to add the subgroup to
        subgroupname: the name of the subgroup to add
        acquisition_ids: a list of acquisition ids to add to the subgroup

    Returns:
        None
    """
    if groupname not in self._groups.keys():
        raise EMExperimentException(
            f"Group {groupname} does not exist. Use add_group to add a group."
        )

    if subgroupname in self._groups[groupname].keys():
        raise EMExperimentException(f"Subgroup {subgroupname} already exists.")

    self._groups[groupname][subgroupname] = acquisition_ids

    # remove the acquisition ids from the ungrouped group
    for ai in acquisition_ids:
        self._groups["ungrouped"].remove(ai)
def apply(self, emo_names: Optional[Union[str, List[str]]] = None, func: Callable = None, **kwargs) ‑> None

Applies a function to the emObject(s) in the emExperiment.

Args

emo_names
The name(s) of the emObject(s) to apply the function to. If None, the function is applied to all emObjects.
func
The function to apply to the emObject(s).
kwargs
Keyword arguments to pass to the function.

Returns

modified emObject or None if func applies changes in place.

Expand source code
def apply(
    self,
    emo_names: Optional[Union[str, List[str]]] = None,
    func: Callable = None,
    **kwargs,
) -> None:
    """
    Applies a function to the emObject(s) in the emExperiment.

    Args:
        emo_names: The name(s) of the emObject(s) to apply the function to. If None, the function is applied to all emObjects.
        func: The function to apply to the emObject(s).
        kwargs: Keyword arguments to pass to the function.

    Returns:
        modified emObject or None if func applies changes in place.
    """

    if emo_names is None:
        emo_names = self._acquisition_ids

    if len(emo_names) == 1:
        E = core.load(self._zarrpaths[emo_names[0]])
        E2 = func(E, **kwargs)
        yield E2

    else:
        for emo_name in emo_names:
            E = core.load(self._zarrpaths[emo_name])
            E2 = func(E, **kwargs)
            yield E2
def autogroup(self, covariate: str, acquisition_ids: Optional[list] = None) ‑> None

Automatically groups acquisitions based on a covariate.

Args

covariate
the covariate to group by
acquisition_ids
a list of acquisition ids to group. If None, all acquisitions are grouped.

Returns

None

Expand source code
def autogroup(self, covariate: str, acquisition_ids: Optional[list] = None) -> None:
    """
    Automatically groups acquisitions based on a covariate.

    Args:
        covariate: the covariate to group by
        acquisition_ids: a list of acquisition ids to group. If None, all acquisitions are grouped.

    Returns:
        None
    """
    _ = self.acquisitions

    if acquisition_ids is None:
        acquisition_ids = self.acquisitions

    if self._meta is None:
        _ = self.get_experiment_metadata()
        assert self._meta is not None

    possible_covariates = self._meta["FEATURE_NAME"].unique()
    possible_covariates = [
        cov.lower() for cov in possible_covariates if cov is not None
    ]

    if covariate.lower() not in possible_covariates:
        raise EMExperimentException(f"Covariate {covariate} does not exist.")

    # subset metdata to only the covariate of interest
    covariate_meta = self._meta[self._meta["FEATURE_NAME"] == covariate]

    # Now check that each acquisition id is in the metadata subset
    missing_ais = set(acquisition_ids) - set(
        covariate_meta["ACQUISITION_ID"].unique()
    )
    if len(missing_ais) > 0:
        raise EMExperimentException(
            f"Acquisition ids {missing_ais} do not have metadata covariate {covariate} assigned."
        )

    # Now get the unique values of the covariate
    covariate_values = [c.lower() for c in covariate_meta["FEATURE_VALUE"]]
    covariate_values = list(set(covariate_values))  # remove duplicates
    self.add_group(covariate)
    for value in covariate_values:
        # get the acquisition ids for this covariate value
        value_ais = (
            covariate_meta[covariate_meta["FEATURE_VALUE"].str.lower() == value][
                "ACQUISITION_ID"
            ]
            .unique()
            .tolist()
        )
        self.add_subgroup(covariate, value, value_ais)
def build(self, local_mode: Optional[bool] = False, enable_internal_mode: Optional[bool] = False) ‑> None

Builds the emExperiment as an on-disk Zarr store.

Args

local_mode
if True, the emExperiment will be built in local mode, which requires emObjects Zarr stores to be present in the root directory of the emExperiment.
enable_internal_mode
if True, the emExperiment will be built by populating emObjects from the internal database, which requires correct Enable credentials.

Returns

None

Expand source code
def build(
    self,
    local_mode: Optional[bool] = False,
    enable_internal_mode: Optional[bool] = False,
) -> None:
    """
    Builds the emExperiment as an on-disk Zarr store.

    Args:
        local_mode: if True, the emExperiment will be built in local mode, which requires emObjects Zarr
            stores to be present in the root directory of the emExperiment.
        enable_internal_mode: if True, the emExperiment will be built by populating emObjects from the
            internal database, which requires correct Enable credentials.

    Returns:
        None
    """
    if not local_mode and not enable_internal_mode:
        raise EMExperimentException(
            "Must specify either local_mode or enable_internal_mode."
        )

    if local_mode and enable_internal_mode:
        raise EMExperimentException(
            "Cannot specify both local_mode and enable_internal_mode."
        )

    # Perform checks to make sure the emExperiment is valid
    self._validate()

    # Create the root directory
    if not os.path.exists(self._rootdir):
        os.mkdir(self._rootdir)

    if local_mode:
        # in this case, emobjects are already present in the root directory
        if len(self._acquisition_ids) > 0:
            # check that these objects exist in the root directory
            # CHECK: Unclear where _acquisition_ids comes from
            # I don't think it can really be defined for local...
            for acq_id in self._acquisition_ids:
                if not os.path.exists(
                    os.path.join(self._rootdir, f"{acq_id}.zarr")
                ):
                    raise EMExperimentException(
                        f"Acquisition {acq_id} not found in root directory."
                    )
                else:
                    self._zarrpaths[acq_id] = os.path.join(
                        self._rootdir, f"{acq_id}.zarr"
                    )

        # This will be an empty list if nothing is provided in the config (config == None).
        # It should be able to generate acquisition IDs from .zarr path names, I think this is how.
        else:
            zarrs = glob.glob(os.path.join(self._rootdir, "*.zarr"))
            # CHECK: Need this test in this class, as well as in the config class?
            if len(zarrs) < 1:
                raise EMExperimentException(
                    "Provided zarr_dir does not contain any .zarr files"
                )
            self._acquisition_ids = [
                os.path.basename(zarr).split(".", 1)[0] for zarr in zarrs
            ]

            for acq_id in self._acquisition_ids:
                self._zarrpaths[acq_id] = os.path.join(
                    self._rootdir, f"{acq_id}.zarr"
                )

            # also put the acquisition_ids into the "ungrouped" group (default initial group)
            # otherwise self.acquisitions will come up empty
            # (this step is done for database config during initialization of the EMExperiment)
            self._groups["ungrouped"] = self._acquisition_ids

    else:
        # in this case, we need to populate the emobjects from the internal database
        # To do this, we need to have the correct biomarker and segmentation versions
        # for each acquisition in the emExperiment.

        if self.config is None:
            raise EMExperimentException(
                "Cannot build emExperiment without providing an EMExperimentConfig."
            )

        # get the biomarker and segmentation versions for each acquisition
        # this is all wrapped in the config class so we can use that to check the versions.
        if (
            (
                self.config.biomarker_versions is None
                or self.config.biomarker_versions is None
            )
            and self._biomarker_versions is None
            and self._segmentation_versions is None
        ):
            self.config._validate_config_for_enable_db()

        # for each acqusition in the experiment, construct an emobject.
        # In the future, this is going to need to be parallelized.

        # Check for study_id
        if self.config.study_id is not None and self.config.acquisition_ids is None:
            self._acquisition_ids = queries.get_all_acquisition_ids_for_study_id(
                self.config.study_id
            )

        # First build all the emObjectConfigs
        emobject_configs = list()
        self.config.acquisition_ids = self._acquisition_ids
        for i in range(0, len(self._acquisition_ids)):
            emobject_configs.append(
                EMObjectConfig(
                    acquisition_id=self.config.acquisition_ids[i],
                    study_id=None,
                    segmentation_version=self._segmentation_versions[i],
                    biomarker_version=self._biomarker_versions[i],
                    biomarkers=self.config.biomarkers,
                    annotations=self.config.annotations,
                    include_img=self.config.include_img,
                    include_masks=self.config.include_masks,
                    include_seg_mask=self.config.include_seg_mask,
                    seg_mask_type=self.config.seg_mask_type,
                    img_res=self.config.img_res,
                    name=self._acquisition_ids[i],
                )
            )

        # Now build the emObjects
        for emo_config in emobject_configs:
            E = build_emobject(emo_config)
            save(E, out_dir=self._rootdir)

        # Add groups to the experiment
        # This should all now be handled within the add_group method, leaving this here for now.

        """for group_name in groups.keys():
            acquisitions_to_assign = groups[group_name]

            if type(acquisitions_to_assign) == str:
                acquisitions_to_assign = [acquisitions_to_assign]

            for ai in acquisitions_to_assign:
                if ai in self._groups['ungrouped']:
                    self._groups['ungrouped'].remove(ai)

            self.add_group(group_name, acquisitions_to_assign)"""

        # Build the dictionary of zarrs, this also essentially checks that everything was built correctly
        for (
            acq_id
        ) in (
            self.acquisitions
        ):  # for some reason self._acquisition_ids is empty here? seems wrong.
            if not os.path.exists(os.path.join(self._rootdir, f"{acq_id}.zarr")):
                raise EMExperimentException(
                    f"Acquisition {acq_id} not found in root directory."
                )
            else:
                self._zarrpaths[acq_id] = os.path.join(
                    self._rootdir, f"{acq_id}.zarr"
                )
def get_available_versions(self) ‑> pandas.core.frame.DataFrame

Based on the acquisition_ids provided in the experiment, gets all of the usable versions of both biomarker expression and segmentation.

Expand source code
def get_available_versions(self) -> pd.DataFrame:
    """
    Based on the acquisition_ids provided in the experiment, gets all of the usable versions of both
    biomarker expression and segmentation.
    """
    _ = self.acquisitions
    return emexp_helpers.get_available_versions(self._acquisition_ids)
def get_experiment_metadata(self) ‑> pandas.core.frame.DataFrame

Assembles the metadata for each acquisition in the emExperiment.

Args

None

Returns

A pandas dataframe containing the metadata for each acquisition in the emExperiment.

Expand source code
def get_experiment_metadata(self) -> pd.DataFrame:
    """
    Assembles the metadata for each acquisition in the emExperiment.

    Args:
        None

    Returns:
        A pandas dataframe containing the metadata for each acquisition in the emExperiment.
    """
    if self._acquisition_ids is not None:
        if type(self._acquisition_ids) is str:
            self._acquisition_ids = [self._acquisition_ids]
        else:
            assert (
                type(self._acquisition_ids) is list
            ), "acquisition_ids must be a list of acquisition ids"

        meta = queries.get_all_metadata_for_acquisition_id(self._acquisition_ids)
        self._meta = meta

    else:
        raise EMExperimentException("No acquisition ids provided.")

    return meta
def load_object(self, emo_names: Optional[Union[str, List[str]]] = None) ‑> Union[EMObject, list[EMObject]]

Loads the emObject(s) from the emExperiment into memory.

Args

emo_names
The name(s) of the emObject(s) to load. If None, all emObjects are loaded.

Returns

The emObject(s) loaded from the emExperiment. If emo_names is a list, a dictionary of emObjects is returned.

Expand source code
def load_object(
    self, emo_names: Optional[Union[str, List[str]]] = None
) -> Union[EMObject, list[EMObject]]:
    """
    Loads the emObject(s) from the emExperiment into memory.

    Args:
        emo_names: The name(s) of the emObject(s) to load. If None, all emObjects are loaded.

    Returns:
        The emObject(s) loaded from the emExperiment. If emo_names is a list, a dictionary of emObjects is returned.
    """

    if emo_names is None:
        emo_names = self._acquisition_ids

    if len(emo_names) == 1:
        E = core.load(self._zarrpaths[emo_names[0]])

    else:
        E = dict()
        for emo_name in emo_names:
            E[emo_name] = core.load(self._zarrpaths[emo_name])

    return E
def remove_acquisition(self, acq_id: Union[str, list]) ‑> None

Removes an acquisition from the emExperiment.

Args

acq_id
the acquisition id to remove. Multiple acquisition ids can be removed at once by passing a list.

Returns

None

Expand source code
def remove_acquisition(self, acq_id: Union[str, list]) -> None:
    """
    Removes an acquisition from the emExperiment.

    Args:
        acq_id: the acquisition id to remove. Multiple acquisition ids can be removed at once by passing a list.

    Returns:
        None
    """
    if type(acq_id) is str:
        acq_id = [acq_id]

    # what's the best way to do this?
    # iterate through the groups and remove the acquisition from each group?
    # TODO: revist this for a more efficient implementation.
    for group in self._groups.keys():
        for subgroup in self._groups[group].keys():
            for ai in acq_id:
                if ai in self._groups[group][subgroup]:
                    self._groups[group][subgroup].remove(ai)
def remove_group(self, groupname: str) ‑> None

Removes a group from the emExperiment.

Args

groupname
the name of the group to remove

Returns

None

Expand source code
def remove_group(self, groupname: str) -> None:
    """
    Removes a group from the emExperiment.

    Args:
        groupname: the name of the group to remove

    Returns:
        None
    """
    if groupname not in self._groups.keys:
        raise EMExperimentException(f"Group {groupname} does not exist.")

    # move the acquisition ids back to the ungrouped group
    for subgroup in self._groups[groupname].keys():
        for ai in self._groups[groupname][subgroup]:
            self._groups["ungrouped"].append(ai)
    self._groupnames.remove(groupname)
    self._groups.pop(groupname)
def remove_subgroup(self, groupname: str, subgroupname: str) ‑> None

Removes a subgroup from a group in the emExperiment.

Args

groupname
the name of the group to remove the subgroup from
subgroupname
the name of the subgroup to remove

Returns

None

Expand source code
def remove_subgroup(self, groupname: str, subgroupname: str) -> None:
    """
    Removes a subgroup from a group in the emExperiment.

    Args:
        groupname: the name of the group to remove the subgroup from
        subgroupname: the name of the subgroup to remove

    Returns:
        None
    """
    if groupname not in self._groups.keys:
        raise EMExperimentException(f"Group {groupname} does not exist.")

    if subgroupname not in self._groups[groupname].keys():
        raise EMExperimentException(f"Subgroup {subgroupname} does not exist.")

    # move the acquisition ids back to the ungrouped group
    for ai in self._groups[groupname][subgroupname]:
        self._groups["ungrouped"].append(ai)
    self._groups[groupname].pop(subgroupname)
class EMExperimentConfig (acquisition_ids: Optional[list] = None, study_id: Optional[int] = None, segmentation_versions: Optional[list] = None, biomarker_versions: Optional[list] = None, biomarkers: Optional[Union[list, np.ndarray, str]] = None, annotations: Optional[Union[list, np.ndarray, str]] = None, include_img: Optional[bool] = False, include_masks: Optional[bool] = False, include_seg_mask: Optional[bool] = False, seg_mask_type: Optional[str] = 'nucleus', img_res: Optional[int] = 0, mask_names: Optional[Union[list, np.ndarray, str]] = None, name: Optional[str] = None, root_dir: Optional[str] = None, datatype: Optional[str] = None)

Object that defines the config for an emExperiment that is interacting with the Enable database.

Args

acquisition_id
required, the acquisition ID to query
study_id
study id, placeholder
segmentation_versions
required, segmentation version
biomarker_versions
required, biomarker expression version
biomarkers
optional, a list or name of biomarkers to download. If None, gets all.
annotations
optional, a list or name of annotations to download. If None, gets all.
include_img
optional, if True, fetches the image with channels subsetted same as biomarkers.
include_masks
optional, if True, fetches ROI masks.
include_seg_masks
optional. If True, gets the segmentation mask.
seg_mask_type
optional. Type of segmentation mask to fetch. Can be 'nucleus' or 'cell'.
img_format
img_format - placeholder
img_res
optional, factor to downsample image by
img_to_disk
optional, if True writes the zarr store to disk, otherwise object held in memory.
root_dir
optional, path where experiment is built.
name
optional, a name for this emObject.
datatype
optional, describe the datatype used here.
Expand source code
class EMExperimentConfig:
    """Object that defines the config
    for an emExperiment that is interacting with the
    Enable database.

    Args:
        acquisition_id: required, the acquisition ID to query
        study_id: study id, placeholder
        segmentation_versions: required, segmentation version
        biomarker_versions: required, biomarker expression version
        biomarkers: optional, a list or name of biomarkers
            to download. If None, gets all.
        annotations: optional, a list or name of annotations to download.
            If None, gets all.
        include_img: optional, if True, fetches the image with channels
            subsetted same as biomarkers.
        include_masks: optional, if True, fetches ROI masks.
        include_seg_masks: optional. If True, gets the segmentation mask.
        seg_mask_type: optional. Type of segmentation mask to fetch. Can be
            'nucleus' or 'cell'.
        img_format: img_format - placeholder
        img_res: optional, factor to downsample image by
        img_to_disk: optional, if True writes the zarr store to disk,
            otherwise object held in memory.
        root_dir: optional, path where experiment is built.
        name: optional, a name for this emObject.
        datatype: optional, describe the datatype used here.
    """

    def __init__(
        self,
        acquisition_ids: Optional[list] = None,
        study_id: Optional[int] = None,
        segmentation_versions: Optional[list] = None,
        biomarker_versions: Optional[list] = None,
        biomarkers: Optional[Union[list, np.ndarray, str]] = None,
        annotations: Optional[Union[list, np.ndarray, str]] = None,
        include_img: Optional[bool] = False,
        include_masks: Optional[bool] = False,
        include_seg_mask: Optional[bool] = False,
        seg_mask_type: Optional[str] = "nucleus",
        img_res: Optional[int] = 0,
        mask_names: Optional[Union[list, np.ndarray, str]] = None,
        name: Optional[str] = None,
        root_dir: Optional[str] = None,
        datatype: Optional[str] = None,
    ):
        # simple case, zarr directory is provided
        if root_dir is not None:
            zarrs = glob.glob(os.path.join(root_dir, "*.zarr"))
            if len(zarrs) < 1:
                raise EMExperimentException(
                    "Provided root_dir does not contain any .zarr files"
                )

        # otherwise validate DB call
        else:
            if acquisition_ids is None and study_id is None:
                raise EMExperimentException(
                    "Must provide either acquisition_ids or study_id."
                )

            if segmentation_versions is not None and biomarker_versions is not None:
                if len(segmentation_versions) != len(biomarker_versions):
                    raise EMExperimentException(
                        "Must provide same number of segmentation and biomarker versions."
                    )

            # note may need to fix this for flex studies
            if acquisition_ids is not None and study_id is not None:
                warning(
                    "Both acquisition_ids and study_id provided. \
                        Note: acquisition_ids overrides study_id"
                )

        self.acquisition_ids = acquisition_ids
        self.study_id = study_id
        self.segmentation_versions = segmentation_versions
        self.biomarker_versions = biomarker_versions
        self.biomarkers = biomarkers
        self.annotations = annotations
        self.include_img = include_img
        self.include_masks = include_masks
        self.include_seg_mask = include_seg_mask
        self.seg_mask_type = seg_mask_type
        self.img_res = img_res
        self.mask_names = mask_names
        self.name = name
        self.datatype = datatype
        self.masks = None
        self.img = None
        self.root_dir = root_dir

    def _validate_config_for_enable_db(self) -> None:
        # validation above. Skip the rest if zarr dir is provided
        # Later we will validate each zarr.
        if self.root_dir is not None:
            return

        if (
            self.segmentation_versions is not None
            and self.biomarker_versions is not None
        ):
            if len(self.segmentation_versions) != len(self.biomarker_versions):
                raise EMExperimentException(
                    "Must provide same number of segmentation and biomarker versions."
                )
            if len(self.segmentation_versions) != len(self.acquisition_ids):
                raise EMExperimentException(
                    "Must provide same number of segmentation versions and acquisition ids."
                )
            if len(self.biomarker_versions) != len(self.acquisition_ids):
                raise EMExperimentException(
                    "Must provide same number of biomarker versions and acquisition ids."
                )