Spark GraphX java.lang.OutOfMemoryError
I have a problem with the Spark GraphX module. I have a 5-node cluster with 23.5 GB of memory and 24 cores per node. I submit my code through the spark-shell, so I run Spark in client mode.
In my configuration I have 1 master node and 4 slave nodes. This is my spark-defaults.conf:
spark.executor.instances 8
spark.executor.memory 10g
spark.driver.memory 18g
spark.executor.cores 10
spark.driver.cores 18
spark.default.parallelism 144
spark.serializer org.apache.spark.serializer.KryoSerializer
I read and store two really small files with a combined size of about 40 MB.
This is my code:
// Imports needed when running outside the spark-shell (in the shell, sc and the SparkSession implicits are already available)
import scala.collection.mutable
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.explode
import spark.implicits._  // for toDF and the $"..." column syntax

val input1 = sc.textFile("/home/data/spark/nodes.txt")
val vertexArray = input1.map(line => (line.toLong, mutable.Set[VertexId]()))
val input2 = sc.textFile("/home/data/spark/edges.txt")
val splitRdd = input2.map(line => line.split(" "))
val edgeArray = splitRdd.map(line => Edge(line(0).toLong, line(1).toLong, "bla"))

val vertices: RDD[(VertexId, mutable.Set[VertexId])] = vertexArray
val edges: RDD[Edge[String]] = edgeArray
val graph = Graph(vertices, edges).partitionBy(EdgePartition2D, 4)

var filteredGraph: Graph[mutable.Set[VertexId], String] = graph.mapVertices((vid, vdata) => {
  mutable.Set[VertexId]()
}).cache()

val temp: VertexRDD[mutable.Set[VertexId]] = filteredGraph.aggregateMessages[mutable.Set[VertexId]](
  triplet => {
    if (triplet.dstId < 0) {
      triplet.sendToDst(mutable.Set[VertexId](triplet.srcId))
    }
  },
  (oldSet, newSet) => oldSet ++ newSet // merge messages
)

filteredGraph = filteredGraph.joinVertices(temp)((id, oldSet, newSet) => newSet).cache()

val temp2: VertexRDD[mutable.Set[VertexId]] = filteredGraph.aggregateMessages[mutable.Set[VertexId]](
  triplet => {
    if (triplet.dstId > 0) {
      triplet.sendToDst(triplet.srcAttr.filter(id => triplet.dstId != id && triplet.dstId < id))
    }
  },
  (oldSet, newSet) => oldSet ++ newSet // merge messages
)

val candidatesRDD: RDD[(Long, List[Long])] = temp2.map(vertex => {
  (vertex._1.asInstanceOf[Long], vertex._2.asInstanceOf[mutable.Set[Long]].toList)
})

val newNames = Seq("rid", "candidates")
val candidatesDF = candidatesRDD.toDF(newNames: _*)
val candidatesDFMod = candidatesDF.withColumn("candidates", explode($"candidates"))
candidatesDFMod.show
When I run the computation, after a while I get a java.lang.OutOfMemoryError: Java heap space
exception on one of the executors. Spark then tries to compute the result again, so it restarts the failed stages, but it ends up with the same exception. Why does this happen? The computation fills the entire 10 GB of executor memory. Do I have the wrong Spark configuration? I tried several permutations of my spark-defaults.conf: 3 executors per node and more, different memory sizes, and so on. But every time it ends with the same exception.
Does anyone have an idea what the problem might be?
Kind regards
Sedir Mohammed
java scala apache-spark rdd spark-graphx
asked Nov 12 at 14:31
Sedir Mohammed
1 Answer
spark.executor.instances 8
spark.executor.cores 10
val graph = Graph(vertices, edges).partitionBy(EdgePartition2D, 4)  // <-- only 4 partitions
There is no point in having 8 executors with 10 cores each if you create only 4 partitions. Bear in mind that all partitions on an executor together must fit in memory in order to avoid GC thrashing. Try using a larger number of partitions, such that 10 partitions (one per core) easily fit in memory, ideally something like a few hundred MB of input data per partition.
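A minimal sketch of what that could look like (the specific partition counts below are illustrative assumptions, not values taken from the question; 144 is simply reused from spark.default.parallelism):
// Illustrative sketch, assuming the same imports as in the question.
// Request more input splits when reading the text files:
val input1 = sc.textFile("/home/data/spark/nodes.txt", 32)  // minPartitions hint
val input2 = sc.textFile("/home/data/spark/edges.txt", 32)
// ... build vertices and edges as in the question, then use many more edge partitions:
val graph = Graph(vertices, edges)
  .partitionBy(PartitionStrategy.EdgePartition2D, 144)  // e.g. match spark.default.parallelism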
Also, please check that 10 GB of RAM are actually available on every worker node and 18 GB on the machine where the driver runs. If some of your workers do not have that much free RAM, reduce the number of cores and the amount of memory in your Spark configuration.
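If the workers cannot really provide that much memory, a smaller per-executor footprint is one option. An illustrative sketch with assumed values (not a recommendation derived from your data): with 8 executors on 4 worker nodes, i.e. 2 executors per node, something like
spark.executor.instances 8
spark.executor.cores 5
spark.executor.memory 8g
would use roughly 2 x 8 GB = 16 GB of executor heap plus overhead per 23.5 GB node, instead of 2 x 10 GB = 20 GB plus overhead, leaving more headroom for the OS and off-heap memory.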
edited Nov 13 at 8:19
answered Nov 13 at 8:06
Bernhard Stadler