|
-
- import jieba
- import math
- import random
- import codecs
- import numpy as np
- import os
-
def check_bio(tags):
    """
    Validate a BIO tag sequence, repairing misplaced I- tags in place.

    Every tag must be either 'O' or consist of a 'B' / 'I' prefix and a
    type separated by exactly one hyphen. An I- tag that opens the
    sequence, follows an 'O', or follows a tag with a different suffix is
    rewritten inside ``tags`` so that it starts with 'B'.

    :param tags: indexable, mutable sequence of tag strings; may be
        modified in place
    :return: False as soon as a malformed tag is met (repairs made before
        that point are kept), True otherwise
    """
    valid_prefixes = {'B', 'I'}
    for pos, current in enumerate(tags):
        if current == 'O':
            continue
        pieces = current.split("-")
        if not (len(pieces) == 2 and pieces[0] in valid_prefixes):
            # Illegal encoding.
            return False
        if pieces[0] == 'B':
            continue
        # From here on the tag is an I- tag: it is only legal when it
        # continues the directly preceding tag of the same type.
        suffix = current[1:]
        continues_entity = (
            pos > 0
            and tags[pos - 1] != 'O'
            and tags[pos - 1][1:] == suffix
        )
        if not continues_entity:
            # Restart the entity with a B prefix.
            tags[pos] = 'B' + suffix
    return True
-
def bio_to_bioes(tags):
    """
    Convert a BIO tag sequence to BIOES and return it as a new list.

    'O' tags are copied unchanged. A B- or I- tag that is directly
    followed by an I- tag is copied unchanged. Otherwise a B- tag becomes
    S- (single-token entity) and an I- tag becomes E- (end of entity).

    :param tags: sequence of BIO tag strings; it is not modified
    :return: new list of BIOES tag strings
    :raises Exception: if a tag is not 'O' and its prefix is neither 'B'
        nor 'I'
    """
    new_tags = []
    last_index = len(tags) - 1
    for i, tag in enumerate(tags):
        if tag == 'O':
            # Kept as is.
            new_tags.append(tag)
            continue

        prefix = tag.split('-')[0]
        if prefix not in ('B', 'I'):
            raise Exception('非法编码')

        # The entity goes on only when this is not the last tag and the
        # next tag starts with I.
        followed_by_inside = (
            i < last_index and tags[i + 1].split('-')[0] == 'I'
        )
        if followed_by_inside:
            new_tags.append(tag)
        elif prefix == 'B':
            # Replace only the leading prefix: an unbounded replace would
            # also rewrite 'B-' inside the type (e.g. 'B-SUB-ORG').
            new_tags.append(tag.replace('B-', 'S-', 1))
        else:
            # Same reasoning for 'I-' inside the type (e.g. 'I-API-KEY').
            new_tags.append(tag.replace('I-', 'E-', 1))
    return new_tags
-
def bioes_to_bio(tags):
    """
    Convert a BIOES tag sequence to BIO and return it as a new list.

    Tags whose prefix (the text before the first hyphen) is 'B', 'I' or
    'O' are copied unchanged; an S- tag becomes B- and an E- tag becomes
    I-.

    :param tags: iterable of BIOES tag strings; it is not modified
    :return: new list of BIO tag strings
    :raises Exception: if a tag's prefix is not one of B, I, O, E, S
    """
    new_tags = []
    for tag in tags:
        prefix = tag.split('-')[0]
        if prefix in ('B', 'I', 'O'):
            new_tags.append(tag)
        elif prefix == 'S':
            # Replace only the leading prefix: an unbounded replace would
            # also rewrite 'S-' inside the type (e.g. 'S-CLASS-A').
            new_tags.append(tag.replace('S-', 'B-', 1))
        elif prefix == 'E':
            # Same reasoning for 'E-' inside the type (e.g. 'E-DATE-X').
            new_tags.append(tag.replace('E-', 'I-', 1))
        else:
            raise Exception('非法编码格式')
    return new_tags
-
-
def create_dico(item_list):
    """
    Count how often each item occurs across all the inner collections.

    :param item_list: list of iterables of hashable items
    :return: dict mapping each item to its total number of occurrences
    """
    assert type(item_list) is list
    counts = {}
    for group in item_list:
        for element in group:
            counts[element] = counts.get(element, 0) + 1
    return counts
-
def create_mapping(dico):
    """
    Build item-to-id and id-to-item lookups from a frequency dictionary.

    Ids are assigned from 0 upwards, by decreasing count; items with the
    same count are ordered by the items themselves in ascending order.

    :param dico: dict mapping item -> count
    :return: tuple (item_to_id, id_to_item)
    """
    ranked = sorted(dico, key=lambda item: (-dico[item], item))
    id_to_item = dict(enumerate(ranked))
    item_to_id = {item: idx for idx, item in enumerate(ranked)}
    return item_to_id, id_to_item
-
def get_seg_features(words):
    """
    Produce one segmentation feature per character using jieba.

    The input is segmented with ``jieba.cut`` and each segment is encoded
    in a BIOES-like scheme: 0 for a single-character segment, otherwise 1
    for the first character, 2 for the middle ones and 3 for the last.

    :param words: text passed to ``jieba.cut``
    :return: flat list of integer codes
    """
    seg_features = []
    for token in jieba.cut(words):
        size = len(token)
        if size == 1:
            seg_features.append(0)
            continue
        codes = [2] * size
        codes[0] = 1
        codes[-1] = 3
        seg_features.extend(codes)
    return seg_features
