TensorFlowでRNNを実装する

目的

文脈から次に来る文字を予測するRNNを作る。
具体的には連続する英字5文字をインプットデータとして与えた時に次に来る英字1文字をアウトプットするものを作りたい。

方法

TensorFlowを用いる。

RNNへ与える英字データは以下のリンクの記事本文の英字をone_hotベクトル形式にしたものを利用した。そのデータ整形のコードは後述する。
‘An Endless War’: Why 4 U.S. Soldiers Died in a Remote African Desert - The New York Times

RNNの構造を記述したコードは以下の通り。

def __init__(self):
    self.input_layer_size = 27
    self.hidden_layer_size = 30
    self.output_layer_size = 27
    self.chunk_size = 5
    self.batch_size = 128
    self.epoch = 100


def inference(self, input, initial_state):
    w_hidden = tf.Variable(tf.random_normal([self.input_layer_size, self.hidden_layer_size], dtype="float64"))
    b_hidden = tf.Variable(tf.random_normal([self.hidden_layer_size], dtype="float64"))
    w_output = tf.Variable(tf.random_normal([self.hidden_layer_size, self.output_layer_size], dtype="float64"))
    b_output = tf.Variable(tf.random_normal([self.output_layer_size], dtype="float64"))

    input = tf.reshape(tf.transpose(input, [1, 0, 2]), [-1, self.input_layer_size])
    input_to_hidden = (tf.matmul(input, w_hidden) + b_hidden)
    input_to_hidden = tf.split(input_to_hidden, self.chunk_size)

    cell = tf.nn.rnn_cell.BasicRNNCell(self.hidden_layer_size, tf.AUTO_REUSE)
    hidden_to_output, final_state = tf.contrib.rnn.static_rnn(cell, input_to_hidden, initial_state=initial_state)

    output = (tf.matmul(hidden_to_output[-1], w_output) + b_output)

    return output

以下に順に解説していく。

def __init__(self):
    self.input_layer_size = 27
    self.hidden_layer_size = 30
    self.output_layer_size = 27
    self.chunk_size = 5
    self.batch_size = 128
    self.epoch = 1000

input_layer_sizeとoutput_layer_sizeは英字の種類であり、27個(アルファベット26文字+インデント)。chunk_sizeは何文字前まで参照するかを定義し、今回は5文字。

inferenceメソッドはRNNの構造を記したもの。重み(w)とバイアス(b)は通常のDNNと同様。通常のDNN少し異なるのは以下の部分。

input = tf.reshape(tf.transpose(input, [1, 0, 2]), [-1, self.input_layer_size])
input_to_hidden = (tf.matmul(input, w_hidden) + b_hidden)
input_to_hidden = tf.split(input_to_hidden, self.chunk_size)

cell = tf.nn.rnn_cell.BasicRNNCell(self.hidden_layer_size, tf.AUTO_REUSE)
hidden_to_output, final_state = tf.contrib.rnn.static_rnn(cell, input_to_hidden, initial_state=self.initial_state)

output = (tf.matmul(hidden_to_output[-1], w_output) + b_output)

RNNは隠れ層に繰り返しinputを渡し続けるNNであるためinputはデータがchunk_size(5個)だけ繋がったリストとなる。
隠れ層はデータを受け取るたびに出力層へデータを渡す。今回欲しい出力データはこのうちの最後(5個目)のデータである。なぜなら、RNNは隠れ層のcellと呼ばれる部分に過去の入力データを記憶する性質を持つため、最後に出力されるデータは過去全ての入力データを参照したものとなるからである。

コードの全容は以下の通り。

import tensorflow as tf
import numpy as np
import re



