Source code for velocyto.neighbors

import numpy as np
from numba import jit
from sklearn.neighbors import kneighbors_graph, NearestNeighbors
from scipy import sparse
import logging
from typing import *

# Mutual KNN functions


@jit(signature_or_function="Tuple((float64[:,:], int64[:,:], int64[:]))(int64[:,:], float64[:, :], int64[:], int64, int64, boolean)", nopython=True)
def balance_knn_loop(dsi: np.ndarray, dist: np.ndarray, lsi: np.ndarray, maxl: int, k: int, return_distance: bool) -> Tuple:
    """Fast and greedy algorithm to balance a K-NN graph so that no node is the NN to more than maxl other nodes

    Arguments
    ---------
    dsi : np.ndarray  (samples, K)
        distance sorted indexes (as returned by sklearn NN)
    dist : np.ndarray  (samples, K)
        the actual distance corresponding to the sorted indexes
    lsi : np.ndarray (samples,)
        degree of connectivity (l) sorted indexes
    maxl : int
        max degree of connectivity (from others to self) allowed
    k : int
        number of neighbours in the final graph
    return_distance : bool
        whether to return distance

    Returns
    -------
    dist_new : np.ndarray (samples, k+1)
        distances to the NN (a matrix of ones when return_distance is False)
    dsi_new : np.ndarray (samples, k+1)
        indexes of the NN, first column is the sample itself
    l : np.ndarray (samples)
        l[i] is the number of connections from other samples to the sample i
    """
    assert dsi.shape[1] >= k, "sight needs to be bigger than k"
    dsi_new = -1 * np.ones((dsi.shape[0], k + 1), np.int64)
    l = np.zeros(dsi.shape[0], np.int64)
    # BUGFIX: dist_new must be allocated unconditionally. The original only
    # allocated it when return_distance was True, but the self-padding branch
    # below writes to it regardless, leaving it unbound (NameError / numba
    # typing error) when return_distance=False and padding occurs.
    dist_new = np.zeros(dsi_new.shape, np.float64)
    for i in range(dsi.shape[0]):  # Visit every node, most-connected first
        el = lsi[i]
        p = 0
        j = 0
        for j in range(dsi.shape[1]):  # Scan the neighbours within sight
            if p >= k:
                break  # k neighbours already selected
            m = dsi[el, j]
            if el == m:
                dsi_new[el, 0] = el  # the sample itself always sits in column 0
                continue
            if l[m] >= maxl:
                continue  # m already receives maxl connections, skip it
            dsi_new[el, p + 1] = m
            l[m] = l[m] + 1
            if return_distance:
                dist_new[el, p + 1] = dist[el, j]
            p += 1
        if (j == dsi.shape[1] - 1) and (p < k):
            # Sight exhausted before finding k admissible neighbours:
            # pad the remaining slots with the sample itself
            while p < k:
                dsi_new[el, p + 1] = el
                dist_new[el, p + 1] = dist[el, 0]
                p += 1
    if not return_distance:
        dist_new = np.ones_like(dsi_new, np.float64)
    return dist_new, dsi_new, l
