AI TECH/TIL

[실습] Week4 Today I Learn

prefer_all 2022. 10. 19. 10:44
<목차>
1. Basic RNN, LSTM, GRU 
- PackedSequence
2. Vocabulary 만들기 :  token2id, id2token
3. RNN based Language Model 제작하기
- 모델 제작, BPTT, 모델 평가

 

 

week4 NLP 1주차
실습: Basic RNN

 

RNN은 매 순간 동일한 weight 행렬을 이용해 재귀적으로 학습한다.

이번 실습에서는 데이터를 하나의 batch로 사용한다.

 

기존에 주어진 데이터는 아래와 같으나 padding을 붙여 (10,20)의 데이터가 된다.

 

 

# 데이터 전처리: Padding 코드

valid_lens = []
for i, seq in enumerate(tqdm(data)):
  valid_lens.append(len(seq))
  if len(seq) < max_len:
    data[i] = seq + [pad_id] * (max_len - len(seq))

위 데이터를 하나의 batch로 사용하기 때문에 batch size는 10이고 maximum sequence length는 20이다.

 

 

데이터를 전처리 한 후 word embedding을 위한 embedding layer을 만든다.

embedding dimenstion은 256으로 설정하였는데 이는 각 embedding vector의 크기를 말한다.

# B: batch size, L: maximum sequence length
embedding_size = 256
embedding = nn.Embedding(vocab_size, embedding_size)
# vocab_size는 상수로 100, embedding_size는 256

# d_w: embedding size
batch_emb = embedding(batch)  # (B, L, d_w)
Pytorch의 nn.Embedding의 인자
nn.Embedding(num_embeddings=len(vocab), embedding_dim=3, padding_idx=1)

- num_embeddings : 임베딩을 할 단어들의 개수. 다시 말해 단어 집합의 크기입니다.
- embedding_dim : 임베딩 할 벡터의 차원입니다. 사용자가 정해주는 하이퍼파라미터입니다.
- padding_idx : 선택적으로 사용하는 인자입니다. 패딩을 위한 토큰의 인덱스를 알려줍니다.

 

 

 

이제 layer은 한 개만, 방향은 단방향으로 설정하고 RNN을 돌릴 때 dimension의 변화를 살펴보자. 

input_size는 벡터로 바뀐 단어들을 input으로 넣어주는 거니까 우리가 설정한 embedding_size이고, hidden_size는 임의로 설정해두었다. 

hidden_size = 512  # RNN의 hidden size
num_layers = 1  # 쌓을 RNN layer의 개수
num_dirs = 1  # 1: 단방향 RNN, 2: 양방향 RNN

rnn = nn.RNN(
    input_size=embedding_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    bidirectional=True if num_dirs > 1 else False
)

 

 

이제 hidden state들의 weight를 학습하자. 우선 0으로 초기화 시킬 것이고, dimension은 다음과 같다.

h_0 = torch.zeros((num_layers * num_dirs, batch.shape[0], hidden_size))  
# (num_layers * num_dirs, B, d_h)

즉 h_0의 shape는 (1, 10, 512) 이다. 채널은 layer과 방향의 곱인데, layer 별로 좌, 우 방향 각각의 정보를 저장해야 하기 때문이다.

 

 

RNN에 batch data를 넣으면 hidden_states, h_n 두 가지 output을 얻을 수 있다.

최종 output인 h_n을 구해보자.

  • hidden_states: 각 time step에 해당하는 hidden state들의 묶음.
  • h_n: 모든 sequence를 거치고 나온 마지막 hidden state.
hidden_states, h_n = rnn(batch_emb.transpose(0, 1), h_0)

# d_h: hidden size, num_layers: layer 개수, num_dirs: 방향의 개수
print(hidden_states.shape)  # (L, B, d_h)
print(h_n.shape)  # (num_layers*num_dirs, B, d_h) = (1, B, d_h)

'''
출력값:
torch.Size([20, 10, 512])
torch.Size([1, 10, 512])
'''

임베딩된 batch를 transpose하는 이유는 (length, batch_size, embedding_size)로 들어가야 하기 때문이다. (L, B, d_w)

