Module emobject.utils.io
Expand source code
from emobject import emobject as emo
from emobject.emlayer import BaseLayer
from emobject.errors import EMObjectException
from emobject.core import save
import emobject.emimage as emi
from scipy import sparse
import pandas as pd
from anndata import AnnData
from typing import Optional
from lxml import etree
# from glob import glob
import os
import json
import zipfile
from pathlib import Path
import numpy as np
from ome_types import from_xml
import ome_types
import codecs
import re
import base64
def to_anndata(E: emo.EMObject) -> AnnData:
    """Build an AnnData object from the contents of an EMObject.

    ``E.data`` becomes ``X``, a copy of ``E.obs`` with its index cast to
    ``str`` becomes ``obs``, ``E.var`` becomes ``var`` and, when ``E.meta``
    is not ``None``, ``E.meta.to_dict()`` is stored in ``uns``.

    Args:
        E : EMObject to convert

    Returns:
        The newly created AnnData object.

    Raises:
        EMObjectException: if the anndata package cannot be imported.
    """
    try:
        import anndata as ad
    except ImportError:
        raise EMObjectException("anndata not installed")

    result = ad.AnnData(X=E.data)

    # The string cast is applied to a copy so that E.obs keeps its own index.
    obs_as_str = E.obs.copy()
    obs_as_str.index = obs_as_str.index.astype(str)
    result.obs = obs_as_str
    result.var = E.var

    if E.meta is None:
        return result

    # uns holds the metadata frame in dict form
    result.uns = E.meta.to_dict()
    return result
def from_anndata(
    adata: AnnData,
    dtype=int,
    include_uns: Optional[bool] = False,
    name: Optional[str] = None,
    assay: Optional[str] = "visium",
) -> emo.EMObject:
    """Converts an anndata object to an EMObject.

    ``adata.X`` (densified first when it is a scipy sparse matrix) becomes
    the data frame, ``adata.obs`` is copied and re-indexed ``1..n_obs`` and,
    when ``adata.obsm["spatial"]`` exists, it becomes the position frame with
    the same ``1..n_obs`` index. The data frame itself keeps pandas' default
    positional index, as before.

    Args:
        adata : anndata object to convert
        dtype : dtype used for the data and position frames
        include_uns : when truthy, ``adata.uns`` is passed on as ``meta``
        name : name of the EMObject
        assay : assay type

    Returns:
        The constructed EMObject. ``adata`` itself is not modified.
    """
    new_idx = list(range(1, adata.shape[0] + 1))

    # issparse() recognises every scipy sparse format and avoids reaching
    # into the private ``sparse._csr`` module.
    matrix = adata.X.toarray() if sparse.issparse(adata.X) else adata.X
    df = pd.DataFrame(matrix, dtype=dtype)

    # see if there's position data
    try:
        spatial = adata.obsm["spatial"]
    except KeyError:
        pos = None
    else:
        pos = pd.DataFrame(spatial, dtype=dtype, index=new_idx)

    meta = adata.uns if include_uns else None

    # Re-index a copy: assigning to adata.obs.index directly would silently
    # rewrite the caller's AnnData.
    obs = adata.obs.copy()
    obs.index = new_idx

    return emo.EMObject(
        data=df, obs=obs, var=adata.var, meta=meta, pos=pos, name=name, assay=assay
    )
def layer_from_anndata(
    adata: AnnData,
    dtype=int,
    include_uns: Optional[bool] = False,
    name: str = None,
    assay: Optional[str] = "visium",
    spot_size: Optional[float] = None,
    scale_factor: Optional[float] = None,
) -> BaseLayer:
    """Converts an anndata object to a BaseLayer.

    The data frame, the copied ``obs`` frame and (when
    ``adata.obsm["spatial"]`` exists) the position frame all receive the same
    ``1..n_obs`` index.

    Args:
        adata : anndata object to convert
        dtype : dtype used for the data and position frames
        include_uns : accepted for signature compatibility; not used here
        name : name of the layer (required)
        assay : assay type
        spot_size : forwarded to BaseLayer
        scale_factor : forwarded to BaseLayer

    Returns:
        The constructed BaseLayer. ``adata`` itself is not modified.

    Raises:
        EMObjectException: if ``name`` is None.
    """
    # Fail before doing any conversion work.
    if name is None:
        raise EMObjectException("Must provide a name for the layer")

    new_idx = list(range(1, adata.shape[0] + 1))

    # issparse() recognises every scipy sparse format and avoids reaching
    # into the private ``sparse._csr`` module. Both the sparse and the dense
    # path now get the same index (the sparse path used to be left with the
    # default 0-based one).
    matrix = adata.X.toarray() if sparse.issparse(adata.X) else adata.X
    df = pd.DataFrame(matrix, dtype=dtype, index=new_idx)

    # see if there's position data
    try:
        spatial = adata.obsm["spatial"]
    except KeyError:
        pos = None
    else:
        pos = pd.DataFrame(spatial, dtype=dtype, index=new_idx)

    # Re-index a copy: assigning to adata.obs.index directly would silently
    # rewrite the caller's AnnData.
    obs = adata.obs.copy()
    obs.index = new_idx

    return BaseLayer(
        data=df,
        obs=obs,
        var=adata.var,
        pos=pos,
        name=name,
        assay=assay,
        spot_size=spot_size,
        scale_factor=scale_factor,
    )
