AI TECH/TIL

[실습] Week5 Today I Learn

prefer_all 2022. 10. 21. 11:22
<목차>
1. torch.nn.RNN의 output, h_n
2. Seq2seq 모델 만들기
3. Byte Pair Encoding(BPE)
    - Subword Tokenization이란, 필요성
    - huggingface의 BertTokenizerFast
4. Neural Machine Translation (NMT, 번역모델) 전처리
    - collating, bucketing 

 

output, h_n 톺아보기

torch.nn.RNN, torch.nn.LSTM, torch.nn.GRU는 forward의 결과물로 두 벡터를 return한다.

RNN은 output, h_n 을, LSTM과 GRU는 hidden state와 cell state가 담긴 튜플인 (h_n, c_n)를 return한다.

 

output shape: (seq_len, batch, D * hidden)  (Batch_first = False인 경우)

모든 timestep의 final layer의 hidden state들이다.

ex) 양방향 RNN이면(D=2) 각 토큰별 순방향과 역방향시 h_t시 concat되어 리턴되므로
       마지막 차원의 크기는 2이다.
h_n shape: (D * n_layers, batch, hidden)

마지막 timestep의 모든 layer의 hidden state들이다. 

ex) 양방향 RNN이면 h_n의 짝수번째 벡터는 순방향, 홀수번째 벡터는 역방향 layer의 최종 결과물이다

 

output과 h_n의 관계

레이어 수가 하나인 단방향 RNN의 경우에만 h_n은 output의 부분집합이다.

정확히 말해 마지막 토큰의 벡터인 output[-1]이 h_n이다.

그러나 레이어 수가 2 이상이고, 양방향인 RNN의 경우 output과 h_n의 정확한 관계를 파악하기가 어렵다.

 

Time step의 값 

  • 순방향시 마지막 time-step은 문장 맨 마지막 토큰이다. 문장 마지막 토큰 값은 output 마지막 차원의 첫 hidden_size개의 원소와 같다 (순방향+역방향을 붙인 벡터이기 때문에).
  • 역방향시에는 마지막 time-step이 문장 맨 처음 토큰이다. 문장 처음 토큰 값은 output 마지막 차원내 뒤에서부터 hidden_size개의 원소와 같다.
# forward
print(torch.eq(output[-1, 0, :hidden_size], h_n[2, 0, :]).all())

# backward: last time-step (i.e. first token), last-layer
print(torch.eq(output[0, 0, hidden_size:], h_n[3, 0, :]).all())

D = 2, n_layers = 2로 0,1,2,3. 즉 3일때 마지막 차원의 역방향. 2일때 마지막 차원의 순방향.

 

 

1. 역방향 rnn의 경우

output의 첫번째 토큰(0)은 모든 배치에서 [hidden_size : ] 원소들과 h_n[3]의 원소들은 일치한다 

h_n은 첫 레이어의 순방향, 첫 레이어의 역방향, 두번째 레이어의 순방향, 두번째 레이어의 역방향시 마지막 hidden state들을 차례차례 return한다.

batch_size = output.shape[1]
for batch_idx in range(batch_size):
    if torch.eq(output[0, batch_idx, hidden_size:], h_n[3, batch_idx, :]).all().item():
        print(f'At {batch_idx}th batch output & h_n --> equal')
'''
At 0th batch output & h_n --> equal
At 1th batch output & h_n --> equal
At 2th batch output & h_n --> equal
At 3th batch output & h_n --> equal
'''

D = 2, n_layer = 2니까 0,1,2,3 (4개)의 hidden state를 가지고 있고 h_n[3]이 마지막 역방향 layer

 

2. 순방향 rnn의 경우

for batch_idx in range(batch_size):
    if torch.eq(output[-1, batch_idx, :hidden_size], h_n[2, batch_idx, :]).all().item():
        print(f'At {batch_idx}th batch output & h_n --> equal')
    else:
        print(f'At {batch_idx}th batch output & h_n --> NOT equal')
'''
At 0th batch output & h_n --> equal
At 1th batch output & h_n --> NOT equal
At 2th batch output & h_n --> NOT equal
At 3th batch output & h_n --> NOT equal
'''

 

