Spring Batch parallel step



























I have a Spring Batch job that needs to take a list of IDs as input, and I would like to pass that list to a step that can process all of the IDs in parallel. So far, what I have accomplished is running multiple job instances in a ThreadPoolExecutor, which executes the job x number of times. This means that every job issues its own individual queries, and we are talking about over 50 million records. The records represent a time series: a consumption value on a specific day. For each ID and batchId, I need to aggregate by month and send this information to a broker.




  • Reader -> reads from the database by ID and by the timestamps
    that make up the time series.

  • Processor -> PassThroughItemProcessor

  • Writer -> sends to AMQP (aggregates the list of items)


Are there any best practices you could recommend?





Following the suggestions, this is what my partitioner looks like:



@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    // Note: gridSize is not used here; the number of partitions depends only on the number of IDs.
    log.debug("START: Partition");

    Map<String, ExecutionContext> partitionMap = new HashMap<>();
    // Running index over all IDs; integer division by 100 groups them into chunks of 100.
    final AtomicInteger counter = new AtomicInteger(0);
    final AtomicInteger partitionerCounter = new AtomicInteger(0);
    Page<Integer> result = null;
    do {
        // Fetch the next page of distinct IDs (100,000 per page), starting with page 0.
        result = repository.findDistinctByBatchId(
                LocalDateTime.parse(batchId, AipForecastService.DEFAULT_DATE_TIME_FORMATTER),
                Optional.ofNullable(result)
                        .map(Page::nextPageable)
                        .orElse(PageRequest.of(0, 100000)));
        result
                .stream()
                .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / 100))
                .values()
                .forEach(listOfInstallation -> {
                    // One ExecutionContext (and therefore one partition) per chunk of IDs.
                    ExecutionContext context = new ExecutionContext();
                    context.put("listOfInstallation", listOfInstallation);
                    partitionMap.put("partition" + partitionerCounter.incrementAndGet(), context);
                    log.debug("Adding to the partition map {}, listOfInstallation {}", partitionerCounter.get(), listOfInstallation);
                });
    } while (result.hasNext());

    log.debug("END: Created Partitions for installation job of size:{}", partitionMap.size());
    return partitionMap;
}
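The partitioner above always builds chunks of 100 IDs, so the number of partitions grows with the number of IDs rather than following gridSize. As a minimal sketch of one alternative, assuming the IDs are already loaded into a list, the grouping could be bounded by gridSize as shown here. The class IdChunker and its chunk method are hypothetical names used for illustration only and are not part of Spring Batch; in a real partitioner each chunk would still be placed in an ExecutionContext under a key such as "listOfInstallation".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Spring Batch class): splits IDs into at most gridSize chunks.
final class IdChunker {

    static Map<String, List<Integer>> chunk(List<Integer> ids, int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive");
        }
        Map<String, List<Integer>> partitions = new LinkedHashMap<>();
        if (ids.isEmpty()) {
            return partitions;
        }
        // Ceiling division, so that no more than gridSize chunks are produced.
        int chunkSize = (ids.size() + gridSize - 1) / gridSize;
        int n = 0;
        for (int start = 0; start < ids.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, ids.size());
            // Copy the slice so that each partition owns an independent list.
            partitions.put("partition" + n++, new ArrayList<>(ids.subList(start, end)));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ids.add(i);
        }
        // Ten IDs with gridSize 3 give three chunks of sizes 4, 4 and 2.
        System.out.println(chunk(ids, 3));
    }
}
```

A plain loop over subList is used instead of a stateful lambda inside groupingBy, which keeps the grouping independent of stream evaluation order.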















      spring-batch














      asked Nov 19 '18 at 14:47 by Jimmy Bway (edited Nov 23 '18 at 13:31)
























          1 Answer















          "I need to pass as an input to the job a list of IDs, and I would like to be able to pass that list to a step that could run all of them in parallel"




          You can partition that list and use a partitioned step to process partitions in parallel.




          "Is there any best practice you could provide me?"




          If you choose the partitioned step route (which looks appropriate to me for your use case), I would recommend not creating a partition per ID (unless the number of IDs is reasonably small). You can, for example, create a partition per range of IDs and have each worker step run the read/process/write logic you described, which can definitely be done in parallel.
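As a rough illustration of partitioning by range, the sketch below computes contiguous, non-overlapping ID ranges in plain Java. It assumes numeric IDs that are spread fairly evenly between a known minimum and maximum; IdRangeSplitter and split are hypothetical names, not Spring Batch API. In an actual Partitioner, each range would be stored in an ExecutionContext (for example under assumed keys such as "minId" and "maxId") so that the worker's reader can restrict its query to that range.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Spring Batch class): computes [start, end] ID ranges.
final class IdRangeSplitter {

    static Map<String, long[]> split(long minId, long maxId, int gridSize) {
        if (gridSize <= 0 || maxId < minId) {
            throw new IllegalArgumentException("need gridSize > 0 and minId <= maxId");
        }
        Map<String, long[]> ranges = new LinkedHashMap<>();
        // Width of each range; the +1 guarantees at most gridSize ranges.
        long targetSize = (maxId - minId) / gridSize + 1;
        long start = minId;
        int n = 0;
        while (start <= maxId) {
            long end = Math.min(start + targetSize - 1, maxId);
            ranges.put("partition" + n++, new long[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 240,000 IDs split with gridSize 100: each range covers 2,400 IDs.
        Map<String, long[]> ranges = split(1, 240_000, 100);
        long[] first = ranges.get("partition0");
        System.out.println(ranges.size() + " ranges, first = [" + first[0] + ", " + first[1] + "]");
    }
}
```

Ranges only balance the work when IDs are dense and each ID has a similar number of rows; with sparse or skewed IDs, chunking an explicit ID list may distribute the load more evenly.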



          Hope this helps.






























          • Thanks for the reply. When you say "I would recommend not creating a partition per ID unless you have a reasonable number of IDs", does 240k sound reasonable to you? 240k is the total number of IDs that have an associated time series. The possible number of results could be 240k * 365 days * 2 years = 175,200,000. Also, I was wondering if you could provide a concrete example of how to use the partitioner for both of the cases you mentioned.

            – Jimmy Bway
            Nov 20 '18 at 12:25













          • Also keep in mind that my Writer has to send the data aggregated by YearMonth, so the payload sent to the exchange has to represent a full YearMonth.

            – Jimmy Bway
            Nov 20 '18 at 12:36






          • 240k partitions is too many, so having a partition for each ID is not a good option for you. You can partition by range in that case. There is no best value for the number of partitions, but I think 100 is a good start. For examples, you can find one for local partitioning here: github.com/spring-projects/spring-batch/tree/master/… and another one for remote partitioning here: github.com/spring-projects/spring-batch/tree/master/…

            – Mahmoud Ben Hassine
            Nov 20 '18 at 14:26













          • Hi, I've tried what you suggested. However, the default implementation of the PartitionHandler submits the work as futures and blocks on .get() until all executors have returned their results. Is there a better way I could implement this?

            – Jimmy Bway
            Nov 22 '18 at 21:17











          • I've pasted my implementation of the partitioner.

            – Jimmy Bway
            Nov 23 '18 at 13:22
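The YearMonth requirement raised in the comments above can be sketched in plain Java. Because partitioning by ID keeps all readings of one installation inside a single partition, a worker should see complete months for the IDs it owns, assuming the reader returns the whole time series for each ID. The Reading class and the sumByMonth method are hypothetical names chosen for this example, and summing is only one possible aggregation.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch: groups daily consumption values by calendar month.
final class MonthlyAggregator {

    // Assumed record shape: one consumption value per installation per day.
    static final class Reading {
        final int installationId;
        final LocalDate day;
        final double consumption;

        Reading(int installationId, LocalDate day, double consumption) {
            this.installationId = installationId;
            this.day = day;
            this.consumption = consumption;
        }
    }

    // Sums the consumption per YearMonth; a TreeMap keeps the months in chronological order.
    static Map<YearMonth, Double> sumByMonth(List<Reading> readings) {
        return readings.stream().collect(Collectors.groupingBy(
                r -> YearMonth.from(r.day),
                TreeMap::new,
                Collectors.summingDouble(r -> r.consumption)));
    }

    public static void main(String[] args) {
        List<Reading> readings = Arrays.asList(
                new Reading(1, LocalDate.of(2018, 1, 1), 1.5),
                new Reading(1, LocalDate.of(2018, 1, 31), 2.5),
                new Reading(1, LocalDate.of(2018, 2, 1), 4.0));
        System.out.println(sumByMonth(readings));
    }
}
```

The input list here is assumed to hold the readings of a single installation; grouping by installation first would be needed if a writer receives items for several IDs at once.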





















          answered Nov 20 '18 at 8:36 by Mahmoud Ben Hassine












