You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

common_walk.py 8.9 kB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Tue Aug 18 11:21:31 2020
  5. @author: ljia
  6. @references:
  7. [1] Thomas Gärtner, Peter Flach, and Stefan Wrobel. On graph kernels:
  8. Hardness results and efficient alternatives. Learning Theory and Kernel
  9. Machines, pages 129–143, 2003.
  10. """
  11. import sys
  12. from tqdm import tqdm
  13. import numpy as np
  14. import networkx as nx
  15. from gklearn.utils import SpecialLabel
  16. from gklearn.utils.parallel import parallel_gm, parallel_me
  17. from gklearn.utils.utils import direct_product_graph
  18. from gklearn.kernels import GraphKernel
  19. class CommonWalk(GraphKernel):
  20. def __init__(self, **kwargs):
  21. GraphKernel.__init__(self)
  22. self.__node_labels = kwargs.get('node_labels', [])
  23. self.__edge_labels = kwargs.get('edge_labels', [])
  24. self.__weight = kwargs.get('weight', 1)
  25. self.__compute_method = kwargs.get('compute_method', None)
  26. self.__ds_infos = kwargs.get('ds_infos', {})
  27. self.__compute_method = self.__compute_method.lower()
  28. def _compute_gm_series(self):
  29. self.__check_graphs(self._graphs)
  30. self.__add_dummy_labels(self._graphs)
  31. if not self.__ds_infos['directed']: # convert
  32. self._graphs = [G.to_directed() for G in self._graphs]
  33. # compute Gram matrix.
  34. gram_matrix = np.zeros((len(self._graphs), len(self._graphs)))
  35. from itertools import combinations_with_replacement
  36. itr = combinations_with_replacement(range(0, len(self._graphs)), 2)
  37. if self._verbose >= 2:
  38. iterator = tqdm(itr, desc='calculating kernels', file=sys.stdout)
  39. else:
  40. iterator = itr
  41. # direct product graph method - exponential
  42. if self.__compute_method == 'exp':
  43. for i, j in iterator:
  44. kernel = self.__kernel_do_exp(self._graphs[i], self._graphs[j], self.__weight)
  45. gram_matrix[i][j] = kernel
  46. gram_matrix[j][i] = kernel
  47. # direct product graph method - geometric
  48. elif self.__compute_method == 'geo':
  49. for i, j in iterator:
  50. kernel = self.__kernel_do_geo(self._graphs[i], self._graphs[j], self.__weight)
  51. gram_matrix[i][j] = kernel
  52. gram_matrix[j][i] = kernel
  53. return gram_matrix
  54. def _compute_gm_imap_unordered(self):
  55. self.__check_graphs(self._graphs)
  56. self.__add_dummy_labels(self._graphs)
  57. if not self.__ds_infos['directed']: # convert
  58. self._graphs = [G.to_directed() for G in self._graphs]
  59. # compute Gram matrix.
  60. gram_matrix = np.zeros((len(self._graphs), len(self._graphs)))
  61. # def init_worker(gn_toshare):
  62. # global G_gn
  63. # G_gn = gn_toshare
  64. # direct product graph method - exponential
  65. if self.__compute_method == 'exp':
  66. do_fun = self._wrapper_kernel_do_exp
  67. # direct product graph method - geometric
  68. elif self.__compute_method == 'geo':
  69. do_fun = self._wrapper_kernel_do_geo
  70. parallel_gm(do_fun, gram_matrix, self._graphs, init_worker=self._init_worker_gm,
  71. glbv=(self._graphs,), n_jobs=self._n_jobs, verbose=self._verbose)
  72. return gram_matrix
  73. def _init_worker_gm(gn_toshare):
  74. global G_gn
  75. G_gn = gn_toshare
  76. def _compute_kernel_list_series(self, g1, g_list):
  77. self.__check_graphs(g_list + [g1])
  78. self.__add_dummy_labels(g_list + [g1])
  79. if not self.__ds_infos['directed']: # convert
  80. g1 = g1.to_directed()
  81. g_list = [G.to_directed() for G in g_list]
  82. # compute kernel list.
  83. kernel_list = [None] * len(g_list)
  84. if self._verbose >= 2:
  85. iterator = tqdm(range(len(g_list)), desc='calculating kernels', file=sys.stdout)
  86. else:
  87. iterator = range(len(g_list))
  88. # direct product graph method - exponential
  89. if self.__compute_method == 'exp':
  90. for i in iterator:
  91. kernel = self.__kernel_do_exp(g1, g_list[i], self.__weight)
  92. kernel_list[i] = kernel
  93. # direct product graph method - geometric
  94. elif self.__compute_method == 'geo':
  95. for i in iterator:
  96. kernel = self.__kernel_do_geo(g1, g_list[i], self.__weight)
  97. kernel_list[i] = kernel
  98. return kernel_list
  99. def _compute_kernel_list_imap_unordered(self, g1, g_list):
  100. self.__check_graphs(g_list + [g1])
  101. self.__add_dummy_labels(g_list + [g1])
  102. if not self.__ds_infos['directed']: # convert
  103. g1 = g1.to_directed()
  104. g_list = [G.to_directed() for G in g_list]
  105. # compute kernel list.
  106. kernel_list = [None] * len(g_list)
  107. # def init_worker(g1_toshare, g_list_toshare):
  108. # global G_g1, G_g_list
  109. # G_g1 = g1_toshare
  110. # G_g_list = g_list_toshare
  111. # direct product graph method - exponential
  112. if self.__compute_method == 'exp':
  113. do_fun = self._wrapper_kernel_list_do_exp
  114. # direct product graph method - geometric
  115. elif self.__compute_method == 'geo':
  116. do_fun = self._wrapper_kernel_list_do_geo
  117. def func_assign(result, var_to_assign):
  118. var_to_assign[result[0]] = result[1]
  119. itr = range(len(g_list))
  120. len_itr = len(g_list)
  121. parallel_me(do_fun, func_assign, kernel_list, itr, len_itr=len_itr,
  122. init_worker=self._init_worker_list, glbv=(g1, g_list), method='imap_unordered',
  123. n_jobs=self._n_jobs, itr_desc='calculating kernels', verbose=self._verbose)
  124. return kernel_list
  125. def _init_worker_list(g1_toshare, g_list_toshare):
  126. global G_g1, G_g_list
  127. G_g1 = g1_toshare
  128. G_g_list = g_list_toshare
  129. def _wrapper_kernel_list_do_exp(self, itr):
  130. return itr, self.__kernel_do_exp(G_g1, G_g_list[itr], self.__weight)
  131. def _wrapper_kernel_list_do_geo(self, itr):
  132. return itr, self.__kernel_do_geo(G_g1, G_g_list[itr], self.__weight)
  133. def _compute_single_kernel_series(self, g1, g2):
  134. self.__check_graphs([g1] + [g2])
  135. self.__add_dummy_labels([g1] + [g2])
  136. if not self.__ds_infos['directed']: # convert
  137. g1 = g1.to_directed()
  138. g2 = g2.to_directed()
  139. # direct product graph method - exponential
  140. if self.__compute_method == 'exp':
  141. kernel = self.__kernel_do_exp(g1, g2, self.__weight)
  142. # direct product graph method - geometric
  143. elif self.__compute_method == 'geo':
  144. kernel = self.__kernel_do_geo(g1, g2, self.__weight)
  145. return kernel
  146. def __kernel_do_exp(self, g1, g2, beta):
  147. """Calculate common walk graph kernel between 2 graphs using exponential
  148. series.
  149. Parameters
  150. ----------
  151. g1, g2 : NetworkX graphs
  152. Graphs between which the kernels are calculated.
  153. beta : integer
  154. Weight.
  155. Return
  156. ------
  157. kernel : float
  158. The common walk Kernel between 2 graphs.
  159. """
  160. # get tensor product / direct product
  161. gp = direct_product_graph(g1, g2, self.__node_labels, self.__edge_labels)
  162. # return 0 if the direct product graph have no more than 1 node.
  163. if nx.number_of_nodes(gp) < 2:
  164. return 0
  165. A = nx.adjacency_matrix(gp).todense()
  166. ew, ev = np.linalg.eig(A)
  167. # # remove imaginary part if possible.
  168. # # @todo: don't know if it is necessary.
  169. # for i in range(len(ew)):
  170. # if np.abs(ew[i].imag) < 1e-9:
  171. # ew[i] = ew[i].real
  172. # for i in range(ev.shape[0]):
  173. # for j in range(ev.shape[1]):
  174. # if np.abs(ev[i, j].imag) < 1e-9:
  175. # ev[i, j] = ev[i, j].real
  176. D = np.zeros((len(ew), len(ew)), dtype=complex) # @todo: use complex?
  177. for i in range(len(ew)):
  178. D[i][i] = np.exp(beta * ew[i])
  179. exp_D = ev * D * ev.T
  180. kernel = exp_D.sum()
  181. if (kernel.real == 0 and np.abs(kernel.imag) < 1e-9) or np.abs(kernel.imag / kernel.real) < 1e-9:
  182. kernel = kernel.real
  183. return kernel
  184. def _wrapper_kernel_do_exp(self, itr):
  185. i = itr[0]
  186. j = itr[1]
  187. return i, j, self.__kernel_do_exp(G_gn[i], G_gn[j], self.__weight)
  188. def __kernel_do_geo(self, g1, g2, gamma):
  189. """Calculate common walk graph kernel between 2 graphs using geometric
  190. series.
  191. Parameters
  192. ----------
  193. g1, g2 : NetworkX graphs
  194. Graphs between which the kernels are calculated.
  195. gamma : integer
  196. Weight.
  197. Return
  198. ------
  199. kernel : float
  200. The common walk Kernel between 2 graphs.
  201. """
  202. # get tensor product / direct product
  203. gp = direct_product_graph(g1, g2, self.__node_labels, self.__edge_labels)
  204. # return 0 if the direct product graph have no more than 1 node.
  205. if nx.number_of_nodes(gp) < 2:
  206. return 0
  207. A = nx.adjacency_matrix(gp).todense()
  208. mat = np.identity(len(A)) - gamma * A
  209. # try:
  210. return mat.I.sum()
  211. # except np.linalg.LinAlgError:
  212. # return np.nan
  213. def _wrapper_kernel_do_geo(self, itr):
  214. i = itr[0]
  215. j = itr[1]
  216. return i, j, self.__kernel_do_geo(G_gn[i], G_gn[j], self.__weight)
  217. def __check_graphs(self, Gn):
  218. for g in Gn:
  219. if nx.number_of_nodes(g) == 1:
  220. raise Exception('Graphs must contain more than 1 nodes to construct adjacency matrices.')
  221. def __add_dummy_labels(self, Gn):
  222. if len(self.__node_labels) == 0 or (len(self.__node_labels) == 1 and self.__node_labels[0] == SpecialLabel.DUMMY):
  223. for i in range(len(Gn)):
  224. nx.set_node_attributes(Gn[i], '0', SpecialLabel.DUMMY)
  225. self.__node_labels = [SpecialLabel.DUMMY]
  226. if len(self.__edge_labels) == 0 or (len(self.__edge_labels) == 1 and self.__edge_labels[0] == SpecialLabel.DUMMY):
  227. for i in range(len(Gn)):
  228. nx.set_edge_attributes(Gn[i], '0', SpecialLabel.DUMMY)
  229. self.__edge_labels = [SpecialLabel.DUMMY]

A Python package for graph kernels, graph edit distances, and the graph pre-image problem.