Parsing Data in Apache Spark Scala org.apache.spark.SparkException: Task not serializable error when trying...












0















Input file:



___DATE___

2018-11-16T06:3937
Linux hortonworks 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:39:37 up 100 days, 1:04, 2 users, load average: 9.01, 8.30, 8.48
06:30:01 AM all 6.08 0.00 2.83 0.04 0.00 91.06

___DATE___

2018-11-16T06:4037
Linux cloudera 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:40:37 up 100 days, 1:05, 28 users, load average: 8.39, 8.26, 8.45
06:40:01 AM all 6.92 1.11 1.88 0.04 0.00 90.05


Required output:



2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users


I'm trying to get my hands on Spark with Scala.
Trying to parse this input file with Spark 2.3.1 and scala 2.11.6 . Here's my code.



import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads `sys_stats.log` as records delimited by the `___DATE___` marker, maps every
 * record to a 3-tuple (date, hostname, logged-in users) and prints the tuples.
 *
 * As written, every tuple element prints as `()`: each guard uses `Matcher.matches()`,
 * which only succeeds when the WHOLE input matches the pattern, while each record here
 * is a multi-line chunk of the file. An `if` without an `else` evaluates to Unit when
 * the condition is false, so all three positions end up holding `()`.
 */
object parse_stats extends App {

// Not referenced anywhere in the visible code.
case class LoadSchema(date:String)

val conf = new SparkConf().setAppName("ParseStats").setMaster("local[*]")
val sc = new SparkContext(conf)

// Work on a copy of the context's Hadoop configuration and override the record
// delimiter, so TextInputFormat yields one record per `___DATE___`-separated section
// instead of one record per line.
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("textinputformat.record.delimiter","___DATE___")

// newAPIHadoopFile produces (LongWritable, Text) pairs; only the Text value is kept, as a String.
// NOTE(review): the path literal is shown with single backslashes, which are not valid
// escapes in a Scala string literal — presumably they were doubled in the real source; confirm.
val input = sc.newAPIHadoopFile("C:\Users\rohit\Documents\dataset\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)

// Patterns meant to recognise the timestamp line, the `uname` line and the `uptime` line.
// In a regex, `[Linux]`, `[GNU/Linux]` and `[load average]` are character classes: each
// matches exactly ONE character from the listed set, not the literal word.
// The range `A-z` also covers the punctuation characters that sit between `Z` and `a`.
lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r

// `eachline` is a whole delimiter-separated record (several lines of text), not a single line.
// Each element is an `if` with no `else`, so its static type is Any and its value is `()`
// whenever the guard is false. The guards call `matches()`, which tests the entire record.
// NOTE(review): `"\s+"` is shown with a single backslash, which is not a valid Scala string
// escape — presumably `"\\s+"` in the real source; confirm.
val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
(if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\s+")(1).trim() ), //collects hostname
(if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
)
}

// Bring all tuples back to the driver and print one per line.
transformRDD.collect().foreach(println)
}


If I run this code from IntelliJ, I get the output below.



((),(),())
((),(),())
((),(),())


If I run from spark-shell, I get below error:



scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration

scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text

scala> import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.LongWritable

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

scala> val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml

scala> hadoopConf.set("textinputformat.record.delimiter","___DATE___")

scala> val input = sc.newAPIHadoopFile("C:\Users\rnimmal1\Documents\dataset\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)
input: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:37

scala>

scala> lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
date_pattern: scala.util.matching.Regex = <lazy>

scala> lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
uname_pattern: scala.util.matching.Regex = <lazy>

scala> lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r
cpu_regex: scala.util.matching.Regex = <lazy>

scala>

scala> val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
| (if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\s+")(1).trim() ), //collects hostname
| (if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
| )
| }
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 54 elided
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
- object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml)
- field (class: $iw, name: hadoopConf, type: class org.apache.hadoop.conf.Configuration)
- object (class $iw, $iw@63fa0b9)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3f4b52fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@338f9bb5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3d63becf)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3aca7082)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4ccfd904)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6e4e7a62)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5aaab2b0)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5c51a7eb)
- field (class: $line36.$read, name: $iw, type: class $iw)
- object (class $line36.$read, $line36.$read@2ba3b4a6)
- field (class: $iw, name: $line36$read, type: class $line36.$read)
- object (class $iw, $iw@6559f04e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@8f7cbcc)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@465b16bb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@373efaa2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5f2896fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@f777d41)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@43ec41d7)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@61c0a61)
- field (class: $line38.$read, name: $iw, type: class $iw)
- object (class $line38.$read, $line38.$read@10d1f6da)
- field (class: $iw, name: $line38$read, type: class $line38.$read)
- object (class $iw, $iw@2095e085)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@380cb7e3)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 63 more


What am I missing?