def from_10x_visium(path: str, name: Optional[str] = None) -> emo.EMObject:
    """Converts a 10x Visium directory to an EMObject.

    NOTE: parsing is not implemented yet. The function currently only checks
    that ``path`` exists and then falls through, returning ``None``; ``name``
    is not used.

    Args:
        path : path to 10x Visium directory
        name : currently unused (presumably the name for the resulting
            EMObject -- TODO confirm once implemented)

    Raises:
        AssertionError: if ``path`` does not exist. The check is an
            ``assert``, so it is skipped when Python runs with ``-O``.
    """
    assert os.path.exists(path), f"Path {path} does not exist."
    # TODO: Parse visium directory
def object_from_files(
    data: str,
    obs: str,
    var: str,
    meta: Optional[str] = None,
    pos: Optional[str] = None,
    name: Optional[str] = None,
    assay: Optional[str] = None,
    delimiter: Optional[str] = ",",
) -> emo.EMObject:
    """Assemble an EMObject from delimited text files.

    Every file is read with ``pandas.read_csv`` using its first column as
    the index and ``delimiter`` as the separator.

    Args:
        data : path to data file
        obs : path to obs file
        var : path to var file
        meta : path to meta file (optional)
        pos : path to pos file (optional)
        name : name of the EMObject
        assay : assay type
        delimiter : field separator shared by all files

    Returns:
        The EMObject built from the loaded frames.
    """

    def _load(path):
        # shared reader: first column is always the index
        return pd.read_csv(path, index_col=0, delimiter=delimiter)

    data_frame = _load(data)
    obs_frame = _load(obs)
    var_frame = _load(var)
    meta_frame = None if meta is None else _load(meta)
    pos_frame = None if pos is None else _load(pos)

    return emo.EMObject(
        data=data_frame,
        obs=obs_frame,
        var=var_frame,
        meta=meta_frame,
        pos=pos_frame,
        name=name,
        assay=assay,
    )
def layer_from_files(
    data: str,
    obs: str,
    var: str,
    pos: Optional[str] = None,
    name: Optional[str] = None,
    assay: Optional[str] = None,
    delimiter: Optional[str] = ",",
    spot_size: Optional[float] = None,
    scale_factor: Optional[float] = None,
) -> BaseLayer:
    """Converts files to a BaseLayer.

    Every file is read with ``pandas.read_csv`` using its first column as
    the index and ``delimiter`` as the separator.

    Args:
        data : path to data file
        obs : path to obs file
        var : path to var file
        pos : path to pos file (optional)
        name : name of the layer
        assay : assay type
        delimiter : field separator shared by all files
        spot_size : forwarded to BaseLayer
        scale_factor : forwarded to BaseLayer

    Returns:
        The BaseLayer built from the loaded frames.
    """
    data = pd.read_csv(data, index_col=0, delimiter=delimiter)
    obs = pd.read_csv(obs, index_col=0, delimiter=delimiter)
    var = pd.read_csv(var, index_col=0, delimiter=delimiter)
    if pos is not None:
        pos = pd.read_csv(pos, index_col=0, delimiter=delimiter)

    return BaseLayer(
        data=data,
        obs=obs,
        var=var,
        pos=pos,
        name=name,
        assay=assay,
        # These used to be hard-coded to None, silently dropping the
        # caller's values.
        spot_size=spot_size,
        scale_factor=scale_factor,
    )
def from_geomx(workflow_directory, image_directory, save_directory):
    """
    Parse Nanostring GeoMX data into emObjects, including DCC files, PKC
    files and OME-XML metadata (ROIs), and save one emObject per slide.

    A slide is processed when an ``.xml`` file whose base name (text before
    the first ``.``) equals a value of the ``SlideName`` annotation column is
    found under ``image_directory``. Image pixel data is not loaded
    (``img=None``).

    Args:
        workflow_directory (str): The path of the directory containing the
            GeoMX data outputs (often referred to as 'workflow' files).
        image_directory (str): The path of the directory containing GeoMX
            image data
        save_directory (str): The path of the directory where Zarr data will
            be stored.

    Returns:
        A list of ``<save_directory>/<slide name>.zarr`` paths, one per
        processed slide, in sorted slide-name order.

    Raises:
        AssertionError: if the number of DCC directories or PKC files found
            is not exactly one.
    """
    # From top level 'Workflow' directory, find DCC directory
    dcc_dir = __find_dirs_with_ext(workflow_directory, ".dcc")

    # Find PKC file, ignoring hidden files
    pkc_files = [
        file
        for file in __find_files_with_ext(workflow_directory, ".pkc")
        if not os.path.basename(file).startswith(".")
    ]

    # Exactly one of each is required; the messages cover the "none found"
    # case as well as duplicates.
    assert len(dcc_dir) == 1, (
        f"Expected exactly one DCC directory, found {len(dcc_dir)}"
    )
    assert len(pkc_files) == 1, (
        f"Expected exactly one .PKC file, found {len(pkc_files)}"
    )

    # Map slide name -> discovered XML path so the real location is used
    # instead of a path re-assembled from the name.
    xml_by_name = {
        os.path.basename(xml_path).split(".", 1)[0]: xml_path
        for xml_path in __find_files_with_ext(image_directory, ".xml")
    }

    # Parse files:
    annos = _read_region_annotations(workflow_directory)
    segments = _read_segment_properties(workflow_directory)
    obs = pd.merge(annos, segments, how="left")
    mat = _create_counts_matrix(dcc_dir=dcc_dir[0], pkc_file=pkc_files[0])
    # NOTE: a former ``mat.reindex(obs.index)`` call here discarded its
    # result and therefore did nothing; it was removed rather than assigned,
    # because rows are selected by Sample_ID below, not by obs' index.

    # Sorted so that processing and output order are deterministic.
    slide_names = sorted(set(xml_by_name) & set(obs["SlideName"]))

    zarr_paths = []
    for name in slide_names:
        combined_mask, _mask_inds, mask_positions, _dims = _extract_rois_from_xml(
            xml_by_name[name]
        )
        these_obs = obs[obs["SlideName"].isin([name])]
        this_mat = mat[mat.index.isin(these_obs["Sample_ID"])]

        M = emi.EMMask(
            masks=combined_mask,
            mask_idx=np.array([name]),
            to_disk=True,
        )
        E = emo.EMObject(
            data=this_mat,
            obs=these_obs,
            var=None,
            pos=mask_positions,
            mask=M,
            img=None,
            meta=None,
            name=name,
        )
        save(E, out_dir=save_directory, name=name)
        zarr_paths.append(os.path.join(save_directory, name + ".zarr"))
    return zarr_paths
