[자연어처리] LSTM을 이용한 챗봇(chatbot) 만들기

2020. 5. 18. 19:02노트/Python : 프로그래밍

 

학습 데이터 

 

ChatData.csv
0.85MB

import pandas as pd 

path = "C:\\Users\\student\\Desktop\\chatbot\\01.data\\"
chatbotData=pd.read_csv(path+"ChatData.csv")
question, answer = list(chatbotData["Q"]) , list(chatbotData["A"])
print(len(question))
print(len(answer))
>>> 
11823
11823

for i in range(10):
    print("질문:" + question[i])
    print("답변:" + answer[i])
    print(" ")
    
>>> 
질문:12시 땡!
답변:하루가 또 가네요.
 
질문:1지망 학교 떨어졌어
답변:위로해 드립니다.
 
질문:3박4일 놀러가고 싶다
답변:여행은 언제나 좋죠.
 
질문:3박4일 정도 놀러가고 싶다
답변:여행은 언제나 좋죠.
 
질문:PPL 심하네
답변:눈살이 찌푸려지죠.
 

 

태깅 

import re 
import matplotlib.pyplot as plt 
import pandas as pd 
import numpy as np 
import os 
from konlpy.tag import Okt 
from keras import models, layers, optimizers, metrics, preprocessing 

# 태그단어 
PAD = "<PADDING>"  # 패딩 
STA = "<START>"    # 시작 
END = "<END>"      # 끝 
OOV = "<OOV>"      # out of vocabulary 
PAD_INDEX = 0
STA_INDEX = 1 
END_INDEX = 2 
OOV_INDEX = 3 

ENCODER_INPUT = 0
DECODER_INPUT = 1
DECODER_TARGET = 2

# 한 문장에서 단어 시퀀스의 최대 개수 
maxSequences = 30 

# 임베딩 벡터 차원 
embeddingDim = 100 

# LSTM 출력 차원 
lstmHiddenDim = 128 

# 정규표현식 필터 
RE_FILTER = re.compile("[.,!?\':;~()]'")

 

형태소 분석기 

# 형태소 분석기 
def posTag(sentences): 
    tagger = Okt()
    sentencePos = []
    for sentence in sentences:
        # 특수문자 제거 
        sentence = re.sub( RE_FILTER  ,  ""  , sentence )
        sentence = " ".join(tagger.morphs(sentence))
        sentencePos.append(sentence)
    return sentencePos 
    
question = posTag(question)
answer = posTag(answer)

# 질문 + 대답 을 하나로 합치기 
sentences = [] 
sentences.extend(question)
sentences.extend(answer)
len(sentences)

 

단어 배열 

words = []
# 단어배열 생성 
for sentence in sentences : 
    for word in sentence.split():
        words.append(word)


# words에서 길이가 0인 단어를 삭제 
# 중복단어를 삭제 
words = [ word for word in words if len(word)>0 ] # 길이가 0인 단어 삭제 
words = list(set(words)) # 중복 단어 삭제 

words[:0] = [PAD,STA,END,OOV]

# 단어에 대한 인덱스를 부여 -> 딕셔너리
wordToIndex = {word:index for index, word in enumerate(words)}
indexToWord = {index:word for index, word in enumerate(words)}
indexToWord

문장 인코딩

(인덱스로 변환) 