share|improve this question

























  • textinputformat.record.delimiter should be single character, not a word

    – stack0114106
    Nov 20 '18 at 12:36











  • As far as I know, "textinputformat.record.delimiter" is not confined to a single character. I used the same delimiter with Apache Pig and it worked there, and it should work in Spark also, as this property is specific to Hadoop.

    – Rohit Nimmala
    Nov 20 '18 at 12:44













  • it can be multibyte if the encoding is utf8 or utf16, but not a word.. I just ran and it ignores DATA and takes the default newline. if you change DATA to "|" the split happens

    – stack0114106
    Nov 20 '18 at 13:18











  • Thanks, I replaced DATE string with "|" in the data set and in the property as well, I see the code is working the same way. :(

    – Rohit Nimmala
    Nov 20 '18 at 14:08











  • Yes, that can also be done.. let me update the answer

    – stack0114106
    Nov 20 '18 at 16:48
















0















Input file:



___DATE___

2018-11-16T06:3937
Linux hortonworks 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:39:37 up 100 days, 1:04, 2 users, load average: 9.01, 8.30, 8.48
06:30:01 AM all 6.08 0.00 2.83 0.04 0.00 91.06

___DATE___

2018-11-16T06:4037
Linux cloudera 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:40:37 up 100 days, 1:05, 28 users, load average: 8.39, 8.26, 8.45
06:40:01 AM all 6.92 1.11 1.88 0.04 0.00 90.05


Required output:



2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users


I'm trying to get my hands on Spark with Scala.
Trying to parse this input file with Spark 2.3.1 and scala 2.11.6 . Here's my code.



import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads `sys_stats.log` as records delimited by the `___DATE___` marker, maps every
 * record to a 3-tuple (date, hostname, logged-in users) and prints the tuples.
 *
 * As written, every tuple element prints as `()`: each guard uses `Matcher.matches()`,
 * which only succeeds when the WHOLE input matches the pattern, while each record here
 * is a multi-line chunk of the file. An `if` without an `else` evaluates to Unit when
 * the condition is false, so all three positions end up holding `()`.
 */
object parse_stats extends App {

// Not referenced anywhere in the visible code.
case class LoadSchema(date:String)

val conf = new SparkConf().setAppName("ParseStats").setMaster("local[*]")
val sc = new SparkContext(conf)

// Work on a copy of the context's Hadoop configuration and override the record
// delimiter, so TextInputFormat yields one record per `___DATE___`-separated section
// instead of one record per line.
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("textinputformat.record.delimiter","___DATE___")

// newAPIHadoopFile produces (LongWritable, Text) pairs; only the Text value is kept, as a String.
// NOTE(review): the path literal is shown with single backslashes, which are not valid
// escapes in a Scala string literal — presumably they were doubled in the real source; confirm.
val input = sc.newAPIHadoopFile("C:\Users\rohit\Documents\dataset\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)

// Patterns meant to recognise the timestamp line, the `uname` line and the `uptime` line.
// In a regex, `[Linux]`, `[GNU/Linux]` and `[load average]` are character classes: each
// matches exactly ONE character from the listed set, not the literal word.
// The range `A-z` also covers the punctuation characters that sit between `Z` and `a`.
lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r

// `eachline` is a whole delimiter-separated record (several lines of text), not a single line.
// Each element is an `if` with no `else`, so its static type is Any and its value is `()`
// whenever the guard is false. The guards call `matches()`, which tests the entire record.
// NOTE(review): `"\s+"` is shown with a single backslash, which is not a valid Scala string
// escape — presumably `"\\s+"` in the real source; confirm.
val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
(if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\s+")(1).trim() ), //collects hostname
(if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
)
}

// Bring all tuples back to the driver and print one per line.
transformRDD.collect().foreach(println)
}


If I run this code from IntelliJ, I get the output below.



((),(),())
((),(),())
((),(),())


If I run from spark-shell, I get below error:



scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration

scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text

scala> import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.LongWritable

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

scala> val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml

scala> hadoopConf.set("textinputformat.record.delimiter","___DATE___")

scala> val input = sc.newAPIHadoopFile("C:\Users\rnimmal1\Documents\dataset\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)
input: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:37

scala>

scala> lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
date_pattern: scala.util.matching.Regex = <lazy>

scala> lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
uname_pattern: scala.util.matching.Regex = <lazy>

scala> lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r
cpu_regex: scala.util.matching.Regex = <lazy>

scala>

scala> val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
| (if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\s+")(1).trim() ), //collects hostname
| (if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
| )
| }
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 54 elided
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
- object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml)
- field (class: $iw, name: hadoopConf, type: class org.apache.hadoop.conf.Configuration)
- object (class $iw, $iw@63fa0b9)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3f4b52fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@338f9bb5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3d63becf)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3aca7082)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4ccfd904)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6e4e7a62)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5aaab2b0)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5c51a7eb)
- field (class: $line36.$read, name: $iw, type: class $iw)
- object (class $line36.$read, $line36.$read@2ba3b4a6)
- field (class: $iw, name: $line36$read, type: class $line36.$read)
- object (class $iw, $iw@6559f04e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@8f7cbcc)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@465b16bb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@373efaa2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5f2896fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@f777d41)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@43ec41d7)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@61c0a61)
- field (class: $line38.$read, name: $iw, type: class $iw)
- object (class $line38.$read, $line38.$read@10d1f6da)
- field (class: $iw, name: $line38$read, type: class $line38.$read)
- object (class $iw, $iw@2095e085)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@380cb7e3)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 63 more


What am I missing?










