|
- import os
- import re
- import sys
- import json
- import pickle
- import logging
- import subprocess
- import numpy as np
- from collections import deque
-
- from evaluator import Evaluator
- from network import ShuffleNetV2OneShot, PARSED_FLOPS
-
# Mutable types that may appear in the search space; only layer choices are
# supported by this tuner (input choice raises NotImplementedError).
LAYER_CHOICE = "layer_choice"
INPUT_CHOICE = "input_choice"


# Module-level logger, configured by the application.
_logger = logging.getLogger(__name__)
-
-
class SPOSEvolution:
    """
    SPOS evolution tuner.

    Candidates are exchanged with the trainer through the file system:
    architectures are exported as one-hot JSON files to ``./checkpoints``,
    evaluated by running ``evaluator.py`` as a subprocess, and accuracies are
    read back from ``./acc``.

    Parameters
    ----------
    max_epochs : int
        Maximum number of epochs to run.
    num_select : int
        Number of survival candidates of each epoch.
    num_population : int
        Number of candidates at the start of each epoch. If candidates generated by
        crossover and mutation are not enough, the rest will be filled with random
        candidates.
    m_prob : float
        The probability of mutation.
    num_crossover : int
        Number of candidates generated by crossover in each epoch.
    num_mutation : int
        Number of candidates generated by mutation in each epoch.
    epoch : int
        Epoch to resume from; when non-zero, the previous epoch's population
        is reloaded from disk.
    """

    def __init__(self, max_epochs=20, num_select=10, num_population=50, m_prob=0.1,
                 num_crossover=25, num_mutation=25, epoch=0):
        assert num_population >= num_select
        self.max_epochs = max_epochs
        self.num_select = num_select
        self.num_population = num_population
        self.m_prob = m_prob
        self.num_crossover = num_crossover
        self.num_mutation = num_mutation
        self.epoch = epoch
        self.search_space = None
        # Every method reads self._search_space (set in update_search_space);
        # initialize it here so attribute access before that call fails
        # predictably with None instead of AttributeError.
        self._search_space = None
        # Dedicated RandomState so the evolution is reproducible independently
        # of the global numpy seed.
        self.random_state = np.random.RandomState(0)
        # self.evl = Evaluator()

        # async status
        self._to_evaluate_queue = deque()
        self._sending_parameter_queue = deque()
        self._pending_result_ids = set()
        self._reward_dict = dict()
        self._id2candidate = dict()
        self._st_callback = None
        self.cand_path = "./checkpoints"
        self.acc_path = "./acc"
        # At epoch 0 no population has been generated yet.
        self.candidates = [] if epoch == 0 else self.load_candidates()

    def load_candidates(self):
        """
        Reload the candidates that export_results() wrote to ``self.cand_path``
        in the previous epoch.

        Converts the on-disk one-hot format
        ``{"LayerChoice1": [false, false, false, true], ...}`` back to the
        internal format ``{"LayerChoice1": {"_idx": 3, "_value": "3"}, ...}``.

        NOTE(review): export_results stores the original choice value in
        "_value", while this reconstruction uses the stringified *index* —
        the two only agree when choice values equal their indices; confirm
        against the search space definition.
        """
        print("## evolution -- load ## begin to load candidates in evolution...\n")
        file_dir, _, files = next(os.walk(self.cand_path))
        # Checkpoints are named "<epoch>_<index>.json"; keep the previous epoch's.
        files = [i for i in files if "%03d_" % (self.epoch - 1) in i]

        def get_true_index(l):
            # Index of the first True entry of the one-hot list.
            return [i for i in range(len(l)) if l[i]][0]

        candidates = []
        for file in files:
            with open(os.path.join(file_dir, file), "r") as f:
                candidate = json.load(f)

            # Convert each one-hot list into {"_value": ..., "_idx": ...}.
            cand = {}
            for key, value in candidate.items():
                v = get_true_index(value)
                value = {"_value": str(v), "_idx": int(v)}
                cand.update({key: value})

            candidates.append(cand)
        print("## evolution -- load ## candidates loaded \n")
        return candidates

    def load_id2candidate(self):
        """Reload the previous epoch's file-name -> candidate-hashcode mapping."""
        with open("./id2cand/%03d_id2candidate.json" % (self.epoch - 1), "r") as f:
            self.id2candidate = json.load(f)

    def update_search_space(self, search_space):
        """
        Handle the initialization/update event of search space.
        """
        print("## evolution -- update ## updating search space")
        self._search_space = search_space
        self._next_round()
        print("## evolution -- update ## search space updated")

    def _next_round(self):
        """Run one epoch: generate a population, export it, and evaluate it."""
        _logger.info("Epoch %d, generating...", self.epoch)
        if self.epoch == 0:
            # First epoch: the population is purely random.
            self._get_random_population()
            self.export_results(self.candidates)

            self.evaluate_cands()  # evaluate all exported models
        else:
            # Later epochs: collect last epoch's rewards, then evolve.
            self.load_id2candidate()
            self.receive_trial_result()
            best_candidates = self._select_top_candidates()
            if self.epoch >= self.max_epochs:
                return
            self.candidates = self._get_mutation(best_candidates) + self._get_crossover(best_candidates)
            self._get_random_population()
            self.export_results(self.candidates)
            self.evaluate_cands()  # evaluate all exported models
        self.epoch += 1

    def _random_candidate(self):
        """Sample one architecture uniformly at random from the search space."""
        chosen_arch = dict()
        for key, val in self._search_space.items():
            if val["_type"] == LAYER_CHOICE:
                choices = val["_value"]
                index = self.random_state.randint(len(choices))
                chosen_arch[key] = {"_value": choices[index], "_idx": index}
            elif val["_type"] == INPUT_CHOICE:
                raise NotImplementedError("Input choice is not implemented yet.")
        return chosen_arch

    def _add_to_evaluate_queue(self, cand):
        """Register `cand` with a zero reward and queue it for evaluation."""
        _logger.info("Generate candidate %s, adding to eval queue.", self._get_architecture_repr(cand))
        self._reward_dict[self._hashcode(cand)] = 0.
        self._to_evaluate_queue.append(cand)

    def _get_random_population(self):
        """Top up self.candidates with random legal architectures up to num_population."""
        while len(self.candidates) < self.num_population:
            cand = self._random_candidate()
            if self._is_legal(cand):
                _logger.info("Random candidate generated.")
                self._add_to_evaluate_queue(cand)
                self.candidates.append(cand)

    def _get_crossover(self, best):
        """
        Generate up to num_crossover children by uniform crossover of two
        random parents from `best`; give up after 10x attempts.
        """
        result = []
        for _ in range(10 * self.num_crossover):
            cand_p1 = best[self.random_state.randint(len(best))]
            cand_p2 = best[self.random_state.randint(len(best))]
            assert cand_p1.keys() == cand_p2.keys()
            # Each gene is taken from either parent with probability 1/2.
            cand = {k: cand_p1[k] if self.random_state.randint(2) == 0 else cand_p2[k]
                    for k in cand_p1.keys()}
            if self._is_legal(cand):
                result.append(cand)
                self._add_to_evaluate_queue(cand)
            if len(result) >= self.num_crossover:
                break
        _logger.info("Found %d architectures with crossover.", len(result))
        return result

    def _get_mutation(self, best):
        """
        Generate up to num_mutation children by re-sampling each gene of a
        random parent with probability m_prob; give up after 10x attempts.
        """
        result = []
        for _ in range(10 * self.num_mutation):
            cand = best[self.random_state.randint(len(best))].copy()
            # Use the tuner's seeded RandomState (the original used the global
            # np.random, which broke reproducibility of the evolution).
            mutation_sample = self.random_state.random_sample(len(cand))
            for s, k in zip(mutation_sample, cand):
                if s < self.m_prob:
                    choices = self._search_space[k]["_value"]
                    index = self.random_state.randint(len(choices))
                    cand[k] = {"_value": choices[index], "_idx": index}
            if self._is_legal(cand):
                result.append(cand)
                self._add_to_evaluate_queue(cand)
            if len(result) >= self.num_mutation:
                break
        _logger.info("Found %d architectures with mutation.", len(result))
        return result

    def _get_architecture_repr(self, cand):
        """Compact representation of `cand`: keep only the chosen indices."""
        return re.sub(r"\".*?\": \{\"_idx\": (\d+), \"_value\": \".*?\"\}", r"\1",
                      self._hashcode(cand))

    def _is_legal(self, cand):
        """A candidate is legal iff it has not been seen before."""
        if self._hashcode(cand) in self._reward_dict:
            return False
        return True

    # Export the models, then retrain and evaluate them.
    def evaluate_cands(self):
        """
        1. Retrain every model exported for the current epoch.
        2. Evaluate each retrained model.
        Both steps are delegated to the evaluator.py script, one subprocess
        per architecture file.
        """
        print("## evolution -- evaluate ## begin to evaluate candidates...")
        file_dir, _, files = next(os.walk(self.cand_path))  # files in checkpoint dir
        files = [i for i in files if "%03d_" % self.epoch in i]

        for file in files:
            file = os.path.join(file_dir, file)

            # self.evl.eval_model(epoch=self.epoch, architecture=file)
            python_interpreter_path = sys.executable
            subprocess.run([python_interpreter_path,
                            "evaluator.py", "--architecture", file, "--epoch", str(self.epoch)])
        print("## evolution -- evaluate ## candidates evaluated")

    def _select_top_candidates(self):
        """Return the num_select candidates with the highest recorded rewards."""
        print("## evolution -- select ## begin to select top candidates...")
        reward_query = lambda cand: self._reward_dict[self._hashcode(cand)]
        _logger.info("All candidate rewards: %s", list(map(reward_query, self.candidates)))
        result = sorted(self.candidates, key=reward_query, reverse=True)[:self.num_select]
        _logger.info("Best candidate rewards: %s", list(map(reward_query, result)))
        print("## evolution -- select ## selected done")
        return result

    @staticmethod
    def _hashcode(d):
        """Canonical (key-sorted) JSON string used as a dict key for `d`."""
        return json.dumps(d, sort_keys=True)

    def _bind_and_send_parameters(self):
        """
        There are two types of resources: parameter ids and candidates. This function is called at
        necessary times to bind these resources to send new trials with st_callback.
        """
        result = []
        while self._sending_parameter_queue and self._to_evaluate_queue:
            parameter_id = self._sending_parameter_queue.popleft()
            parameters = self._to_evaluate_queue.popleft()
            self._id2candidate[parameter_id] = parameters
            result.append(parameters)
            self._pending_result_ids.add(parameter_id)
            self._st_callback(parameter_id, parameters)
            _logger.info("Send parameter [%d] %s.", parameter_id, self._get_architecture_repr(parameters))
        return result

    def generate_multiple_parameters(self, parameter_id_list, **kwargs):
        """
        Callback function necessary to implement a tuner. This will put more parameter ids into the
        parameter id queue.
        """
        if "st_callback" in kwargs and self._st_callback is None:
            self._st_callback = kwargs["st_callback"]
        for parameter_id in parameter_id_list:
            self._sending_parameter_queue.append(parameter_id)
        self._bind_and_send_parameters()
        return []  # always not use this. might induce problem of over-sending

    def receive_trial_result(self):
        """
        Read the previous epoch's accuracies from ``self.acc_path`` and merge
        them into ``self._reward_dict`` (keyed by candidate hashcode).
        """
        file_dir, _, files = next(os.walk(self.acc_path))
        files = [i for i in files if "%03d_" % (self.epoch - 1) in i]  # previous epoch's results

        acc_dict = {}
        for file in files:
            with open(os.path.join(file_dir, file), "r") as f:
                acc_dict.update(json.load(f))  # {"000_001.json":0.56}

        for key, value in acc_dict.items():
            # Keep only the file name. The original key.lstrip("./checkpoints/")
            # was wrong: str.lstrip removes a *character set*, not a prefix,
            # and could eat leading characters of the file name itself.
            key = os.path.basename(key)
            self._reward_dict.update({self.id2candidate[key]: value})

    def trial_end(self, parameter_id, success, **kwargs):
        """
        Callback function when a trial is ended and resource is released.
        """
        self._pending_result_ids.remove(parameter_id)
        if not self._pending_result_ids and not self._to_evaluate_queue:
            # a new epoch now
            self._next_round()
            assert self._st_callback is not None
            self._bind_and_send_parameters()

    def export_results(self, result):
        """
        Export a number of candidates to `checkpoints` dir, and the mapping
        from file name to candidate hashcode to `id2cand` dir.

        Parameters
        ----------
        result : list of dict
            Chosen architectures to be exported.
        """
        os.makedirs("checkpoints", exist_ok=True)
        os.makedirs("id2cand", exist_ok=True)
        self.id2candidate = {}
        for i, cand in enumerate(result):
            # Serialize each layer choice as a one-hot boolean list.
            converted = dict()
            for cand_key, cand_val in cand.items():
                onehot = [k == cand_val["_idx"] for k in range(len(self._search_space[cand_key]["_value"]))]
                converted[cand_key] = onehot
            with open(os.path.join("checkpoints", "%03d_%03d.json" % (self.epoch, i)), "w") as fp:
                json.dump(converted, fp)

            # self.id2candidate maps checkpoint file names to candidate
            # hashcodes, e.g.
            # {"000_000.json": '{"LayerChoice1": {"_idx": 3, "_value": "3"}, ...}', ...}
            self.id2candidate.update({"%03d_%03d.json" % (self.epoch, i): json.dumps(result[i], sort_keys=True)})
        with open("./id2cand/%03d_id2candidate.json" % self.epoch, "w") as f:
            json.dump(self.id2candidate, f)
