Source code for velocyto.estimation

import numpy as np
import scipy.optimize
from scipy import sparse
import logging
from typing import *
from sklearn.neighbors import NearestNeighbors
from .speedboosted import _colDeltaCor, _colDeltaCorLog10, _colDeltaCorSqrt
from .speedboosted import _colDeltaCorpartial, _colDeltaCorLog10partial, _colDeltaCorSqrtpartial


[docs]def colDeltaCor(emat: np.ndarray, dmat: np.ndarray, threads: int=None) -> np.ndarray: """Calculate the correlation between the displacement (d[:,i]) and the difference between a cell and every other (e - e[:, i]) Parallel cython+OpenMP implemetation Arguments --------- emat: np.ndarray (ngenes, ncells) gene expression matrix dmat: np.ndarray (ngenes, ncells) gene velocity/displacement matrix threads: int number of parallel threads to use """ import multiprocessing if threads is None: num_threads = int(multiprocessing.cpu_count() / 2) else: num_threads = max(threads, multiprocessing.cpu_count()) out = np.zeros((emat.shape[1], emat.shape[1])) _colDeltaCor(emat, dmat, out, num_threads) return out
[docs]def colDeltaCorpartial(emat: np.ndarray, dmat: np.ndarray, ixs: np.ndarray, threads: int=None) -> np.ndarray: """Calculate the correlation between the displacement (d[:,i]) and the difference between a cell and every other (e - e[:, i]) Parallel cython+OpenMP implemetation Arguments --------- emat: np.ndarray (ngenes, ncells) gene expression matrix dmat: np.ndarray (ngenes, ncells) gene velocity/displacement matrix ixs: the neighborhood matrix (ncells, nneighbours) ixs[i, k] is the kth neighbour to the cell i threads: int number of parallel threads to use """ import multiprocessing if threads is None: num_threads = int(multiprocessing.cpu_count() / 2) else: num_threads = max(threads, multiprocessing.cpu_count()) out = np.zeros((emat.shape[1], emat.shape[1])) emat = np.require(emat, requirements="C") ixs = np.require(ixs, requirements="C").astype(np.intp) _colDeltaCorpartial(emat, dmat, out, ixs, num_threads) return out
[docs]def colDeltaCorLog10(emat: np.ndarray, dmat: np.ndarray, threads: int=None, psc: float=1.0) -> np.ndarray: """Calculate the correlation between the displacement (d[:,i]) and the difference between a cell and every other (e - e[:, i]) Parallel cython+OpenMP implemetation Arguments --------- emat: np.ndarray (ngenes, ncells) gene expression matrix dmat: np.ndarray (ngenes, ncells) gene velocity/displacement matrix threads: int number of parallel threads to use """ import multiprocessing if threads is None: num_threads = int(multiprocessing.cpu_count() / 2) else: num_threads = max(threads, multiprocessing.cpu_count()) out = np.zeros((emat.shape[1], emat.shape[1])) _colDeltaCorLog10(emat, dmat, out, num_threads, psc) return out
[docs]def colDeltaCorLog10partial(emat: np.ndarray, dmat: np.ndarray, ixs: np.ndarray, threads: int=None, psc: float=1.0) -> np.ndarray: """Calculate the correlation between the displacement (d[:,i]) and the difference between a cell and every other (e - e[:, i]) Parallel cython+OpenMP implemetation Arguments --------- emat: np.ndarray (ngenes, ncells) gene expression matrix dmat: np.ndarray (ngenes, ncells) gene velocity/displacement matrix ixs: the neighborhood matrix (ncells, nneighbours) ixs[i, k] is the kth neighbour to the cell i threads: int number of parallel threads to use """ import multiprocessing if threads is None: num_threads = int(multiprocessing.cpu_count() / 2) else: num_threads = max(threads, multiprocessing.cpu_count()) out = np.zeros((emat.shape[1], emat.shape[1])) emat = np.require(emat, requirements="C") ixs = np.require(ixs, requirements="C").astype(np.intp) _colDeltaCorLog10partial(emat, dmat, out, ixs, num_threads, psc) return out
[docs]def colDeltaCorSqrt(emat: np.ndarray, dmat: np.ndarray, threads: int=None, psc: float=0.0) -> np.ndarray: """Calculate the correlation between the displacement (d[:,i]) and the difference between a cell and every other (e - e[:, i]) Parallel cython+OpenMP implemetation Arguments --------- emat: np.ndarray (ngenes, ncells) gene expression matrix dmat: np.ndarray (ngenes, ncells) gene velocity/displacement matrix threads: int number of parallel threads to use """ import multiprocessing if threads is None: num_threads = int(multiprocessing.cpu_count() / 2) else: num_threads = max(threads, multiprocessing.cpu_count()) out = np.zeros((emat.shape[1], emat.shape[1])) _colDeltaCorSqrt(emat, dmat, out, num_threads, psc) return out
[docs]def colDeltaCorSqrtpartial(emat: np.ndarray, dmat: np.ndarray, ixs: np.ndarray, threads: int=None, psc: float=0.0) -> np.ndarray: """Calculate the correlation between the displacement (d[:,i]) and the difference between a cell and every other (e - e[:, i]) Parallel cython+OpenMP implemetation Arguments --------- emat: np.ndarray (ngenes, ncells) gene expression matrix dmat: np.ndarray (ngenes, ncells) gene velocity/displacement matrix ixs: the neighborhood matrix (ncells, nneighbours) ixs[i, k] is the kth neighbour to the cell i threads: int number of parallel threads to use """ import multiprocessing if threads is None: num_threads = int(multiprocessing.cpu_count() / 2) else: num_threads = max(threads, multiprocessing.cpu_count()) out = np.zeros((emat.shape[1], emat.shape[1])) emat = np.require(emat, requirements="C") ixs = np.require(ixs, requirements="C").astype(np.intp) _colDeltaCorSqrtpartial(emat, dmat, out, ixs, num_threads, psc) return out
def _fit1_slope(y: np.ndarray, x: np.ndarray) -> float: """Simple function that fit a linear regression model without intercept """ if not np.any(x): m = np.NAN # It is definetelly not at steady state!!! elif not np.any(y): m = 0 else: result, rnorm = scipy.optimize.nnls(x[:, None], y) # Fastest but costrains result >= 0 m = result[0] # Second fastest: m, _ = scipy.optimize.leastsq(lambda m: x*m - y, x0=(0,)) # Third fastest: m = scipy.optimize.minimize_scalar(lambda m: np.sum((x*m - y)**2 )).x # Before I was doinf fastest: scipy.optimize.minimize_scalar(lambda m: np.sum((y - m * x)**2), bounds=(0, 3), method="bounded").x # Optionally one could clip m if high value make no sense # m = np.clip(m,0,3) return m def _fit1_slope_weighted(y: np.ndarray, x: np.ndarray, w: np.ndarray, limit_gamma: bool=False, bounds: Tuple[float, float]=(0, 20)) -> float: """Simple function that fit a weighted linear regression model without intercept """ if not np.any(x): m = np.NAN # It is definetelly not at steady state!!! elif not np.any(y): m = 0 else: if limit_gamma: if np.median(y) > np.median(x): high_x = x > np.percentile(x, 90) up_gamma = np.percentile(y[high_x], 10) / np.median(x[high_x]) up_gamma = np.maximum(1.5, up_gamma) else: up_gamma = 1.5 # Just a bit more than 1 m = scipy.optimize.minimize_scalar(lambda m: np.sum(w * (x * m - y)**2), bounds=(1e-8, up_gamma), method="bounded").x else: m = scipy.optimize.minimize_scalar(lambda m: np.sum(w * (x * m - y)**2), bounds=bounds, method="bounded").x return m def _fit1_slope_weighted_offset(y: np.ndarray, x: np.ndarray, w: np.ndarray, fixperc_q: bool=False, limit_gamma: bool=False) -> Tuple[float, float]: """Function that fits a weighted linear regression model with intercept with some adhoc """ if not np.any(x): m = (np.NAN, 0) # It is definetelly not at steady state!!! elif not np.any(y): m = (0, 0) else: if fixperc_q: m1 = np.percentile(y[x <= np.percentile(x, 1)], 50) m0 = scipy.optimize.minimize_scalar(lambda m: np.sum(w * (x * m - y + m1)**2), bounds=(0, 20), method="bounded").x m = (m0, m1) else: # m, _ = scipy.optimize.leastsq(lambda m: np.sqrt(w) * (-y + x * m[0] + m[1]), x0=(0, 0)) # This is probably faster but it can have negative slope # NOTE: The up_gamma is to deal with cases where consistently y > x. Those should have positive velocity everywhere if limit_gamma: if np.median(y) > np.median(x): high_x = x > np.percentile(x, 90) up_gamma = np.percentile(y[high_x], 10) / np.median(x[high_x]) up_gamma = np.maximum(1.5, up_gamma) else: up_gamma = 1.5 # Just a bit more than 1 else: up_gamma = 20 up_q = 2 * np.sum(y * w) / np.sum(w) m = scipy.optimize.minimize(lambda m: np.sum(w * (-y + x * m[0] + m[1])**2), x0=(0.1, 1e-16), method="L-BFGS-B", bounds=[(1e-8, up_gamma), (0, up_q)]).x # If speedup is needed either the gradient or numexpr could be used return m[0], m[1] def _fit1_slope_offset(y: np.ndarray, x: np.ndarray, fixperc_q: bool=False) -> Tuple[float, float]: """Simple function that fit a linear regression model with intercept """ if not np.any(x): m = (np.NAN, 0) # It is definetelly not at steady state!!! elif not np.any(y): m = (0, 0) else: # result, rnorm = scipy.optimize.nnls(x[:, None], y) # Fastest but costrains result >= 0 # m = result[0] if fixperc_q: m1 = np.percentile(y[x <= np.percentile(x, 1)], 50) m0 = scipy.optimize.minimize_scalar(lambda m: np.sum((x * m - y + m1)**2), bounds=(0, 20), method="bounded").x m = (m0, m1) else: m, _ = scipy.optimize.leastsq(lambda m: -y + x * m[0] + m[1], x0=(0, 0)) # Third fastest: m = scipy.optimize.minimize_scalar(lambda m: np.sum((x*m - y)**2 )).x # Before I was doinf fastest: scipy.optimize.minimize_scalar(lambda m: np.sum((y - m * x)**2), bounds=(0, 3), method="bounded").x # Optionally one could clip m if high value make no sense # m = np.clip(m,0,3) return m[0], m[1]
[docs]def fit_slope(Y: np.ndarray, X: np.ndarray) -> np.ndarray: """Loop through the genes and fits the slope Y: np.ndarray, shape=(genes, cells) the dependent variable (unspliced) X: np.ndarray, shape=(genes, cells) the independent variable (spliced) """ # NOTE this could be easily parallelized slopes = np.fromiter((_fit1_slope(Y[i, :], X[i, :]) for i in range(Y.shape[0])), dtype="float32", count=Y.shape[0]) return slopes
[docs]def fit_slope_offset(Y: np.ndarray, X: np.ndarray, fixperc_q: bool=False) -> Tuple[np.ndarray, np.ndarray]: """Loop through the genes and fits the slope Y: np.ndarray, shape=(genes, cells) the dependent variable (unspliced) X: np.ndarray, shape=(genes, cells) the independent variable (spliced) """ # NOTE this could be easily parallelized slopes = np.zeros(Y.shape[0], dtype="float32") offsets = np.zeros(Y.shape[0], dtype="float32") for i in range(Y.shape[0]): m, q = _fit1_slope_offset(Y[i, :], X[i, :], fixperc_q) slopes[i] = m offsets[i] = q return slopes, offsets
[docs]def fit_slope_weighted(Y: np.ndarray, X: np.ndarray, W: np.ndarray, return_R2: bool=False, limit_gamma: bool=False, bounds: Tuple[float, float]=(0, 20)) -> np.ndarray: """Loop through the genes and fits the slope Y: np.ndarray, shape=(genes, cells) the dependent variable (unspliced) X: np.ndarray, shape=(genes, cells) the independent variable (spliced) W: np.ndarray, shape=(genes, cells) the weights that will scale the square residuals """ # NOTE this could be easily parallelized # slopes = np.fromiter((_fit1_slope_weighted(Y[i, :], X[i, :], W[i, :], bounds=bounds) for i in range(Y.shape[0])), # dtype="float32", # count=Y.shape[0]) slopes = np.zeros(Y.shape[0], dtype="float32") offsets = np.zeros(Y.shape[0], dtype="float32") if return_R2: R2 = np.zeros(Y.shape[0], dtype="float32") for i in range(Y.shape[0]): m = _fit1_slope_weighted(Y[i, :], X[i, :], W[i, :], limit_gamma) slopes[i] = m if return_R2: # NOTE: the coefficient of determination is not weighted but the fit is with np.errstate(divide='ignore', invalid='ignore'): SSres = np.sum((m * X[i, :] - Y[i, :])**2) SStot = np.sum((Y[i, :].mean() - Y[i, :])**2) _R2 = 1 - (SSres / SStot) if np.isfinite(_R2): R2[i] = _R2 else: R2[i] = -1e16 if return_R2: return slopes, R2 return slopes
[docs]def fit_slope_weighted_offset(Y: np.ndarray, X: np.ndarray, W: np.ndarray, fixperc_q: bool=False, return_R2: bool=True, limit_gamma: bool=False) -> Any: """Loop through the genes and fits the slope Y: np.ndarray, shape=(genes, cells) the dependent variable (unspliced) X: np.ndarray, shape=(genes, cells) the independent variable (spliced) """ # NOTE this could be easily parallelized slopes = np.zeros(Y.shape[0], dtype="float32") offsets = np.zeros(Y.shape[0], dtype="float32") if return_R2: R2 = np.zeros(Y.shape[0], dtype="float32") for i in range(Y.shape[0]): m, q = _fit1_slope_weighted_offset(Y[i, :], X[i, :], W[i, :], fixperc_q, limit_gamma) slopes[i] = m offsets[i] = q if return_R2: # NOTE: the coefficient of determination is not weighted but the fit is with np.errstate(divide='ignore', invalid='ignore'): SSres = np.sum((m * X[i, :] + q - Y[i, :])**2) SStot = np.sum((Y[i, :].mean() - Y[i, :])**2) _R2 = 1 - (SSres / SStot) if np.isfinite(_R2): R2[i] = _R2 else: R2[i] = -1e16 if return_R2: return slopes, offsets, R2 return slopes, offsets
[docs]def clusters_stats(U: np.ndarray, S: np.ndarray, clusters_uid: np.ndarray, cluster_ix: np.ndarray, size_limit: int=40) -> Tuple[np.ndarray, np.ndarray]: """Calculate the averages per cluster If the cluster is too small (size<size_limit) the average of the toal is reported instead """ U_avgs = np.zeros((S.shape[0], len(clusters_uid))) S_avgs = np.zeros((S.shape[0], len(clusters_uid))) avgU_div_avgS = np.zeros((S.shape[0], len(clusters_uid))) slopes_by_clust = np.zeros((S.shape[0], len(clusters_uid))) for i, uid in enumerate(clusters_uid): cluster_filter = cluster_ix == i n_cells = np.sum(cluster_filter) logging.info(f"Cluster: {uid} ({n_cells} cells)") if n_cells > size_limit: U_avgs[:, i], S_avgs[:, i] = U[:, cluster_filter].mean(1), S[:, cluster_filter].mean(1) else: U_avgs[:, i], S_avgs[:, i] = U.mean(1), S.mean(1) return U_avgs, S_avgs