<!DOCTYPE html>
<html>

  <head>
    <link rel="stylesheet" href="style.css">
    <script src="https://npmcdn.com/rxjs@5.0.0-beta.7/bundles/Rx.umd.js"></script>
    <script src="script.js"></script>
  </head>

  <body>
    <div id="displaying">
      Click inside the <b><code>document</code></b> to toggle behavior.<br/>
      <b><code>pausableBuffered</code></b> is currently <b>unpaused</b>.<br/>
      The live count is –.<br/>
    </div>
    <div><p>Open your console to see the latest emissions from <b><code>pausableBuffered</code></b>.</p></div>
    <div id="log">The last 20 items emitted by <b><code>pausableBuffered</code></b>:<br/></div>
  </body>

</html>
/********* Setup *********/
/**
 * Build a toggle to control the `pauser`; alternate 
 * emitting `true` and `false` every click on the
 * `document`. `scan()` lets us do this easily. 
 */
var clicks = Rx.Observable.fromEvent(document, 'click')
  .scan(function(acc) {
    return !acc;
  }, true);
var pauser = new Rx.Subject();
clicks.subscribe(pauser);

var interval = Rx.Observable.interval(400);
var source = new Rx.Subject();
/**
 * We need to use a subject subscribed to `interval`, 
 * otherwise we receive the interval from the 
 * beginning every time the `pausableBuffered` is 
 * toggled.
 */
interval.subscribe(source);

/********* Buffering *********/
/**
 * The `buffer()` will close & emit its array
 * every time the `pauser` emits. We want to
 * emit the items from the array in order as fast as
 * possible. We can do this with `.mergeAll()`.
 *
 * Instead of using the `buffer()` method presented
 * below, you can also use `bufferToggle()` like this:
 * ```
 * .bufferToggle(pauser, function (toggle) {
 *   return toggle ? Rx.Observable.empty() : pauser;
 * })
 * ```
 *
 * Using `bufferToggle()` should be more performant.
 * I'm not sure because I haven't inspected the 
 * implementation, but if it works like I think it
 * does, then using it would skip buffering emissions
 * while unpaused. It is more code, however. If the
 * `pausableBuffered` is going to be running unpaused
 * most of the time, then the perfomance gain will be 
 * negligible and the `buffer()` syntax is probably 
 * preferred.
 */
var buffer = new Rx.Subject();
source
  .buffer(pauser)
  .mergeAll()
  .subscribe(buffer);

/********* Pausing *********/
/**
 * `pausableBuffered` should alternately emit from
 * `buffer` and `source`.
 */
var pausableBuffered = pauser.switchMap(function(paused) {
  return paused ? buffer : source;
});
pausableBuffered.subscribe(function(x) {
  console.log('Next: ' + x.toString());
});


/********* Displaying *********/
var makeTemplate = function(count, isPaused) {
  var paused = isPaused ? 'paused & buffering' : 'unpaused';
  return 'Click inside the <b><code>document</code></b> to toggle behavior.<br/><b><code>pausableBuffered</code></b> is currently <b>' + paused + '</b>.<br/>The live count is <b>' + count + '</b>.';
};

Rx.Observable.combineLatest(source, pauser, makeTemplate)
  .subscribe(function(inner) {
    document.getElementById('displaying').innerHTML = inner;
  });

var log = [];
var makeLog = function(latest) {
  log.push(latest);
  log = log.slice(-20);
  document.getElementById('log').innerHTML = log.reduce(function(acc, curr) {
    return acc.replace('</ol>', '') + '<li> – <b><code>' + curr + '</code></b></li></ol>';
  }, 'The last 20 items emitted by <b><code>pausableBuffered</code></b>:<br/><ol>');
};
pausableBuffered.subscribe(makeLog);

/********* Start *********/
pauser.next(false);
/* Styles go here */

#log ol {
    list-style-type: lower-roman;
}
## `pausableBuffered` in `rxjs@5`
This is an implementation of `pausableBuffered` from 
`rxjs@4` in `rxjs@5.0.0-beta.7`. Inspired by a discussion here: 
https://github.com/ReactiveX/rxjs/issues/1542.


### The magic

We need a `pauser` and a `source`. In the plunker these are an observable listening
to clicks on the `document` and an interval. We also need a `buffer` that we'll
subscribe to the buffered output the source.

```js
var pauser = new Rx.Subject();
var source = new Rx.Subject();
var buffer = new Rx.Subject();

getObservableSomehow.subscribe(source);
```
> **N.B.**: We don't want to use the original source observable directly because 
when we do, the emissions from our `pausableBuffered` get reset to the beginning
of the sequence each time we toggle the pauser.

We want to buffer the emissions from `source`, and close and emit this buffer
whenever `pauser` emits (i.e. is toggled). We don't want to emit arrays, but
rather the indivual items, so we make sure to `mergeAll()` before we subcribe
`buffer`.
```js
source
  .buffer(pauser)
  .mergeAll()
  .subscribe(buffer);
```

Finally, we use a `switchMap()` on `pauser` to emit items either from `source` or
`buffer`.
```js
var pausableBuffered = pauser.switchMap(function(paused) {
    return paused ? buffer : source;
  });
pausableBuffered.subscribe(function(x) {
    console.log('Next: ' + x.toString());
  });
```

### Alternate implementation: `bufferToggle()`

Instead of using the `buffer()` method presented above, you can also use 
`bufferToggle()` like this:
```
source
  .bufferToggle(pauser, function (toggle) {
    return toggle ? Rx.Observable.empty() : pauser;
  })
  .mergeAll()
  .subscribe(buffer);
```

Using `bufferToggle()` may be more performant. I'm not sure because I haven't
inspected the implementation, but if it works like I think it does, then using
it would skip buffering emissions while unpaused. It is more code, however. If
the `pausableBuffered` is going to be running unpaused most of the time, then
the perfomance gain will be negligible and the `buffer()` syntax is probably
preferred.