Merge pull request !80 from luopengting/datavisual_lpttags/v0.2.0-alpha
| @@ -23,9 +23,9 @@ from mindinsight.conf import settings | |||
| # Type of the tensor event from external component | |||
| _Tensor = collections.namedtuple('_Tensor', ['wall_time', 'step', 'value']) | |||
| _Tensor = collections.namedtuple('_Tensor', ['wall_time', 'step', 'value', 'filename']) | |||
| TensorEvent = collections.namedtuple( | |||
| 'TensorEvent', ['wall_time', 'step', 'tag', 'plugin_name', 'value']) | |||
| 'TensorEvent', ['wall_time', 'step', 'tag', 'plugin_name', 'value', 'filename']) | |||
| # config for `EventsData` | |||
| _DEFAULT_STEP_SIZES_PER_TAG = settings.DEFAULT_STEP_SIZES_PER_TAG | |||
| @@ -99,10 +99,11 @@ class EventsData: | |||
| tensor = _Tensor(wall_time=tensor_event.wall_time, | |||
| step=tensor_event.step, | |||
| value=tensor_event.value) | |||
| value=tensor_event.value, | |||
| filename=tensor_event.filename) | |||
| if self._is_out_of_order_step(tensor_event.step, tensor_event.tag): | |||
| self.purge_reservoir_data(tensor_event.step, self._reservoir_by_tag[tag]) | |||
| self.purge_reservoir_data(tensor_event.filename, tensor_event.step, self._reservoir_by_tag[tag]) | |||
| self._reservoir_by_tag[tag].add_sample(tensor) | |||
| @@ -176,7 +177,7 @@ class EventsData: | |||
| return False | |||
| @staticmethod | |||
| def purge_reservoir_data(start_step, tensor_reservoir): | |||
| def purge_reservoir_data(filename, start_step, tensor_reservoir): | |||
| """ | |||
| Purge all tensor event that are out-of-order step after the given start step. | |||
| @@ -188,7 +189,8 @@ class EventsData: | |||
| Returns: | |||
| int, the number of items removed. | |||
| """ | |||
| cnt_out_of_order = tensor_reservoir.remove_sample(lambda x: x.step < start_step) | |||
| cnt_out_of_order = tensor_reservoir.remove_sample( | |||
| lambda x: x.step < start_step or (x.step > start_step and x.filename == filename)) | |||
| return cnt_out_of_order | |||
| @@ -223,7 +223,8 @@ class MSDataLoader: | |||
| step=event.step, | |||
| tag=tag, | |||
| plugin_name=PluginNameEnum.SCALAR.value, | |||
| value=value.scalar_value) | |||
| value=value.scalar_value, | |||
| filename=self._latest_summary_filename) | |||
| self._events_data.add_tensor_event(tensor_event) | |||
| if value.HasField('image'): | |||
| @@ -232,7 +233,8 @@ class MSDataLoader: | |||
| step=event.step, | |||
| tag=tag, | |||
| plugin_name=PluginNameEnum.IMAGE.value, | |||
| value=value.image) | |||
| value=value.image, | |||
| filename=self._latest_summary_filename) | |||
| self._events_data.add_tensor_event(tensor_event) | |||
| if value.HasField('histogram'): | |||
| @@ -242,7 +244,8 @@ class MSDataLoader: | |||
| step=event.step, | |||
| tag=tag, | |||
| plugin_name=PluginNameEnum.HISTOGRAM.value, | |||
| value=histogram_msg) | |||
| value=histogram_msg, | |||
| filename=self._latest_summary_filename) | |||
| self._events_data.add_tensor_event(tensor_event) | |||
| if event.HasField('graph_def'): | |||
| @@ -253,7 +256,8 @@ class MSDataLoader: | |||
| step=event.step, | |||
| tag=self._latest_summary_filename, | |||
| plugin_name=PluginNameEnum.GRAPH.value, | |||
| value=graph) | |||
| value=graph, | |||
| filename=self._latest_summary_filename) | |||
| try: | |||
| graph_tags = self._events_data.list_tags_by_plugin(PluginNameEnum.GRAPH.value) | |||
| @@ -436,7 +440,8 @@ class _PbParser: | |||
| step=0, | |||
| tag=filename, | |||
| plugin_name=PluginNameEnum.GRAPH.value, | |||
| value=graph) | |||
| value=graph, | |||
| filename=filename) | |||
| logger.info("Build graph success, file path: %s.", file_path) | |||
| return tensor_event | |||
| @@ -23,6 +23,24 @@ from mindinsight.utils.exceptions import ParamValueError | |||
| from mindinsight.datavisual.utils.utils import calc_histogram_bins | |||
| def binary_search(samples, target): | |||
| """Binary search target in samples.""" | |||
| left = 0 | |||
| right = len(samples) - 1 | |||
| while left <= right: | |||
| mid = (left + right) // 2 | |||
| if target < samples[mid].step: | |||
| right = mid - 1 | |||
| elif target > samples[mid].step: | |||
| left = mid + 1 | |||
| else: | |||
| return mid | |||
| # if right is -1, it is less than the first one. | |||
| # if list is [1, 2, 4], target is 3, right will be 1, so wo will insert by 2. | |||
| return right + 1 | |||
| class Reservoir: | |||
| """ | |||
| A container based on Reservoir Sampling algorithm. | |||
| @@ -68,18 +86,28 @@ class Reservoir: | |||
| """ | |||
| with self._mutex: | |||
| if len(self._samples) < self._samples_max_size or self._samples_max_size == 0: | |||
| self._samples.append(sample) | |||
| self._add_sample(sample) | |||
| else: | |||
| # Use the Reservoir Sampling algorithm to replace the old sample. | |||
| rand_int = self._sample_selector.randint( | |||
| 0, self._sample_counter) | |||
| rand_int = self._sample_selector.randint(0, self._sample_counter) | |||
| if rand_int < self._samples_max_size: | |||
| self._samples.pop(rand_int) | |||
| self._samples.append(sample) | |||
| else: | |||
| self._samples[-1] = sample | |||
| self._samples = self._samples[:-1] | |||
| self._add_sample(sample) | |||
| self._sample_counter += 1 | |||
| def _add_sample(self, sample): | |||
| """Search the index and add sample.""" | |||
| if not self._samples or sample.step > self._samples[-1].step: | |||
| self._samples.append(sample) | |||
| return | |||
| index = binary_search(self._samples, sample.step) | |||
| if index == len(self._samples): | |||
| self._samples.append(sample) | |||
| else: | |||
| self._samples.insert(index, sample) | |||
| def remove_sample(self, filter_fun): | |||
| """ | |||
| Remove the samples from Reservoir that do not meet the filter criteria. | |||
| @@ -36,9 +36,9 @@ class MockReservoir: | |||
| def __init__(self, size): | |||
| self.size = size | |||
| self._samples = [ | |||
| _Tensor('wall_time1', 1, 'value1'), | |||
| _Tensor('wall_time2', 2, 'value2'), | |||
| _Tensor('wall_time3', 3, 'value3') | |||
| _Tensor('wall_time1', 1, 'value1', 'filename1'), | |||
| _Tensor('wall_time2', 2, 'value2', 'filename2'), | |||
| _Tensor('wall_time3', 3, 'value3', 'filename3') | |||
| ] | |||
| def samples(self): | |||
| @@ -107,7 +107,8 @@ class TestEventsData: | |||
| """Test add_tensor_event success.""" | |||
| ev_data = self.get_ev_data() | |||
| t_event = TensorEvent(wall_time=1, step=4, tag='new_tag', plugin_name='plugin_name1', value='value1') | |||
| t_event = TensorEvent(wall_time=1, step=4, tag='new_tag', | |||
| plugin_name='plugin_name1', value='value1', filename='filename') | |||
| ev_data.add_tensor_event(t_event) | |||
| assert 'tag0' not in ev_data._tags | |||
| @@ -116,4 +117,54 @@ class TestEventsData: | |||
| assert 'tag0' not in ev_data._reservoir_by_tag | |||
| assert 'new_tag' in ev_data._tags_by_plugin['plugin_name1'] | |||
| assert ev_data._reservoir_by_tag['new_tag'].samples()[-1] == _Tensor(t_event.wall_time, t_event.step, | |||
| t_event.value) | |||
| t_event.value, 'filename') | |||
| def test_add_tensor_event_out_of_order(self): | |||
| """Test add_tensor_event success for out_of_order summaries.""" | |||
| wall_time = 1 | |||
| value = '1' | |||
| tag = 'tag' | |||
| plugin_name = 'scalar' | |||
| file1 = 'file1' | |||
| ev_data = EventsData() | |||
| steps = [i for i in range(2, 10)] | |||
| for step in steps: | |||
| t_event = TensorEvent(wall_time=1, step=step, tag=tag, | |||
| plugin_name=plugin_name, value=value, filename=file1) | |||
| ev_data.add_tensor_event(t_event) | |||
| t_event = TensorEvent(wall_time=1, step=1, tag=tag, | |||
| plugin_name=plugin_name, value=value, filename=file1) | |||
| ev_data.add_tensor_event(t_event) | |||
| # Current steps should be: [1, 2, 3, 4, 5, 6, 7, 8, 9] | |||
| assert len(ev_data._reservoir_by_tag[tag].samples()) == len(steps) + 1 | |||
| file2 = 'file2' | |||
| new_steps_1 = [5, 10] | |||
| for step in new_steps_1: | |||
| t_event = TensorEvent(wall_time=1, step=step, tag=tag, | |||
| plugin_name=plugin_name, value=value, filename=file2) | |||
| ev_data.add_tensor_event(t_event) | |||
| assert ev_data._reservoir_by_tag[tag].samples()[-1] == _Tensor(wall_time, step, value, file2) | |||
| # Current steps should be: [1, 2, 3, 4, 5, 10] | |||
| steps = [1, 2, 3, 4, 5, 10] | |||
| samples = ev_data._reservoir_by_tag[tag].samples() | |||
| for step, sample in zip(steps, samples): | |||
| filename = file1 if sample.step < 5 else file2 | |||
| assert sample == _Tensor(wall_time, step, value, filename) | |||
| new_steps_2 = [7, 11, 3] | |||
| for step in new_steps_2: | |||
| t_event = TensorEvent(wall_time=1, step=step, tag=tag, | |||
| plugin_name=plugin_name, value=value, filename=file2) | |||
| ev_data.add_tensor_event(t_event) | |||
| # Current steps should be: [1, 2, 3, 5, 7, 10, 11], file2: [3, 5, 7, 10, 11] | |||
| steps = [1, 2, 3, 5, 7, 10, 11] | |||
| new_steps_2.extend(new_steps_1) | |||
| samples = ev_data._reservoir_by_tag[tag].samples() | |||
| for step, sample in zip(steps, samples): | |||
| filename = file2 if sample.step in new_steps_2 else file1 | |||
| assert sample == _Tensor(wall_time, step, value, filename) | |||
| @@ -27,10 +27,14 @@ class TestHistogramReservoir: | |||
| sample1.value.count = 1 | |||
| sample1.value.max = 102 | |||
| sample1.value.min = 101 | |||
| sample1.step = 2 | |||
| sample1.filename = 'filename' | |||
| sample2 = mock.MagicMock() | |||
| sample2.value.count = 2 | |||
| sample2.value.max = 102 | |||
| sample2.value.min = 101 | |||
| sample2.step = 1 | |||
| sample2.filename = 'filename' | |||
| my_reservoir.add_sample(sample1) | |||
| my_reservoir.add_sample(sample2) | |||
| samples = my_reservoir.samples() | |||
| @@ -216,9 +216,8 @@ class TestImagesProcessor: | |||
| """ | |||
| Test removing sample in reservoir. | |||
| If step list is [1, 3, 5, 7, 9, 2, 3, 4, 15], | |||
| and then [3, 5, 7, 9] will be deleted. | |||
| Results will be [1, 2, 3, 4, 15]. | |||
| If step list is [1, 3, 5, 7, 9, 2, 3, 4, 15] in one summary, | |||
| Results will be [1, 2, 3, 4, 5, 7, 9, 15]. | |||
| """ | |||
| test_tag_name = self._complete_tag_name | |||
| @@ -237,5 +236,4 @@ class TestImagesProcessor: | |||
| except ImageNotExistError: | |||
| not_found_step_list.append(test_step) | |||
| assert current_step_list == [1, 2, 3, 4, 15] | |||
| assert not_found_step_list == [5, 7, 9] | |||
| assert current_step_list == [1, 2, 3, 4, 5, 7, 9, 15] | |||