[解決済み] Dask: dask delayでどのようにコードを並列化するか?
質問
初めて並列処理に挑戦することになり、Daskについて調べているのですが、実際にコーディングするのに苦労しています。
サンプルとドキュメントを見ましたが、dask.delayedが一番うまくいくと思います。私は自分の関数をdelayed(function_name)でラップしたり、@delayedデコレータを追加しようとしましたが、うまく動作させることができないようです。Daskはpythonで作られていて、その(はずの)シンプルさから、他の方法よりもDaskを好んだのです。daskがforループで動作しないことは知っていますが、ループの中では動作するらしいです。
私のコードは、他の関数への入力を含む関数を介してファイルを渡し、次のようになります。
from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
name = name.split('.')[0]
....
で、いくつかの前処理を行います。
preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)
で、コンストラクタを呼び出して、関数呼び出しにpre_resultsを渡しています。
fc = FunctionCalls()
Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
input_data=pre_result1, model1=pre_result2)
ここでやっていることは、ファイルをforループに渡して、いくつかの前処理をしてから、ファイルを2つのモデルに渡しています。
これを並列化する方法について、感想やヒントをお願いします。私は奇妙なエラーを取得し始め、私はコードを修正する方法を知りませんでした。コードはそのまま動きます。私はたくさんのpandas dataframes、series、およびnumpy配列を使用しており、私は戻ってdask.dataframesなどで動作するようにすべてを変更しないことを希望しています。
コメント中のコードは読みにくいかもしれません。ここでは、よりフォーマット化された方法で説明します。
下のコードで、print(mean_squared_error)と入力しても、ただ出てくるだけです。Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')
from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']
for count, name in enumerate(filenames):
file1 = pd.read_csv(name)
df = pd.DataFrame(file1)
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = delayed(mse)(observed, prediction)
解決方法は?
最終的に結果を計算するために、dask.computeを呼び出す必要があります。 参照 dask.delayed ドキュメント .
シーケンシャルコード
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]
results = []
for count, name in enumerate(filenames):
file1 = pd.read_csv(name)
df = pd.DataFrame(file1) # isn't this already a dataframe?
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = mse(observed, prediction)
results.append(mean_squared_error)
並列コード
import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]
delayed_results = []
for count, name in enumerate(filenames):
df = dask.delayed(pd.read_csv)(name)
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = dask.delayed(mse)(observed, prediction)
delayed_results.append(mean_squared_error)
results = dask.compute(*delayed_results)
関連
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み】Not on FX application thread; currentThread = JavaFX Application Thread エラーを回避する方法は?
-
[解決済み] POSIXシステムでのゾンビスレッド
-
[解決済み] Win32 InterlockedExchange関数はどのような場合に使用するのですか?
-
[解決済み] デッドロックとは何ですか?
-
[解決済み] 初心者のためのアトミック操作とは?
-
[解決済み】「スレッド」とは(本当は)何ですか?)
-
[解決済み】スレッド間で共有されるリソースは何ですか?
-
[解決済み】糸と繊維の違いは何ですか?
-
[解決済み】Redisはシングルスレッドですが、同時I/Oはどのように行うのですか?
-
[解決済み] Re-entrantロックとは何ですか?