share|improve this question

























  • textinputformat.record.delimiter should be single character, not a word

    – stack0114106
    Nov 20 '18 at 12:36











  • As far as I know, "textinputformat.record.delimiter" is not confined to a single character. I used the same delimiter with Apache Pig and it worked there, and it should work in Spark also, as this property is specific to Hadoop.

    – Rohit Nimmala
    Nov 20 '18 at 12:44













  • it can be multibyte if the encoding is utf8 or utf16, but not a word.. I just ran and it ignores DATA and takes the default newline. if you change DATA to "|" the split happens

    – stack0114106
    Nov 20 '18 at 13:18











  • Thanks, I replaced DATE string with "|" in the data set and in the property as well, I see the code is working the same way. :(

    – Rohit Nimmala
    Nov 20 '18 at 14:08











  • Yes, that can also be done.. let me update the answer

    – stack0114106
    Nov 20 '18 at 16:48














0












0








0








Input file:



___DATE___

2018-11-16T06:3937
Linux hortonworks 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:39:37 up 100 days, 1:04, 2 users, load average: 9.01, 8.30, 8.48
06:30:01 AM all 6.08 0.00 2.83 0.04 0.00 91.06

___DATE___

2018-11-16T06:4037
Linux cloudera 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:40:37 up 100 days, 1:05, 28 users, load average: 8.39, 8.26, 8.45
06:40:01 AM all 6.92 1.11 1.88 0.04 0.00 90.05


Required output:



2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users


I'm trying to get my hands on Spark with Scala.
Trying to parse this input file with Spark 2.3.1 and scala 2.11.6 . Here's my code.



import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads `sys_stats.log` as records delimited by the `___DATE___` marker, maps every
 * record to a 3-tuple (date, hostname, logged-in users) and prints the tuples.
 *
 * As written, every tuple element prints as `()`: each guard uses `Matcher.matches()`,
 * which only succeeds when the WHOLE input matches the pattern, while each record here
 * is a multi-line chunk of the file. An `if` without an `else` evaluates to Unit when
 * the condition is false, so all three positions end up holding `()`.
 */
object parse_stats extends App {

// Not referenced anywhere in the visible code.
case class LoadSchema(date:String)

val conf = new SparkConf().setAppName("ParseStats").setMaster("local[*]")
val sc = new SparkContext(conf)

// Work on a copy of the context's Hadoop configuration and override the record
// delimiter, so TextInputFormat yields one record per `___DATE___`-separated section
// instead of one record per line.
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("textinputformat.record.delimiter","___DATE___")

// newAPIHadoopFile produces (LongWritable, Text) pairs; only the Text value is kept, as a String.
// NOTE(review): the path literal is shown with single backslashes, which are not valid
// escapes in a Scala string literal — presumably they were doubled in the real source; confirm.
val input = sc.newAPIHadoopFile("C:\Users\rohit\Documents\dataset\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)

// Patterns meant to recognise the timestamp line, the `uname` line and the `uptime` line.
// In a regex, `[Linux]`, `[GNU/Linux]` and `[load average]` are character classes: each
// matches exactly ONE character from the listed set, not the literal word.
// The range `A-z` also covers the punctuation characters that sit between `Z` and `a`.
lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r

// `eachline` is a whole delimiter-separated record (several lines of text), not a single line.
// Each element is an `if` with no `else`, so its static type is Any and its value is `()`
// whenever the guard is false. The guards call `matches()`, which tests the entire record.
// NOTE(review): `"\s+"` is shown with a single backslash, which is not a valid Scala string
// escape — presumably `"\\s+"` in the real source; confirm.
val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
(if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\s+")(1).trim() ), //collects hostname
(if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
)
}

// Bring all tuples back to the driver and print one per line.
transformRDD.collect().foreach(println)
}


If I run this code from IntelliJ, I get the output below.



((),(),())
((),(),())
((),(),())


If I run from spark-shell, I get below error:



scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration

scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text

scala> import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.LongWritable

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

scala> val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml

scala> hadoopConf.set("textinputformat.record.delimiter","___DATE___")

scala> val input = sc.newAPIHadoopFile("C:\Users\rnimmal1\Documents\dataset\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)
input: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:37

scala>

scala> lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
date_pattern: scala.util.matching.Regex = <lazy>

scala> lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
uname_pattern: scala.util.matching.Regex = <lazy>

scala> lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r
cpu_regex: scala.util.matching.Regex = <lazy>

scala>

scala> val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
| (if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\s+")(1).trim() ), //collects hostname
| (if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
| )
| }
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 54 elided
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
- object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml)
- field (class: $iw, name: hadoopConf, type: class org.apache.hadoop.conf.Configuration)
- object (class $iw, $iw@63fa0b9)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3f4b52fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@338f9bb5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3d63becf)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3aca7082)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4ccfd904)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6e4e7a62)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5aaab2b0)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5c51a7eb)
- field (class: $line36.$read, name: $iw, type: class $iw)
- object (class $line36.$read, $line36.$read@2ba3b4a6)
- field (class: $iw, name: $line36$read, type: class $line36.$read)
- object (class $iw, $iw@6559f04e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@8f7cbcc)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@465b16bb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@373efaa2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5f2896fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@f777d41)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@43ec41d7)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@61c0a61)
- field (class: $line38.$read, name: $iw, type: class $iw)
- object (class $line38.$read, $line38.$read@10d1f6da)
- field (class: $iw, name: $line38$read, type: class $line38.$read)
- object (class $iw, $iw@2095e085)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@380cb7e3)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 63 more


What am I missing?










share|improve this question
















Input file:



___DATE___

2018-11-16T06:3937
Linux hortonworks 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:39:37 up 100 days, 1:04, 2 users, load average: 9.01, 8.30, 8.48
06:30:01 AM all 6.08 0.00 2.83 0.04 0.00 91.06

___DATE___

2018-11-16T06:4037
Linux cloudera 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:40:37 up 100 days, 1:05, 28 users, load average: 8.39, 8.26, 8.45
06:40:01 AM all 6.92 1.11 1.88 0.04 0.00 90.05


Required output:



2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users


I'm trying to get my hands on Spark with Scala.
Trying to parse this input file with Spark 2.3.1 and scala 2.11.6 . Here's my code.



import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads `sys_stats.log` as records delimited by the `___DATE___` marker, maps every
 * record to a 3-tuple (date, hostname, logged-in users) and prints the tuples.
 *
 * As written, every tuple element prints as `()`: each guard uses `Matcher.matches()`,
 * which only succeeds when the WHOLE input matches the pattern, while each record here
 * is a multi-line chunk of the file. An `if` without an `else` evaluates to Unit when
 * the condition is false, so all three positions end up holding `()`.
 */
