# Copyright (c) 2023 Vassilis Digalakis Jr, Christos Ziakas
# Licensed under the MIT License.
from typing import List, Optional, Tuple
import numpy as np
from pulp import PULP_CBC_CMD, LpBinary, LpMinimize, LpProblem, LpVariable, lpSum
[docs]class MIOClustering:
"""
Class for solving clustering problems using Mixed-Integer Optimization.
"""
def __init__(
self,
n_clusters: int = None,
time_limit: float = 1200,
ls_pairs_diff_cluster: Optional[List[Tuple[int, int]]] = None,
ls_pairs_same_cluster: Optional[List[Tuple[int, int]]] = None,
):
self.n_clusters = n_clusters
self.ls_pairs_diff_cluster = ls_pairs_diff_cluster
self.ls_pairs_same_cluster = ls_pairs_same_cluster
self.time_limit = time_limit
self.model = LpProblem("Clustering MIO", LpMinimize)
self.z = None # For storing solution for z variables
self.y = None # For storing solution for y variables
self.cluster_means = None
[docs] @staticmethod
def euclidean_distance(point1: np.ndarray, point2: np.ndarray) -> float:
return np.linalg.norm(point1 - point2)
def _initialize_variables(self, num_points: int):
"""
Initialize the decision variables for the optimization problem.
Args:
num_points (int): The number of data points.
Returns:
Tuple: A tuple containing the dictionaries of z and y variables.
"""
z = LpVariable.dicts(
"z",
[
(i, j, k)
for i in range(num_points - 1)
for j in range(i + 1, num_points)
for k in range(self.n_clusters)
],
0,
1,
LpBinary,
)
y = LpVariable.dicts(
"y",
[(i, k) for i in range(num_points) for k in range(self.n_clusters)],
0,
1,
LpBinary,
)
return z, y
def _calculate_distances_noise(self, X: np.ndarray) -> np.ndarray:
"""
Calculate and return the matrix of pairwise distances with added noise.
Args:
X (np.ndarray): The input feature matrix.
Returns:2
np.ndarray: The matrix of pairwise distances with noise.
"""
distances = np.linalg.norm(X[:, np.newaxis] - X, axis=2)
min_dist = np.min(distances[np.nonzero(distances)])
noise = 0.1 * min_dist * (2 * np.random.rand(X.shape[0], X.shape[0], self.n_clusters) - 1)
return distances[:, :, np.newaxis] + noise
def _calculate_distances(self, X: np.ndarray) -> np.ndarray:
"""
Calculate and return the matrix of pairwise distances
Args:
X (np.ndarray): The input feature matrix.
Returns:2
np.ndarray: The matrix of pairwise distances with noise.
"""
distances = np.linalg.norm(X[:, np.newaxis] - X, axis=2)
return np.tile(distances[:, :, np.newaxis], (1, 1, self.n_clusters))
def _add_constraints(self, num_points: int, z: dict, y: dict, coef: np.ndarray, b: int):
"""
Add constraints to the optimization model.
Args:
num_points (int): The number of data points.
z (dict): The decision variables representing pair assignments.
y (dict): The decision variables representing individual assignments.
coef (np.ndarray): Coefficient matrix for the objective function.
b (int): Minimum number of points per cluster.
"""
# Objective
z_opt, y_opt = self._initialize_variables(num_points)
if self.ls_pairs_diff_cluster:
for (i, j) in self.ls_pairs_diff_cluster:
for k in range(self.n_clusters):
z_opt[i, j, k].setInitialValue(0)
z_opt[i, j, k].fixValue()
self.model += lpSum(
z_opt[i, j, k] * coef[i, j, k]
for i in range(num_points - 1)
for j in range(i + 1, num_points)
for k in range(self.n_clusters)
)
# Each point is assigned to exactly one cluster
for i in range(num_points):
self.model += lpSum(y_opt[i, k] for k in range(self.n_clusters)) == 1
# Each cluster has at least b points
for k in range(self.n_clusters):
self.model += lpSum(y_opt[i, k] for i in range(num_points)) >= b
# Relationship between y and z variables
for i in range(num_points - 1):
for j in range(i + 1, num_points):
for k in range(self.n_clusters):
self.model += z_opt[i, j, k] <= y_opt[i, k]
self.model += z_opt[i, j, k] <= y_opt[j, k]
self.model += z_opt[i, j, k] >= y_opt[i, k] + y_opt[j, k] - 1
# Exclusion constraints
if self.ls_pairs_diff_cluster:
for (i, j) in self.ls_pairs_diff_cluster:
for k in range(self.n_clusters):
self.model += y_opt[i, k] + y_opt[j, k] <= 1
# Inclusion constraints
if self.ls_pairs_same_cluster:
for (i, j) in self.ls_pairs_same_cluster:
for k in range(self.n_clusters):
self.model += y_opt[i, k] - y_opt[j, k] == 0
[docs] def fit(self, X: np.ndarray, y: Optional[np.ndarray] = None):
"""
Fit the model to the given data using Mixed-Integer Optimization.
Args:
X (np.ndarray): The input feature matrix.
y (Optional[np.ndarray]): The target vector (not used in this model).
"""
num_points = len(X)
b = int((num_points / self.n_clusters) * 0.1) # Minimum number of points per cluster
coef = self._calculate_distances_noise(X)
self._add_constraints(num_points, self.z, self.y, coef, b)
solver = PULP_CBC_CMD(timeLimit=self.time_limit, warmStart=True)
# Solve the problem
self.model.solve(solver)
self.y = np.zeros((num_points, self.n_clusters))
self.z = np.zeros((num_points, num_points, self.n_clusters))
for v in self.model.variables():
var_value = v.varValue
var_name = v.name
if var_name.startswith("y_"):
# Parse the indices for y
i, k = (
var_name.replace("(", "")
.replace(")", "")
.replace("y_", "")
.replace(",", "")
.split("_")
)
i, k = int(i), int(k)
self.y[i, k] = var_value
elif var_name.startswith("z_"):
# Parse the indices for z
i, j, k = (
var_name.replace("(", "")
.replace(")", "")
.replace("z_", "")
.replace(",", "")
.split("_")
)
i, j, k = int(i), int(j), int(k)
self.z[i, j, k] = var_value
# Extract and store cluster means
self.labels = self._get_cluster_assingments(X.shape[0])
self.cluster_centers = self._compute_cluster_centers(X)
self.wcss = self._compute_wcss(X)
self.silhouette_score = self._compute_silhouette_score(X)
def _get_cluster_assingments(self, n_rows: int) -> np.ndarray:
"""
Predict cluster assignments for new data points based on stored cluster means.
Args:
new_data (np.ndarray): The new data points for which predictions are to be made.
Returns:
np.ndarray: An array of cluster assignments for the new data points.
"""
cluster_assignments = np.zeros(n_rows, dtype=int)
for i in range(n_rows):
cluster_assignments[i] = np.argmax(self.y[i, :]) # np.argmin(distances)
return cluster_assignments
def _compute_wcss(self, X: np.ndarray) -> float:
"""
Compute the Within-Cluster Sum of Squares (WCSS) for the fitted model.
Args:
X (np.ndarray): The input feature matrix used for fitting the model.
Returns:
float: The computed WCSS value.
Raises:
ValueError: If the model has not been fitted yet or if cluster means are not available.
"""
wcss = 0.0
cluster_labels_pred = self.labels
for cluster_idx in range(self.n_clusters):
cluster_points = X[cluster_labels_pred == cluster_idx]
wcss += np.sum((cluster_points - self.cluster_centers[cluster_idx]) ** 2)
return wcss
def _compute_cluster_centers(self, X: np.ndarray) -> np.ndarray:
"""
Extract cluster means after fitting the model.
Args:
X (np.ndarray): The input feature matrix.
Returns:
np.ndarray: An array of cluster means.
"""
# Initialize an array to store means
cluster_centers = np.zeros((self.n_clusters, X.shape[1]))
for i in range(X.shape[0]):
for k in range(self.n_clusters):
cluster_points = [] # List to store data points assigned to the current cluster
if self.labels[i] == k:
cluster_points.append(X[i, :])
if cluster_points:
cluster_centers[k] = np.mean(cluster_points, axis=0)
return cluster_centers
def _compute_silhouette_score(self, X: np.ndarray) -> float:
""" """
from sklearn.metrics import silhouette_score
silhouette_avg = silhouette_score(X, self.labels)
return silhouette_avg
[docs] def predict(self, X: np.ndarray) -> np.ndarray:
"""
Predict cluster assignments for new data points based on stored cluster means.
Args:
new_data (np.ndarray): The new data points for which predictions are to be made.
Returns:
np.ndarray: An array of cluster assignments for the new data points.
"""
num_new_points = len(X)
n_clusters = self.n_clusters
cluster_assignments = np.zeros(num_new_points, dtype=int)
for i in range(num_new_points):
# Calculate distances between the new data point and cluster means
distances = [np.linalg.norm(X[i] - self.cluster_centers[k]) for k in range(n_clusters)]
cluster_assignments[i] = np.argmin(distances)
return cluster_assignments