apache-spark - 为什么在本地模式下加入Spark太慢了?

原文 标签 apache-spark pyspark apache-spark-sql spark-dataframe

Why join in spark in local mode is so slow?

I am using spark in local mode and a simple join is taking too long. I have fetched two dataframes: A (8 columns and 2.3 million rows) and B(8 columns and 1.2 million rows) and joining them using A.join(B,condition,'left') and called an action at last. It creates a single job with three stages, each for two dataframes extraction and one for joining. Surprisingly stage with extraction of dataframe A is taking around 8 minutes and that of dataframe B is taking 1 minute. And join happens within seconds. My important configuration settings are:

  1. spark.master local[*]
  2. spark.driver.cores 8
  3. spark.executor.memory 30g
  4. spark.driver.memory 30g
  5. spark.serializer org.apache.spark.serializer.KryoSerializer
  6. spark.sql.shuffle.partitions 16

The only executor is driver itself. While extracting dataframes, i have partitioned it in 32(also tried 16,64,50,100,200) parts. I have seen shuffle write memory to be 100 MB for Stage with dataframe A extraction. So to avoid shuffle i made 16 initial partitions for both dataframes and broadcasted dataframe B(smaller), but it is not helping. There is still shuffle write memory. I have used broadcast(B) syntax for this. Am I doing something wrong? Why shuffling is still there? Also when i see event timelines its showing only four cores are processing at any point of time. Although I have a 2core*4 processor machine.Why is that so?


In short, "Join"<=>Shuffling, the big question here is how uniformly are your data distributed over partitions (see for example https://0x0fff.com/spark-architecture-shuffle/ , https://www.slideshare.net/SparkSummit/handling-data-skew-adaptively-in-spark-using-dynamic-repartitioning and just Google the problem). Few possibilities to improve efficiency:

  • think more about your data (A and B) and partition data wisely;
  • analyze, are your data skewed?;
  • go into UI and look at the tasks timing;
  • choose such keys for partitions that during "join" only few partitions from dataset A shuffle with few partitions of B;



spark.driver.cores 8
spark.executor.memory 30克
spark.driver.memory 30克
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.shuffle.partitions 16

唯一的执行者是驱动程序本身。在提取数据帧时,我将其划分为32个部分(也尝试了16,64,50,100,200个)。我已经看到提取数据帧A的阶段的随机写入内存为100 MB。因此,为避免随机播放,我对数据帧和广播的数据帧B(较小)都做了16个初始分区,但这没有帮助。仍然有随机写入存储器。我为此使用了broadcast(B)语法。难道我做错了什么?为什么改组仍然存在?另外,当我看到事件时间表时,在任何时间点仅显示四个内核正在处理。虽然我有一台2core * 4处理器的机器,为什么会这样呢?
简而言之,“ Join” <=> Shuffling,这里的主要问题是您的数据在分区上的分布情况如何(例如,请参见https://0x0fff.com/spark-architecture-shuffle/https://www.slideshare.net/SparkSummit/handling-data-skew-adaptively-in-spark-using-dynamic-repartitioning和Google)。


scala - 如何在Spark中为不同的文件名调用单独的逻辑

apache-spark - Spark和InfiniBand

java - 使用Spark SQL时找不到Spark Logging类

apache-spark - Spark 2.0独立模式动态资源分配工作者启动错误

java - 将Json的Dataset列解析为Dataset <Row>

java - Spark数据帧加入范围缓慢

scala - 计算Spark中UDF的调用

scala - 火花一次输出到kafka

apache-spark - 使用Spark Streaming读取Kafka记录时未序列化异常

python - 如何使用PySpark进行嵌套的for-each循环