# Module dropkick.util
# Utility functions for scoring dropkick logistic regression model
"""
Utility functions for scoring dropkick logistic regression model
"""
import math
import warnings
import scanpy as sc
import numpy as np
from joblib import Parallel, delayed
from scipy.interpolate import interp1d
from sklearn.base import clone
from sklearn.exceptions import UndefinedMetricWarning
from .scorer import check_scoring
def _score_lambda_path(
    est, adata, y, n_hvgs, sample_weight, relative_penalties, scoring, n_jobs, verbose
):
    """
    Cross-validate every lambda value on the estimator's glmnet path.

    Parameters
    ----------
    est : estimator
        The previously fitted estimator.
    adata : AnnData object with scaled input features in .X
    y : array, shape (n_samples,)
        Target values.
    n_hvgs : int, number of highly-variable genes to select
        in each training set
    sample_weight : array, shape (n_samples,)
        Weight of each row in X.
    relative_penalties: array, shape (n_features,)
        Relative weight vector for penalty.
        0 entries remove penalty.
    scoring : string, callable or None
        Scoring method to apply to each model.
    n_jobs: int
        Maximum number of threads to use for scoring models.
    verbose : bool
        Emit logging data and warnings when True.

    Returns
    -------
    scores : array, shape (n_lambda,)
        Scores for each value of lambda over all cv folds.
    hvgs : index, names of hvgs selected from training set
    """
    features = adata.X.copy()  # scaled features live in .X
    scorer = check_scoring(est, scoring)
    folds = est._cv.split(features, y)
    # Some lambda values yield an intercept-only classifier that predicts a
    # single class for every input, which upsets several sklearn metrics.
    # Silence those warnings unless running verbosely. Note that
    # catch_warnings is not thread safe.
    with warnings.catch_warnings():
        warnings.simplefilter("always" if verbose else "ignore", UndefinedMetricWarning)
        fold_results = Parallel(n_jobs=n_jobs, verbose=verbose, backend="threading")(
            delayed(_fit_and_score)(
                est,
                scorer,
                adata,
                y,
                n_hvgs,
                sample_weight,
                relative_penalties,
                est.lambda_path_,
                train_idx,
                test_idx,
            )
            for train_idx, test_idx in folds
        )
    scores, hvgs = zip(*fold_results)
    return scores, hvgs
def _fit_and_score(
    est,
    scorer,
    adata,
    y,
    n_hvgs,
    sample_weight,
    relative_penalties,
    score_lambda_path,
    train_inx,
    test_inx,
):
    """
    Fit one cv fold and score it along the lambda path.

    Parameters
    ----------
    est : estimator
        The previously fitted estimator.
    scorer : callable
        The scoring function to apply to each model.
    adata : AnnData object with scaled input features in .X
    y : array, shape (n_samples,)
        Target values.
    n_hvgs : int, number of highly-variable genes to select
        in each training set
    sample_weight : array, shape (n_samples,)
        Weight of each row in X.
    relative_penalties: array, shape (n_features,)
        Relative weight vector for penalty.
        0 entries remove penalty.
    score_lambda_path : array, shape (n_lambda,)
        The lambda values to evaluate/score.
    train_inx : array, shape (n_train,)
        Array of integers indicating which rows from X, y are in the training
        set for this fold.
    test_inx : array, shape (n_test,)
        Array of integers indicating which rows from X, y are in the test
        set for this fold.

    Returns
    -------
    scores : array, shape (n_lambda,)
        Scores for each value of lambda for a single cv fold.
    hvgs : index, names of hvgs selected from training set
    """
    # determine HVGs from the training cells only, on log1p-normalized counts
    train = adata[train_inx, :].copy()
    train.X = train.layers["log1p_norm"].copy()
    sc.pp.highly_variable_genes(train, n_top_genes=n_hvgs, flavor="seurat", inplace=True)
    hvg_mask = train.var.highly_variable
    # restrict the scaled feature matrix (all cells) to the selected HVGs
    features = adata.X[:, hvg_mask].copy()
    model = clone(est)
    model = model._fit(
        features[train_inx, :], y[train_inx], sample_weight[train_inx], relative_penalties
    )
    # keep the requested lambdas inside the range the fold's model actually fit
    clipped_lambdas = np.clip(score_lambda_path, model.lambda_path_[-1], model.lambda_path_[0])
    fold_scores = scorer(model, features[test_inx, :], y[test_inx], lamb=clipped_lambdas)
    return fold_scores, np.where(hvg_mask)[0]
def _fix_lambda_path(lambda_path):
"""
Replace the first value in lambda_path (+inf) with something more
reasonable. The method below matches what is done in the R/glmnet wrapper.
"""
if lambda_path.shape[0] > 2:
lambda_0 = math.exp(2 * math.log(lambda_path[1]) - math.log(lambda_path[2]))
lambda_path[0] = lambda_0
return lambda_path
def _check_user_lambda(lambda_path, lambda_best=None, lamb=None):
"""
Verify the user-provided value of lambda is acceptable and ensure this
is a 1-d array.
Parameters
----------
lambda_path : array, shape (n_lambda,)
The path of lambda values as found by glmnet. This must be in
decreasing order.
lambda_best : float, optional
The value of lambda producing the highest scoring model (found via
cross validation).
lamb : float, array, or None
The value(s) of lambda for which predictions are desired. This must
be provided if `lambda_best` is None, meaning the model was fit with
cv_folds < 1.
Returns
-------
lamb : array, shape (n_lambda,)
The value(s) of lambda, potentially clipped to the range of values in
lambda_path.
"""
if lamb is None:
if lambda_best is None:
raise ValueError(
"You must specify a value for lambda or run "
"with cv_folds > 1 to select a value "
"automatically."
)
lamb = lambda_best
# ensure numpy math works later
lamb = np.array(lamb, ndmin=1)
if np.any(lamb < lambda_path[-1]) or np.any(lamb > lambda_path[0]):
warnings.warn(
"Some values of lamb are outside the range of "
"lambda_path_ [{}, {}]".format(lambda_path[-1], lambda_path[0]),
RuntimeWarning,
)
np.clip(lamb, lambda_path[-1], lambda_path[0], lamb)
return lamb
def _interpolate_model(lambda_path, coef_path, intercept_path, lamb):
"""
Interpolate coefficients and intercept between values of lambda.
Parameters
----------
lambda_path : array, shape (n_lambda,)
The path of lambda values as found by glmnet. This must be in
decreasing order.
coef_path : array, shape (n_features, n_lambda) or
(n_classes, n_features, n_lambda)
The path of coefficients as found by glmnet.
intercept_path : array, shape (n_lambda,) or (n_classes, n_lambda)
The path of intercepts as found by glmnet.
Returns
-------
coef : array, shape (n_features, n_lambda) or
(n_classes, n_features, n_lambda)
The interpolated path of coefficients.
intercept : array, shape (n_lambda,) or (n_classes, n_lambda)
The interpolated path of intercepts.
"""
if lambda_path.shape[0] == 1:
warnings.warn(
"lambda_path has a single value, this may be an " "intercept-only model.",
RuntimeWarning,
)
coef = np.take(coef_path, 0, axis=-1)
intercept = np.take(intercept_path, 0, axis=-1)
else:
coef = interp1d(lambda_path, coef_path)(lamb)
intercept = interp1d(lambda_path, intercept_path)(lamb)
return coef, intercept