Do you have a list of Observable streams that you would like to combine into one stream? You could chain them: stream1.merge(stream2).merge(stream3). But what if you have a list and don't want to chain every one of them? After reading this, you will know how to use the Observable.merge factory method to merge a list of observables.

Merge multiple Python rx Observables into one Observable without creating an empty Observable.

Let’s say you have three streams that produce strings. You would like to end up with one stream to subscribe to:

stream1       ----x---------
stream2       ------x-------
stream3       --------x-----

single_stream ----x-x-x-----
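
To make the diagram concrete, here is a toy model of merging in plain Python. It is only a sketch and not how RxPY implements merge: ToySubject and merge_all are hypothetical names invented for this illustration, and the model ignores completion, errors and unsubscribing.

```python
# Toy model of merging, NOT RxPY's implementation: ToySubject and merge_all
# are hypothetical names used only for this illustration.


class ToySubject:
    """A minimal push-based stream: subscribers are plain callables."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)

    def on_next(self, value):
        # Push the value to every subscriber, in subscription order.
        for observer in list(self._observers):
            observer(value)


def merge_all(streams):
    """Return one stream that re-emits every value from every input stream."""
    merged = ToySubject()
    for stream in streams:
        # Forward each source value straight into the merged stream.
        stream.subscribe(merged.on_next)
    return merged


stream1, stream2, stream3 = ToySubject(), ToySubject(), ToySubject()
single_stream = merge_all([stream1, stream2, stream3])

received = []
single_stream.subscribe(received.append)

stream1.on_next("x1")
stream2.on_next("x2")
stream3.on_next("x3")

print(received)  # ['x1', 'x2', 'x3']
```

The values arrive on single_stream in the order in which they were emitted, whichever source produced them.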

RxPY allows you to merge observables by chaining calls to merge:

single_stream = stream1.merge(stream2).merge(stream3)

That works as long as you have a fixed number of streams. If you have a list of observables, you need to do something like:

streams = [stream1, stream2, stream3]

single_stream = Observable.empty()

for s in streams:
    single_stream = single_stream.merge(s)
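
The loop above is a left fold over the list, with the empty observable as its seed. The sketch below shows the nesting it produces, using a hypothetical stand-in class (Labelled, not part of RxPY) whose merge method only records which streams were combined. It also shows that functools.reduce without a seed gives the same fold minus the empty starting point.

```python
from functools import reduce


class Labelled:
    """Hypothetical stand-in for an observable: merge() only records structure."""

    def __init__(self, label):
        self.label = label

    def merge(self, other):
        return Labelled("({} + {})".format(self.label, other.label))


streams = [Labelled("stream1"), Labelled("stream2"), Labelled("stream3")]

# The loop from the text: start from an empty seed and merge each stream in.
looped = Labelled("empty")
for s in streams:
    looped = looped.merge(s)

# The same fold without a seed: reduce starts from the first list element.
folded = reduce(lambda acc, s: acc.merge(s), streams)

print(looped.label)  # (((empty + stream1) + stream2) + stream3)
print(folded.label)  # ((stream1 + stream2) + stream3)
```

Note that reduce without a seed raises a TypeError on an empty list, so the seedless variant only works when there is at least one stream.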

But there is an easier way. We can use the Observable.merge factory method to create a new observable, based on a list of observables.

Use the Observable.merge factory to create an observable from a list of observables

streams = [stream1, stream2, stream3]
single_stream = Observable.merge(streams)

If you want to test this code yourself, open a terminal and create a project like this:

mkdir rx_fun
cd rx_fun/
python3 -m venv env
source env/bin/activate
# the examples use the RxPY 1.x API (from rx import Observable), so stay below version 3
pip install "rx<3"

Create test.py and enter this code:

from rx.subjects import Subject
from rx import Observable

stream1 = Subject()
stream2 = Subject()
stream3 = Subject()

streams = [stream1, stream2, stream3]

single_stream = Observable.merge(streams)

single_stream.subscribe(lambda x: print(x))

stream1.on_next("vera")
stream2.on_next("chuck")
stream3.on_next("dave")

If you run this code, the result is:

vera
chuck
dave
Written by Loek van den Ouweland on 2019-02-09.
Questions regarding this article? You can send them to the address below.