MerlinのDNNモデルを記述したコードの解説

概要

音声合成ツールMerlinのDNNモデルを記述したコードの解説。

Theano

学習を促すのはsrc/run_merlin.py151行目のtrain_DNNメソッド。その244行目での記述によるとモデルはsrc/models/deep_rnn.pyDeepRecurrentNetworkクラスによって生成される。
※名前に反してRNN以外の通常のDNNモデルもこのクラスによって生成される。

dnn_model = DeepRecurrentNetwork(n_in= n_ins, hidden_layer_size = hidden_layer_size, n_out = n_outs, L1_reg = l1_reg, L2_reg = l2_reg, hidden_layer_type = hidden_layer_type, output_type = cfg.output_layer_type, dropout_rate = dropout_rate, optimizer = cfg.optimizer, rnn_batch_training = cfg.rnn_batch_training)

まず、このクラスのコンストラク__init__src/model/deep_rnn.py28行目)において隠れ層、出力層、損失関数などについて記述されている。 configurationファイルのhidden_layer_typeは隠れ層の数とそれぞれの活性化関数を記述した項目で、tanhやlstmなどの文字列のリストが入る。この項目に沿ってsrc/layer内に記述される層が生成され、それらがリストで繋げられる。主にサポートしているのは下の5項目。
src/model/deep_rnn.py59行目

self.list_of_activations = ['TANH', 'SIGMOID', 'SOFTMAX', 'RELU', 'RESU']

この5項目に該当しない場合は個別に条件分岐する。
src/models/deep_rnn.py91行目

if hidden_layer_type[i] in self.list_of_activations:
    hidden_activation = hidden_layer_type[i].lower()
    hidden_layer = GeneralLayer(rng, layer_input, input_size, hidden_layer_size[i], activation=hidden_activation, p=self.dropout_rate, training=self.is_train)
elif hidden_layer_type[i] == 'TANH_LHUC':
    hidden_layer = SigmoidLayer_LHUC(rng, layer_input, input_size, hidden_layer_size[i], activation=T.tanh, p=self.dropout_rate, training=self.is_train)
