In the previous post we created an Observable and emitted data using the onNext() method. We have a subscriber that receives the data whenever the publisher pushes it. How about playing with the way that data is received?
We have the temperatures of four cities pushed one after the other to the subscriber. Let’s say we would like to control how the data is received. We’ll use a simple yet effective method called buffer(). The buffer() method has plenty of overloaded versions; let’s use some of them in this post.
Let’s buffer 2 results and release them to the subscriber as shown in the code below.
public static void main(String[] args) throws Exception {
    // cities: the collection of city names, assumed to be defined elsewhere in this class
    Observable<String> producer = Observable.create(emitter -> {
        for (String city : cities) {
            String result = "Temperature of " + city + " is " + Math.random() * 50 + " deg celsius";
            // Push each reading to the subscriber
            emitter.onNext(result);
        }
    });

    producer
        .buffer(2) // collect two results into a List before releasing them
        .subscribe(result -> {
            System.out.println("Received result");
            result.forEach(r -> System.out.println(r));
        });
}
In the code we have used the buffer(int) method to buffer two results, so the subscriber receives a List of two items at a time instead of individual strings. Running the code produces the following output.
Received result
Temperature of Chennai is 21.098286787127048 deg celsius
Temperature of London is 36.91353784588999 deg celsius
Received result
Temperature of Paris is 41.135339668859615 deg celsius
Temperature of Rome is 29.937183626664833 deg celsius
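To make the grouping concrete, here is a minimal plain-Java sketch of what a subscriber sees downstream of buffer(int). It does not use RxJava and is not how the library implements the operator; the class CountBufferSketch and the method bufferByCount are hypothetical names made up for this illustration. It treats a finite list as a source that completes, which is when any leftover partial buffer gets released.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only (not part of RxJava).
class CountBufferSketch {

    // Groups items into consecutive lists of at most `count` elements.
    static <T> List<List<T>> bufferByCount(List<T> items, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (current.size() == count) { // buffer is full: release it
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) { // leftover items, released when the source completes
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> cities = Arrays.asList("Chennai", "London", "Paris", "Rome");
        for (List<String> batch : bufferByCount(cities, 2)) {
            System.out.println("Received result " + batch);
        }
    }
}
```

With four cities and a count of 2, the loop prints two batches, mirroring the two "Received result" blocks above.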
We can also buffer by time. Let’s buffer the results for a particular timespan, say 500 milliseconds, as shown here.
producer
    .buffer(500, TimeUnit.MILLISECONDS)
    .subscribe((result) -> {
        System.out.println("Received result");
        result.forEach(r -> System.out.println(r));
    });
Thread.sleep(3000);
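We are buffering the output for 500 milliseconds at a time and then releasing it. The timed buffer by default runs on RxJava’s computation scheduler rather than on the main thread, so I have added a 3-second sleep to keep the main thread from exiting before the results arrive. Here’s the output of this code.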
Received result
Temperature of Chennai is 18.942744552146106 deg celsius
Temperature of London is 42.56709977653646 deg celsius
Temperature of Paris is 0.5756428440447103 deg celsius
Temperature of Rome is 10.926217595325788 deg celsius
Received result
Received result
Received result
Received result
Received result
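The shape of this output, one full list followed by empty ones, can be mimicked with a small plain-Java simulation. This is only a sketch under stated assumptions: it does not use RxJava, the names TimeBufferSketch and bufferByTime are hypothetical, and the arrival times are invented (all four items are assumed to arrive at time 0, since the producer emits them immediately).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, for illustration only (not part of RxJava).
class TimeBufferSketch {

    // Returns one list per elapsed window of spanMillis, including empty windows.
    // arrivalMillis[i] is the assumed time at which items[i] was emitted.
    static List<List<String>> bufferByTime(String[] items, long[] arrivalMillis,
                                           long spanMillis, long totalMillis) {
        if (spanMillis <= 0) {
            throw new IllegalArgumentException("spanMillis must be positive");
        }
        List<List<String>> windows = new ArrayList<>();
        for (long start = 0; start + spanMillis <= totalMillis; start += spanMillis) {
            List<String> window = new ArrayList<>();
            for (int i = 0; i < items.length; i++) {
                // Keep the items whose arrival time falls inside [start, start + span)
                if (arrivalMillis[i] >= start && arrivalMillis[i] < start + spanMillis) {
                    window.add(items[i]);
                }
            }
            windows.add(window); // released even when nothing arrived
        }
        return windows;
    }

    public static void main(String[] args) {
        String[] cities = {"Chennai", "London", "Paris", "Rome"};
        long[] arrivals = {0, 0, 0, 0}; // assumption: everything is emitted at once
        for (List<String> window : bufferByTime(cities, arrivals, 500, 3000)) {
            System.out.println("Received result " + window);
        }
    }
}
```

Six windows of 500 milliseconds fit into the 3 seconds of sleep, which matches the six "Received result" lines in the output.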
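What we notice is that the subscriber’s result handler is invoked every 500 milliseconds, whether or not anything arrived in that window. The first window contains all four results; the later ones are empty lists, so only "Received result" is printed. The windows keep coming because the producer never calls onComplete(). We can also combine the count and timespan options, as shown below.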
producer
    .buffer(1, TimeUnit.SECONDS, 3)
    .subscribe((result) -> {
        System.out.println("Received result");
        result.forEach(r -> System.out.println(r));
    });
Thread.sleep(3000);
In this case, the buffer is released after 1 second or as soon as 3 items have been received, whichever comes first. The output of this code is shown below.
Received result
Temperature of Chennai is 15.55913859792542 deg celsius
Temperature of London is 49.949073211549475 deg celsius
Temperature of Paris is 32.1107414407108 deg celsius
Received result
Temperature of Rome is 15.025921328198798 deg celsius
Received result
Received result
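The "whichever comes first" rule can also be sketched in plain Java. As before, this is a simulation and not RxJava code: TimeOrCountBufferSketch and bufferByTimeOrCount are hypothetical names, the arrival times are invented, and the sketch assumes the timer keeps ticking at a fixed rate and is not restarted when a buffer fills up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, for illustration only (not part of RxJava).
class TimeOrCountBufferSketch {

    // Releases a list when `count` items are collected or when the timer ticks.
    // arrivalMillis must be sorted in ascending order.
    static List<List<String>> bufferByTimeOrCount(String[] items, long[] arrivalMillis,
                                                  long spanMillis, int count, long totalMillis) {
        if (spanMillis <= 0 || count <= 0) {
            throw new IllegalArgumentException("spanMillis and count must be positive");
        }
        List<List<String>> released = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int next = 0;
        for (long tick = spanMillis; tick <= totalMillis; tick += spanMillis) {
            // Consume every item that arrived before this tick
            while (next < items.length && arrivalMillis[next] < tick) {
                current.add(items[next++]);
                if (current.size() == count) { // size limit hit first
                    released.add(current);
                    current = new ArrayList<>();
                }
            }
            released.add(current); // timer fired: release what is buffered, even if empty
            current = new ArrayList<>();
        }
        return released;
    }

    public static void main(String[] args) {
        String[] cities = {"Chennai", "London", "Paris", "Rome"};
        long[] arrivals = {0, 0, 0, 0}; // assumption: everything is emitted at once
        for (List<String> batch : bufferByTimeOrCount(cities, arrivals, 1000, 3, 3000)) {
            System.out.println("Received result " + batch);
        }
    }
}
```

The first three cities fill the buffer immediately, Rome is released on the first tick, and the remaining ticks release empty lists, which is the same pattern as the output above.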