Spark Join same data set multiple times on different columns
I have the two data sets below.

Countries:

code,name
IN,India
US,United States
UK,United Kingdom
SG,Singapore

People:

id,name,code1,code2,code3
1,abc,UK,SG,US
2,efg,SG,UK,US
Can I join code1, code2 and code3 with the first data set and get the name for each column? The expected output is:
id,name,code1desc,code2desc,code3desc
1,abc,United Kingdom,Singapore,United States
2,efg,Singapore,United Kingdom,United States
The join on the first column works; however, the join on the second column fails.
Dataset<Row> code1 = people.join(countries, people.col("code1").equalTo(countries.col("code")),"left_outer").withColumnRenamed("name","code1desc");
code1.show();
The code below fails:
Dataset<Row> code2 = code1.join(countries, code1.col("code2").equalTo(countries.col("code")),"left_outer");
code2.show();
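I suspect, but have not confirmed, that after the first join code1 still carries countries' code column, and withColumnRenamed("name", ...) can match both name columns, so the second join's column references become ambiguous. A minimal Java sketch of a possible workaround, with hypothetical names c2 and code2key introduced here:

// Hypothetical sketch: give countries' columns join-specific names so the
// second join has no duplicate "name"/"code" columns to trip over.
Dataset<Row> c2 = countries
    .withColumnRenamed("name", "code2desc")
    .withColumnRenamed("code", "code2key");

Dataset<Row> code2 = code1
    .join(c2, code1.col("code2").equalTo(c2.col("code2key")), "left_outer")
    .drop("code2key");
code2.show();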
Tags: apache-spark, apache-spark-sql
asked Nov 18 '18 at 5:30 by stacksr · edited Nov 18 '18 at 7:12 by BSMP
2 Answers
For each of the people "code[i]" columns, a join with countries is required; this can be done in a loop (via foldLeft) in Scala:
import org.apache.spark.sql.functions.col
import spark.implicits._ // assumed in scope, for toDF and the $ syntax

// data
val countries = List(
  ("IN", "India"),
  ("US", "United States"),
  ("UK", "United Kingdom"),
  ("SG", "Singapore")
).toDF("code", "name")

val people = List(
  (1, "abc", "UK", "SG", "US"),
  (2, "efg", "SG", "UK", "US")
).toDF("id", "name", "code1", "code2", "code3")

// action: one left join per code column; countries' "name" is renamed to
// "<column>desc" before each join, and the join keys are dropped afterwards
val countryColumns = List("code1", "code2", "code3")
val result = countryColumns.foldLeft(people)((people, column) =>
  people.alias("p")
    .join(countries.withColumnRenamed("name", column + "desc").alias("c"),
      col("p." + column) === $"c.code",
      "left_outer")
    .drop(column, "code")
)
Result is:
+---+----+--------------+--------------+-------------+
|id |name|code1desc |code2desc |code3desc |
+---+----+--------------+--------------+-------------+
|1 |abc |United Kingdom|Singapore |United States|
|2 |efg |Singapore |United Kingdom|United States|
+---+----+--------------+--------------+-------------+
Note: if the countries dataframe is small, a broadcast join can be used for better performance.
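A minimal sketch of that (assuming the same countries, people and countryColumns values as above): wrapping the small table in broadcast(...) hints Spark to ship it to every executor, so each left join becomes a broadcast hash join instead of a shuffle join.

import org.apache.spark.sql.functions.{broadcast, col}

// Same fold as above, with a broadcast hint on the small dimension table
val result = countryColumns.foldLeft(people)((people, column) =>
  people.alias("p")
    .join(broadcast(countries.withColumnRenamed("name", column + "desc")).alias("c"),
      col("p." + column) === $"c.code",
      "left_outer")
    .drop(column, "code")
)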
answered Nov 18 '18 at 9:01 by pasha701
You could use a udf, assuming your country-code dataframe is small enough. First we collect the codes into a map, then apply the udf to each code column. Here code_df is your country-code dataframe and data_df is your data.
import org.apache.spark.sql.functions._

// Collect the small country-code table to the driver, keyed by the code column
val mapcode = code_df.rdd.keyBy(row => row(0)).collectAsMap()

println("Showing 10 rows of mapcode")
for ((k, v) <- mapcode) {
  printf("key: %s, value: %s%n", k, v)
}

// Look up the country name (column 1) for a given code
def getCode(code: String): String = mapcode(code).getAs[String](1)

val getcode_udf = udf(getCode _)

val newdatadf = data_df.withColumn("code1desc", getcode_udf($"code1"))
  .withColumn("code2desc", getcode_udf($"code2"))
  .withColumn("code3desc", getcode_udf($"code3"))

println("Showing 10 rows of final result")
newdatadf.show(10, truncate = false)

The result:
Showing 10 rows of mapcode
key: IN, value: [IN,India]
key: SG, value: [SG,Singapore]
key: UK, value: [UK,United Kingdom]
key: US, value: [US,United States]
Showing 10 rows of final result
+---+----+-----+-----+-----+--------------+--------------+-------------+
|id |name|code1|code2|code3|code1desc |code2desc |code3desc |
+---+----+-----+-----+-----+--------------+--------------+-------------+
|1 |abc |UK |SG |US |United Kingdom|Singapore |United States|
|2 |efg |SG |UK |US |Singapore |United Kingdom|United States|
+---+----+-----+-----+-----+--------------+--------------+-------------+
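One caveat (a sketch, not part of the original code): mapcode(code) throws a NoSuchElementException for any code missing from code_df. An Option-based lookup returns null instead, matching the left-join behaviour of the other answer:

// Returns null (instead of throwing) when the code is unknown
def getCodeSafe(code: String): String =
  Option(code).flatMap(mapcode.get).map(_.getAs[String](1)).orNull

val getcode_udf = udf(getCodeSafe _)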
answered Nov 18 '18 at 9:32 by user238607