class Prepare_data:


    def __init__(self, file):
        self.chunk_size = 5
        self.vocabulary_size = 27
        self.file = open(file)


    def file_to_text(self):
        text = self.file.read()
        return text


    def text_to_array(self):
        text = self.file_to_text()
        text = text.lower()
        text = text.replace("\n", " ")
        text = re.sub(r"[^a-z ]", "", text)
        text = re.sub(r"[ ]+", " ", text)

        code_list = []
        for i in range(len(text)):
            if text[i] == " ":
                code_list.append(self.vocabulary_size - 1)
            else:
                code_list.append(ord(text[i])-ord("a"))
        code_array = np.array(code_list)
        return code_array


    def array_to_one_hot(self):
        array = self.text_to_array()
        one_hot = np.eye(self.vocabulary_size)[array]
        return one_hot


    def make_data(self):
        one_hot = self.array_to_one_hot()
        data_num = one_hot.shape[0] - self.chunk_size
        input_data = np.zeros([data_num, self.chunk_size, self.vocabulary_size])
        output_data = np.zeros([data_num, self.vocabulary_size])
        for i in range(data_num):
            output_data[i, :] = one_hot[i + self.chunk_size, :]
            for j in range(self.chunk_size):
                input_data[i, j, :] = one_hot[i + j, :]
        training_num = data_num * 4 // 5
        input_train = input_data[: training_num]
        output_train = output_data[: training_num]
        input_test = input_data[training_num :]
        output_test = output_data[training_num :]
        return input_train, output_train, input_test, output_test



class Rnn:


    def __init__(self):
        self.input_layer_size = 27
        self.hidden_layer_size = 30
        self.output_layer_size = 27
        self.chunk_size = 5
        self.batch_size = 128
        self.epoch = 100


    def inference(self, input_data, initial_state):
        w_hidden = tf.Variable(tf.random_normal([self.input_layer_size, self.hidden_layer_size], dtype="float64"))
        b_hidden = tf.Variable(tf.random_normal([self.hidden_layer_size], dtype="float64"))
        w_output = tf.Variable(tf.random_normal([self.hidden_layer_size, self.output_layer_size], dtype="float64"))
        b_output = tf.Variable(tf.random_normal([self.output_layer_size], dtype="float64"))

        input_data = tf.reshape(tf.transpose(input_data, [1, 0, 2]), [-1, self.input_layer_size])
        input_to_hidden = (tf.matmul(input_data, w_hidden) + b_hidden)
        input_to_hidden = tf.split(input_to_hidden, self.chunk_size)

        cell = tf.nn.rnn_cell.BasicRNNCell(self.hidden_layer_size, reuse=tf.AUTO_REUSE)
        hidden_to_output, final_state = tf.contrib.rnn.static_rnn(cell, input_to_hidden, initial_state=initial_state)

        output = (tf.matmul(hidden_to_output[-1], w_output) + b_output)

        return output


    def cost(self, output_data, labels):
        cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=labels, logits=output_data))
        return cost


    def training(self, cost):
        optimizer = tf.train.AdamOptimizer()
        training = optimizer.minimize(cost)
        return training


    def train(self, file):
        test_data = Prepare_data(file)
        input_train, output_train, input_test, output_test = test_data.make_data()
        zero_state = np.zeros([self.batch_size, self.hidden_layer_size], dtype="float64")

        input_data = tf.placeholder(tf.float64, [None, self.chunk_size, self.input_layer_size])
        labels = tf.placeholder(tf.float64, [None, self.output_layer_size])
        initial_state = tf.placeholder(tf.float64, [None, self.hidden_layer_size])

        prediction = self.inference(input_data, initial_state)
        cost = self.cost(prediction, labels)
        training = self.training(cost)
        correct = tf.equal(tf.argmax(prediction, 1), tf.argmax(labels, 1))
        accuracy = tf.reduce_mean(tf.cast(correct, dtype="float64"))

        with tf.Session() as sess:
            init = tf.global_variables_initializer()
            sess.run(init)

            for epoch in range(self.epoch):
                step = 1
                sum_cost = 0
                sum_acc = 0

                while self.batch_size * step < input_train.shape[0]:
                    input_batch = input_train[self.batch_size * (step - 1) : self.batch_size * step]
                    output_batch = output_train[self.batch_size * (step - 1) : self.batch_size * step]
                    c, _, a= sess.run([cost, training, accuracy], feed_dict = {input_data: input_batch, labels: output_batch, initial_state: zero_state})
                    sum_cost += c
                    sum_acc += a
                    step += 1

                ave_cost = sum_cost / step
                epoch_acc = sum_acc / step
                print("epoch: {0}, cost: {1}, epoch_accuracy: {2}".format(epoch, ave_cost, epoch_acc))

            print("Training finished")

            saver = tf.train.Saver()
            saver.save(sess, "./rnn_model")

            zero_state = np.zeros([input_test.shape[0], self.hidden_layer_size], dtype = "float64")

            a = sess.run(accuracy, feed_dict = {input_data: input_test, labels: output_test, initial_state: zero_state})
            print("accuracy: {0}".format(a))


    def predict(self, context):
        context = context.replace("\n", " ")
        context = re.sub(r"[^a-z ]", "", context)
        context = re.sub(r"[ ]+", " ", context)

        code_list = []
        for i in range(self.chunk_size):
            if context[- self.chunk_size + i] == " ":
                code_list.append(self.input_layer_size - 1)
            else:
                code_list.append(ord(context[- self.chunk_size + i])-ord("a"))
        code_array = np.array(code_list)
        one_hot = np.eye(self.input_layer_size)[code_array]
        input_pred = np.array([one_hot])

        zero_state = np.zeros([1, self.hidden_layer_size], dtype="float64")

        input_data = tf.placeholder(tf.float64, [None, self.chunk_size, self.input_layer_size])
        initial_state = tf.placeholder(tf.float64, [None, self.hidden_layer_size])

        prediction = tf.nn.softmax(self.inference(input_pred, initial_state))
        labels_pred = tf.argmax(prediction, 1)

        with tf.Session() as sess:
            saver = tf.train.Saver()
            saver.restore(sess, "./rnn_model")

            p, l = sess.run([prediction, labels_pred], feed_dict = {input_data: input_pred, initial_state: zero_state})

            for i in range(27):
                c = "_" if i == 26 else chr(i + ord("a"))
                print("{0}: {1}".format(c, p[0][i]))

            print("prediction: {0}{1}".format(context, "_" if l[0] == 26 else chr(l[0] + ord("a"))))




