1. ホーム
  2. multithreading

[解決済み] Dask: dask delayでどのようにコードを並列化するか?

2022-02-08 08:05:46

質問

初めて並列処理に挑戦することになり、Daskについて調べているのですが、実際にコーディングするのに苦労しています。

サンプルとドキュメントを見ましたが、dask.delayedが一番うまくいくと思います。私は自分の関数をdelayed(function_name)でラップしたり、@delayedデコレータを追加しようとしましたが、うまく動作させることができないようです。Daskはpythonで作られていて、その(はずの)シンプルさから、他の方法よりもDaskを好んだのです。daskがforループで動作しないことは知っていますが、ループの中では動作するらしいです。

私のコードは、他の関数への入力を含む関数を介してファイルを渡し、次のようになります。

from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
    name = name.split('.')[0]
    ....

で、いくつかの前処理を行います。

    preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)

で、コンストラクタを呼び出して、関数呼び出しにpre_resultsを渡しています。

    fc = FunctionCalls()
    Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
                             input_data=pre_result1, model1=pre_result2)

ここでやっていることは、ファイルをforループに渡して、いくつかの前処理をしてから、ファイルを2つのモデルに渡しています。

これを並列化する方法について、感想やヒントをお願いします。私は奇妙なエラーを取得し始め、私はコードを修正する方法を知りませんでした。コードはそのまま動きます。私はたくさんのpandas dataframes、series、およびnumpy配列を使用しており、私は戻ってdask.dataframesなどで動作するようにすべてを変更しないことを希望しています。

コメント中のコードは読みにくいかもしれません。ここでは、よりフォーマット化された方法で説明します。

下のコードで、print(mean_squared_error)と入力しても、ただ出てくるだけです。Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')

from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']

for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = delayed(mse)(observed, prediction)

解決方法は?

最終的に結果を計算するために、dask.computeを呼び出す必要があります。 参照 dask.delayed ドキュメント .

シーケンシャルコード

import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

results = []
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)  # isn't this already a dataframe?
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = mse(observed, prediction)  
    results.append(mean_squared_error)

並列コード

import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

delayed_results = []
for count, name in enumerate(filenames):
    df = dask.delayed(pd.read_csv)(name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = dask.delayed(mse)(observed, prediction)
    delayed_results.append(mean_squared_error)

results = dask.compute(*delayed_results)