当前位置: 首页 > news >正文

【数仓建模过程】Spark数据清洗篇

1. 环境准备

首先,确保你有适当的Spark环境。你可以使用Spark SQL、Spark DataFrame API或Spark RDD API来处理数据。对于数据仓库任务,通常推荐使用Spark SQL或DataFrame API,因为它们更适合结构化数据处理。

2. 数据读取

从ODS层或其他数据源读取数据。Spark支持多种数据源,如Hive表、HDFS文件、关系型数据库等。

from pyspark.sql import SparkSession# 创建SparkSession
spark = SparkSession.builder \.appName("DWD Data Processing") \.enableHiveSupport() \.getOrCreate()# 从ODS层读取数据
ods_data = spark.sql("SELECT * FROM ods.your_table")

2. 数据质量检查

在进行数据清洗之前,我们应该先检查数据质量,以了解数据的现状。

from pyspark.sql.functions import col, count, when, isnan, isnull# 检查每列的空值和NaN值
null_nan_counts = ods_data.select([count(when(isnull(c) | isnan(c), c)).alias(c + '_null_or_nan_count')for c in ods_data.columns
])# 计算空值和NaN的百分比
total_count = ods_data.count()
null_nan_percentages = null_nan_counts.select([(col(c) / total_count * 100).alias(c + '_null_or_nan_percentage')for c in null_nan_counts.columns
])# 显示结果
null_nan_counts.show()
null_nan_percentages.show()# 检查唯一值
unique_counts = ods_data.agg(*[countDistinct(col(c)).alias(c + '_distinct_count') for c in ods_data.columns])
unique_counts.show()

3. 数据清洗和转换

基于数据质量检查的结果,我们可以更有针对性地进行数据清洗和转换。

from pyspark.sql.functions import col, when, count, lit
from pyspark.sql.types import IntegerType# 计算每行的非空值数量
row_counts = ods_data.select([(count(when(~(isnull(c) | isnan(c)), c)) / len(ods_data.columns)).alias('non_null_ratio')for c in ods_data.columns
])# 设定一个阈值,例如要求至少50%的列有非空值
threshold = 0.5# 筛选出满足条件的行
filtered_data = ods_data.withColumn('non_null_ratio', row_counts.non_null_ratio) \.filter(col('non_null_ratio') >= threshold) \.drop('non_null_ratio')print(f"原始数据行数: {ods_data.count()}")
print(f"过滤后数据行数: {filtered_data.count()}")# 处理剩余的空值和NaN
cleaned_data = filtered_datafor column in filtered_data.columns:# 根据数据类型和业务需求选择适当的填充值if filtered_data.schema[column].dataType == IntegerType():cleaned_data = cleaned_data.fillna({column: 0})else:cleaned_data = cleaned_data.fillna({column: "Unknown"})# 数据类型转换(如果需要)
converted_data = cleaned_data.withColumn("age", col("age").cast(IntegerType()))# 条件转换
processed_data = converted_data.withColumn("age_group", when(col("age") < 18, "Under 18").when((col("age") >= 18) & (col("age") < 60), "Adult").otherwise("Senior")
)## 4. 清洗后的数据质量检查在数据清洗和转换之后,我们应该再次检查数据质量,以确保我们的处理达到了预期效果。```python
# 再次检查空值和NaN(这次应该没有空值了)
post_clean_null_nan_counts = processed_data.select([count(when(isnull(c) | isnan(c), c)).alias(c + '_null_or_nan_count')for c in processed_data.columns
])post_clean_null_nan_counts.show()# 再次检查唯一值
post_clean_unique_counts = processed_data.agg(*[countDistinct(col(c)).alias(c + '_distinct_count') for c in processed_data.columns])
post_clean_unique_counts.show()

3. 数据清洗和转换

对数据进行必要的清洗和转换操作,以符合DWD层的要求。这可能包括处理空值、数据类型转换、去重等。