if __name__=="__main__":
    test_rnn = Rnn()
    test_rnn.train("make_data.txt")
    test_rnn.predict("convenenc")

正答率accuracyは4~4.5割程度であった。
また、最後のpredictメソッドを用いて、このコードでは単語convenienceの最後の文字を予測している。出力は以下の通り。

a: 0.019035454464732912
b: 4.738969637915792e-05
c: 0.002247770203502709
d: 0.001837168048159533
e: 0.49697183882149765
f: 6.776540899790401e-05
g: 6.803134720461048e-05
h: 0.005946570921486447
i: 0.1673174318507702
j: 7.938482418039402e-06
k: 0.05220353655820905
l: 0.047701385886800916
m: 0.0013250150854317872
n: 3.290372918878667e-06
o: 0.054324048979877804
p: 9.949375085362273e-05
q: 2.1549538581465823e-06
r: 0.0037619100217858355
s: 0.001999873710888758
t: 0.05791267551071772
u: 0.012747023559624206
v: 0.00020888998332423605
w: 0.0006592194567317431
x: 5.221744771573335e-09
y: 0.03779918142399007
z: 1.128392322579622e-07
_: 0.035704823438861145
prediction: convenience

1字ごとの確率を表示したのちに最終的な予測を表示している。今回は正しく予測できている。

参考

シンプルなRNNで文字レベルの言語モデルをTensorFlowで実装してみる - 今日も窓辺でプログラム
TensorFlow