1. ホーム
  2. tensorflow

[解決済み] 入力パイプラインは、keras.utils.Sequenceオブジェクトまたはtf.data.Datasetを使用しますか?

2022-02-14 05:28:56

質問

現在、私は tf.keras.utils.Sequence オブジェクトを使用して、CNN 用の画像バッチを生成します。Tensorflow 2.2を使っていて Model.fit メソッドでモデル化します。 モデルをフィットさせる際、以下のような警告が各エポックに投げられます。 use_multiprocessing=Truetf.keras.model.fit(...) :

WARNING:tensorflow:multiprocessing can interact badly with TensorFlow,
 causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended

このモデルは、ドキュメントから予想されるとおり、うまく最適化されています。 Sequence -ベースのジェネレータです。しかし、もし use_multiprocessing の代わりに非推奨の機能になりそうです。 tf.data オブジェクトを使用する場合は、最新の入力パイプラインを使用するようにしたいと思います。現在、私は次のようなものを使っています。 tf.keras.utils.Sequence 大規模なデータセットを分割するためのグッドプラクティスに関するこの記事に触発された - ベースのジェネレータ。 https://stanford.edu/~shervine/blog/keras-how to generate-data-on-the-fly (ケラス・ハウ・トゥ・ジェネレート・データ・オン・ザ・フライ)

class DataGenerator(keras.utils.Sequence):
'Generates data for Keras'
def __init__(self, list_IDs, labels, data_dir, batch_size=32, dim=(128,128), n_channels=1,
             n_classes=2, shuffle=True, **augmentation_kwargs):
    'Initialization'
    self.dim = dim
    self.batch_size = batch_size
    self.labels = labels
    self.list_IDs = list_IDs
    self.data_dir = data_dir
    self.n_channels = n_channels
    self.n_classes = n_classes
    self.shuffle = shuffle
    self.on_epoch_end()
    self.augmentor = keras.preprocessing.image.ImageDataGenerator(**augmentation_kwargs)

def __len__(self):
    'Denotes the number of batches per epoch'
    return int(np.floor(len(self.list_IDs) / self.batch_size))

def __getitem__(self, index):
    'Generate one batch of data'
    # Generate indexes of the batch
    indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

    # Find list of IDs
    list_IDs_temp = [self.list_IDs[k] for k in indexes]

    # Generate data
    X, y = self.__data_generation(list_IDs_temp)

    return X, y

def on_epoch_end(self):
    'Updates indexes after each epoch'
    self.indexes = np.arange(len(self.list_IDs))
    if self.shuffle == True:
        np.random.shuffle(self.indexes)

def __data_generation(self, list_IDs_temp):
    'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
    # Initialization
    X = np.empty((self.batch_size, *self.dim))
    y = np.empty((self.batch_size), dtype=int)

    # Generate data
    for i, ID in enumerate(list_IDs_temp):

        # Store sample
        X[i,] = np.load(self.data_dir +'/{}_stars.npy'.format(ID))

        # Store class
        y[i] = self.labels[ID]

    # Reshape and apply augmentation to sample
    X,y = self.augmentor.flow(X.reshape(self.batch_size,*self.dim,1),y=y,
                              shuffle=False,batch_size=self.batch_size)[0]

    return X, y

すべてのクラスのデータは data_dir ディレクトリに格納され、個々の .npy ファイルを作成します。IDは、文字列のリストから得られます。クラスラベルはIDをキーとする辞書から取得されます -- 記事のように。

の直感がとても好きです。 Sequence ジェネレータのセットアップ。また、ランダムなバッチを生成して、期待通りの動作をしているかどうかを簡単にチェックすることができます。しかし、このセットアップをどのように再現すればよいのでしょうか? tf.data ? のマルチプロセシングバッチ生成を再現するにはどうしたらいいですか? Sequence ジェネレータを interleaveprefetch のメソッドは tf.data.Dataset ? そして/または、私は単にこの Sequence -ベースのジェネレータで tf.data.Dataset.from_generator() というメソッドを使うのですか?

よろしくお願いします。

解決方法は?

回答が遅くなるかもしれませんが、私がやったことで、うまくいきました。 1-私のクラスはそのようなものでした。

class DataGen(Sequence):
    def __init__(self, df, sr=8000, seconds=3, batch_size=16, shuffle=True):
        self.files = np.array(df.filepath)
        self.label = np.array(df.label)
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.sr = sr
        self.seconds = seconds
        self.dim = self.sr*self.seconds
        self.on_epoch_end()
    
    def __len__():
        return len(self.label)//self.batch_size
    
    def __getitem__(self, x):
        indexs = self.indexs[np.arange(x, x+self.batch_size)]
        return self.__getBatch__(indexs)
        
    def __getBatch__(self, indexs):
        X, y = [], []
        for i in indexs:
            wav = self.__loadFile__(self.files[i])
            X.append(librosa.feature.mfcc(wav, self.sr).T)
            y.append(self.label[i])
        return tf.convert_to_tensor(X), to_categorical(y, num_classes=2)
        
    def __loadFile__(self, file):
        y, sr = librosa.load(file, sr=8000, mono=True)
        if len(y)>self.dim:
            return y[:self.dim]
        return np.pad(y, (0, self.dim-len(y)), 'constant', constant_values=0)
        
    def on_epoch_end(self):
        self.indexs = np.arange(len(self.label))
        if self.shuffle:
            np.random.shuffle(self.indexs)

2-より、followのような関数に変更します。

def gen(sr=8000, seconds=3, batch_size=16, shuffle=True):
    dim = sr*seconds
    def loadFile(file):
        wav, _ = librosa.load(file, sr=sr, mono=True)
        if len(wav)>dim:
            return wav[:dim]
        return np.pad(wav, (0, dim-len(wav)), 'constant', constant_values=0)
    
    while True:
        indexs = np.arange(len(df))
        if shuffle:
            np.random.shuffle(indexs)
        
        for x in range(len(df)//batch_size):
            X, y = [], []
            for i in indexs[np.arange(x*batch_size, (x+1)*batch_size)]:
                X.append(librosa.feature.mfcc(loadFile(df.filepath[i]), sr).T)
                y.append(df.label[i])
                
            yield tf.convert_to_tensor(X), to_categorical(y, num_classes=2)

3-で問題なく動作します。

dataset = tf.data.Dataset.from_generator(gen, (tf.dtypes.float32, tf.dtypes.int32))