-
-
class EvolutionWithFlops(SPOSEvolution):
    """
    This tuner extends the function of evolution tuner, by limiting the flops generated by tuner.
    Needs a function to examine the flops.

    Parameters
    ----------
    flops_limit : float
        Maximum FLOPs allowed for a candidate; anything above is rejected
        by ``_is_legal``. Defaults to 330M.
    """

    def __init__(self, flops_limit=330E6, **kwargs):
        super().__init__(**kwargs)
        # self.model = ShuffleNetV2OneShot()
        self.flops_limit = flops_limit

        # Pre-computed per-op FLOPs table, keyed by op name and then by an
        # (in_ch, out_ch, h, w, stride) tuple (see get_candidate_flops).
        with open(os.path.join(os.path.dirname(__file__), "./data/op_flops_dict.pkl"), "rb") as fp:
            self._op_flops_dict = pickle.load(fp)

    def _is_legal(self, cand):
        """A candidate is legal iff it is unseen AND within the FLOPs budget."""
        if not super()._is_legal(cand):
            return False
        if self.get_candidate_flops(cand) > self.flops_limit:
            return False
        return True

    def get_candidate_flops(self, candidate):
        """
        Total FLOPs of `candidate`: the fixed stem (conv1) and head
        (rest_operation) plus the FLOPs of each chosen block.

        This method is the same with ShuffleNetV2OneShot.get_candidate_flops,
        but we don't need to initialize that class.
        """
        conv1_flops = self._op_flops_dict["conv1"][(3, 16,
                                                    224, 224, 2)]
        rest_flops = self._op_flops_dict["rest_operation"][(640, 1000,
                                                            7, 7, 1)]
        total_flops = conv1_flops + rest_flops
        for k, m in candidate.items():
            parsed_flops_dict = PARSED_FLOPS[k]
            if isinstance(m, dict):  # to be compatible with classical nas format
                total_flops += parsed_flops_dict[m["_idx"]]
            else:
                # One-hot tensor: its argmax is the chosen index. The original
                # called torch.max without importing torch (NameError); torch
                # is now imported at module level.
                total_flops += parsed_flops_dict[int(torch.max(m, 0)[1])]
        return total_flops
-
|