1. ホーム
  2. python

[解決済み] 複数のプロセスから1つのファイルを処理する

2023-06-07 05:10:44

質問

大きなテキストファイルがあり、その各行を処理してデータベースに格納したい。単一の単純なプログラムはあまりにも時間がかかるので、私はそれを複数のプロセスまたはスレッドを介して実行したい。 各スレッド/プロセスは、その単一のファイルから異なるデータ(異なる行)を読み、データ(行)の部分のいくつかの操作を行い、最終的に私はデータの全体が処理され、私のデータベースは私が必要とするデータでダンプされるように、それらをデータベースに格納する必要があります。

しかし、私はどのようにこれにアプローチするのか、それを理解することができません。

どのように解決するのですか?

Producer/Consumerパターンが必要です。

基本的なスレッディングの例

ここでは、基本的な例として スレッディングモジュール (マルチプロセシングの代わりに)

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

ファイルオブジェクトをスレッドと共有することはないでしょう。スレッドに キュー にデータ行を供給することで、スレッドに仕事をさせます。そして、各スレッドは行をピックアップし、それを処理し、キューに返します。

にはもっと高度な機能が組み込まれています。 マルチプロセッシングモジュール には、データを共有するための、リストや 特殊なQueue . マルチプロセシングとスレッドの使用にはトレードオフがあり、それは作業がCPUに拘束されるかIOに拘束されるかに依存します。

基本的なマルチプロセシング.Poolの例

以下は、マルチプロセシングプールの基本的な例です。

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

プール は、それ自身のプロセスを管理する便利なオブジェクトです。開いているファイルはその行を反復することができるので、それを pool.map() に渡すと、その行をループしてワーカー関数に渡します。 地図 はブロック化し、完了したら結果全体を返します。これは単純化しすぎた例で pool.map() はファイル全体を一度にメモリに読み込んでから処理を行うことに注意してください。大きなファイルを扱う場合は、この点に注意してください。プロデューサー/コンシューマーのセットアップを設計する、より高度な方法があります。

手動による "プール" 制限と行の再ソート付き

これは、手動による プール.マップ の手動例ですが、一度にイテレート全体を消費するのではなく、キューサイズを設定することで、処理可能な速度で断片的にしか供給しないようにできます。また、行番号を追加して、後で必要なときに行を追跡して参照できるようにしました。

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)