From 24484decfb323ecb9af82bf74e78408909920496 Mon Sep 17 00:00:00 2001
From: KStruniawski <103257344+KStruniawski@users.noreply.github.com>
Date: Thu, 25 Apr 2024 13:38:22 +0200
Subject: [PATCH] fix in KELMLayer (inv instead of pinv)

---
 Layers/KELMLayer.py | 358 ++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 344 insertions(+), 14 deletions(-)

diff --git a/Layers/KELMLayer.py b/Layers/KELMLayer.py
index 27b1bb3..b76c917 100644
--- a/Layers/KELMLayer.py
+++ b/Layers/KELMLayer.py
@@ -1,4 +1,7 @@
 import numpy as np
+from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
+from sklearn.feature_selection import mutual_info_regression
+from sklearn.neighbors import NearestNeighbors
 from Resources.ActivationFunction import ActivationFunction
 import tensorflow as tf
 
@@ -92,8 +95,9 @@ class KELMLayer:
 
     >>> model = KELMModel(layer)
     """
-    def __init__(self, kernel: Kernel, activation='tanh', act_params=None, C=1.0,
-                 nystrom_approximation=False, landmark_selection_method='random', **params):
+    def __init__(self, kernel: Kernel, activation='tanh', act_params=None, C=0.0,
+                 nystrom_approximation=False, landmark_selection_method='random',
+                 random_pct=0.1, **params):
         self.K = None
         self.error_history = None
         self.feature_map = None
@@ -139,6 +143,8 @@ def __init__(self, kernel: Kernel, activation='tanh', act_params=None, C=1.0,
         else:
             self.kernel = kernel
 
+        self.random_pct = random_pct
+
     def build(self, input_shape):
         """
         Build the layer with the given input shape.
@@ -183,23 +189,29 @@ def fit(self, x, y):
         x = tf.cast(x, dtype=tf.float32)
         self.input = x
 
+        n_samples = int(self.random_pct * x.shape[0])
+
         if self.nystrom_approximation:
-            num_rows = tf.shape(x)[0]
-            shuffled_indices = tf.random.shuffle(tf.range(num_rows))
-            selected_indices = shuffled_indices[:100]
-            L = tf.gather(x, selected_indices)
+            if self.landmark_selection_method != "stratified" and \
+                    self.landmark_selection_method != "information_gain_based":
+                L = eval(f"{self.landmark_selection_method}_sampling(x, n_samples)")
+            else:
+                y_new = tf.argmax(y, axis=1)
+                y_new = tf.cast(y_new, dtype=tf.int32)
+                L = eval(f"{self.landmark_selection_method}_sampling(x, y_new, n_samples)")
             C = calculate_pairwise_distances_vector(x, L, self.kernel.ev)
             W = calculate_pairwise_distances(L, self.kernel.ev)
-            self.K = tf.matmul(tf.matmul(C, tf.linalg.inv(W)), C, transpose_b=True)
-        else:
-            self.K = calculate_pairwise_distances(x, self.kernel.ev)
-
-        if self.C is not None:
-            self.K = tf.linalg.set_diag(self.K, tf.linalg.diag_part(self.K) + self.C)
+            diagonal = tf.linalg.diag_part(W)
+            diagonal_with_small_value = diagonal + 0.00001
+            W = tf.linalg.set_diag(W, diagonal_with_small_value)
+            K = tf.matmul(tf.matmul(C, tf.linalg.inv(W)), C, transpose_b=True)
         else:
-            self.K = tf.linalg.set_diag(self.K, tf.linalg.diag_part(self.K))
+            K = calculate_pairwise_distances(x, self.kernel.ev)
 
-        self.K = tf.linalg.pinv(self.K)
+        diagonal = tf.linalg.diag_part(K)
+        diagonal_with_small_value = diagonal + 0.1
+        K = tf.linalg.set_diag(K, diagonal_with_small_value)
+        self.K = tf.linalg.inv(K)
         self.beta = tf.matmul(self.K, y)
 
         # self.output = self.activation(tf.matmul(x, self.beta, transpose_b=True))
@@ -346,4 +358,322 @@ def load(cls, attributes):
 
         return layer
 
 
+# Random Sampling
+def random_sampling(data, n_samples):
+    num_rows = tf.shape(data)[0]
+    selected_indices = tf.random.shuffle(tf.range(num_rows))[:n_samples]
+    sampled_data = tf.gather(data, selected_indices)
+    return sampled_data
+
+
+# Uniform Sampling
+def uniform_sampling(data, n_samples):
+    num_rows = tf.shape(data)[0]
+    indices = tf.range(num_rows)
+    shuffled_indices = tf.random.shuffle(indices)
+    selected_indices = shuffled_indices[:n_samples]
+    sampled_data = tf.gather(data, selected_indices)
+    return sampled_data
+
+
+# K-Means Clustering Sampling
+def kmeans_sampling(data, n_samples):
+    data_np = data.numpy()
+    kmeans = KMeans(n_clusters=n_samples, random_state=0)
+    kmeans.fit(data_np)
+    centroids = kmeans.cluster_centers_
+    centroids_tensor = tf.convert_to_tensor(centroids, dtype=tf.float32)
+    return centroids_tensor
+
+
+def stratified_sampling(data, labels, n_samples_per_class):
+    # Get unique class labels
+    unique_labels, _ = tf.unique(labels)
+    n_samples_per_class = int(n_samples_per_class / len(unique_labels))
+
+    # Initialize list to store sampled landmarks
+    sampled_landmarks = []
+
+    # Sample landmarks from each stratum (class)
+    for label in unique_labels:
+        # Get indices of data points with the current label
+        indices = tf.where(tf.equal(labels, label))[:, 0]
+
+        # Sample landmarks from the current stratum
+        sampled_indices = tf.random.shuffle(indices)[:n_samples_per_class]
+
+        # Add sampled data points to the list
+        sampled_landmarks.extend(tf.gather(data, sampled_indices))
+
+    # Convert sampled landmarks to tensor
+    sampled_landmarks_tensor = tf.convert_to_tensor(sampled_landmarks)
+
+    return sampled_landmarks_tensor
+
+
+def greedy_sampling(data, n_samples):
+    # Initialize list to store sampled landmarks
+    sampled_landmarks = []
+
+    # Compute the leverage scores
+    q, _ = tf.linalg.qr(data, full_matrices=False)
+    leverage_scores = tf.reduce_sum(tf.square(q), axis=1)
+
+    # Greedily select points with highest leverage scores
+    for _ in range(n_samples):
+        max_index = tf.argmax(leverage_scores)
+        sampled_landmarks.append(data[max_index])
+        leverage_scores = tf.tensor_scatter_nd_update(leverage_scores, [[max_index]], [0.])
+
+    # Convert sampled landmarks to tensor
+    sampled_landmarks_tensor = tf.convert_to_tensor(sampled_landmarks)
+
+    return sampled_landmarks_tensor
+
+
+def farthest_first_traversal_sampling(data, n_samples):
+    # Initialize list to store sampled landmarks
+    sampled_landmarks = []
+
+    # Randomly select the first landmark
+    initial_index = tf.random.uniform((), maxval=tf.shape(data)[0], dtype=tf.int32)
+    sampled_landmarks.append(data[initial_index])
+
+    # Compute pairwise distances between data points and the selected landmarks
+    distances = tf.norm(data - sampled_landmarks[0], axis=1)
+
+    for _ in range(1, n_samples):
+        # Find the data point farthest from the selected landmarks
+        farthest_index = tf.argmax(distances)
+        farthest_point = data[farthest_index]
+
+        # Update sampled landmarks and distances
+        sampled_landmarks.append(farthest_point)
+        distances = tf.minimum(distances, tf.norm(data - farthest_point, axis=1))
+
+    # Convert sampled landmarks to tensor
+    sampled_landmarks_tensor = tf.convert_to_tensor(sampled_landmarks)
+
+    return sampled_landmarks_tensor
+
+
+def spectral_sampling(data, n_samples):
+    # Compute the kernel matrix using a Gaussian kernel
+    kernel_matrix = tf.exp(-tf.norm(data[:, None] - data[None, :], axis=-1) ** 2)
+
+    # Compute the eigenvectors and eigenvalues of the kernel matrix
+    eigenvalues, eigenvectors = tf.linalg.eigh(kernel_matrix)
+
+    # Select points corresponding to the top eigenvectors as landmarks
+    indices = tf.argsort(eigenvalues, direction='DESCENDING')[:n_samples]
+    sampled_landmarks = tf.gather(data, indices)
+
+    return sampled_landmarks
+
+
+def density_based_sampling(data, n_samples):
+    # Convert data to TensorFlow tensor
+    data_tensor = tf.convert_to_tensor(data, dtype=tf.float32)
+
+    # Apply DBSCAN to identify dense regions
+    dbscan = DBSCAN(eps=0.5, min_samples=5)
+    dbscan.fit(data)
+
+    # Find the indices of samples that belong to dense regions
+    dense_indices = np.where(dbscan.labels_ != -1)[0]
+
+    # Convert dense indices to TensorFlow tensor
+    dense_indices_tensor = tf.constant(dense_indices, dtype=tf.int32)
+
+    # Randomly select samples from dense regions
+    sampled_indices = tf.random.shuffle(dense_indices_tensor)[:n_samples]
+
+    # Extract sampled landmarks
+    sampled_landmarks = tf.gather(data_tensor, sampled_indices)
+
+    return sampled_landmarks
+
+
+def hierarchical_clustering_sampling(data, n_samples):
+    # Convert data to TensorFlow tensor
+    data_tensor = tf.convert_to_tensor(data, dtype=tf.float32)
+
+    # Apply Agglomerative Clustering to build hierarchical clusters
+    clustering = AgglomerativeClustering(n_clusters=n_samples, linkage='ward')
+    clustering.fit(data)
+
+    # Get the unique cluster labels
+    cluster_labels = np.unique(clustering.labels_)
+
+    # Take one member (the first index) of each cluster as its representative
+    representative_indices = np.array([np.where(clustering.labels_ == c)[0][0] for c in cluster_labels])
+
+    # Convert representative indices to TensorFlow tensor
+    representative_indices_tensor = tf.constant(representative_indices, dtype=tf.int32)
+
+    # Extract sampled landmarks
+    sampled_landmarks = tf.gather(data_tensor, representative_indices_tensor)
+
+    return sampled_landmarks
+
+
+def entropy_based_sampling(data, n_samples):
+    # Convert data to TensorFlow tensor
+    data_tensor = tf.convert_to_tensor(data, dtype=tf.float32)
+
+    # Compute entropy for each data point
+    entropy = -tf.reduce_sum(data_tensor * tf.math.log(data_tensor), axis=1)
+
+    # Get indices of top entropy points
+    top_indices = tf.argsort(entropy, direction='DESCENDING')[:n_samples]
+
+    # Extract sampled landmarks
+    sampled_landmarks = tf.gather(data_tensor, top_indices)
+
+    return sampled_landmarks
+
+
+def mutual_information_based_sampling(data, n_samples):
+    data_tensor = tf.convert_to_tensor(data, dtype=tf.float32)
+
+    # Calculate mutual information between each pair of data points
+    mutual_info = np.zeros((data.shape[0], data.shape[0]))
+    for i in range(data.shape[1]):
+        for j in range(data.shape[1]):
+            # Convert data slices to numpy arrays for mutual_info_regression
+            mi_data_i = data[:, i].numpy().reshape(-1, 1)
+            mi_data_j = data[:, j].numpy().reshape(-1, 1)
+            mutual_info[:, i] += mutual_info_regression(mi_data_i, mi_data_j.ravel())  # Use ravel() here
+
+    # Sum mutual information across features
+    mutual_info_sum = tf.reduce_sum(mutual_info, axis=1)
+
+    # Get indices of top mutual information points
+    top_indices = tf.argsort(mutual_info_sum, direction='DESCENDING')[:n_samples]
+
+    # Extract sampled landmarks
+    sampled_landmarks = tf.gather(data_tensor, top_indices)
+
+    return sampled_landmarks
+
+
+def conditional_mutual_information_based_sampling(data, n_samples, n_neighbors=5):
+    # Convert data to TensorFlow tensor
+    data_tensor = tf.convert_to_tensor(data, dtype=tf.float32)
+
+    # Initialize NearestNeighbors model
+    nbrs = NearestNeighbors(n_neighbors=n_neighbors).fit(data)
+
+    # Find nearest neighbors for each data point
+    distances, indices = nbrs.kneighbors(data)
+
+    # Calculate conditional mutual information
+    cmi_values = []
+    for i in range(data.shape[0]):
+        # Calculate conditional entropy H(X|Y,Z) for each data point
+        conditional_entropy = []
+        for j in range(data.shape[0]):
+            if j != i:  # Exclude the current data point
+                # Calculate distances to neighbors excluding the current data point
+                distances_j = distances[j][1:]  # Exclude the first element (distance to self)
+                mean_distance_j = tf.reduce_mean(distances_j)
+
+                # Find nearest neighbors of data point j excluding itself
+                neighbors_i = nbrs.kneighbors([data[j]], return_distance=False)[0][
+                    1:]  # Exclude the first element (index of self)
+
+                # Calculate mean distance from data point i to its neighbors, conditioned on data point j
+                mean_distance_i_given_j = tf.reduce_mean(tf.gather(distances[i], neighbors_i))
+
+                # Calculate conditional entropy H(X|Y,Z)
+                conditional_entropy.append(tf.math.log(mean_distance_i_given_j / mean_distance_j))
+
+        # Calculate conditional mutual information I(X;Y|Z)
+        cmi_values.append(tf.reduce_sum(conditional_entropy))
+
+    # Get indices of top conditional mutual information points
+    top_indices = tf.argsort(cmi_values, direction='DESCENDING')[:n_samples]
+
+    # Extract sampled entries
+    sampled_entries = tf.gather(data_tensor, top_indices)
+
+    return sampled_entries
+
+
+def joint_entropy_based_sampling(data, n_samples, subset_size):
+    # Convert data to TensorFlow tensor
+    data_tensor = tf.convert_to_tensor(data, dtype=tf.float32)
+
+    # Calculate number of subsets
+    num_subsets = data.shape[0] - subset_size + 1
+
+    # Calculate joint entropy for each subset
+    joint_entropies = []
+    for i in range(num_subsets):
+        subset = data_tensor[i:i + subset_size]  # Extract subset
+        # Compute joint entropy of the subset
+        subset_entropy = -tf.reduce_sum(subset * tf.math.log(subset + 1e-10))  # Add a small epsilon to prevent log(0)
+        joint_entropies.append(subset_entropy)
+
+    # Find subsets with high joint entropy
+    high_entropy_indices = tf.argsort(joint_entropies, direction='DESCENDING')[:n_samples]
+
+    # Sample entries from high-entropy subsets
+    sampled_entries = tf.gather(data_tensor, high_entropy_indices)
+
+    return sampled_entries
+
+
+def compute_entropy(labels):
+    # Ensure labels is a 1D vector
+    labels = tf.reshape(labels, [-1])
+    # Count how many times each label value occurs
+    _, _, label_counts = tf.unique_with_counts(labels)
+    probabilities = tf.cast(label_counts, tf.float32) / tf.cast(tf.size(labels), tf.float32)
+    entropy = -tf.reduce_sum(probabilities * tf.math.log(probabilities))
+    return entropy
+
+
+def compute_information_gain(data_point, labels):
+    # Assuming binary classification, modify as needed for other tasks
+    # Split data point into two subsets based on a threshold (the mean value)
+    threshold = tf.reduce_mean(data_point)
+    subset1_indices = tf.where(data_point < threshold)
+    subset2_indices = tf.where(data_point >= threshold)
+
+    if tf.size(subset1_indices) == 0 or tf.size(subset2_indices) == 0:
+        # If one of the subsets is empty, return 0 information gain
+        return tf.constant(0.0)
+
+    subset1_labels = tf.gather(tf.reshape(labels, [-1]), subset1_indices)
+    subset2_labels = tf.gather(tf.reshape(labels, [-1]), subset2_indices)
+
+    # Calculate entropy for original labels
+    original_entropy = compute_entropy(labels)
+
+    # Calculate weighted average of entropies of subsets
+    subset1_entropy = compute_entropy(subset1_labels)
+    subset2_entropy = compute_entropy(subset2_labels)
+
+    scale1 = (tf.size(subset1_labels) / tf.size(labels))
+    scale2 = (tf.size(subset2_labels) / tf.size(labels))
+    # Calculate information gain (entropy reduction)
+    information_gain = original_entropy - tf.cast(scale1, dtype=tf.float32) * subset1_entropy \
+                       - tf.cast(scale2, dtype=tf.float32) * subset2_entropy
+
+    return information_gain
+
+
+def information_gain_based_sampling(data, labels, n_samples):
+    # Calculate information gain for each data point
+    information_gains = []
+    for i in range(data.shape[0]):
+        point = data[i]  # Extract data point
+        information_gain = compute_information_gain(point, labels)
+        # Append information gain to the list
+        information_gains.append(information_gain)
+
+    # Find data points with high information gain
+    high_gain_indices = tf.argsort(information_gains, direction='DESCENDING')[:n_samples]
+    return tf.gather(data, high_gain_indices)
\ No newline at end of file
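
Note on the numerical change the subject line refers to: fit() no longer calls tf.linalg.pinv on the kernel matrix. It adds a small constant to the kernel diagonal (0.1 for the full matrix K, 1e-5 for the landmark matrix W) and then uses tf.linalg.inv, so the output weights are effectively beta = (K + eps*I)^(-1) y. The snippet below is an illustrative, self-contained sketch of that computation only; it uses a toy RBF kernel in place of the repository's calculate_pairwise_distances helper and is not part of the patch.

    import numpy as np
    import tensorflow as tf

    rng = np.random.default_rng(0)
    x = tf.constant(rng.normal(size=(50, 3)), dtype=tf.float32)
    y = tf.one_hot(rng.integers(0, 2, size=50), depth=2)

    # Toy RBF kernel matrix standing in for calculate_pairwise_distances(x, kernel.ev)
    sq_dists = tf.reduce_sum((x[:, None, :] - x[None, :, :]) ** 2, axis=-1)
    K = tf.exp(-sq_dists)

    eps = 0.1  # the constant the patch adds to the diagonal before inverting
    K_reg = tf.linalg.set_diag(K, tf.linalg.diag_part(K) + eps)
    beta = tf.matmul(tf.linalg.inv(K_reg), y)          # as in the patched fit()

    # Same result via a regularised linear solve: beta = (K + eps*I)^(-1) y
    beta_check = tf.linalg.solve(K + eps * tf.eye(K.shape[0]), y)
    print(np.allclose(beta.numpy(), beta_check.numpy(), atol=1e-3))

A linear solve gives the same beta; the explicit inverse is shown only to mirror the patched code, which stores self.K = tf.linalg.inv(K).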
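For the Nystrom branch, the patched fit() first draws a landmark set L whose size is random_pct times the number of training rows, using one of the *_sampling helpers added above, and then approximates the full kernel as C @ inv(W) @ C.T after jittering the diagonal of W. The sketch below is illustrative only; the rbf helper is an assumed stand-in for the kernel.ev-based functions calculate_pairwise_distances and calculate_pairwise_distances_vector.

    import numpy as np
    import tensorflow as tf

    def rbf(a, b):
        # Assumed toy kernel; the repository derives this from kernel.ev instead
        sq = tf.reduce_sum((a[:, None, :] - b[None, :, :]) ** 2, axis=-1)
        return tf.exp(-sq)

    rng = np.random.default_rng(0)
    x = tf.constant(rng.normal(size=(200, 4)), dtype=tf.float32)

    random_pct = 0.1
    n_samples = int(random_pct * x.shape[0])                  # landmark count, as in fit()

    # 'random' landmark selection, mirroring random_sampling() above
    idx = tf.random.shuffle(tf.range(tf.shape(x)[0]))[:n_samples]
    L = tf.gather(x, idx)

    C = rbf(x, L)                                             # n x m cross-kernel
    W = rbf(L, L)                                             # m x m landmark kernel
    W = tf.linalg.set_diag(W, tf.linalg.diag_part(W) + 1e-5)  # jitter before inversion
    K_approx = tf.matmul(tf.matmul(C, tf.linalg.inv(W)), C, transpose_b=True)
    print(K_approx.shape)                                     # (200, 200)

The diagonal jitter plays the same role as the 0.1 added to K in the non-approximated branch: it keeps the matrix passed to tf.linalg.inv well conditioned.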