Spark: join the same dataset multiple times on different columns

I have the two datasets below.

countries:

code,name
IN,India
US,United States
UK,United Kingdom
SG,Singapore

people:

id,name,code1,code2,code3
1,abc,UK,SG,US
2,efg,SG,UK,US

Can I join code1, code2 and code3 with the first dataset and get the country name for each column? The expected output is:

id,name,code1desc,code2desc,code3desc
1,abc,United Kingdom,Singapore,United States
2,efg,Singapore,United Kingdom,United States

The join on the first column works, but the join on the second column fails.

Dataset<Row> code1 = people
    .join(countries, people.col("code1").equalTo(countries.col("code")), "left_outer")
    .withColumnRenamed("name", "code1desc");
code1.show();

This second join fails:

Dataset<Row> code2 = code1
    .join(countries, code1.col("code2").equalTo(countries.col("code")), "left_outer");
code2.show();

apache-spark apache-spark-sql

asked Nov 18 '18 at 5:30 by stacksr
edited Nov 18 '18 at 7:12 by BSMP

2 Answers

Each "code[i]" column in people needs its own join with countries. This can be done in a loop, here with foldLeft in Scala:

import org.apache.spark.sql.functions.col
// Assumes a SparkSession named spark, as in spark-shell, for toDF and $"...".
import spark.implicits._

// data
val countries = List(
  ("IN", "India"),
  ("US", "United States"),
  ("UK", "United Kingdom"),
  ("SG", "Singapore")
).toDF("code", "name")

val people = List(
  (1, "abc", "UK", "SG", "US"),
  (2, "efg", "SG", "UK", "US")
).toDF("id", "name", "code1", "code2", "code3")

// action: fold over the code columns, joining countries once per column
val countryColumns = List("code1", "code2", "code3")
val result = countryColumns.foldLeft(people)((acc, column) =>
  acc.alias("p")
    .join(countries.withColumnRenamed("name", column + "desc").alias("c"),
      col("p." + column) === $"c.code",
      "left_outer")
    .drop(column, "code")
)


Result is:

+---+----+--------------+--------------+-------------+
|id |name|code1desc     |code2desc     |code3desc    |
+---+----+--------------+--------------+-------------+
|1  |abc |United Kingdom|Singapore     |United States|
|2  |efg |Singapore     |United Kingdom|United States|
+---+----+--------------+--------------+-------------+


Note: if the countries DataFrame is small, a broadcast join can be used for better performance.
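
A minimal sketch of the broadcast variant, assuming the same sample data and a local SparkSession. The object name BroadcastLookupJoin, the helper withDescriptions and the temporary "<column>_key" column names are hypothetical and chosen only for illustration. broadcast is the hint function from org.apache.spark.sql.functions; it marks the small lookup side as a candidate for shipping to every executor. Renaming the lookup columns before each join keeps every column name unique, so the join condition needs no aliases.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col}

object BroadcastLookupJoin {

  // Hypothetical helper: replaces each code column with a "<column>desc" column.
  def withDescriptions(people: DataFrame, countries: DataFrame, codeColumns: Seq[String]): DataFrame =
    codeColumns.foldLeft(people) { (acc, column) =>
      val key = column + "_key"
      // Rename both lookup columns so they cannot clash with columns already in acc,
      // then mark the small lookup table with a broadcast hint.
      val lookup = broadcast(
        countries.select(col("code").as(key), col("name").as(column + "desc"))
      )
      acc.join(lookup, col(column) === col(key), "left_outer").drop(column, key)
    }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; adjust master and app name for a real cluster.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-lookup-join")
      .getOrCreate()
    import spark.implicits._

    val countries = List(
      ("IN", "India"),
      ("US", "United States"),
      ("UK", "United Kingdom"),
      ("SG", "Singapore")
    ).toDF("code", "name")

    val people = List(
      (1, "abc", "UK", "SG", "US"),
      (2, "efg", "SG", "UK", "US")
    ).toDF("id", "name", "code1", "code2", "code3")

    withDescriptions(people, countries, Seq("code1", "code2", "code3")).show(false)
    spark.stop()
  }
}
```

With the sample data this should produce the same id, name, code1desc, code2desc, code3desc columns as the result above. Whether the hint actually helps depends on how small countries is relative to the configured broadcast limits.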







answered Nov 18 '18 at 9:01 by pasha701

If the country-code DataFrame is small enough, you could use a UDF instead. First collect the codes into a map, then apply the UDF to each code column.

Here code_df is the country-code DataFrame and data_df is your data.

import org.apache.spark.sql.functions._
// Assumes a SparkSession named spark, as in spark-shell, for the $"..." syntax.
import spark.implicits._

// Collect the small lookup table to the driver, keyed by its first column (the code).
val mapcode = code_df.rdd.keyBy(row => row(0)).collectAsMap()
println("Showing 10 rows of mapcode")

for ((k, v) <- mapcode) {
  printf("key: %s, value: %s\n", k, v)
}

// Return the description (second column) for a code.
// Fails if the code is not present in the map.
def getCode(code: String): String =
  mapcode(code).getAs[String](1)

val getcode_udf = udf(getCode _)

val newdatadf = data_df
  .withColumn("code1desc", getcode_udf($"code1"))
  .withColumn("code2desc", getcode_udf($"code2"))
  .withColumn("code3desc", getcode_udf($"code3"))

println("Showing 10 rows of final result")
newdatadf.show(10, truncate = false)


The result is:

Showing 10 rows of mapcode
key: IN, value: [IN,India]
key: SG, value: [SG,Singapore]
key: UK, value: [UK,United Kingdom]
key: US, value: [US,United States]
Showing 10 rows of final result
+---+----+-----+-----+-----+--------------+--------------+-------------+
|id |name|code1|code2|code3|code1desc     |code2desc     |code3desc    |
+---+----+-----+-----+-----+--------------+--------------+-------------+
|1  |abc |UK   |SG   |US   |United Kingdom|Singapore     |United States|
|2  |efg |SG   |UK   |US   |Singapore     |United Kingdom|United States|
+---+----+-----+-----+-----+--------------+--------------+-------------+
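
One caveat: a plain map lookup inside the UDF fails for a code that is missing from the lookup table. Below is a minimal sketch of a more forgiving variant, under the assumption that unmatched or null codes should simply yield null, mirroring a left outer join. The names LookupUdfSketch, withDescriptions and describe are hypothetical, and the lookup table is assumed to have the columns code and name from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object LookupUdfSketch {

  // Hypothetical helper: adds a "<column>desc" column for every code column.
  def withDescriptions(spark: SparkSession, dataDf: DataFrame, codeDf: DataFrame,
                       codeColumns: Seq[String]): DataFrame = {
    import spark.implicits._

    // Collect the small lookup table to the driver as a plain Map and broadcast it once.
    val lookup: Map[String, String] =
      codeDf.select("code", "name").as[(String, String)].collect().toMap
    val lookupBc = spark.sparkContext.broadcast(lookup)

    // Returning an Option makes the result column nullable:
    // a null input or an unknown code becomes null instead of an exception.
    val describe = udf((code: String) => Option(code).flatMap(c => lookupBc.value.get(c)))

    codeColumns.foldLeft(dataDf)((acc, c) => acc.withColumn(c + "desc", describe(col(c))))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lookup-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    val codeDf = List(
      ("IN", "India"),
      ("US", "United States"),
      ("UK", "United Kingdom"),
      ("SG", "Singapore")
    ).toDF("code", "name")

    val dataDf = List(
      (1, "abc", "UK", "SG", "US"),
      (2, "efg", "SG", "UK", "US")
    ).toDF("id", "name", "code1", "code2", "code3")

    withDescriptions(spark, dataDf, codeDf, Seq("code1", "code2", "code3")).show(false)
    spark.stop()
  }
}
```

As with the original UDF, this keeps the code1, code2 and code3 columns next to the new description columns, and it is only reasonable while the lookup table comfortably fits in driver memory.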






answered Nov 18 '18 at 9:32 by user238607