apache-spark - Spark Structured Streaming writing to Parquet creates so many files

Tags: apache-spark spark-streaming parquet

Spark Structured Streaming writing to parquet creates so many files

I used Structured Streaming to load messages from Kafka, do some aggregation, and then write the result to Parquet files. The problem is that so many Parquet files are created (800 files) for only 100 messages from Kafka.
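
For reference, the Kafka side of the pipeline is the standard Structured Streaming Kafka source; a minimal sketch of that part (broker address, topic name, and message schema are placeholders here, not the real values from this job) is:

// `spark` is an existing SparkSession.
// Requires: import static org.apache.spark.sql.functions.*;
//           import org.apache.spark.sql.Dataset;
//           import org.apache.spark.sql.Row;
//           import org.apache.spark.sql.types.StructType;
StructType schema = new StructType()          // placeholder payload schema
            .add("timeStamp", "string")
            .add("value", "double");

Dataset<Row> model = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
            .option("subscribe", "events")                        // placeholder topic
            .load()
            .selectExpr("CAST(value AS STRING) AS json")
            .select(from_json(col("json"), schema).alias("data"))
            .select("data.*");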

The aggregation part is:

// Uses static imports: import static org.apache.spark.sql.functions.*;
return model
            .withColumn("timeStamp", col("timeStamp").cast("timestamp"))
            .withWatermark("timeStamp", "30 seconds")
            .groupBy(window(col("timeStamp"), "5 minutes"))
            .agg(
                count("*").alias("total"));

The query:

StreamingQuery query = result //.orderBy("window")
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("parquet")
            .option("checkpointLocation", "c:\\bigdata\\checkpoints")
            .start("c:\\bigdata\\parquet");

When I load one of the Parquet files using Spark, it shows as empty:

+------+-----+
|window|total|
+------+-----+
+------+-----+

How can I save the dataset to only one Parquet file? Thanks.

Answer

My idea was to use Spark Structured Streaming to consume events from Azure Event Hubs and then store them in Parquet format on storage.

I finally figured out how to deal with the many small files being created. Spark version: 2.4.0.

This is what my query looks like:

from pyspark.sql.functions import col

query = (
    dfInput
    .repartition(1, col("column_name"))  # "column_name" is a placeholder partitioning column
    .select("*")
    .writeStream
    .format("parquet")
    .option("path", "adl://storage_name.azuredatalakestore.net/streaming")
    .option("checkpointLocation", "adl://storage_name.azuredatalakestore.net/streaming_checkpoint")
    .trigger(processingTime="480 seconds")
    .start()
)

As a result, I have one file created in the storage location every 480 seconds. To find the right balance between file size and number of files (and avoid OOM errors), just play with two parameters: the number of partitions and processingTime, i.e. the batch interval.

I hope you can adjust the solution to your use case.
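
Applied to the Java query from the question, that adjustment might look roughly like the sketch below; the repartition count and the trigger interval are the two knobs described above, and the paths are the ones from the question:

// Requires: import org.apache.spark.sql.streaming.Trigger;
StreamingQuery query = result
            .repartition(1)                                  // one output partition -> one file per micro-batch
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("parquet")
            .trigger(Trigger.ProcessingTime("480 seconds"))  // fewer, larger micro-batches
            .option("checkpointLocation", "c:\\bigdata\\checkpoints")
            .start("c:\\bigdata\\parquet");

Here repartition(1) collapses each micro-batch into a single output partition (and hence a single Parquet file) at the cost of write parallelism, while the longer processing-time trigger reduces how often a batch is written at all.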
