RxJava: Complete stream based on condition
up vote
0
down vote
favorite
I have on Observable, which emits items when some data is coming from a BLE connection:
public interface CommunicationController {
Flowable<DataContainer> dataReceived();
}
On top of this I want to build a Observable
, which completes
when one of the following conditions is true:
a. I receive two messages of a specific type (this is done by using filter
operator on the received DataContainer
item.
communicationController.dataReceived()
.filter(data -> isTypeA(data) || isTypeB(data))
.take(2)
.toList()
.map(dataContainers -> doSomeMappingToCommon object)
b. I receive one message of a specific type (again using filter
operator).
communicationController.dataReceived()
.filter(data -> isTypeC(data))
.firstOrError()
.map(dataContainers -> doSomeMappingToCommon object);
How can I combine those two Observable
s into a single one?
Additionally only one of the two Observables will emit an item.
android rx-java2
add a comment |
up vote
0
down vote
favorite
I have on Observable, which emits items when some data is coming from a BLE connection:
public interface CommunicationController {
Flowable<DataContainer> dataReceived();
}
On top of this I want to build a Observable
, which completes
when one of the following conditions is true:
a. I receive two messages of a specific type (this is done by using filter
operator on the received DataContainer
item.
communicationController.dataReceived()
.filter(data -> isTypeA(data) || isTypeB(data))
.take(2)
.toList()
.map(dataContainers -> doSomeMappingToCommon object)
b. I receive one message of a specific type (again using filter
operator).
communicationController.dataReceived()
.filter(data -> isTypeC(data))
.firstOrError()
.map(dataContainers -> doSomeMappingToCommon object);
How can I combine those two Observable
s into a single one?
Additionally only one of the two Observables will emit an item.
android rx-java2
add a comment |
up vote
0
down vote
favorite
up vote
0
down vote
favorite
I have on Observable, which emits items when some data is coming from a BLE connection:
public interface CommunicationController {
Flowable<DataContainer> dataReceived();
}
On top of this I want to build a Observable
, which completes
when one of the following conditions is true:
a. I receive two messages of a specific type (this is done by using filter
operator on the received DataContainer
item.
communicationController.dataReceived()
.filter(data -> isTypeA(data) || isTypeB(data))
.take(2)
.toList()
.map(dataContainers -> doSomeMappingToCommon object)
b. I receive one message of a specific type (again using filter
operator).
communicationController.dataReceived()
.filter(data -> isTypeC(data))
.firstOrError()
.map(dataContainers -> doSomeMappingToCommon object);
How can I combine those two Observable
s into a single one?
Additionally only one of the two Observables will emit an item.
android rx-java2
I have on Observable, which emits items when some data is coming from a BLE connection:
public interface CommunicationController {
Flowable<DataContainer> dataReceived();
}
On top of this I want to build a Observable
, which completes
when one of the following conditions is true:
a. I receive two messages of a specific type (this is done by using filter
operator on the received DataContainer
item.
communicationController.dataReceived()
.filter(data -> isTypeA(data) || isTypeB(data))
.take(2)
.toList()
.map(dataContainers -> doSomeMappingToCommon object)
b. I receive one message of a specific type (again using filter
operator).
communicationController.dataReceived()
.filter(data -> isTypeC(data))
.firstOrError()
.map(dataContainers -> doSomeMappingToCommon object);
How can I combine those two Observable
s into a single one?
Additionally only one of the two Observables will emit an item.
android rx-java2
android rx-java2
edited Nov 8 at 9:46
asked Nov 8 at 9:34
Christopher
4,12512142
4,12512142
add a comment |
add a comment |
2 Answers
2
active
oldest
votes
up vote
3
down vote
accepted
I think your answer will be Observable.merge: http://reactivex.io/documentation/operators/merge.html
According to the documentation, this does not wait for every observable to emit something.
One option is to use merge and takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
Kotlin code:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
add a comment |
up vote
0
down vote
To combine 2 observables, you can use operator zipWith. Example:
Observable<AppMetaDataBiz> appMetaDataObservable = this.mAppRepository.getAppMetaData();
Observable<ProductBiz> productDetailsObservable =
this.mProductRepository.getProductDetails(this.productId);
return productDetailsObservable.zipWith(appMetaDataObservable,
((jmProduct, jmAppMetaData) -> {
//TODO: implement business logic
return jmProduct;
}));
New contributor
Hmm,zipWith
will not solve my problem, as it will only emit one event if bothObservable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.
– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
add a comment |
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
3
down vote
accepted
I think your answer will be Observable.merge: http://reactivex.io/documentation/operators/merge.html
According to the documentation, this does not wait for every observable to emit something.
One option is to use merge and takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
Kotlin code:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
add a comment |
up vote
3
down vote
accepted
I think your answer will be Observable.merge: http://reactivex.io/documentation/operators/merge.html
According to the documentation, this does not wait for every observable to emit something.
One option is to use merge and takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
Kotlin code:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
add a comment |
up vote
3
down vote
accepted
up vote
3
down vote
accepted
I think your answer will be Observable.merge: http://reactivex.io/documentation/operators/merge.html
According to the documentation, this does not wait for every observable to emit something.
One option is to use merge and takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
Kotlin code:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
I think your answer will be Observable.merge: http://reactivex.io/documentation/operators/merge.html
According to the documentation, this does not wait for every observable to emit something.
One option is to use merge and takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
Kotlin code:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
answered Nov 8 at 10:41
Leandro Ocampo
694515
694515
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
add a comment |
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
add a comment |
up vote
0
down vote
To combine 2 observables, you can use operator zipWith. Example:
Observable<AppMetaDataBiz> appMetaDataObservable = this.mAppRepository.getAppMetaData();
Observable<ProductBiz> productDetailsObservable =
this.mProductRepository.getProductDetails(this.productId);
return productDetailsObservable.zipWith(appMetaDataObservable,
((jmProduct, jmAppMetaData) -> {
//TODO: implement business logic
return jmProduct;
}));
New contributor
Hmm,zipWith
will not solve my problem, as it will only emit one event if bothObservable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.
– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
add a comment |
up vote
0
down vote
To combine 2 observables, you can use operator zipWith. Example:
Observable<AppMetaDataBiz> appMetaDataObservable = this.mAppRepository.getAppMetaData();
Observable<ProductBiz> productDetailsObservable =
this.mProductRepository.getProductDetails(this.productId);
return productDetailsObservable.zipWith(appMetaDataObservable,
((jmProduct, jmAppMetaData) -> {
//TODO: implement business logic
return jmProduct;
}));
New contributor
Hmm,zipWith
will not solve my problem, as it will only emit one event if bothObservable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.
– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
add a comment |
up vote
0
down vote
up vote
0
down vote
To combine 2 observables, you can use operator zipWith. Example:
Observable<AppMetaDataBiz> appMetaDataObservable = this.mAppRepository.getAppMetaData();
Observable<ProductBiz> productDetailsObservable =
this.mProductRepository.getProductDetails(this.productId);
return productDetailsObservable.zipWith(appMetaDataObservable,
((jmProduct, jmAppMetaData) -> {
//TODO: implement business logic
return jmProduct;
}));
New contributor
To combine 2 observables, you can use operator zipWith. Example:
Observable<AppMetaDataBiz> appMetaDataObservable = this.mAppRepository.getAppMetaData();
Observable<ProductBiz> productDetailsObservable =
this.mProductRepository.getProductDetails(this.productId);
return productDetailsObservable.zipWith(appMetaDataObservable,
((jmProduct, jmAppMetaData) -> {
//TODO: implement business logic
return jmProduct;
}));
New contributor
New contributor
answered Nov 8 at 9:40
Louis Solo
1062
1062
New contributor
New contributor
Hmm,zipWith
will not solve my problem, as it will only emit one event if bothObservable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.
– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
add a comment |
Hmm,zipWith
will not solve my problem, as it will only emit one event if bothObservable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.
– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
Hmm,
zipWith
will not solve my problem, as it will only emit one event if both Observable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.– Christopher
Nov 8 at 9:46
Hmm,
zipWith
will not solve my problem, as it will only emit one event if both Observable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
add a comment |
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53204924%2frxjava-complete-stream-based-on-condition%23new-answer', 'question_page');
}
);
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password