RxJava: Complete stream based on condition
up vote
0
down vote
favorite
I have on Observable, which emits items when some data is coming from a BLE connection:
public interface CommunicationController {
Flowable<DataContainer> dataReceived();
}
On top of this I want to build a Observable
, which completes
when one of the following conditions is true:
a. I receive two messages of a specific type (this is done by using filter
operator on the received DataContainer
item.
communicationController.dataReceived()
.filter(data -> isTypeA(data) || isTypeB(data))
.take(2)
.toList()
.map(dataContainers -> doSomeMappingToCommon object)
b. I receive one message of a specific type (again using filter
operator).
communicationController.dataReceived()
.filter(data -> isTypeC(data))
.firstOrError()
.map(dataContainers -> doSomeMappingToCommon object);
How can I combine those two Observable
s into a single one?
Additionally only one of the two Observables will emit an item.

add a comment |
up vote
0
down vote
favorite
I have on Observable, which emits items when some data is coming from a BLE connection:
public interface CommunicationController {
Flowable<DataContainer> dataReceived();
}
On top of this I want to build a Observable
, which completes
when one of the following conditions is true:
a. I receive two messages of a specific type (this is done by using filter
operator on the received DataContainer
item.
communicationController.dataReceived()
.filter(data -> isTypeA(data) || isTypeB(data))
.take(2)
.toList()
.map(dataContainers -> doSomeMappingToCommon object)
b. I receive one message of a specific type (again using filter
operator).
communicationController.dataReceived()
.filter(data -> isTypeC(data))
.firstOrError()
.map(dataContainers -> doSomeMappingToCommon object);
How can I combine those two Observable
s into a single one?
Additionally only one of the two Observables will emit an item.

add a comment |
up vote
0
down vote
favorite
up vote
0
down vote
favorite
I have on Observable, which emits items when some data is coming from a BLE connection:
public interface CommunicationController {
Flowable<DataContainer> dataReceived();
}
On top of this I want to build a Observable
, which completes
when one of the following conditions is true:
a. I receive two messages of a specific type (this is done by using filter
operator on the received DataContainer
item.
communicationController.dataReceived()
.filter(data -> isTypeA(data) || isTypeB(data))
.take(2)
.toList()
.map(dataContainers -> doSomeMappingToCommon object)
b. I receive one message of a specific type (again using filter
operator).
communicationController.dataReceived()
.filter(data -> isTypeC(data))
.firstOrError()
.map(dataContainers -> doSomeMappingToCommon object);
How can I combine those two Observable
s into a single one?
Additionally only one of the two Observables will emit an item.

I have on Observable, which emits items when some data is coming from a BLE connection:
public interface CommunicationController {
Flowable<DataContainer> dataReceived();
}
On top of this I want to build a Observable
, which completes
when one of the following conditions is true:
a. I receive two messages of a specific type (this is done by using filter
operator on the received DataContainer
item.
communicationController.dataReceived()
.filter(data -> isTypeA(data) || isTypeB(data))
.take(2)
.toList()
.map(dataContainers -> doSomeMappingToCommon object)
b. I receive one message of a specific type (again using filter
operator).
communicationController.dataReceived()
.filter(data -> isTypeC(data))
.firstOrError()
.map(dataContainers -> doSomeMappingToCommon object);
How can I combine those two Observable
s into a single one?
Additionally only one of the two Observables will emit an item.


edited Nov 8 at 9:46
asked Nov 8 at 9:34
Christopher
4,12512142
4,12512142
add a comment |
add a comment |
2 Answers
2
active
oldest
votes
up vote
3
down vote
accepted
I think your answer will be Observable.merge: http://reactivex.io/documentation/operators/merge.html
According to the documentation, this does not wait for every observable to emit something.
One option is to use merge and takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
Kotlin code:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
add a comment |
up vote
0
down vote
To combine 2 observables, you can use operator zipWith. Example:
Observable<AppMetaDataBiz> appMetaDataObservable = this.mAppRepository.getAppMetaData();
Observable<ProductBiz> productDetailsObservable =
this.mProductRepository.getProductDetails(this.productId);
return productDetailsObservable.zipWith(appMetaDataObservable,
((jmProduct, jmAppMetaData) -> {
//TODO: implement business logic
return jmProduct;
}));
New contributor
Louis Solo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
Hmm,zipWith
will not solve my problem, as it will only emit one event if bothObservable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.
– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
add a comment |
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
3
down vote
accepted
I think your answer will be Observable.merge: http://reactivex.io/documentation/operators/merge.html
According to the documentation, this does not wait for every observable to emit something.
One option is to use merge and takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
Kotlin code:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
add a comment |
up vote
3
down vote
accepted
I think your answer will be Observable.merge: http://reactivex.io/documentation/operators/merge.html
According to the documentation, this does not wait for every observable to emit something.
One option is to use merge and takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
Kotlin code:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
add a comment |
up vote
3
down vote
accepted
up vote
3
down vote
accepted
I think your answer will be Observable.merge: http://reactivex.io/documentation/operators/merge.html
According to the documentation, this does not wait for every observable to emit something.
One option is to use merge and takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
Kotlin code:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
I think your answer will be Observable.merge: http://reactivex.io/documentation/operators/merge.html
According to the documentation, this does not wait for every observable to emit something.
One option is to use merge and takeUntil: http://reactivex.io/documentation/operators/takeuntil.html
Kotlin code:
val stopStream = Observable.merge(...)
streamThatWillRun = streamThatWillRun.takeUntil(stopStream)
answered Nov 8 at 10:41


Leandro Ocampo
694515
694515
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
add a comment |
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
Yes, thanks a lot!
– Christopher
Nov 8 at 11:12
add a comment |
up vote
0
down vote
To combine 2 observables, you can use operator zipWith. Example:
Observable<AppMetaDataBiz> appMetaDataObservable = this.mAppRepository.getAppMetaData();
Observable<ProductBiz> productDetailsObservable =
this.mProductRepository.getProductDetails(this.productId);
return productDetailsObservable.zipWith(appMetaDataObservable,
((jmProduct, jmAppMetaData) -> {
//TODO: implement business logic
return jmProduct;
}));
New contributor
Louis Solo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
Hmm,zipWith
will not solve my problem, as it will only emit one event if bothObservable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.
– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
add a comment |
up vote
0
down vote
To combine 2 observables, you can use operator zipWith. Example:
Observable<AppMetaDataBiz> appMetaDataObservable = this.mAppRepository.getAppMetaData();
Observable<ProductBiz> productDetailsObservable =
this.mProductRepository.getProductDetails(this.productId);
return productDetailsObservable.zipWith(appMetaDataObservable,
((jmProduct, jmAppMetaData) -> {
//TODO: implement business logic
return jmProduct;
}));
New contributor
Louis Solo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
Hmm,zipWith
will not solve my problem, as it will only emit one event if bothObservable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.
– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
add a comment |
up vote
0
down vote
up vote
0
down vote
To combine 2 observables, you can use operator zipWith. Example:
Observable<AppMetaDataBiz> appMetaDataObservable = this.mAppRepository.getAppMetaData();
Observable<ProductBiz> productDetailsObservable =
this.mProductRepository.getProductDetails(this.productId);
return productDetailsObservable.zipWith(appMetaDataObservable,
((jmProduct, jmAppMetaData) -> {
//TODO: implement business logic
return jmProduct;
}));
New contributor
Louis Solo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
To combine 2 observables, you can use operator zipWith. Example:
Observable<AppMetaDataBiz> appMetaDataObservable = this.mAppRepository.getAppMetaData();
Observable<ProductBiz> productDetailsObservable =
this.mProductRepository.getProductDetails(this.productId);
return productDetailsObservable.zipWith(appMetaDataObservable,
((jmProduct, jmAppMetaData) -> {
//TODO: implement business logic
return jmProduct;
}));
New contributor
Louis Solo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
New contributor
Louis Solo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
answered Nov 8 at 9:40
Louis Solo
1062
1062
New contributor
Louis Solo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
New contributor
Louis Solo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
Louis Solo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
Hmm,zipWith
will not solve my problem, as it will only emit one event if bothObservable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.
– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
add a comment |
Hmm,zipWith
will not solve my problem, as it will only emit one event if bothObservable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.
– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
Hmm,
zipWith
will not solve my problem, as it will only emit one event if both Observable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.– Christopher
Nov 8 at 9:46
Hmm,
zipWith
will not solve my problem, as it will only emit one event if both Observable
s emit one event. In my case it's more that only one of the two observables will emit one item. I should clarify this in the question.– Christopher
Nov 8 at 9:46
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
Hmm, I may get your idea. If I'm right, combineLatest may help you: reactivex.io/documentation/operators/combinelatest.html
– Louis Solo
Nov 8 at 9:52
add a comment |
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53204924%2frxjava-complete-stream-based-on-condition%23new-answer', 'question_page');
}
);
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password