#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon May 11 11:03:01 2020
@author: ljia
"""
import numpy as np
from sklearn.model_selection import ShuffleSplit
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
from gklearn.utils.utils import get_graph_kernel_by_name
def sum_squares(a, b):
    """
    Return the sum of squared differences between a and b (SSE).
    Dividing by len(a) gives the MSE.
    """
    return np.sum((np.asarray(a) - np.asarray(b)) ** 2)
def euclid_d(x, y):
    """
    1D euclidean distance, i.e. abs(x - y).
    """
    return np.sqrt((x - y) ** 2)

def man_d(x, y):
    """
    1D manhattan distance.
    """
    return np.abs(x - y)
def knn_regression(D_app, D_test, y_app, y_test, n_neighbors, verbose=True, text=None):
    """
    Fit a knn regressor on a precomputed distance matrix and return the RMSE
    on the train and test sets. D_app is the square (n_app, n_app) train
    distance matrix; D_test is oriented (n_app, n_test), hence the transpose
    before prediction.
    """
    from sklearn.neighbors import KNeighborsRegressor
    knn = KNeighborsRegressor(n_neighbors=n_neighbors, metric='precomputed')
    knn.fit(D_app, y_app)
    y_pred = knn.predict(D_app)
    y_pred_test = knn.predict(D_test.T)
    perf_app = np.sqrt(sum_squares(y_pred, y_app) / len(y_app))
    perf_test = np.sqrt(sum_squares(y_pred_test, y_test) / len(y_test))
    if verbose:
        print('Learning error with {} train examples: {}'.format(text, perf_app))
        print('Test error with {} train examples: {}'.format(text, perf_test))
    return perf_app, perf_test
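# Illustrative only (not part of the original module): a minimal sketch of
# knn_regression on synthetic 1D data, using man_d to build the precomputed
# matrices. All names and values below are made up for the example.
def _example_knn_regression():
    rng = np.random.RandomState(0)
    x_app = rng.uniform(0., 10., size=20)   # synthetic train inputs
    x_test = rng.uniform(0., 10., size=5)   # synthetic test inputs
    y_app = 2. * x_app + 1.                 # noiseless linear targets
    y_test = 2. * x_test + 1.
    # Square train/train matrix and (n_app, n_test) train/test matrix.
    D_app = np.array([[man_d(a, b) for b in x_app] for a in x_app])
    D_test = np.array([[man_d(a, t) for t in x_test] for a in x_app])
    return knn_regression(D_app, D_test, y_app, y_test, n_neighbors=3, text='20')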
def knn_classification(d_app, d_test, y_app, y_test, n_neighbors, verbose=True, text=None):
    """
    Fit a knn classifier on a precomputed distance matrix and return the
    accuracy on the train and test sets. d_test is oriented (n_app, n_test),
    as built in knn_cv below.
    """
    knn = KNeighborsClassifier(n_neighbors=n_neighbors, metric='precomputed')
    knn.fit(d_app, y_app)
    y_pred = knn.predict(d_app)
    y_pred_test = knn.predict(d_test.T)
    perf_app = accuracy_score(y_app, y_pred)
    perf_test = accuracy_score(y_test, y_pred_test)
    if verbose:
        print('Learning accuracy with {} costs: {}'.format(text, perf_app))
        print('Test accuracy with {} costs: {}'.format(text, perf_test))
    return perf_app, perf_test
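# Illustrative only: knn_classification expects the same precomputed layout as
# knn_regression above; e.g. (x_app, x_test and the labels are hypothetical):
#
#     d_app = np.array([[euclid_d(a, b) for b in x_app] for a in x_app])
#     d_test = np.array([[euclid_d(a, t) for t in x_test] for a in x_app])
#     knn_classification(d_app, d_test, y_app, y_test, n_neighbors=1)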
def knn_cv(dataset, kernel_options, trainset=None, n_neighbors=1, n_splits=50, test_size=0.9, verbose=True):
    """
    Perform a knn classification cross-validation on the given dataset, using
    the distances induced by a graph kernel as the precomputed metric.
    """
    y_all = dataset.targets
    # Compute pairwise kernel distances between all graphs.
    dis_mat = __compute_kernel_distances(dataset, kernel_options, trainset=trainset)
    rs = ShuffleSplit(n_splits=n_splits, test_size=test_size, random_state=0)
    # @todo: consider a stratified (per-target) split; careful when parallelizing.
    accuracies = []
    for train_index, test_index in rs.split(y_all):
        y_app = [y_all[i] for i in train_index]
        y_test = [y_all[i] for i in test_index]
        # Extract the train/train and train/test blocks of the distance matrix.
        d_app = dis_mat[np.ix_(train_index, train_index)]
        d_test = dis_mat[np.ix_(train_index, test_index)]
        accuracies.append(knn_classification(d_app, d_test, y_app, y_test, n_neighbors, verbose=verbose, text=''))
    # accuracies holds (train_accuracy, test_accuracy) pairs, one per split.
    results = {}
    results['ave_perf_train'] = np.mean([i[0] for i in accuracies], axis=0)
    results['std_perf_train'] = np.std([i[0] for i in accuracies], axis=0, ddof=1)
    results['ave_perf_test'] = np.mean([i[1] for i in accuracies], axis=0)
    results['std_perf_test'] = np.std([i[1] for i in accuracies], axis=0, ddof=1)
    return results
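# Illustrative only: a hedged sketch of calling knn_cv. The dataset loader and
# kernel option values below are assumptions about the gklearn API, not part
# of this module; adjust them to the installed version.
#
#     from gklearn.utils import Dataset  # assumed import path
#     dataset = Dataset()
#     dataset.load_predefined_dataset('MUTAG')  # assumed loader
#     kernel_options = {'name': 'ShortestPath', 'parallel': None, 'verbose': False}
#     results = knn_cv(dataset, kernel_options, n_neighbors=1, n_splits=10, test_size=0.3)
#     print(results['ave_perf_test'], results['std_perf_test'])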
def __compute_kernel_distances(dataset, kernel_options, trainset=None):
    graph_kernel = get_graph_kernel_by_name(kernel_options['name'],
                                            node_labels=dataset.node_labels,
                                            edge_labels=dataset.edge_labels,
                                            node_attrs=dataset.node_attrs,
                                            edge_attrs=dataset.edge_attrs,
                                            ds_infos=dataset.get_dataset_infos(keys=['directed']),
                                            kernel_options=kernel_options)
    gram_matrix, run_time = graph_kernel.compute(dataset.graphs, **kernel_options)
    dis_mat, _, _, _ = graph_kernel.compute_distance_matrix()
    if trainset is not None:
        # @todo: distances against a separate train set are not implemented;
        # the unnormalized Gram matrix is retrieved but not used yet.
        gram_matrix_unnorm = graph_kernel.gram_matrix_unnorm
    return dis_mat
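# Hypothetical smoke test for the illustrative example above; safe to remove.
if __name__ == '__main__':
    _example_knn_regression()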