Spark GraphX java.lang.OutOfMemoryError











I have a problem with the Spark GraphX module. I have a 5-node cluster with 23.5 GB of memory and 24 cores per node. I submit my code through the spark-shell, so I run Spark in client mode. In my configuration I have 1 master node and 4 slave nodes. This is my spark-defaults.conf:



spark.executor.instances     8
spark.executor.memory        10g
spark.driver.memory          18g
spark.executor.cores         10
spark.driver.cores           18
spark.default.parallelism    144
spark.serializer             org.apache.spark.serializer.KryoSerializer
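
As a quick sanity check (a minimal sketch; the key prefixes below are only illustrative), the effective settings can be printed from the spark-shell to confirm that spark-defaults.conf was actually picked up:

// Print the executor/driver settings the running application actually uses
sc.getConf.getAll
  .filter { case (key, _) => key.startsWith("spark.executor") || key.startsWith("spark.driver") }
  .foreach { case (key, value) => println(s"$key = $value") }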


I read and store two really small files with a combined size of 40 MB.



This is my code:



// Imports needed when pasting this into spark-shell
// (spark.implicits._ is already in scope in the shell)
import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.sql.functions.explode

// Build vertex and edge RDDs from the two input files
val input1 = sc.textFile("/home/data/spark/nodes.txt")
val vertexArray = input1.map(line => (line.toLong, mutable.Set[VertexId]()))

val input2 = sc.textFile("/home/data/spark/edges.txt")
val splitRdd = input2.map(line => line.split(" "))
val edgeArray = splitRdd.map(line => Edge(line(0).toLong, line(1).toLong, "bla"))

val vertices: RDD[(VertexId, mutable.Set[VertexId])] = vertexArray
val edges: RDD[Edge[String]] = edgeArray
val graph = Graph(vertices, edges).partitionBy(EdgePartition2D, 4)

// Reset every vertex attribute to an empty set
var filteredGraph: Graph[mutable.Set[VertexId], String] = graph.mapVertices((vid, vdata) => {
  mutable.Set[VertexId]()
}).cache()

// First pass: vertices with a negative id collect the ids of their source neighbours
val temp: VertexRDD[mutable.Set[VertexId]] = filteredGraph.aggregateMessages[mutable.Set[VertexId]](
  triplet => {
    if (triplet.dstId < 0) {
      triplet.sendToDst(mutable.Set[VertexId](triplet.srcId))
    }
  },
  (oldSet, newSet) => oldSet ++ newSet // merge messages
)
filteredGraph = filteredGraph.joinVertices(temp)((id, oldSet, newSet) => newSet).cache()

// Second pass: forward the collected id sets to vertices with a positive id
val temp2: VertexRDD[mutable.Set[VertexId]] = filteredGraph.aggregateMessages[mutable.Set[VertexId]](
  triplet => {
    if (triplet.dstId > 0) {
      triplet.sendToDst(triplet.srcAttr.filter(id => triplet.dstId != id && triplet.dstId < id))
    }
  },
  (oldSet, newSet) => oldSet ++ newSet // merge messages
)

val candidatesRDD: RDD[(Long, List[Long])] = temp2.map(vertex => {
  (vertex._1.asInstanceOf[Long], vertex._2.asInstanceOf[mutable.Set[Long]].toList)
})

// Turn the candidate sets into a DataFrame with one row per (rid, candidate) pair
val newNames = Seq("rid", "candidates")
val candidatesDF = candidatesRDD.toDF(newNames: _*)
val candidatesDFMod = candidatesDF.withColumn("candidates", explode($"candidates"))
candidatesDFMod.show


When I run the computation, after some time one executor fails with a java.lang.OutOfMemoryError: Java heap space exception. Spark then retries and restarts the failed stages, but they end up in the same exception again. Why does this happen? The computation fills the complete 10 GB of executor memory. Do I have a wrong Spark configuration? I have tried several permutations of my spark-defaults.conf (3 executors per node and more, different memory sizes, and so on), but every time it ends up in the same exception.



Does anyone have an idea what could cause this problem?



Kind regards



Sedir Mohammed










      java scala apache-spark rdd spark-graphx






asked Nov 12 at 14:31 by Sedir Mohammed
























1 Answer






























spark.executor.instances 8
spark.executor.cores 10

val graph = Graph(vertices, edges).partitionBy(EdgePartition2D, 4)   // <-- only 4 partitions



There is no point in having 8 executors with 10 cores each available if you create only 4 partitions. Bear in mind that all the partitions held on an executor must fit into memory together in order to avoid GC thrashing. Try a much larger number of partitions, so that the 10 partitions an executor works on concurrently easily fit into memory, maybe something like at most a few hundred MB of input data per partition.
Also, please check that the 10 GB of RAM are actually available on every worker node and 18 GB on the machine where the driver is executed. If some of your workers do not have that much free RAM, reduce the number of cores and the amount of memory in your Spark configuration.
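
For illustration, a rough sketch of this change, reusing the vertices and edges RDDs from the question (144 is only an example value, mirroring spark.default.parallelism; tune it to your data and cluster):

import org.apache.spark.graphx.{Graph, PartitionStrategy}

// Build the graph with many more edge partitions so that the partitions
// an executor processes concurrently stay small.
val partitionedGraph = Graph(vertices, edges)
  .partitionBy(PartitionStrategy.EdgePartition2D, 144)
  .cache()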






answered Nov 13 at 8:06 by Bernhard Stadler (edited Nov 13 at 8:19)





























