"""
This module cluster the projection of data (usually it is 2D projection) with
one of the standard algorithms and attach a certain number of labels per
cluster.
Example:
>>> from Orange.projection import TSNE
>>> from Orange.data import Table
>>> from orangecontrib.bioinformatics.utils import serverfiles
>>> from orangecontrib.bioinformatics.annotation.annotate_projection import \
... annotate_projection
>>> from orangecontrib.bioinformatics.annotation.annotate_samples import \
... AnnotateSamples
>>>
>>> # load data
>>> data = Table("https://datasets.orange.biolab.si/sc/aml-1k.tab.gz")
>>> marker_p = serverfiles.localpath_download(
... 'marker_genes','panglao_gene_markers.tab')
>>> markers = Table(marker_p)
>>>
>>> # annotate data with labels
>>> annotator = AnnotateSamples()
>>> annotations = annotator.annotate_samples(data, markers)
>>>
>>> # project data in 2D
>>> tsne = TSNE(n_components=2)
>>> tsne_model = tsne(data)
>>> embedding = tsne_model(data)
>>>
>>> # get clusters and annotations for clusters
>>> clusters, clusters_meta, eps = annotate_projection(annotations, embedding,
... clustering_algorithm=DBSCAN, eps=1.2)
In case when user uses a DBSCAN algorithm and do not provide eps to the
`annotate_projection` function it is computed automatically with knn method.
>>> clusters, clusters_meta, eps = annotate_projection(annotations, embedding,
... clustering_algorithm=DBSCAN)
"""
from math import sqrt
from bisect import bisect_left, bisect_right
from collections import Counter
import numpy as np
import pyclipper
from scipy.spatial import Delaunay, distance
from Orange.data import Table, Domain, DiscreteVariable
from Orange.clustering import DBSCAN
[docs]def cluster_data(coordinates, clustering_algorithm=DBSCAN, **kwargs):
"""
This function receives data and cluster them.
Parameters
----------
coordinates : Orange.data.Table
Visualisation coordinates - embeddings
clustering_algorithm : callable
Algorithm used for clustering.
Returns
-------
Orange.data.Table
List of cluster indices.
"""
learner = clustering_algorithm(**kwargs)
clusters = learner(coordinates)
if not isinstance(clusters, np.ndarray): # old clustering method
clusters = clusters(coordinates)
clusters = np.array(
list(
map(int, map(clusters.domain.attributes[0].repr_val, clusters.X[:, 0]))
)
).flatten()
# sort classes in descending order base on number of cases in the cluster
sorted_clust_idx = [v for v, _ in Counter(clusters).most_common() if v != -1]
# re-indexed array
new_clustering = np.empty(len(clusters))
new_clustering[:] = np.nan # nan for not clustered
# reindex based on descending cluster size
for i, v in enumerate(sorted_clust_idx):
new_clustering[clusters == v] = i
# create the table
new_domain = Domain(
[
DiscreteVariable(
"Clusters",
values=["C{}".format(i) for i in range(1, len(sorted_clust_idx) + 1)],
)
]
)
return Table(new_domain, new_clustering.reshape((-1, 1)))
[docs]def assign_labels(clusters, annotations, labels_per_cluster):
"""
This function assigns a certain number of labels per cluster. Each cluster
gets `labels_per_cluster` number of most common labels in cluster assigned.
Parameters
----------
clusters : Orange.data.Table
Cluster indices for each item.
annotations : Orange.data.Table
Table with annotations and their probabilities.
labels_per_cluster : int
Number of labels that need to be assigned to each cluster.
Returns
-------
dict
Dictionary with cluster index as a key and list of annotations as a
value. Each list include tuples with the annotation name and their
proportion in the cluster.
Orange.data.Table
The array with the annotation assigned to the item.
"""
clusters_unique = set(clusters.domain[0].values)
if len(annotations.domain) == 0:
return {}, Table(
Domain([DiscreteVariable("Annotation", values=[])]),
np.ones((len(clusters), 1)) * np.nan,
)
labels = np.array(list(map(str, annotations.domain.attributes)))
# remove rows with all nans
nan_mask = np.isnan(annotations.X).all(axis=1)
ann_not_nan = annotations.X[~nan_mask]
# find indices and labels
annotation_best_idx = np.nanargmax(ann_not_nan, axis=1)
annotation_best = labels[annotation_best_idx]
# join back together
items_annotations = np.empty(annotations.X.shape[0], dtype=labels.dtype)
items_annotations[~nan_mask] = annotation_best
annotations_clusters = {}
for cl in clusters_unique:
mask = (
np.array(
list(map(clusters.domain.attributes[0].repr_val, clusters.X[:, 0]))
).flatten()
== cl
)
labels_cl = items_annotations[mask]
# remove nans from labels
labels_cl_filtered = labels_cl[~(labels_cl == "")]
counts = Counter(labels_cl_filtered)
common_labels = counts.most_common(labels_per_cluster)
if len(common_labels) > 0:
annotations_clusters[cl] = [
(label, c / len(labels_cl)) for label, c in common_labels
]
# pack item annotations to Table
nan_mask = items_annotations == ""
values, indices = np.unique(items_annotations[~nan_mask], return_inverse=True)
corrected_idx = np.ones(items_annotations.shape) * np.nan
corrected_idx[~nan_mask] = indices
domain = Domain([DiscreteVariable("Annotation", values=values)])
item_annotations = Table.from_list(domain, corrected_idx.reshape((-1, 1)))
return annotations_clusters, item_annotations
[docs]def labels_locations(coordinates, clusters):
"""
Function computes the location of the label for each cluster.
The location is compute as a center point.
Parameters
----------
coordinates : Orange.data.Table
Visualisation coordinates - embeddings
clusters : Orange.data.Table
Cluster indices for each item.
Returns
-------
dict
The coordinates for locating the label. Dictionary with cluster index
as a key and tuple (x, y) as a value.
"""
clusters_unique = set(clusters.domain[0].values) - {"-1"} # -1 is not clustered
locations = {}
for cl in clusters_unique:
mask = (
np.array(
list(map(clusters.domain.attributes[0].repr_val, clusters.X[:, 0]))
).flatten()
== cl
)
cl_coordinates = coordinates.X[mask, :]
x, y = 1 / 2 * (np.min(cl_coordinates, axis=0) + np.max(cl_coordinates, axis=0))
locations[cl] = (x, y)
return locations
[docs]def get_epsilon(coordinates, k=10, skip=0.1):
"""
The function computes the epsilon parameter for DBSCAN through method
proposed in the paper.
Parameters
----------
coordinates : Orange.data.Table
Visualisation coordinates - embeddings
k : int
Number kth observed neighbour
skip : float
Percentage of skipped neighborus.
Returns
-------
float
Epsilon parameter for DBSCAN
"""
x = coordinates.X
if len(x) > 1000: # subsampling is required
i = len(x) // 1000
x = x[::i]
d = distance.squareform(distance.pdist(x))
k = min(k + 1, len(coordinates) - 1)
kth_point = np.argpartition(d, k, axis=1)[:, k]
# k+1 since first one is item itself
kth_dist = np.sort(d[np.arange(0, len(kth_point)), kth_point])
# currently mark proportion equal to skip as a noise
return kth_dist[-int(np.round(len(kth_dist) * skip))]
def _angle(v1, v2):
"""
Compute clockwise angles between v1 and v2. Both vectors are
given as a tuple with two points ([x1, y1], [x2, y2]) such that
[x2, y2] of v1 == [x1, y1] of v2.
The angle is given in degrees between 0 and 2 * pi
"""
v1, v2 = np.array(v1), np.array(v2)
x1, y1 = v1[0] - v1[1]
x2, y2 = v2[1] - v2[0]
dot = np.sum(x1 * x2 + y1 * y2) # dot product
det = x1 * y2 - y1 * x1 # determinant
angle = np.arctan2(det, dot) # atan2(y, x) or atan2(sin, cos)
return -angle if angle <= 0 else np.pi + np.pi - angle
def _find_hull(edges_list, points_list, starting_edge):
"""
This function return a single hull which starts and ends in
the starting_edge.
Parameters
edges_list : list
List of edges. Each edge is presented as a tuple of two indices which
tell the starting and ending note. The index correspond to location
of point in points_list. This list must be sorted in the ascending
order.
points_list : list
List of points location. Each point has x and y location.
starting_edge : int
The index of the list where hull starts.
Returns
-------
np.ndarray
The array with the hull/polygon points
list
List of booleans that indicates whether each point was used or not in
the resulting polygon.
"""
firsts = [x[0] for x in edges_list] # first elements for bisection
used_edges = [False] * len(edges_list)
used_edges[starting_edge] = True
# remember start and next point
# start point is required for stop condition
start, next_point = edges_list[starting_edge]
# remember current polygon around points
poly = [points_list[start], points_list[next_point]]
# we count number of steps to stop iteration in case it is locked
# in some dead cycle. It can be a result of some unexpected cases.
count = 0
while start != next_point and count < len(edges_list):
# find the index where the first value equal to next_point
# appear
ind_left = bisect_left(firsts, next_point)
# find the index next to the last value
ind_right = bisect_right(firsts, next_point)
# check if there are more edges available from the same point
if ind_right - ind_left > 1:
# select the most distant one in clockwise direction
# it is probably the point on the outer hull
# with this we prevent a hull to discover cycles inside
# a polygon
ang = -1
for i in range(ind_left, ind_right):
cur_ang = _angle(
(poly[-2], poly[-1]), (poly[-1], points_list[edges_list[i][1]])
)
if cur_ang > ang:
ang = cur_ang
ind_left = i
# save a next point of the polgon
used_edges[ind_left] = True
next_point = edges_list[ind_left][1]
poly.append(points_list[next_point])
count += 1
return np.array(poly), used_edges
[docs]def edges_to_polygon(edges_list, points_list):
"""
This function connect edges in polygons. It computes all possible hulls -
yes some clusters have more of them when they have a hole in the middle.
It then selects one that is outer hull.
Parameters
----------
edges_list : list
List of edges. Each edge is presented as a tuple of two indices which
tell the starting and ending note. The index correspond to location
of point in points_list
points_list : list
List of points location. Each point has x and y location.
Returns
-------
np.ndarray
The array with the hull/polygon points.
"""
# sort based on first element of tuple to enable bisection search
edges_list = sorted(edges_list, key=lambda x: x[0])
# need to use all edges
used = [False] * len(edges_list)
# it is possible that we will find more separate hulls -
# it happen in cases when a polygon has inner cycles
polygons = []
while not all(used):
i = used.index(False)
poly, new_used = _find_hull(edges_list, points_list, i)
polygons.append(poly)
used = [u1 or u2 for u1, u2 in zip(used, new_used)]
# select polygon that is outside - the widest and the highest is the
# most outside
height_width = [np.sum(p.max(axis=0) - p.min(axis=0)) for p in polygons]
i = height_width.index(max(height_width))
return polygons[i]
[docs]def compute_concave_hulls(coordinates, clusters, epsilon):
"""
Function computes the points of the concave hull around points.
Parameters
----------
coordinates : Orange.data.Table
Visualisation coordinates - embeddings
clusters : Orange.data.Table
Cluster indices for each item.
epsilon : float
Epsilon used by DBSCAN to cluster the data
Returns
-------
dict
The points of the concave hull. Dictionary with cluster index
as a key and np.ndaray of points as a value -
[[x1, y1], [x2, y2], [x3, y3], ...]
"""
def get_shape(pts, eps):
"""
Compute the shape (concave hull) of a set of a cluster.
"""
def add_edge(edges_list, i, j):
"""
Add a line between the i-th and j-th points,
if not in the list already. Remove the lines that are not outer
edges - when (j, i) already in list means that two triangles
has same edge what means it is not an outer edge
"""
if (j, i) in edges_list:
# if both neighboring triangles are in shape, it's not a
# boundary edge
edges_list.remove((j, i))
return
edges_list.add((i, j))
if len(pts) < 4:
# When you have a triangle, there is no sense in computing the hull
rng = list(range(3))
return edges_to_polygon(zip(rng, rng[1:] + rng[:1]), pts)
tri = Delaunay(pts)
edges = set()
# loop over triangles:
# ia, ib, ic = indices of corner points of the triangle
for ia, ib, ic in tri.vertices:
pa = pts[ia]
pb = pts[ib]
pc = pts[ic]
# Lengths of sides of triangle
a = sqrt((pa[0] - pb[0]) ** 2 + (pa[1] - pb[1]) ** 2)
b = sqrt((pb[0] - pc[0]) ** 2 + (pb[1] - pc[1]) ** 2)
c = sqrt((pc[0] - pa[0]) ** 2 + (pc[1] - pa[1]) ** 2)
# filter - longest edge of triangle smaller than epsilon
if max(a, b, c) <= eps:
add_edge(edges, ia, ib)
add_edge(edges, ib, ic)
add_edge(edges, ic, ia)
polygon = edges_to_polygon(edges, pts)
return polygon
hulls = {}
clusters_array = np.array(
list(map(clusters.domain.attributes[0].repr_val, clusters.X[:, 0]))
)
for cl in set(clusters_array) - {"None", "?"}:
points = coordinates.X[clusters_array == cl]
# subsample when more than 1000 points
# it keeps time finding hull under 0.3 s on my computer
if points.shape[0] > 1000:
points = points[np.random.randint(points.shape[0], size=1000), :]
# compute the concave hul
hull = get_shape(points, eps=epsilon * 2)
# epsilon seems to be good parameter for lines to be smooth enough
# selecting epsilon for the distance
# shows approximately what is DBSCAN neighbourhood
# those line buffer the line for 3 * epsilon and move it back for
# 2 * epsilon. The it will make a hull more smooth.
# pyclipper work with integer so points need to be scaled first to keep
# precision. It is undo with scal_from_clipper
scaling_factor = 1000
scaled_hull = pyclipper.scale_to_clipper(hull, scaling_factor)
# buffer the hull fro epsilon * 3
pco1 = pyclipper.PyclipperOffset()
pco1.AddPath(scaled_hull, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
im_solution = pco1.Execute(epsilon * scaling_factor * 3)
# buffer the hull fro epsilon * -2
pco2 = pyclipper.PyclipperOffset()
pco2.AddPath(im_solution[0], pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
solution = pyclipper.scale_from_clipper(
pco2.Execute(epsilon * scaling_factor * (-2)), scaling_factor
)
hulls[cl] = np.array(solution).reshape(-1, 2)
return hulls
def _filter_clusters(clusters, clusters_meta):
"""
Function removes clusters that does not has any labels
"""
clust_map = {
c: i
for i, c in enumerate(
c for c in clusters.domain["Clusters"].values if c in clusters_meta
)
}
# change cluster indices
clust_idx = np.zeros(clusters.X.shape) * np.nan
for i, cl in enumerate(
map(clusters.domain.attributes[0].repr_val, clusters.X[:, 0])
):
clust_idx[i, 0] = clust_map.get(cl, np.nan)
new_clusters = Table(
Domain(
[
DiscreteVariable(
"Clusters",
values=clusters.domain["Clusters"].values[: len(clust_map)],
)
]
),
clust_idx,
)
# change cluster names in metas
new_clusters_meta = {
"C{}".format(clust_map[cl] + 1): v for cl, v in clusters_meta.items()
}
return new_clusters, new_clusters_meta
[docs]def annotate_projection(
annotations,
coordinates,
clustering_algorithm=DBSCAN,
labels_per_cluster=3,
**kwargs
):
"""
Function cluster the data based on coordinates, and assigns a certain number
of labels per cluster. Each cluster gets `labels_per_cluster` number of most
common labels in cluster assigned.
Parameters
----------
annotations : Orange.data.Table
Table with annotations and their probabilities.
coordinates : Orange.data.Table
Visualisation coordinates - embeddings
clustering_algorithm : callable, optional (default = DBSCAN)
Algorithm used in clustering.
labels_per_cluster : int, optional (default = 3)
Number of labels that need to be assigned to each cluster.
Returns
-------
Orange.data.Table
List of cluster indices.
dict
Dictionary with cluster index as a key and list of annotations as a
value. Each list include tuples with the annotation name and their
proportion in the cluster.
dict
The coordinates for locating the label. Dictionary with cluster index
as a key and tuple (x, y) as a value.
"""
assert len(annotations) == len(
coordinates
), "Number of coordinates does not match to number of annotations"
# sklearn clustering want to have one example
assert len(coordinates) > 0, "At least one data point need to be provided"
assert (
len(coordinates.domain) > 0
), "Coordinates need to have at least one attribute"
eps = kwargs.get("eps", get_epsilon(coordinates))
if clustering_algorithm == DBSCAN:
kwargs["eps"] = eps
# get clusters
clusters = cluster_data(coordinates, clustering_algorithm, **kwargs)
# assign top n labels to group
annotations_cl, item_annotations = assign_labels(
clusters, annotations, labels_per_cluster
)
labels_loc = labels_locations(coordinates, clusters)
concave_hull = compute_concave_hulls(coordinates, clusters, eps)
# crate the dictionary with annotations, labels locations, and hulls for
# each cluster
clusters_meta = {}
# note clusters without labels are not added to the dictionary
for cl in annotations_cl.keys():
clusters_meta[cl] = (annotations_cl[cl], labels_loc[cl], concave_hull[cl])
# we do not want to color clusters that does not have labels so we
# remove clusters without labels from the table
clusters, clusters_meta = _filter_clusters(clusters, clusters_meta)
# add the labels to the cluster table
clusters_ann = Table(
Domain(clusters.domain.attributes + item_annotations.domain.attributes),
np.concatenate((clusters.X, item_annotations.X), axis=1),
)
return clusters_ann, clusters_meta, eps
[docs]def cluster_additional_points(coordinates, hulls, cluster_attribute=None):
"""
This function receives additional points and assign them current existing
clusters based on current concave hull.
Parameters
----------
coordinates : Orange.data.Table
Visualisation coordinates - embeddings
hulls : dict
Concave hull for each cluster
cluster_attribute : Orange.data.DiscreteVariable (optional)
A variable for clusters. If cluster_attribute is provided it will be
used in the creation of the resulting Table.
Returns
-------
Orange.data.Table
Cluster label for each point
"""
def point_in_polygon_test(test_point, polygon_points):
"""
This function uses the horizontal ray casting to find out if the point
is in the hull/polygon. For each point, it tests how many times the
horizontal ray from test_point to infinity crosses the polygon edge. If
it happens odd many times the point is in the polygon.
https://stackoverflow.com/a/2922778/3551700
"""
test_x = test_point[0]
test_y = test_point[1]
# flipping bool from True to False is similar to counting odd numbers
# of intersections. If it will be True at the end odd number of
# intersections happened
is_inside = False
for (x1, y1), (x2, y2) in zip(
polygon_points,
np.concatenate((polygon_points[1:], polygon_points[:1]), axis=0),
):
# ray crosses the edge if test_y between both y from an edge
# and if intersection on the right of the test_x
if (y1 > test_y) != (y2 > test_y):
# compute the intersection between the horizontal ray and
# polygon edge
intersection_x = (x2 - x1) * (test_y - y1) / (y2 - y1) + x1
if test_x < intersection_x:
is_inside = not is_inside
return is_inside
clusters = [None] * len(coordinates)
for cluster, hull in hulls.items():
for i, c in enumerate(coordinates.X):
if point_in_polygon_test(c, hull):
clusters[i] = cluster
if cluster_attribute is not None:
assert all(
i in cluster_attribute.values for i in set(clusters) - {None}
), "cluster_attribute does not have all required values."
# create the table
new_domain = Domain(
[
DiscreteVariable("Clusters", values=sorted(hulls.keys()))
if cluster_attribute is None
else cluster_attribute
]
)
return Table(
new_domain, np.array(list(map(new_domain[0].to_val, clusters))).reshape(-1, 1)
)
if __name__ == "__main__":
# run hull creation at Iris data
data = Table("iris")[:, 2:4]
clustered_data = Table(
Domain([DiscreteVariable("cl", values=["1", "2", "3"])]),
[[0]] * 50 + [[1]] * 50 + [[2]] * 50,
)
compute_concave_hulls(data, clustered_data, epsilon=0.5)