-
def load_word2vec(emb_file, id_to_word, word_dim, old_weights):
    """
    Overwrite rows of an embedding matrix with pre-trained vectors.

    Each line of the embedding file is split on whitespace; a line is used
    only when it holds exactly ``word_dim + 1`` fields (the token followed
    by its vector). Other lines are counted and reported. For every id in
    ``range(len(id_to_word))`` whose word has a pre-trained vector, that
    vector is written to ``old_weights[id]``.

    :param emb_file: path to a UTF-8 text embedding file
    :param id_to_word: mapping from consecutive integer ids (starting at 0)
        to words
    :param word_dim: expected number of vector components per line
    :param old_weights: object supporting ``old_weights[i] = vector``;
        presumably a (num_words, word_dim) array - TODO confirm with caller
    :return: the same ``old_weights`` object, updated in place
    """
    # NOTE: this is an alias, not a copy; the caller's array is modified.
    new_weights = old_weights
    pre_trained = {}
    emb_invalid = 0
    # Read inside a context manager so the file handle is always closed.
    with codecs.open(emb_file, 'r', encoding='utf-8') as emb_handle:
        for line in emb_handle:
            fields = line.rstrip().split()
            if len(fields) == word_dim + 1:
                pre_trained[fields[0]] = np.array(
                    [float(x) for x in fields[1:]]
                ).astype(np.float32)
            else:
                emb_invalid = emb_invalid + 1

    if emb_invalid > 0:
        print('waring: %i invalid lines' % emb_invalid)

    for i in range(len(id_to_word)):
        word = id_to_word[i]
        if word in pre_trained:
            new_weights[i] = pre_trained[word]
    # Reports the number of vectors read from the file, not the number of
    # rows that were actually replaced.
    print('加载了 %i 个字向量' % len(pre_trained))

    return new_weights
-
def augment_with_pretrained(dico_train, emb_path, test_words):
    """
    Extend the training dictionary with words from a pre-trained embedding
    file, then rebuild the word/id mappings.

    The first whitespace-separated field of every non-blank line of the
    embedding file is treated as a pre-trained word. If ``test_words`` is
    None, every pre-trained word missing from ``dico_train`` is added with
    count 0. Otherwise only those ``test_words`` that are pre-trained
    (as given or lower-cased) and missing from ``dico_train`` are added
    with count 0.

    :param dico_train: dict word -> count; modified in place
    :param emb_path: path to a UTF-8 text embedding file; must exist
    :param test_words: iterable of words, or None
    :return: tuple (dico_train, word_to_id, id_to_word)
    """
    assert os.path.isfile(emb_path)

    # Collect the pre-trained vocabulary. The file is read inside a
    # context manager so the handle is closed, and blank lines are skipped
    # because indexing the first field of an empty split would raise
    # IndexError.
    pretrained = set()
    with codecs.open(emb_path, 'r', encoding='utf-8') as emb_handle:
        for line in emb_handle:
            fields = line.split()
            if fields:
                pretrained.add(fields[0])

    if test_words is None:
        for word in pretrained:
            if word not in dico_train:
                dico_train[word] = 0
    else:
        for word in test_words:
            if any(x in pretrained for x in
                   [word, word.lower()]
                   ) and word not in dico_train:
                dico_train[word] = 0

    word_to_id, id_to_word = create_mapping(dico_train)

    return dico_train, word_to_id, id_to_word
-
-
-
-
-
-
-
-
-
-
-
-
-
class BatchManager(object):
    """
    Groups samples into length-sorted, zero-padded batches.

    Each sample is a sequence of four lists (words, word ids, segmentation
    features, tag ids). Batches are built once in the constructor and
    stored in ``batch_data``; ``len_data`` holds the number of batches.
    """

    def __init__(self, data, batch_size):
        """
        :param data: list of samples
        :param batch_size: maximum number of samples per batch
        """
        self.batch_data = self.sort_and_pad(data, batch_size)
        self.len_data = len(self.batch_data)

    def sort_and_pad(self, data, batch_size):
        """
        Sort samples by the length of their first field, cut them into
        consecutive chunks of ``batch_size`` and pad every chunk.

        :return: list of padded batches
        """
        batch_count = int(math.ceil(len(data) / batch_size))
        ordered = sorted(data, key=lambda sample: len(sample[0]))
        return [
            self.pad_data(ordered[k * batch_size:(k + 1) * batch_size])
            for k in range(batch_count)
        ]

    @staticmethod
    def pad_data(data):
        """
        Right-pad the four fields of every sample with zeros up to the
        longest first field found in ``data``.

        :param data: non-empty list of samples
        :return: [words, word_ids, segs, tag_ids], each a list with one
            padded list per sample
        """
        longest = max(len(sample[0]) for sample in data)
        padded_words = []
        padded_word_ids = []
        padded_segs = []
        padded_tag_ids = []
        for sample in data:
            words, word_ids, segs, tag_ids = sample
            # The pad width is derived from the words field and reused
            # for the other three fields.
            filler = [0] * (longest - len(words))
            padded_words.append(words + filler)
            padded_word_ids.append(word_ids + filler)
            padded_segs.append(segs + filler)
            padded_tag_ids.append(tag_ids + filler)
        return [padded_words, padded_word_ids, padded_segs, padded_tag_ids]

    def iter_batch(self, shuffle=False):
        """
        Yield the stored batches one by one.

        :param shuffle: when true, the stored batch order is shuffled in
            place before iterating
        """
        if shuffle:
            random.shuffle(self.batch_data)
        for position in range(self.len_data):
            yield self.batch_data[position]
-
|