@jit(signature_or_function="Tuple((float64[:,:], int64[:,:], int64[:]))(int64[:,:], float64[:, :], int64[:], int64[:], int64, int64, boolean)", nopython=True)
def balance_knn_loop_constrained(dsi: np.ndarray, dist: np.ndarray, lsi: np.ndarray, groups: np.ndarray, maxl: int, k: int, return_distance: bool) -> Tuple:
    """Fast and greedy algorithm to balance a K-NN graph so that no node is the NN to more than maxl other nodes

    Arguments
    ---------
    dsi : np.ndarray  (samples, K)
        distance sorted indexes (as returned by sklearn NN)
    dist : np.ndarray  (samples, K)
        the actual distance corresponding to the sorted indexes
    lsi : np.ndarray (samples,)
        degree of connectivity (l) sorted indexes
    groups : np.ndarray (samples,)
        labels of the samples that constrain the connectivity
    maxl : int
        max degree of connectivity (from others to self) allowed
    k : int
        number of neighbours in the final graph
    return_distance : bool
        whether to return distance

    Returns
    -------
    dist_new : np.ndarray (samples, k+1)
        distances to the NN (a matrix of ones when return_distance is False)
    dsi_new : np.ndarray (samples, k+1)
        indexes of the NN, first column is the sample itself
    l : np.ndarray (samples)
        l[i] is the number of connections from other samples to the sample i
    """
    assert dsi.shape[1] >= k, "sight needs to be bigger than k"
    dsi_new = -1 * np.ones((dsi.shape[0], k + 1), np.int64)
    l = np.zeros(dsi.shape[0], np.int64)
    # BUGFIX: dist_new must be allocated unconditionally. The original only
    # allocated it when return_distance was True, but the self-padding branch
    # below writes to it regardless, leaving it unbound (NameError / numba
    # typing error) when return_distance=False and padding occurs.
    dist_new = np.zeros(dsi_new.shape, np.float64)
    for i in range(dsi.shape[0]):  # Visit every node, most-connected first
        el = lsi[i]
        p = 0
        j = 0
        for j in range(dsi.shape[1]):  # Scan the neighbours within sight
            if p >= k:  # if k-nn were found
                break
            m = dsi[el, j]
            if el == m:
                dsi_new[el, 0] = el  # the sample itself always sits in column 0
                continue
            if groups[el] != groups[m]:  # This is the constraint!
                continue
            if l[m] >= maxl:
                continue  # m already receives maxl connections, skip it
            dsi_new[el, p + 1] = m
            l[m] = l[m] + 1
            if return_distance:
                dist_new[el, p + 1] = dist[el, j]
            p += 1
        if (j == dsi.shape[1] - 1) and (p < k):
            # Sight exhausted before finding k admissible neighbours:
            # pad the remaining slots with the sample itself
            while p < k:
                dsi_new[el, p + 1] = el
                dist_new[el, p + 1] = dist[el, 0]
                p += 1
    if not return_distance:
        dist_new = np.ones_like(dsi_new, np.float64)
    return dist_new, dsi_new, l
