- # !/usr/bin/env python
- # -*- coding:utf-8 -*-
- """
- import json
- import os
- import re
- import six
- import numpy as np
- from typing import Tuple
- # import requests # 在 nfs 没有挂载 时使用 url 访问
- import oneflow as flow
- import oneflow.typing as tp
- import logging
# Configure the root logger at import time: timestamp, source path and line,
# level and message, with DEBUG as the threshold.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
)

# Directory that contains this module; bundled resources are located relative to it.
current_dir = os.path.dirname(os.path.abspath(__file__))
label_to_name_file = os.sep.join([current_dir, "label.names"])

# One entry per line of the label file. Entries keep their trailing newline
# exactly as read from disk.
label_2_name = []
with open(label_to_name_file, 'r') as f:
    label_2_name = list(f)
class TextCNN:
    """Graph builder for a TextCNN classifier on top of OneFlow.

    The network looks token ids up in an embedding table, runs one
    convolution + ReLU + max-pooling branch per entry of ``n_filters_list``,
    concatenates the branch outputs and passes them through two dense layers.

    Args:
        emb_sz: Number of rows of the embedding table.
        emb_dim: Width of each embedding vector.
        ksize_list: Convolution kernel height of each branch.
        n_filters_list: Number of convolution filters of each branch; its
            length determines how many branches are built.
        n_classes: Number of units of the final dense layer.
        dropout: Rate handed to ``flow.nn.dropout`` when training.
    """

    def __init__(self, emb_sz, emb_dim, ksize_list, n_filters_list, n_classes, dropout):
        self.emb_sz = emb_sz
        self.emb_dim = emb_dim
        self.ksize_list = ksize_list
        self.n_filters_list = n_filters_list
        self.n_classes = n_classes
        self.dropout = dropout
        # Width of the concatenated branch outputs, also used for the first dense layer.
        self.total_n_filters = sum(n_filters_list)
        # Every variable of the network shares this initializer.
        self.initializer = flow.random_normal_initializer(stddev=0.1)

    def get_logits(self, inputs, is_train):
        """Build the forward pass and return the unnormalised class scores.

        Args:
            inputs: Blob of token ids used to index the embedding table
                (presumably shaped (batch, seq_len) -- TODO confirm with callers).
            is_train: When true, the embedding table is trainable and dropout
                is applied before the dense layers.

        Returns:
            The output blob of the ``dense-2`` layer, ``n_classes`` units wide.
        """
        embedding_table = flow.get_variable(
            'embedding-weight',
            shape=(self.emb_sz, self.emb_dim),
            dtype=flow.float32,
            trainable=is_train,
            reuse=False,
            initializer=self.initializer,
        )
        features = flow.gather(embedding_table, inputs, axis=0)
        features = flow.transpose(features, [0, 2, 1])  # BLH -> BHL
        # Append a trailing unit axis so the blob can be consumed as NCHW by conv2d.
        features = flow.reshape(features, list(features.shape) + [1])
        seq_length = features.shape[2]

        branch_outputs = []
        for idx, n_filters in enumerate(self.n_filters_list):
            ksz = self.ksize_list[idx]
            branch = flow.layers.conv2d(
                features, n_filters, [ksz, 1],
                data_format="NCHW",
                kernel_initializer=self.initializer,
                name='conv-{}'.format(idx),
            )  # NCHW
            # branch = flow.layers.layer_norm(branch, name='ln-{}'.format(idx))
            branch = flow.nn.relu(branch)
            # The pooling window covers seq_length - ksz + 1 rows; presumably
            # that is the full height of the conv output -- TODO confirm the
            # conv2d default padding.
            branch = flow.nn.max_pool2d(
                branch, [seq_length - ksz + 1, 1],
                strides=1, padding='VALID', data_format="NCHW",
            )
            branch_outputs.append(branch)

        hidden = flow.concat(branch_outputs, 3)
        hidden = flow.reshape(hidden, [-1, self.total_n_filters])
        if is_train:
            hidden = flow.nn.dropout(hidden, rate=self.dropout)
        hidden = flow.layers.dense(
            hidden, self.total_n_filters,
            use_bias=True,
            kernel_initializer=self.initializer,
            name='dense-1',
        )
        hidden = flow.nn.relu(hidden)
        return flow.layers.dense(
            hidden, self.n_classes,
            use_bias=True,
            kernel_initializer=self.initializer,
            name='dense-2',
        )
def get_eval_config():
    """Return a OneFlow function config whose default data type is ``flow.float``."""
    eval_config = flow.function_config()
    eval_config.default_data_type(flow.float)
    return eval_config
def pad_sequences(sequences, maxlen=None, dtype='int32',
                  padding='pre', truncating='pre', value=0.):
    """Pad (and truncate) sequences to a common length.

    Transforms a list of `num_samples` sequences into a NumPy array of shape
    `(num_samples, num_timesteps) + sample_shape`, where `num_timesteps` is
    `maxlen` if given and the length of the longest sequence otherwise.

    Sequences shorter than `num_timesteps` are filled with `value`, either at
    the beginning (`padding='pre'`, the default) or at the end
    (`padding='post'`). Longer sequences are cut down to `num_timesteps`,
    dropping values from the beginning (`truncating='pre'`, the default) or
    from the end (`truncating='post'`). Empty sequences yield rows that
    consist of `value` only.

    # Arguments
        sequences: List of lists, where each element is a sequence.
        maxlen: Int, maximum length of all sequences.
        dtype: Type of the output sequences.
            To pad sequences with variable length strings, you can use `object`.
        padding: String, 'pre' or 'post':
            pad either before or after each sequence.
        truncating: String, 'pre' or 'post':
            remove values from sequences larger than
            `maxlen`, either at the beginning or at the end of the sequences.
        value: Float or String, padding value.

    # Returns
        x: Numpy array with shape `(len(sequences), maxlen)`
            (plus the per-element sample shape, if any). An empty
            `sequences` yields an array of shape `(0, 0)` when `maxlen`
            is not given.

    # Raises
        ValueError: In case of invalid values for `truncating` or `padding`,
            in case of invalid shape for a `sequences` entry, or when a
            string `value` is combined with a non-string, non-object `dtype`.
    """
    if not hasattr(sequences, '__len__'):
        raise ValueError('`sequences` must be iterable.')
    num_samples = len(sequences)

    lengths = []
    sample_shape = ()
    shape_pending = True
    # Take the sample shape from the first non-empty sequence; consistency of
    # the remaining sequences is checked in the main loop below.
    for x in sequences:
        try:
            lengths.append(len(x))
            if shape_pending and len(x):
                sample_shape = np.asarray(x).shape[1:]
                shape_pending = False
        except TypeError as err:
            raise ValueError('`sequences` must be a list of iterables. '
                             'Found non-iterable: ' + str(x)) from err

    if maxlen is None:
        # np.max rejects an empty list, so an empty input maps to length 0.
        maxlen = np.max(lengths) if lengths else 0

    # `np.unicode_` was only an alias of `np.str_` on Python 3 and was removed
    # in NumPy 2.0, so checking `np.str_` alone is equivalent and portable.
    is_dtype_str = np.issubdtype(dtype, np.str_)
    if isinstance(value, str) and dtype != object and not is_dtype_str:
        raise ValueError("`dtype` {} is not compatible with `value`'s type: {}\n"
                         "You should set `dtype=object` for variable length strings."
                         .format(dtype, type(value)))

    padded = np.full((num_samples, maxlen) + sample_shape, value, dtype=dtype)
    for idx, s in enumerate(sequences):
        if not len(s):
            continue  # empty list/array was found
        if truncating == 'pre':
            # `s[-maxlen:]` would select the whole sequence for maxlen == 0,
            # so slice from an explicit, clamped start index instead.
            trunc = s[max(len(s) - maxlen, 0):]
        elif truncating == 'post':
            trunc = s[:maxlen]
        else:
            raise ValueError('Truncating type "%s" '
                             'not understood' % truncating)

        # check `trunc` has expected shape
        trunc = np.asarray(trunc, dtype=dtype)
        if trunc.shape[1:] != sample_shape:
            raise ValueError('Shape of sample %s of sequence at position %s '
                             'is different from expected shape %s' %
                             (trunc.shape[1:], idx, sample_shape))

        if padding == 'post':
            padded[idx, :len(trunc)] = trunc
        elif padding == 'pre':
            padded[idx, -len(trunc):] = trunc
        else:
            raise ValueError('Padding type "%s" not understood' % padding)
    return padded
@flow.global_function('predict', get_eval_config())
def predict_job(text: tp.Numpy.Placeholder((BATCH_SIZE, 150), dtype=flow.int32),
                ) -> Tuple[tp.Numpy, tp.Numpy]:
    """OneFlow prediction job: classify one fixed-size batch of token ids.

    Args:
        text: int32 batch of shape (BATCH_SIZE, 150) holding token ids.

    Returns:
        A pair ``(label, probabilities)``: the argmax over the softmax output
        and the softmax output itself.
    """
    with flow.scope.placement("gpu", "0:0"):
        classifier = TextCNN(50000, 100, ksize_list=[2, 3, 4, 5], n_filters_list=[100] * 4, n_classes=2, dropout=0.5)
        # Inference only: no dropout, embedding not trainable.
        probabilities = flow.nn.softmax(classifier.get_logits(text, is_train=False))
        predicted_label = flow.math.argmax(probabilities)
    return predicted_label, probabilities
class TextCNNClassifier:
    """Text classifier that loads a TextCNN checkpoint and classifies text files.

    Construction loads the model checkpoint and the word index from the
    ``model`` directory next to this module. ``inference`` then reads text
    files, encodes them as word ids and runs them through ``predict_job`` in
    batches of ``BATCH_SIZE``.
    """

    def __init__(self):
        model_load_dir = current_dir + os.sep + "model/textcnn_imdb_of_best_model/"
        word_index_dir = current_dir + os.sep + "model/imdb_word_index/imdb_word_index.json"
        checkpoint = flow.train.CheckPoint()
        checkpoint.init()
        checkpoint.load(model_load_dir)
        with open(word_index_dir) as f:
            word_index = json.load(f)
        # Shift every stored index by 2 before assigning ids 0, 1 and 2 to the
        # special tokens (presumably the stored indices start at 1 -- TODO confirm).
        word_index = {k: (v + 2) for k, v in word_index.items()}
        word_index["<PAD>"] = 0
        word_index["<START>"] = 1
        word_index["<UNK>"] = 2
        self.word_index = word_index

    def inference(self, text_path_list, id_list, label_list):
        """Classify the given text files and return one result per file.

        Args:
            text_path_list: Paths of the text files to classify.
            id_list: Ids parallel to ``text_path_list``; ``id_list[i]`` is
                reported for ``text_path_list[i]``.
            label_list: Predicted labels that should be reported. A prediction
                that is not contained in it produces an empty annotation.

        Returns:
            A list of dicts in input order, each with the keys ``'annotation'``
            (a JSON string: ``"[]"`` or a one-element list holding
            ``category_id`` and ``score``) and ``'id'``.

        Files are processed in batches of ``BATCH_SIZE``. A final, partially
        filled batch is completed with all-``<PAD>`` rows so that its files
        are classified as well instead of being dropped.
        """
        logging.info("infer")
        logging.info(label_list)
        classifications = []
        batch_text = []
        batch_ids = []
        for i, text_path in enumerate(text_path_list):
            # Close the file deterministically instead of leaking the handle.
            with open(text_path, "r") as text_file:
                text = text_file.read()
            # NOTE: when NFS is not mounted, the text can instead be fetched
            # over HTTP from MinIO for testing, e.g.
            # requests.get("" + text_path).text.
            batch_text.append(self._encode_text(text))
            batch_ids.append(id_list[i])
            if len(batch_text) == BATCH_SIZE:
                classifications.extend(self._classify_batch(batch_text, batch_ids, label_list))
                batch_text = []
                batch_ids = []
        if batch_text:
            # Leftover files when the input count is not a multiple of BATCH_SIZE.
            classifications.extend(self._classify_batch(batch_text, batch_ids, label_list))
        return classifications

    def _encode_text(self, text):
        """Turn raw text into a list of word ids that starts with the <START> id."""
        # Keep only ASCII letters and apostrophes; everything else separates words.
        text = re.sub("[^a-zA-Z']", " ", text)
        tokens = ["<START>"] + [word.lower() for word in text.split()]
        unk_id = self.word_index["<UNK>"]
        return [self.word_index.get(token, unk_id) for token in tokens]

    def _classify_batch(self, batch_text, batch_ids, label_list):
        """Run up to BATCH_SIZE encoded texts through predict_job and build their result dicts."""
        n_real = len(batch_text)
        # predict_job takes a fixed (BATCH_SIZE, 150) input. Empty sequences are
        # turned into all-<PAD> rows by pad_sequences and only fill up the batch.
        filled = batch_text + [[] for _ in range(BATCH_SIZE - n_real)]
        text = pad_sequences(filled, value=self.word_index["<PAD>"], padding='post', maxlen=150)
        text = np.array(text, dtype=np.int32)
        label, logits = predict_job(text)
        label = label.tolist()
        logits = logits.tolist()
        results = []
        # Only the first n_real rows belong to real inputs.
        for k in range(n_real):
            temp = {}
            temp['annotation'] = json.dumps([])
            temp['id'] = batch_ids[k]
            if label[k] in label_list:
                temp['annotation'] = json.dumps([{'category_id': label_2_name[label[k]].rstrip('\n'), 'score': round(logits[k][label[k]], 4)}])
            results.append(temp)
        return results