scala - How to call separate logic for different file names in Spark

Tags: scala apache-spark readfile

I have three log files in each of my folders:

foldera = emplog,deptlog,companylog
folderb = emplog,deptlog,companylog
folderc = emplog,deptlog,companylog

I have three different Scala program files to extract the data from each of them:

employee.scala
department.scala
companylog.scala

Each of them contains code like the following.

I want to combine all these files and execute them in parallel.

package com.sample

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}

object logparser {
  def main(args: Array[String]) = {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // Start the Spark context
    val conf = new SparkConf()
      .setAppName("Parser")
      .setMaster("local")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val test = sc.wholeTextFiles("C:\\mkdir\\*\\*")
      .map { l =>
        if (l._1.endsWith("emplog.txt")) {
          empparser(l._2, sc, sqlContext)
        }
        l
      }
      .foreach(println)
  }

  def empparser(record: String, sc: SparkContext, sqlContext: SQLContext) = {
    val emppattern = """[(](\d+)[)]\s([\w\s._]{30})\s+""".r

    import sqlContext.implicits._

    val indrecs = emppattern.findAllIn(record)
      .map { line =>
        val emppattern(eid, ename) = line
        (eid, ename)
      }
      .toSeq
      .toDF("eid", "ename")
      .show()
  }
}

I have tried attaching each of these methods within the same object.

Now two questions arise.

Q1: When I run it, I get:

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6b0615ae)
    - field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class com.sample.logparser$$anonfun$1, <function1>)

As far as I know (I'm a newbie), the SparkContext can't be serialized. But if I don't pass sc as a parameter, I get a NullPointerException instead. How do I solve this?

Q2: After converting to a DataFrame, I will add code inside the empparser method to insert into a Hive table. Once that is done, I don't want to do anything else within my main. But my map code won't execute unless there is an action after it; that's why I have the foreach(println) at the end. Is there a way to overcome this issue?

Answer

To attempt to answer the question, I'm going to assume that processing an employee or a department record results in the same kind of output record. I would expect this to differ for each kind of data, so I'm keeping the processing of the different record types separate to allow for that adjustment to reality.

First, we define a Record case class and parsers for the different record types. (Here I'm copying the same implementation for each, for the sake of simplicity.)

case class Record(id:String, name: String)

val empParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
  record match {
    case pattern(eid,ename) => Some(Record(eid, ename))
    case _ => None
  }
}

val deptParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
  record match {
    case pattern(eid,ename) => Some(Record(eid, ename))
    case _ => None
  }
}

val companyParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
  record match {
    case pattern(eid,ename) => Some(Record(eid, ename))
    case _ => None
  }
}
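
As a quick sanity check, each parser returns an Option[Record]. The sample line below is made up purely to fit the pattern's fixed 30-character name field and is not from the original question:

// Hypothetical input: "(101)", one space, a name padded to 30 characters,
// and at least one trailing whitespace character so the pattern matches.
val sampleLine = "(101) " + "John Smith".padTo(30, ' ') + " "

empParser(sampleLine)  // Some(Record(101, "John Smith" padded with spaces))
empParser("garbage")   // None, because the pattern does not match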

We load the data using wholeTextFiles:

val dataPath = "/.../data/wholefiles/*/*"
val logFiles =  sc.wholeTextFiles(dataPath)

Then we process the different kinds of records by filtering the files down to the ones we need and applying the parsers we defined above. Note how we are practically repeating the same process for each type; this could be abstracted out, as shown in the sketch after the next block.

val empLogs = logFiles
  .filter { case (filename, _) => filename.endsWith("emplog.txt") }
  .flatMap { case (_, content) => content.split("\n").flatMap(line => empParser(line)) }

val deptLogs = logFiles
  .filter { case (filename, _) => filename.endsWith("deptlog.txt") }
  .flatMap { case (_, content) => content.split("\n").flatMap(line => deptParser(line)) }

val compLogs = logFiles
  .filter { case (filename, _) => filename.endsWith("companylog.txt") }
  .flatMap { case (_, content) => content.split("\n").flatMap(line => companyParser(line)) }
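
One way to abstract that repetition away is a small helper that takes the filename suffix and the parser. This is just an illustrative sketch (the helper name parseLogs is not from the original answer); the three definitions above would then collapse to:

// Illustrative helper: filter the (filename, content) pairs by suffix,
// split each file's content into lines, and apply the given parser to every line.
def parseLogs(suffix: String, parse: String => Option[Record]) =
  logFiles
    .filter { case (filename, _) => filename.endsWith(suffix) }
    .flatMap { case (_, content) => content.split("\n").flatMap(line => parse(line)) }

val empLogs  = parseLogs("emplog.txt", empParser)
val deptLogs = parseLogs("deptlog.txt", deptParser)
val compLogs = parseLogs("companylog.txt", companyParser)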

We now convert to a DataFrame

val empDF  = empLogs.toDF

And we could do the same for the other record types as well.
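
For instance, a sketch only: it assumes the same sqlContext.implicits._ import is in scope, and the Hive write for Q2 assumes Hive support is enabled (e.g. a HiveContext in Spark 1.x); the table name emp_table is made up.

val deptDF = deptLogs.toDF
val compDF = compLogs.toDF

// Sketch of the Hive insert mentioned in Q2; "emp_table" is a hypothetical name.
empDF.write.mode("append").saveAsTable("emp_table")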

There's plenty of room to reduce code duplication in this process, depending on how much commonality we can find in the processing of the different data types.
