import copy
import math

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


def clones(module, N):
    "Produce N identical layers."
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


def subsequent_mask(size):
    "Mask out subsequent positions."
    attn_shape = (1, size, size)
    subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype("uint8")
    return torch.from_numpy(subsequent_mask).eq(False)


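# Illustrative check, not part of the original listing: for size=4 the mask is a
# lower-triangular boolean matrix, so position i may attend only to positions <= i.
def _demo_subsequent_mask():
    mask = subsequent_mask(4)                        # shape [1, 4, 4], dtype torch.bool
    assert mask.shape == (1, 4, 4)
    assert bool(mask[0, 0, 0]) and not bool(mask[0, 0, 1])
    return mask

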
def attention(query, key, value, mask=None, dropout=None):
    "Compute 'Scaled Dot Product Attention'."
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # The mask broadcasts over heads and query positions, e.g. [bsz, 1, 1, len].
        scores = scores.masked_fill(mask.eq(False), -1e9)
    p_attn = F.softmax(scores, dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn


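# Illustrative shape check, not part of the original listing: with hypothetical
# sizes bsz=2, h=8, len=5, d_k=64, attention() returns values with the same shape
# as the query and one attention weight per (query, key) pair.
def _demo_attention():
    q = k = v = torch.randn(2, 8, 5, 64)
    out, p_attn = attention(q, k, v)
    assert out.shape == (2, 8, 5, 64)
    assert p_attn.shape == (2, 8, 5, 5)
    return out, p_attn

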
class MultiHeadedAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        "Take in model size and number of heads."
        super(MultiHeadedAttention, self).__init__()
        assert d_model % h == 0
        # We assume d_v always equals d_k.
        self.d_k = d_model // h
        self.h = h
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        "Implements Figure 2."
        if mask is not None:
            # Same mask applied to all h heads.
            mask = mask.unsqueeze(1)

        nbatches = query.size(0)

        # 1) Do all the linear projections in batch from d_model => h x d_k.
        query, key, value = [
            lin(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
            for lin, x in zip(self.linears, (query, key, value))
        ]

        # 2) Apply attention on all the projected vectors in batch.
        x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)

        # 3) "Concat" using a view and apply a final linear.
        x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
        return self.linears[-1](x)


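# Illustrative usage, not part of the original listing: self-attention with a
# hypothetical padding mask of shape [bsz, 1, len]; the output keeps the input
# shape [bsz, len, d_model].
def _demo_multi_headed_attention():
    mha = MultiHeadedAttention(h=8, d_model=512)
    x = torch.randn(2, 5, 512)
    mask = torch.ones(2, 1, 5, dtype=torch.bool)     # no positions are padded here
    out = mha(x, x, x, mask=mask)
    assert out.shape == (2, 5, 512)
    return out

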
class LayerNorm(nn.Module):
    "Construct a layernorm module (see citation for details)."

    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2


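# Illustrative check, not part of the original listing: with the default
# parameters (a_2 = 1, b_2 = 0) each position's last dimension has roughly
# zero mean after normalization.
def _demo_layer_norm():
    norm = LayerNorm(512)
    y = norm(torch.randn(2, 5, 512))
    assert torch.allclose(y.mean(-1), torch.zeros(2, 5), atol=1e-4)
    return y

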
class PositionwiseFeedForward(nn.Module):
    "Implements FFN equation."

    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.w_2(self.dropout(F.relu(self.w_1(x))))


class SublayerConnection(nn.Module):
    """
    A residual connection followed by a layer norm.
    Note that for code simplicity the norm is applied first rather than last.
    """

    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        "Apply a residual connection to any sublayer with the same size."
        return x + self.dropout(sublayer(self.norm(x)))


class EncoderLayer(nn.Module):
    "Encoder layer is made up of self-attn and feed forward (defined above)."

    def __init__(self, size, self_attn, feed_forward, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.sublayer = clones(SublayerConnection(size, dropout), 2)
        self.size = size

    def forward(self, x, mask):
        "Follow Figure 1 (left) for connections."
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
        return self.sublayer[1](x, self.feed_forward)


class Encoder(nn.Module):
    "Core encoder is a stack of N layers."

    def __init__(self, layer, N):
        super(Encoder, self).__init__()
        self.layers = clones(layer, N)
        self.norm = LayerNorm(layer.size)

    def forward(self, x, mask):
        "Pass the input (and mask) through each layer in turn."
        # Expand the mask from [bsz, len] to [bsz, 1, len] so it broadcasts over query positions.
        mask = mask.bool().unsqueeze(-2)
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)


def make_encoder(N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):
    "Build an N-layer Transformer encoder from the given hyperparameters."
    c = copy.deepcopy
    attn = MultiHeadedAttention(h, d_model)
    ff = PositionwiseFeedForward(d_model, d_ff, dropout)
    return Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N)
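

# Illustrative end-to-end usage, not part of the original listing: a small
# hypothetical configuration run on already-embedded inputs. Token embedding and
# positional encoding are outside the scope of this listing, so random vectors
# stand in for them; the mask marks every position as a real (non-padding) token.
if __name__ == "__main__":
    encoder = make_encoder(N=2, d_model=512, d_ff=2048, h=8, dropout=0.1)
    encoder.eval()                       # disable dropout for a deterministic pass
    x = torch.randn(2, 10, 512)          # [bsz, len, d_model] embedded inputs
    mask = torch.ones(2, 10)             # [bsz, len]; 1 = real token, 0 = padding
    with torch.no_grad():
        out = encoder(x, mask)
    print(out.shape)                     # torch.Size([2, 10, 512])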