# 문장 -> 인덱스로 변환
def convertTextToIndex(sentences, voc, mytype):
    
    sentencesIndex=[]
    for sentence in sentences:
        
        sentenceIndex=[]
        if mytype == DECODER_INPUT:
            
            sentenceIndex.extend([voc[STA]])
            
        for word in sentence.split():
            
            if voc.get(word) is not None: # 단어에 해당하는 인덱스가 있는 경우
                sentenceIndex.extend([voc[word]]) # 단어에 해당되는 인덱스가 추가
                
            else: # 사전에 없는 단어의 경우 OOV추가
                sentenceIndex.extend([voc[OOV]])
                
        if mytype == DECODER_TARGET:
            
            # 디코더 출력은 맨 마지막에 end 추가
            if maxSequences <= len(sentenceIndex):
                sentenceIndex = sentenceIndex[:maxSequences-1] + [voc[END]]
                
            else:
                sentenceIndex += [voc[END]]
                
        else:
            if len(sentenceIndex) > maxSequences:
                sentenceIndex = sentenceIndex[:maxSequences]
                
        # 0으로 채움(pad_sequence)
        sentenceIndex += [wordToIndex[PAD]] * (maxSequences-len(sentenceIndex))
        
        sentencesIndex.append(sentenceIndex)
            
    return np.asarray(sentencesIndex)

 

# 인코더 입력, 디코더 입력, 디코더 출력 -> 인덱스 변환
xEncoder = convertTextToIndex(question, wordToIndex, ENCODER_INPUT)
print(xEncoder[0])

xDecoder = convertTextToIndex(answer, wordToIndex, DECODER_INPUT)
print(xDecoder[0])

yDecoder = convertTextToIndex(answer, wordToIndex, DECODER_TARGET)
print(yDecoder[0])

>>> 
[11288  9241  8527     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0]
[    1  2980   954  2846 10312  3334     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0]
[ 2980   954  2846 10312  3334     2     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0]

인코더 입력 : 12시 땡 
디코더 입력 : START 하루 가   또    가네요 
디코더 출력 : 하루 가    또  가네요   END 

 

원핫 인코딩 

#np.zeros((2,3,4))
oneHotData=np.zeros((len(yDecoder),maxSequences,len(words)))
#100(답변개수), 30(최대 단어 개수), 454(전체 단어 집합 개수)

np.shape(oneHotData)#디코더 출력
#하루 가 또 가네요 END
for i, seq in enumerate(yDecoder):# (100,30)
    for j, index in enumerate(seq):
        oneHotData[i,j,index]=1
yDecoder=oneHotData

 

모델 생성 

#훈련 모델 생성
#인코더 정의

#입력 문장의 인덱스 sequence를 입력
encoderInputs=layers.Input(shape=(None,))
#임베딩 계층
encoderOutputs=layers.Embedding(len(words),embeddingDim)(encoderInputs)

encoderOutputs,stateH, stateC=layers.LSTM(lstmHiddenDim,return_state=True, 
            dropout=0.2, recurrent_dropout=0.5)(encoderOutputs)
#return_state=True => 상태값 리턴
#LSTM은 2개 상태 존재(셀, 히든 스테이트)

encoderStates=[stateH, stateC]


#디코더 정의
#출력 문장의 인덱스 sequence를 입력
decoderInputs=layers.Input(shape=(None,))
#임베딩 계층
decoderEmbedding=layers.Embedding(len(words),
                                embeddingDim)
decoderOutputs=decoderEmbedding(decoderInputs)



decoderLSTM=layers.LSTM(lstmHiddenDim,
                        return_state=True, 
            return_sequences=True, 
                        dropout=0.2, 
                        recurrent_dropout=0.5)
decoderOutputs, _, _=decoderLSTM(decoderOutputs,initial_state=encoderStates)
decoderDense=layers.Dense(len(words), 
                          activation="softmax")
decoderOutputs=decoderDense(decoderOutputs)
model=models.Model([encoderInputs, decoderInputs],
             decoderOutputs)
             
model.compile(optimizer='rmsprop',
             loss='categorical_crossentropy',
             metrics=['accuracy'])
             
#예측 모델 인코더 정의
encoderModel=models.Model(encoderInputs, 
                          encoderStates)

#예측 모델 디코더 정의
#바로 앞에 있는 디코더의 출력(상태)을 입력 받아서
#예측을 해야 함.
decoderStateInputH=layers.Input(shape=(lstmHiddenDim,))
decoderStateInputC=layers.Input(shape=(lstmHiddenDim,))
decoderStatesInputs=[decoderStateInputH,decoderStateInputC]