h_n는 마지막으로 나오는 weight이기 때문에 들어갔던 size와 동일할 것이고 따라서 (1, 10, 512) 이다. 

 

 

활용

마지막 hidden state인 h_n를 이용하여 text classification task에 적용해보자.

num_classes를 2로 설정하는 것은 2진 분류로 가정했기 때문이다.

num_classes = 2 
classification_layer = nn.Linear(hidden_size, num_classes)

# C: number of classes
output = classification_layer(h_n.squeeze(0))  # (1, B, d_h) => (B, C)
print(output.shape) # 출력값: torch.Size([10, 2])

classification_layer는 (512,2)로 들어간다. 이는 각 각 input sample size와 output sample size이다.

이후, output을 생성할 때 squeeze를 사용한다. squeeze는 앞의 1을 제거하고 input인 (10, 512)가 (10,2)로 바꾸어준다.

 

 

text classification task 외에도 token-level task에도 적용할 수 있다.

입력 하나 하나에 대한 품사를 나타내주자.  (이전의 output을 다음의 input으로 계속 해서 넣는 Language Model은 아니다)

num_classes = 5 # 매번 hidden으로 품사 분류
entity_layer = nn.Linear(hidden_size, num_classes)

# C: number of classes
output = entity_layer(hidden_states)  # (L, B, d_h) => (L, B, C)
print(output.shape)

(20, 10, 512) (20, 10, 5)로 바뀐다. 


PackedSequence 

RNN에 적용하기 위해 padding 처리 했던 데이터는 다음과 같다.

빨간색 표시와 같이 불필요한 pad 계산이 포함되는 것을 확인할 수 있다.

 

 

이를 해결하기 위해 데이터를 padding 전 원래 길이를 기준으로 정렬해보자.

# [참고] data를 하나의 batch로 만들 때 사용한 코드
# B: batch size, L: maximum sequence length
batch = torch.LongTensor(data)  # (B, L)
batch_lens = torch.LongTensor(valid_lens)  # (B)
sorted_lens, sorted_idx = batch_lens.sort(descending=True)
sorted_batch = batch[sorted_idx]

print(sorted_lens)
# 출력값: tensor([20, 18, 18, 17, 15, 10,  8,  6,  6,  5])

sorted_lens는 다음과 같이 padding을 무시했다.

 

 

이렇게 중간에 끼어 있는 padding 때문에 불필요한 연산이 발생하는 것을 방지하기 위한 것이 packed_padded_sequence이다. 

각 배치 내에서 문장의 길이를 기준으로 정렬한 후packed_padded_sequence를 사용해 하나의 통합된 배치로 만들어준다.  아래 코드 출처

import torch
import torch.nn as nn
import numpy as np

batch_data = ['I love you', 'work hard dream big', 'Live the life you love', 'Antifreeze']
input_seq = [s.split() for s in batch_data]

max_len = 0
for s in input_seq:
  if len(s)> max_len:
    max_len = len(s)
  
vocab = {w:i for i,w in enumerate(set([t for s in input_seq for t in s]), 1)}
vocab['<pad>'] = 0
input_seq = [s+ ["<pad>"]*(max_len-len(s)) if len(s) < max_len else s for s in input_seq]
input_seq2idx = torch.LongTensor( [list(map(vocab.get, s)) for s in input_seq] )
# 문제 상황
print(input_seq)
'''
[['I', 'love', 'you', '<pad>', '<pad>'],
 ['work', 'hard', 'dream', 'big', '<pad>'],
 ['Live', 'the', 'life', 'you', 'love'],
 ['Antifreeze', '<pad>', '<pad>', '<pad>', '<pad>']]
'''

input_seq2idx
'''
tensor([[ 2,  9,  8,  0,  0],
        [ 3,  4, 10,  6,  0],
        [11,  5,  7,  8,  9],
        [ 1,  0,  0,  0,  0]])
'''
# 해결책
from torch.nn.utils.rnn import pack_padded_sequence