def combine_masks(mask_list, xy, W, H):
    """
    Combine multiple binary masks into a single 2D label array.

    Args:
        mask_list: iterable of 2D binary arrays, one per mask.
        xy: indexable of ``(x, y)`` top-left positions; ``xy[k]`` belongs to
            the k-th mask of ``mask_list``.
        W: width of the combined mask (coerced with ``int``).
        H: height of the combined mask (coerced with ``int``).

    Returns:
        An ``(H, W)`` integer array in which the pixels of the k-th mask
        (1-based) carry the value ``k``. Values are added, so pixels covered
        by several masks hold the sum of their labels.
    """
    # Coerce ints
    W = int(W)
    H = int(H)

    # Create an empty 2D numpy array of the required dimensions
    combined_mask = np.zeros((H, W), dtype=int)

    # Iterate over each mask, adding its values into the combined mask
    for label, mask in enumerate(mask_list, start=1):
        x1 = xy[label - 1][0]
        y1 = xy[label - 1][1]
        # Cast before multiplying: masks are typically uint8, and
        # ``label * uint8_array`` overflows once label exceeds 255.
        labelled = label * np.asarray(mask).astype(combined_mask.dtype)
        rows, cols = labelled.shape
        combined_mask[y1 : y1 + rows, x1 : x1 + cols] += labelled
    return combined_mask
def _init_EMMask_from_dict(mask_dictionary):
    """Create an EMMask from the first entry of ``mask_dictionary``.

    Only the first key/value pair (in the dictionary's iteration order) is
    used: the value is passed as ``masks`` and the key as ``mask_idx``.
    """
    leading_key, leading_mask = next(iter(mask_dictionary.items()))
    return emi.EMMask(masks=leading_mask, mask_idx=leading_key, to_disk=True)
def _add_mask_from_dict(mask_dictionary: dict = None, M: emi.EMMask = None):
    """Add every mask of ``mask_dictionary`` that ``M`` does not hold yet.

    Args:
        mask_dictionary: mapping of mask name to mask; ``None`` or an empty
            mapping leaves ``M`` untouched.
        M: EMMask to extend. Names already listed in ``M.mask_names`` are
            skipped.

    Returns:
        ``M``, after the missing masks were added via ``M.add_mask``.
    """
    if not mask_dictionary:
        return M
    for mask_name, mask in mask_dictionary.items():
        if mask_name in M.mask_names:
            continue
        M.add_mask(mask=mask, mask_name=mask_name)
    return M
def _extract_rois_from_xml(xml_fpath, validate=False):
    """
    Ingests an OME-XML (or XML) file as a path, parses the file, then extracts
    ROI/AOIs as binary masks. This function uses the ome-types library to parse
    XML and to model the different data structures needed.
    Binary base64 data from the BinData model are extracted and bit unpacked
    into a 1-bit binary 2D numpy array.

    If the first parse fails with an XML syntax error, the file is cleaned
    IN PLACE (``xml_fpath`` is overwritten) and parsed again.

    Args:
        xml_fpath: A string defining the XML file path
        validate: A logical, defining whether or not the ome-types library
            should perform XML validation on input. Not recommended, since
            GeoMX outputs may contain non standard metadata.

    Returns:
        A 4-tuple ``(combined_mask, mask_inds, mask_position_array, dims)``:
        the label image produced by ``combine_masks``; the list of mask keys
        of the form ``"<slide name> | <ROI id, 3 digits> | <mask text>"``;
        an ``(n_masks, 2)`` integer array of the masks' ``(x, y)`` values;
        and ``(size_x, size_y)`` of the first image. All per-mask outputs
        share the same order.
    """
    # parse ome-xml
    try:
        types = from_xml(xml_fpath, validate=validate, parser="lxml")
    except etree.XMLSyntaxError:
        __clean_xml_file(xml_fpath, xml_fpath)
        types = from_xml(xml_fpath, validate=validate, parser="lxml")

    # retrieve full image dimensions and image name (slide name)
    first_image = dict(types.images[0])
    pixels_dict = dict(first_image["pixels"])
    X = pixels_dict["size_x"]
    Y = pixels_dict["size_y"]
    slide_name = first_image["name"]

    # Masks are kept in a list parallel to mask_inds / mask_positions. A
    # dict keyed by mask_inds would collapse duplicate keys and shift every
    # later mask onto the wrong position.
    masks = []
    mask_inds = []
    mask_positions = []
    for roi in types.rois:
        # get ID information; the ROI id's numeric suffix is shifted by one
        roi_dict = dict(roi)
        roi_id = int(roi_dict["id"].split(":")[1]) + 1
        roi_id_str = f"{roi_id:0>3}"

        # get mask information; other shape types in the union are ignored
        for shape in roi_dict["union"]:
            if not isinstance(shape, ome_types.model.mask.Mask):
                continue
            mask = dict(shape)
            dict_key = " | ".join([slide_name, roi_id_str, mask["text"]])
            bin_mask = _read_bindata_mask(
                dict(mask["bin_data"])["value"],
                width=mask["width"],
                height=mask["height"],
            )
            masks.append(bin_mask)
            mask_inds.append(dict_key)
            mask_positions.append(np.array([mask["x"], mask["y"]], dtype=int))

    # np.vstack rejects an empty list, so a file without masks gets an
    # explicit empty (0, 2) position array and an all-zero combined mask.
    if mask_positions:
        mask_position_array = np.vstack(mask_positions)
    else:
        mask_position_array = np.empty((0, 2), dtype=int)

    dims = (X, Y)
    combined_mask = combine_masks(masks, mask_position_array, X, Y)
    return combined_mask, mask_inds, mask_position_array, dims
