Why does spark-shell print thousands of lines of code after count on a DataFrame with 3000 columns? What are JaninoRuntimeException and the 64 KB limit?

Tags: scala, apache-spark

(Using spark-2.1.0-bin-hadoop2.7 from the official website, on a local machine)

When I execute a simple Spark command in spark-shell, it prints thousands and thousands of lines of code before throwing an error. What is this "code"?

I was running Spark on my local machine. The command I ran was a simple df.count, where df is a DataFrame.

Please see the screenshot below (the code flies by so fast that I could only take screenshots to see what was going on). More details are below the image.

[Screenshot: generated Java code scrolling past in spark-shell before the exception]

More details:

I created the DataFrame df with:

val df: DataFrame = spark.createDataFrame(rows, schema)
// rows: RDD[Row]
// schema: StructType
// There were about 3000 columns and 700 rows (testing set) of data in df. 
// The following line ran successfully and returned the correct value
rows.count
// The following line threw an exception after printing out tons of code, as shown in the screenshot above
df.count
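
For reference, here is a self-contained sketch of the kind of wide DataFrame that triggers this in spark-shell. The column names, row count, and constant values are made up for illustration (the actual data is not shown here); any sufficiently wide schema should exercise the same code-generation path.

// Hypothetical reproduction sketch for spark-shell on Spark 2.1.0.
// The 3000-column schema filled with dummy integers is an assumption, not the original data.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val nCols  = 3000
val schema = StructType((1 to nCols).map(i => StructField(s"c$i", IntegerType)))
val rows   = spark.sparkContext.parallelize(Seq.fill(10)(Row.fromSeq(Seq.fill(nCols)(1))))
val df     = spark.createDataFrame(rows, schema)

// This is the call that prints the generated SpecificUnsafeProjection code
// and then fails with the 64 KB JaninoRuntimeException on Spark 2.1.0.
df.count()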

The exception thrown after the "code" is:

...
/* 181897 */     apply_81(i);
/* 181898 */     result.setTotalSize(holder.totalSize());
/* 181899 */     return result;
/* 181900 */   }
/* 181901 */ }

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:889)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:941)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:938)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959) 

Edit: As @TzachZohar pointed out, this looks like one of the known bugs (https://issues.apache.org/jira/browse/SPARK-16845) that has been fixed but not yet released by the Spark project.

I pulled the Spark master branch, built it from source, and retried my example. Now I get a new exception after the generated code:

/* 308608 */     apply_1560(i);
/* 308609 */     apply_1561(i);
/* 308610 */     result.setTotalSize(holder.totalSize());
/* 308611 */     return result;
/* 308612 */   }
/* 308613 */ }

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:941)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:998)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:995)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF
at org.codehaus.janino.util.ClassFile.addToConstantPool(ClassFile.java:499)

It looks like a pull request is addressing the second problem: https://github.com/apache/spark/pull/16648

Answer

This is a bug. It is related to the Java code that Spark generates at runtime and compiles with Janino on the JVM: the JVM limits a single method's bytecode to 64 KB and a class's constant pool to 0xFFFF entries, and the projection generated for a very wide schema blows past those limits. That makes it hard to resolve. (There is much discussion on the JIRA ticket.)
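
As a side note, whole-stage code generation can be turned off at runtime. This is sometimes tried as a mitigation for oversized-codegen errors, but the failing SpecificUnsafeProjection above comes from a separate code path, so it may well not help in this exact case; treat it as an experiment, not a fix.

// Hedged mitigation sketch: disabling whole-stage codegen shrinks some generated classes,
// but may not avoid the UnsafeProjection error shown in the question.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
df.count()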

The error occurred for me when doing row operations. Even df.head() on a DataFrame of 700 rows would cause the exception.

The workaround for me was to convert the DataFrame to a sparse-data RDD (i.e., RDD[LabeledPoint]) and run the row-wise operations on the RDD. That is much faster and more memory-efficient. However, it only works with numeric data: categorical variables (factors, the target, etc.) need to be converted to Double.

That said, I am new to Scala myself, so my code is probably a tad amateurish. But it works.

Convert a Row to a LabeledPoint

// Imports needed by the snippets below
import scala.collection.immutable.ListMap
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StructField}

@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label: Int): LabeledPoint =
{
  try
  {
    // logger is assumed to be defined on the enclosing class
    logger.info(s"fieldNameSeq $fieldNameSeq")
    val values: Map[String, Long] = rowIn.getValuesMap(fieldNameSeq)

    // Sort by column name so every row emits its values in the same order
    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
    print(".")

    val rowValuesItr: Iterable[Long] = sortedValuesMap.values

    val positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
    val valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
    var currentPosition: Int = 0

    // Keep only the non-zero entries, recording their column positions
    rowValuesItr.foreach { kv =>
      if (kv > 0)
      {
        valuesArray += kv.toDouble
        positionsArray += currentPosition
      }
      currentPosition += 1
    }

    // The sparse vector's size must be the total number of columns,
    // not the number of non-zero entries
    new LabeledPoint(label, Vectors.sparse(fieldNameSeq.size, positionsArray.toArray, valuesArray.toArray))
  }
  catch
  {
    case ex: Exception => throw new Exception(ex)
  }
}

private def castColumnTo(df: DataFrame, cn: String, tpe: DataType): DataFrame =
{
  // Replace column cn with the same column cast to the requested type
  df.withColumn(cn, df(cn).cast(tpe))
}

Provide a DataFrame and return an RDD[LabeledPoint]

@throws(classOf[Exception])
def convertToLibSvm(spark: SparkSession, mDF: DataFrame, targetColumnName: String): RDD[LabeledPoint] =
{
  try
  {
    // Only integer/long columns are turned into features; the sparse vector
    // only holds doubles, so they are converted later in convertRowToLabeledPoint
    val fieldSeq: Seq[StructField] =
      mDF.schema.fields.toSeq.filter(f => f.dataType == IntegerType || f.dataType == LongType)
    val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)

    // Index the categorical target column and replace it with its integer code
    val indexer = new StringIndexer()
      .setInputCol(targetColumnName)
      .setOutputCol(targetColumnName + "_Indexed")
    val mDFTypedIndexed = indexer.fit(mDF).transform(mDF).drop(targetColumnName)
    val mDFFinal = castColumnTo(mDFTypedIndexed, targetColumnName + "_Indexed", IntegerType)

    // Collect to the driver and convert each Row into a LabeledPoint
    val labeledPoints: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]()
    mDFFinal.collect().foreach { row =>
      labeledPoints += convertRowToLabeledPoint(row, fieldNameSeq, row.getAs(targetColumnName + "_Indexed"))
    }

    spark.sparkContext.parallelize(labeledPoints.toSeq)
  }
  catch
  {
    case ex: Exception => throw new Exception(ex)
  }
}
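
A hypothetical usage sketch follows; the column name "target" and the follow-up operations are assumptions for illustration, not part of the original workflow.

// Hypothetical usage: build the RDD[LabeledPoint] once, then do the row-wise work on it
// instead of on the wide DataFrame (the "target" column name is an assumption).
val labeled: RDD[LabeledPoint] = convertToLibSvm(spark, df, "target")

labeled.count()                   // row-wise operations now avoid the oversized codegen path
labeled.take(3).foreach(println)  // inspect a few LabeledPoints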