# *******step 1. 정렬*******
input_lengths = torch.LongTensor([torch.max(input_seq2idx[i,:].data.nonzero())+1 for i in range(input_seq2idx.size(0))])
# nonzero(): 요소들 중 0이 아닌 값들의 index를 반환

print(input_lengths)
# 출력값: tensor([3, 4, 5, 1])

input_lengths, sorted_idx = input_lengths.sort(descending=True)
input_seq2idx = input_seq2idx[sorted_idx]
print(input_seq2idx)
'''
tensor([[11,  5,  7,  8,  9],
        [ 3,  4, 10,  6,  0],
        [ 2,  9,  8,  0,  0],
        [ 1,  0,  0,  0,  0]])
'''

# *******step 2. 하나의 통합된 배치로*******
packed_input = pack_padded_sequence(input_seq2idx, input_lengths.tolist(), batch_first=True)
print(packed_input)
'''
PackedSequence(data=tensor([11,  3,  2,  1,  5,  4,  9,  7, 10,  8,  8,  6,  9]), batch_sizes=tensor([4, 3, 3, 2, 1]), sorted_indices=None, unsorted_indices=None)
'''

week4 NLP 1주차
실습: LSTM, GRU

 

LSTM

LSTM에서는 cell state가 추가된다. hidden state의 shape와 cell state의 shape는 동일하다.

코드 상에서 LSTM이 RNN과 다른 점은 h_0 뿐만 아니라 c_0도 초기화해서 input으로 넣어준다는 것이다.

embedding_size = 256
hidden_size = 512
num_layers = 1
num_dirs = 1

embedding = nn.Embedding(vocab_size, embedding_size)
lstm = nn.LSTM(
    input_size=embedding_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    bidirectional=True if num_dirs > 1 else False
)

h_0 = torch.zeros((num_layers * num_dirs, batch.shape[0], hidden_size))  
# (num_layers * num_dirs, B, d_h)
c_0 = torch.zeros((num_layers * num_dirs, batch.shape[0], hidden_size)) # 추가됐다
# (num_layers * num_dirs, B, d_h)

 

 

 

GRU

GRU는 cell state가 없어서 RNN과 동일하게 사용 가능하다. 

GRU를 이용해 LM task를 수행해보자.

 

첫 단어만 input으로 받는 것을 확인할 수 있다.(input_id)

input_id = batch.transpose(0, 1)[0, :]  # (B)
hidden = torch.zeros((num_layers * num_dirs, batch.shape[0], hidden_size))  # (1, B, d_h)

 

 

이번 실습에서는 teacher forcing 없이 이전에 얻은 결과를 다음 input으로 이용한다. 

teacher forcing이란?

처음엔 당연히 제대로된 output이 안나올텐데 잘못된 output을 다음 레벨의 input으로 사용하면 학습이 제대로 안 될 것이다. 그래서 처음엔 input을 인위적으로 실제 정답을 넣어주면서 학습을 시작한다.
즉, 이번 레벨의 output을 정답인 것처럼 진행한다.

 

 

for문을 돌면서 이전의 output을 다음의 input으로 넣어주는데, 매번 정답일 확률이 큰 것을 뽑아서 output으로 정한다.

for t in range(max_len):
  input_emb = embedding(input_id).unsqueeze(0)  # (1, B, d_w)
  output, hidden = gru(input_emb, hidden)  # output: (1, B, d_h), hidden: (1, B, d_h)

  # V: vocab size
  output = output_layer(output)  # (1, B, V)
  probs, top_id = torch.max(output, dim=-1)  # probs: (1, B), top_id: (1, B)

  print("*" * 50)
  print(f"Time step: {t}")
  print(output.shape)
  print(probs.shape)
  print(top_id.shape)

  input_id = top_id.squeeze(0)  # (B)

 

위 실습에서는 layer을 하나, 즉 단방향으로만 했는데 2개 이상의 layer와 양방향으로 진행도 가능하다.

양방향 진행은 단방향 진행과 두 가지 차이가 있다.

첫째, h_0의 dimension이 바뀐다. (num_layers * num_dirs, B, d_h) 이었기 때문에 "num_layers * num_dirs"가 바뀐다.