def _read_bindata_mask(base64_data, width, height):
    """Decode a base64, bit-packed mask into a 2D array of 0/1 values.

    The payload is base64-decoded, unpacked bit by bit (``np.unpackbits``
    default order, most significant bit first) and the first
    ``height * width`` bits are reshaped row by row; trailing bits are
    ignored.

    Args:
        base64_data: base64-encoded packed bits.
        width: mask width in pixels (coerced with ``int``).
        height: mask height in pixels (coerced with ``int``).

    Returns:
        A ``(height, width)`` ``uint8`` array.

    Raises:
        ValueError: if the payload holds fewer than ``height * width`` bits.
    """
    width = int(width)
    height = int(height)
    n_pixels = height * width

    # Decode the base64 data and view it as bytes (0-255)
    packed = np.frombuffer(base64.b64decode(base64_data), dtype=np.uint8)

    # Unpack bits to 1-bit binary
    bits = np.unpackbits(packed)

    # Report a short payload explicitly instead of surfacing numpy's generic
    # reshape error.
    if bits.size < n_pixels:
        raise ValueError(
            f"BinData holds {bits.size} bits but a {width}x{height} mask "
            f"needs {n_pixels}"
        )

    # Reshape the 1D array into a 2D array based on the mask dimensions
    return bits[:n_pixels].reshape((height, width))
def __combine_rows_by_index(df, indices_to_combine, new_index):
    """
    Combine specified rows in a pandas DataFrame by summing column values.

    The input frame is left untouched; grouping happens on a relabelled
    copy of its index.

    Args:
        df (DataFrame): pandas DataFrame
        indices_to_combine (list): list of indices to combine
        new_index (str): new index for the combined row

    Returns:
        pandas DataFrame with combined rows, one row per resulting label
        (``groupby`` sorts the labels).
    """
    # Set for O(1) membership tests
    to_merge = set(indices_to_combine)

    # Relabel without assigning to df.index, which would rewrite the
    # caller's DataFrame in place.
    relabelled = pd.Index(
        [new_index if idx in to_merge else idx for idx in df.index],
        name=df.index.name,
    )

    # Group by the new labels and sum
    return df.groupby(relabelled).sum()
def __find_dirs_with_ext(top_dir, ext):
    """List the directories under ``top_dir`` (itself included) that
    directly contain at least one file whose name ends with ``ext``.

    The comparison is case-insensitive. Each directory appears once; the
    order of the returned list is unspecified.
    """
    suffix = ext.lower()
    hits = {
        current_dir
        for current_dir, _subdirs, filenames in os.walk(top_dir)
        if any(fname.lower().endswith(suffix) for fname in filenames)
    }
    return list(hits)
def __find_files_with_ext(top_dir, ext):
    """Collect the paths of all files under ``top_dir`` whose name ends
    with ``ext`` (case-insensitive), in ``os.walk`` order.
    """
    suffix = ext.lower()
    return [
        os.path.join(current_dir, fname)
        for current_dir, _subdirs, filenames in os.walk(top_dir)
        for fname in filenames
        if fname.lower().endswith(suffix)
    ]