def knn_balance(dsi: np.ndarray, dist: np.ndarray=None, maxl: int=200, k: int=60, constraint: np.ndarray=None) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Balance a K-NN graph so that no node is the NN to more than maxl other nodes

    Arguments
    ---------
    dsi : np.ndarray  (samples, K)
        distance sorted indexes (as returned by sklearn NN)
    dist : np.ndarray  (samples, K)
        the actual distance corresponding to the sorted indexes;
        when None, dummy distances (ones, with 0 for self) are used and
        no real distances are returned
    maxl : int
        max degree of connectivity allowed
    k : int
        number of neighbours in the final graph
    constraint : np.ndarray (samples,)
        labels of the samples that constrain the connectivity

    Returns
    -------
    dist_new : np.ndarray (samples, k+1)
        distances to the NN
    dsi_new : np.ndarray (samples, k+1)
        indexes of the NN, first column is the sample itself
    l : np.ndarray (samples)
        l[i] is the number of connections from other samples to the sample i
    """
    # Incoming degree of every node in the unbalanced graph
    degree = np.bincount(dsi.flat[:], minlength=dsi.shape[0])
    # Process the most-contacted nodes first
    lsi = np.argsort(degree, kind="mergesort")[::-1]
    return_distance = dist is not None
    if not return_distance:
        # Fabricate dummy distances: 1 everywhere, 0 for the sample itself
        dist = np.ones(dsi.shape, dtype="float64")
        dist[:, 0] = 0
    if constraint is None:
        return balance_knn_loop(dsi, dist, lsi, maxl, k, return_distance=return_distance)
    return balance_knn_loop_constrained(dsi, dist, lsi, constraint.astype("int64"), maxl, k, return_distance=return_distance)
class BalancedKNN:
    """Greedy algorithm to balance a K-nearest neighbour graph

    It has an API similar to scikit-learn

    Parameters
    ----------
    k : int (default=50)
        the number of neighbours in the final graph
    sight_k : int (default=100)
        the number of neighbours in the initialization graph.
        It corresponds to the farthest neighbour that a sample is allowed to connect to
        when no closer neighbours are allowed. If sight_k is reached then the matrix
        is filled with the sample itself
    maxl : int (default=200)
        max degree of connectivity allowed. Avoids the presence of hubs in the graph;
        it is the maximum number of neighbours that are allowed to contact a node
        before the node is blocked
    constraint : np.ndarray (default=None)
        a numpy array defining the different groups within which connectivity is allowed
        if "clusters" it uses the clusters as in self.clusters_ix
    mode : str (default="distance")
        decides which kind of output, "distance" or "connectivity"
    metric : str (default="euclidean")
        the distance metric used by the initial nearest-neighbour search
    n_jobs : int (default=4)
        parallelization of the standard KNN search performed at initialization
    """

    def __init__(self, k: int=50, sight_k: int=100, maxl: int=200,
                 constraint: np.ndarray=None, mode: str="distance",
                 metric: str="euclidean", n_jobs: int=4) -> None:
        # Graph-construction parameters
        self.k = k
        self.sight_k = sight_k
        self.maxl = maxl
        self.constraint = constraint
        # Search / output configuration
        self.mode = mode
        self.metric = metric
        self.n_jobs = n_jobs
        # Results, filled in by kneighbors()/kneighbors_graph()
        self.dist_new = None  # type: np.ndarray
        self.dsi_new = None  # type: np.ndarray
        self.l = None  # type: np.ndarray
        self.bknn = None  # type: sparse.csr_matrix

    @property
    def n_samples(self) -> int:
        # Number of rows of the data currently attached to the model
        return self.data.shape[0]
[docs] def fit(self, data: np.ndarray, sight_k: int=None) -> Any: """Fits the model data: np.ndarray (samples, features) np sight_k: int the farthest point that a node is allowed to connect to when its closest neighbours are not allowed """ self.data = data self.fitdata = data if sight_k is not None: self.sight_k = sight_k logging.debug(f"First search the {self.sight_k} nearest neighbours for {self.n_samples}") if self.metric == "correlation": self.nn = NearestNeighbors(n_neighbors=self.sight_k + 1, metric=self.metric, n_jobs=self.n_jobs, algorithm="brute") else: self.nn = NearestNeighbors(n_neighbors=self.sight_k + 1, metric=self.metric, n_jobs=self.n_jobs, leaf_size=30) self.nn.fit(self.fitdata) return self
    def kneighbors(self, X: np.ndarray=None, maxl: int=None, mode: str="distance") -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Finds the K-neighbors of a point.

        Returns indices of and distances to the neighbors of each point.

        Parameters
        ----------
        X : array-like, shape (n_query, n_features),
            The query point or points.
            If not provided, neighbors of each indexed point are returned.
            In this case, the query point is not considered its own neighbor.
        maxl : int
            max degree of connectivity allowed
        mode : "distance" or "connectivity"
            Decides the kind of output

        Returns
        -------
        dist_new : np.ndarray (samples, k+1)
            distances to the NN
        dsi_new : np.ndarray (samples, k+1)
            indexes of the NN, first column is the sample itself
        l : np.ndarray (samples)
            l[i] is the number of connections from other samples to the sample i

        NOTE: First column (0) correspond to the sample itself, the nearest neighbour is at the second column (1)
        """
        # Optionally replace the query data and/or the connectivity cap for this call
        if X is not None:
            self.data = X
        if maxl is not None:
            self.maxl = maxl
        # Plain (unbalanced) k-NN search using the index built by fit()
        self.dist, self.dsi = self.nn.kneighbors(self.data, return_distance=True)
        logging.debug(f"Using the initialization network to find a {self.k}-NN graph with maximum connectivity of {self.maxl}")
        # Greedily rebalance so that no sample is neighbour to more than maxl others
        self.dist_new, self.dsi_new, self.l = knn_balance(self.dsi, self.dist, maxl=self.maxl, k=self.k, constraint=self.constraint)
        if mode == "connectivity":
            # NOTE(review): this branch overwrites self.dist (the raw search
            # distances) with 0/1 connectivity values, but the value returned
            # below is self.dist_new, which is left untouched — so mode has no
            # effect on the returned distances. Possibly self.dist_new was
            # intended here; confirm before changing.
            self.dist = np.ones_like(self.dsi)
            self.dist[:, 0] = 0
        return self.dist_new, self.dsi_new, self.l
