Source code for popari.io
from pathlib import Path
from typing import Optional, Sequence, Union
import anndata as ad
import awkward as ak
import numpy as np
import torch
from scipy.sparse import csr_matrix, issparse
from popari.components import PopariDataset
from popari.util import concatenate, unconcatenate
[docs]
def load_anndata(filepath: Union[str, Path]):
"""Load AnnData object from h5ad file and reformat for Popari."""
merged_dataset = ad.read_h5ad(filepath)
datasets, replicate_names = unmerge_anndata(merged_dataset)
return datasets, replicate_names
def unmerge_anndata(merged_dataset: ad.AnnData):
"""Unmerge composite AnnData object into constituent datasets."""
datasets = unconcatenate(merged_dataset)
for dataset in datasets:
replicate_string = f"{dataset.name}"
if "Sigma_x_inv" in dataset.uns:
# Keep only Sigma_x_inv corresponding to a particular replicate
replicate_Sigma_x_inv = dataset.uns["Sigma_x_inv"][replicate_string]
if np.isscalar(replicate_Sigma_x_inv) and replicate_Sigma_x_inv == -1:
replicate_Sigma_x_inv = None
dataset.uns["Sigma_x_inv"] = {
replicate_string: replicate_Sigma_x_inv,
}
if "Sigma_x_inv_bar" in dataset.uns:
# Keep only Sigma_x_inv_bar corresponding to a particular replicate
replicate_Sigma_x_inv_bar = dataset.uns["Sigma_x_inv_bar"][replicate_string]
if np.isscalar(replicate_Sigma_x_inv_bar) and replicate_Sigma_x_inv_bar == -1:
replicate_Sigma_x_inv_bar = None
dataset.uns["Sigma_x_inv_bar"] = {
replicate_string: replicate_Sigma_x_inv_bar,
}
# Hacks to load adjacency matrices efficiently
if "adjacency_matrix" in dataset.uns:
dataset.obsp["adjacency_matrix"] = csr_matrix(
dataset.uns["adjacency_matrix"][replicate_string],
)
adjacency_matrix = dataset.obsp["adjacency_matrix"].tocoo()
num_cells, _ = adjacency_matrix.shape
adjacency_list = [[] for _ in range(num_cells)]
for x, y in zip(*adjacency_matrix.nonzero()):
adjacency_list[x].append(y)
dataset.obsm["adjacency_list"] = ak.Array(adjacency_list)
if "M" in dataset.uns:
if replicate_string in dataset.uns["M"]:
replicate_M = make_hdf5_compatible(dataset.uns["M"][replicate_string])
dataset.uns["M"] = {replicate_string: replicate_M}
if "M_bar" in dataset.uns:
if replicate_string in dataset.uns["M_bar"]:
replicate_M_bar = make_hdf5_compatible(
dataset.uns["M_bar"][replicate_string],
)
dataset.uns["M_bar"] = {replicate_string: replicate_M_bar}
if "popari_hyperparameters" in dataset.uns:
if "prior_x" in dataset.uns["popari_hyperparameters"]:
prior_x = make_hdf5_compatible(
dataset.uns["popari_hyperparameters"]["prior_x"],
)
dataset.uns["popari_hyperparameters"]["prior_x"] = prior_x
if "spatial_affinity_groups" in dataset.uns["popari_hyperparameters"]:
name_parts = dataset.name.split("_level_")
if len(name_parts) > 1:
*_, level = name_parts
level = int(level)
else:
level = 0
groups = dataset.uns["popari_hyperparameters"]["spatial_affinity_groups"]
filtered_groups = {}
for group, group_replicates in groups.items():
group_name_parts = group.split("_level_")
if len(group_name_parts) > 1:
*_, group_level = group_name_parts
group_level = int(group_level)
else:
group_level = 0
if group_level == level:
filtered_groups[group] = group_replicates
dataset.uns["popari_hyperparameters"]["spatial_affinity_groups"] = filtered_groups
if "metagene_groups" in dataset.uns["popari_hyperparameters"]:
name_parts = dataset.name.split("_level_")
if len(name_parts) > 1:
*_, level = name_parts
level = int(level)
else:
level = 0
groups = dataset.uns["popari_hyperparameters"]["metagene_groups"]
filtered_groups = {}
for group, group_replicates in groups.items():
group_name_parts = group.split("_level_")
if len(group_name_parts) > 1:
*_, group_level = group_name_parts
group_level = int(group_level)
else:
group_level = 0
if group_level == level:
filtered_groups[group] = group_replicates
dataset.uns["popari_hyperparameters"]["metagene_groups"] = filtered_groups
if "X" in dataset.obsm:
replicate_X = make_hdf5_compatible(dataset.obsm["X"])
dataset.obsm["X"] = replicate_X
if issparse(dataset.X):
dataset.X = dataset.X.toarray()
replicate_names = [dataset.name for dataset in datasets]
return datasets, replicate_names
def merge_anndata(datasets: Sequence[PopariDataset], ignore_raw_data: bool = False):
"""Merge multiple PopariDatasets into a single AnnData object (for
storage)."""
dataset_copies = []
for dataset in datasets:
replicate = dataset.name
replicate_string = f"{replicate}"
if ignore_raw_data:
X = csr_matrix(dataset.X.shape)
else:
X = dataset.X
dataset_copy = PopariDataset(dataset, dataset.name)
# Hacks to store adjacency matrices efficiently
if "adjacency_matrix" in dataset.obsp:
adjacency_matrix = make_hdf5_compatible(dataset.obsp["adjacency_matrix"])
dataset_copy.uns["adjacency_matrix"] = {replicate_string: adjacency_matrix}
elif "adjacency_matrix" in dataset.uns:
dataset_copy.uns["adjacency_matrix"] = {
replicate_string: make_hdf5_compatible(adjacency_matrix)
for replicate_string, adjacency_matrix in dataset.uns["adjacency_matrix"]
}
if "Sigma_x_inv" in dataset_copy.uns:
replicate_Sigma_x_inv = dataset_copy.uns["Sigma_x_inv"][replicate_string]
if replicate_string in dataset_copy.uns["Sigma_x_inv"]:
# Using a sentinel value - hopefully this can be fixed in the future!
if replicate_Sigma_x_inv is None:
replicate_Sigma_x_inv = -1
else:
replicate_Sigma_x_inv = make_hdf5_compatible(replicate_Sigma_x_inv)
dataset_copy.uns["Sigma_x_inv"] = {replicate_string: replicate_Sigma_x_inv}
if "Sigma_x_inv_bar" in dataset_copy.uns:
replicate_Sigma_x_inv_bar = dataset_copy.uns["Sigma_x_inv_bar"][replicate_string]
if replicate_string in dataset_copy.uns["Sigma_x_inv_bar"]:
# Using a sentinel value - hopefully this can be fixed in the future!
if replicate_Sigma_x_inv_bar is None:
replicate_Sigma_x_inv_bar = -1
else:
replicate_Sigma_x_inv_bar = make_hdf5_compatible(
replicate_Sigma_x_inv_bar,
)
dataset_copy.uns["Sigma_x_inv_bar"] = {
replicate_string: replicate_Sigma_x_inv_bar,
}
if "M" in dataset_copy.uns:
if replicate_string in dataset_copy.uns["M"]:
replicate_M = make_hdf5_compatible(
dataset_copy.uns["M"][replicate_string],
)
dataset_copy.uns["M"] = {replicate_string: replicate_M}
if "M_bar" in dataset_copy.uns:
if replicate_string in dataset_copy.uns["M_bar"]:
replicate_M_bar = make_hdf5_compatible(
dataset_copy.uns["M_bar"][replicate_string],
)
dataset_copy.uns["M_bar"] = {replicate_string: replicate_M_bar}
if "popari_hyperparameters" in dataset_copy.uns:
if "prior_x" in dataset_copy.uns["popari_hyperparameters"]:
prior_x = make_hdf5_compatible(
dataset_copy.uns["popari_hyperparameters"]["prior_x"],
)
dataset_copy.uns["popari_hyperparameters"]["prior_x"] = prior_x
if "dataset_name" not in dataset_copy.uns:
dataset_copy.uns["dataset_name"] = dataset.name
if "X" in dataset_copy.obsm:
replicate_X = make_hdf5_compatible(dataset_copy.obsm["X"])
dataset_copy.obsm["X"] = replicate_X
dataset_copies.append(dataset_copy)
if "adjacency_list" in dataset_copy.obs:
del dataset_copy.obsm["adjacency_list"]
merged_dataset = concatenate(dataset_copies, join="outer")
return merged_dataset
[docs]
def save_anndata(
filepath: Union[str, Path],
datasets: Sequence[PopariDataset],
ignore_raw_data: bool = False,
):
"""Save Popari state as AnnData object."""
merged_dataset = merge_anndata(datasets, ignore_raw_data=ignore_raw_data)
merged_dataset.write(filepath)
return merged_dataset
def make_hdf5_compatible(array: Union[torch.Tensor, np.ndarray]):
"""Convert tensors to Numpy array for compatibility with HDF5.
Args:
array: array to convert to Numpy.
"""
if type(array) == torch.Tensor:
cpu_tensor = array.detach().cpu()
if cpu_tensor.is_sparse:
cpu_tensor = cpu_tensor.to_dense()
return cpu_tensor.numpy()
return array