Spark job completes without executing UDF

I've been having an issue with a long, complicated Spark job that contains a UDF.

The problem is that the UDF doesn't seem to get called at all, yet no error message is produced.

I know it isn't being called because the output does get written, but every column the UDF was supposed to calculate is NULL, and none of its print statements appear when debugging locally.
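Print output only appears where the UDF body actually runs, so an accumulator gives a more direct count of how many rows reach the UDF. The sketch below is a minimal example assuming Spark 2.x or later; `UdfCallCounter`, `countUdfCalls` and the `amount` column are hypothetical stand-ins, not part of the original job:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical diagnostic helper, not part of the original job.
object UdfCallCounter {
  // Applies a counting UDF to `inputCol`, runs an action, and returns how many
  // times the UDF body actually executed.
  def countUdfCalls(spark: SparkSession, df: DataFrame, inputCol: String): Long = {
    val calls = spark.sparkContext.longAccumulator("udfCalls")
    val countingUdf = udf { (x: Double) =>
      calls.add(1)
      x * 2
    }
    // Transformations are lazy: without an action the UDF never runs.
    df.withColumn("udf_out", countingUdf(col(inputCol))).collect()
    calls.sum
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("udf call counter")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical input standing in for the real DataFrame fed to the UDF.
    val df = Seq((1, 2.0), (2, 3.0)).toDF("id", "amount")
    println(s"UDF invoked ${countUdfCalls(spark, df, "amount")} times")
    spark.stop()
  }
}
```

Adding `calls.add(1)` to the real UDF body and comparing the count with the row count for each input file would show whether Spark is skipping rows. Accumulator updates made inside transformations can be applied more than once if a task is retried, so treat the number as a diagnostic rather than an exact metric.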



The only lead is that this code previously worked with different input data, which suggests the problem has something to do with the input.

The change in inputs mostly means that different column names are used, which the code accounts for.

With the first ('working') input, the print statements are executed.

Both inputs are created from the same database using the same series of steps, and on inspection neither one appears to have a problem.
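Inspection by eye can miss nulls that only appear after the joins. One way to make the comparison systematic is to count the nulls in every column of each input just before the UDF is applied. A minimal sketch, assuming Spark 2.x or later and plain column names without dots; `NullProfile` and `nullCounts` are hypothetical names:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, when}

// Hypothetical diagnostic helper, not part of the original job.
object NullProfile {
  // For every column, counts the rows in which that column is null.
  // Assumes plain column names (no dots); wrap names in backticks otherwise.
  def nullCounts(df: DataFrame): Map[String, Long] = {
    // count() ignores nulls, and when() without otherwise() yields null when the
    // condition is false, so each expression counts only rows where the column is null.
    val exprs = df.columns.map(c => count(when(col(c).isNull, 1)).alias(c))
    // All counts are computed in a single aggregation pass.
    val row = df.agg(exprs.head, exprs.tail: _*).head()
    df.columns.map(c => c -> row.getAs[Long](c)).toMap
  }
}
```

Calling `NullProfile.nullCounts(...)` on the DataFrame passed to `withColumn` for each input, alongside `printSchema()`, should show whether any column feeding the UDF is unexpectedly empty for the second file.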



I've never experienced this sort of behaviour before, and any leads on what might cause it would be appreciated.

The code is monolithic and inflexible; I'm working on refactoring it, but it's not easy to break apart. This is a short version of what is happening:



package mypackage

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.util._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.types._

import scala.collection.{Map => SMap}


object MyObject {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("my app")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    val bigInput = spark.read.parquet("inputname.parquet")
    val reference_table = spark.read.parquet("reference_table.parquet")
    val exchange_rate = spark.read.parquet("reference_table.parquet")


    val bigInput2 = bigInput
      .filter($"column1" === "condition1")
      .join(joinargs)
      .drop(dropargs)

    val bigInput3 = bigInput
      .filter($"column2" === "condition2")
      .join(joinargs)
      .drop(dropargs)

    <continue for many lines...>

    def mapper1(
      arg1: String,
      arg2: Double,
      arg3: Integer
    ): List[Double] = {
      exchange_rate.map(
        List(idx1, idx2, idx3),
        r.toSeq.toList
          .drop(idx4)
          .take(arg2)
      )
    }

    def mapper2(){}
    ...
    def mapper5(){}

    def my_udf(
      arg0: Integer,
      arg1: String,
      arg2: Double,
      arg3: Integer,
      ...
      arg20: String
    ): Double = {
      println("I'm actually doing something!")
      val result1 = mapper1(arg1, arg2, arg3)
      val result2 = mapper2(arg4, arg5, arg6, arg7)
      ...
      val result5 = mapper5(arg18, arg19, arg20)
      result1.take(arg0)
        .zipAll(result1, 0.0, 0.0)
        .map(x => x._1 * x._2)
        ....
        .zipAll(result5, 0.0, 0.0)
        .foldLeft(0.0)(_ + _)
    }

    // Register the method (eta-expanded with `_`) so callUDF can find it by name.
    spark.udf.register("myUDF", my_udf _)
    val bigResult1 = bigInputFinal.withColumn("Newcolumnname",
      callUDF(
        "myUDF",
        $"col1",
        ...
        $"col20"
      )
    )

    <postprocessing>
    bigResultFinal
      .filter(<configs>)
      .select(<column names>)
      .write
      .format("parquet")
      // Nothing is written until save() is called.
      .save(<output path>)
  }
}
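The tail of `my_udf` appears to multiply the mapper results element-wise, padding the shorter list with `0.0` via `zipAll`, and then sum the products. Assuming that is the intent, the combination step can be written and unit-tested in plain Scala outside Spark, which helps separate bugs in the arithmetic from problems in how Spark feeds the UDF. `WeightedSum.combine` is a hypothetical helper, not part of the original job:

```scala
// Hypothetical helper, not part of the original job.
object WeightedSum {
  // Multiplies `first` element-wise with each list in `others` in turn,
  // padding the shorter side with `pad` (as zipAll does), then sums the result.
  def combine(first: List[Double], others: List[List[Double]], pad: Double = 0.0): Double =
    others
      .foldLeft(first) { (acc, next) =>
        acc.zipAll(next, pad, pad).map { case (a, b) => a * b }
      }
      .sum

  def main(args: Array[String]): Unit = {
    // 1*2 + 2*2 + 3*2
    println(combine(List(1.0, 2.0, 3.0), List(List(2.0, 2.0, 2.0)))) // prints 12.0
    // The missing element is padded: 1*3 + 2*0
    println(combine(List(1.0, 2.0), List(List(3.0)))) // prints 3.0
  }
}
```

Note that padding with `0.0` before multiplying zeroes out any position where one list is shorter, which matches the original `zipAll(..., 0.0, 0.0)` calls.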


To recap:

This code runs to completion on each of the two input files.

The UDF only appears to execute on the first file.

With the second file there are no error messages of any kind, and all of the non-UDF logic appears to complete successfully.
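One Spark behaviour that produces exactly these symptoms (NULL output, no error, no print output) and may be worth ruling out: when a Scala UDF declares a primitive parameter type such as `Double` or `Int`, Spark returns null for any row where that input is null without invoking the function at all (the Scaladoc for `functions.udf` notes this). In `my_udf`, `arg2: Double` is primitive, while the `Integer` parameters are boxed and would be passed through as null. A minimal sketch, assuming Spark 2.x or later; `PrimitiveNullDemo` and its columns are hypothetical:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical demonstration, not part of the original job.
object PrimitiveNullDemo {
  // Primitive parameter: for a null input Spark returns null WITHOUT calling the body.
  val primitiveUdf: UserDefinedFunction = udf { (x: Double) => x * 2 }

  // Boxed parameter: the body is called for null inputs and handles them itself.
  val boxedUdf: UserDefinedFunction = udf { (x: java.lang.Double) =>
    if (x == null) -1.0 else x * 2
  }

  def withBothUdfs(df: DataFrame, inputCol: String): DataFrame =
    df.withColumn("primitive", primitiveUdf(col(inputCol)))
      .withColumn("boxed", boxedUdf(col(inputCol)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("primitive null demo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical input: row "b" has a null in the Double column.
    val df = Seq[(String, Option[Double])](("a", Some(1.5)), ("b", None)).toDF("key", "amount")
    withBothUdfs(df, "amount").show()
    spark.stop()
  }
}
```

For row `b`, the `primitive` column comes out null while `boxed` is `-1.0`. If the second input carries nulls in any column bound to a primitive parameter, filling them first (for example with `df.na.fill` or `coalesce`) or switching to boxed parameter types would make the behaviour explicit.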



Any help greatly appreciated!

Tags: scala, apache-spark
edited Nov 19 '18 at 17:28 by alepuzio
asked Nov 19 '18 at 16:27 by DrDeadKnee