[docs] def kneighbors_graph(self, X: np.ndarray=None, maxl: int=None, mode: str="distance") -> sparse.csr_matrix: """Retrun the K-neighbors graph as a sparse csr matrix Parameters ---------- X : array-like, shape (n_query, n_features), The query point or points. If not provided, neighbors of each indexed point are returned. In this case, the query point is not considered its own neighbor. maxl: int max degree of connectivity allowed mode : "distance" or "connectivity" Decides the kind of output Returns ------- neighbor_graph : scipy.sparse.csr_matrix The values are either distances or connectivity dependig of the mode parameter NOTE: The diagonal will be zero even though the value 0 is actually stored """ dist_new, dsi_new, l = self.kneighbors(X=X, maxl=maxl, mode=mode) logging.debug("Returning sparse matrix") self.bknn = sparse.csr_matrix((np.ravel(dist_new), np.ravel(dsi_new), np.arange(0, dist_new.shape[0] * dist_new.shape[1] + 1, dist_new.shape[1])), (self.n_samples, self.n_samples)) return self.bknn
[docs] def smooth_data(self, data_to_smooth: np.ndarray, X: np.ndarray=None, maxl: int=None, mutual: bool=False, only_increase: bool=True) -> np.ndarray: """Use the wights learned from knn to smooth any data matrix Arguments --------- data_to_smooth: (features, samples) !! NOTE !! this is different from the input (for speed issues) if the data is provided (samples, features), this will be detected and the correct operation performed at cost of some effciency In the case where samples == samples then the shape (features, samples) will be assumed """ if self.bknn is None: assert (X is None) and (maxl is None), "graph was already fit with different parameters" self.kneighbors_graph(X=X, maxl=maxl, mode=self.mode) if mutual: connectivity = make_mutual(self.bknn > 0) else: connectivity = self.bknn.T > 0 connectivity = connectivity.tolil() connectivity.setdiag(1) w = connectivity_to_weights(connectivity).T assert np.allclose(w.sum(0), 1), "weight matrix need to sum to one over the columns" if data_to_smooth.shape[1] == w.shape[0]: result = sparse.csr_matrix.dot(data_to_smooth, w) elif data_to_smooth.shape[0] == w.shape[0]: result = sparse.csr_matrix.dot(data_to_smooth.T, w).T else: raise ValueError(f"Incorrect size of matrix, none of the axis correspond to the one of graph. {w.shape}") if only_increase: return np.maximum(result, data_to_smooth) else: return result
# Mutual KNN version
def knn_distance_matrix(data: np.ndarray, metric: str=None, k: int=40, mode: str='connectivity', n_jobs: int=4) -> sparse.csr_matrix:
    """Calculate a nearest neighbour distance matrix

    Notice that k is meant as the actual number of neighbors NOT INCLUDING itself
    To achieve that we call kneighbors_graph with X = None

    Arguments
    ---------
    data : np.ndarray (samples, features)
        the data to search neighbours in
    metric : str
        the distance metric; None uses the sklearn default (minkowski/euclidean)
    k : int
        number of neighbours (not including the sample itself)
    mode : str
        "connectivity" or "distance", as accepted by kneighbors_graph
    n_jobs : int
        parallelization of the knn search

    Returns
    -------
    sparse.csr_matrix (samples, samples) with the k-NN graph
    """
    if metric == "correlation":
        # correlation is not supported by tree-based indexes: force brute force
        nn = NearestNeighbors(n_neighbors=k, metric="correlation", algorithm="brute", n_jobs=n_jobs)
    elif metric is None:
        nn = NearestNeighbors(n_neighbors=k, n_jobs=n_jobs)
    else:
        # BUGFIX: previously any metric other than "correlation" was silently
        # ignored and the sklearn default metric was used instead
        nn = NearestNeighbors(n_neighbors=k, metric=metric, n_jobs=n_jobs)
    nn.fit(data)
    return nn.kneighbors_graph(X=None, mode=mode)
