- import os
- import re
- from collections import namedtuple
-
- import numpy as np
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- import torch.optim as optim
- from torch.autograd import Variable
-
- from model.base_model import BaseModel
-
- USE_GPU = True
-
-
- class CharLM(BaseModel):
-
- """
- Controller of the Character-level Neural Language Model
- To do:
- - where the data goes, call data savers.
- """
- DataTuple = namedtuple("DataTuple", ["feature", "label"])
-
- def __init__(self, lstm_batch_size, lstm_seq_len):
- super(CharLM, self).__init__()
- """
- Settings: should come from config loader or pre-processing
- """
- self.word_embed_dim = 300
- self.char_embedding_dim = 15
- self.cnn_batch_size = lstm_batch_size * lstm_seq_len
- self.lstm_seq_len = lstm_seq_len
- self.lstm_batch_size = lstm_batch_size
- self.num_epoch = 10
- self.old_PPL = 100000
- self.best_PPL = 100000
-
- """
- These parameters are set by pre-processing.
- """
- self.max_word_len = None
- self.num_char = None
- self.vocab_size = None
- self.preprocess("./data_for_tests/charlm.txt")
-
- self.data = None # named tuple to store all data set
- self.data_ready = False
- self.criterion = nn.CrossEntropyLoss()
- self._loss = None
- self.use_gpu = USE_GPU
-
-         # the LSTM hidden size equals word_embed_dim; the leading 2 in the
-         # shapes below is the number of LSTM layers
- self.hidden = (to_var(torch.zeros(2, self.lstm_batch_size, self.word_embed_dim)),
- to_var(torch.zeros(2, self.lstm_batch_size, self.word_embed_dim)))
-
- self.model = charLM(self.char_embedding_dim,
- self.word_embed_dim,
- self.vocab_size,
- self.num_char,
- use_gpu=self.use_gpu)
- for param in self.model.parameters():
-             nn.init.uniform_(param.data, -0.05, 0.05)
-
- self.learning_rate = 0.1
- self.optimizer = None
-
- def prepare_input(self, raw_text):
- """
- :param raw_text: raw input text consisting of words
- :return: torch.Tensor, torch.Tensor
- feature matrix, label vector
-         This function is called only once in Trainer.train, but may be called many times in Tester.test,
-         so Tester caches the prepared test input for repeated calls.
- """
-         if not os.path.exists("cache/prep.pt"):
- self.preprocess("./data_for_tests/charlm.txt") # To do: This is not good. Need to fix..
- objects = torch.load("cache/prep.pt")
- word_dict = objects["word_dict"]
- char_dict = objects["char_dict"]
- max_word_len = self.max_word_len
- print("word/char dictionary built. Start making inputs.")
-
- words = raw_text
- input_vec = np.array(text2vec(words, char_dict, max_word_len))
-         # Labels are the indices of each word's next word; the last word is
-         # paired with itself so that labels and inputs have the same length.
-         input_label = np.array([word_dict[w] for w in words[1:]] + [word_dict[words[-1]]])
- feature_input = torch.from_numpy(input_vec)
- label_input = torch.from_numpy(input_label)
- return feature_input, label_input
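-
-     # Illustrative example (the real indices come from the cached word_dict):
-     # for raw_text = ["the", "cat", "sat"], the feature matrix holds one row
-     # of character indices per word, shape [3, max_word_len + 2], and the
-     # label vector is [word_dict["cat"], word_dict["sat"], word_dict["sat"]]:
-     # each word is labelled with the word that follows it, and the last word
-     # is labelled with itself.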
-
- def mode(self, test=False):
- if test:
- self.model.eval()
- else:
- self.model.train()
-
- def data_forward(self, x):
-         """
-         :param x: Tensor of size [num_words, max_word_len+2], one row of character indices per word
-         :return: Tensor of size [num_seq * lstm_seq_len, vocab_size]
-         """
-         # additional processing of inputs after batching: drop trailing words
-         # that do not fill a whole sequence, then group the rest into
-         # sequences of length lstm_seq_len
-         num_seq = x.size()[0] // self.lstm_seq_len
-         x = x[:num_seq * self.lstm_seq_len, :]
-         x = x.view(-1, self.lstm_seq_len, self.max_word_len + 2)
-
- # detach hidden state of LSTM from last batch
- hidden = [state.detach() for state in self.hidden]
- output, self.hidden = self.model(to_var(x), hidden)
- return output
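-
-     # Shape sketch with illustrative numbers: with lstm_seq_len = 5 and 103
-     # input words, num_seq = 20, the trailing 3 words are dropped, the model
-     # sees x of shape [20, 5, max_word_len + 2], and the returned output has
-     # shape [20 * 5, vocab_size].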
-
- def grad_backward(self):
- self.model.zero_grad()
- self._loss.backward()
-         torch.nn.utils.clip_grad_norm_(self.model.parameters(), 5, norm_type=2)
- self.optimizer.step()
-
- def get_loss(self, predict, truth):
- self._loss = self.criterion(predict, to_var(truth))
-         return self._loss.data  # no PyTorch data structure exposed outside
-
- def define_optimizer(self):
- # redefine optimizer for every new epoch
- self.optimizer = optim.SGD(self.model.parameters(), lr=self.learning_rate, momentum=0.85)
-
- def save(self):
- print("network saved")
- # torch.save(self.model, "cache/model.pkl")
-
- def preprocess(self, all_text_files):
- word_dict, char_dict = create_word_char_dict(all_text_files)
- num_char = len(char_dict)
- self.vocab_size = len(word_dict)
- char_dict["BOW"] = num_char + 1
- char_dict["EOW"] = num_char + 2
- char_dict["PAD"] = 0
- self.num_char = num_char + 3
-         # char_dict maps each character (plus "PAD", "BOW", "EOW") to an integer index in [0, num_char + 2]
- reverse_word_dict = {value: key for key, value in word_dict.items()}
- self.max_word_len = max([len(word) for word in word_dict])
- objects = {
- "word_dict": word_dict,
- "char_dict": char_dict,
- "reverse_word_dict": reverse_word_dict,
- }
-         os.makedirs("cache", exist_ok=True)
-         torch.save(objects, "cache/prep.pt")
- print("Preprocess done.")
-
-
- """
- Global Functions
- """
-
-
- def batch_generator(x, batch_size):
-     # x: Tensor whose first dimension indexes words (e.g. the feature or label tensor)
-     # partitions x into consecutive batches of batch_size along dim 0;
-     # trailing items that do not fill a whole batch are dropped
- num_step = x.size()[0] // batch_size
- for t in range(num_step):
- yield x[t * batch_size:(t + 1) * batch_size]
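-
- # Usage sketch (the caller below is hypothetical): cut the prepared feature
- # tensor into LSTM-sized chunks, e.g.
- #   for batch in batch_generator(feature_input, lstm_batch_size * lstm_seq_len):
- #       ...  # each batch has shape [lstm_batch_size * lstm_seq_len, max_word_len + 2]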
-
-
- def text2vec(words, char_dict, max_word_len):
-     """Convert each word to a list of character indices:
-     [BOW] + char indices (padded with PAD up to max_word_len) + [EOW],
-     so every vector has length max_word_len + 2.  Returns a list of lists of int.
-     """
- word_vec = []
- for word in words:
- vec = [char_dict[ch] for ch in word]
- if len(vec) < max_word_len:
- vec += [char_dict["PAD"] for _ in range(max_word_len - len(vec))]
- vec = [char_dict["BOW"]] + vec + [char_dict["EOW"]]
- word_vec.append(vec)
- return word_vec
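-
- # Worked example with a toy dictionary (not the real one): given
- #   char_dict = {"PAD": 0, "c": 1, "a": 2, "t": 3, "BOW": 4, "EOW": 5}
- # and max_word_len = 5,
- #   text2vec(["cat"], char_dict, 5) == [[4, 1, 2, 3, 0, 0, 5]]
- # i.e. BOW, 'c', 'a', 't', two PADs, EOW -- always max_word_len + 2 entries.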
-
-
- def read_data(file_name):
- with open(file_name, 'r') as f:
- corpus = f.read().lower()
-         corpus = re.sub(r"<unk>", "unk", corpus)
- return corpus.split()
-
-
- def get_char_dict(vocabulary):
-     """Map every character appearing in the vocabulary to an integer index,
-     counting from 1 (index 0 is reserved for "PAD")."""
-     char_dict = dict()
-     count = 1
- for word in vocabulary:
- for ch in word:
- if ch not in char_dict:
- char_dict[ch] = count
- count += 1
- return char_dict
-
-
- def create_word_char_dict(*file_name):
- text = []
- for file in file_name:
- text += read_data(file)
- word_dict = {word: ix for ix, word in enumerate(set(text))}
- char_dict = get_char_dict(word_dict)
- return word_dict, char_dict
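-
- # Toy example (indices are arbitrary because they depend on set iteration
- # order): for a file containing "the cat sat", word_dict might be
- # {"the": 0, "cat": 1, "sat": 2} and char_dict {"t": 1, "h": 2, "e": 3,
- # "c": 4, "a": 5, "s": 6}; character indices start at 1 so that 0 stays
- # free for "PAD".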
-
-
- def to_var(x):
-     """Wrap a tensor in a Variable, moving it to the GPU first when available."""
-     if torch.cuda.is_available() and USE_GPU:
- x = x.cuda()
- return Variable(x)
-
-
- """
- Neural Network
- """
-
-
- class Highway(nn.Module):
- """Highway network"""
-
- def __init__(self, input_size):
- super(Highway, self).__init__()
- self.fc1 = nn.Linear(input_size, input_size, bias=True)
- self.fc2 = nn.Linear(input_size, input_size, bias=True)
-
-     def forward(self, x):
-         t = torch.sigmoid(self.fc1(x))  # transform gate
-         return torch.mul(t, F.relu(self.fc2(x))) + torch.mul(1 - t, x)
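-
-
- # The highway layer computes y = t * relu(fc2(x)) + (1 - t) * x with gate
- # t = sigmoid(fc1(x)), so input and output sizes always match, e.g. (sketch):
- #   hw = Highway(150)
- #   y = hw(Variable(torch.randn(32, 150)))   # y.size() == [32, 150]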
-
-
- class charLM(nn.Module):
- """Character-level Neural Language Model
- CNN + highway network + LSTM
-     # Input:
-         3D tensor of character indices with shape [num_seq, seq_len, max_word_len+2]
-     # Output:
-         2D tensor with shape [num_seq * seq_len, vocab_size], plus the updated LSTM hidden state
- # Arguments:
- char_emb_dim: the size of each character's embedding
- word_emb_dim: the size of each word's embedding
- vocab_size: num of unique words
- num_char: num of characters
- use_gpu: True or False
- """
-
- def __init__(self, char_emb_dim, word_emb_dim,
- vocab_size, num_char, use_gpu):
- super(charLM, self).__init__()
- self.char_emb_dim = char_emb_dim
- self.word_emb_dim = word_emb_dim
- self.vocab_size = vocab_size
-
- # char embedding layer
- self.char_embed = nn.Embedding(num_char, char_emb_dim)
-
-         # convolutions of filters with different sizes, kept in a ModuleList
-         # so their parameters are registered with (and trained as part of) the module
-         self.convolutions = nn.ModuleList()
-
-         # list of tuples: (number of filters, filter width)
- # self.filter_num_width = [(25, 1), (50, 2), (75, 3), (100, 4), (125, 5), (150, 6)]
- self.filter_num_width = [(25, 1), (50, 2), (75, 3)]
-
- for out_channel, filter_width in self.filter_num_width:
- self.convolutions.append(
- nn.Conv2d(
- 1, # in_channel
- out_channel, # out_channel
- kernel_size=(char_emb_dim, filter_width), # (height, width)
- bias=True
- )
- )
-
- self.highway_input_dim = sum([x for x, y in self.filter_num_width])
-
- self.batch_norm = nn.BatchNorm1d(self.highway_input_dim, affine=False)
-
- # highway net
- self.highway1 = Highway(self.highway_input_dim)
- self.highway2 = Highway(self.highway_input_dim)
-
- # LSTM
- self.lstm_num_layers = 2
-
- self.lstm = nn.LSTM(input_size=self.highway_input_dim,
- hidden_size=self.word_emb_dim,
- num_layers=self.lstm_num_layers,
- bias=True,
- dropout=0.5,
- batch_first=True)
-
- # output layer
- self.dropout = nn.Dropout(p=0.5)
- self.linear = nn.Linear(self.word_emb_dim, self.vocab_size)
-
-         if use_gpu:
- for x in range(len(self.convolutions)):
- self.convolutions[x] = self.convolutions[x].cuda()
- self.highway1 = self.highway1.cuda()
- self.highway2 = self.highway2.cuda()
- self.lstm = self.lstm.cuda()
- self.dropout = self.dropout.cuda()
- self.char_embed = self.char_embed.cuda()
- self.linear = self.linear.cuda()
- self.batch_norm = self.batch_norm.cuda()
-
- def forward(self, x, hidden):
- # Input: Variable of Tensor with shape [num_seq, seq_len, max_word_len+2]
-         # Return: (Variable of Tensor with shape [num_seq*seq_len, vocab_size], new hidden state)
- lstm_batch_size = x.size()[0]
- lstm_seq_len = x.size()[1]
-
- x = x.contiguous().view(-1, x.size()[2])
- # [num_seq*seq_len, max_word_len+2]
-
- x = self.char_embed(x)
- # [num_seq*seq_len, max_word_len+2, char_emb_dim]
-
- x = torch.transpose(x.view(x.size()[0], 1, x.size()[1], -1), 2, 3)
- # [num_seq*seq_len, 1, char_emb_dim, max_word_len+2]
-
- x = self.conv_layers(x)
- # [num_seq*seq_len, total_num_filters]
-
- x = self.batch_norm(x)
- # [num_seq*seq_len, total_num_filters]
-
- x = self.highway1(x)
- x = self.highway2(x)
- # [num_seq*seq_len, total_num_filters]
-
- x = x.contiguous().view(lstm_batch_size, lstm_seq_len, -1)
- # [num_seq, seq_len, total_num_filters]
-
-         x, hidden = self.lstm(x, hidden)
-         # [num_seq, seq_len, hidden_size]  (batch_first=True)
-
-         x = self.dropout(x)
-         # [num_seq, seq_len, hidden_size]
-
- x = x.contiguous().view(lstm_batch_size * lstm_seq_len, -1)
- # [num_seq*seq_len, hidden_size]
-
- x = self.linear(x)
- # [num_seq*seq_len, vocab_size]
- return x, hidden
-
- def conv_layers(self, x):
- chosen_list = list()
-         for conv in self.convolutions:
-             feature_map = torch.tanh(conv(x))
-             # (batch_size, out_channel, 1, max_word_len+2-filter_width+1)
-             chosen = torch.max(feature_map, 3)[0]
-             # (batch_size, out_channel, 1)
-             chosen = chosen.squeeze(2)
-             # (batch_size, out_channel)
-             chosen_list.append(chosen)
-
-         # (batch_size, total_num_filters)
-         return torch.cat(chosen_list, 1)
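-
-
- if __name__ == "__main__":
-     # Minimal smoke test of the charLM network alone -- a sketch with made-up
-     # toy sizes (NOT the values produced by preprocess()); it only checks that
-     # random data flows through the layers.
-     batch_size, seq_len, max_word_len = 4, 5, 10
-     num_char, vocab_size, word_dim = 30, 100, 64
-     net = charLM(char_emb_dim=15, word_emb_dim=word_dim,
-                  vocab_size=vocab_size, num_char=num_char, use_gpu=False)
-     # fake character indices: [num_seq, seq_len, max_word_len + 2]
-     fake_x = torch.from_numpy(
-         np.random.randint(0, num_char,
-                           size=(batch_size, seq_len, max_word_len + 2))).long()
-     hidden = (Variable(torch.zeros(2, batch_size, word_dim)),
-               Variable(torch.zeros(2, batch_size, word_dim)))
-     output, hidden = net(Variable(fake_x), hidden)
-     print(output.size())  # expected: [batch_size * seq_len, vocab_size]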