#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Apr 10 18:33:13 2020

@author: ljia

@references:

    [1] Liva Ralaivola, Sanjay J Swamidass, Hiroto Saigo, and Pierre
    Baldi. Graph kernels for chemical informatics. Neural Networks,
    18(8):1093–1110, 2005.
"""
import sys
from collections import Counter
from functools import partial
from itertools import combinations_with_replacement
from multiprocessing import Pool

import networkx as nx
import numpy as np
from tqdm import tqdm

from gklearn.kernels import GraphKernel
from gklearn.utils import SpecialLabel, Trie
from gklearn.utils.parallel import parallel_gm, parallel_me


class PathUpToH(GraphKernel):  # @todo: add function for k_func == None

    def __init__(self, **kwargs):
        GraphKernel.__init__(self)
        self.__node_labels = kwargs.get('node_labels', [])  # node attributes used as labels.
        self.__edge_labels = kwargs.get('edge_labels', [])  # edge attributes used as labels.
        self.__depth = int(kwargs.get('depth', 10))  # maximum path length h.
        self.__k_func = kwargs.get('k_func', 'MinMax')  # 'MinMax', 'tanimoto' or None.
        self.__compute_method = kwargs.get('compute_method', 'trie')  # 'trie' or naive.
        self.__ds_infos = kwargs.get('ds_infos', {})  # dataset attributes.

    def _compute_gm_series(self):
        self.__add_dummy_labels(self._graphs)

        itr_kernel = combinations_with_replacement(range(0, len(self._graphs)), 2)
        if self._verbose >= 2:
            iterator_ps = tqdm(range(0, len(self._graphs)), desc='getting paths', file=sys.stdout)
            iterator_kernel = tqdm(itr_kernel, desc='calculating kernels', file=sys.stdout)
        else:
            iterator_ps = range(0, len(self._graphs))
            iterator_kernel = itr_kernel

        gram_matrix = np.zeros((len(self._graphs), len(self._graphs)))

        if self.__compute_method == 'trie':
            all_paths = [self.__find_all_path_as_trie(self._graphs[i]) for i in iterator_ps]
            for i, j in iterator_kernel:
                kernel = self.__kernel_do_trie(all_paths[i], all_paths[j])
                gram_matrix[i][j] = kernel
                gram_matrix[j][i] = kernel
        else:
            all_paths = [self.__find_all_paths_until_length(self._graphs[i]) for i in iterator_ps]
            for i, j in iterator_kernel:
                kernel = self.__kernel_do_naive(all_paths[i], all_paths[j])
                gram_matrix[i][j] = kernel
                gram_matrix[j][i] = kernel

        return gram_matrix

    def _compute_gm_imap_unordered(self):
        self.__add_dummy_labels(self._graphs)

        # Get all paths of all graphs before computing kernels to save time,
        # but this may cost a lot of memory for large datasets.
        pool = Pool(self._n_jobs)
        itr = zip(self._graphs, range(0, len(self._graphs)))
        if len(self._graphs) < 100 * self._n_jobs:
            chunksize = int(len(self._graphs) / self._n_jobs) + 1
        else:
            chunksize = 100
        all_paths = [[] for _ in range(len(self._graphs))]
        if self.__compute_method == 'trie' and self.__k_func is not None:
            get_ps_fun = self._wrapper_find_all_path_as_trie
        elif self.__compute_method != 'trie' and self.__k_func is not None:
            get_ps_fun = partial(self._wrapper_find_all_paths_until_length, True)
        else:
            get_ps_fun = partial(self._wrapper_find_all_paths_until_length, False)
        if self._verbose >= 2:
            iterator = tqdm(pool.imap_unordered(get_ps_fun, itr, chunksize),
                            desc='getting paths', file=sys.stdout)
        else:
            iterator = pool.imap_unordered(get_ps_fun, itr, chunksize)
        for i, ps in iterator:
            all_paths[i] = ps
        pool.close()
        pool.join()

        # Compute the Gram matrix. The init_worker/global pattern hands the
        # precomputed path lists to each worker process once, instead of
        # pickling them into every single task.
        gram_matrix = np.zeros((len(self._graphs), len(self._graphs)))

        if self.__compute_method == 'trie' and self.__k_func is not None:
            def init_worker(trie_toshare):
                global G_trie
                G_trie = trie_toshare
            do_fun = self._wrapper_kernel_do_trie
        elif self.__compute_method != 'trie' and self.__k_func is not None:
            def init_worker(plist_toshare):
                global G_plist
                G_plist = plist_toshare
            do_fun = self._wrapper_kernel_do_naive
        else:
            def init_worker(plist_toshare):
                global G_plist
                G_plist = plist_toshare
            do_fun = self.__wrapper_kernel_do_kernelless  # @todo: what is this? (not defined in this module.)
        parallel_gm(do_fun, gram_matrix, self._graphs, init_worker=init_worker,
                    glbv=(all_paths,), n_jobs=self._n_jobs, verbose=self._verbose)

        return gram_matrix

    def _compute_kernel_list_series(self, g1, g_list):
        self.__add_dummy_labels(g_list + [g1])

        if self._verbose >= 2:
            iterator_ps = tqdm(g_list, desc='getting paths', file=sys.stdout)
            iterator_kernel = tqdm(range(len(g_list)), desc='calculating kernels', file=sys.stdout)
        else:
            iterator_ps = g_list
            iterator_kernel = range(len(g_list))

        kernel_list = [None] * len(g_list)

        if self.__compute_method == 'trie':
            paths_g1 = self.__find_all_path_as_trie(g1)
            paths_g_list = [self.__find_all_path_as_trie(g) for g in iterator_ps]
            for i in iterator_kernel:
                kernel = self.__kernel_do_trie(paths_g1, paths_g_list[i])
                kernel_list[i] = kernel
        else:
            paths_g1 = self.__find_all_paths_until_length(g1)
            paths_g_list = [self.__find_all_paths_until_length(g) for g in iterator_ps]
            for i in iterator_kernel:
                kernel = self.__kernel_do_naive(paths_g1, paths_g_list[i])
                kernel_list[i] = kernel

        return kernel_list

    def _compute_kernel_list_imap_unordered(self, g1, g_list):
        self.__add_dummy_labels(g_list + [g1])

        # Get all paths of all graphs before computing kernels to save time,
        # but this may cost a lot of memory for large datasets.
        pool = Pool(self._n_jobs)
        itr = zip(g_list, range(0, len(g_list)))
        if len(g_list) < 100 * self._n_jobs:
            chunksize = int(len(g_list) / self._n_jobs) + 1
        else:
            chunksize = 100
        paths_g_list = [[] for _ in range(len(g_list))]
        if self.__compute_method == 'trie' and self.__k_func is not None:
            paths_g1 = self.__find_all_path_as_trie(g1)
            get_ps_fun = self._wrapper_find_all_path_as_trie
        elif self.__compute_method != 'trie' and self.__k_func is not None:
            paths_g1 = self.__find_all_paths_until_length(g1)
            get_ps_fun = partial(self._wrapper_find_all_paths_until_length, True)
        else:
            paths_g1 = self.__find_all_paths_until_length(g1)
            get_ps_fun = partial(self._wrapper_find_all_paths_until_length, False)
        if self._verbose >= 2:
            iterator = tqdm(pool.imap_unordered(get_ps_fun, itr, chunksize),
                            desc='getting paths', file=sys.stdout)
        else:
            iterator = pool.imap_unordered(get_ps_fun, itr, chunksize)
        for i, ps in iterator:
            paths_g_list[i] = ps
        pool.close()
        pool.join()

        # Compute the kernel list.
        kernel_list = [None] * len(g_list)

        def init_worker(p1_toshare, plist_toshare):
            global G_p1, G_plist
            G_p1 = p1_toshare
            G_plist = plist_toshare
        do_fun = self._wrapper_kernel_list_do

        def func_assign(result, var_to_assign):
            var_to_assign[result[0]] = result[1]
        itr = range(len(g_list))
        len_itr = len(g_list)
        parallel_me(do_fun, func_assign, kernel_list, itr, len_itr=len_itr,
                    init_worker=init_worker, glbv=(paths_g1, paths_g_list),
                    method='imap_unordered', n_jobs=self._n_jobs,
                    itr_desc='calculating kernels', verbose=self._verbose)

        return kernel_list

    def _wrapper_kernel_list_do(self, itr):
        if self.__compute_method == 'trie' and self.__k_func is not None:
            return itr, self.__kernel_do_trie(G_p1, G_plist[itr])
        elif self.__compute_method != 'trie' and self.__k_func is not None:
            return itr, self.__kernel_do_naive(G_p1, G_plist[itr])
        else:
            return itr, self.__kernel_do_kernelless(G_p1, G_plist[itr])  # @todo: not defined in this module.

    def _compute_single_kernel_series(self, g1, g2):
        self.__add_dummy_labels([g1] + [g2])
        if self.__compute_method == 'trie':
            paths_g1 = self.__find_all_path_as_trie(g1)
            paths_g2 = self.__find_all_path_as_trie(g2)
            kernel = self.__kernel_do_trie(paths_g1, paths_g2)
        else:
            paths_g1 = self.__find_all_paths_until_length(g1)
            paths_g2 = self.__find_all_paths_until_length(g2)
            kernel = self.__kernel_do_naive(paths_g1, paths_g2)
        return kernel

    def __kernel_do_trie(self, trie1, trie2):
        """Compute the path kernel up to depth h between 2 graphs using tries.

        Parameters
        ----------
        trie1, trie2 : Trie
            Tries that store all paths of the 2 graphs. The sub-kernel applied,
            a notion of fingerprint similarity, is determined by
            self.__k_func.

        Return
        ------
        kernel : float
            Path kernel up to h between the 2 graphs.
        """
        if self.__k_func == 'tanimoto':
            # Traverse all paths in graph 1 and search them in graph 2.
            # Depth-first search is applied.
            def traverseTrie1t(root, trie2, setlist, pcurrent=[]):
                for key, node in root['children'].items():
                    pcurrent.append(key)
                    if node['isEndOfWord']:
                        setlist[1] += 1
                        count2 = trie2.searchWord(pcurrent)
                        if count2 != 0:
                            setlist[0] += 1
                    if node['children'] != {}:
                        traverseTrie1t(node, trie2, setlist, pcurrent)
                    else:
                        del pcurrent[-1]
                if pcurrent != []:
                    del pcurrent[-1]

            # Traverse all paths in graph 2 and find out those that are not in
            # graph 1. Depth-first search is applied.
            def traverseTrie2t(root, trie1, setlist, pcurrent=[]):
                for key, node in root['children'].items():
                    pcurrent.append(key)
                    if node['isEndOfWord']:
                        count1 = trie1.searchWord(pcurrent)
                        if count1 == 0:
                            setlist[1] += 1
                    if node['children'] != {}:
                        traverseTrie2t(node, trie1, setlist, pcurrent)
                    else:
                        del pcurrent[-1]
                if pcurrent != []:
                    del pcurrent[-1]

            setlist = [0, 0]  # intersection and union of the path sets of g1, g2.
            traverseTrie1t(trie1.root, trie2, setlist)
            traverseTrie2t(trie2.root, trie1, setlist)
            kernel = setlist[0] / setlist[1]
        elif self.__k_func == 'MinMax':  # MinMax kernel
            # Traverse all paths in graph 1 and search them in graph 2.
            # Depth-first search is applied.
            def traverseTrie1m(root, trie2, sumlist, pcurrent=[]):
                for key, node in root['children'].items():
                    pcurrent.append(key)
                    if node['isEndOfWord']:
                        count1 = node['count']
                        count2 = trie2.searchWord(pcurrent)
                        sumlist[0] += min(count1, count2)
                        sumlist[1] += max(count1, count2)
                    if node['children'] != {}:
                        traverseTrie1m(node, trie2, sumlist, pcurrent)
                    else:
                        del pcurrent[-1]
                if pcurrent != []:
                    del pcurrent[-1]

            # Traverse all paths in graph 2 and find out those that are not in
            # graph 1. Depth-first search is applied.
            def traverseTrie2m(root, trie1, sumlist, pcurrent=[]):
                for key, node in root['children'].items():
                    pcurrent.append(key)
                    if node['isEndOfWord']:
                        count1 = trie1.searchWord(pcurrent)
                        if count1 == 0:
                            sumlist[1] += node['count']
                    if node['children'] != {}:
                        traverseTrie2m(node, trie1, sumlist, pcurrent)
                    else:
                        del pcurrent[-1]
                if pcurrent != []:
                    del pcurrent[-1]

            sumlist = [0, 0]  # sum of mins and sum of maxs.
            traverseTrie1m(trie1.root, trie2, sumlist)
            traverseTrie2m(trie2.root, trie1, sumlist)
            kernel = sumlist[0] / sumlist[1]
        else:
            raise Exception('The given "k_func" cannot be recognized. Possible choices include: "tanimoto" and "MinMax".')

        return kernel
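
    # Worked illustration with assumed toy counts (not taken from the
    # reference): if a label sequence occurs 3 times in g1 and once in g2,
    # the MinMax traversal adds min(3, 1) = 1 to sumlist[0] and
    # max(3, 1) = 3 to sumlist[1]; a sequence unique to one graph adds its
    # full count to sumlist[1] only. With sumlist == [4, 10] the kernel
    # value would be 4 / 10 = 0.4.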

    def _wrapper_kernel_do_trie(self, itr):
        i = itr[0]
        j = itr[1]
        return i, j, self.__kernel_do_trie(G_trie[i], G_trie[j])

    def __kernel_do_naive(self, paths1, paths2):
        """Compute the path kernel up to depth h between 2 graphs naively.

        Parameters
        ----------
        paths1, paths2 : list
            Lists of paths of the 2 graphs, where for unlabeled graphs, each
            path is represented by a list of nodes; while for labeled graphs,
            each path is represented by a sequence consisting of the labels of
            the nodes and/or edges on that path. The sub-kernel applied, a
            notion of fingerprint similarity, is determined by self.__k_func.

        Return
        ------
        kernel : float
            Path kernel up to h between the 2 graphs.
        """
        all_paths = list(set(paths1 + paths2))

        if self.__k_func == 'tanimoto':
            length_union = len(all_paths)
            kernel = (len(set(paths1)) + len(set(paths2)) -
                      length_union) / length_union
        elif self.__k_func == 'MinMax':  # MinMax kernel
            # A Counter returns 0 for missing keys, so the count vectors are
            # aligned on the union of paths.
            path_count1 = Counter(paths1)
            path_count2 = Counter(paths2)
            vector1 = [path_count1[key] for key in all_paths]
            vector2 = [path_count2[key] for key in all_paths]
            kernel = np.sum(np.minimum(vector1, vector2)) / \
                     np.sum(np.maximum(vector1, vector2))
        elif self.__k_func is None:  # no sub-kernel used; compare paths directly.
            path_count1 = Counter(paths1)
            path_count2 = Counter(paths2)
            vector1 = [path_count1[key] for key in all_paths]
            vector2 = [path_count2[key] for key in all_paths]
            kernel = np.dot(vector1, vector2)
        else:
            raise Exception('The given "k_func" cannot be recognized. Possible choices include: "tanimoto", "MinMax" and None.')

        return kernel
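
    # Worked illustration with assumed toy numbers: with |set(paths1)| = 5,
    # |set(paths2)| = 4 and a union of size 7, the Tanimoto kernel above is
    # (5 + 4 - 7) / 7 = 2 / 7, i.e. |intersection| / |union| of the path sets.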

    def _wrapper_kernel_do_naive(self, itr):
        i = itr[0]
        j = itr[1]
        return i, j, self.__kernel_do_naive(G_plist[i], G_plist[j])

    def __find_all_path_as_trie(self, G):
        # Traverse all paths up to length h in the graph and construct a trie
        # with them. Depth-first search is applied. Notice that the reverse of
        # each path is also stored in the trie.
        def traverseGraph(root, ptrie, G, pcurrent=[]):
            if len(pcurrent) < self.__depth + 1:
                for neighbor in G[root]:
                    if neighbor not in pcurrent:
                        pcurrent.append(neighbor)
                        plstr = self.__paths2labelseqs([pcurrent], G)
                        ptrie.insertWord(plstr[0])
                        traverseGraph(neighbor, ptrie, G, pcurrent)
                        del pcurrent[-1]

        ptrie = Trie()
        path_l = [[n] for n in G.nodes]  # paths of length 0, i.e. single nodes.
        path_l_str = self.__paths2labelseqs(path_l, G)
        for p in path_l_str:
            ptrie.insertWord(p)
        for n in G.nodes:
            traverseGraph(n, ptrie, G, pcurrent=[n])
        return ptrie

    def _wrapper_find_all_path_as_trie(self, itr_item):
        g = itr_item[0]
        i = itr_item[1]
        return i, self.__find_all_path_as_trie(g)

    # @todo: (can be removed maybe) this method finds paths repetitively; it could be faster.
    def __find_all_paths_until_length(self, G, tolabelseqs=True):
| """Find all paths no longer than a certain maximum length in a graph. A | |||||
| recursive depth first search is applied. | |||||
| Parameters | |||||
| ---------- | |||||
| G : NetworkX graphs | |||||
| The graph in which paths are searched. | |||||
| length : integer | |||||
| The maximum length of paths. | |||||
| ds_attrs: dict | |||||
| Dataset attributes. | |||||
| node_label : string | |||||
| Node attribute used as label. The default node label is atom. | |||||
| edge_label : string | |||||
| Edge attribute used as label. The default edge label is bond_type. | |||||
| Return | |||||
| ------ | |||||
| path : list | |||||
| List of paths retrieved, where for unlabeled graphs, each path is | |||||
| represented by a list of nodes; while for labeled graphs, each path is | |||||
| represented by a list of strings consists of labels of nodes and/or | |||||
| edges on that path. | |||||
| """ | |||||
        path_l = [[n] for n in G.nodes]  # paths of length 0, i.e. single nodes.
        all_paths = [p.copy() for p in path_l]
        for l in range(1, self.__depth + 1):
            path_lplus1 = []
            for path in path_l:
                for neighbor in G[path[-1]]:
                    if neighbor not in path:  # only simple paths are extended.
                        tmp = path + [neighbor]
                        path_lplus1.append(tmp)
            all_paths += path_lplus1
            path_l = [p.copy() for p in path_lplus1]

        return (self.__paths2labelseqs(all_paths, G) if tolabelseqs else all_paths)
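
    # Example with an assumed toy input: on a triangle graph with nodes
    # {0, 1, 2} and depth >= 2, the search above yields the 3 single-node
    # paths, 6 paths of length 1 and 6 of length 2; each path and its
    # reverse are both kept, and __kernel_do_naive later counts the
    # resulting label sequences.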

    def _wrapper_find_all_paths_until_length(self, tolabelseqs, itr_item):
        g = itr_item[0]
        i = itr_item[1]
        return i, self.__find_all_paths_until_length(g, tolabelseqs=tolabelseqs)

    def __paths2labelseqs(self, plist, G):
        # Transform each path into a tuple of node and/or edge labels,
        # depending on which label sets are available.
        if len(self.__node_labels) > 0:
            if len(self.__edge_labels) > 0:
                # Interleave node and edge labels along each path.
                path_strs = []
                for path in plist:
                    pths_tmp = []
                    for idx, node in enumerate(path[:-1]):
                        pths_tmp.append(tuple(G.nodes[node][nl] for nl in self.__node_labels))
                        pths_tmp.append(tuple(G[node][path[idx + 1]][el] for el in self.__edge_labels))
                    pths_tmp.append(tuple(G.nodes[path[-1]][nl] for nl in self.__node_labels))
                    path_strs.append(tuple(pths_tmp))
            else:
                # Node labels only.
                path_strs = []
                for path in plist:
                    pths_tmp = []
                    for node in path:
                        pths_tmp.append(tuple(G.nodes[node][nl] for nl in self.__node_labels))
                    path_strs.append(tuple(pths_tmp))
            return path_strs
        else:
            if len(self.__edge_labels) > 0:
                # Edge labels only.
                path_strs = []
                for path in plist:
                    if len(path) == 1:
                        path_strs.append(tuple())
                    else:
                        pths_tmp = []
                        for idx, node in enumerate(path[:-1]):
                            pths_tmp.append(tuple(G[node][path[idx + 1]][el] for el in self.__edge_labels))
                        path_strs.append(tuple(pths_tmp))
                return path_strs
            else:
                # Unlabeled: every node maps to the same dummy label '0'.
                return [tuple(['0' for node in path]) for path in plist]

    def __add_dummy_labels(self, Gn):
        # If no labels are given, assign the same dummy label to every node
        # and/or edge, so that labeled and unlabeled graphs are handled
        # uniformly when a sub-kernel is used.
        if self.__k_func is not None:
            if len(self.__node_labels) == 0 or (len(self.__node_labels) == 1 and self.__node_labels[0] == SpecialLabel.DUMMY):
                for i in range(len(Gn)):
                    nx.set_node_attributes(Gn[i], '0', SpecialLabel.DUMMY)
                self.__node_labels = [SpecialLabel.DUMMY]
            if len(self.__edge_labels) == 0 or (len(self.__edge_labels) == 1 and self.__edge_labels[0] == SpecialLabel.DUMMY):
                for i in range(len(Gn)):
                    nx.set_edge_attributes(Gn[i], '0', SpecialLabel.DUMMY)
                self.__edge_labels = [SpecialLabel.DUMMY]
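

if __name__ == '__main__':
    # Minimal usage sketch (an assumption, not part of the original module):
    # compare two small molecule-like graphs with the trie-based MinMax path
    # kernel. Note that _compute_single_kernel_series is an internal method;
    # the public entry point of the GraphKernel base class may differ.
    g1 = nx.Graph()
    g1.add_nodes_from([(0, {'atom': 'C'}), (1, {'atom': 'O'}), (2, {'atom': 'C'})])
    g1.add_edges_from([(0, 1, {'bond_type': '1'}), (1, 2, {'bond_type': '1'})])
    g2 = nx.Graph()
    g2.add_nodes_from([(0, {'atom': 'C'}), (1, {'atom': 'O'})])
    g2.add_edges_from([(0, 1, {'bond_type': '1'})])
    kernel = PathUpToH(node_labels=['atom'], edge_labels=['bond_type'],
                       depth=2, k_func='MinMax', compute_method='trie')
    print(kernel._compute_single_kernel_series(g1, g2))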