def make_mutual(knn: sparse.csr.csr_matrix) -> sparse.coo_matrix:
    """Removes edges between neighbours that are not mutual

    An edge survives only if it exists in both directions; the surviving
    entry carries the elementwise minimum of the two directed weights.
    """
    transposed = knn.T
    return knn.minimum(transposed)
def connectivity_to_weights(mknn: sparse.csr_matrix, axis: int=1) -> sparse.lil_matrix:
    """Convert a binary connectivity matrix to weights ready to be multiplied to smooth a data matrix

    Each nonzero entry is divided by the sum along `axis`, so the weights of
    every row (axis=1) or column (axis=0) sum to one.

    Arguments
    ---------
    mknn : sparse matrix
        binary (mutual) knn connectivity matrix; converted to csr if needed
    axis : int
        the axis along which the weights are normalized

    Returns
    -------
    sparse matrix of normalized weights
    """
    # BUGFIX: the original checked `type(mknn) is not sparse.csr.csr_matrix`,
    # relying on the deprecated private scipy.sparse.csr module and an
    # exact-type comparison; use the public predicate instead.
    if not sparse.isspmatrix_csr(mknn):
        mknn = mknn.tocsr()
    return mknn.multiply(1. / sparse.csr_matrix.sum(mknn, axis=axis))
def min_n(row_data: np.ndarray, row_indices: np.ndarray, n: int) -> Tuple[np.ndarray, np.ndarray]:
    """Find the smallest entries and corresponding indices of a sparse row

    Returns the n smallest values (ascending) and the indices they occupy.
    """
    order = row_data.argsort()[:n]  # positions of the n smallest values
    return row_data[order], row_indices[order]
def take_top(matrix: sparse.spmatrix, n: int) -> sparse.lil_matrix:
    """Filter the top nearest neighbours from a sparse distance matrix

    For every row only the n smallest stored entries are kept.
    """
    filtered = matrix.tolil(copy=True)
    for row in range(filtered.shape[0]):
        values, columns = min_n(np.array(filtered.data[row]),
                                np.array(filtered.rows[row]), n)
        # Write the surviving entries back into the lil row storage
        filtered.data[row] = values.tolist()
        filtered.rows[row] = columns.tolist()
    return filtered
# Common functions
def convolve_by_sparse_weights(data: np.ndarray, w: sparse.csr_matrix) -> np.ndarray:
    """Use the weights learned from knn to convolve any data matrix

    The weight matrix is transposed so that its columns sum to one,
    then applied to the data by sparse matrix multiplication.

    NOTE: An improved implementation could detect which input is sparse
    and which kind of sparse, and perform faster computation
    """
    wt = w.T
    assert np.allclose(wt.sum(0), 1), "weight matrix need to sum to one over the columns"
    return sparse.csr_matrix.dot(data, wt)
def knn_smooth_weights(matrix: np.ndarray, metric: str="euclidean", k_search: int=20,
                       k_mutual: int=10, n_jobs: int=10) -> Tuple[sparse.spmatrix, sparse.csr_matrix]:
    """Find the weights to smooth the dataset using efficient sparse matrix operations

    Arguments
    ---------
    matrix : np.ndarray
        (genes, cells) expression matrix
    metric : str
        the distance metric for the initial knn search
    k_search : int
        the number of neighbours searched in the first k-NN step
    k_mutual : int
        the number of mutual neighbours to select (must not exceed k_search)
    n_jobs : int
        parallelization of the knn search

    Returns
    -------
    weights : sparse matrix of smoothing weights
    knn : the raw knn distance graph
    """
    assert k_search >= k_mutual, "k_search needs to be bigger than k_mutual"
    # cells-as-rows knn search on the transposed matrix
    dist_graph = knn_distance_matrix(matrix.T, metric=metric, k=k_search, mode="distance", n_jobs=n_jobs)
    mutual = make_mutual(dist_graph)
    closest_mutual = take_top(mutual, k_mutual)
    closest_mutual.setdiag(1)  # each cell always contributes to itself
    weights = connectivity_to_weights(closest_mutual > 0)
    return weights, dist_graph