Spark SQL window function with complex condition












This is probably easiest to explain through example. Suppose I have a DataFrame of user logins to a website, for instance:



scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows


I would like to add to this a column indicating when they became an active user on the site. But there is one caveat: there is a time period during which a user is considered active, and after this period, if they log in again, their became_active date resets. Suppose this period is 5 days. Then the desired table derived from the above table would be something like this:



+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+


So, in particular, SirChillingtonIV's became_active date was reset because their second login came after the active period expired, but Booooooo99900098's became_active date was not reset the second time they logged in, because that login fell within the active period.



My initial thought was to use window functions with lag, and then use the lagged values to fill the became_active column; for instance, something starting roughly like:



import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))


Then, the rule to fill in the became_active date would be, if tmp is null (i.e., if it's the first ever login) or if login_date - tmp >= 5 then became_active = login_date; otherwise, go to the next most recent value in tmp and apply the same rule. This suggests a recursive approach, which I'm having trouble imagining a way to implement.
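To make the rule above concrete, here is a minimal plain-Python sketch (no Spark involved) that applies it to one user's logins at a time. It is only an illustration of the intended logic, not a Spark solution: the names `ACTIVE_DAYS` and `became_active_dates` are made up for this example, and instead of walking back through earlier values of tmp it carries the current became_active date forward, which gives the same answer.

```python
from datetime import date

ACTIVE_DAYS = 5  # assumed length of the active period, as in the example above


def became_active_dates(logins):
    """Return the became_active date for each of one user's logins (datetime.date values)."""
    result = []
    previous = None       # the previous login, i.e. the lagged value "tmp"
    current_start = None  # became_active date of the run we are currently in
    for login in sorted(logins):
        # First login ever, or the gap to the previous login is too large: reset.
        if previous is None or (login - previous).days >= ACTIVE_DAYS:
            current_start = login
        result.append(current_start)
        previous = login
    return result


sir = [date(2012, 1, 4), date(2012, 1, 11)]
boo = [date(2012, 1, 4), date(2012, 1, 6)]
print(became_active_dates(sir))  # second login resets became_active
print(became_active_dates(boo))  # second login keeps the first date
```

The loop depends on the previous row's result, which is exactly the part that is awkward to express with a single lag over a window.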



My questions: Is this a viable approach, and if so, how can I "go back" and look at earlier values of tmp until I find one where I stop? I can't, to my knowledge, iterate through values of a Spark SQL Column. Is there another way to achieve this result?










      sql apache-spark pyspark apache-spark-sql window-functions






      edited Dec 21 '18 at 7:00 by User12345
      asked Feb 24 '17 at 21:25 by user4601931

          2 Answers
          Here is the trick. Import a bunch of functions:



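          import org.apache.spark.sql.expressions.Window
          import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
          // Needed for the $"..." column syntax and toDF, assuming a SparkSession named spark;
          // spark-shell imports this automatically.
          import spark.implicits._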


          Define windows:



          val userWindow = Window.partitionBy("user_name").orderBy("login_date")
          val userSessionWindow = Window.partitionBy("user_name", "session")


          Find the points where new sessions start:



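          // 1 when the gap to the user's previous login exceeds 5 days, else 0.
          // coalesce turns the null gap of a user's first login into 0.
          val newSession = (coalesce(
            datediff($"login_date", lag($"login_date", 1).over(userWindow)),
            lit(0)
          ) > 5).cast("bigint")

          // Running sum of the flags over userWindow gives each row a per-user session id.
          val sessionized = df.withColumn("session", sum(newSession).over(userWindow))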
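To see why a running sum of 0/1 flags produces session ids, here is a small plain-Python sketch of the same two steps for a single user's logins. It does not use Spark; `itertools.accumulate` stands in for `sum(newSession).over(userWindow)`, and the variable names are illustrative only.

```python
from datetime import date
from itertools import accumulate

# One user's logins, already sorted (the role of orderBy in userWindow).
logins = [date(2012, 1, 4), date(2012, 1, 11), date(2012, 1, 14), date(2012, 8, 11)]

# Gap in days to the previous login; None for the first row, like lag() returning null.
gaps = [None] + [(cur - prev).days for prev, cur in zip(logins, logins[1:])]

# coalesce(gap, 0) > 5, cast to an integer flag: 1 marks the start of a new session.
new_session = [int((0 if gap is None else gap) > 5) for gap in gaps]

# Running total of the flags: all rows of one session share the same id.
session = list(accumulate(new_session))

print(gaps)
print(new_session)
print(session)
```

Each 1 in the flag list bumps the running total, so the total stays constant within a session and increases at every session boundary.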


          Find the earliest date per session:



          val result = sessionized
            .withColumn("became_active", min($"login_date").over(userSessionWindow))
            .drop("session")
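The last step is a grouped minimum that is attached back to every row. A rough plain-Python equivalent, assuming the session ids have already been computed as in the previous step, could look like the sketch below. The dictionary `earliest` plays the role of `min(...).over(userSessionWindow)`; the names are illustrative and not part of any Spark API.

```python
# Rows of (user_name, login_date, session); the session ids are assumed to come
# from the sessionization step. ISO-formatted date strings sort chronologically.
rows = [
    ("SirChillingtonIV", "2012-01-04", 0),
    ("SirChillingtonIV", "2012-01-11", 1),
    ("SirChillingtonIV", "2012-01-14", 1),
    ("SirChillingtonIV", "2012-08-11", 2),
    ("Booooooo99900098", "2012-01-04", 0),
    ("Booooooo99900098", "2012-01-06", 0),
]

# Earliest login per (user_name, session) group.
earliest = {}
for user, login, session in rows:
    key = (user, session)
    earliest[key] = min(earliest.get(key, login), login)

# Attach the per-group minimum to every row and drop the helper session column.
result = [(user, login, earliest[(user, session)]) for user, login, session in rows]

for row in result:
    print(row)
```

Because the window has no ordering, every row in a (user_name, session) group sees the same minimum, which is what makes it usable as the became_active value.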


          With dataset defined as:



          val df = Seq(
            ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
            ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"),
            ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
            ("SirChillingtonIV", "2012-08-11")
          ).toDF("user_name", "login_date")


          The result is:



          +----------------+----------+-------------+
          |       user_name|login_date|became_active|
          +----------------+----------+-------------+
          |  OprahWinfreyJr|2012-01-10|   2012-01-10|
          |SirChillingtonIV|2012-01-04|   2012-01-04| <- The first session for user
          |SirChillingtonIV|2012-01-11|   2012-01-11| <- The second session for user
          |SirChillingtonIV|2012-01-14|   2012-01-11|
          |SirChillingtonIV|2012-08-11|   2012-08-11| <- The third session for user
          |Booooooo99900098|2012-01-04|   2012-01-04|
          |Booooooo99900098|2012-01-06|   2012-01-04|
          +----------------+----------+-------------+





          • I know it has been a long time, but can you help me understand the coalesce part of the solution?
            – Sanchit Grover, Apr 15 '18 at 8:33

          • @SanchitGrover If datediff($"login_date", lag($"login_date", 1).over(userWindow)) evaluates to null (as it does for the first row in each user's partition), coalesce replaces it with 0.
            – user6910411, Apr 15 '18 at 10:19

          • Then how does val sessionized = df.withColumn("session", sum(newSession).over(userWindow)) increase the count?
            – Sanchit Grover, Apr 15 '18 at 12:02

          • It is a cumulative sum of values in the set {0, 1}.
            – user6910411, Apr 15 '18 at 12:04

          • I would double vote this answer if I could, thx!
            – Madhava Carrillo, Nov 22 '18 at 10:25



















          Refactoring the above answer to work with PySpark

          In PySpark you can do the following.

          Create the DataFrame:



          df = sqlContext.createDataFrame(
              [
                  ("SirChillingtonIV", "2012-01-04"),
                  ("Booooooo99900098", "2012-01-04"),
                  ("Booooooo99900098", "2012-01-06"),
                  ("OprahWinfreyJr", "2012-01-10"),
                  ("SirChillingtonIV", "2012-01-11"),
                  ("SirChillingtonIV", "2012-01-14"),
                  ("SirChillingtonIV", "2012-08-11")
              ],
              ("user_name", "login_date"))


          The above code creates a DataFrame like this:



          +----------------+----------+
          |       user_name|login_date|
          +----------------+----------+
          |SirChillingtonIV|2012-01-04|
          |Booooooo99900098|2012-01-04|
          |Booooooo99900098|2012-01-06|
          |  OprahWinfreyJr|2012-01-10|
          |SirChillingtonIV|2012-01-11|
          |SirChillingtonIV|2012-01-14|
          |SirChillingtonIV|2012-08-11|
          +----------------+----------+


          Next, we flag every login that comes more than 5 days after the same user's previous login, and number the resulting sessions per user.

          Necessary imports and window definitions:



          from pyspark.sql import functions as f
          from pyspark.sql import Window


          # defining window partitions
          login_window = Window.partitionBy("user_name").orderBy("login_date")
          session_window = Window.partitionBy("user_name", "session")

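          # days since the user's previous login; coalesce maps the first login's NULL to 0
          days_since_prev = f.coalesce(
              f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0))
          # running count of gaps longer than 5 days gives a per-user session id
          session_df = df.withColumn(
              "session", f.sum((days_since_prev > 5).cast("int")).over(login_window))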


          When the code above runs, datediff returns NULL for each user's first login (there is no previous row to compare with), and coalesce replaces that NULL with 0. The resulting session column looks like this:



          +----------------+----------+-------+
          |       user_name|login_date|session|
          +----------------+----------+-------+
          |  OprahWinfreyJr|2012-01-10|      0|
          |SirChillingtonIV|2012-01-04|      0|
          |SirChillingtonIV|2012-01-11|      1|
          |SirChillingtonIV|2012-01-14|      1|
          |SirChillingtonIV|2012-08-11|      2|
          |Booooooo99900098|2012-01-04|      0|
          |Booooooo99900098|2012-01-06|      0|
          +----------------+----------+-------+


          # add became_active column by finding the `min login_date` for each window partitionBy `user_name` and `session` created in above step
          final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")

          +----------------+----------+-------------+
          |       user_name|login_date|became_active|
          +----------------+----------+-------------+
          |  OprahWinfreyJr|2012-01-10|   2012-01-10|
          |SirChillingtonIV|2012-01-04|   2012-01-04|
          |SirChillingtonIV|2012-01-11|   2012-01-11|
          |SirChillingtonIV|2012-01-14|   2012-01-11|
          |SirChillingtonIV|2012-08-11|   2012-08-11|
          |Booooooo99900098|2012-01-04|   2012-01-04|
          |Booooooo99900098|2012-01-06|   2012-01-04|
          +----------------+----------+-------------+
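One detail worth checking against your own requirements: the question words the reset rule as login_date - tmp >= 5, while both answers compare the gap with > 5. The sample data contains no gap of exactly 5 days, so it cannot tell the two apart. The plain-Python sketch below (no Spark; the helper `session_starts` and the two login dates are hypothetical, chosen only to hit the boundary) shows where they differ.

```python
from datetime import date


def session_starts(logins, is_new_session):
    """Flag each sorted login that starts a new session, per the given gap test."""
    flags = []
    previous = None
    for login in logins:
        # The first login has no predecessor; treat its gap as 0, like coalesce(..., 0).
        gap = 0 if previous is None else (login - previous).days
        flags.append(int(is_new_session(gap)))
        previous = login
    return flags


# Hypothetical logins exactly 5 days apart (not part of the sample data).
logins = [date(2012, 1, 1), date(2012, 1, 6)]

strict = session_starts(logins, lambda gap: gap > 5)      # comparison used in the answers
inclusive = session_starts(logins, lambda gap: gap >= 5)  # comparison as worded in the question

print(strict)     # the second login stays in the first session
print(inclusive)  # the second login starts a new session
```

If the inclusive behaviour is what you need, changing `> 5` to `>= 5` in the session expression should be the only adjustment required.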





            The Scala answer was posted Feb 24 '17 at 22:51 by user6910411.













            • I know it has been a long time, but can you help me understand the coalesce part of the solution??

              – Sanchit Grover
              Apr 15 '18 at 8:33






            • 1





              @SanchitGrover If datediff($"login_date", lag($"login_date", 1).over(userWindow)) evaluates to null (first row in the frame) get 0.

              – user6910411
              Apr 15 '18 at 10:19











            • Then how this val sessionized = df.withColumn("session", sum(newSession).over(userWindow)) is increasing the count?

              – Sanchit Grover
              Apr 15 '18 at 12:02











            • It is a cumulative sum of values in set {0, 1}.

              – user6910411
              Apr 15 '18 at 12:04











            • I would double vote this answer if I could, thx!

              – Madhava Carrillo
              Nov 22 '18 at 10:25














































            Refactoring the above answer to work with PySpark



            In PySpark you can do the following.



            Create the data frame:



            df = sqlContext.createDataFrame(
                [
                    ("SirChillingtonIV", "2012-01-04"),
                    ("Booooooo99900098", "2012-01-04"),
                    ("Booooooo99900098", "2012-01-06"),
                    ("OprahWinfreyJr", "2012-01-10"),
                    ("SirChillingtonIV", "2012-01-11"),
                    ("SirChillingtonIV", "2012-01-14"),
                    ("SirChillingtonIV", "2012-08-11"),
                ],
                ("user_name", "login_date"),
            )


            The above code creates a data frame like this:



            +----------------+----------+
            |       user_name|login_date|
            +----------------+----------+
            |SirChillingtonIV|2012-01-04|
            |Booooooo99900098|2012-01-04|
            |Booooooo99900098|2012-01-06|
            |  OprahWinfreyJr|2012-01-10|
            |SirChillingtonIV|2012-01-11|
            |SirChillingtonIV|2012-01-14|
            |SirChillingtonIV|2012-08-11|
            +----------------+----------+


            First, we want to find out whether the gap between a user's consecutive login_date values is more than 5 days.



            This can be done as follows.



            Necessary imports:



            from pyspark.sql import functions as f
            from pyspark.sql import Window


            # Window partitions: per user in login order, and per user session
            login_window = Window.partitionBy("user_name").orderBy("login_date")
            session_window = Window.partitionBy("user_name", "session")

            # Days since the previous login (0 for a user's first login)
            gap = f.coalesce(f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0))
            # Running count of gaps longer than 5 days gives the session number
            session_df = df.withColumn("session", f.sum((gap > 5).cast("int")).over(login_window))


            When the code above runs, datediff is NULL for each user's first login, because lag has no previous row to return, and coalesce replaces that NULL with 0. The comparison with 5 then gives a 0/1 flag per row, and the cumulative sum of those flags over login_window is the session number:



            +----------------+----------+-------+
            |       user_name|login_date|session|
            +----------------+----------+-------+
            |  OprahWinfreyJr|2012-01-10|      0|
            |SirChillingtonIV|2012-01-04|      0|
            |SirChillingtonIV|2012-01-11|      1|
            |SirChillingtonIV|2012-01-14|      1|
            |SirChillingtonIV|2012-08-11|      2|
            |Booooooo99900098|2012-01-04|      0|
            |Booooooo99900098|2012-01-06|      0|
            +----------------+----------+-------+
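The lag, datediff and coalesce steps can be traced by hand for one user. The following is a plain-Python sketch, not Spark code: the dates are SirChillingtonIV's logins from the example data, and the variable names (`previous`, `gaps`, `flags`) are made up for illustration.

```python
from datetime import date

# One user's logins from the example data, already sorted by date
# (the role played by orderBy("login_date") in login_window).
logins = [date(2012, 1, 4), date(2012, 1, 11), date(2012, 1, 14), date(2012, 8, 11)]

# lag(login_date, 1): the previous login, or None for the first row.
previous = [None] + logins[:-1]

# datediff(current, previous): None (NULL) when there is no previous row.
gaps = [None if prev is None else (cur - prev).days for cur, prev in zip(logins, previous)]

# coalesce(gap, 0): replace the missing gap with 0 so the comparison is well defined.
gaps_filled = [0 if gap is None else gap for gap in gaps]

# (gap > 5).cast("int"): 1 starts a new session, 0 continues the current one.
flags = [int(gap > 5) for gap in gaps_filled]

print(gaps)   # [None, 7, 3, 210]
print(flags)  # [0, 1, 0, 1]
```

Note that the comparison here is `> 5`, as in the answer's code, whereas the question words the rule as `login_date - tmp >= 5`. The two differ only for a gap of exactly 5 days, which does not occur in the example data, so pick the operator that matches your definition of the active period.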


            # Add became_active as the minimum login_date within each (user_name, session) partition from the previous step, then drop the helper column
            final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")

            +----------------+----------+-------------+
            |       user_name|login_date|became_active|
            +----------------+----------+-------------+
            |  OprahWinfreyJr|2012-01-10|   2012-01-10|
            |SirChillingtonIV|2012-01-04|   2012-01-04|
            |SirChillingtonIV|2012-01-11|   2012-01-11|
            |SirChillingtonIV|2012-01-14|   2012-01-11|
            |SirChillingtonIV|2012-08-11|   2012-08-11|
            |Booooooo99900098|2012-01-04|   2012-01-04|
            |Booooooo99900098|2012-01-06|   2012-01-04|
            +----------------+----------+-------------+
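The last step, a minimum taken over each (user_name, session) partition, can be illustrated without Spark. This is a plain-Python sketch under the assumption that the rows are the ones shown in the session table above; `first_login` and `result` are hypothetical names.

```python
# Rows as (user_name, login_date, session), copied from the session table.
rows = [
    ("OprahWinfreyJr", "2012-01-10", 0),
    ("SirChillingtonIV", "2012-01-04", 0),
    ("SirChillingtonIV", "2012-01-11", 1),
    ("SirChillingtonIV", "2012-01-14", 1),
    ("SirChillingtonIV", "2012-08-11", 2),
    ("Booooooo99900098", "2012-01-04", 0),
    ("Booooooo99900098", "2012-01-06", 0),
]

# Earliest login per (user_name, session) group.
# ISO-formatted dates compare correctly as plain strings.
first_login = {}
for user, login, session in rows:
    key = (user, session)
    if key not in first_login or login < first_login[key]:
        first_login[key] = login

# Attach the group's minimum to every row (what min(...).over(session_window) does),
# and leave out the helper session column.
result = [(user, login, first_login[(user, session)]) for user, login, session in rows]

for row in result:
    print(row)
```

Unlike a group-by aggregation, the window version keeps every input row and only adds a column, which is why the final table has the same seven rows as the input.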















                answered Dec 21 '18 at 1:06









                User12345





























