@@ -1,3 +1,4 @@
+import numpy as np
 import torch
 
 from fastNLP.core.vocabulary import Vocabulary
@@ -26,7 +27,7 @@ class EmbedLoader(BaseLoader):
         emb = {}
         with open(emb_file, 'r', encoding='utf-8') as f:
             for line in f:
-                line = list(filter(lambda w: len(w)>0, line.strip().split(' ')))
+                line = list(filter(lambda w: len(w) > 0, line.strip().split(' ')))
                 if len(line) > 2:
                     emb[line[0]] = torch.Tensor(list(map(float, line[1:])))
         return emb
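For context, each line of a GloVe text file is a token followed by its vector components, separated by single spaces, which is exactly what the filter/split above parses. A minimal sketch of the resulting dict, assuming a hypothetical two-line, 4-dimensional file glove_sample.txt:

    # glove_sample.txt (hypothetical contents):
    #   the 0.418 0.24968 -0.41242 0.1217
    #   cat 0.2615 0.30716 0.12905 -0.2923
    emb = EmbedLoader._load_glove("glove_sample.txt")
    # emb["the"] is a torch.Tensor holding the four floats after the token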
@@ -35,9 +36,9 @@ class EmbedLoader(BaseLoader):
     def _load_pretrain(emb_file, emb_type):
         """Read txt data from embedding file and convert to np.array as pre-trained embedding
 
-        :param emb_file: str, the pre-trained embedding file path
-        :param emb_type: str, the pre-trained embedding data format
-        :return dict: {str: np.array}
+        :param str emb_file: the pre-trained embedding file path
+        :param str emb_type: the pre-trained embedding data format
+        :return dict embedding: `{str: torch.Tensor}`
         """
         if emb_type == 'glove':
             return EmbedLoader._load_glove(emb_file)
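_load_pretrain is a thin dispatcher over the per-format loaders; any emb_type other than 'glove' falls through to the Exception below. A sketch, reusing the hypothetical file from above:

    pretrain = EmbedLoader._load_pretrain("glove_sample.txt", "glove")  # {str: torch.Tensor}
    EmbedLoader._load_pretrain("glove_sample.txt", "w2v")  # raises Exception: not supported yet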
@@ -45,38 +46,68 @@ class EmbedLoader(BaseLoader):
             raise Exception("embedding type {} not support yet".format(emb_type))
 
     @staticmethod
-    def load_embedding(emb_dim, emb_file, emb_type, vocab, emb_pkl ):
+    def load_embedding(emb_dim, emb_file, emb_type, vocab):
         """Load the pre-trained embedding and combine with the given dictionary.
 
-        :param emb_dim: int, the dimension of the embedding. Should be the same as pre-trained embedding.
-        :param emb_file: str, the pre-trained embedding file path.
-        :param emb_type: str, the pre-trained embedding format, support glove now
-        :param vocab: Vocabulary, a mapping from word to index, can be provided by user or built from pre-trained embedding
-        :param emb_pkl: str, the embedding pickle file.
+        :param int emb_dim: the dimension of the embedding. Should be the same as pre-trained embedding.
+        :param str emb_file: the pre-trained embedding file path.
+        :param str emb_type: the pre-trained embedding format, supports glove for now
+        :param Vocabulary vocab: a mapping from word to index, can be provided by user or built from pre-trained embedding
         :return embedding_tensor: Tensor of shape (len(word_dict), emb_dim)
             vocab: input vocab or vocab built by pre-train
-        TODO: fragile code
         """
-        # If the embedding pickle exists, load it and return.
-        # if os.path.exists(emb_pkl):
-        #     with open(emb_pkl, "rb") as f:
-        #         embedding_tensor, vocab = _pickle.load(f)
-        #     return embedding_tensor, vocab
-        # Otherwise, load the pre-trained embedding.
         pretrain = EmbedLoader._load_pretrain(emb_file, emb_type)
         if vocab is None:
             # build vocabulary from pre-trained embedding
             vocab = Vocabulary()
             for w in pretrain.keys():
-                vocab.update (w)
+                vocab.add(w)
         embedding_tensor = torch.randn(len(vocab), emb_dim)
         for w, v in pretrain.items():
             if len(v.shape) > 1 or emb_dim != v.shape[0]:
-                raise ValueError('pretrian embedding dim is {}, dismatching required {}'.format(v.shape, (emb_dim,)))
+                raise ValueError(
+                    "Pretrained embedding dim is {}. Dimension mismatched. Required {}".format(v.shape, (emb_dim,)))
             if vocab.has_word(w):
                 embedding_tensor[vocab[w]] = v
-        # save and return the result
-        # with open(emb_pkl, "wb") as f:
-        #     _pickle.dump((embedding_tensor, vocab), f)
         return embedding_tensor, vocab
 
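Typical use of the new load_embedding signature, sketched with a user-built Vocabulary and the hypothetical glove_sample.txt from above; rows for vocabulary words missing from the file keep their torch.randn initialization:

    vocab = Vocabulary()
    for w in ["the", "cat", "sat"]:
        vocab.add(w)
    embedding_tensor, vocab = EmbedLoader.load_embedding(
        emb_dim=4, emb_file="glove_sample.txt", emb_type="glove", vocab=vocab)
    # embedding_tensor has shape (len(vocab), 4)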
+    @staticmethod
+    def parse_glove_line(line):
+        line = list(filter(lambda w: len(w) > 0, line.strip().split(" ")))
+        if len(line) <= 2:
+            raise RuntimeError("something went wrong when parsing a glove embedding line")
+        return line[0], torch.Tensor(list(map(float, line[1:])))
+
+    @staticmethod
+    def fast_load_embedding(emb_dim, emb_file, vocab):
+        """Fast load the pre-trained embedding and combine with the given dictionary.
+        This loading method uses line-by-line operation.
+
+        :param int emb_dim: the dimension of the embedding. Should be the same as pre-trained embedding.
+        :param str emb_file: the pre-trained embedding file path.
+        :param Vocabulary vocab: a mapping from word to index, can be provided by user or built from pre-trained embedding
+        :return numpy.ndarray embedding_matrix: shape (len(vocab), emb_dim)
+        """
+        if vocab is None:
+            raise RuntimeError("You must provide a vocabulary.")
+        embedding_matrix = np.zeros(shape=(len(vocab), emb_dim))
+        hit_flags = np.zeros(shape=(len(vocab),), dtype=int)
+        with open(emb_file, "r", encoding="utf-8") as f:
+            for line in f:
+                word, vector = EmbedLoader.parse_glove_line(line)
+                if word in vocab:
+                    if len(vector.shape) > 1 or emb_dim != vector.shape[0]:
+                        raise ValueError("Pre-trained embedding dim is {}. Expect {}.".format(vector.shape, (emb_dim,)))
+                    embedding_matrix[vocab[word]] = vector
+                    hit_flags[vocab[word]] = 1
+
+        if np.sum(hit_flags) < len(vocab):
+            # some words in vocab are missing from the pre-trained embedding;
+            # sample their vectors from a multivariate normal fitted to the found ones
+            vocab_embed = embedding_matrix[np.where(hit_flags)]
+            mean, cov = vocab_embed.mean(axis=0), np.cov(vocab_embed.T)
+            sampled_vectors = np.random.multivariate_normal(mean, cov, size=(len(vocab) - np.sum(hit_flags),))
+            embedding_matrix[np.where(1 - hit_flags)] = sampled_vectors
+        return embedding_matrix
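And a sketch of fast_load_embedding under the same assumptions. It returns a numpy.ndarray rather than a Tensor, and every vocabulary word absent from the file is drawn from a multivariate normal fitted to the mean and covariance of the vectors that were found, so the random rows match the scale of the pre-trained ones (the fit assumes at least one word was hit):

    vocab = Vocabulary()
    for w in ["the", "cat", "dog"]:
        vocab.add(w)
    matrix = EmbedLoader.fast_load_embedding(emb_dim=4, emb_file="glove_sample.txt", vocab=vocab)
    # matrix.shape == (len(vocab), 4); the row for "dog", absent from the
    # hypothetical file, comes from np.random.multivariate_normal(mean, cov)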