What's the best way to update the state manually in Apache Flink?
I'm using Apache Flink in a stock market project to calculate the current price change. The formula is
price_change = (current_price - previous_close_price) / previous_close_price
previous_close_price is a security's closing price on the preceding trading day. Every day, before the market opens, I need to update previous_close_price.
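A minimal Java sketch of the formula above, assuming prices arrive as `double` values (a real trading system might prefer `BigDecimal`). The class and method names are hypothetical, not from the question. The guard rejects a non-positive or NaN previous close, since the formula divides by it.

```java
/** Hypothetical helper implementing price_change = (current - previous_close) / previous_close. */
final class PriceChange {
    private PriceChange() {}

    /** Returns the change as a fraction, e.g. 0.05 for a 5% rise. */
    static double priceChange(double currentPrice, double previousClosePrice) {
        if (!(previousClosePrice > 0.0)) {
            // The formula divides by previous_close_price, so zero, negative, or NaN is rejected.
            throw new IllegalArgumentException("previous close must be positive: " + previousClosePrice);
        }
        return (currentPrice - previousClosePrice) / previousClosePrice;
    }

    public static void main(String[] args) {
        System.out.println(priceChange(105.0, 100.0));
    }
}
```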
Now I've come up with several solutions but I don't know which one is the best.
1. Store previous_close_price in Redis and fetch the price in every calculation. It's easy and flexible to update the price, but this solution could kill performance.
2. Set the TTL of the state to 1 day, and get the new state when the old state has expired. But this isn't flexible, as the TTL is hardcoded.
3. Use the Broadcast State Pattern. I'm not sure if this solution works.
4. Send a special message to Flink. When Flink receives the message, it updates the previous_close_price.
Any suggestions are appreciated.
java apache-flink flink-streaming
asked Nov 19 '18 at 14:43
Kanjie Lu
1 Answer
I suggest a variant on #4:
Have two sources, one used only for the closing prices, and the other for the stream of trades. Key both streams by the security, and connect them with a CoProcessFunction. Store the previous_close_price in keyed state, within the CoProcessFunction.
Every day, before the market opens, stream in the updated closing prices.
This could be done with a RichCoFlatMapFunction, but I'm suggesting a CoProcessFunction because you might want to use a side output to report errors (e.g. securities where the previous_close_price is missing).
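To make the suggested design concrete without a Flink cluster, here is a framework-free Java model of the per-key logic that would live inside the CoProcessFunction. It is a sketch under stated assumptions, not Flink's API: a `HashMap` keyed by security stands in for keyed `ValueState`, `onTrade`/`onClosePrice` play the roles of `processElement1`/`processElement2`, and a list stands in for the side output that reports securities with no previous close. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;

/** Framework-free model of the keyed two-input logic; names are hypothetical. */
final class ClosePriceJoinModel {
    // Stands in for keyed ValueState<Double>: one previous_close_price per security.
    private final Map<String, Double> previousClose = new HashMap<>();
    // Stands in for a side output reporting trades whose security has no previous close yet.
    private final List<String> missingCloseSideOutput = new ArrayList<>();

    /** Closing-price input: each day's update simply overwrites the stored value. */
    void onClosePrice(String security, double closePrice) {
        previousClose.put(security, closePrice);
    }

    /** Trade input: computes the price change, or reports the security if no close is known. */
    OptionalDouble onTrade(String security, double currentPrice) {
        Double close = previousClose.get(security);
        if (close == null) {
            missingCloseSideOutput.add(security);
            return OptionalDouble.empty();
        }
        return OptionalDouble.of((currentPrice - close) / close);
    }

    List<String> missingCloseReports() {
        return missingCloseSideOutput;
    }
}
```

In Flink itself, the map would be replaced by a `ValueState<Double>` obtained via `getRuntimeContext().getState(...)` in `open()`, and the list by `ctx.output(...)` with an `OutputTag`. Because the daily closing prices arrive as ordinary stream elements, updating them needs no restart and no special trigger.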
As for the other approaches:
- #1 (Redis): I don't see any advantage to keeping the previous_close_price data in an external data store.
- #2 (state TTL): I don't think this works very well. There's no hook available for triggering the loading of the new data, and moreover, expired state is only cleared when it's accessed.
- #3 (broadcast state): This doesn't feel like a good use case for broadcast state, unless every parallel instance needs to know the closing prices for all securities.
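The TTL objection (expired state is only cleared when it is read, and nothing triggers a reload) can be illustrated with a toy Java model of lazily expiring per-key state. This is a simplification, not Flink's `StateTtlConfig` implementation: time is passed in explicitly, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of lazily expiring per-key state; not Flink's implementation. */
final class LazyTtlStateModel {
    private static final class Entry {
        final double value;
        final long writtenAtMillis;

        Entry(double value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long ttlMillis;

    LazyTtlStateModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void update(String key, double value, long nowMillis) {
        entries.put(key, new Entry(value, nowMillis));
    }

    /** Returns null when absent or expired; an expired entry is removed only here, on read. */
    Double value(String key, long nowMillis) {
        Entry e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis - e.writtenAtMillis >= ttlMillis) {
            entries.remove(key); // lazy cleanup: nothing happens until someone reads the key
            return null;         // and no fresh closing price is loaded; the caller just gets null
        }
        return e.value;
    }

    int storedEntries() {
        return entries.size();
    }
}
```

The caller that finds `null` still has to obtain the new closing price from somewhere, which is the missing hook the answer refers to.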
That's exactly what I wanted, @David Anderson. But should I connect them like tradeStream.keyBy("security").connect(closingPriceStream.keyBy("security")).process(new CoProcessFunction(){...})? Does that ensure the trade stream shares state with the closing price stream when they have the same security?
– Kanjie Lu
Nov 20 '18 at 13:46
Yes, that is correct.
– David Anderson
Nov 20 '18 at 14:10
edited Nov 19 '18 at 19:43
answered Nov 19 '18 at 19:35
David Anderson
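Why keying both streams by the security is enough for them to share state: `keyBy` routes every record by its key, so when both inputs use the same key, a trade and a closing price for the same security reach the same parallel instance and therefore the same keyed state. The Java model below illustrates that property with a hypothetical `Math.floorMod(hashCode, parallelism)` router. Flink actually maps each key's `hashCode()` through a murmur hash into a key group and assigns key groups to instances, but the co-location property is the same. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of keyBy-based routing for two connected inputs; not Flink's code. */
final class KeyedConnectRoutingModel {
    private final int parallelism;
    // One state map per parallel instance; a key's state lives on the instance that owns the key.
    private final List<Map<String, Double>> closeStatePerInstance = new ArrayList<>();

    KeyedConnectRoutingModel(int parallelism) {
        this.parallelism = parallelism;
        for (int i = 0; i < parallelism; i++) {
            closeStatePerInstance.add(new HashMap<>());
        }
    }

    /** Hypothetical router, used for both inputs just as keyBy on the same field would be. */
    int instanceFor(String security) {
        return Math.floorMod(security.hashCode(), parallelism);
    }

    void routeClosePrice(String security, double close) {
        closeStatePerInstance.get(instanceFor(security)).put(security, close);
    }

    /** Returns the previous close visible on the instance that receives this trade, or null. */
    Double previousCloseSeenByTrade(String security) {
        return closeStatePerInstance.get(instanceFor(security)).get(security);
    }

    int instancesHoldingStateFor(String security) {
        int count = 0;
        for (Map<String, Double> state : closeStatePerInstance) {
            if (state.containsKey(security)) {
                count++;
            }
        }
        return count;
    }
}
```

If the two inputs were keyed differently (for example, one by security and one by exchange), this guarantee would no longer hold.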