def _parse_dcc(file_path):
    """
    Parse a DCC (Digital Count Conversion) file into a nested dictionary
    structure.

    The DCC file format is used by NanoString Technologies for their nCounter
    and GeoMx platforms. The file contains sections with key-value pairs
    separated by commas. Each section starts with a tag in the format
    <SectionName> and ends with a tag in the format </SectionName>.

    Parsing rules:
        - blank lines are skipped;
        - lines outside any section are ignored;
        - each line is split at its FIRST comma only, and values are kept
          as strings;
        - a line without a comma is stored with an empty-string value.

    Example DCC file content:
        <Header>
        FileVersion,0.02
        SoftwareVersion,"GeoMx_NGS_Pipeline_2.3.4"
        Date,2021-5-20
        </Header>
        <Scan_Attributes>
        ID,DSP-1005330000011-D-A01
        Plate_ID,1005330000011
        Well,A01
        </Scan_Attributes>

    Args:
        file_path (str): The path of the DCC file to parse.

    Returns:
        dict: A nested dictionary representing the DCC file content, where the
        outer dictionary keys are section names, and the inner dictionaries
        contain the key-value pairs within each section.

    Example usage:
        dcc_file_path = 'example.dcc'
        parsed_dcc = _parse_dcc(dcc_file_path)
        print(parsed_dcc)

    Output:
        {
            'Header': {
                'FileVersion': '0.02',
                'SoftwareVersion': '"GeoMx_NGS_Pipeline_2.3.4"',
                'Date': '2021-5-20'
            },
            'Scan_Attributes': {
                'ID': 'DSP-1005330000011-D-A01',
                'Plate_ID': '1005330000011',
                'Well': 'A01'
            }
        }
    """
    dcc_dict = {}
    current_section = None

    with open(file_path, "r") as f:
        # Stream the file line by line instead of loading it whole.
        for raw_line in f:
            line = raw_line.strip()

            # Skip empty lines
            if not line:
                continue

            is_tag = line.startswith("<") and line.endswith(">")

            if is_tag and line.startswith("</"):
                # Close the current section
                current_section = None
            elif is_tag:
                # Start a new section
                current_section = line[1:-1]
                dcc_dict[current_section] = {}
            elif current_section is not None:
                # partition() tolerates a missing comma, where tuple
                # unpacking of split() would raise.
                key, _sep, value = line.partition(",")
                dcc_dict[current_section][key] = value

    return dcc_dict
def _parse_pkc(file_path):
    """
    Build the RTS_ID -> gene name lookup from a PKC (Probe Key Collection)
    file.

    The PKC file is read as JSON. For every entry of its ``"Targets"`` list
    only the FIRST probe is considered: its ``RTS_ID`` becomes the key and
    the part of its ``DisplayName`` before the first underscore becomes the
    value.

    Args:
        file_path (str): The path of the PKC file to parse.

    Returns:
        dict: RTS_ID -> gene name.

    Example usage:
        pkc_file_path = 'example.pkc'
        parsed_pkc = _parse_pkc(pkc_file_path)
        print(parsed_pkc)

    Output (example):
        {
            'RTS0021170': 'GOLPH3L',
            'RTS0021193': 'ACTB',
            ...
        }
    """
    # PKC files share the JSON format
    with open(file_path, "r") as handle:
        pkc = json.load(handle)

    # one (first) probe per target
    leading_probes = (target["Probes"][0] for target in pkc["Targets"])
    return {
        probe["RTS_ID"]: probe["DisplayName"].split("_")[0]
        for probe in leading_probes
    }
def _read_dcc_files(directory, counts_key="Code_Summary"):
    """
    Read and parse .dcc files in a directory, unzip .zip files if necessary.

    Given a directory name, this function checks for .dcc and .zip files.
    If only .zip files are found, it unzips them into ``directory``. If
    .dcc files are already present, it assumes unzipping has already
    occurred. If no .dcc files are found, even after unzipping, the function
    raises an error. Only files directly inside ``directory`` are considered.

    Args:
        directory (str): The directory path containing .dcc and/or .zip files.
        counts_key (str): The name of the key from `_parse_dcc` output that
            contains counts data.

    Returns:
        pd.DataFrame: One row per .dcc file (indexed by the file's base name
        without extension, in sorted file order) and one column per key of
        the ``counts_key`` section. Keys missing from a file are filled with
        0. Columns are converted to numeric dtypes when every value of the
        section parses as a number; otherwise the raw strings are kept.

    Raises:
        FileNotFoundError: If no .dcc files are found in the directory.
        KeyError: If a .dcc file has no ``counts_key`` section.
    """
    directory = Path(directory)

    # sorted() makes the row order independent of filesystem listing order
    dcc_files = sorted(directory.glob("*.dcc"))

    # Unzip .zip files if necessary
    if not dcc_files:
        for zip_file in sorted(directory.glob("*.zip")):
            with zipfile.ZipFile(zip_file, "r") as zf:
                zf.extractall(directory)
        dcc_files = sorted(directory.glob("*.dcc"))

    # Check for .dcc files
    if not dcc_files:
        raise FileNotFoundError("No .dcc files found in the directory.")

    # Read and parse .dcc files
    parsed_files = {
        dcc_file.stem: _parse_dcc(dcc_file)[counts_key] for dcc_file in dcc_files
    }
    df = pd.DataFrame.from_dict(parsed_files, orient="index")

    # _parse_dcc yields strings; convert before filling gaps so the matrix
    # does not end up mixing "12"-style strings with integer zeros.
    try:
        df = df.apply(pd.to_numeric)
    except (ValueError, TypeError):
        # Non-numeric section requested: keep the original string values.
        pass

    return df.fillna(0)
def _create_counts_matrix(dcc_dir, pkc_file, counts_key="Code_Summary"):
    """
    Assemble the counts matrix from GeoMx DCC and PKC outputs.

    The per-file counts read from ``dcc_dir`` have their columns renamed
    with the RTS_ID -> gene name mapping parsed from ``pkc_file``; columns
    absent from the mapping keep their name.

    Args:
        dcc_dir (str): A directory to search for DCC files in
        pkc_file (str): A path to the PKC file
        counts_key (str): DCC section holding the counts

    Returns:
        pd.DataFrame: the counts with renamed columns
    """
    counts = _read_dcc_files(dcc_dir, counts_key=counts_key)
    rts_to_gene = _parse_pkc(pkc_file)
    return counts.rename(columns=rts_to_gene)