둘째, output dimension에도 차이가 있다. 순방향과 역방향의 hidden state를 concat해서 2배가 된다. 즉, hidden state는 (20, 10, 1024)가 된다.


week4 NLP 1주차
기본 과제 1: Data Preprocessing & Tokenization

 

Q. Vocabulary 만들기

컴퓨터는 글자를 알아볼 수 없기 때문에 각 토큰을 숫자 형식의 유일한 id에 매핑해야 한다.

ex)  ['I', 'have', 'a', 'meal'] ==> [194, 123, 2, 54]

이러한 매핑은 모델 학습 전에 사전 정의되어야 한다. 이때, 모델이 다를 수 있는 토큰들의 집합과 이 매핑을 Vocab라고 흔히 부른다.

우리는 이 과제에서 토큰화된 문장들을 받아 각 토큰을 숫자로 매핑하는 token2id와 그 역매핑인 id2token를 만들 것이다.

자주 안나오는 단어는 과적합을 일으킬 수 있기 때문에 빈도가 적은 단어는 [UNK] 토큰으로 처리한다. 이는 Unknown의 준말이다. 토큰의 id 번호 순서는 [UNK] 토큰을 제외하고는 자유이다.

Arguments:
sentences -- Vocabulary를 만들기 위한 토큰화된 문장들
             ex) sentences = [["this", "sentence", "be", "tokenized", "propery", "."],
                ["jhon", "'s", "book", "is", "n't", "popular"]]
min_freq -- 단일 토큰으로 처리되기 위한 최소 빈도
                데이터셋에서 최소 빈도보다 더 적게 등장하는 토큰은 [UNK] 토큰으로 처리되어야 합니다.

Return:
id2token -- id를 받으면 해당하는 토큰을 반환하는 리스트 
			ex) [194, 123, 2, 54] ==> ['I', 'have', 'a', 'meal']
token2id -- 토큰을 받으면 해당하는 id를 반환하는 딕셔너리 
			ex) ['I', 'have', 'a', 'meal'] ==> [194, 123, 2, 54]
from typing import List, Tuple, Dict
from collections import Counter
from itertools import chain
from collections import defaultdict

# [UNK] 토큰
unk_token = "[UNK]"
unk_token_id = 0 # [UNK] 토큰의 id는 0으로 처리합니다.

def build_vocab(
    sentences: List[List[str]],
    min_freq: int
) -> Tuple[List[str], Dict[str, int]]:

    id2token: List[str] = [unk_token]
    temp: Dict[str, int] = {unk_token: unk_token_id}
    token2id: Dict[str, int] = {unk_token: unk_token_id}

    temp = 1
    for token, cnt in Counter(chain(*sentences)).items():
      if cnt >= min_freq:
        id2token.append(token)
        token2id[token] = temp
        temp +=1

    return id2token, token2id
    assert id2token[unk_token_id] == unk_token and token2id[unk_token] == unk_token_id, \
        "[UNK] 토큰을 적절히 삽입하세요"
    assert len(id2token) == len(token2id), \
        "id2word과 word2id의 크기는 같아야 합니다"
    return id2token, token2id

 

📝

Chain

리스트(lists/tuples/iterables)를 연결

print(list(chain('ABC', 'DEF')))
# 출력값: ['A', 'B', 'C', 'D', 'E', 'F']

 

*(Asterisk)

곱셈 및 거듭제곱 연산, list형 컨테이너 타입의 데이터를 반복확장할 때, 가변인자를 사용하고자 할 때 외에도

컨테이너 타입의 데이터를 unpacking할 때 사용한다.

temp = [['1','2'],['3']]
print(*temp)
# 출력값: ['1', '2'] ['3']

 

Counter

리스트의 원소의 개수를 세는 모듈

from collections import Counter
letters = 'abca'
print(Counter(letters))
# 출력값: Counter({'a': 2, 'b': 1, 'c':1})

 

Counter, *, Chain을 모두 쓴 예제를 살펴보자