#임베딩 계층
decoderOutputs=decoderEmbedding(decoderInputs)
#LSTM 계층
decoderOutputs, stateH, stateC=decoderLSTM(decoderOutputs,
           initial_state=decoderStatesInputs)
decoderStates=[stateH, stateC]

#Dense계층을 통해 원핫 형식으로 예측 단어 인덱스를 추출
decoderOutputs=decoderDense(decoderOutputs)

#예측 모델 디코더 설정
decoderModel=models.Model([decoderInputs]+decoderStatesInputs,
            [decoderOutputs]+decoderStates)

 

인덱스를 문장으로 변환 

#인덱스를 문장으로 변환
def convertIndexToText(indexs, voc):
    #구현
    sentence=""
    for i in indexs:
        if i==END_INDEX: #종료 인덱스
            break;
        if voc.get(i) is not None:
            sentence+=voc[i]
        else:
            sentence.extend([voc[OOV_INDEX]])
        sentence +=" "    
    return sentence    

#에폭
for epoch in range(10):
    print("total epoch:", epoch+1)
    history=model.fit([xEncoder,xDecoder], yDecoder,
             epochs=100,
             batch_size=64,
                     verbose=0)
    print("accuracy :", history.history['accuracy'])
    print("loss :", history.history['loss'])
    #문장 예측
    #3박 4일 놀러 가고 싶다 -> 여행 은 언제나 좋죠
    
    inputEncoder=xEncoder[2].reshape(1,xEncoder[2].shape[0]) #(30,) ->(1,30)
    inputDecoder=xDecoder[2].reshape(1,xDecoder[2].shape[0]) #(30,) ->(1,30)
    
    results=model.predict([inputEncoder,inputDecoder])
    
    #결과값에 대해서 가장 큰 값의 위치를 구함
    index=np.argmax(results[0], 1)
    #인덱스 -> 문장으로 변환
    sentence=convertIndexToText(index, indexToWord)
    print(sentence)
    print()
    

 

# 문장 입력
#[[320, 157, ..., 19,0,0,0,0...,0]]
def makePredictInput(sentence):    
    sentences=[]
    sentences.append(sentence)
    sentences=posTag(sentences)
    inputSeq=convertTextToIndex(sentences,
                                wordToIndex,
                                ENCODER_INPUT)
    return inputSeq
    
makePredictInput("3박4일 놀러가고 싶다")

>>> 
array([[ 4552,  4334, 12220,   893,  1422,   608,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0]])

 

예측 답변 생성 

#예측 답변 생성
def generateText(inputSeq):
        #입력을 인코더에 넣고, 마지막 상태 구함
        states=encoderModel.predict(inputSeq)
        
        #목표 시퀀스 초기화
        targetSeq=np.zeros((1,1))
        #<START> 시그널을 추가
        targetSeq[0,0]=STA_INDEX
        #인덱스 초기화
        indexs=[]
        
        #디코더 반복
        while 1:
            decoderOuputs, stateH, stateC=decoderModel.predict([targetSeq]+states)
            #결과를 원핫인코딩 형식으로 변환
            index=np.argmax(decoderOuputs[0,0,:])
            indexs.append(index)
            #종료 체크
            if index==END_INDEX or len(indexs)>=maxSequences:
                break
                
                #targetSeq를 이전 출력으로 설정
            targetSeq=np.zeros((1,1))
            targetSeq[0,0]=index    #STA_INDEX 
            
            #디코더의 이전 상태를 다음 디코더 예측에 사용
            states=[stateH, stateC]
            
        #인덱스를 문장으로 변환
        sentence=convertIndexToText(indexs, indexToWord)
        
        return sentence   

 

#입력문장 -> 인덱스
inputSeq=makePredictInput("놀러가고 싶다")
#[[320, 157, ..., 19,0,0,0,0...,0]]

sentence=generateText(inputSeq)
sentence #여행은 언제나 좋죠

>>> '단 짠으로 두 개 사는게 진리 죠 . '