1. ホーム
  2. python

[解決済み] 複数のプロセスから共有カウンタをインクリメントするには?

2023-04-05 23:31:51

質問

私は multiprocessing モジュールに問題があります。私は Pool を持つワーカーの map メソッドを使って、たくさんのファイルを同時に分析することができます。ファイルが処理されるたびにカウンタが更新され、あと何ファイル処理できるかを追跡できるようにしたいと思います。以下はサンプルコードです。

import os
import multiprocessing

counter = 0


def analyze(file):
    # Analyze the file.
    global counter
    counter += 1
    print counter


if __name__ == '__main__':
    files = os.listdir('/some/directory')
    pool = multiprocessing.Pool(4)
    pool.map(analyze, files)

これに対する解決策が見当たりません。

どのように解決するのですか?

問題は counter 変数がプロセス間で共有されていないことです。各個別のプロセスがそれ自身のローカルインスタンスを作成し、それをインクリメントしています。

参照 このセクション を参照してください。 あなたの場合、プロセス間で状態を共有するために Value のインスタンスをワーカー間で共有したいかもしれません。

この例の作業バージョンです (いくつかのダミー入力データ付き)。 グローバルな値を使用していますが、これは実際には避けたいことです。

from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print counter.value
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer = init, initargs = (counter, ))
    i = p.map_async(analyze_data, inputs, chunksize = 1)
    i.wait()
    print i.get()