def _read_region_annotations(
    directory,
    file_name_pattern="LabWorksheet",
    file_extension=".txt",
    text_matching_pattern="Annotations",
):
    """
    Load the annotation table(s) found below ``directory``.

    Files are searched recursively with the glob
    ``*<file_name_pattern>*<file_extension>``. In each file, the first line
    containing ``text_matching_pattern`` marks the start: everything after
    that line is read as a tab-separated table. Files without such a line
    are skipped. The tables are concatenated by rows, and a ``ROI`` column
    holding the first run of digits of the ``roi`` column is added.

    Args:
        directory (str): The directory path to search for files.
        file_name_pattern (str): The file name pattern to match.
        file_extension (str): The file extension to match.
        text_matching_pattern (str): The text pattern to search for in the file.

    Returns:
        pd.DataFrame: The parsed data as a pandas DataFrame.

    Raises:
        FileNotFoundError: If no matching files are found.
        ValueError: If none of the matching files contains the text pattern.
    """
    search_pattern = f"*{file_name_pattern}*{file_extension}"
    candidates = list(Path(directory).rglob(search_pattern))
    if not candidates:
        raise FileNotFoundError(
            f"No files matching the pattern {search_pattern} were found."
        )

    tables = []
    for candidate in candidates:
        # locate the (0-based) line number of the marker line, if any
        with open(candidate, "r") as handle:
            marker_at = next(
                (
                    line_no
                    for line_no, text in enumerate(handle)
                    if text_matching_pattern in text
                ),
                None,
            )
        if marker_at is None:
            continue
        # the table starts on the line right after the marker
        tables.append(pd.read_csv(candidate, sep="\t", skiprows=marker_at + 1))

    if not tables:
        raise ValueError(
            f"No matching text pattern '{text_matching_pattern}' found in the matching files."
        )

    # stack all tables and derive the cleaned ROI column
    combined_df = pd.concat(tables, axis=0, ignore_index=True, sort=False)
    combined_df["ROI"] = combined_df["roi"].str.extract(r"(\d+)")
    return combined_df
def _read_segment_properties(
    directory, file_name_pattern="Export1", sheet_name="SegmentProperties"
):
    """
    Read the segment properties sheet of a GeoMX Excel export.

    Files named ``<file_name_pattern>*.xlsx`` are searched recursively below
    ``directory``; the first match in sorted path order is read. The
    ``SegmentDisplayName`` column is split on ``" | "`` into the new columns
    ``slide name``, ``ROI`` and ``segment``.

    Args:
        directory (str): The directory path to search for files.
        file_name_pattern (str): Prefix of the Excel file name.
        sheet_name (str): Name of the sheet to read.

    Returns:
        pd.DataFrame: The sheet content plus the three derived columns.

    Raises:
        FileNotFoundError: If no matching Excel file is found.
    """
    search_pattern = f"{file_name_pattern}*.xlsx"
    # sorted() makes the choice deterministic when several exports exist
    matching_files = sorted(Path(directory).rglob(search_pattern))
    if not matching_files:
        raise FileNotFoundError(
            f"No files matching the pattern {search_pattern} were found."
        )

    seg_properties = pd.read_excel(matching_files[0], sheet_name=sheet_name)

    # Raw string: same pattern as before, without the invalid "\|" escape
    # in a normal string literal.
    seg_properties[["slide name", "ROI", "segment"]] = seg_properties[
        "SegmentDisplayName"
    ].str.split(r" \| ", expand=True)
    return seg_properties
def __clean_xml_file(input_filename, output_filename):
    """Rewrite a text file so that it contains only ASCII plus U+00B5.

    The input is decoded as UTF-8 with undecodable bytes replaced, then every
    run of one or more non-ASCII characters (including those replacement
    characters) is substituted by a single micro sign (U+00B5). The result is
    written as UTF-8 to ``output_filename``, which may be the same path as
    the input because the input is read completely before writing starts.
    """
    non_ascii_run = re.compile(r"[^\x00-\x7F]+")

    # codecs.open keeps the bytes-level newline handling of the file as is
    with codecs.open(input_filename, "r", encoding="utf-8", errors="replace") as src:
        cleaned = non_ascii_run.sub("\u00b5", src.read())

    with codecs.open(output_filename, "w", encoding="utf-8") as dst:
        dst.write(cleaned)
def __get_rectangle_corners(x, y, width, height):
    """Return the four corners of an axis-aligned rectangle.

    ``(x, y)`` is the top-left corner. The result is a ``(4, 2)`` array
    ordered top-left, top-right, bottom-left, bottom-right.
    """
    right = x + width
    bottom = y + height
    return np.array([(x, y), (right, y), (x, bottom), (right, bottom)])
def __aggregate_columns(column):
    """Reduce one column (a pandas Series) to a single value.

    The rule depends on the column dtype:
        - boolean: ``True`` if any value is true;
        - numeric: the sum of the values;
        - object / string: the unique non-missing values joined with ``","``
          in order of first appearance;
        - anything else: the first value.

    Args:
        column: pandas Series to aggregate.

    Returns:
        The aggregated scalar (or string).
    """
    dtype_checks = pd.api.types

    # The pandas dtype helpers replace np.issubdtype(..., np.object / np.bool):
    # those aliases no longer exist in current NumPy. Booleans are tested
    # first because pandas counts them as numeric.
    if dtype_checks.is_bool_dtype(column):
        return column.any()
    if dtype_checks.is_numeric_dtype(column):
        return column.sum()
    if dtype_checks.is_object_dtype(column) or dtype_checks.is_string_dtype(column):
        # str() keeps the join from failing on stray non-string objects
        return ",".join(str(value) for value in column.dropna().unique())
    return column.iloc[0]