object parse_stats extends App {

// Not referenced anywhere in the visible code.
case class LoadSchema(date:String)

val conf = new SparkConf().setAppName("ParseStats").setMaster("local[*]")
val sc = new SparkContext(conf)

// Work on a copy of the context's Hadoop configuration and override the record
// delimiter, so TextInputFormat yields one record per `___DATE___`-separated section
// instead of one record per line.
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("textinputformat.record.delimiter","___DATE___")

// newAPIHadoopFile produces (LongWritable, Text) pairs; only the Text value is kept, as a String.
// NOTE(review): the path literal is shown with single backslashes, which are not valid
// escapes in a Scala string literal — presumably they were doubled in the real source; confirm.
val input = sc.newAPIHadoopFile("C:\Users\rohit\Documents\dataset\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)

// Patterns meant to recognise the timestamp line, the `uname` line and the `uptime` line.
// In a regex, `[Linux]`, `[GNU/Linux]` and `[load average]` are character classes: each
// matches exactly ONE character from the listed set, not the literal word.
// The range `A-z` also covers the punctuation characters that sit between `Z` and `a`.
lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r

// `eachline` is a whole delimiter-separated record (several lines of text), not a single line.
// Each element is an `if` with no `else`, so its static type is Any and its value is `()`
// whenever the guard is false. The guards call `matches()`, which tests the entire record.
// NOTE(review): `"\s+"` is shown with a single backslash, which is not a valid Scala string
// escape — presumably `"\\s+"` in the real source; confirm.
val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
(if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\s+")(1).trim() ), //collects hostname
(if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
)
}

// Bring all tuples back to the driver and print one per line.
transformRDD.collect().foreach(println)
}


If I run this code from IntelliJ, I get the output below.



((),(),())
((),(),())
((),(),())


If I run from spark-shell, I get below error:



scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration

scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text

scala> import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.LongWritable

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

scala> val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml

scala> hadoopConf.set("textinputformat.record.delimiter","___DATE___")

scala> val input = sc.newAPIHadoopFile("C:\\Users\\rnimmal1\\Documents\\dataset\\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)
input: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:37

scala>

scala> lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
date_pattern: scala.util.matching.Regex = <lazy>

scala> lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
uname_pattern: scala.util.matching.Regex = <lazy>

scala> lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r
cpu_regex: scala.util.matching.Regex = <lazy>

scala>

scala> val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
| (if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\\s+")(1).trim() ), //collects hostname
| (if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
| )
| }
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 54 elided
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
- object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml)
- field (class: $iw, name: hadoopConf, type: class org.apache.hadoop.conf.Configuration)
- object (class $iw, $iw@63fa0b9)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3f4b52fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@338f9bb5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3d63becf)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3aca7082)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4ccfd904)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6e4e7a62)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5aaab2b0)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5c51a7eb)
- field (class: $line36.$read, name: $iw, type: class $iw)
- object (class $line36.$read, $line36.$read@2ba3b4a6)
- field (class: $iw, name: $line36$read, type: class $line36.$read)
- object (class $iw, $iw@6559f04e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@8f7cbcc)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@465b16bb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@373efaa2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5f2896fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@f777d41)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@43ec41d7)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@61c0a61)
- field (class: $line38.$read, name: $iw, type: class $iw)
- object (class $line38.$read, $line38.$read@10d1f6da)
- field (class: $iw, name: $line38$read, type: class $line38.$read)
- object (class $iw, $iw@2095e085)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@380cb7e3)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 63 more


What am I missing?







scala apache-spark rdd hadoop2 spark-shell






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Nov 20 '18 at 12:42









antoine-sac

2,67621241




2,67621241










asked Nov 20 '18 at 11:55









Rohit NimmalaRohit Nimmala

33419