from pyspark.sql.functions import col, when, count, lit, length, to_timestamp, window, sum, from_unixtime, unix_timestamp
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.window import Window# 步骤1: 转换时间戳格式
filtered_data = ods_data.withColumn("timestamp",to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")
)# 确保时间戳转换成功
filtered_data = filtered_data.filter(col("timestamp").isNotNull())print(f"时间戳转换后,数据行数: {filtered_data.count()}")# 步骤2: 删除只有user_id,其他列全为null的数据
columns_except_user_id = [c for c in filtered_data.columns if c != 'user_id']
filtered_data = filtered_data.filter(sum([col(c).isNotNull() for c in columns_except_user_id]) > 0)print(f"删除只有user_id的行后,数据行数: {filtered_data.count()}")# 步骤3: 处理user_id在同一分钟内出现多次且状态没有变化的情况
window_spec = Window.partitionBy('user_id', window('timestamp', '1 minute'))
duplicate_data = filtered_data.withColumn('count', count('user_id').over(window_spec)) \.withColumn('status_changed', sum(col('status').cast('int')).over(window_spec) > 0) \.filter((col('count') > 1) & (col('status_changed') == False))deduplicated_data = filtered_data.join(duplicate_data, ['user_id', 'timestamp'], 'left_anti')print(f"去重后,数据行数: {deduplicated_data.count()}")# 步骤4: 处理数据格式错误
cleaned_data = deduplicated_data.filter((length(col('name')) <= 20) &  # 姓名不超过20个字符(col('age').between(0, 120)) &  # 年龄在0到120之间(col('email').rlike('^[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,4}$'))  # 邮箱格式检查
)print(f"处理数据格式错误后,数据行数: {cleaned_data.count()}")# 步骤5: 处理剩余的空值和NaN
for column in cleaned_data.columns:if cleaned_data.schema[column].dataType == IntegerType():cleaned_data = cleaned_data.fillna({column: 0})elif cleaned_data.schema[column].dataType == TimestampType():# 对于时间戳类型,我们可以选择填充为一个特定的日期,或者保持为null# 这里我们选择保持为nullpasselse:cleaned_data = cleaned_data.fillna({column: "Unknown"})# 步骤6: 数据类型转换和条件转换
converted_data = cleaned_data.withColumn("age", col("age").cast(IntegerType()))processed_data = converted_data.withColumn("age_group", when(col("age") < 18, "Under 18").when((col("age") >= 18) & (col("age") < 60), "Adult").otherwise("Senior")
)# 步骤7: 删除空值比例过高的行
row_counts = processed_data.select([(count(when(~(col(c).isNull() | isnan(col(c))), c)) / len(processed_data.columns)).alias('non_null_ratio')for c in processed_data.columns
])threshold = 0.5
final_data = processed_data.withColumn('non_null_ratio', row_counts.non_null_ratio) \.filter(col('non_null_ratio') >= threshold) \.drop('non_null_ratio')print(f"最终数据行数: {final_data.count()}")# 显示最终数据的样本,包括转换后的时间戳
final_data.select("user_id", "timestamp", "name", "age", "email", "status").show(5, truncate=False)

4. 数据质量检查

在导入DWD层之前,进行数据质量检查是很重要的。

# 示例:检查空值比例
null_counts = processed_data.select([sum(col(c).isNull().cast("int")).alias(c) for c in processed_data.columns])
null_percentages = null_counts.select([((col(c) / processed_data.count()) * 100).alias(c) for c in null_counts.columns])# 示例:检查唯一值
unique_counts = processed_data.agg(*[countDistinct(col(c)).alias(c) for c in processed_data.columns])

5. 数据写入

将处理后的数据写入DWD层。通常,DWD层的数据会以Hive表的形式存储。

# 写入Hive表
processed_data.write \.mode("overwrite") \.format("parquet") \.saveAsTable("dwd.your_dwd_table")

6. 数据分区

对于大型表,考虑使用分区来优化查询性能。

# 按日期分区写入
processed_data.write \.partitionBy("date") \.mode("overwrite") \.format("parquet") \.saveAsTable("dwd.your_dwd_table")

7. 处理增量数据

在实际场景中,我们只需要处理增量数据而不是每次全量处理。

# 假设我们有一个最后处理时间
last_process_time = get_last_process_time()# 只处理新数据
new_data = spark.sql(f"SELECT * FROM ods.your_table WHERE update_time > '{last_process_time}'")# 处理新数据...# 将新数据追加到DWD表
new_processed_data.write \.mode("append") \.format("parquet") \.saveAsTable("dwd.your_dwd_table")# 更新处理时间
update_last_process_time(current_timestamp)

8. 错误处理和日志记录

确保添加适当的错误处理和日志记录,以便于故障排除和监控。

import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)try:# 你的Spark处理代码logger.info("Data processing completed successfully")
except Exception as e:logger.error(f"Error occurred during data processing: {str(e)}")raise

9. 优化性能

根据需要使用Spark的优化技术,如缓存频繁使用的数据、调整分区等。

# 缓存频繁使用的数据
processed_data.cache()# 重分区以优化并行度
optimized_data = processed_data.repartition(100)

10. 数据血缘和元数据管理

记录数据的血缘关系和元数据,这对于数据治理和追踪非常重要。

# 示例:记录元数据
metadata = {"source_table": "ods.your_table","target_table": "dwd.your_dwd_table","process_date": current_date,"columns_processed": processed_data.columns
}
log_metadata(metadata)

通过以上步骤,你可以使用Spark有效地处理数据并将其导入DWD层。记住,实际的实现可能需要根据你的具体需求和数据特性进行调整。同时,定期监控和优化你的Spark作业以确保其效率和可靠性也是很重要的。


http://www.mrgr.cn/news/15137.html

相关文章:

  • FPGA中的存储器--学习笔记
  • MySQL集群
  • 深度学习基础—迁移学习、多任务学习和端对端学习
  • OpenGL笔记二十之深度检测概念
  • 常见的服务器容器和漏洞类型汇总
  • 深度学习示例1-全零通道的 MNIST 数据训练模型
  • 【matlab】数组操作:寻找最大值和最小值及其位置ind2sub函数
  • 2024 年的 Web3 游戏:演变、趋势和市场动态
  • 自然语言处理系列四十六》Elasticsearch搜索引擎》Elasticsearch安装部署和使用
  • MariaDB 和 MySQL 版本关联
  • 双向链表的复杂操作、内核链表、栈
  • 一个批量爬取微博数据的神器
  • milvus资源限制 benchmarker压测 qps优化
  • 相机SD卡格式化了怎么恢复?
  • Flask+LayUI开发手记(五):树型表格实现数据展示与编辑
  • 无人机的工业应用场景
  • Adobe Illustrator学习宝典(自用)
  • 【dp力扣】买卖股票的最佳时机III
  • iOS/iPadOS18.1Beta3发布,新增通知摘要和AI消除功能
  • 基于web旅游信息平台的设计与实现