1. ホーム
  2. python

[解決済み] TensorFlowでCSVデータを実際に読み込むには?

2023-05-08 15:06:31

質問

私はTensorFlowの世界では比較的新しい人です。 実際に は、CSVデータをTensorFlowで使える例/ラベルテンソルに読み替える。の例では TensorFlowチュートリアルのCSVデータ読み込みの例 の例は、かなり断片的で、CSVデータで学習できるようになるまでの道のりの一部しかありません。

以下は、そのCSVチュートリアルを基に、私がつなぎ合わせたコードです。

from __future__ import print_function
import tensorflow as tf

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

filename = "csv_test_data.csv"

# setup text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)

# setup CSV decoding
record_defaults = [[0],[0],[0],[0],[0]]
col1,col2,col3,col4,col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)

# turn features back into a tensor
features = tf.stack([col1,col2,col3,col4])

print("loading, " + str(file_length) + " line(s)\n")
with tf.Session() as sess:
  tf.initialize_all_variables().run()

  # start populating filename queue
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, col5])
    print(example, label)

  coord.request_stop()
  coord.join(threads)
  print("\ndone loading")

そして、読み込んだCSVファイルからの簡単な例です。非常に基本的なデータで、4つの特徴カラムと1つのラベルカラムがあります。

0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0

上のコードがやっていることは、すべて CSVファイルから各例を1つずつ表示します。 というもので、いい感じではあるのですが、トレーニングには全く使えません。

ここで悩むのは、1つずつ読み込まれた個々の例を、実際にどのようにトレーニングデータセットにするのかということです。たとえば これはノートブックです UdacityのDeep Learningコースで取り組んでいたものです。私は基本的に、読み込んだCSVデータを次のようなものに取り込みたいと考えています。 train_dataset そして train_labels :

def reformat(dataset, labels):
  dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
  # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
  labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32)
  return dataset, labels
train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)

を使ってみました。 tf.train.shuffle_batch を使ってみましたが、どうにもこうにもハングアップしてしまいます。

  for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, colRelevant])
    example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
    print(example, label)

では、まとめると、私の質問は以下の通りです。

  • このプロセスについて、私は何を見逃しているのでしょうか?
    • 入力パイプラインを適切に構築する方法について、私が見逃している重要な直観があるように感じられます。
  • CSVファイルの長さを知らなくてもいい方法はありますか?
    • 処理したい行数を知っていなければならないのは、かなり不便な感じがします( for i in range(file_length) の行)

編集してください。 Yaroslavが、私がここで命令文とグラフ構築の部分を混同している可能性があると指摘するとすぐに、それは明確になり始めました。これは、CSVからモデルを学習するときに通常行われるものに近いと思います(モデルの学習コードを除く)。

from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('dataset')
args = parser.parse_args()

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

def read_from_csv(filename_queue):
  reader = tf.TextLineReader(skip_header_lines=1)
  _, csv_row = reader.read(filename_queue)
  record_defaults = [[0],[0],[0],[0],[0]]
  colHour,colQuarter,colAction,colUser,colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
  features = tf.stack([colHour,colQuarter,colAction,colUser])  
  label = tf.stack([colLabel])  
  return features, label

def input_pipeline(batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)  
  example, label = read_from_csv(filename_queue)
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)

with tf.Session() as sess:
  tf.initialize_all_variables().run()

  # start populating filename queue
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  try:
    while not coord.should_stop():
      example_batch, label_batch = sess.run([examples, labels])
      print(example_batch)
  except tf.errors.OutOfRangeError:
    print('Done training, epoch reached')
  finally:
    coord.request_stop()

  coord.join(threads) 

どのように解決するのですか?

命令文の部分とグラフ構築の部分が混在しているような気がします。操作 tf.train.shuffle_batch は新しいキューノードを作成し、1つのノードでデータセット全体を処理することができます。だから、あなたがぶら下がっているのは、あなたが大量に作った shuffle_batch のキューを大量に作成し、それらのキューランナーを開始しなかったためにハングアップしているのだと思います。

通常のインプットパイプラインの使い方はこのような感じです。

  1. のようなノードを追加します。 shuffle_batch のようなノードを入力パイプラインに追加します。
  2. (オプション、意図しないグラフの変更を防ぐため) グラフを確定する

--- グラフ構築の終了、命令型プログラミングの開始--。

  1. tf.start_queue_runners
  2. while(True): session.run()

よりスケーラブルにするために(Python GILを避けるために)、TensorFlowパイプラインを使用してすべてのデータを生成することができます。しかし、パフォーマンスが重要でない場合、numpy配列を入力パイプラインにフックするために slice_input_producer. 以下は、いくつかの Print ノードで、何が起こっているかを見てみましょう (メッセージは Print のメッセージはノードが実行されると標準出力に出力されます)

tf.reset_default_graph()

num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples*num_features), (num_examples, num_features))
print data

(data_node,) = tf.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
data_batch = tf.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.get_default_graph().finalize()
tf.start_queue_runners()

try:
  while True:
    print sess.run(data_batch_debug)
except tf.errors.OutOfRangeError as e:
  print "No more inputs."

このように表示されるはずです。

[[0 1]
 [2 3]
 [4 5]
 [6 7]
 [8 9]]
[[0 1]
 [2 3]]
[[4 5]
 [6 7]]
No more inputs.

8,9,quot;の数字はフルバッチで埋まらなかったので、生産されなかったのです。また tf.Print は sys.stdout に出力されるため、私の場合はターミナルで別々に表示されます。

追記: 最小限の接続で batch を手動で初期化したキューに接続する最小限のものは github issue 2193 にあります。

また、デバッグのために timeout をセットして、IPythonノートブックが空のキューでハングしないようにすることもできます。私はこのヘルパー関数をセッションに使っています。

def create_session():
  config = tf.ConfigProto(log_device_placement=True)
  config.gpu_options.per_process_gpu_memory_fraction=0.3 # don't hog all vRAM
  config.operation_timeout_in_ms=60000   # terminate on long hangs
  # create interactive session to register a default session
  sess = tf.InteractiveSession("", config=config)
  return sess

スケーラビリティに関する注意事項。

  1. tf.constant は、グラフにデータのコピーをインラインで表示します。Graphの定義サイズには2GBという基本的な制限があるため、データサイズの上限となる
  2. この制限を回避するために v=tf.Variable を使い、そこにデータを保存することで、制限を回避することができます。 v.assign_op を実行し tf.placeholder を右辺に置き、numpy配列をプレースホルダーに送り込む( feed_dict )
  3. これでもまだデータのコピーが2つ作成されるので、メモリを節約するために、独自のバージョンで slice_input_producer を使用して、numpy 配列を操作し、一度に一行ずつアップロードすることができます。 feed_dict