diff --git a/reproduction/seqence_labelling/ner/model/dilated_cnn.py b/reproduction/seqence_labelling/ner/model/dilated_cnn.py new file mode 100644 index 00000000..cd2fa64b --- /dev/null +++ b/reproduction/seqence_labelling/ner/model/dilated_cnn.py @@ -0,0 +1,111 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F +from fastNLP.modules.decoder import ConditionalRandomField +from fastNLP.modules.encoder import Embedding +from fastNLP.core.utils import seq_len_to_mask +from fastNLP.core.const import Const as C + + +class IDCNN(nn.Module): + def __init__(self, init_embed, char_embed, + num_cls, + repeats, num_layers, num_filters, kernel_size, + use_crf=False, use_projection=False, block_loss=False, + input_dropout=0.3, hidden_dropout=0.2, inner_dropout=0.0): + super(IDCNN, self).__init__() + self.word_embeddings = Embedding(init_embed) + self.char_embeddings = Embedding(char_embed) + embedding_size = self.word_embeddings.embedding_dim + \ + self.char_embeddings.embedding_dim + + self.conv0 = nn.Sequential( + nn.Conv1d(in_channels=embedding_size, + out_channels=num_filters, + kernel_size=kernel_size, + stride=1, dilation=1, + padding=kernel_size//2, + bias=True), + nn.ReLU(), + ) + + block = [] + for layer_i in range(num_layers): + dilated = 2 ** layer_i + block.append(nn.Conv1d( + in_channels=num_filters, + out_channels=num_filters, + kernel_size=kernel_size, + stride=1, dilation=dilated, + padding=(kernel_size//2) * dilated, + bias=True)) + block.append(nn.ReLU()) + self.block = nn.Sequential(*block) + + if use_projection: + self.projection = nn.Sequential( + nn.Conv1d( + in_channels=num_filters, + out_channels=num_filters//2, + kernel_size=1, + bias=True), + nn.ReLU(),) + encode_dim = num_filters // 2 + else: + self.projection = None + encode_dim = num_filters + + self.input_drop = nn.Dropout(input_dropout) + self.hidden_drop = nn.Dropout(hidden_dropout) + self.inner_drop = nn.Dropout(inner_dropout) + self.repeats = repeats + self.out_fc = nn.Conv1d( + in_channels=encode_dim, + out_channels=num_cls, + kernel_size=1, + bias=True) + self.crf = ConditionalRandomField( + num_tags=num_cls) if use_crf else None + self.block_loss = block_loss + + def forward(self, words, chars, seq_len, target=None): + e1 = self.word_embeddings(words) + e2 = self.char_embeddings(chars) + x = torch.cat((e1, e2), dim=-1) # b,l,h + mask = seq_len_to_mask(seq_len) + + x = x.transpose(1, 2) # b,h,l + last_output = self.conv0(x) + output = [] + for repeat in range(self.repeats): + last_output = self.block(last_output) + hidden = self.projection(last_output) if self.projection is not None else last_output + output.append(self.out_fc(hidden)) + + def compute_loss(y, t, mask): + if self.crf is not None and target is not None: + loss = self.crf(y, t, mask) + else: + t.masked_fill_(mask == 0, -100) + loss = F.cross_entropy(y, t, ignore_index=-100) + return loss + + if self.block_loss: + losses = [compute_loss(o, target, mask) for o in output] + loss = sum(losses) + else: + loss = compute_loss(output[-1], target, mask) + + scores = output[-1] + if self.crf is not None: + pred = self.crf.viterbi_decode(scores, target, mask) + else: + pred = scores.max(1)[1] * mask.long() + + return { + C.LOSS: loss, + C.OUTPUT: pred, + } + + def predict(self, words, chars, seq_len): + return self.forward(words, chars, seq_len)[C.OUTPUT]