<!DOCTYPE html>
<html>
<head>
<link rel="stylesheet" href="style.css">
<script src="https://npmcdn.com/rxjs@5.0.0-beta.7/bundles/Rx.umd.js"></script>
<script src="script.js"></script>
</head>
<body>
<div id="displaying">
Click inside the <b><code>document</code></b> to toggle behavior.<br/>
<b><code>pausableBuffered</code></b> is currently <b>unpaused</b>.<br/>
The live count is –.<br/>
</div>
<div><p>Open your console to see the latest emissions from <b><code>pausableBuffered</code></b>.</p></div>
<div id="log">The last 20 items emitted by <b><code>pausableBuffered</code></b>:<br/></div>
</body>
</html>
/********* Setup *********/
/**
* Build a toggle to control the `pauser`; alternate
* emitting `true` and `false` every click on the
* `document`. `scan()` lets us do this easily.
*/
var clicks = Rx.Observable.fromEvent(document, 'click')
.scan(function(acc) {
return !acc;
}, false); // seed with the initial unpaused state so the first click emits `true`
var pauser = new Rx.Subject();
clicks.subscribe(pauser);
var interval = Rx.Observable.interval(400);
var source = new Rx.Subject();
/**
* We need to use a subject subscribed to `interval`,
* otherwise we receive the interval from the
* beginning every time the `pausableBuffered` is
* toggled.
*/
interval.subscribe(source);
/********* Buffering *********/
/**
* The `buffer()` will close & emit its array
* every time the `pauser` emits. We want to
* emit the items from the array in order as fast as
* possible. We can do this with `.mergeAll()`.
*
* Instead of using the `buffer()` method presented
* below, you can also use `bufferToggle()` like this:
* ```
* .bufferToggle(pauser, function (toggle) {
* return toggle ? Rx.Observable.empty() : pauser;
* })
* ```
*
* Using `bufferToggle()` should be more performant.
* I'm not sure because I haven't inspected the
* implementation, but if it works like I think it
* does, then using it would skip buffering emissions
* while unpaused. It is more code, however. If the
* `pausableBuffered` is going to be running unpaused
* most of the time, then the performance gain will be
* negligible and the `buffer()` syntax is probably
* preferred.
*/
var buffer = new Rx.Subject();
source
.buffer(pauser)
.mergeAll()
.subscribe(buffer);
/********* Pausing *********/
/**
* `pausableBuffered` should alternately emit from
* `buffer` and `source`.
*/
var pausableBuffered = pauser.switchMap(function(paused) {
return paused ? buffer : source;
});
pausableBuffered.subscribe(function(x) {
console.log('Next: ' + x.toString());
});
/********* Displaying *********/
var makeTemplate = function(count, isPaused) {
var paused = isPaused ? 'paused & buffering' : 'unpaused';
return 'Click inside the <b><code>document</code></b> to toggle behavior.<br/><b><code>pausableBuffered</code></b> is currently <b>' + paused + '</b>.<br/>The live count is <b>' + count + '</b>.';
};
Rx.Observable.combineLatest(source, pauser, makeTemplate)
.subscribe(function(inner) {
document.getElementById('displaying').innerHTML = inner;
});
var log = [];
var makeLog = function(latest) {
log.push(latest);
log = log.slice(-20);
document.getElementById('log').innerHTML = log.reduce(function(acc, curr) {
return acc.replace('</ol>', '') + '<li> – <b><code>' + curr + '</code></b></li></ol>';
}, 'The last 20 items emitted by <b><code>pausableBuffered</code></b>:<br/><ol>');
};
pausableBuffered.subscribe(makeLog);
/********* Start *********/
pauser.next(false);
/* Styles go here */
#log ol {
list-style-type: lower-roman;
}
## `pausableBuffered` in `rxjs@5`
This is an implementation of `pausableBuffered` from
`rxjs@4` in `rxjs@5.0.0-beta.7`. Inspired by a discussion here:
https://github.com/ReactiveX/rxjs/issues/1542.
### The magic
We need a `pauser` and a `source`. In the plunker these are an observable listening
to clicks on the `document` and an interval, respectively. We also need a `buffer`
subject, which we'll subscribe to the buffered output of the source.
```js
var pauser = new Rx.Subject();
var source = new Rx.Subject();
var buffer = new Rx.Subject();
getObservableSomehow.subscribe(source);
```
> **N.B.**: We don't want to use the original source observable directly because
when we do, the emissions from our `pausableBuffered` get reset to the beginning
of the sequence each time we toggle the pauser.
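
To illustrate the note above, here is a minimal plain-JavaScript sketch of the difference (no RxJS required). `makeCold` and `makeHot` are hypothetical stand-ins for a cold observable such as `Rx.Observable.interval()` and for an `Rx.Subject`; both are driven by a manual `tick()` rather than a timer.

```js
// Hypothetical model of a cold source: each subscriber gets its own counter,
// so a late subscriber starts again from 0.
function makeCold() {
  var runs = [];
  return {
    subscribe: function (observer) {
      runs.push({ count: 0, observer: observer });
    },
    tick: function () {
      runs.forEach(function (run) {
        run.observer(run.count);
        run.count += 1;
      });
    }
  };
}

// Hypothetical model of a hot source (a subject): one shared counter,
// so a late subscriber joins the sequence where it currently is.
function makeHot() {
  var count = 0;
  var observers = [];
  return {
    subscribe: function (observer) {
      observers.push(observer);
    },
    tick: function () {
      var value = count;
      count += 1;
      observers.forEach(function (observer) {
        observer(value);
      });
    }
  };
}

// Let three values go by, then subscribe late and record the next value.
function firstValueForLateSubscriber(stream) {
  var received = [];
  stream.tick();
  stream.tick();
  stream.tick();
  stream.subscribe(function (x) {
    received.push(x);
  });
  stream.tick();
  return received[0];
}

var coldLate = firstValueForLateSubscriber(makeCold());
var hotLate = firstValueForLateSubscriber(makeHot());
console.log('cold: ' + coldLate + ', hot: ' + hotLate); // cold: 0, hot: 3
```

Each time `switchMap()` switches back to `source`, it subscribes again. The subject plays the role of `makeHot` here, so the re-subscription continues with the current value instead of restarting.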
We want to buffer the emissions from `source`, and close and emit this buffer
whenever `pauser` emits (i.e. is toggled). We don't want to emit arrays, but
rather the individual items, so we make sure to `mergeAll()` before we subscribe
`buffer`.
```js
source
.buffer(pauser)
.mergeAll()
.subscribe(buffer);
```
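The buffering step can be modelled without RxJS. The sketch below uses a hypothetical `createBuffer` helper standing in for `buffer(pauser)`, with `close()` playing the role of a `pauser` emission. The flattening inside the callback is what `mergeAll()` contributes.

```js
// Hypothetical stand-in for `source.buffer(pauser)`: collect every value and
// hand over the whole array each time `close()` is called.
function createBuffer(onArray) {
  var items = [];
  return {
    next: function (value) {
      items.push(value);
    },
    close: function () {
      var closed = items;
      items = [];
      onArray(closed);
    }
  };
}

var flushedArrays = [];
var flushedItems = [];
var buffered = createBuffer(function (array) {
  flushedArrays.push(array);
  // The `mergeAll()` step: re-emit each element on its own, in order.
  array.forEach(function (item) {
    flushedItems.push(item);
  });
});

buffered.next(0);
buffered.next(1);
buffered.close(); // `pauser` emits
buffered.next(2);
buffered.close(); // `pauser` emits again

console.log(JSON.stringify(flushedArrays)); // [[0,1],[2]]
console.log(JSON.stringify(flushedItems)); // [0,1,2]
```

Note that this stage collects values regardless of whether anything is paused; it simply hands over whatever has accumulated since the last toggle.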
Finally, we use a `switchMap()` on `pauser` to emit items either from `source` or
`buffer`.
```js
var pausableBuffered = pauser.switchMap(function(paused) {
return paused ? buffer : source;
});
pausableBuffered.subscribe(function(x) {
console.log('Next: ' + x.toString());
});
```
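The behavior a subscriber should observe can also be modelled in plain JavaScript. This is only a sketch of the intended output, not of the RxJS pipeline: `createPausableBuffered`, `next` and `setPaused` are hypothetical names, and unlike `buffer()` this model holds values only while paused.

```js
// Hypothetical model: pass values through while unpaused, hold them while
// paused, and flush the held values in arrival order on unpause.
function createPausableBuffered(onNext) {
  var paused = false;
  var held = [];
  return {
    // Equivalent of `source` emitting a value.
    next: function (value) {
      if (paused) {
        held.push(value);
      } else {
        onNext(value);
      }
    },
    // Equivalent of `pauser` emitting `true` or `false`.
    setPaused: function (nextPaused) {
      var flushed = [];
      if (paused && !nextPaused) {
        flushed = held;
        held = [];
      }
      paused = nextPaused;
      flushed.forEach(function (value) {
        onNext(value);
      });
    }
  };
}

var seen = [];
var stream = createPausableBuffered(function (x) {
  seen.push(x);
});

stream.next(0);
stream.next(1);
stream.setPaused(true);
stream.next(2);
stream.next(3);
var seenWhilePaused = seen.slice(); // nothing new arrives while paused
stream.setPaused(false); // 2 and 3 are flushed here
stream.next(4);

console.log(JSON.stringify(seenWhilePaused)); // [0,1]
console.log(JSON.stringify(seen)); // [0,1,2,3,4]
```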
### Alternate implementation: `bufferToggle()`
Instead of using the `buffer()` method presented above, you can also use
`bufferToggle()` like this:
```js
source
.bufferToggle(pauser, function (toggle) {
return toggle ? Rx.Observable.empty() : pauser;
})
.mergeAll()
.subscribe(buffer);
```
Using `bufferToggle()` may be more performant. I'm not sure because I haven't
inspected the implementation, but if it works like I think it does, then using
it would skip buffering emissions while unpaused. It is more code, however. If
the `pausableBuffered` is going to be running unpaused most of the time, then
the performance gain will be negligible and the `buffer()` syntax is probably
preferred.