DASK==延迟对象delayed
官方文档地址:
Dask Delayed — Dask documentation(https://docs.dask.org/en/stable/delayed.html)
总结为:
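1. 输出函数延迟:用 delayed 包装写文件的函数,调用时只构建任务而不立即执行
2. DataFrame 延迟:用 to_delayed() 把 Dask DataFrame 的每个分区转换为延迟对象
3. 统一 compute:最后调用 dask.compute 一次性执行所有延迟任务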
代码:
import dask.dataframe as dd
import fsspec
import pandas as pd
from dask.delayed import delayed
import os


# Create a mock _write_csv function
def _write_csv(df, fil, *, depend_on=None, **kwargs):
    """Write one DataFrame partition as CSV to an open-able file target.

    Prints ``df``, ``fil`` and ``kwargs`` (demonstration output), then enters
    ``fil`` as a context manager and writes ``df`` as CSV to the handle it
    yields.

    Args:
        df: Object exposing ``to_csv(handle, **kwargs)``.
        fil: Context manager that yields a writable handle and exposes a
            ``path`` attribute, such as the result of
            ``fsspec.open(..., 'w')``.
        depend_on: Not read by this function.
            NOTE(review): presumably accepted so a caller can pass another
            delayed result purely to force execution order -- confirm
            before removing.
        **kwargs: Forwarded unchanged to ``df.to_csv``.

    Returns:
        ``fil.path`` after ``os.path.normpath`` normalization.
    """
    print(df)
    print(fil)
    print(kwargs)
    with fil as f:
        df.to_csv(f, **kwargs)
    return os.path.normpath(fil.path)


def main():
    """Write a two-partition Dask DataFrame to CSV files via delayed tasks."""
    # Local import: only the final compute step needs the top-level package.
    import dask

    # Create the test data.
    data = {'A': range(10), 'B': range(10, 20)}
    df = pd.DataFrame(data)

    # Convert the pandas DataFrame to a Dask DataFrame with 2 partitions.
    print('dd.from_pandas(df, npartitions=2)')
    dask_df = dd.from_pandas(df, npartitions=2)
    print(dask_df)
    print('dd.from_pandas(df, npartitions=2)')

    # Turn the partitions into a list of delayed objects.
    print('dfs = dask_df.to_delayed()')
    dfs = dask_df.to_delayed()
    print(dfs)
    print('dfs = dask_df.to_delayed()')

    # Set up the output files (nothing is opened until the task enters them).
    first_file = fsspec.open('output1.csv', 'w')
    second_file = fsspec.open('output2.csv', 'w')
    files = [first_file, second_file]

    # pure=False: the wrapped function writes a file, i.e. has side effects.
    to_csv_chunk = delayed(_write_csv, pure=False)
    print(to_csv_chunk)

    # Pair each partition with its output file; zip stops at the shorter one.
    kwargs = {}
    values = [to_csv_chunk(d, f, **kwargs) for d, f in zip(dfs, files)]
    print(values)

    # Run the computation, which actually writes the files.
    compute_kwargs = {}
    aa = list(dask.compute(*values, **compute_kwargs))
    print(aa)


if __name__ == '__main__':
    main()