摘要:比如一個機器翻譯模型,輸入是模型輸出湯姆,追逐,杰瑞。模型目前在機器翻譯,圖片描述任務,語音識別都有大量應用,熟練使用對于解決實際問題會有很大的幫助。
介紹
Attention模型形象的比喻就是“圖像對焦”。
上圖是Encoder-Decoder模型,Decoder中每個單詞生成過程如下:
其中C是“語義編碼C”,f是Decoder的非線性變換函數。由此,我們可以看出生成目標句子的每個單詞都使用同一個語義編碼C,即:源句子中的每個單詞的影響力都是一樣的,這如同圖像沒有對焦的情況,現實項目中也存在明顯的不合理。比如一個機器翻譯模型,輸入是“Tom chase Jerry”,模型輸出:“湯姆”,“追逐”,“杰瑞”。在翻譯“杰瑞”的時候顯然“Jerry”的貢獻值最大,如果每個單詞的貢獻值相同明顯不合理。這個問題在輸入句子長度較短時問題不大,但是當輸入句子較長時會丟失很多細節信息(個人覺得此處類似平均池化和最大值池化)。正因為如此,我們引入了Attention思想。
Soft Attention模型使用Attention模型翻譯“杰瑞”的時候,我們可以得到輸入句子中的每個單詞對輸出當前單詞的貢獻值大小如:(Tom,0.3)(Chase,0.2) (Jerry,0.5)。這意味著生成每個單詞yi時不再使用同一個語義編碼C,而是根據yi使用不同的Ci。在引入Attention模型后yi的計算過程改變如下所示:
每個Ci對應源句子中每個單詞的注意力分配概率,示例如下:
f2是Encoder對每個單詞的變換函數,g函數代表整個源句子的中間語義表示的變換函數,一般形式是加權求和:
aji代表注意力分配系數,hj代表源句子中某個單詞的語義編碼,Lx代表源句子中單詞數量。g函數的計算過程如下圖所示:
Attention模型概率計算如果所示,當我們要生成yi單詞,此時我們用i-1時刻的隱藏節點輸出值Hi-1去和源句子中的每個單詞對應RNN隱藏節點狀態hj依次進行對比,即:通過函數F(hj,Hi-1)來獲得yi對源句子中每個單詞對應的對齊可能性,函數F常見方法如下圖所示:
然后使用Softmax函數進行數值歸一化處理。如對“對齊概率”不理解的朋友,可以查看下圖英語-德語翻譯系統中加入Attention機制后,Encoder和Decoder兩個句子中每個單詞對應注意力分配概率分布。
Self Attention模型在Soft Attention模型中,Attention機制發生在Decoder中Yi和Encoder中的所有元素之間。Self Attention模型不是在兩者之間,而是Decoder內部元素之間或者Encoder內部元素之間發生的Attention機制,計算方法和Soft Attention模型一致。那么Self Attention模型有什么好處?我們依然以機器翻譯為例:
如圖所示,Self Attention模型在內部可以捕獲一些句法特征或語義特征。Self Attention模型相比傳統RNN模型需要依次序序列計算,它的感受野更大,可以直接將句子中的任意兩個單詞的聯系通過一個計算步驟聯系起來,可以捕獲遠距離的相互依賴特征(就像列表和數組的區別)。此外,Self Attention模型對于增加計算的并行性也有幫助。
案例我們使用的語言數據集是“英語-西班牙語”,數據集樣本如下圖所示:
數據導入# 數據下載 path_to_zip=tf.keras.utils.get_file( fname="spa-eng.zip", origin="http://download.tensorflow.org/data/spa-eng.zip", # 解壓tar zip文件 extract=True ) path_to_file=os.path.dirname(path_to_zip)+"/spa-eng/spa.txt"
轉碼:
def unicode_to_ascii(sen): return "".join( char for char in unicodedata.normalize("NFD",sen) if unicodedata.category(char) != "Mn" )數據預處理
每條訓練語句添加開始和結束標記
移除句子中的特殊字符
字符轉ID,ID轉字符并排序
將句子補長到預設的最大長度
def preprocess_sentence(w): w = unicode_to_ascii(w.lower().strip()) # 在單詞和標點之間創建空格 # 如: "he is a boy." => "he is a boy ." w = re.sub(r"([?.!,?])", r" 1 ", w) w = re.sub(r"[" "]+", " ", w) # 特殊字符以空格代替 w = re.sub(r"[^a-zA-Z?.!,?]+", " ", w) w = w.rstrip().strip() # 添加開始和結束標記 w = "" + w + " " return w
創建數據集:
def create_dataset(path, num_examples): lines = open(path, encoding="UTF-8").read().strip().split(" ") word_pairs = [[preprocess_sentence(w) for w in l.split(" ")] for l in lines[:num_examples]] # 返回格式:[ENGLISH, SPANISH] return word_pairs
字符轉ID,ID轉字符,并排序:
class LanguageIndex(): def __init__(self,lang): self.lang=lang self.wrod2idx={} self.id2word={} self.vacab=set() self.create_index() def create_index(self): for phrase in self.lang: # 添加到集合中,重復內容不添加 self.vacab.update(phrase.split(" ")) self.vacab=sorted(self.vacab) self.wrod2idx[""]=0 #字符-ID轉換 for index,word in enumerate(self.vacab): self.wrod2idx[word]=index+1 for word,index in self.wrod2idx.items(): self.id2word[index]=word
加載數據集:
# 計算最大長度 def max_length(tensor): return max(len(t) for t in tensor)
def load_dataset(path,num_example): #get inputs outputs pairs=create_dataset(path,num_example) # 獲取ID表示 inp_lang=LanguageIndex(sp for en,sp in pairs) targ_lang=LanguageIndex(en for en,sp in pairs) # LanguageIndex 不包含重復值,以下包含重復值 input_tensor=[[inp_lang.wrod2idx[s]for s in sp.split(" ")]for en,sp in pairs] target_tensor=[[targ_lang.wrod2idx[s]for s in en.split(" ")]for en,sp in pairs] max_length_inp,max_length_tar=max_length(input_tensor),max_length(target_tensor) # 將句子補長到預設的最大長度 # padding: post:后補長,pre:前補長 input_tensor=tf.keras.preprocessing.sequence.pad_sequences( sequences=input_tensor, maxlen=max_length_inp, padding="post" ) target_tensor=tf.keras.preprocessing.sequence.pad_sequences( sequences=target_tensor, maxlen=max_length_tar, padding="post" ) return input_tensor,target_tensor,inp_lang,targ_lang,max_length_inp,max_length_tar
創建訓練集驗證集:
# 本次項目只使用前30000條數據 num_examples = 30000 input_tensor, target_tensor, inp_lang, targ_lang, max_length_inp, max_length_targ = load_dataset(path_to_file, num_examples)
# 訓練集80%,驗證集20% input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(input_tensor, target_tensor, test_size=0.2)模型訓練配置
# 打亂數據集 BUFFER_SIZE=len(input_tensor_train) BATCH_SIZE=64 # 每個epoch迭代次數 N_BATCH=BUFFER_SIZE // BATCH_SIZE # 詞嵌入維度 embedding_dim=256 # 隱藏神經元數量 units=1024 vocab_inp_size=len(inp_lang.wrod2idx) vocab_tar_size=len(targ_lang.wrod2idx) dataset=tf.data.Dataset.from_tensor_slices((input_tensor_train,target_tensor_train)).shuffle(BUFFER_SIZE) # drop_remainder 當剩余數據量小于batch_size時候,是否丟棄 dataset=dataset.batch(BATCH_SIZE,drop_remainder="True")案例Attention模型計算
文章開始我們介紹了Attention模型的計算過程,相信你會很容易理解上圖的內容。對每個節點具體方程實現如下:
FC=全連接層,EO=編碼器輸出,H=隱藏層狀態,X=解碼器輸入,模型計算過程如下表示:
score = FC(tanh(FC(EO) + FC(H)))
attention weights = softmax(score, axis = 1)
context vector = sum(attention weights * EO, axis = 1)
embedding output=解碼器輸入X,輸入詞嵌入層
merged vector=concat(embedding output, context vector)
將merged vector輸入到GRU
創建模型GRU配置:
def gru(units): # 使用GPU加速運算 if tf.test.is_gpu_available(): return tf.keras.layers.CuDNNGRU(units, return_sequences=True, return_state=True, # 循環核的初始化方法 # glorot_uniform是sqrt(2 / (fan_in + fan_out))的正態分布產生 # 其中fan_in和fan_out是權重張量的扇入扇出(即輸入和輸出單元數目) recurrent_initializer="glorot_uniform") else: return tf.keras.layers.GRU(units, return_sequences=True, return_state=True, # hard_sigmoid <= -1 輸出0,>=1 輸出1 ,中間為線性 recurrent_activation="sigmoid", recurrent_initializer="glorot_uniform")
編碼器:
class Encoder(tf.keras.Model): def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz): super(Encoder, self).__init__() self.batch_sz = batch_sz self.enc_units = enc_units self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) self.gru = gru(self.enc_units) def call(self, x, hidden): x = self.embedding(x) output, state = self.gru(x, initial_state = hidden) return output, state def initialize_hidden_state(self): return tf.zeros((self.batch_sz, self.enc_units))
解碼器:
class Decoder(tf.keras.Model): def __init__(self,vocab_size,embedding_dim,dec_units,batch_sz): super(Decoder, self).__init__() self.batch_sz=batch_sz self.dec_units=dec_units self.embedding=tf.keras.layers.Embedding( input_shape=vocab_size, output_dim=embedding_dim ) self.gru=gru(self.dec_units) self.fc=tf.keras.layers.Dense(units=vocab_size) # 用于計算score,即:注意力權重系數 self.W1=tf.keras.layers.Dense(self.dec_units) self.W2=tf.keras.layers.Dense(self.dec_units) self.V=tf.keras.layers.Dense(units=1) def __call__(self,x,hidden,ec_output): # tf.expand_dims:在指定索引出增加一維度,值為1,從索引0開始 # axis: 取值范圍是[-階數,階數],二維的時候0指的是列,1指的是行, # 更高維度的時候,數值是由外向里增加,如:3維向量,外向內依次是:0,1,2 # 通過計算score公式可得,需要將hidden維度擴展至:[batch_size,1,hidden_size] hidden_with_time_axis=tf.expand_dims(hidden,axis=1) # score=[batch_size, max_length, 1] score=self.V(tf.nn.tanh(self.W1(ec_output)+self.W2(hidden_with_time_axis))) # 數值歸一化和為1的概率分布值 attention_weight=tf.nn.softmax(score,axis=1) context_vetor=attention_weight*ec_output # 求和平均 context_vetor=tf.reduce_sum(context_vetor,axis=1) X=self.embedding(x) # 合并解碼器embedding輸出和context vector x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1) # output shape=(batch_size,time_step,hidden_size) # state shape=(batch_size,hidden_size) output,state=self.gru(x) # output[batch_size*1,hidden_size] output=tf.reshape(output,shape=(-1,output.shape[2])) x-self.fc(output) return x,state,attention_weight def initilize_hidden_size(self): return tf.zeros((self.batch_sz,self.dec_units))
實例化模型:
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE) decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)
損失函數,優化器:
optimizer = tf.train.AdamOptimizer() def loss_function(real, pred): mask = 1 - np.equal(real, 0) loss_ = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=real, logits=pred) * mask return tf.reduce_mean(loss_)
模型保存:
checkpoint_dir = "./training_checkpoints" checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt") checkpoint = tf.train.Checkpoint(optimizer=optimizer, encoder=encoder, decoder=decoder)訓練
由于我們使用Teacher Forcing進行訓練,所以我們簡單介紹下。
如圖所示Teacher Forcing與Free-running不同,在訓練過程中不再是前一時刻的hidden-output作為當前輸入,
而是在Ground Truth中找到對應的上一項作為當前輸入。早期的RNN很弱,如果生成了非常差的結果Free-running的運行方式會導致后面的hidden-output都受到影響。Teacher Forcing運行方式就可以避免這種問題,缺點也很明顯它嚴重依賴標簽數據。
# 迭代10次訓練集 EPOCHS = 10 for epoch in range(EPOCHS): start = time.time() hidden = encoder.initialize_hidden_state() total_loss = 0 for (batch, (inp, targ)) in enumerate(dataset): loss = 0 # 先記錄梯度 with tf.GradientTape() as tape: # 編碼器輸出 enc_output, enc_hidden = encoder(inp, hidden) dec_hidden = enc_hidden dec_input = tf.expand_dims([targ_lang.word2idx["翻譯"]] * BATCH_SIZE, 1) # 使用Teacher forcing運行方式 for t in range(1, targ.shape[1]): # 解碼器輸出 predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output) loss += loss_function(targ[:, t], predictions) # 樣本標簽作為輸入 dec_input = tf.expand_dims(targ[:, t], 1) batch_loss = (loss / int(targ.shape[1])) # one_loss++;batch_loss++ total_loss += batch_loss variables = encoder.variables + decoder.variables gradients = tape.gradient(loss, variables) optimizer.apply_gradients(zip(gradients, variables)) if batch % 100 == 0: print("Epoch {} Batch {} Loss {:.4f}".format(epoch + 1, batch, batch_loss.numpy())) # 每迭代2次訓練集保存一次模型 if (epoch + 1) % 2 == 0: checkpoint.save(file_prefix = checkpoint_prefix)
評估函數我們不使用teacher-forcing模式,解碼器的每步輸入是它前一時刻的hidden-state和編碼器輸出,當模型遇到
# 和訓練模型函數代碼基本一致 def evaluate(sentence, encoder, decoder, inp_lang, targ_lang, max_length_inp, max_length_targ): attention_plot = np.zeros((max_length_targ, max_length_inp)) # 數據預處理 sentence = preprocess_sentence(sentence) # 向量化表示輸入數據 inputs = [inp_lang.word2idx[i] for i in sentence.split(" ")] # 后置補長 inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs], maxlen=max_length_inp, padding="post") inputs = tf.convert_to_tensor(inputs) result = "" hidden = [tf.zeros((1, units))] enc_out, enc_hidden = encoder(inputs, hidden) dec_hidden = enc_hidden # 維度擴展batch_size dec_input = tf.expand_dims([targ_lang.word2idx[""]], 0) for t in range(max_length_targ): predictions, dec_hidden, attention_weights = decoder(dec_input, dec_hidden, enc_out) # 保存權重用于稍后可視化展示 attention_weights = tf.reshape(attention_weights, (-1, )) attention_plot[t] = attention_weights.numpy() predicted_id = tf.argmax(predictions[0]).numpy() # 獲取文本翻譯結果 result += targ_lang.idx2word[predicted_id] + " " # 預設的結束標記 if targ_lang.idx2word[predicted_id] == " ": return result, sentence, attention_plot # 預測值作為輸入,以此輸出下一時刻單詞 dec_input = tf.expand_dims([predicted_id], 0) return result, sentence, attention_plot
可視化權重值:
fig = plt.figure(figsize=(10,10)) ax = fig.add_subplot(1, 1, 1) ax.matshow(attention, cmap="viridis") fontdict = {"fontsize": 14} ax.set_xticklabels([""] + sentence, fontdict=fontdict, rotation=90) ax.set_yticklabels([""] + predicted_sentence, fontdict=fontdict) plt.show()總結
本篇文章篇幅較多,不過項目的重點是Attention思想的理解,Self Attention模型具有更長的感受野,更容易捕獲長距離的相互依賴特征,目前Google機器翻譯模型就大量使用到Self Attention。Attention模型目前在機器翻譯,圖片描述任務,語音識別都有大量應用,熟練使用Attention對于解決實際問題會有很大的幫助。
文章部分內容參考 Yash Katariya 和 張俊林,在此表示感謝。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/42770.html
摘要:本文以機器翻譯為例,深入淺出地介紹了深度學習中注意力機制的原理及關鍵計算機制,同時也抽象出其本質思想,并介紹了注意力模型在圖像及語音等領域的典型應用場景。 最近兩年,注意力模型(Attention Model)被廣泛使用在自然語言處理、圖像識別及語音識別等各種不同類型的深度學習任務中,是深度學習技術中最值得關注與深入了解的核心技術之一。本文以機器翻譯為例,深入淺出地介紹了深度學習中注意力機制...
摘要:摘要在年率先發布上線了機器翻譯系統后,神經網絡表現出的優異性能讓人工智能專家趨之若鶩。目前在阿里翻譯平臺組擔任,主持上線了阿里神經網絡翻譯系統,為阿里巴巴國際化戰略提供豐富的語言支持。 摘要: 在2016年Google率先發布上線了機器翻譯系統后,神經網絡表現出的優異性能讓人工智能專家趨之若鶩。本文將借助多個案例,來帶領大家一同探究RNN和以LSTM為首的各類變種算法背后的工作原理。 ...
閱讀 3528·2021-09-22 15:50
閱讀 3233·2019-08-30 15:54
閱讀 2748·2019-08-30 14:12
閱讀 3058·2019-08-30 11:22
閱讀 2079·2019-08-29 11:16
閱讀 3574·2019-08-26 13:43
閱讀 1192·2019-08-23 18:33
閱讀 920·2019-08-23 18:32