# Source code for pointannotator.annotate_samples

import numpy as np
import pandas as pd

from scipy.stats.mstats import rankdata
from scipy.stats import hypergeom, binom

from pointannotator.utils import FDR

# Scoring types accepted by the ``scoring`` argument of
# AnnotateSamples.assign_annotations / AnnotateSamples.annotate_samples.
# Ratio: selected marker attributes of an annotation / all its markers.
SCORING_EXP_RATIO = "scoring_exp_ratio"
# Sum of the item's data values over the annotation's marker attributes.
SCORING_MARKERS_SUM = "scoring_sum_of_expressed_markers"
# Negative natural logarithm of the FDR value.
SCORING_LOG_FDR = "scoring_log_fdr"
# Negative natural logarithm of the p-value.
SCORING_LOG_PVALUE = "scoring_log_p_value"

# P-value functions accepted by the ``p_value_fun`` argument.
PFUN_BINOMIAL = "binom"  # scipy.stats.binom.sf
PFUN_HYPERGEOMETRIC = "hypergeom"  # scipy.stats.hypergeom.sf


class ScoringNotImplemented(Exception):
    """Raised when the requested scoring type is not one of SCORING_*."""


class AnnotateSamples:
    """
    AnnotateSamples is a class used for the annotation of data items with
    annotations (e.g. cell types). The Mann-Whitney U test is used for
    selecting important values, and a binomial (default) or hypergeometric
    test is used for assigning the annotations.

    Example on biological data where we assign cell types to cells:

    >>> gene_expressions_df = pd.read_csv("data/DC_expMatrix_DCnMono.csv.gz",
    ...                                   compression='gzip')
    >>> marker_genes_df = pd.read_csv("data/panglao_gene_markers.csv.gz",
    ...                               compression="gzip")
    >>> # rename genes column and filter human markers
    >>> marker_genes_df = marker_genes_df[
    ...     marker_genes_df["Organism"] == "Human"]
    >>>
    >>> annotations = AnnotateSamples.annotate_samples(
    ...     gene_expressions_df, marker_genes_df, num_all_attributes=60000,
    ...     attributes_col="Cell Type", annotations_col="Name",
    ...     p_threshold=0.05)

    Example for full manual annotation. Here annotation is split in three
    phases. We assume that data are already loaded.

    >>> z = AnnotateSamples.mann_whitney_test(gene_expressions_df)
    >>> scores, p_val = AnnotateSamples.assign_annotations(
    ...     z, marker_genes_df, gene_expressions_df, num_all_attributes=60000,
    ...     attributes_col="Cell Type", annotations_col="Name")
    >>> scores = AnnotateSamples.filter_annotations(
    ...     scores, p_val, p_threshold=0.05)
    """

    @staticmethod
    def log_cpm(data):
        """
        Function normalizes data with the log CPM method:
        ``log(1 + CPM(data))``.

        Parameters
        ----------
        data : pd.DataFrame
            Non-normalized data table.

        Returns
        -------
        pd.DataFrame
            Normalized data table.
        """
        norm_data = np.log(1 + AnnotateSamples._cpm(data))
        return norm_data

    @staticmethod
    def _cpm(data):
        """
        This function normalizes data with the CPM method: every row is
        divided by its own sum and multiplied by 1e6.

        Rows whose sum is zero are left as they are (divided by 1) instead
        of being turned into NaN by a 0 / 0 division.

        Parameters
        ----------
        data : pd.DataFrame
            Tabular data.

        Returns
        -------
        pd.DataFrame
            Data scaled so that every row with a non-zero sum sums to 1e6.
        """
        totals = data.sum(axis=1)
        # A NaN row would make every column sum in _ranks NaN, which flattens
        # the ranks of the whole table - keep all-zero rows at zero instead.
        totals = totals.where(totals != 0, 1)
        return data.divide(totals, axis=0) * 1e6

    @staticmethod
    def _ranks(data):
        """
        This function computes ranks for data in the table along axis=0.

        Within a column, all values that are not greater than zero share one
        average rank; values greater than zero are ranked among themselves
        and placed above them.

        Parameters
        ----------
        data : np.ndarray
            Array of data to be ranked

        Returns
        -------
        np.ndarray
            Table of data ranks
        """
        x_len = data.shape[0]
        x_mask = data.sum(axis=0) > 0

        # create a matrix of ranks - init with average rank
        # for columns without nonzero expressions
        data_ge_ranked = np.ones(data.shape) * (1 + data.shape[0]) / 2

        # compute ranks only for nonzero columns
        for i in np.where(x_mask)[0]:
            mask = data[:, i] > 0
            n_not_positive = x_len - mask.sum()
            # entries that are not positive tie on the lowest ranks
            col = np.ones(x_len) * (1 + n_not_positive) / 2
            # positive entries are ranked after all the tied ones
            col[mask] = rankdata(data[mask, i]) + n_not_positive
            data_ge_ranked[:, i] = col
        return data_ge_ranked

    @staticmethod
    def mann_whitney_test(data):
        """
        Compute z values with the Mann-Whitney U test.

        Parameters
        ----------
        data : pd.DataFrame
            Tabular data.

        Returns
        -------
        pd.DataFrame or None
            Z-value for each item. None when data has fewer than two rows.

        Raises
        ------
        TypeError
            When data is not a pandas DataFrame.
        """
        if not isinstance(data, pd.DataFrame):
            raise TypeError("Data argument must be pandas DataFrame")

        if len(data) <= 1:
            return None

        # rank data
        data_ge_ranked = AnnotateSamples._ranks(data.values)

        # compute U, mu, sigma; the group sizes used in the formulas below
        # are 1 and n - 1
        n = data_ge_ranked.shape[0]
        n2 = n - 1
        u = data_ge_ranked - 1
        mu = n2 / 2
        sigma = np.zeros(data_ge_ranked.shape[1])
        for i in range(data_ge_ranked.shape[1]):
            # sizes of the groups of tied ranks give the tie correction
            _, counts = np.unique(data_ge_ranked[:, i], return_counts=True)
            tie_term = np.sum(counts ** 3 - counts) / (n * (n - 1))
            sigma[i] = np.sqrt(1 * n2 / 12 * ((n + 1) - tie_term))

        # compute z; the small constant avoids division by zero
        z = (u - mu) / (sigma + 1e-16)

        # pack z values to pandas dataframe
        z_dataframe = pd.DataFrame(z, columns=data.columns, index=data.index)
        return z_dataframe

    @staticmethod
    def _reorder_matrix(matrix, attributes_order):
        """
        Function reorders the columns of the array to fit the
        attributes_order. Positions of attributes that are not among the
        matrix columns are filled with zeros.

        Parameters
        ----------
        matrix : pd.DataFrame
            Tabular data that needs to be reordered. The lookup below
            expects every column name to be present in attributes_order.
        attributes_order : list
            Desired attributes order

        Returns
        -------
        np.ndarray
            Reordered array.
        """
        current_order = np.array(matrix.columns.values)
        values = matrix.values
        attributes_order = np.array(attributes_order)

        # locate every current column in attributes_order via a sorted view
        xsorted = np.argsort(attributes_order)
        ypos = np.searchsorted(attributes_order[xsorted], current_order)
        indices = xsorted[ypos]  # index which tell where should be the column

        reordered_values = np.zeros((values.shape[0], len(attributes_order)))
        for i_curr, i_dest in enumerate(indices):
            reordered_values[:, i_dest] = values[:, i_curr]

        return reordered_values

    @staticmethod
    def _select_attributes(z, attributes_order, z_threshold=1):
        """
        The function selects "over"-expressed attributes for items based on
        z values. It also reorders the matrix columns.

        Parameters
        ----------
        z : pd.DataFrame
            Tabular data z values for each item in the table
        attributes_order : list
            Desired genes order
        z_threshold : float
            The threshold for selecting the attribute. For each item the
            attributes with z-value above this value are selected.

        Returns
        -------
        np.ndarray
            Boolean array of reordered z-values that exceed the threshold.
        """
        reordered_z = AnnotateSamples._reorder_matrix(z, attributes_order)
        return reordered_z > z_threshold

    @staticmethod
    def _group_attributes_annotations(available_annotations, attributes_order,
                                      attributes_col, annotations_col):
        """
        Function transforms annotations to matrix with the size
        (attributes_order x annotations).

        Parameters
        ----------
        available_annotations : pd.DataFrame
            Table with one (attribute, annotation) pair per row.
        attributes_order : list
            Attributes in the order used for the rows of the matrix.
        attributes_col : str
            The name of the attributes column in available_annotations.
        annotations_col : str
            The name of the annotations column in available_annotations.

        Returns
        -------
        np.ndarray
            Matrix with 1 where the attribute (row) belongs to the
            annotation (column) and 0 elsewhere.
        list
            Sorted unique annotations, in the order of the matrix columns.

        Raises
        ------
        ValueError
            When an attribute is not present in attributes_order.
        """
        types = sorted(
            set(available_annotations.loc[:, annotations_col].values))
        attributes_annotations = np.zeros((len(attributes_order), len(types)))

        # Constant-time lookups instead of list.index for every row; the
        # first occurrence wins, exactly as list.index would behave.
        attribute_rows = {}
        for row, attribute in enumerate(attributes_order):
            attribute_rows.setdefault(attribute, row)
        type_columns = {type_: col for col, type_ in enumerate(types)}

        pairs = zip(available_annotations[attributes_col].values,
                    available_annotations[annotations_col].values)
        for attribute, annotation in pairs:
            if attribute is None:
                continue
            try:
                row = attribute_rows[attribute]
            except KeyError:
                raise ValueError(
                    "{!r} is not in attributes_order".format(attribute))
            attributes_annotations[row, type_columns[annotation]] = 1

        return attributes_annotations, types

    @staticmethod
    def _score(scoring_type, p_values, fdrs, data, M, x, m, attributes_order):
        """
        Compute the score of each annotation for each item.

        Parameters
        ----------
        scoring_type : str
            One of SCORING_MARKERS_SUM, SCORING_EXP_RATIO, SCORING_LOG_FDR
            and SCORING_LOG_PVALUE.
        p_values : np.ndarray
            P-values, used by SCORING_LOG_PVALUE.
        fdrs : np.ndarray
            FDR values, used by SCORING_LOG_FDR.
        data : pd.DataFrame
            Input data, used by SCORING_MARKERS_SUM.
        M : np.ndarray
            Attributes-annotations matrix, used by SCORING_MARKERS_SUM.
        x : np.ndarray
            Number of selected attributes that belong to each annotation,
            used by SCORING_EXP_RATIO.
        m : np.ndarray
            Number of attributes of each annotation, used by
            SCORING_EXP_RATIO.
        attributes_order : list
            Attributes order of the rows of M.

        Returns
        -------
        np.ndarray
            Scores (items x annotations).

        Raises
        ------
        ScoringNotImplemented
            When scoring_type is not one of the supported values.
        """
        if scoring_type == SCORING_MARKERS_SUM:
            return AnnotateSamples._reorder_matrix(
                data, attributes_order).dot(M)
        elif scoring_type == SCORING_EXP_RATIO:
            return x / m
        elif scoring_type == SCORING_LOG_FDR:
            return -np.log(fdrs)
        elif scoring_type == SCORING_LOG_PVALUE:
            return -np.log(p_values)
        else:
            raise ScoringNotImplemented()

    @staticmethod
    def assign_annotations(z_values, available_annotations, data,
                           num_all_attributes=None,
                           attributes_col="Attributes",
                           annotations_col="Annotations",
                           z_threshold=1, p_value_fun=PFUN_BINOMIAL,
                           scoring=SCORING_EXP_RATIO):
        """
        The function gets a set of attributes (e.g. genes) for each item and
        attributes for each annotation. It returns the scores and the FDR
        values of the annotations for each item.

        Parameters
        ----------
        z_values : pd.DataFrame
            DataFrame that shows z values for each item. Rows are data items
            and columns are attributes.
        available_annotations : pd.DataFrame
            Available annotations (e.g. cell types), this data frame has
            two columns: attributes column name is set by *attributes_col*
            variable (default: "Attributes") and annotations is set by
            *annotations_col* variable (default: "Annotations").
        data : pd.DataFrame
            Tabular input (raw) data - we need that to compute scores. Its
            index is used as the index of the returned tables.
        num_all_attributes : int
            The number of all attributes for a case (also those that do not
            appear in the data). In the case of genes, it is the number of
            all genes that an organism has. It is recommended to set your
            value, in cases when the value is not set the number of
            attributes in z_values table will be used.
        attributes_col : str
            The name of an attributes column in available_annotations
            (default: "Attributes").
        annotations_col : str
            The name of an annotations column in available_annotations
            (default: "Annotations").
        z_threshold : float
            The threshold for selecting the attribute. For each item, the
            attributes with z-value above this value are selected.
        p_value_fun : str, optional (default: PFUN_BINOMIAL)
            A function that calculates the p-value. PFUN_HYPERGEOMETRIC
            uses hypergeom.sf; any other value uses binom.sf.
        scoring : str, optional (default=SCORING_EXP_RATIO)
            Type of scoring

        Returns
        -------
        pd.DataFrame
            Annotation scores
        pd.DataFrame
            Annotation FDRs.

        Raises
        ------
        TypeError
            When one of the tables is not a pandas DataFrame or when the
            attributes/annotations column is not of the object dtype.
        ScoringNotImplemented
            When scoring is not one of the supported values.
        """
        if not isinstance(z_values, pd.DataFrame):
            raise TypeError("z_values argument must be pandas DataFrame")
        if not isinstance(available_annotations, pd.DataFrame):
            raise TypeError("available_annotations argument must be pandas "
                            "DataFrame")
        if not isinstance(data, pd.DataFrame):
            raise TypeError("data argument must be pandas DataFrame")
        if not available_annotations[attributes_col].dtype == object:
            raise TypeError("The type of attributes_col column must be "
                            "string/object")
        if not available_annotations[annotations_col].dtype == object:
            raise TypeError("The type of annotations_col column must be "
                            "string/object")

        # select function for p-value
        if p_value_fun == PFUN_HYPERGEOMETRIC:
            def p_fun(x, N, m, k):
                return hypergeom.sf(x, N, m, k)
        else:
            def p_fun(x, N, m, k):
                return binom.sf(x, k, m / N)

        # make an attributes order
        attributes = z_values.columns.values
        attributes_annotations = available_annotations[attributes_col].values
        attributes_order = list(set(attributes) | set(attributes_annotations))

        # get attributes-annotations matrix M
        M, annotations = AnnotateSamples._group_attributes_annotations(
            available_annotations, attributes_order, attributes_col,
            annotations_col)

        Z = AnnotateSamples._select_attributes(
            z_values, attributes_order, z_threshold)

        # if user do not set own num_all_attributes value it is set to number
        # of attributes in the table
        if num_all_attributes is None:
            num_all_attributes = len(z_values.columns)

        # x: selected attributes of an item that belong to an annotation
        # k: all selected attributes of an item
        # m: all attributes of an annotation
        x = Z.dot(M)
        k = np.repeat(Z.sum(axis=1).reshape(-1, 1), x.shape[1], axis=1)
        m = np.repeat(M.sum(axis=0).reshape(1, -1), x.shape[0], axis=0)

        # sf(x - 1) is the probability of observing at least x
        p_values = p_fun(x - 1, num_all_attributes, m, k)

        # FDR correction is made separately for every item
        fdrs = np.zeros(p_values.shape)
        for i, row in enumerate(p_values):
            fdrs[i] = np.array(FDR(row.tolist()))

        scores = AnnotateSamples._score(
            scoring, p_values, fdrs, data, M, x, m, attributes_order)

        scores_table = pd.DataFrame(
            scores, columns=annotations, index=data.index)
        fdrs_table = pd.DataFrame(fdrs, columns=annotations, index=data.index)

        return scores_table, fdrs_table

    @staticmethod
    def filter_annotations(scores, p_values, return_nonzero_annotations=True,
                           p_threshold=0.05):
        """
        This function replaces the scores that do not reach the threshold
        for p-value with NaN and drops the columns with NaN values only if
        return_nonzero_annotations is True.

        Parameters
        ----------
        scores : pd.DataFrame
            Scores for each annotation for data items
        p_values : pd.DataFrame
            p-value scores for annotations for data items
        return_nonzero_annotations : bool
            The flag that enables dropping the columns that have no score
            left after thresholding.
        p_threshold : float
            A threshold for accepting the annotations. Scores whose value
            in p_values is above this threshold are replaced with NaN.

        Returns
        -------
        pd.DataFrame
            Filtered scores for each annotation for data items

        Raises
        ------
        TypeError
            When scores or p_values is not a pandas DataFrame.
        """
        if not isinstance(scores, pd.DataFrame):
            raise TypeError("scores argument must be pandas DataFrame")
        if not isinstance(p_values, pd.DataFrame):
            raise TypeError("p_values argument must be pandas DataFrame")

        scores = scores.copy()  # do not want to edit values inplace
        scores[p_values > p_threshold] = np.nan

        if return_nonzero_annotations:
            col_not_empty = ~np.isnan(scores).all(axis=0)
            scores = scores.loc[:, col_not_empty]
        return scores

    @staticmethod
    def annotate_samples(data, available_annotations, num_all_attributes=None,
                         attributes_col="Attributes",
                         annotations_col="Annotations",
                         return_nonzero_annotations=True, p_threshold=0.05,
                         p_value_fun=PFUN_BINOMIAL, z_threshold=1,
                         scoring=SCORING_EXP_RATIO, normalize=False):
        """
        Function marks the data with annotations that are provided. This
        function implements the complete functionality. First select
        attributes for each item with z_test, then annotate data and filter
        data.

        Parameters
        ----------
        data : pd.DataFrame
            Tabular data
        available_annotations : pd.DataFrame
            Available annotations (e.g. cell types), this data frame has
            two columns: attributes column name is set by *attributes_col*
            variable (default: "Attributes") and annotations is set by
            *annotations_col* variable (default: "Annotations").
        num_all_attributes : int
            The number of all attributes for a case (also those that do not
            appear in the data). In the case of genes, it is the number of
            all genes that an organism has. It is recommended to set your
            value, in cases when the value is not set the number of
            attributes in z_values table will be used.
        attributes_col : str
            The name of an attributes column in available_annotations
            (default: "Attributes").
        annotations_col : str
            The name of an annotations column in available_annotations
            (default: "Annotations").
        return_nonzero_annotations : bool, optional (default=True)
            If true return scores for only annotations present in at least
            one sample.
        p_threshold : float
            A threshold for accepting the annotations. Annotations that have
            FDR value below this threshold are used.
        p_value_fun : str, optional (default: PFUN_BINOMIAL)
            A function that calculates p-value. PFUN_HYPERGEOMETRIC uses
            hypergeom.sf; any other value uses binom.sf.
        z_threshold : float
            The threshold for selecting the attribute. For each item the
            attributes with z-value above this value are selected.
        scoring : str, optional (default = SCORING_EXP_RATIO)
            Type of scoring
        normalize : bool, optional (default = False)
            This variable tells whether to normalize data (with log_cpm)
            or not.

        Returns
        -------
        pd.DataFrame
            Scores table - each line of the table has scores that tell how
            probable is that items have specific annotations.

        Raises
        ------
        TypeError
            When data or available_annotations is not a pandas DataFrame.
        ValueError
            When data has fewer than two items.
        """
        if not isinstance(data, pd.DataFrame):
            raise TypeError("data argument must be pandas DataFrame")
        if not isinstance(available_annotations, pd.DataFrame):
            raise TypeError("available_annotations argument must be pandas "
                            "DataFrame")
        if len(data) <= 1:
            raise ValueError("At least two data items are required for "
                             "method to work.")

        if normalize:
            data = AnnotateSamples.log_cpm(data)

        z = AnnotateSamples.mann_whitney_test(
            data)

        annotation_probs, annotation_fdrs = AnnotateSamples.assign_annotations(
            z, available_annotations, data,
            num_all_attributes=num_all_attributes,
            attributes_col=attributes_col, annotations_col=annotations_col,
            z_threshold=z_threshold, p_value_fun=p_value_fun, scoring=scoring)

        annotation_probs = AnnotateSamples.filter_annotations(
            annotation_probs, annotation_fdrs, return_nonzero_annotations,
            p_threshold
        )

        return annotation_probs