当前位置: 首页 > news >正文

DASK==延迟对象delayed

官方文档地址

Dask Delayed — Dask documentation

总结为:

输出函数延时

dataframe延时

统一compute

代码:

import dask.dataframe as dd
import fsspec
import pandas as pd
from dask.delayed import delayed
import os# 创建一个模拟的 _write_csv 函数
def _write_csv(df, fil, *, depend_on=None, **kwargs):print(df)print(fil)print(kwargs)with fil as f:df.to_csv(f, **kwargs)return os.path.normpath(fil.path)if __name__ == '__main__':# 创建测试数据data = {'A': range(10),'B': range(10, 20)}df = pd.DataFrame(data)# 将 Pandas DataFrame 转为 Dask DataFrame,分区设置为2print('dd.from_pandas(df, npartitions=2)')dask_df = dd.from_pandas(df, npartitions=2)print(dask_df)print('dd.from_pandas(df, npartitions=2)')print('dfs = dask_df.to_delayed()')dfs = dask_df.to_delayed()print(dfs)print('dfs = dask_df.to_delayed()')# 设置文件名first_file = fsspec.open('output1.csv', 'w')second_file = fsspec.open('output2.csv', 'w')files = [first_file, second_file]to_csv_chunk = delayed(_write_csv, pure=False)print(to_csv_chunk)kwargs = {}values = [to_csv_chunk(dfs[0], files[0], **kwargs)]values.extend([to_csv_chunk(d, f, **kwargs) for d, f in zip(dfs[1:], files[1:])])print(values)# 执行计算并保存文件import daskcompute_kwargs = {}aa = list(dask.compute(*values, **compute_kwargs))print(aa)


http://www.mrgr.cn/news/18238.html

相关文章:

  • 日常实习【面试记录】
  • [CyberSpace‘24] Crypto
  • 物联网之MQTT
  • Java中Object的常用方法
  • zeppline如何配置用户登陆
  • 0基础轻松玩转.NET Web API 8.0【CICD】项目实战
  • 利用衍射进行材料分析--Muad
  • ASP.NET Core 入门教学六 异常设置
  • vscode上传自己开发的npm包
  • [java][代码]java操作XML代码
  • 【Spring Boot-Spring Boot配置文件分类】
  • 正向代理、反向代理
  • 一文弄懂PCIe总线
  • 3. MyBatis 执行原理了解吗?
  • 【C语言】十六进制、二进制、字节、位、指针、数组
  • K8s搭建过程,新手闭眼入!!!超详细教程
  • Win10桌面出现Removable Storage Devices文件夹无法删除
  • python3 whl怎么安装
  • 搭建nmt部署考试系统
  • 【数据结构】Map的使用与注意事项