1. ホーム
  2. python

[解決済み] Pythonのマルチプロセッシングで安全にファイルに書き込む

2023-02-03 11:21:51

質問

私は多くのサブ問題を含む大きな数値問題を解決しようとしています。私は異なる独立したサブ問題を異なるコアに分割するためにPythonのマルチプロセッシングモジュール(特にPool.map)を使用しています。各サブ問題は多くのサブ問題の計算を含み、私はこれらの結果を、まだどのプロセスによっても計算されていない場合はファイルに保存し、そうでない場合は計算をスキップしてファイルから結果だけを読み取ることによって、効果的にメモ化しようと試みています。

異なるプロセスが、サブサブ問題がまだ計算されていないかどうか (結果が格納されるファイルを探すことによって) チェックし、まだ計算されていないことを確認し、計算を実行し、そして同時に同じファイルに結果を書き込もうとすることがあります。このような書き込みの衝突を避けるにはどうしたらよいでしょうか?

どのように解決するのですか?

GP89 が良い解決策を挙げています。キューを使用して、ファイルへの単独書き込みアクセス権を持つ専用のプロセスに書き込みタスクを送信します。他のすべてのワーカーは読み取り専用アクセス権を持っています。これにより、衝突をなくすことができます。 以下は apply_async を使用した例ですが、map でも動作します。

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.clock()
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s 
    done = time.clock() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
   main()