Counter(chain(*[['a', 'b', 'c'], ['b', 'c', 'd']]))
# Counter({'a': 1, 'b': 2, 'c': 2, 'd': 1})

# 단계 1. chain으로 리스트를 연결 ['a', 'b', 'c', 'b', 'c', 'd']
# 2. *를 통해 언패킹 chain(['a', 'b', 'c', 'b', 'c', 'd'])
# 3. Counter을 통해 각 원소의 개수 계산

week4 NLP 1주차
기본 과제 2: Word-level language modeling with RNN

 

데이터 클래스 준비 과정에 대한 설명은 생략한다.

 

모델 아키텍쳐를 준비하는 코드를 작성해보자. RNNModelEmbedding, RNN module, Projection 를 포함한 컨테이너 모듈로, 아래 사진과 같이 이전 hidden state와 input을 받아 다음 토큰의 log probability와 다음 hidden state를 반환합니다.

<RNN 모델의 forward 함수 구현>
위의 그림과 __init__ 함수 내 주석을 참고하여 forward 함수를 구현하세요.

Hint 1: RNN 모델에선 Dropout을 곳곳에 적용하는 것이 성능이 좋다고 알려져 있습니다.
예를 들어, Embedding 이후와 Projection 전에도 적용할 수 있습니다.
Hint 2: 최종 확률값을 구하기 위해서 Projection 이후에 F.log_softmax를 사용하면 됩니다.

Arguments:
input -- 토큰화 및 배치화된 문장들의 텐서
            dtype: torch.long
            shape: [batch_size, sequence_lentgh]
prev_hidden -- 이전의 hidden state
            dtype: torch.float
            shape: RNN, GRU - [num_layers, batch_size, hidden_size]
                   LSTM - ([num_layers, batch_size, hidden_size], [num_layers, batch_size, hidden_size])

Return:
log_prob -- 다음 토큰을 예측한 확률에 log를 취한 값
            dtype: torch.float
            shape: [batch_size, sequence_length, vocab_size]
next_hidden -- 이후의 hidden state
            dtype: torch.float
            shape: RNN, GRU - [num_layers, batch_size, hidden_size]
                   LSTM - ([num_layers, batch_size, hidden_size], [num_layers, batch_size, hidden_size])
from typing import Union, Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F

