- """
- .. _embed-loader:
-
- Utilities for loading pre-trained embeddings; the loaded matrices can be used directly as model parameters.
- """
- import os
- import warnings
-
- import numpy as np
-
- from ..core.vocabulary import Vocabulary
- from .base_loader import BaseLoader
-
- class EmbedLoader(BaseLoader):
- """
- Alias: :class:`fastNLP.io.EmbedLoader` :class:`fastNLP.io.embed_loader.EmbedLoader`
-
- This class loads word embeddings from pre-trained embedding files.
- """
-
- def __init__(self):
- super(EmbedLoader, self).__init__()
-
- @staticmethod
- def load_with_vocab(embed_filepath, vocab, dtype=np.float32, normalize=True, error='ignore'):
- """
- Extract the embeddings of the words in ``vocab`` from the pre-trained embedding file ``embed_filepath``.
- EmbedLoader automatically detects whether ``embed_filepath`` is in word2vec format (the first line contains
- only two fields) or in GloVe format.
-
- :param str embed_filepath: path of the pre-trained embedding file.
- :param vocab: a :class:`~fastNLP.Vocabulary`; only embeddings of words that appear in vocab are read.
- Words not found in the file receive vectors sampled from a normal distribution fitted to the found
- embeddings, so that the whole embedding matrix follows the same distribution.
- :param dtype: dtype of the returned embedding matrix.
- :param bool normalize: whether to normalize every vector to unit norm.
- :param str error: ``ignore`` or ``strict``. With ``ignore``, faulty lines are skipped with a warning; with
- ``strict``, the error is raised. Typical problems are empty lines or lines with an inconsistent dimension.
- :return numpy.ndarray: matrix of shape [len(vocab), dimension]; dimension is determined by the pre-trained embedding.
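-
- Example (a minimal usage sketch; ``glove.6B.50d.txt`` is a hypothetical local file path)::
-
-     vocab = Vocabulary()
-     vocab.update(["the", "quick", "brown", "fox"])
-     matrix = EmbedLoader.load_with_vocab("glove.6B.50d.txt", vocab, normalize=False)
-     # matrix.shape == (len(vocab), 50) for 50-dimensional vectors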
- """
- assert isinstance(vocab, Vocabulary), "Only fastNLP.Vocabulary is supported."
- if not os.path.exists(embed_filepath):
- raise FileNotFoundError("`{}` does not exist.".format(embed_filepath))
- with open(embed_filepath, 'r', encoding='utf-8') as f:
- hit_flags = np.zeros(len(vocab), dtype=bool)
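- # Peek at the first line: a word2vec file starts with a "<vocab size> <dim>" header,
- # while a GloVe file begins directly with a word and its vector.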
- line = f.readline().strip()
- parts = line.split()
- start_idx = 0
- if len(parts)==2:
- dim = int(parts[1])
- start_idx += 1
- else:
- dim = len(parts)-1
- f.seek(0)
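- # Every row starts as standard-normal noise; rows of words found in the file are
- # overwritten below, and the remaining rows are re-sampled afterwards.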
- matrix = np.random.randn(len(vocab), dim).astype(dtype)
- for idx, line in enumerate(f, start_idx):
- try:
- parts = line.strip().split()
- if parts[0] in vocab:
- index = vocab.to_index(parts[0])
- matrix[index] = np.fromstring(' '.join(parts[1:]), sep=' ', dtype=dtype, count=dim)
- hit_flags[index] = True
- except Exception as e:
- if error == 'ignore':
- warnings.warn("Error occurred at line {}.".format(idx))
- else:
- print("Error occurred at the {} line.".format(idx))
- raise e
- total_hits = sum(hit_flags)
- print("Found {} out of {} words in the pre-training embedding.".format(total_hits, len(vocab)))
- found_vectors = matrix[hit_flags]
- if len(found_vectors)!=0:
- mean = np.mean(found_vectors, axis=0, keepdims=True)
- std = np.std(found_vectors, axis=0, keepdims=True)
- unfound_vec_num = len(vocab) - total_hits
- r_vecs = np.random.randn(unfound_vec_num, dim).astype(dtype)*std + mean
- matrix[~hit_flags] = r_vecs
-
- if normalize:
- matrix /= np.linalg.norm(matrix, axis=1, keepdims=True)
-
- return matrix
-
- @staticmethod
- def load_without_vocab(embed_filepath, dtype=np.float32, padding='<pad>', unknown='<unk>', normalize=True,
- error='ignore'):
- """
- Read pre-trained word vectors from ``embed_filepath`` and build a :class:`~fastNLP.Vocabulary` that matches
- the words found in the file.
-
- :param str embed_filepath: path of the pre-trained embedding file.
- :param dtype: dtype of the returned embedding matrix.
- :param str padding: the padding tag for the vocabulary.
- :param str unknown: the unknown tag for the vocabulary.
- :param bool normalize: whether to normalize every vector to unit norm.
- :param str error: ``ignore`` or ``strict``. With ``ignore``, faulty lines are skipped with a warning; with
- ``strict``, the error is raised. Typical problems are empty lines or lines with an inconsistent dimension.
- :return (numpy.ndarray, Vocabulary): the embedding matrix and the corresponding Vocabulary. The matrix has shape
- [len(vocab), dimension]; len(vocab) can be larger than the number of words in the file because the padding and
- unknown tags are added when they do not occur in the file. Row i of the matrix is the vector of the i-th word
- in the Vocabulary.
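-
- Example (a minimal usage sketch; ``glove.6B.50d.txt`` is a hypothetical local file path)::
-
-     matrix, vocab = EmbedLoader.load_without_vocab("glove.6B.50d.txt")
-     # matrix.shape[0] == len(vocab); matrix[vocab.to_index(w)] is the vector of word w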
- """
- vocab = Vocabulary(padding=padding, unknown=unknown)
- vec_dict = {}
- found_unknown = False
- found_pad = False
-
- with open(embed_filepath, 'r', encoding='utf-8') as f:
- line = f.readline()
- start = 1
- dim = -1
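- # A word2vec-style file opens with a "<vocab size> <dim>" header line; if the first
- # line does not look like a header, rewind and treat every line as a word vector.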
- if len(line.strip().split())!=2:
- f.seek(0)
- start = 0
- for idx, line in enumerate(f, start=start):
- try:
- parts = line.strip().split()
- word = parts[0]
- if dim==-1:
- dim = len(parts)-1
- vec = np.fromstring(' '.join(parts[1:]), sep=' ', dtype=dtype, count=dim)
- vec_dict[word] = vec
- vocab.add_word(word)
- if unknown is not None and unknown==word:
- found_unknown = True
- if padding is not None and padding == word:
- found_pad = True
- except Exception as e:
- if error=='ignore':
- warnings.warn("Error occurred at line {}.".format(idx))
- else:
- print("Error occurred at the {} line.".format(idx))
- raise e
- if dim==-1:
- raise RuntimeError("{} is an empty file.".format(embed_filepath))
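- # Fill the matrix with standard-normal noise first; if the padding or unknown tag was
- # not found in the file, give it a vector drawn from the distribution of the read vectors.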
- matrix = np.random.randn(len(vocab), dim).astype(dtype)
- if (unknown is not None and not found_unknown) or (padding is not None and not found_pad):
- start_idx = 0
- if padding is not None:
- start_idx += 1
- if unknown is not None:
- start_idx += 1
-
- mean = np.mean(matrix[start_idx:], axis=0, keepdims=True)
- std = np.std(matrix[start_idx:], axis=0, keepdims=True)
- if (unknown is not None and not found_unknown):
- matrix[start_idx-1] = np.random.randn(1, dim).astype(dtype)*std + mean
- if (padding is not None and not found_pad):
- matrix[0] = np.random.randn(1, dim).astype(dtype)*std + mean
-
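- # Copy the vectors read from the file into their positions in the vocabulary order.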
- for key, vec in vec_dict.items():
- index = vocab.to_index(key)
- matrix[index] = vec
-
- if normalize:
- matrix /= np.linalg.norm(matrix, axis=1, keepdims=True)
-
- return matrix, vocab