What's the best way to update the state manually in Apache Flink?












I'm using Apache Flink in a stock market project to calculate the current price change. The formula is



 price_change = (current_price - previous_close_price) / previous_close_price


previous_close_price is a security's closing price on the preceding trading day. Every day, before the market opens, I need to update previous_close_price.



Now I've come up with several solutions, but I don't know which one is best.

  1. Store previous_close_price in Redis and fetch it for every calculation. The price is easy and flexible to update, but a lookup on every calculation could kill performance.

  2. Set the state TTL to 1 day and load the new state when the old state expires. This isn't flexible, because the TTL is hardcoded.

  3. Use the Broadcast State Pattern. I'm not sure whether this solution works.

  4. Send a special message to Flink. When Flink receives the message, it updates previous_close_price.

Any suggestions are appreciated.










      java apache-flink flink-streaming
















      asked Nov 19 '18 at 14:43









      Kanjie Lu
























          1 Answer
          I suggest a variant on #4:



          Have two sources, one used only for the closing prices, and the other for the stream of trades. Key both streams by the security, and connect them with a CoProcessFunction. Store the previous_close_price in keyed state, within the CoProcessFunction.



          Every day, before the market opens, stream in the updated closing prices.



          This could be done with a RichCoFlatMapFunction, but I'm suggesting a CoProcessFunction because you might want to use a side output to report errors (e.g. securities where the previous_close_price is missing).



          As for the other approaches:




          1. I don't see any advantage to keeping the previous_close_price data in an external data store.

          2. I don't think this works very well. There's no hook available for triggering the loading of the new data, and moreover, the state will only be cleared when it's accessed.

          3. This doesn't feel like a good use case for broadcast state, unless there's a need for everyone in the cluster to know the closing prices for all securities.
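As a rough illustration of the variant on #4, here is a plain-Java sketch of the per-security logic. It is not Flink code: a HashMap stands in for keyed state, the two methods stand in for the two inputs of the CoProcessFunction, and a list stands in for the side output of errors. The class name, method names, and the "ACME"/"XYZ" symbols are made up for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;

// Plain-Java stand-in for a keyed two-input operator (hypothetical names).
// The map plays the role of keyed state; this is NOT the Flink API.
class PriceChangeTracker {
    // previous_close_price per security (stand-in for keyed state)
    private final Map<String, Double> previousClose = new HashMap<>();
    // securities that traded with no closing price stored
    // (stand-in for a side output reporting errors)
    private final List<String> missingClose = new ArrayList<>();

    // Closing-price input: overwrite the stored value for this security.
    void onClosingPrice(String security, double closePrice) {
        previousClose.put(security, closePrice);
    }

    // Trade input: compute price_change if a closing price is known,
    // otherwise record the security as an error and emit nothing.
    OptionalDouble onTrade(String security, double currentPrice) {
        Double close = previousClose.get(security);
        if (close == null) {
            missingClose.add(security);
            return OptionalDouble.empty();
        }
        return OptionalDouble.of((currentPrice - close) / close);
    }

    List<String> missingClose() {
        return missingClose;
    }

    public static void main(String[] args) {
        PriceChangeTracker tracker = new PriceChangeTracker();
        tracker.onClosingPrice("ACME", 100.0);               // before the market opens
        System.out.println(tracker.onTrade("ACME", 105.0));  // change vs. a close of 100.0
        System.out.println(tracker.onTrade("XYZ", 10.0));    // no closing price stored
        tracker.onClosingPrice("ACME", 105.0);               // next day's update
        System.out.println(tracker.onTrade("ACME", 105.0));  // change vs. a close of 105.0
        System.out.println(tracker.missingClose());
    }
}
```

In a real job the map would be replaced by keyed state in the function, so each security's closing price lives with the parallel instance that handles that key, and updating it is just a matter of sending a new record on the closing-price stream.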















          edited Nov 19 '18 at 19:43

























          answered Nov 19 '18 at 19:35









          David Anderson













          • That's exactly what I wanted, @David Anderson. But should I connect them like tradeStream.keyBy("security").connect(closingPriceStream.keyBy("security")).process(new CoProcessFunction(){...})? Does that ensure the trade stream shares state with the closing price stream for the same security?

            – Kanjie Lu
            Nov 20 '18 at 13:46











          • Yes, that is correct.

            – David Anderson
            Nov 20 '18 at 14:10


