for batch_idx in range(batch_size):
    for token_idx in range(output.shape[0]):
        if torch.eq(output[token_idx, batch_idx, :hidden_size], h_n[2, batch_idx, :]).all().item():
            print(f'For {token_idx}th token at {batch_idx}th batch output & h_n --> equal')
'''
For 6th token at 0th batch output & h_n --> equal
For 4th token at 1th batch output & h_n --> equal
For 3th token at 2th batch output & h_n --> equal
For 2th token at 3th batch output & h_n --> equal
'''

각 배치마다 output과 h_n이 겹치는 토큰 인덱스가 다른데, 정렬한 인풋의 (패딩 토큰을 제외한) 실토큰의 개수와 일치함을 알 수 있다.

이는 마지막 time step의 hidden state라고 하는 h_n의 경우 (패딩 토큰이 없는) 실제 문장의 마지막 토큰의 hidden state을 담기 때문이다.

 


week5 NLP 2주차
실습: Seq2Seq 구현

 

step 1. 데이터 전처리: trg_data 각 행 앞에 sos_id(1), 마지막에 eos_id(2)가 오게 바꾸고 / 배치화를 위해 padding을 하고(열의 개수를 통일하기 위해 끝에 0 추가하고 / source data를 기준으로 trg_batch 정렬

step 2. Encoder

bidirectional GRU를 이용한 Encoder을 만들자. (양방향이므로 D=2) 아래는 변수 정리한 것임.

self.embedding word embedding layer
self.gru  encoder 역할을 하는 Bi-GRU
self.linear 양/단방향 concat된 hidden state를 decoder의 hidden size에 맞게 linear transformation
embedding_size = 256
hidden_size = 512
num_layers = 2
num_dirs = 2
dropout = 0.1

class Encoder(nn.Module):
  def __init__(self):
    super(Encoder, self).__init__()

    self.embedding = nn.Embedding(vocab_size, embedding_size)
    self.gru = nn.GRU(
        input_size=embedding_size, 
        hidden_size=hidden_size,
        num_layers=num_layers,
        bidirectional=True if num_dirs > 1 else False,
        dropout=dropout
    )
    self.linear = nn.Linear(num_dirs * hidden_size, hidden_size)

  def forward(self, batch, batch_lens):  # batch: (B, S_L), batch_lens: (B)
    # d_w: word embedding size
    batch_emb = self.embedding(batch)  # (B, S_L, d_w)
    batch_emb = batch_emb.transpose(0, 1)  # (S_L, B, d_w)

    packed_input = pack_padded_sequence(batch_emb, batch_lens)

    h_0 = torch.zeros((num_layers * num_dirs, batch.shape[0], hidden_size))  # (num_layers*num_dirs, B, d_h) = (4, B, d_h)
    packed_outputs, h_n = self.gru(packed_input, h_0)  # h_n: (4, B, d_h)
    outputs = pad_packed_sequence(packed_outputs)[0]  # outputs: (S_L, B, 2d_h)

    forward_hidden = h_n[-2, :, :] # forward_hidden: (B, d_h)
    backward_hidden = h_n[-1, :, :] # backward_hidden: (B, d_h)
    hidden = self.linear(torch.cat((forward_hidden, backward_hidden), dim=-1)).unsqueeze(0) # (B, 2d_h) -> (B, d_h) -> (1, B, d_h)

    return outputs, hidden

 

 

step3. Decoder

동일한 설정으로 bidirectional GRU를 이용한 Encoder을 만들자.

class Decoder(nn.Module):
  def __init__(self):
    super(Decoder, self).__init__()

    self.embedding = nn.Embedding(vocab_size, embedding_size)
    self.gru = nn.GRU(
        input_size=embedding_size, 
        hidden_size=hidden_size,
    )
    self.output_layer = nn.Linear(hidden_size, vocab_size)

  def forward(self, batch, hidden):  # batch: (B), hidden: (1, B, d_h)
    batch_emb = self.embedding(batch)  # (B, d_w)
    batch_emb = batch_emb.unsqueeze(0)  # (1, B, d_w)

    outputs, hidden = self.gru(batch_emb, hidden)  # outputs: (1, B, d_h), hidden: (1, B, d_h)
    
    # V: vocab size
    outputs = self.output_layer(outputs)  # (1, B, V)

    return outputs.squeeze(0), hidden

 

step4. Seq2seq 모델 구축

class Seq2seq(nn.Module):
  def __init__(self, encoder, decoder):
    super(Seq2seq, self).__init__()

    self.encoder = encoder
    self.decoder = decoder

  def forward(self, src_batch, src_batch_lens, trg_batch, teacher_forcing_prob=0.5):
    # src_batch: (B, S_L), src_batch_lens: (B), trg_batch: (B, T_L)

    _, hidden = self.encoder(src_batch, src_batch_lens)  # hidden: (1, B, d_h)

    input_ids = trg_batch[:, 0]  # (B) 첫 번째 timestep은 start token id (=1)로만 구성됨
    batch_size = src_batch.shape[0]
    outputs = torch.zeros(trg_max_len, batch_size, vocab_size)  # (T_L, B, V)

    for t in range(1, trg_max_len): # 매 timestep (t) 마다 해당 t의 target id (또는 이전 timestep (t-1)에서 예측된 id)를 입력으로 주고, 다음 단어를 예측하는 과정을 반복함.
      decoder_outputs, hidden = self.decoder(input_ids, hidden)  # decoder_outputs: (B, V), hidden: (1, B, d_h)

      outputs[t] = decoder_outputs
      _, top_ids = torch.max(decoder_outputs, dim=-1)  # top_ids: (B)

      input_ids = trg_batch[:, t] if random.random() > teacher_forcing_prob else top_ids # 사전에 정의한 teacher_forcing_prob에 따라 teacher_forcing을 하는 경우 target id를 입력으로 주고, 아닌 경우 이전 timestep에서 출력된 top_ids를 입력으로 줌.

    return outputs
encoder = Encoder()
decoder = Decoder()
seq2seq = Seq2seq(encoder, decoder)

Byte Pair Encoding(BPE)

서브워드(subword) 토큰화 방법 중 하나이다.

 

💡 서브워드(Subword)는 무엇인가요?

서브워드는 하나의 단어를 여러개의 단위로 분리했을 때 하나의 단위를 나타냅니다. 
love를 서브워드 단위로 나타낸 하나의 예시는 다음과 같습니다.

lo+ve, l+ove, lov+e, etc.


💡 왜 서브워드 토큰화를 사용하나요?

단어단위 임베딩을 사용하는 경우,
학습에 사용되는 말뭉치의 크기가 커질수록 등장하는 단어가 더더욱 많아져
임베딩의 매개변수는 더 커지게 되고 전체 매개변수 대비 단어 임베딩이 차지하는 비중은 매우 높아집니다.

이런 매개변수 비중의 비대칭성을 해결하기 위해 처음에는 문자단위 토큰화(character-level tokenization) 방법이 주목을 받았습니다. 말 그대로 하나의 글자를 기준으로 토큰화을 하는건데요. 이전 예시를 문자단위 토큰화를 하면 다음과 같습니다.

"I have a meal" -> ['I', 'h', 'a', 'v', 'e', 'a', 'm', 'e', 'a', 'l'] "나는 밥을 먹는다" -> ['나', '는', '밥', '을', '먹', '는', '다']

그러나, 문자단위 토큰화 역시 지나치게 긴 Sequence 길이, 성능 저하 등의 문제를 겪으며 서브워드 토큰화가 각광을 받게 되었습니다.

또한, 서브워드 토큰화가 가지는 장점은 Out-of-Vocabulary (OoV) 문제가 없다는 점입니다.

학습 데이터에서 등장하지 않은 단어는 모두 Unknown 토큰 [UNK]로 처리됩니다. 이는 테스트 과정 중에 처음 보는 단어를 모두 [UNK]로 모델의 입력을 넣게 되면서 전체적으로 모델의 성능이 저하될 수 있습니다.

그러나 서브워드 단위로 자르게 된다면 최악의 경우에도 문자단위로 토큰화가 진행됩니다. 이는 서브워드 토큰화는 현재 가지고 있는 Vocab으로 해당 단어가 토큰화할 수 없다면 그 단어를 서브워드 단위로 쪼개 평가하기 때문입니다.

따라서 서브워드 토큰화기는 가장 작은 문자 단위로 서브워드 토큰화가 가능하기 때문에 OoV 문제가 발생하지 않습니다.

Step1. Vocab 만들기

<BPE Vocab 만들기>
Byte Pair Encoding을 통한 Vocab 생성을 구현하세요.
단어의 끝은 '_'를 사용해 주세요.
이때 id2token을 서브워드가 긴 길이 순으로 정렬해 주세요.

Note: 만약 모든 단어에 대해 BPE 알고리즘을 돌리게 되면 매우 비효율적입니다.
왜냐하면 대부분의 단어는 중복되기 때문에 중복되는 단어에 대해서는 한번만 연산할 수 있다면 매우 효율적이기 때문입니다.
따라서 collections 라이브러리의 Counter를 활용해 각 단어의 빈도를 구하고,
각 단어에 빈도를 가중치로 활용하여 BPE를 돌리면 시간을 획기적으로 줄일 수 있습니다.
물론 이는 Optional한 요소입니다.

Arguments:
corpus -- Vocab을 만들기 위한 단어 리스트
max_vocab_size -- 최대 vocab 크기

Return:
id2token -- 서브워드 Vocab. 문자열 리스트 형태로 id로 token을 찾는 매핑으로도 활용 가능
입력 예제(corpus, max_vocab_size) 출력 예제
['abcde'] 
15
['abcde_', 'abcde', 'abcd', 'abc', 'ab', 'd', '_', 'c', 'a', 'e', 'b']
['low'] * 5 + ['lower'] * 2 + ['newest'] * 6 + ['widest'] * 3
19
{'e', 'lo', 'd', 'o', 'est_', 'ne', '_', 'new', 'r', 'est', 'newest_', 'n', 's', 'i', 'l', 'w', 't', 'low', 'es'} 

#
set(build_bpe(corpus, max_vocab_size=19))
['aaaaaaaaaaaa', 'abababab']
8
{'aa', 'abab', 'ab', 'b', 'aaaa', 'a', '_', 'aaaaaaaa'}

#
set(build_bpe(corpus, max_vocab_size=8))
['abc', 'bcd']
10000
['abc_', 'bcd_', 'abc', 'bcd', 'bc', 'd', '_', 'a', 'c', 'b']
from typing import List
from collections import Counter
from itertools import chain

WORD_END = '_' # 단어 끝을 나타내는 문자

def build_bpe(
    corpus: List[str],
    max_vocab_size: int
) -> List[int]:

    id2token: List[str] = None

    # *** 1. 빈도수를 찾기 전 공백 제외 모든 문자 담기 (= Corpus 한 단어를 한 글자씩 쪼개기)
    vocab = list(set(chain(*corpus)) | {WORD_END}) 
    # (1) word가 담긴 list인 corpus를 unpack하고 (2) chain을 통해 리스트로 연결 (3) set으로 중복 제거하면서 합집합(|)으로 WORD_END도 set에 추가
    

    # *** 2. 일단 corpus에 word 자체 몇 번 등장하는 지 Counter을 이용해 빈도수 찾기 -> corpus는 Set이 됨
    corpus = {' '.join(word + WORD_END): count for word, count in Counter(corpus).items()}
    # corpus의 word를 띄어쓰기 해줘야 word 내에서 또 쪼갤 수 있음


    # *** 3. word 내 알파벳 단위로 빈도수 찾기
    while len(vocab) < max_vocab_size:
        counter = Counter()
        for word, word_count in corpus.items(): # Set이므로 items()로 접근
            word = word.split() # 단어를 알파벳 단위로 쪼개기
            counter.update({
                pair: count * word_count 
                for pair, count in Counter(zip(word, word[1:])).items() 
                # 토큰 찾기: 쪼개진 알파벳들 가운데 순서 고려해서 2개 뽑았을 때 경우의 수
            })

        if not counter:
            break
        
        pair = counter.most_common(1)[0][0] # 가장 빈도수가 높은 토큰
        vocab.append(''.join(pair)) # *** 4. vocab에 빈도수가 높은 토큰부터 담기
        corpus = {
            word.replace(' '.join(pair), ''.join(pair)): count
            for word, count in corpus.items()
        }

    id2token = sorted(vocab, key=len, reverse=True)
    return id2token

 

📝

issue: 길이가 고정되지 않는 토큰을 어떻게 찾아서 빈도 수를 셀 것인가

 

*(Asterisk), Chain 복습

corpus = ['low', 'lower', 'lowest', 'widest']
print(*corpus) # low lower lowest widest
print(list(chain(*corpus))) # ['l', 'o', 'w', 'l', 'o', 'w', 'e', 'r', 'l', 'o', 'w', 'e', 's', 't', 'w', 'i', 'd', 'e', 's', 't']
vocab = list(set(chain(*corpus)) | {WORD_END})
print(vocab) # ['d', '_', 'e', 'i', 'o', 'l', 'r', 'w', 't', 's']

 

Counter의 value는 그대로 두고, key명만 수정하기  

print(Counter(corpus).items()) 
#dict_items([('low', 1), ('lower', 1), ('lowest', 1), ('widest', 1)])

corpus = {' '.join(word + WORD_END): count for word, count in Counter(corpus).items()}
print(corpus) 
#{'l o w _': 1, 'l o w e r _': 1, 'l o w e s t _': 1, 'w i d e s t _': 1}
c1 = Counter('love, hate, envy')
print(c1) #Counter({'e': 3, 'v': 2, ',': 2, ' ': 2, 'l': 1, 'o': 1, 'h': 1, 'a': 1, 't': 1, 'n': 1, 'y': 1})
c1 = { 'new '+ key :value for key, value in c1.items()}
print(c1) #{'new l': 1, 'new o': 1, 'new v': 2, 'new e': 3, 'new ,': 2, 'new  ': 2, 'new h': 1, 'new a': 1, 'new t': 1, 'new n': 1, 'new y': 1}

 

 

list에서 순서를 고려하며 2개를 뽑았을 때 경우의 수 구하기(조합)

word = ['a', 'b', 'c', '1', '2']
print(list(zip(word, word[1:]))) #[('a', 'b'), ('b', 'c'), ('c', '1'), ('1', '2')]

 

Collections 모듈인 Counter의 함수 most_common(n)

most_common은 입력된 값의 요소들 중 빈도수(frequency)가 높은 순으로 상위 n를 리스트(list) 안의 튜플(tuple) 형태로 반환

n 입력 안하면 요소 전체를 [('값', 개수)]의 형태로 반환

import collections

c2 = collections.Counter('apple, orange, grape')
print(c2)
print(c2.most_common())
print(c2.most_common(1))
'''
Counter({'a': 3, 'p': 3, 'e': 3, ',': 2, ' ': 2, 'r': 2, 'g': 2, 'l': 1, 'o': 1, 'n': 1})
[('a', 3), ('p', 3), ('e', 3), ('g', 2), (',', 2), ('r', 2), (' ', 2), ('n', 1), ('l', 1), ('o', 1)]
[('a', 3)]
'''

step2. BPE 인코딩

 

💡 만들어진 Vocab으로 텍스트 인코딩하는 방법은 크게 두 가지가 있습니다.

가장 쉬운 방법은 앞에서부터 토큰화하되 가장 긴 것부터 욕심쟁이 기법(Greedy Search)으로 먼저 매칭하는 방법입니다.
Vocab: bcde ab cd bc de a b c d e _abcde ==> ab cd e _
이 방법은 최적의 인코딩을 보장하진 않지만 긴 단어를 빠르게 인코딩하는 것이 가능합니다.


두번째 방법은 가장 길게 매칭되는 것을 전체 텍스트에 대해 먼저 토큰화하는 방법입니다.
Vocab: bcde ab cd bc de a b c d e _abcde ==> a bcde _
두번째 방법은 첫번째 방법보다 느리지만 텍스트를 좀 더 짧게 인코딩하는 것이 가능합니다.
<BPE 인코더>
문장을 받아 BPE 토큰화를 통하여 고유 id의 수열로 바꿉니다.
문장은 공백으로 단어단위 토큰화되어있다고 가정하며, Vocab은 sentence의 모든 문자를 포함한다고 가정합니다.
찾을 수 있는 가장 긴 토큰부터 바꿉니다.

Note: WORD_END를 빼먹지 마세요.

Arguments:
sentence -- 인코드하고자 하는 문장
id2token -- build_bpe를 통해 만들어진 Vocab

Return:
token_ids -- 인코드된 토큰 id 수열
vocab 입력예제(sentence) 출력 예제
['bcc', 'bb', 'bc', 'a', 'b', 'c', WORD_END]
'abbccc'
 [3, 4, 0, 5, 6]
 ['aaaa', 'aa', 'a', WORD_END]
'aaaaaaaa aaaaaaa'
[0, 0, 3, 0, 1, 2, 3]

def encode(
    sentence: str,
    id2token: List[str]
) -> List[int]:

    token_ids: List[int] = None
    idx = 0

    # WORD_END 먼저 추가하기
    sentence = sentence.replace(' ', str(len(vocab)-1))

    # sentence를 순회하며 vocab 찾기
    while idx < len(vocab):
      v = vocab[idx]
      if v in sentence:
          sentence = sentence.replace(v, str(idx))
      idx += 1
    sentence += str(len(vocab)-1)

    return list(map(int,sentence))

step3. BPE 디코딩

[ 196 62 20 6 ] ==> [ I_ li ke_ it_ ] ==> "I_like_it_" ==> "I like it " ==> "I like it"  

위와 같이 해당 id를 해당하는 서브워드로 만든 뒤 합치자. WORD_END는 공백으로 처리하면 된다.

 

<BPE 디코더>
BPE로 토큰화된 id 수열을 받아 문장으로 바꿉니다.
단어단위 토큰화에서의 문장 복원은 단순히 공백을 사이에 넣는 디코딩을 사용합니다.
문장 끝의 공백은 잘라냅니다.

Arguments:
token_ids -- 디코드하고자하는 토큰 id 수열
id2token -- build_bpe를 통해 만들어진 Vocab

Return:
sentence  -- 디코드된 문장
vocab 입력 예제(token_ids) 출력 예제
['bcc', 'bb', 'bc', 'a', 'b', 'c', WORD_END]
[3, 4, 0, 5, 6]
abbccc
['aaaa', 'aa', 'a', WORD_END]
[0, 0, 3, 0, 1, 2, 3]
aaaaaaaa aaaaaaa
def decode(
    token_ids: List[int],
    id2token: List[str]
) -> str:

    sentence =''
    for token in token_ids:
      if token == len(vocab)-1:
        sentence += ' '
      else:
        sentence +=  vocab[token]
    
    # 마지막에 있는 공백만 제거
    if sentence[-1] == ' ':
      return sentence[:-1]
    return sentence

 

 

💡 지금까지 BPE 구현체 코드를 작성해봄으로써 subword 토큰화의 원리를 배웠습니다다.
하지만 위 구현체 코드는 BPE Vocab을 만든느 시간이 오래 걸려 실제로 사용하기에는 한계가 있습니다.
따라서 라이브러리를 활용한 토큰화기를 사용해봅시다.


Transformer 라이브러리는 다양한 Transformer 구현체를 총망라한 라이브러리입다.
Transfomer 외에도 다양한 토큰화기를 지원하는데, 이미 학습된 서브워드 토큰화기 역시 쉽게 불러올 수 있습니다.
! pip install transformers

from transformers import BertTokenizerFast

# BERT 모델에서 사용하는 토큰화를 가져옵니다.
# https://huggingface.co/docs/transformers/model_doc/bert#transformers.BertTokenizerFast
tokenizer = BertTokenizerFast.from_pretrained(
    "bert-base-cased",
    unk_token='<unk>',
    eos_token='<eos>'
)
# 서브워드 토큰화 예시
print(tokenizer.tokenize('I love you forever'))
token_ids = tokenizer("I love you forever", add_special_tokens=False).input_ids
print(token_ids)
print(tokenizer.decode(token_ids))
'''
['I', 'love', 'you', 'forever']
[146, 1567, 1128, 5221]
I love you forever
'''

print(tokenizer.tokenize('UT Austin I miss you'))
token_ids = tokenizer("UT Austin I miss you", add_special_tokens=False).input_ids
print(token_ids)
print(tokenizer.decode(token_ids))
'''
['U', '##T', 'Austin', 'I', 'miss', 'you']
[158, 1942, 5202, 146, 5529, 1128]
UT Austin I miss you
'''

이 토큰화기는 ##을 통하여 현 단어가 이전 단어와 연결되어 있는지를 알려주고 있다. 

 

이 토큰화기를 기반으로 다시 모델을 선언하고 parameter의 개수를 살펴보면 

직접 작성한 토큰화기에 비해 임베딩 매개변수가 확연히 줄은 것을 확인할 수 있다.

그리고 이전과 달리 학습 데이터에 없었던 영어 단어가 새로 나오더라도 토큰화가 가능해졌다.

vocab_size = len(tokenizer)
subword_model = RNNModel('RNN_TANH', vocab_size)
print(f"임베딩 매개변수 개수: {count_parameters(subword_model.embedding)}")
print(f"RNN층 매개변수 개수: {count_parameters(subword_model.rnn)}")
'''
임베딩 매개변수 개수: 5799600
RNN층 매개변수 개수: 160800
'''

Preprocessing for NMT Model

 

번역 모델은 번역하고자 하는 문장(Source)을 입력으로 받고, 번역 결과(Target)을 출력한다.

영어-한글 번역 모델을 학습하기 위해 영어-한글 번역 데이터셋을 전처리하는 방법을 학습해보자.

 

step1. token2id를 사용해 source sentence, target sentence 전처리

<번역을 위한 문장 전처리기>
전처리 규칙:
1. 각 문장은 token2id를 활용하여 고유의 번호를 가진 토큰 id의 수열로 바뀌어야합니다.
2. token2id에 맞는 토큰이 없을 경우 <UNK> 토큰으로 처리합니다.
3. Source 문장은 src_token2id로, Target 문장은 tgt_token2id를 사용해야합니다.
4. Target 문장의 처음에 <SOS> 토큰을 넣으세요.
5. Target 문장의 끝에 <EOS> 토큰을 넣으세요.
6. 전처리된 문장의 총 토큰 개수는 max_len을 초과하면 안됩니다. 만약 초과하면 뒤를 잘라냅니다.

Arguments:
raw_src_sentence -- Source 문장 단어 리스트
raw_tgt_sentence -- Target 문장 단어 리스트 
src_token2id -- Source 언어 토큰들을 id로 매핑하는 딕셔너리
tgt_token2id -- Target 언어 토큰들을 id로 매핑하는 딕셔너리
max_len -- 변환된 토큰 리스트의 최대 길이

Return:
src_sentence -- 전처리된 Source 문장
tgt_sentence -- 전처리된 Target 문장
def preprocess(
    raw_src_sentence: List[str],
    raw_tgt_sentence: List[str],
    src_token2id: Dict[str, int],
    tgt_token2id: Dict[str, int],
    max_len: int
) -> Tuple[List[int], List[int]]:

    # Special tokens
    UNK = Language.UNK_TOKEN_ID
    SOS = Language.SOS_TOKEN_ID
    EOS = Language.EOS_TOKEN_ID

    src_sentence = []
    tgt_sentence = []
    for word in raw_src_sentence:
      if word in src_token2id: # source 딕셔너리에 word가 있는 경우
        src_sentence.append(src_token2id[word])
      else:
        src_sentence.append(UNK)
    
    for word in raw_tgt_sentence:
      if word in tgt_token2id: # target 딕셔너리에 word가 있는 경우
        tgt_sentence.append(tgt_token2id[word])
      else:
        tgt_sentence.append(UNK)

    src_sentence = src_sentence[:max_len] # max_len까지의 sequence만
    tgt_sentence = [SOS] + tgt_sentence[:max_len-2] + [EOS] # SOS, EOS token을 추가하고 max_len까지의 sequence만

    return src_sentence, tgt_sentence

 

eng_sentence = "We always try and show our love for each other"
kor_sentence = "우리는 언제나 서로에게 사랑을 보여주려 노력해요" 
위 문장들을 전처리해보자
from typing import List, Dict, Tuple, Sequence
from collections import Counter
from itertools import chain

class Language(Sequence[List[str]]):
    PAD_TOKEN = '<PAD>'
    PAD_TOKEN_ID = 0
    UNK_TOKEN = '<UNK>'
    UNK_TOKEN_ID = 1
    SOS_TOKEN = '<SOS>'
    SOS_TOKEN_ID = 2
    EOS_TOKEN = '<EOS>'
    EOS_TOKEN_ID = 3

    def __init__(self, sentences: List[str]) -> None:
        self._sentences: List[List[str]] = [sentence.split() for sentence in sentences]

        self.token2id: Dict[str, int] = None
        self.id2token: List[str] = None
    
    def build_vocab(self, min_freq: int=1) -> None:
        SPECIAL_TOKENS: List[str] = [Language.PAD_TOKEN, Language.UNK_TOKEN, Language.SOS_TOKEN, Language.EOS_TOKEN]
        self.id2token = SPECIAL_TOKENS + [word for word, count in Counter(chain(*self._sentences)).items() if count >= min_freq]
        self.token2id = {word: idx for idx, word in enumerate(self.id2token)}
    
    def set_vocab(self, token2id: Dict[str, int], id2token: List[str]) -> None:
        self.token2id = token2id
        self.id2token = id2token
    
    def __getitem__(self, index: int) -> List[str]:
        return self._sentences[index]
    
    def __len__(self) -> int:
        return len(self._sentences)
        
eng_sentence = "We always try and show our love for each other"
kor_sentence = "우리는 언제나 서로에게 사랑을 보여주려 노력해요"
english = Language(eng_sentence)
korean = Language(kor_sentence)
english.build_vocab()
korean.build_vocab()

print(english.token2id)
#{'<PAD>': 0, '<UNK>': 1, '<SOS>': 2, '<EOS>': 3, 'W': 4, 'e': 5, 'a': 6, 'l': 7, 'w': 8, 'y': 9, 's': 10, 't': 11, 'r': 12, 'n': 13, 'd': 14, 'h': 15, 'o': 16, 'u': 17, 'v': 18, 'f': 19, 'c': 20}
print(korean.token2id) 
#{'<PAD>': 0, '<UNK>': 1, '<SOS>': 2, '<EOS>': 3, '우': 4, '리': 5, '는': 6, '언': 7, '제': 8, '나': 9, '서': 10, '로': 11, '에': 12, '게': 13, '사': 14, '랑': 15, '을': 16, '보': 17, '여': 18, '주': 19, '려': 20, '노': 21, '력': 22, '해': 23, '요': 24}
src_sentence, tgt_sentence = preprocess(eng_sentence, kor_sentence, english.token2id, korean.token2id, max_len = 10)
print(src_sentence) # [4, 5, 1, 6, 7, 8, 6, 9, 10, 1]
print(tgt_sentence) # [2, 4, 5, 6, 1, 7, 8, 9, 1, 3]

 

 

step2. 위 전처리 코드를 사용해  기계번역을 위한 dataset 만들기

class NMTDataset(Sequence[Tuple[List[int], List[int]]]):
    def __init__(self, src: Language, tgt: Language, max_len: int=30) -> None:
        assert len(src) == len(tgt)
        assert src.token2id is not None and tgt.token2id is not None

        self._src = src
        self._tgt = tgt
        self._max_len = max_len

    def __getitem__(self, index: int) -> Tuple[List[str], List[str]]:
        return preprocess(self._src[index], self._tgt[index], self._src.token2id, self._tgt.token2id, self._max_len)

    def __len__(self) -> int:
        return len(self._src)
english = Language(['고기를 잡으러 바다로 갈까나', '고기를 잡으러 강으로 갈까나', '이 병에 가득히 넣어가지고요'])
korean = Language(['Shall we go to the sea to catch fish ?', 'Shall we go to the river to catch fish', 'Put it in this bottle'])
english.build_vocab()
korean.build_vocab()
dataset = NMTDataset(src=english, tgt=korean)

print(dataset[0]) #([4, 5, 6, 7], [2, 4, 5, 6, 7, 8, 9, 7, 10, 11, 12, 3])

Collating


Bucketing

 

 

 

 

 

'AI TECH > TIL' 카테고리의 다른 글

Contrastive Learning  (0) 2022.11.03
STS 대회 에러 해결법  (0) 2022.11.02
Wandb Sweep  (0) 2022.11.02
[P stage] Week6 Today I Learn  (0) 2022.11.02
[실습] Week4 Today I Learn  (0) 2022.10.19