class RNNModel(nn.Module):
    def __init__(self, 
        rnn_type: str,
        vocab_size: int,
        embedding_size: int=200,
        hidden_size: int=200,
        num_hidden_layers: int=2,
        dropout: float=0.5
    ):
        super().__init__()
        self.rnn_type = rnn_type
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layer = num_hidden_layers
        assert rnn_type in {'LSTM', 'GRU', 'RNN_TANH', 'RNN_RELU'}

        # 정수 형태의 id를 고유 벡터 형식으로 나타내기 위하여 학습 가능한 Embedding Layer를 사용합니다.
        # https://pytorch.org/docs/stable/generated/torch.nn.Embedding.html
        self.embedding = nn.Embedding(vocab_size, embedding_size)

        # Dropout은 RNN 사용시 많이 쓰입니다.
        # https://pytorch.org/docs/stable/generated/torch.nn.Dropout.html 
        self.dropout = nn.Dropout(dropout)

        if rnn_type.startswith('RNN'): #self.rnn 가져다 쓰면 알아서 rnn_type
            # Pytorch에서 제공하는 기본 RNN을 사용해 봅시다.
            # https://pytorch.org/docs/stable/generated/torch.nn.RNN.html 
            nonlinearity = rnn_type.split('_')[-1].lower()
            self.rnn = nn.RNN( 
                embedding_size, 
                hidden_size, 
                num_hidden_layers,
                batch_first=True, 
                nonlinearity=nonlinearity,
                dropout=dropout
            )
        else:
            # Pytorch의 LSTM과 GRU를 사용해 봅시다.
            # LSTM: https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html
            # RGU: https://pytorch.org/docs/stable/generated/torch.nn.GRU.html
            self.rnn = getattr(nn, rnn_type)(
                embedding_size,
                hidden_size,
                num_hidden_layers,
                batch_first=True,
                dropout=dropout
            )

        # 최종적으로 나온 hidden state를 이용해 다음 토큰을 예측하는 출력층을 구성합시다.
        self.projection = nn.Linear(hidden_size, vocab_size)

    def forward(
        self, 
        input: torch.Tensor,
        prev_hidden: Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]
    ):
        log_prob: torch.Tensor = None
        next_hidden: Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]] = None
        
        # 1. embedding 2. dropout 3.project 4.log_prob 계산
        embeded = self.dropout(self.embedding(input))
        outputs, next_hidden = self.rnn(embeded, prev_hidden)
        log_prob = self.projection(self.dropout(outputs))
        log_prob = F.log_softmax(log_prob)

        '''
        [ANSWER CODE]
        emb = self.dropout(self.embedding(input))
        output, next_hidden = self.rnn(emb, prev_hidden)
        log_prob = self.projection(self.dropout(output)).log_softmax(dim=-1)
        '''
        assert list(log_prob.shape) == list(input.shape) + [self.vocab_size]
        assert prev_hidden.shape == next_hidden.shape if self.rnn_type != 'LSTM' \
          else prev_hidden[0].shape == next_hidden[0].shape == next_hidden[1].shape
        
        return log_prob, next_hidden
    
    def init_hidden(self, batch_size: int):
        """ 첫 hidden state를 반환하는 함수 """
        weight = self.projection.weight
        
        if self.rnn_type == 'LSTM':
            return (weight.new_zeros(self.num_hidden_layer, batch_size, self.hidden_size),
                    weight.new_zeros(self.num_hidden_layer, batch_size, self.hidden_size))
        else:
            return weight.new_zeros(self.num_hidden_layer, batch_size, self.hidden_size)
    
    @property
    def device(self):   # 현재 모델의 device를 반환하는 프로퍼티
        return self.projection.weight.device

 

RNNModel을 통한 모델 생성은 다음과 같다.

rnn_type = 'LSTM'      # 'LSTM', 'GRU', 'RNN_TANH', 'RNN_RELU'
vocab_size = len(corpus.dictionary)
model = RNNModel(rnn_type, vocab_size=vocab_size)

 

 

이제 모델을 학습해보자.

모델 학습에 필요한 argument를 설정하고,

데이터를 불러와 모델을 build 한 후 train, dev 데이터로 학습 및 evaluate 하고,

loss 와 perplexity score 를 모니터링하여 학습 현황을 확인한다.

 

BPTT

전체 말뭉치에 대해 RNN 계산을 해서 Gradient(기울기)를 역전파하는 것은 시간도 오래 걸리고 병렬화도 불가능하다.

따라서 말뭉치를 batch size만큼 잘라 각각을 학습 sample로 사용한다.

 

그러나 배치화를 하였음에도 하나의 sample은 여전히 길다. 이를 해결하기 위해 한번에 sequence_length만큼에 대해서만 역전파를 수행해서 전체 sequence를 학습시키는 BPTT를 사용하자.

 

처음 데이터 셋 :
[ a b c d e <eos> f g h i j k l m n <eos> o p q r s <eos> t u v w x y z <eos> ] 

batch_size = 4로 나눈 후 (개수가 부족해서 못 채운 부분은 잘라냄)
[[ a b c d e <eos> ],
[ f g h i j k ],
[ l m n <eos> o p ],
[ r s <eos> t u v ]]

배치화된 데이터 셋을 sequence_length =2 로 나눈 후 

[[[ a b ], [ c d ], [ e <eos> ]],
[[ f g ], [ h i ], [ j k ]],
[[ l m ], [ n <eos> ], [ o p ]],
[[ r s ], [ <eos> t ], [ u v ]]]

현재 shape는 (batch_size, num_sample, sequence_length)이다.
BPTT는 num_sample 부분을 순회하면서 기울기를 계산합니다.
따라서 이를 
reshape하여 (num_sample, batch_size, sequence_length)로 구성하면 편리하다.

