Source code for cvanmf.data.utils

"""Functions for dealing with example case study and synthetic data."""
import itertools
import logging
import math
import random
import re
from importlib.resources import files
from pathlib import Path
from typing import NamedTuple, Optional, Dict, Any, List, Union, Set, Callable

import numpy as np
import numpy.random
import pandas as pd

logger: logging.Logger = logging.getLogger(__name__)

[docs] class ExampleData(NamedTuple): """Example data, including citations, description, and other metadata.""" data: pd.DataFrame """Table containing the data.""" rank: Optional[Union[int, List[int]]] """Correct rank for this data, or ranks if more than one can be considered correct.""" row_metadata: Optional[pd.DataFrame] """Metadata associated to each row.""" col_metadata: Optional[pd.DataFrame] """Metadata associated to each column.""" other_metadata: Optional[Dict[str, Any]] """Any other metadata related to this data (data dictionaries etc.)""" description: str """Longform description of the data.""" name: str """Descriptive name.""" short_name: str """Short name.""" doi: Optional[str] """DOI for data or original paper.""" citation: Optional[str] """Preferred citation if you use this data.""" def _repr_html_(self) -> str: """HTML output summarising dataset for display in notebooks etc.""" html: str = ( "<table style='font-family: monospace'>" "<tr><td colspan='2' style='font-weight: bold'>" f"{self.name}</td></tr>" f"<tr><td style='font-size: smaller' colspan='2'>cvanmf example " f"data</td></tr>\n" f"<tr><td>shape</td><td>{str(self.data.shape)}</td></tr>\n" f"<tr><td>rank</td><td>{str(self.rank)}</td></tr>\n" f"<tr><td>description</td><td>" f"{_shorten_docstring(self.description)}</td></tr>\n" ) if self.doi is not None: html += f"<tr><td>doi</td><td>{self.doi}</td></tr>\n" if self.citation is not None: html += f"<tr><td>citation</td><td>{self.citation}</td></tr>\n" html += "</table>" return html def __repr__(self) -> str: repr: str = ( f"ExampleData[{self.short_name}, shape={self.data.shape}, " f"rank={self.rank}]" ) return repr
[docs] def leukemia() -> ExampleData: """Gene expression data for ALL and AML B- and T-cell type leukemia. This data was analysed in Brunet et al (2004), and often used as a standard dataset for biological applications of NMF since. It has two broad categories (ALL/AML), but AML can be refined into two subtypes (B/T). B-cell AML appears to contain a further stable sub-grouping, so we have indicated the true rank of this data as 3 or 4. """ data: pd.DataFrame = pd.read_csv( str(files("cvanmf.data").joinpath( Path("ALL_AML", "ALL_AML_counts.tsv.gz") )), sep="\t", index_col=0 ) col_md: pd.DataFrame = pd.DataFrame( data.columns.map(__label_all_leuk), index=data.columns ) return ExampleData( data=data, row_metadata=pd.read_csv( str(files("cvanmf.data").joinpath( Path("ALL_AML", "ALL_AML_feature_md.tsv.gz") )), sep="\t", index_col=0 ), col_metadata=col_md, other_metadata=None, rank=4, description=_shorten_docstring(leukemia.__doc__), name="Leukemia (ALL/AML) Gene Expression", doi="doi:10.1073/pnas.030853110", citation=("Brunet, J-P., Tamayo, P., Golub, T. R. & Mesirov, " "J. P. Metagenes and molecular pattern discovery using " "matrix factorization. Proc. Natl. Acad. Sci. U.S.A. 101, " "4164–4169 (2004)."), short_name="ALL_AML" )
[docs] def swimmer() -> ExampleData: """Stick figure images of a swimmer with 4 limbs in 4 positions. Designed by Donoho et al. to be partially representable by NMF, each image has a line torso, accompanied by four straight limbs which can be in one of four positions (0, 45, 90, 135 and 180 degrees from torso). With the exception of the torso, each limb position should be representable by NMF decomposition. As such, the true rank of this data is 17 (4*4 limbs, plus torso), but with conventional NMF as implemented here the torso cannot be learnt, only the 16 limbs. """ return ExampleData( data=pd.read_csv( str(files("cvanmf.data").joinpath( Path("swimmer", "swimmer.tsv.gz") )), sep="\t", index_col=0 ), rank=17, row_metadata=None, col_metadata=None, other_metadata=None, description=_shorten_docstring(swimmer.__doc__), name="Swimmer Dataset", doi="", citation=("Donoho, D. & Stodden, V. When Does Non-Negative Matrix " "Factorization Give a Correct Decomposition into Parts? in " "Advances in Neural Information Processing Systems (eds. " "Thrun, S., Saul, L. & Schölkopf, B.) vol. 16 (MIT Press, " "2003)."), short_name="swimmer" )
[docs] def lung_cancer_cells() -> ExampleData: """Relative cell-compositions from non-small cell lung cancer studies. Gives the number of cells of different types in lung tissue samples from a non-small cell lung cancer atlas, which was compiled from 29 studies and includes 556 samples, from 318 individuals (86 of which are healthy controls). The data was uploaded to cellxgene using their standard ontologies, which is the source we have taken the data from. Metadata provided here is a mixture of metadata from cellxgene, and some from the original paper. We have selected out only the tissues samples labelled as "lung". In total, this gives 224 samples, and 33 cell types. The data here is total-sum-scaled, i.e. each sample sums to 1. """ return ExampleData( data=pd.read_csv( str(files("cvanmf.data").joinpath( Path("NSCLC", "nsclc.lung.composition.tsv.gz") )), sep="\t", index_col=0 ), rank=[], row_metadata=None, col_metadata=pd.read_csv( str(files("cvanmf.data").joinpath( Path("NSCLC", "nsclc.lung.metadata.tsv.gz") )), sep="\t", index_col=0 ), other_metadata=None, description=_shorten_docstring(lung_cancer_cells.__doc__), name="Non-small cell lung cancer", doi="https://doi.org/10.1016/j.ccell.2022.10.008", citation=("Salcher, S. et al. High-resolution single-cell atlas " "reveals diversity and plasticity of tissue-resident " "neutrophils in non-small cell lung cancer. Cancer Cell 40, " "1503-1520.e8 (2022). \n" "Program, C. S.-C. B. et al. CZ CELL×GENE Discover: A " "single-cell data platform for scalable exploration, " "analysis and modeling of aggregated data. " "2023.10.30.563174 Preprint at " "https://doi.org/10.1101/2023.10.30.563174 (2023)."), short_name="NSCLC" )
[docs] def synthetic_blocks(m: int = 100, n: int = 100, overlap: float = 0.25, k: int = 3, normal_noise_params: Optional[Dict] = None, scale_lognormal_params: Optional[Dict] = None ) -> ExampleData: """Generate simple synthetic data. Create an m x n matrix with blocks along the diagonal which overlap to an extent defined by overlap. :param m: Number of rows in matrix :param n: Number of columns in matrix :param overlap: Proportion of block length to participate in overlap :param k: Number of signatures :param normal_noise_params: Parameters to pass to `numpy.random.normal` to apply noise to entries. Leave as none to use default parameters. :param scale_lognormal_params: Parameters to pass to `numpy.random.lognormal` to scale each feature (give some features higher values than others). If set to true, will use default parameters for distribution. Leave as None to skip feature scaling. """ # Matrix dimensions i, j = m, n # Noise parameters do_scale: bool = scale_lognormal_params is not None scale_lognormal_params: Dict = ( dict() if scale_lognormal_params is None or not isinstance(scale_lognormal_params, dict) else scale_lognormal_params ) normal_noise_params = ( dict() if normal_noise_params is None else normal_noise_params ) # Width of blocks without overlap base_h, tail_h = divmod(i, k) base_w, tail_w = divmod(j, k) # Overlap proportion - proportion of block's base dimension to extend # block by overlap_proportion: float = overlap overlap_h: int = math.ceil(base_h * overlap_proportion) overlap_w: int = math.ceil(base_w * overlap_proportion) # Make a randomly filled matrix, multiply by mask matrix which has 0 # or 1 then apply noise (so 0s also have some noise) mask: np.ndarray = np.zeros(shape=(i, j)) for ki in range(k): h: int = base_h + tail_h if k == ki else base_h w: int = base_w + tail_w if k == ki else base_w h_start: int = max(0, ki * base_h) h_end: int = min(i, h_start + base_h + overlap_h) w_start = max(0, ki * base_w) w_end: int = min(j, w_start + base_w + overlap_w) mask[h_start:h_end, w_start:w_end] = 1.0 block_mat: np.ndarray = np.random.uniform( low=0.0, high=1.0, size=(i, j)) * mask # Apply noise from normal distribution normal_noise_params = dict(loc=0.0, scale=0.1) | normal_noise_params block_mat = block_mat + np.absolute( np.random.normal(size=(i, j), **normal_noise_params)) # Scale features if do_scale: fscale: np.ndarray = np.random.lognormal( **scale_lognormal_params, size=m ) block_mat = fscale.reshape(-1, 1) * block_mat # Convert to DataFrame and add some proper naming for rows/cols df: pd.DataFrame = pd.DataFrame( block_mat, index=[f'feat_{i_lab}' for i_lab in range(i)], columns=[f'samp_{j_lab}' for j_lab in range(j)]) # TSS scale df = df / df.sum() return ExampleData( data=df, rank=k, row_metadata=None, col_metadata=None, other_metadata=dict( overlap=overlap, scale_lognormal_params=scale_lognormal_params, normal_noise_params=normal_noise_params ), doi="", citation="", description="Synthetic data with blocks along the diagonal which " "overlap to a defined extent.", name="Synthetic Overlapping Blocks", short_name="SYNTH_BLOCKS", )
[docs] def synthetic_dense(m: int = 100, n: int = 100, h_sparsity: float = 0.0, shared_features: float = 0.25, k: int = 3, normal_noise_params: Optional[Dict] = None, scale_lognormal_params: Optional[Dict] = None, keep_mats: bool = False ) -> ExampleData: """Generate dense synthetic data. Dense data is generated by making a :math:`W` matrix with :math:`k` signatures, and multiplying this with a randomly filled :math:`H` matrix. Optionally, a proportion of the :math:`H` matrices can be randomly set to 0 using `h_sparsity`. The extent to which features are shared between signatures is defined via `shared_features`. Each signature is initially assigned an even proportion of the :math:`m` features (remainder spread as evenly as possible between them), so there are no shared features. Then if :math:`|k|` is the number of features assigned to a signature, each signature is assigned :math:`|k|*shared\_features` randomly selected from the remaining features. This means the overlapping structure is potentially quite different from that of :func:`synthetic_blocks`. :param m: Number of features. :param n: Number of samples. :param h_sparsity: Proportion of :math:`H` matrix to randomly set to 0 :param shared_features: Amount of shared features to add to a signature, as a proportion of it's base size. :param k: Number of signatures. :param normal_noise_params: Parameters passed to :func:`numpy.random.normal` when adding noise to data. :param scale_lognormal_params: Parameters passed to :func:`numpy.random.lognormal` when selecting weights for features in a signature. If this is None, a uniform distribution between 0 and 1 is used instead. :param keep_mats: Return the :math:`H` and :math:`W` matrices used to generate the data in the :attr:`ExampleData.other_metadata`. """ # Create W k_scale: np.array = np.array([1] * k) k_feats: np.array = np.floor((k_scale / k_scale.sum()) * m) rem: int = m - int(k_feats.sum()) # Got to be a nicer way to do this for i in range(rem): k_feats[i] = k_feats[i] + 1 k_feat_names: List[Set[str]] = [ set(f'sig{i}_feat{j}' for j in range(int(k_feats[i]))) for i in range(k) ] # Add shared features all_feats: Set[str] = set(itertools.chain.from_iterable(k_feat_names)) for k_names in k_feat_names: other: List[str] = random.choices( list(all_feats.difference(k_names)), k=math.floor(len(k_names) * shared_features) ) k_names.update(other) w: np.ndarray if scale_lognormal_params: w = np.random.lognormal(size=(m, k), **scale_lognormal_params) else: w = np.random.uniform(size=(m, k), low=0.0, high=1.0) mask: pd.DataFrame = pd.DataFrame( np.zeros(shape=(m, k)), index=list(all_feats), columns=[i for i in range(k)] ).sort_index() mask = mask.apply(lambda x: x.index.isin(k_feat_names[x.name])) w = w * mask.values # Create H h: np.ndarray = np.random.uniform(low=0.0, high=1.0, size=n*k) h[numpy.random.choice( list(range(n*k)), size=math.floor(n*k*h_sparsity), replace=False )] = 0.0 h = h.reshape(k, n) wh: pd.DataFrame = pd.DataFrame( w.dot(h), index=mask.index, columns=[f's{i}' for i in range(n)] ) normal_noise_params = (dict() if normal_noise_params is None else normal_noise_params) normal_noise_params = dict(loc=0.0, scale=0.1) | normal_noise_params wh = (wh + numpy.random.normal(size=wh.shape, **normal_noise_params)).clip(lower=0) other_metadata: Dict[str, Any] = dict( h_sparsity=h_sparsity, shared_features=shared_features, scale_lognormal_params=scale_lognormal_params, normal_noise_params=normal_noise_params ) if keep_mats: other_metadata['h'] = h other_metadata['w'] = w return ExampleData( data=wh, col_metadata=None, row_metadata=mask, rank=k, other_metadata=other_metadata, name="Synthetic Dense Data", short_name="SYNTH_DENSE", doi="", citation="", description="" )
[docs] def example_abundance() -> pd.DataFrame: """Genus level Non-Western cohort bacterial microbiome abundance. From Frioux et al. (2023, https://doi.org/10.1016/j.chom.2023.05.024). :return: Genus level relative abundance table using GTDB r207 taxonomy. :rtype: pd.DataFrame """ return pd.read_csv( str(files("cvanmf.data").joinpath("NW_ABUNDANCE.tsv")), sep="\t", index_col=0 )
def _shorten_docstring(s: str) -> str: """Strip newlines and multiple spaces from a docstring to make it a more readable format when returned.""" # Want to keep any double newlines, but remove any singles # This is a very messy way of doing it but didn't have time to work out a # regex s = ( s.replace("\n\n", ":lb:") .replace("\n", "") .replace(":lb:", "\n\n") ) s = re.sub(r"[\t ]+", " ", s) return s def __label_all_leuk(smpl: str) -> str: if smpl[:3] == "AML": return "AML" if "B-cell" in smpl: return "ALL B-Cell" if "T-cell" in smpl: return "ALL T-Cell" raise ValueError("Leukemia sample with incorrect label encountered")