33419













  • textinputformat.record.delimiter should be single character, not a word

    – stack0114106
    Nov 20 '18 at 12:36











  • As much as I know "textinputformat.record.delimiter" is not confined to single character. I used the same delimiter with Apache Pig and it worked there, and it should work in spark also, as this property is specific to hadoop.

    – Rohit Nimmala
    Nov 20 '18 at 12:44













  • it can be multibyte if the encoding is utf8 or utf16, but not a word.. I just ran and it ignores DATA and takes the default newline. if you change DATA to "|" the split happens

    – stack0114106
    Nov 20 '18 at 13:18











  • Thanks, I replaced DATE string with "|" in the data set and in the property as well, I see the code is working the same way. :(

    – Rohit Nimmala
    Nov 20 '18 at 14:08











  • Yes, that can also be done.. let me update the answer

    – stack0114106
    Nov 20 '18 at 16:48



















  • textinputformat.record.delimiter should be single character, not a word

    – stack0114106
    Nov 20 '18 at 12:36











  • As much as I know "textinputformat.record.delimiter" is not confined to single character. I used the same delimiter with Apache Pig and it worked there, and it should work in spark also, as this property is specific to hadoop.

    – Rohit Nimmala
    Nov 20 '18 at 12:44













  • it can be multibyte if the encoding is utf8 or utf16, but not a word.. I just ran and it ignores DATA and takes the default newline. if you change DATA to "|" the split happens

    – stack0114106
    Nov 20 '18 at 13:18











  • Thanks, I replaced DATE string with "|" in the data set and in the property as well, I see the code is working the same way. :(

    – Rohit Nimmala
    Nov 20 '18 at 14:08











  • Yes, that can also be done.. let me update the answer

    – stack0114106
    Nov 20 '18 at 16:48

















textinputformat.record.delimiter should be single character, not a word

– stack0114106
Nov 20 '18 at 12:36





textinputformat.record.delimiter should be single character, not a word

– stack0114106
Nov 20 '18 at 12:36













As much as I know "textinputformat.record.delimiter" is not confined to single character. I used the same delimiter with Apache Pig and it worked there, and it should work in spark also, as this property is specific to hadoop.

– Rohit Nimmala
Nov 20 '18 at 12:44







As much as I know "textinputformat.record.delimiter" is not confined to single character. I used the same delimiter with Apache Pig and it worked there, and it should work in spark also, as this property is specific to hadoop.

– Rohit Nimmala
Nov 20 '18 at 12:44















it can be multibyte if the encoding is utf8 or utf16, but not a word.. I just ran and it ignores DATA and takes the default newline. if you change DATA to "|" the split happens

– stack0114106
Nov 20 '18 at 13:18





it can be multibyte if the encoding is utf8 or utf16, but not a word.. I just ran and it ignores DATA and takes the default newline. if you change DATA to "|" the split happens

– stack0114106
Nov 20 '18 at 13:18













Thanks, I replaced DATE string with "|" in the data set and in the property as well, I see the code is working the same way. :(

– Rohit Nimmala
Nov 20 '18 at 14:08





Thanks, I replaced DATE string with "|" in the data set and in the property as well, I see the code is working the same way. :(

– Rohit Nimmala
Nov 20 '18 at 14:08













Yes, that can also be done.. let me update the answer

– stack0114106
Nov 20 '18 at 16:48





Yes, that can also be done.. let me update the answer

– stack0114106
Nov 20 '18 at 16:48












2 Answers
2






active

oldest

votes


















1














After changing the ___DATE___ delimiter to a pipe "|", the code snippet below produces the required output. Note that I ran it on Windows, so I strip the trailing "\r" from each line. Please check it out.



// Requires: import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark_test").master("local[*]").getOrCreate()

import spark.implicits._

// Set the record delimiter BEFORE creating the RDD so that it is already in place when
// textFile reads the Hadoop configuration; each "|"-separated section becomes one record.
spark.sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter","|")

val file1 = spark.sparkContext.textFile("./in/machine_logs.txt")

// Split every record into its lines once, keep only records that contain all the
// expected lines, then pick the fields by position.
val file2 = file1.map( record => record.split("""\n""") )
.filter( x => x.length > 5 )
.map( x => {
val p = x(2).replaceAll("\r","") // date line; stripping the CR is not needed on a Unix platform
val q = x(3).split(" ")(1) // second token of the uname line (the hostname)
val r = x(4).split(",")(2) // third comma-separated field of the uptime line (the user count)
(p + "," + q + "," + r)
} )

file2.collect.foreach(println)
//file2.saveAsTextFile("./in/machine_logs.out") --> comment above line and uncomment this line to save in file


Output:



2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users


Update1:



With regex matching:



// Patterns are searched for inside each record (find semantics), not matched against
// the whole record.
val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
// Group 2 is the token between "Linux " and the rest of the uname line (the hostname).
val uname_pattern="(Linux) (.*?) [0-9a-zA-z-#() . : _ /]+(GNU/Linux)".r
// Group 2 is the field immediately before ", load average:" (the user count).
// The whitespace token must be written as the regex escape \s, not a literal "s".
val cpu_regex="""(.+),(.*?),\s+(load average)[:][0-9 . ,]+""".r
val file2 = file1.filter( line => line.split("""\n""").length > 5 )
.map( line => {
// Each value falls back to an empty string when its pattern is not found;
// the first match in the record is used.
val p = date_pattern.findFirstIn(line).getOrElse("")
val q = uname_pattern.findFirstMatchIn(line).map(_.group(2)).getOrElse("")
val r = cpu_regex.findFirstMatchIn(line).map(_.group(2)).getOrElse("")
(p + "," + q + "," + r)
} )
file2.collect.foreach(println)





share|improve this answer


























  • Thanks for the answer, let me try.

    – Rohit Nimmala
    Nov 20 '18 at 15:09











  • This worked! but I dont want to hardcode the line numbers (eg: x(2),x(3)). I want to get it working with pattern matching, I mean extract the value only if the string matches the pattern extract the data.

    – Rohit Nimmala
    Nov 20 '18 at 16:13













  • updated the answer. Note that I tweaked the regex that you used as there were some mistakes

    – stack0114106
    Nov 20 '18 at 16:51











  • Thanks mate! I figured it out just a while ago. Anyways thanks for your patience and help :)

    – Rohit Nimmala
    Nov 20 '18 at 17:00











  • glad that you figured it out..

    – stack0114106
    Nov 20 '18 at 17:06



















1














I believe the problem is that you are defining those filter objects (e.g. date_pattern) outside of the RDD closure, so Spark has to send the entire parse_stats object to all of the executors, which it cannot do because it cannot serialize that entire object. This doesn't happen when you run it in local mode because it doesn't need to send any objects to other executors.



Check out the great answer here: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects



This gist has some quick and simple ways to avoid serialization: https://gist.github.com/kmader/1d64e64621e63d566f67






share|improve this answer























    Your Answer






    StackExchange.ifUsing("editor", function () {
    StackExchange.using("externalEditor", function () {
    StackExchange.using("snippets", function () {
    StackExchange.snippets.init();
    });
    });
    }, "code-snippets");

    StackExchange.ready(function() {
    var channelOptions = {
    tags: "".split(" "),
    id: "1"
    };
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function() {
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled) {
    StackExchange.using("snippets", function() {
    createEditor();
    });
    }
    else {
    createEditor();
    }
    });

    function createEditor() {
    StackExchange.prepareEditor({
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader: {
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    },
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    });


    }
    });














    draft saved

    draft discarded


















    StackExchange.ready(
    function () {
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53392471%2fparsing-data-in-apache-spark-scala-org-apache-spark-sparkexception-task-not-ser%23new-answer', 'question_page');
    }
    );

    Post as a guest















    Required, but never shown

























    2 Answers
    2






    active

    oldest

    votes








    2 Answers
    2






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    1














    After changing the ___DATE___ delimiter to a pipe "|", the code snippet below produces the required output. Note that I ran it on Windows, so I strip the trailing "\r" from each line. Please check it out.



    // Requires: import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().appName("Spark_test").master("local[*]").getOrCreate()

    import spark.implicits._

    // Set the record delimiter BEFORE creating the RDD so that it is already in place when
    // textFile reads the Hadoop configuration; each "|"-separated section becomes one record.
    spark.sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter","|")

    val file1 = spark.sparkContext.textFile("./in/machine_logs.txt")

    // Split every record into its lines once, keep only records that contain all the
    // expected lines, then pick the fields by position.
    val file2 = file1.map( record => record.split("""\n""") )
    .filter( x => x.length > 5 )
    .map( x => {
    val p = x(2).replaceAll("\r","") // date line; stripping the CR is not needed on a Unix platform
    val q = x(3).split(" ")(1) // second token of the uname line (the hostname)
    val r = x(4).split(",")(2) // third comma-separated field of the uptime line (the user count)
    (p + "," + q + "," + r)
    } )

    file2.collect.foreach(println)
    //file2.saveAsTextFile("./in/machine_logs.out") --> comment above line and uncomment this line to save in file


    Output:



    2018-11-16T06:3937,hortonworks, 2 users
    2018-11-16T06:4037,cloudera, 28 users


    Update1:



    With regex matching:



    // Patterns are searched for inside each record (find semantics), not matched against
    // the whole record.
    val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
    // Group 2 is the token between "Linux " and the rest of the uname line (the hostname).
    val uname_pattern="(Linux) (.*?) [0-9a-zA-z-#() . : _ /]+(GNU/Linux)".r
    // Group 2 is the field immediately before ", load average:" (the user count).
    // The whitespace token must be written as the regex escape \s, not a literal "s".
    val cpu_regex="""(.+),(.*?),\s+(load average)[:][0-9 . ,]+""".r
    val file2 = file1.filter( line => line.split("""\n""").length > 5 )
    .map( line => {
    // Each value falls back to an empty string when its pattern is not found;
    // the first match in the record is used.
    val p = date_pattern.findFirstIn(line).getOrElse("")
    val q = uname_pattern.findFirstMatchIn(line).map(_.group(2)).getOrElse("")
    val r = cpu_regex.findFirstMatchIn(line).map(_.group(2)).getOrElse("")
    (p + "," + q + "," + r)
    } )
    file2.collect.foreach(println)





    share|improve this answer


























    • Thanks for the answer, let me try.

      – Rohit Nimmala
      Nov 20 '18 at 15:09











    • This worked! but I dont want to hardcode the line numbers (eg: x(2),x(3)). I want to get it working with pattern matching, I mean extract the value only if the string matches the pattern extract the data.

      – Rohit Nimmala
      Nov 20 '18 at 16:13













    • updated the answer. Note that I tweaked the regex that you used as there were some mistakes

      – stack0114106
      Nov 20 '18 at 16:51











    • Thanks mate! I figured it out just a while ago. Anyways thanks for your patience and help :)

      – Rohit Nimmala
      Nov 20 '18 at 17:00











    • glad that you figured it out..

      – stack0114106
      Nov 20 '18 at 17:06
















    1














    After changing the ___DATE___ delimiter to a pipe "|", the code snippet below produces the required output. Note that I ran it on Windows, so I strip the trailing "\r" from each line. Please check it out.



    // Requires: import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().appName("Spark_test").master("local[*]").getOrCreate()

    import spark.implicits._

    // Set the record delimiter BEFORE creating the RDD so that it is already in place when
    // textFile reads the Hadoop configuration; each "|"-separated section becomes one record.
    spark.sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter","|")

    val file1 = spark.sparkContext.textFile("./in/machine_logs.txt")

    // Split every record into its lines once, keep only records that contain all the
    // expected lines, then pick the fields by position.
    val file2 = file1.map( record => record.split("""\n""") )
    .filter( x => x.length > 5 )
    .map( x => {
    val p = x(2).replaceAll("\r","") // date line; stripping the CR is not needed on a Unix platform
    val q = x(3).split(" ")(1) // second token of the uname line (the hostname)
    val r = x(4).split(",")(2) // third comma-separated field of the uptime line (the user count)
    (p + "," + q + "," + r)
    } )

    file2.collect.foreach(println)
    //file2.saveAsTextFile("./in/machine_logs.out") --> comment above line and uncomment this line to save in file


    Output:



    2018-11-16T06:3937,hortonworks, 2 users
    2018-11-16T06:4037,cloudera, 28 users


    Update1:



    With regex matching:



    // Patterns are searched for inside each record (find semantics), not matched against
    // the whole record.
    val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
    // Group 2 is the token between "Linux " and the rest of the uname line (the hostname).
    val uname_pattern="(Linux) (.*?) [0-9a-zA-z-#() . : _ /]+(GNU/Linux)".r
    // Group 2 is the field immediately before ", load average:" (the user count).
    // The whitespace token must be written as the regex escape \s, not a literal "s".
    val cpu_regex="""(.+),(.*?),\s+(load average)[:][0-9 . ,]+""".r
    val file2 = file1.filter( line => line.split("""\n""").length > 5 )
    .map( line => {
    // Each value falls back to an empty string when its pattern is not found;
    // the first match in the record is used.
    val p = date_pattern.findFirstIn(line).getOrElse("")
    val q = uname_pattern.findFirstMatchIn(line).map(_.group(2)).getOrElse("")
    val r = cpu_regex.findFirstMatchIn(line).map(_.group(2)).getOrElse("")
    (p + "," + q + "," + r)
    } )
    file2.collect.foreach(println)





    share|improve this answer


























    • Thanks for the answer, let me try.

      – Rohit Nimmala
      Nov 20 '18 at 15:09











    • This worked! but I dont want to hardcode the line numbers (eg: x(2),x(3)). I want to get it working with pattern matching, I mean extract the value only if the string matches the pattern extract the data.

      – Rohit Nimmala
      Nov 20 '18 at 16:13













    • updated the answer. Note that I tweaked the regex that you used as there were some mistakes

      – stack0114106
      Nov 20 '18 at 16:51











    • Thanks mate! I figured it out just a while ago. Anyways thanks for your patience and help :)

      – Rohit Nimmala
      Nov 20 '18 at 17:00











    • glad that you figured it out..

      – stack0114106
      Nov 20 '18 at 17:06














    1












    1








    1







    After changing the ___DATE___ delimiter to a pipe "|", the code snippet below produces the required output. Note that I ran it on Windows, so I strip the trailing "\r" from each line. Please check it out.



    // Requires: import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().appName("Spark_test").master("local[*]").getOrCreate()

    import spark.implicits._

    // Set the record delimiter BEFORE creating the RDD so that it is already in place when
    // textFile reads the Hadoop configuration; each "|"-separated section becomes one record.
    spark.sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter","|")

    val file1 = spark.sparkContext.textFile("./in/machine_logs.txt")

    // Split every record into its lines once, keep only records that contain all the
    // expected lines, then pick the fields by position.
    val file2 = file1.map( record => record.split("""\n""") )
    .filter( x => x.length > 5 )
    .map( x => {
    val p = x(2).replaceAll("\r","") // date line; stripping the CR is not needed on a Unix platform
    val q = x(3).split(" ")(1) // second token of the uname line (the hostname)
    val r = x(4).split(",")(2) // third comma-separated field of the uptime line (the user count)
    (p + "," + q + "," + r)
    } )

    file2.collect.foreach(println)
    //file2.saveAsTextFile("./in/machine_logs.out") --> comment above line and uncomment this line to save in file


    Output:



    2018-11-16T06:3937,hortonworks, 2 users
    2018-11-16T06:4037,cloudera, 28 users


    Update1:



    With regex matching:



    // Patterns are searched for inside each record (find semantics), not matched against
    // the whole record.
    val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
    // Group 2 is the token between "Linux " and the rest of the uname line (the hostname).
    val uname_pattern="(Linux) (.*?) [0-9a-zA-z-#() . : _ /]+(GNU/Linux)".r
    // Group 2 is the field immediately before ", load average:" (the user count).
    // The whitespace token must be written as the regex escape \s, not a literal "s".
    val cpu_regex="""(.+),(.*?),\s+(load average)[:][0-9 . ,]+""".r
    val file2 = file1.filter( line => line.split("""\n""").length > 5 )
    .map( line => {
    // Each value falls back to an empty string when its pattern is not found;
    // the first match in the record is used.
    val p = date_pattern.findFirstIn(line).getOrElse("")
    val q = uname_pattern.findFirstMatchIn(line).map(_.group(2)).getOrElse("")
    val r = cpu_regex.findFirstMatchIn(line).map(_.group(2)).getOrElse("")
    (p + "," + q + "," + r)
    } )
    file2.collect.foreach(println)





    share|improve this answer















    After changing the ___DATE___ delimiter to a pipe "|", the code snippet below produces the required output. Note that I ran it on Windows, so I strip the trailing "\r" from each line. Please check it out.



    // Requires: import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().appName("Spark_test").master("local[*]").getOrCreate()

    import spark.implicits._

    // Set the record delimiter BEFORE creating the RDD so that it is already in place when
    // textFile reads the Hadoop configuration; each "|"-separated section becomes one record.
    spark.sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter","|")

    val file1 = spark.sparkContext.textFile("./in/machine_logs.txt")

    // Split every record into its lines once, keep only records that contain all the
    // expected lines, then pick the fields by position.
    val file2 = file1.map( record => record.split("""\n""") )
    .filter( x => x.length > 5 )
    .map( x => {
    val p = x(2).replaceAll("\r","") // date line; stripping the CR is not needed on a Unix platform
    val q = x(3).split(" ")(1) // second token of the uname line (the hostname)
    val r = x(4).split(",")(2) // third comma-separated field of the uptime line (the user count)
    (p + "," + q + "," + r)
    } )

    file2.collect.foreach(println)
    //file2.saveAsTextFile("./in/machine_logs.out") --> comment above line and uncomment this line to save in file


    Output:



    2018-11-16T06:3937,hortonworks, 2 users
    2018-11-16T06:4037,cloudera, 28 users


    Update1:



    With regex matching:



    // Patterns are searched for inside each record (find semantics), not matched against
    // the whole record.
    val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
    // Group 2 is the token between "Linux " and the rest of the uname line (the hostname).
    val uname_pattern="(Linux) (.*?) [0-9a-zA-z-#() . : _ /]+(GNU/Linux)".r
    // Group 2 is the field immediately before ", load average:" (the user count).
    // The whitespace token must be written as the regex escape \s, not a literal "s".
    val cpu_regex="""(.+),(.*?),\s+(load average)[:][0-9 . ,]+""".r
    val file2 = file1.filter( line => line.split("""\n""").length > 5 )
    .map( line => {
    // Each value falls back to an empty string when its pattern is not found;
    // the first match in the record is used.
    val p = date_pattern.findFirstIn(line).getOrElse("")
    val q = uname_pattern.findFirstMatchIn(line).map(_.group(2)).getOrElse("")
    val r = cpu_regex.findFirstMatchIn(line).map(_.group(2)).getOrElse("")
    (p + "," + q + "," + r)
    } )
    file2.collect.foreach(println)






    share|improve this answer














    share|improve this answer



    share|improve this answer








    edited Nov 20 '18 at 16:49

























    answered Nov 20 '18 at 14:49









    stack0114106stack0114106

    3,8482420




    3,8482420













    • Thanks for the answer, let me try.

      – Rohit Nimmala
      Nov 20 '18 at 15:09











    • This worked! but I dont want to hardcode the line numbers (eg: x(2),x(3)). I want to get it working with pattern matching, I mean extract the value only if the string matches the pattern extract the data.

      – Rohit Nimmala
      Nov 20 '18 at 16:13













    • updated the answer. Note that I tweaked the regex that you used as there were some mistakes

      – stack0114106
      Nov 20 '18 at 16:51











    • Thanks mate! I figured it out just a while ago. Anyways thanks for your patience and help :)

      – Rohit Nimmala
      Nov 20 '18 at 17:00











    • glad that you figured it out..

      – stack0114106
      Nov 20 '18 at 17:06



















    • Thanks for the answer, let me try.

      – Rohit Nimmala
      Nov 20 '18 at 15:09











    • This worked! but I dont want to hardcode the line numbers (eg: x(2),x(3)). I want to get it working with pattern matching, I mean extract the value only if the string matches the pattern extract the data.

      – Rohit Nimmala
      Nov 20 '18 at 16:13













    • updated the answer. Note that I tweaked the regex that you used as there were some mistakes

      – stack0114106
      Nov 20 '18 at 16:51











    • Thanks mate! I figured it out just a while ago. Anyways thanks for your patience and help :)

      – Rohit Nimmala
      Nov 20 '18 at 17:00











    • glad that you figured it out..

      – stack0114106
      Nov 20 '18 at 17:06

















    Thanks for the answer, let me try.

    – Rohit Nimmala
    Nov 20 '18 at 15:09





    Thanks for the answer, let me try.

    – Rohit Nimmala
    Nov 20 '18 at 15:09













    This worked! but I dont want to hardcode the line numbers (eg: x(2),x(3)). I want to get it working with pattern matching, I mean extract the value only if the string matches the pattern extract the data.

    – Rohit Nimmala
    Nov 20 '18 at 16:13







    This worked! but I dont want to hardcode the line numbers (eg: x(2),x(3)). I want to get it working with pattern matching, I mean extract the value only if the string matches the pattern extract the data.

    – Rohit Nimmala
    Nov 20 '18 at 16:13















    updated the answer. Note that I tweaked the regex that you used as there were some mistakes

    – stack0114106
    Nov 20 '18 at 16:51





    updated the answer. Note that I tweaked the regex that you used as there were some mistakes

    – stack0114106
    Nov 20 '18 at 16:51













    Thanks mate! I figured it out just a while ago. Anyways thanks for your patience and help :)

    – Rohit Nimmala
    Nov 20 '18 at 17:00





    Thanks mate! I figured it out just a while ago. Anyways thanks for your patience and help :)

    – Rohit Nimmala
    Nov 20 '18 at 17:00













    glad that you figured it out..

    – stack0114106
    Nov 20 '18 at 17:06





    glad that you figured it out..

    – stack0114106
    Nov 20 '18 at 17:06













    1














    I believe the problem is that you are defining those filters objects (date_pattern) outside of the RDD, so Spark has to send the entire parse_stats object to all of the executors, which it cannot do because it cannot serialize that entire object. This doesn't happen when you run it in local mode because it doesn't need to send any objects to other executors.



    Check out the great answer here: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects



    This gist has some quick and simple ways to avoid serialization: https://gist.github.com/kmader/1d64e64621e63d566f67






    share|improve this answer




























      1














      I believe the problem is that you are defining those filters objects (date_pattern) outside of the RDD, so Spark has to send the entire parse_stats object to all of the executors, which it cannot do because it cannot serialize that entire object. This doesn't happen when you run it in local mode because it doesn't need to send any objects to other executors.



      Check out the great answer here: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects



      This gist has some quick and simple ways to avoid serialization: https://gist.github.com/kmader/1d64e64621e63d566f67






      share|improve this answer


























        1












        1








        1







        I believe the problem is that you are defining those filters objects (date_pattern) outside of the RDD, so Spark has to send the entire parse_stats object to all of the executors, which it cannot do because it cannot serialize that entire object. This doesn't happen when you run it in local mode because it doesn't need to send any objects to other executors.



        Check out the great answer here: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects



        This gist has some quick and simple ways to avoid serialization: https://gist.github.com/kmader/1d64e64621e63d566f67






        share|improve this answer













        I believe the problem is that you are defining those filters objects (date_pattern) outside of the RDD, so Spark has to send the entire parse_stats object to all of the executors, which it cannot do because it cannot serialize that entire object. This doesn't happen when you run it in local mode because it doesn't need to send any objects to other executors.



        Check out the great answer here: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects



        This gist has some quick and simple ways to avoid serialization: https://gist.github.com/kmader/1d64e64621e63d566f67







        share|improve this answer












        share|improve this answer



        share|improve this answer










        answered Nov 20 '18 at 16:01









        MBillauMBillau

        5,28522129




        5,28522129






























            draft saved

            draft discarded




















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid



            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.


            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53392471%2fparsing-data-in-apache-spark-scala-org-apache-spark-sparkexception-task-not-ser%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            Guess what letter conforming each word

            Port of Spain

            Run scheduled task as local user group (not BUILTIN)