Functions
def combine_masks(mask_list, xy, W, H)-
Combine multiple binary masks into a single 2D array.
mask_list is an iterable of 2D binary numpy arrays and xy holds the matching (x, y) top-left position of each mask; the k-th mask (1-based) is added into the output with the value k.
W and H are the width and height of the final combined mask.
Expand source code
def combine_masks(mask_list, xy, W, H): """ Combine multiple binary masks into a single 2D array. Each mask in mask_list is represented by a tuple: (top_left_x, top_left_y, mask_array), where mask_array is a 2D 1-bit numpy array of known width and height. max_x and max_y are the dimensions of the final combined mask. """ # Coerce ints W = int(W) H = int(H) # Create an empty 2D numpy array of the required dimensions combined_mask = np.zeros((H, W), dtype=int) # Iterate over each mask, adding its values into the combined mask for i, mask in enumerate(mask_list, start=1): i1 = i - 1 x1 = xy[i1][0] y1 = xy[i1][1] combined_mask[y1 : y1 + mask.shape[0], x1 : x1 + mask.shape[1]] += i * mask return combined_mask def from_10x_visium(path: str, name: Optional[str] = None) ‑> EMObject-
Converts a 10x Visium directory to an EMObject.
Args
path : path to 10x Visium directory
Expand source code
def from_10x_visium(path: str, name: Optional[str] = None) -> emo.EMObject: """Converts a 10x Visium directory to an EMObject. Args: path : path to 10x Visium directory """ assert os.path.exists(path), f"Path {path} does not exist." # TODO: Parse visium directory def from_anndata(adata: anndata._core.anndata.AnnData, dtype=builtins.int, include_uns: Optional[bool] = False, name: Optional[str] = None, assay: Optional[str] = 'visium') ‑> EMObject-
Converts an anndata object to an EMObject.
Args
adata : anndata object to convert
Expand source code
def from_anndata( adata: AnnData, dtype=int, include_uns: Optional[bool] = False, name: Optional[str] = None, assay: Optional[str] = "visium", ) -> emo.EMObject: """Converts an anndata object to an EMObject. Args: adata : anndata object to convert """ new_idx = [j for j in range(1, adata.shape[0] + 1)] if type(adata.X) == sparse._csr.csr_matrix: df = pd.DataFrame(adata.X.todense(), dtype=dtype) else: df = pd.DataFrame(adata.X, dtype=dtype) # see if there's position data try: pos = adata.obsm["spatial"] pos = pd.DataFrame(pos, dtype=dtype, index=new_idx) except KeyError: pos = None if include_uns: meta = adata.uns else: meta = None obs = adata.obs obs.index = new_idx E = emo.EMObject( data=df, obs=obs, var=adata.var, meta=meta, pos=pos, name=name, assay=assay ) return E def from_geomx(workflow_directory, image_directory, save_directory)-
Parse Nanostring GeoMX data into emObject, including DCC files, PKC files, OME-XML metadata (ROIs), and images.
Args
workflow_directory:str - The path of the directory containing the GeoMX data outputs (often referred to as 'workflow' files).
image_directory:str - The path of the directory containing GeoMX image data.
save_directory:str - The path of the directory where Zarr data will be stored.
Returns
A list of paths of the form <save_directory>/<slide name>.zarr, one per saved emObject
Expand source code
def from_geomx(workflow_directory, image_directory, save_directory): """ Parse Nanostring GeoMX data into emObject, including DCC files, PKC files, OME-XML metadata (ROIs), and images. Args: workflow_directory (str): The path of the directory containing the GeoMX data outputs (often referred to as 'workflow' files). image_directory (str): The path of the directory containing GeoMX image data save_directory (str): The path of the directory where Zarr data will be stored. Returns: A list of emObjects """ # From top level 'Workflow' directory, find DCC directory dcc_dir = __find_dirs_with_ext(workflow_directory, ".dcc") # Find PKC file pkc_files = __find_files_with_ext(workflow_directory, ".pkc") pkc_files = [ file for file in pkc_files if not os.path.basename(file).startswith(".") ] # Find XML files xml_paths = __find_files_with_ext(image_directory, ".xml") xml_fnames = [os.path.basename(x) for x in xml_paths] xml_names = [x.split(".", 1)[0] for x in xml_fnames] # Check for duplicates assert len(dcc_dir) == 1, "More than one DCC directory found" assert len(pkc_files) == 1, "More than one .PKC file found in directory" # Parse files: annos = _read_region_annotations(workflow_directory) segments = _read_segment_properties(workflow_directory) obs = pd.merge(annos, segments, how="left") mat = _create_counts_matrix(dcc_dir=dcc_dir[0], pkc_file=pkc_files[0]) mat.reindex(obs.index) # Load images and create emObjects image_file_names = set(xml_names) & set(obs["SlideName"]) xml_file_paths = [ os.path.join(image_directory, "".join([x, ".ome.xml"])) for x in image_file_names ] img_file_paths = [ os.path.join(image_directory, "".join([x, ".ome.tiff"])) for x in image_file_names ] zarr_paths = [] for name, path, img_path in zip(image_file_names, xml_file_paths, img_file_paths): combined_mask, mask_inds, mask_positions, dims = _extract_rois_from_xml(path) these_obs = obs[obs["SlideName"].isin([name])] this_mat = mat[mat.index.isin(these_obs["Sample_ID"])] # position_dict = {m: 
mask_positions[i] for i, m in enumerate(mask_inds)} M = emi.EMMask( masks=combined_mask, mask_idx=np.array([name]), # dims=dims, # pos=position_dict, to_disk=True, ) E = emo.EMObject( data=this_mat, obs=these_obs, var=None, pos=mask_positions, mask=M, img=None, meta=None, name=name, ) save(E, out_dir=save_directory, name=name) zarr_path = zarr_path = os.path.join(save_directory, name + ".zarr") zarr_paths.append(zarr_path) return zarr_paths def layer_from_anndata(adata: anndata._core.anndata.AnnData, dtype=builtins.int, include_uns: Optional[bool] = False, name: str = None, assay: Optional[str] = 'visium', spot_size: Optional[float] = None, scale_factor: Optional[float] = None) ‑> EMObject-
Converts an anndata object to a BaseLayer.
Args
adata : anndata object to convert
Expand source code
def layer_from_anndata( adata: AnnData, dtype=int, include_uns: Optional[bool] = False, name: str = None, assay: Optional[str] = "visium", spot_size: Optional[float] = None, scale_factor: Optional[float] = None, ) -> emo.EMObject: """Converts an anndata object to an EMObject. Args: adata : anndata object to convert """ new_idx = [j for j in range(1, adata.shape[0] + 1)] # old_idx = adata.obs.index if type(adata.X) == sparse._csr.csr_matrix: df = pd.DataFrame(adata.X.todense(), dtype=dtype) else: df = pd.DataFrame(adata.X, dtype=dtype, index=new_idx) # see if there's position data try: pos = adata.obsm["spatial"] pos = pd.DataFrame(pos, dtype=dtype, index=new_idx) # pos.index = new_idx except KeyError: pos = None if name is None: raise EMObjectException("Must provide a name for the layer") obs = adata.obs obs.index = new_idx return BaseLayer( data=df, obs=obs, var=adata.var, pos=pos, name=name, assay=assay, spot_size=spot_size, scale_factor=scale_factor, ) def layer_from_files(data: str, obs: str, var: str, pos: Optional[str] = None, name: Optional[str] = None, assay: Optional[str] = None, delimiter: Optional[str] = ',', spot_size: Optional[float] = None, scale_factor: Optional[float] = None) ‑> BaseLayer-
Converts files to a BaseLayer.
Args
data : path to data file obs : path to obs file var : path to var file meta : path to meta file pos : path to pos file name : name of the layer assay : assay type
Expand source code
def layer_from_files( data: str, obs: str, var: str, pos: Optional[str] = None, name: Optional[str] = None, assay: Optional[str] = None, delimiter: Optional[str] = ",", spot_size: Optional[float] = None, scale_factor: Optional[float] = None, ) -> BaseLayer: """Converts files to an EMObject. Args: data : path to data file obs : path to obs file var : path to var file meta : path to meta file pos : path to pos file name : name of the layer assay : assay type """ data = pd.read_csv(data, index_col=0, delimiter=delimiter) obs = pd.read_csv(obs, index_col=0, delimiter=delimiter) var = pd.read_csv(var, index_col=0, delimiter=delimiter) if pos is not None: pos = pd.read_csv(pos, index_col=0, delimiter=delimiter) else: pos = None return BaseLayer( data=data, obs=obs, var=var, pos=pos, name=name, assay=assay, spot_size=None, scale_factor=None, ) def object_from_files(data: str, obs: str, var: str, meta: Optional[str] = None, pos: Optional[str] = None, name: Optional[str] = None, assay: Optional[str] = None, delimiter: Optional[str] = ',') ‑> EMObject-
Converts files to an EMObject.
Args
data : path to data file obs : path to obs file var : path to var file meta : path to meta file pos : path to pos file name : name of the EMObject assay : assay type
Expand source code
def object_from_files( data: str, obs: str, var: str, meta: Optional[str] = None, pos: Optional[str] = None, name: Optional[str] = None, assay: Optional[str] = None, delimiter: Optional[str] = ",", ) -> emo.EMObject: """Converts files to an EMObject. Args: data : path to data file obs : path to obs file var : path to var file meta : path to meta file pos : path to pos file name : name of the EMObject assay : assay type """ data = pd.read_csv(data, index_col=0, delimiter=delimiter) obs = pd.read_csv(obs, index_col=0, delimiter=delimiter) var = pd.read_csv(var, index_col=0, delimiter=delimiter) if meta is not None: meta = pd.read_csv(meta, index_col=0, delimiter=delimiter) else: meta = None if pos is not None: pos = pd.read_csv(pos, index_col=0, delimiter=delimiter) else: pos = None E = emo.EMObject( data=data, obs=obs, var=var, meta=meta, pos=pos, name=name, assay=assay ) return E def to_anndata(E: EMObject) ‑> anndata._core.anndata.AnnData-
Converts an EMObject to an anndata object.
Args
E : EMObject to convert
Expand source code
def to_anndata(E: emo.EMObject) -> AnnData: """Converts an EMObject to an anndata object. Args: E : EMObject to convert """ try: import anndata as ad except ImportError: raise EMObjectException("anndata not installed") adata = ad.AnnData(X=E.data) # make obs axis string temp_obs = E.obs.copy() temp_obs.index = temp_obs.index.astype(str) adata.obs = temp_obs adata.var = E.var if E.meta is not None: # convert df to dict adata.uns = E.meta.to_dict() return adata