"""
The maui.utils model contains utility functions for multi-omics analysis
using maui.
"""
import numpy as np
import pandas as pd
from scipy import stats
from scipy import interp
from scipy import spatial
from scipy import cluster
from collections import Counter
from sklearn.svm import LinearSVC
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_val_predict
[docs]def merge_factors(
z,
l=None,
threshold=0.17,
merge_fn=np.mean,
metric="correlation",
linkage="single",
plot_dendro=True,
plot_dendro_ax=None,
):
"""Merge latent factors in `z` which form clusters, as defined by hierarchical
clustering where a cluster is formed by cutting at a pre-set threshold, i.e.
merge factors if their distance to one-another is below `threshold`.
Parameters
----------
z: (n_samples, n_factors) DataFrame of latent factor values, output of a maui model
metric: Distance metric to merge factors by, one which is supported by
:func:`scipy.spatial.distance.pdist`
linkage: The kind of linkage to form hierarchical clustering, one which is
supported by :func:`scipy.cluster.hierarchy.linkage`
l: As an alternative to supplying `metric` and `linkage`, supply a
linkage matrix of your own choice, such as one computed by
:func:`scipy.cluster.hierarchy.linkage`
threshold: The distance threshold. latent factors with similarity below the
threshold will be merged to form single latent facator
merge_fn: A function which will be used to merge latent factors. The default
is :func:`numpy.mean`, i.e. the newly formed (merged) latent factor
will be the mean of the merged ones. Supply any function here which
has the same interface, i.e. takes a matrix and an axis.
plot_dendro: Boolean. If True, the function will plot a dendrogram showing
which latent factors are merged and the threshold.
"""
if l is None:
d = spatial.distance.pdist(z.T, metric)
l = cluster.hierarchy.linkage(d, linkage)
if plot_dendro:
try:
import matplotlib.pyplot as plt
except:
raise Exception("`plot_dendro` require matplotlib to be installed.")
if plot_dendro_ax is None:
fig, plot_dendro_ax = plt.subplots(figsize=(25, 10))
dendro = cluster.hierarchy.dendrogram(
l, leaf_font_size=18, color_threshold=threshold, ax=plot_dendro_ax
)
plot_dendro_ax.hlines(threshold, *plot_dendro_ax.get_xlim(), "red", "dashed")
cl_labels = cluster.hierarchy.cut_tree(l, height=threshold).T[0]
factors_to_delete = set()
new_factors = list()
for cl, ct in Counter(cl_labels).items():
if ct < 2:
continue
tomerge = (cl_labels == cl).nonzero()[0]
factors_to_delete.update(tomerge)
new_factors.append(
merge_fn(z.iloc[:, tomerge], axis=1).rename(
"_".join(f"{i}" for i in tomerge)
)
)
new_z = z.copy()
new_z = new_z.loc[:, [c for c in new_z.columns if c not in factors_to_delete]]
new_z = new_z.iloc[
:, [i for i in range(new_z.shape[1]) if i not in factors_to_delete]
]
new_z = pd.concat([new_z] + new_factors, axis=1)
return new_z
[docs]def filter_factors_by_r2(z, x, threshold=0.02):
"""Filter latent factors by the R^2 of a linear model predicting features x
from latent factors z.
Parameters
----------
z: (n_samples, n_factors) DataFrame of latent factor values, output of a maui model
x: (n_samples, n_features) DataFrame of concatenated multi-omics data
Returns
-------
z_filtered: (n_samples, n_factors) DataFrame of latent factor values,
with only those columns from the input `z` which have an R^2
above the threshold when using that column as an input
to a linear model predicting `x`.
"""
scores = list()
for i in range(z.shape[1]):
regressor = LinearRegression()
regressor.fit(z.values[:, i].reshape(-1, 1), scale(x).T)
scores.append(regressor.score(z.values[:, i].reshape(-1, 1), scale(x).T))
return z.iloc[:, pd.Series(scores)[pd.Series(scores) > threshold].index]
[docs]def map_factors_to_feaures_using_linear_models(z, x):
"""Get feature <-> latent factors mapping from linear models.
Runs one univariate (multi-output) linear model per latent factor in `z`,
predicting the values of the features `x`, in order to get weights
between inputs and outputs.
Parameters
----------
z: (n_samples, n_factors) DataFrame of latent factor values, output of a maui model
x: (n_samples, n_features) DataFrame of concatenated multi-omics data
Returns
-------
W: (n_features, n_latent_factors) DataFrame
w_{ij} is the coefficient associated with feature `i` in a linear model
predicting it from latent factor `j`.
"""
ws = list()
for i in range(z.shape[1]):
regressor = LinearRegression()
regressor.fit(z.values[:, i].reshape(-1, 1), scale(x).T)
ws.append(regressor.coef_)
W = pd.DataFrame(np.hstack(ws), index=x.index, columns=z.columns)
return W
[docs]def correlate_factors_and_features(z, concatenated_data, pval_threshold=0.001):
"""Compute pearson correlation of latent factors with input features.
Parameters
----------
z: (n_samples, n_factors) DataFrame of latent factor values, output of maui model
concatenated_data: (n_samples, n_features) DataFrame of concatenated multi-omics data
Returns
-------
feature_s: DataFrame (n_features, n_latent_factors)
Latent factors representation of the data X.
"""
feature_scores = list()
for j in range(z.shape[1]):
corrs = pd.DataFrame(
[
stats.pearsonr(concatenated_data.iloc[:, i], z.iloc[:, j])
for i in range(concatenated_data.shape[1])
],
index=concatenated_data.columns,
columns=["r", "pval"],
)
corrs.loc[corrs.pval > pval_threshold, "r"] = 0
feature_scores.append(corrs.r)
feature_s = pd.concat(feature_scores, axis=1)
feature_s.columns = [i for i in range(z.shape[1])]
return feature_s
[docs]def compute_roc(z, y, classifier=LinearSVC(C=0.001), cv_folds=10):
"""Compute the ROC (false positive rate, true positive rate) using cross-validation.
Parameters
----------
z: DataFrame (n_samples, n_latent_factors) of latent factor values
y: Series (n_samples,) of ground-truth labels to try to predict
classifier: Classifier object to use, default ``LinearSVC(C=.001)``
Returns
-------
roc_curves: dict, one key per class as well as "mean", each value is a dataframe
containing the tpr (true positive rate) and fpr (false positive rate)
defining that class (or the mean) ROC.
"""
class_names = sorted(y.unique())
z_to_use = z.loc[y.index]
y_true_bin = label_binarize(y, classes=class_names)
y_proba = cross_val_predict(
classifier, z_to_use, y, cv=cv_folds, method="decision_function"
)
# Compute ROC curve and ROC area for each class
roc_curves = dict()
for i, cl_name in enumerate(class_names):
fpr, tpr, thresholds = roc_curve(y_true_bin[:, i], y_proba[:, i])
roc_curves[cl_name] = pd.concat(
[pd.Series(fpr, name="FPR"), pd.Series(tpr, name="TPR")], axis=1
)
mean_fpr = np.unique(
np.concatenate([roc_curves[cl_name].FPR for cl_name in class_names])
)
# Then interpolate all ROC curves at this points
mean_tpr = np.zeros_like(mean_fpr)
for cl_name in class_names:
mean_tpr += interp(mean_fpr, roc_curves[cl_name].FPR, roc_curves[cl_name].TPR)
# Finally average it
mean_tpr /= len(class_names)
roc_curves["mean"] = pd.concat(
[pd.Series(mean_fpr, name="FPR"), pd.Series(mean_tpr, name="TPR")], axis=1
)
return roc_curves
[docs]def estimate_kaplan_meier(
y, survival, duration_column="duration", observed_column="observed"
):
"""Estimate survival curves for groups defined in y based on survival data in ``survival``
Parameters
----------
y: pd.Series, groups (clusters, subtypes). the index is
the sample names
survival: pd.DataFrame with the same index as y, with columns for
the duration (survival time for each patient) and whether
or not the death was observed. If the death was not
observed (censored), the duration is the time of the last
followup.
duration_column: the name of the column in ``survival`` with the duration
observed_column: the name of the column in ``survival`` with True/False values
for whether death was observed or not
Returns
-------
km_estimates: pd.DataFrame, index is the timeline, columns are survival
functions (estimated by Kaplan-Meier) for each class, as
defined in ``y``.
"""
try:
import lifelines
except ImportError:
raise ImportError(
"The module ``lifelines`` was not found. It is required for this functionality. You may install it using `pip install lifelines`."
)
kmf = lifelines.KaplanMeierFitter()
sfs = dict()
for cl in y.unique():
ixs = list(set(y[y == cl].index) & set(survival.index))
kmf.fit(
survival.loc[ixs][duration_column],
survival.loc[ixs][observed_column],
label=cl,
)
sfs[cl] = kmf.survival_function_
return pd.concat([sfs[k] for k in sorted(y.unique())], axis=1).interpolate()
[docs]def multivariate_logrank_test(
y, survival, duration_column="duration", observed_column="observed"
):
"""Compute the multivariate log-rank test for differential survival
among the groups defined by ``y`` in the survival data in ``survival``,
under the null-hypothesis that all groups have the same survival function
(i.e. test whether at least one group has different survival rates)
Parameters
----------
y: pd.Series, groups (clusters, subtypes). the index is
the sample names
survival: pd.DataFrame with the same index as y, with columns for
the duration (survival time for each patient) and whether
or not the death was observed. If the death was not
observed (sensored), the duration is the time of the last
followup.
duration_column: the name of the column in ``survival`` with the duration
observed_column: the name of the column in ``survival`` with True/False values
for whether death was observed or not
Returns
-------
test_statistic: the test statistic (chi-square)
p_value: the associated p_value
"""
try:
import lifelines
except ImportError:
raise ImportError(
"The module ``lifelines`` was not found. It is required for this functionality. You may install it using `pip install lifelines`."
)
ixs = list(set(y.index) & set(survival.index))
mlr = lifelines.statistics.multivariate_logrank_test(
survival.loc[ixs][duration_column],
y.loc[ixs],
survival.loc[ixs][observed_column],
)
return mlr.test_statistic, mlr.p_value
[docs]def select_clinical_factors(
z,
survival,
duration_column="duration",
observed_column="observed",
alpha=0.05,
cox_penalizer=0,
):
"""Select latent factors which are predictive of survival. This is
accomplished by fitting a Cox Proportional Hazards (CPH) model to each
latent factor, while controlling for known covariates, and only keeping
those latent factors whose coefficient in the CPH is nonzero (adjusted
p-value < alpha).
Parameters
----------
survival: pd.DataFrame of survival information and relevant covariates
(such as sex, age at diagnosis, or tumor stage)
duration_column: the name of the column in ``survival`` containing the
duration (time between diagnosis and death or last followup)
observed_column: the name of the column in ``survival`` containing
indicating whether time of death is known
alpha: threshold for p-value of CPH coefficients to call a latent
factor clinically relevant (p < alpha)
cox_penalizer: penalty coefficient in Cox PH solver (see ``lifelines.CoxPHFitter``)
Returns
-------
z_clinical: pd.DataFrame, subset of the latent factors which have been
determined to have clinical value (are individually predictive
of survival, controlling for covariates)
"""
cox_coefficients = _cph_coefs(
z, survival, duration_column, observed_column, penalizer=cox_penalizer
)
signif_cox_coefs = cox_coefficients.T[cox_coefficients.T.p < alpha]
return z.loc[:, signif_cox_coefs.index]
def _cph_coefs(z, survival, duration_column, observed_column, penalizer=0):
"""Compute one CPH model for each latent factor (column) in z.
Return summaries (beta values, p values, confidence intervals)
"""
try:
import lifelines
except ImportError:
raise ImportError(
"The module ``lifelines`` was not found. It is required for this functionality. You may install it using `pip install lifelines`."
)
return pd.concat(
[
lifelines.CoxPHFitter(penalizer=penalizer)
.fit(
survival.assign(LF=z.loc[:, i]).dropna(),
duration_column,
observed_column,
)
.summary.loc["LF"]
.rename(i)
for i in z.columns
],
axis=1,
)
[docs]def compute_harrells_c(
z,
survival,
duration_column="duration",
observed_column="observed",
cox_penalties=None,
cv_folds=5,
):
"""Compute's Harrell's c-Index for a Cox Proportional Hazards regression modeling
survival by the latent factors in z.
Parameters
----------
z: pd.DataFrame (n_samples, n_latent factors)
survival: pd.DataFrame of survival information and relevant covariates
(such as sex, age at diagnosis, or tumor stage)
duration_column: the name of the column in ``survival`` containing the
duration (time between diagnosis and death or last followup)
observed_column: the name of the column in ``survival`` containing
indicating whether time of death is known
cox_penalties: penalty coefficient in Cox PH solver (see ``lifelines.CoxPHFitter``)
to try. Returns the best c given by the different penalties
(by cross-validation). Defualt: [0.1, 1, 10, 100, 1000, 10000]
cv_folds: number of cross-validation folds to compute C
Returns
-------
cs: array, Harrell's c-Index, an auc-like metric for survival prediction accuracy.
one value per cv_fold
"""
if cox_penalties is None:
cox_penalties = [0.1, 1, 10, 100, 1000, 10000]
cvcs = [
_cv_coxph_c(z, survival, p, duration_column, observed_column, cv_folds)
for p in cox_penalties
]
return cvcs[np.argmax([np.median(e) for e in cvcs])]
def _cv_coxph_c(
z,
survival,
penalty,
duration_column="duration",
observed_column="observed",
cv_folds=5,
):
try:
import lifelines
import lifelines.utils
except ImportError:
raise ImportError(
"The module ``lifelines`` was not found. It is required for this functionality. You may install it using `pip install lifelines`."
)
cph = lifelines.CoxPHFitter(penalizer=penalty)
survdf = pd.concat([survival, z], axis=1, sort=False).dropna()
scores = lifelines.utils.k_fold_cross_validation(
cph, survdf, duration_column, event_col=observed_column, k=cv_folds
)
return scores
[docs]def scale(df):
"""Scale and center data
Parameters
----------
df: pd.DataFrame (n_features, n_samples) non-scaled data
Returns
-------
scaled: pd.DataFrame (n_features, n_samples) scaled data
"""
df_scaled = StandardScaler().fit_transform(df.T)
return pd.DataFrame(df_scaled, columns=df.index, index=df.columns).T