elif hidden_layer_type[i] == 'SLSTM':
    hidden_layer = SimplifiedLstm(rng, layer_input, input_size, hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'SGRU':
    hidden_layer = SimplifiedGRU(rng, layer_input, input_size, hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'GRU':
    hidden_layer = GatedRecurrentUnit(rng, layer_input, input_size, hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'LSTM_NFG':
    hidden_layer = LstmNFG(rng, layer_input, input_size, hidden_layer_size[i], p=self.dropout_r
ate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'LSTM_NOG':
    hidden_layer = LstmNOG(rng, layer_input, input_size, hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'LSTM_NIG':
    hidden_layer = LstmNIG(rng, layer_input, input_size, hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'LSTM_NPH':
    hidden_layer = LstmNoPeepholes(rng, layer_input, input_size, hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'LSTM':
    hidden_layer = VanillaLstm(rng, layer_input, input_size, hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'BSLSTM':
    hidden_layer = BidirectionSLstm(rng, layer_input, input_size, hidden_layer_size[i], hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'BLSTM':
    hidden_layer = BidirectionLstm(rng, layer_input, input_size, hidden_layer_size[i], hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'RNN':
    hidden_layer = VanillaRNN(rng, layer_input, input_size, hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
elif hidden_layer_type[i] == 'LSTM_LHUC':
    hidden_layer = VanillaLstm_LHUC(rng, layer_input, input_size, hidden_layer_size[i], p=self.dropout_rate, training=self.is_train, rnn_batch_training=self.rnn_batch_training)
else:
    logger.critical("This hidden layer type: %s is not supported right now! \n Please use one of the following: SLSTM, BSLSTM, TANH, SIGMOID\n" %(hidden_layer_type[i]))
    sys.exit(1)

かなり多くのパターンをサポートしていることが分かる。
出力層はlinearrecurrentと上のself.list_of_activationsリスト内の項目がサポートされている。
src/models/deep_rnn.py131行目

if output_activation == 'linear':
    self.final_layer = LinearLayer(rng, self.rnn_layers[-1].output, input_size, self.n_out)
elif output_activation == 'recurrent':
    self.final_layer = RecurrentOutputLayer(rng, self.rnn_layers[-1].output, input_size, self.n_out, rnn_batch_training=self.rnn_batch_training)
elif output_type.upper() in self.list_of_activations:
     self.final_layer = GeneralLayer(rng, self.rnn_layers[-1].output, input_size, self.n_out, activation=output_activation)
else:
    logger.critical("This output layer type: %s is not supported right now! \n Please use one of the following: LINEAR, BSLSTM\n" %(output_type))
    sys.exit(1)

これら出力層も隠れ層同様にsrc/layer内のファイルで記述されている。
損失関数はデフォルトで二乗誤差が設定されているが、全部で3種類サポートされている。 src/models/deep_rnn.py149行目

if self.loss_function == 'CCE':
    self.finetune_cost = self.categorical_crossentropy_loss(self.final_layer.output, self.y) 
    self.errors        = self.categorical_crossentropy_loss(self.final_layer.output, self.y) 
elif self.loss_function == 'Hinge':    
    self.finetune_cost = self.multiclass_hinge_loss(self.final_layer.output, self.y)
    self.errors        = self.multiclass_hinge_loss(self.final_layer.output, self.y)
elif self.loss_function == 'MMSE':
    if self.rnn_batch_training:
        self.y_mod = T.reshape(self.y, (-1, n_out))
        self.final_layer_output = T.reshape(self.final_layer.output, (-1, n_out))

        nonzero_rows = T.any(self.y_mod, 1).nonzero()
            
        self.y_mod = self.y_mod[nonzero_rows]
        self.final_layer_output = self.final_layer_output[nonzero_rows]
            
        self.finetune_cost = T.mean(T.sum((self.final_layer_output - self.y_mod) ** 2, axis=1))
        self.errors = T.mean(T.sum((self.final_layer_output - self.y_mod) ** 2, axis=1))
    else:
        self.finetune_cost = T.mean(T.sum((self.final_layer.output - self.y) ** 2, axis=1))
        self.errors = T.mean(T.sum((self.final_layer.output - self.y) ** 2, axis=1))

src/run_merlin.pyを見ると244行目でモデルを定義したのちに、284行目でbuild_finetune_functionメソッドを用いていることが分かる。このメソッドは、上のDeepRecurrentNetworkクラスのメソッドで、src/models/deep_rnn.py187行目に記述されている。損失関数を定義するメソッドであるが、以下のupdatesオプティマイザにあたり、メソッドを繰り返すたびに共有変数(T.share)を更新するアルゴリズムである。
src/models/deep_rnn.py248行目

train_model = theano.function(inputs = [lr, mom],  #index, batch_size
                              outputs = self.errors,
                              updates = updates,
                              givens = {self.x: train_set_x, #[index*batch_size:(index + 1)*batch_size]
                                        self.y: train_set_y,
                                        self.is_train: np.cast['int32'](1)}, on_unused_input='ignore')

ただ、build_finetune_functionという名前でありながら、どこでファインチューニングしているかがよく分からない。

TensorFlow

モデルの大枠はsrc/tensorflow_lib/models.pyに記述されており、実際の学習や予測の様子はsrc/tensorflow_lib/train.pyに記述されている。models.pyのクラスをtrain.pyのクラスが継承することで変数やメソッドを共有している。流れとしては以下の通り。
1 src/run_merlin.py816行目でsrc/run_tensorflow_with_merlin_io.pyのオブジェクト生成、850行目でそれにモデル定義・学習を指示する。

elif cfg.switch_to_tensorflow:
    ### call Tensorflowclass and use an instance ###
    from run_tensorflow_with_merlin_io import TensorflowClass
    tf_instance = TensorflowClass(cfg)
elif cfg.switch_to_tensorflow:
    tf_instance.train_tensorflow_model()

2 src/run_tensorflow_with_merlin_io.py148行目でsrc/tensorflow_lib/train.pyのオブジェクト生成、182行目でそれにモデル定義を指示、195行目でそれにモデル学習を指示する。

if not self.encoder_decoder:
    self.tensorflow_models = TrainTensorflowModels(self.inp_dim, self.hidden_layer_size, self.out_dim, self.hidden_layer_type, self.model_dir, output_type=self.output_layer_type, dropout_rate=self.dropout_rate, loss_function=self.loss_function, optimizer=self.optimizer)
else:
    self.encoder_decoder_models = Train_Encoder_Decoder_Models(self.inp_dim,self.hidden_layer_size,self.out_dim,self.hidden_layer_type,output_type=self.output_layer_type, dropout_rate=self.dropout_rate,loss_function=self.loss_function,optimizer=self.optimizer, attention=self.attention,cbhg=self.cbhg)
#### define the model ####
if self.sequential_training:
    utt_length=train_flen["utt2framenum"].values()
    self.tensorflow_models.get_max_step(max(utt_length))
    self.tensorflow_models.define_sequence_model()

elif self.encoder_decoder:
    utt_length=train_flen["utt2framenum"].values()
    super(Train_Encoder_Decoder_Models,self.encoder_decoder_models).__setattr__("max_step",max(utt_length))
    self.encoder_decoder_models.define_encoder_decoder()
else:
    self.tensorflow_models.define_feedforward_model()
#### train the model ####
print('training...')
if self.sequential_training:
    ### Train feedforward model ###
    self.tensorflow_models.train_sequence_model(train_x, train_y, batch_size=self.batch_size, num_of_epochs=self.num_of_epochs, shuffle_data=self.shuffle_data,utt_length=utt_length)

elif self.encoder_decoder:
            
self.encoder_decoder_models.train_encoder_decoder_model(train_x,train_y,batch_size=self.batch_size,num_of_epochs=self.num_of_epochs,shuffle_data=True,utt_length=utt_length)
else:
    self.tensorflow_models.train_feedforward_model(train_x, train_y, batch_size=self.batch_size, num_of_epochs=self.num_of_epochs, shuffle_data=self.shuffle_data)

3 src/tensorflow_lib/train.pysrc/tensorflow_lib/train.pyのクラスはsrc/tensorflow_lib/model.pyのクラスを継承したもの。src/tensorflow_lib/model.pyのメソッドでモデルを定義、src/tensorflow_lib/train.pyのメソッドで学習を行う。モデルの種類によって実行するメソッドが変わってくる。