[解決済み] 複数のプロセスから1つのファイルを処理する
質問
大きなテキストファイルがあり、その各行を処理してデータベースに格納したい。単一の単純なプログラムはあまりにも時間がかかるので、私はそれを複数のプロセスまたはスレッドを介して実行したい。 各スレッド/プロセスは、その単一のファイルから異なるデータ(異なる行)を読み、データ(行)の部分のいくつかの操作を行い、最終的に私はデータの全体が処理され、私のデータベースは私が必要とするデータでダンプされるように、それらをデータベースに格納する必要があります。
しかし、私はどのようにこれにアプローチするのか、それを理解することができません。
どのように解決するのですか?
Producer/Consumerパターンが必要です。
基本的なスレッディングの例
ここでは、基本的な例として スレッディングモジュール (マルチプロセシングの代わりに)
import threading
import Queue
import sys
def do_work(in_queue, out_queue):
while True:
item = in_queue.get()
# process
result = item
out_queue.put(result)
in_queue.task_done()
if __name__ == "__main__":
work = Queue.Queue()
results = Queue.Queue()
total = 20
# start for workers
for i in xrange(4):
t = threading.Thread(target=do_work, args=(work, results))
t.daemon = True
t.start()
# produce data
for i in xrange(total):
work.put(i)
work.join()
# get the results
for i in xrange(total):
print results.get()
sys.exit()
ファイルオブジェクトをスレッドと共有することはないでしょう。スレッドに キュー にデータ行を供給することで、スレッドに仕事をさせます。そして、各スレッドは行をピックアップし、それを処理し、キューに返します。
にはもっと高度な機能が組み込まれています。 マルチプロセッシングモジュール には、データを共有するための、リストや 特殊なQueue . マルチプロセシングとスレッドの使用にはトレードオフがあり、それは作業がCPUに拘束されるかIOに拘束されるかに依存します。
基本的なマルチプロセシング.Poolの例
以下は、マルチプロセシングプールの基本的な例です。
from multiprocessing import Pool
def process_line(line):
return "FOO: %s" % line
if __name__ == "__main__":
pool = Pool(4)
with open('file.txt') as source_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(process_line, source_file, 4)
print results
プール
は、それ自身のプロセスを管理する便利なオブジェクトです。開いているファイルはその行を反復することができるので、それを
pool.map()
に渡すと、その行をループしてワーカー関数に渡します。
地図
はブロック化し、完了したら結果全体を返します。これは単純化しすぎた例で
pool.map()
はファイル全体を一度にメモリに読み込んでから処理を行うことに注意してください。大きなファイルを扱う場合は、この点に注意してください。プロデューサー/コンシューマーのセットアップを設計する、より高度な方法があります。
手動による "プール" 制限と行の再ソート付き
これは、手動による プール.マップ の手動例ですが、一度にイテレート全体を消費するのではなく、キューサイズを設定することで、処理可能な速度で断片的にしか供給しないようにできます。また、行番号を追加して、後で必要なときに行を追跡して参照できるようにしました。
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
line_no, line = item
# exit signal
if line == None:
return
# fake work
time.sleep(.5)
result = (line_no, line)
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
# start for workers
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
# produce data
with open("source.txt") as f:
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in enumerate(iters):
work.put(num_and_line)
for p in pool:
p.join()
# get the results
# example: [(1, "foo"), (10, "bar"), (0, "start")]
print sorted(results)
関連
-
[解決済み】クロススレッド操作が有効でない。作成されたスレッド以外のスレッドからアクセスされたコントロール
-
[解決済み] 複数の例外を1行でキャッチする(ブロックを除く)
-
[解決済み] ファイルのコピー方法について教えてください。
-
[解決済み] Pythonでファイルやフォルダを削除する方法は?
-
[解決済み] pandasを使った "大量データ "ワークフロー【終了しました
-
[解決済み] 2次元アレイにおけるピーク検出
-
[解決済み] Pythonで大きなファイルを読み込むための遅延メソッド?
-
[解決済み】2つの辞書を1つの式でマージする(辞書の和をとる)には?)
-
[解決済み] SQLAlchemy - テーブルのリストを取得する
-
[解決済み] Python Empty Generator 関数
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み】リストをほぼ同じ長さのN個のパーツに分割する
-
[解決済み] Pythonのキャッシュライブラリはありますか?
-
[解決済み] PILからopenCVフォーマットへの変換
-
[解決済み] dict を txt ファイルに書き、それを読み取る?
-
[解決済み] データフレームをソートした後にインデックスを更新する
-
[解決済み] オブジェクトのリストに特定の属性値を持つオブジェクトが含まれているかどうかをチェックする
-
[解決済み] Django で全てのリクエストヘッダを取得するにはどうすれば良いですか?
-
[解決済み] Pythonによる一対のクロスプロダクト [重複] (英語)
-
[解決済み] if 節の終了方法
-
[解決済み] Pythonの文字列書式をリストで使う