[[[ a b ], [ f g ], [ l m ], [ r s ]],
[[ c d ], [ h, i ], [ n <eos> ], [ <eos> t ]],
[[ e <eos> ], [ j k ], [ o p ], [ u v ]]]

 

첫번째 샘플인 [[ a b ], [ f g ], [ l m ], [ r s ]]는 각 배치의 첫번째 sequence이고,

두번째 샘플인 [[ c d ], [ h, i ], [ n <eos> ], [ <eos> t ]]는 각 배치의 두번째 sequence,

그리고 마지막 샘플인 [[ e <eos> ], [ j k ], [ o p ], [ u v ]]]는 각 배치의 마지막 부분이다.

<BPTT 배치화 함수>
한 줄로 길게 구성된 데이터를 받아 BPTT를 위해 배치화합니다.
batch_size * sequence_length의 배수에 맞지 않아 뒤에 남는 부분은 잘라버립니다.
이 후 배수에 맞게 조절된 데이터로 BPTT 배치화를 진행합니다.


Arguments:
data -- 학습 데이터가 담긴 텐서
        dtype: torch.long
        shape: [data_lentgh]
batch_size -- 배치 크기
sequence_length -- 한 샘플의 길이

Return:
batches -- 배치화된 텐서
           dtype: torch.long
           shape: [num_sample, batch_size, sequence_length]
def bptt_batchify(
    data: torch.Tensor,
    batch_size: int,
    sequence_length: int
):
    batches: torch.Tensor = None
    
    sample_len = ((len(data) // batch_size) // sequence_length)*sequence_length
    batches = data[:sample_len * batch_size].reshape((batch_size, -1)).reshape((batch_size,-1, sequence_length)).transpose(0,1)
  
    '''
    [ANSWER CODE]
    length = data.numel() // (batch_size * sequence_length) \
                           * (batch_size * sequence_length)
    batches = data[:length].reshape(batch_size, -1, sequence_length).transpose(0, 1)
    '''
    return batches

 

train, val, test data에 BPTT를 적용해보자

batch_size = 16
sequence_length = 64

train_data = bptt_batchify(corpus.train, batch_size, sequence_length)
val_data = bptt_batchify(corpus.valid, batch_size, sequence_length)
test_data = bptt_batchify(corpus.test, batch_size, sequence_length)

 

모델 학습은 BPTT의 방식으로 batch sample 내의 sequence 전체를 순환하여 학습을 진행한다 (자세한 코드 생략)

모델 평가는 다음과 같이 이뤄진다.

<모델 평가 코드>

모델을 받아 해당 데이터에 대해 평가해 평균 Loss 값을 반환합니다.

Arguments:
model -- 평가할 RNN 모델
data -- 평가용 데이터
    dtype: torch.long
    shape: [num_sample, batch_size, sequence_length]

Return:
loss -- 계산된 평균 Loss 값
# 학습 과정이 아니므로 기울기 계산 과정은 불필요합니다. 
@torch.no_grad()
def evaluate(
    model: RNNModel,
    data: torch.Tensor
):
    # Evaluation 모드로 바꾸는 것을 깜빡하지 마세요!
    # Dropout은 평가할 때랑 학습할 때 다르게 작동합니다.
    model.eval()
    
    loss: float = None
    hidden = model.init_hidden(batch_size)
    progress_bar = tqdm(data, desc="Eval")
    for bid, batch in enumerate(progress_bar, start=1):
      batch = batch.to(model.device)

      output, hidden = model(batch, hidden)
      if model.rnn_type == 'LSTM':
        hidden = tuple(tensor.detach() for tensor in hidden)
      else:
        hidden = hidden.detach()
      
      loss = F.nll_loss(output[:, :-1, :].transpose(1,2), batch[:, 1:])

    '''
    [ANSWER CODE]
    total_loss = 0.
    hidden = model.init_hidden(data.shape[1])
    
    for batch in data:
        batch = batch.to(model.device)

        output, hidden = model(batch, hidden)
        total_loss += F.nll_loss(output[:, :-1, :].transpose(1, 2), batch[:, 1:]).item()
    
    loss = total_loss / len(data)
    '''

    return loss