<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Hello Plunker!</title>
    <!-- jQuery wraps #container so script.js can hand it to Highland as an event source. -->
    <script data-require="jquery@*" data-semver="2.0.3" src="https://code.jquery.com/jquery-2.0.3.min.js"></script>
    <link rel="stylesheet" href="style.css">
  </head>
  <body>
    <div id="container">
      <div id="sidebar">
        Sidebar
        <ul>
          <li class="thumb"></li>
          <li class="thumb"></li>
          <li class="thumb"></li>
        </ul>
      </div>
    </div>
    <h1>Hello Plunker!</h1>
    <button type="button">Push me</button>
    <!-- highland.js must load first: script.js reads the global `highland`. -->
    <script src="highland.js"></script>
    <script src="script.js"></script>
  </body>
</html>
// Alias the global exported by highland.js.
var _ = highland;
// Streams of jQuery events raised on #container (event name + emitter form).
var clicks = _('click', $('#container'));
var move = _('mousemove', $('#container'));
// Two forks of the click stream: one for thumb clicks, one for counting.
// Forked consumers share back-pressure, so values flow only while both are resumed.
var clickHandle = clicks.fork();
// map(1) replaces every click event with the constant 1.
// NOTE(review): scan(1, _.add) presumably emits a running sum seeded at 1;
// scan/add are defined outside this view - confirm.
var counter = clicks.fork().map(1).scan(1, _.add);
// var moveRender = move.fork();
// moveRender.resume();
clickHandle.resume();
counter.resume();
clickHandle
.filter(isThumb)
// NOTE(review): throttle(1000) presumably lets through at most one event per
// 1000 ms - throttle is defined outside this view; confirm.
.throttle(1000)
.each(function (n) {
console.log('Click thumb', n);
});
counter.each(function (n) {
console.log('Counter', n);
});
/**
 * Tells whether an event originated on a thumbnail element.
 *
 * The previous exact comparison (`className === 'thumb'`) stopped matching
 * as soon as the element carried any additional class. This version looks
 * for `thumb` as a whole whitespace-separated token instead.
 *
 * @param {Object} e - event-like object exposing `target.className`
 * @returns {Boolean} true when the target's class list contains `thumb`;
 *   false for a missing target or a non-string className
 */
function isThumb (e) {
    var target = e && e.target;
    var className = (target && typeof target.className === 'string') ? target.className : '';
    // Pad with spaces so that 'thumbnail' or 'my-thumb' do not match.
    var padded = (' ' + className + ' ').replace(/\s+/g, ' ');
    return padded.indexOf(' thumb ') !== -1;
}
// Logs the pointer coordinates of each batch of mousemove events as
// "x:y " pairs on a single line.
// NOTE(review): batch(10) presumably groups events into arrays of up to ten;
// batch is defined outside this view - confirm.
move.batch(10).each(function (events) {
    var pairs = events.map(function (e) {
        return e.clientX + ':' + e.clientY + ' ';
    });
    console.log('Moved', pairs.join(''));
    // Disabled experiment: driving render() from an animation-frame loop.
    // (function animloop(){
    // requestAnimFrame(animloop);
    // render(e);
    // })();
});
// Element that receives the rendered markers. Despite its name this is
// #container, not the document <body>.
var body = document.querySelector('#container');
/**
 * Appends one small green-bordered marker per event, positioned at the
 * viewport coordinates the event reports.
 *
 * Fix: clientX is the horizontal coordinate and clientY the vertical one,
 * so they map to `left` and `top` respectively. They were swapped before,
 * which mirrored every marker across the diagonal.
 *
 * @param {Array} events - event-like objects exposing clientX / clientY
 */
function render (events) {
    events.forEach(function (e) {
        var el = document.createElement('div');
        el.style.position = 'fixed';
        el.style.left = e.clientX + 'px';
        el.style.top = e.clientY + 'px';
        el.style.height = '5px';
        el.style.width = '5px';
        el.style.border = "1px solid green";
        body.appendChild(el);
    });
}
// moveRender.toArray(function (arr) {
// console.log(arr);
// });
// var counter = clicks.scan(0, _.add);
// counter.each(function (n) {
// console.log('Click');
// });
// Cross-browser animation-frame scheduler: the first available native
// implementation (standard, then webkit-, then moz-prefixed), otherwise a
// setTimeout fallback firing roughly 60 times per second.
window.requestAnimFrame = (function () {
    var nativeImpl = window.requestAnimationFrame ||
        window.webkitRequestAnimationFrame ||
        window.mozRequestAnimationFrame;
    if (nativeImpl) {
        return nativeImpl;
    }
    return function (callback) {
        window.setTimeout(callback, 1000 / 60);
    };
})();
/* Demo page styles */

/* Let percentage heights resolve against the viewport. */
html,
body {
    margin: 0;
    height: 100%;
}

/* Event surface: at least as tall as the viewport. */
#container {
    min-height: 100%;
}

/* Translucent panel pinned to the top-left corner. */
#sidebar {
    display: block;
    position: absolute;
    top: 0;
    left: 0;
    width: 30%;
    height: 100%;
    background-color: rgba(150, 0, 0, 0.2);
}

#sidebar ul {
    margin-left: 0;
    padding-left: 1em;
    list-style: none;
}

/* Square thumbnail placeholders. */
.thumb {
    width: 50px;
    height: 50px;
    margin-bottom: 1em;
    background-color: rgba(200, 0, 0, 0.2);
}
!function(e){if("object"==typeof exports)module.exports=e();else if("function"==typeof define&&define.amd)define(e);else{var f;"undefined"!=typeof window?f=window:"undefined"!=typeof global?f=global:"undefined"!=typeof self&&(f=self),f.highland=e()}}(function(){var define,module,exports;return (function e(t,n,r){function s(o,u){if(!n[o]){if(!t[o]){var a=typeof require=="function"&&require;if(!u&&a)return a(o,!0);if(i)return i(o,!0);throw new Error("Cannot find module '"+o+"'")}var f=n[o]={exports:{}};t[o][0].call(f.exports,function(e){var n=t[o][1][e];return s(n?n:e)},f,f.exports,e,t,n,r)}return n[o].exports}var i=typeof require=="function"&&require;for(var o=0;o<r.length;o++)s(r[o]);return s})({1:[function(_dereq_,module,exports){
(function (process,global){
/**
* Highland: the high-level streams library
*
* Highland may be freely distributed under the Apache 2.0 license.
* http://github.com/caolan/highland
* Copyright (c) Caolan McMahon
*
*/
var inherits = _dereq_('util').inherits;
var EventEmitter = _dereq_('events').EventEmitter;
/**
* The Stream constructor, accepts an array of values or a generator function
* as an optional argument. This is typically the entry point to the Highland
* APIs, providing a convenient way of chaining calls together.
*
* **Arrays -** Streams created from Arrays will emit each value of the Array
* and then emit a [nil](#nil) value to signal the end of the Stream.
*
* **Generators -** These are functions which provide values for the Stream.
* They are lazy and can be infinite, they can also be asynchronous (for
* example, making a HTTP request). You emit values on the Stream by calling
* `push(err, val)`, much like a standard Node.js callback. You call `next()`
* to signal you've finished processing the current data. If the Stream is
* still being consumed the generator function will then be called again.
*
* You can also redirect a generator Stream by passing a new source Stream
* to read from to next. For example: `next(other_stream)` - then any subsequent
* calls will be made to the new source.
*
* **Node Readable Stream -** Pass in a Node Readable Stream object to wrap
* it with the Highland API. Reading from the resulting Highland Stream will
* begin piping the data from the Node Stream to the Highland Stream.
*
* **EventEmitter / jQuery Elements -** Pass in both an event name and an
* event emitter as the two arguments to the constructor and the first
* argument emitted to the event handler will be written to the new Stream.
*
* You can also pass as an optional third parameter a function, an array of strings
* or a number. In this case the event handler will try to wrap the arguments emitted
* to it and write this object to the new stream.
*
* **Promise -** Accepts an ES6 / jQuery style promise and returns a
* Highland Stream which will emit a single value (or an error).
*
* @id _(source)
* @section Streams
* @name _(source)
* @param {Array | Function | Readable Stream | Promise} source - (optional) source to take values from
* @api public
*
* // from an Array
* _([1, 2, 3, 4]);
*
* // using a generator function
* _(function (push, next) {
* push(null, 1);
* push(err);
* next();
* });
*
* // a stream with no source, can pipe node streams through it etc.
* var through = _();
*
* // wrapping a Node Readable Stream so you can easily manipulate it
* _(readable).filter(hasSomething).pipe(writeable);
*
* // creating a stream from events
* _('click', btn).each(handleEvent);
*
* // creating a stream from events with mapping
* _('request', httpServer, ['req', 'res']).each(handleEvent);
*
* // from a Promise object
* var foo = _($.getJSON('/api/foo'));
*/
// Public entry point: `_(xs, ee, mappingHint)` simply forwards its arguments
// to the Stream constructor, so callers never need `new`.
exports = module.exports = function (/*optional*/xs, /*optional*/ee, /*optional*/ mappingHint) {
return new Stream(xs, ee, mappingHint);
};
// `_` is the namespace every helper below is attached to.
var _ = exports;
// Save bytes in the minified (but not gzipped) version:
var ArrayProto = Array.prototype,
ObjProto = Object.prototype;
// Create quick reference variables for speed access to core prototypes.
// `slice` is used to turn `arguments` into real arrays; `toString` backs the
// isArray fallback.
var slice = ArrayProto.slice,
toString = ObjProto.toString;
// Lightweight type predicates used throughout the library.
_.isFunction = function (x) {
    return 'function' === typeof x;
};
_.isObject = function (x) {
    // typeof null is 'object', so null has to be ruled out explicitly
    return x !== null && 'object' === typeof x;
};
_.isString = function (x) {
    return 'string' === typeof x;
};
// Prefer the native check; otherwise compare the Object.prototype.toString tag.
_.isArray = Array.isArray || function (x) {
    return '[object Array]' === toString.call(x);
};
// setImmediate implementation with browser and older node fallbacks
// Choose the "run on a later tick" primitive once, at load time.
_.setImmediate = (function () {
    var hasProcess = typeof process !== 'undefined';
    if (typeof setImmediate !== 'undefined') {
        // a process with stdout means real node rather than browserify
        if (hasProcess && process.stdout) {
            return setImmediate;
        }
        // modern browser - but not a direct alias for IE10 compatibility
        return function (fn) {
            setImmediate(fn);
        };
    }
    if (hasProcess && process.nextTick) {
        // use nextTick on old node versions
        return process.nextTick;
    }
    // last resort: zero-delay timer
    return function (fn) {
        setTimeout(fn, 0);
    };
})();
/**
* The end of stream marker. This is sent along the data channel of a Stream
* to tell consumers that the Stream has ended. See the example map code for
* an example of detecting the end of a Stream.
*
* Note: `nil` is setup as a global where possible. This makes it convenient
* to access, but more importantly lets Streams from different Highland
* instances work together and detect end-of-stream properly. This is mostly
* useful for NPM where you may have many different Highland versions installed.
*
* @id nil
* @section Streams
* @name _.nil
* @api public
*
* var map = function (iter, source) {
* return source.consume(function (err, val, push, next) {
* if (err) {
* push(err);
* next();
* }
* else if (val === _.nil) {
* push(null, val);
* }
* else {
* push(null, iter(val));
* next();
* }
* });
* };
*/
// set up a global nil object in cases where you have multiple Highland
// instances installed (often via npm)
// Resolve the global object (node's `global`, else the browser `window`,
// else whatever `this` is here) and share one nil marker through it so that
// several Highland copies agree on end-of-stream.
var _global = (typeof global !== 'undefined') ? global :
    ((typeof window !== 'undefined') ? window : this);
if (!_global.nil) {
    // first Highland instance to load creates the marker
    _global.nil = {};
}
var nil = _.nil = _global.nil;
/**
* Transforms a function with specific arity (all arguments must be
* defined) in a way that it can be called as a chain of functions until
* the arguments list is saturated.
*
* This function is not itself curryable.
*
* @id curry
* @name curry(fn, [*arguments])
* @section Functions
* @param {Function} fn - the function to curry
* @param args.. - any number of arguments to pre-apply to the function
* @returns Function
* @api public
*
* fn = curry(function (a, b, c) {
* return a + b + c;
* });
*
* fn(1)(2)(3) == fn(1, 2, 3)
* fn(1, 2)(3) == fn(1, 2, 3)
* fn(1)(2, 3) == fn(1, 2, 3)
*/
_.curry = function (fn /* args... */) {
    // Delegate to ncurry, using fn's declared parameter count as the arity.
    var forwarded = [fn.length].concat(slice.call(arguments));
    return _.ncurry.apply(this, forwarded);
};
/**
* Same as `curry` but with a specific number of arguments. This can be
* useful when functions do not explicitly define all its parameters.
*
* This function is not itself curryable.
*
* @id ncurry
* @name ncurry(n, fn, [args...])
* @section Functions
* @param {Number} n - the number of arguments to wait for before apply fn
* @param {Function} fn - the function to curry
* @param args... - any number of arguments to pre-apply to the function
* @returns Function
* @api public
*
* fn = ncurry(3, function () {
* return Array.prototype.join.call(arguments, '.');
* });
*
* fn(1, 2, 3) == '1.2.3';
* fn(1, 2)(3) == '1.2.3';
* fn(1)(2)(3) == '1.2.3';
*/
_.ncurry = function (n, fn /* args... */) {
    // arguments supplied up front, after n and fn
    var preset = slice.call(arguments, 2);
    if (preset.length >= n) {
        // already saturated: call straight away, ignoring any surplus
        return fn.apply(this, preset.slice(0, n));
    }
    return function () {
        var collected = preset.concat(slice.call(arguments));
        return (collected.length < n) ?
            // still short: keep currying with everything gathered so far
            _.ncurry.apply(this, [n, fn].concat(collected)) :
            fn.apply(this, collected.slice(0, n));
    };
};
/**
* Partially applies the function (regardless of whether it has had curry
* called on it). This will always postpone execution until at least the next
* call of the partially applied function.
*
* @id partial
* @name partial(fn, args...)
* @section Functions
* @param {Function} fn - function to partial apply
* @param args... - the arguments to apply to the function
* @api public
*
* var addAll = function () {
* var args = Array.prototype.slice.call(arguments);
* return foldl1(add, args);
* };
* var f = partial(addAll, 1, 2);
* f(3, 4) == 10
*/
_.partial = function (f /* args... */) {
    // everything after f is fixed now; the rest arrives at call time
    var fixed = slice.call(arguments, 1);
    return function () {
        var late = slice.call(arguments);
        return f.apply(this, fixed.concat(late));
    };
};
/**
* Evaluates the function `fn` with the argument positions swapped. Only
* works with functions that accept two arguments.
*
* @id flip
* @name flip(fn, [x, y])
* @section Functions
* @param {Function} f - function to flip argument application for
* @param x - parameter to apply to the right hand side of f
* @param y - parameter to apply to the left hand side of f
* @api public
*
* div(2, 4) == 0.5
* flip(div, 2, 4) == 2
* flip(div)(2, 4) == 2
*/
// Curried over three parameters: the function, then its two operands.
_.flip = _.curry(function (fn, x, y) {
    var swapped = fn(y, x);
    return swapped;
});
/**
* Creates a composite function, which is the application of function1 to
* the results of function2. You can pass an arbitrary number of arguments
* and have them composed. This means you can't partially apply the compose
* function itself.
*
* @id compose
* @name compose(fn1, fn2, ...)
* @section Functions
* @api public
*
* var add1 = add(1);
* var mul3 = mul(3);
*
* var add1mul3 = compose(mul3, add1);
* add1mul3(2) == 9
*/
_.compose = function (/*functions...*/) {
    // seq applies left-to-right, so hand it the functions in reverse order
    return _.seq.apply(null, slice.call(arguments).reverse());
};
/**
* The reversed version of compose. Where arguments are in the order of
* application.
*
* @id seq
* @name seq(fn1, fn2, ...)
* @section Functions
* @api public
*
* var add1 = add(1);
* var mul3 = mul(3);
*
* var add1mul3 = seq(add1, mul3);
* add1mul3(2) == 9
*/
_.seq = function () {
    var fns = slice.call(arguments);
    return function () {
        if (fns.length === 0) {
            return;
        }
        var ctx = this;
        // the first function sees the caller's arguments untouched...
        var seed = fns[0].apply(ctx, arguments);
        // ...every later one receives the previous result as its only argument
        return fns.slice(1).reduce(function (acc, fn) {
            return fn.call(ctx, acc);
        }, seed);
    };
};
/**
* Actual Stream constructor, wrapped by the main exported function
*/
// Builds a Stream from the optional source `xs`:
//  - an existing Highland Stream is returned unchanged
//  - undefined: an empty stream to be written to later
//  - Array: a copy followed by nil is placed in the incoming buffer
//  - function: stored as the generator, with push/next helpers bound to this stream
//  - object with a then() method: treated as a promise (value or error, then nil)
//  - any other object: assumed pipeable; piped into this stream on first generator run
//  - string: an event name; `ee.on(xs, ...)` writes each (mapped) event to the stream
//  - anything else throws
// `mappingHint` (function, number or array of names) only applies to the
// event-name form and controls how handler arguments become stream values.
function Stream(/*optional*/xs, /*optional*/ee, /*optional*/mappingHint) {
if (xs && _.isStream(xs)) {
// already a Stream
return xs;
}
EventEmitter.call(this);
var self = this;
// used to detect Highland Streams using isStream(x), this
// will work even in cases where npm has installed multiple
// versions, unlike an instanceof check
self.__HighlandStream__ = true;
// short random id, only referenced by the commented-out debug logging
self.id = ('' + Math.random()).substr(2, 6);
// every stream starts paused
this.paused = true;
// values written while paused
this._incoming = [];
// values pushed by a consume() handler while paused
this._outgoing = [];
this._consumers = [];
this._observers = [];
// callbacks run by destroy()
this._destructors = [];
this._send_events = false;
this.source = null;
// Old-style node Stream.pipe() checks for this
this.writable = true;
self.on('newListener', function (ev) {
if (ev === 'data') {
self._send_events = true;
// a 'data' listener starts the stream flowing on a later tick
_.setImmediate(self.resume.bind(self));
}
else if (ev === 'end') {
// this property avoids us checking the length of the
// listeners subscribed to each event on each _send() call
self._send_events = true;
}
});
// TODO: write test to cover this removeListener code
self.on('removeListener', function (ev) {
if (ev === 'end' || ev === 'data') {
var end_listeners = self.listeners('end').length;
var data_listeners = self.listeners('data').length;
if (end_listeners + data_listeners === 0) {
// stop emitting events
self._send_events = false;
}
}
});
if (xs === undefined) {
// nothing else to do
}
else if (_.isArray(xs)) {
// concat copies the array, so the caller's array is not modified
self._incoming = xs.concat([nil]);
}
else if (typeof xs === 'function') {
this._generator = xs;
// push(err, x): errors are wrapped so they can travel through write()
this._generator_push = function (err, x) {
self.write(err ? new StreamError(err): x);
};
// next(): generator finished this round; next(s): redirect to stream s
this._generator_next = function (s) {
if (s) {
// we MUST pause to get the redirect object into the _incoming
// buffer otherwise it would be passed directly to _send(),
// which does not handle StreamRedirect objects!
var _paused = self.paused;
if (!_paused) {
self.pause();
}
self.write(new StreamRedirect(s));
if (!_paused) {
self.resume();
}
}
else {
self._generator_running = false;
}
// still flowing: resume() again so the generator gets re-run
if (!self.paused) {
self.resume();
}
};
}
else if (_.isObject(xs)) {
if (_.isFunction(xs.then)) {
// probably a promise
return _(function (push) {
xs.then(function (value) {
push(null, value);
return push(null, nil);
},
function (err) {
push(err);
return push(null, nil);
});
});
}
else {
// assume it's a pipeable stream as a source
this._generator = function () {
// one-shot: remove the generator so the pipe is set up only once
delete self._generator;
xs.pipe(self);
};
}
}
else if (typeof xs === 'string') {
var mappingHintType = (typeof mappingHint);
var mapper;
if (mappingHintType === 'function') {
mapper = mappingHint;
} else if (mappingHintType === 'number') {
// number: keep the first `mappingHint` handler arguments as an array
mapper = function () {
return slice.call(arguments, 0, mappingHint);
};
} else if (_.isArray(mappingHint)) {
// array of names: build an object keyed by name from positional arguments
mapper = function () {
var args = arguments;
return mappingHint.reduce(function (ctx, hint, idx) {
ctx[hint] = args[idx];
return ctx;
}, {});
};
} else {
// no hint: only the first handler argument is written
mapper = function (x) { return x; };
}
ee.on(xs, function () {
var ctx = mapper.apply(this, arguments);
self.write(ctx);
});
}
else {
throw new Error(
'Unexpected argument type to Stream(): ' + (typeof xs)
);
}
}
inherits(Stream, EventEmitter);
/**
* adds a top-level _.foo(mystream) style export for Stream methods
*/
function exposeMethod(name) {
    var method = Stream.prototype[name];
    // one extra argument slot: the stream comes last in the top-level form
    var arity = method.length + 1;
    _[name] = _.ncurry(arity, function () {
        var args = Array.prototype.slice.call(arguments);
        // the trailing argument is the source; coerce it to a Stream
        var target = _(args.pop());
        return method.apply(target, args);
    });
}
/**
* Used as an Error marker when writing to a Stream's incoming buffer
*/
function StreamError(err) {
    // tag first, payload second; _._isStreamError() looks for the tag
    var marker = this;
    marker.__HighlandStreamError__ = true;
    marker.error = err;
}
/**
* Used as a Redirect marker when writing to a Stream's incoming buffer
*/
function StreamRedirect(to) {
    // tag first, target second; _._isStreamRedirect() looks for the tag
    var marker = this;
    marker.__HighlandStreamRedirect__ = true;
    marker.to = to;
}
/**
* Returns true if `x` is a Highland Stream.
*
* @id isStream
* @section Streams
* @name _.isStream(x)
* @param x - the object to test
* @api public
*
* _.isStream('foo') // => false
* _.isStream(_([1,2,3])) // => true
*/
// Marker-based type checks. Each now returns a strict boolean: previously a
// plain object without the marker produced `undefined` rather than the
// documented `false`. Truthiness is unchanged, so existing callers that use
// the result in a condition behave exactly as before.
_.isStream = function (x) {
    return _.isObject(x) && !!x.__HighlandStream__;
};
// True when x is an error wrapper created by StreamError.
_._isStreamError = function (x) {
    return _.isObject(x) && !!x.__HighlandStreamError__;
};
// True when x is a redirect wrapper created by StreamRedirect.
_._isStreamRedirect = function (x) {
    return _.isObject(x) && !!x.__HighlandStreamRedirect__;
};
/**
* Sends errors / data to consumers, observers and event handlers
*/
// Delivers one error or value to consumers, observers and event listeners.
//
// Fixes relative to the previous version:
//  - observers now receive errors wrapped in StreamError, exactly like
//    consumers; before, an error reached them as the data value `undefined`
//  - event listeners get an 'error' event for errors instead of a 'data'
//    event carrying `undefined`
//
// @param err - error to deliver, or a falsy value when delivering data
// @param x - the value to deliver (nil marks the end of the stream)
Stream.prototype._send = function (err, x) {
    if (x === nil) {
        this.ended = true;
    }
    // what downstream streams are handed via write()
    var token = err ? new StreamError(err) : x;
    if (this._consumers.length) {
        for (var i = 0, len = this._consumers.length; i < len; i++) {
            this._consumers[i].write(token);
        }
    }
    if (this._observers.length) {
        for (var j = 0, len2 = this._observers.length; j < len2; j++) {
            this._observers[j].write(token);
        }
    }
    if (this._send_events) {
        if (err) {
            this.emit('error', err);
        }
        else if (x === nil) {
            this.emit('end');
        }
        else {
            this.emit('data', x);
        }
    }
};
/**
* Pauses the stream. All Highland Streams start in the paused state.
*
* @id pause
* @section Streams
* @name Stream.pause()
* @api public
*
* var xs = _(generator);
* xs.pause();
*/
Stream.prototype.pause = function () {
    this.paused = true;
    // let the upstream stream re-evaluate whether it should keep flowing
    var upstream = this.source;
    if (upstream) {
        upstream._checkBackPressure();
    }
};
/**
* When there is a change in downstream consumers, it will often ask
* the parent Stream to re-check its state and pause/resume accordingly.
*/
Stream.prototype._checkBackPressure = function () {
    var consumers = this._consumers;
    // flow only when there is at least one consumer and none of them is paused
    var mustWait = consumers.length === 0 || consumers.some(function (c) {
        return c.paused;
    });
    return mustWait ? this.pause() : this.resume();
};
/**
* Starts pulling values out of the incoming buffer and sending them downstream;
* this will exit early if this causes a downstream consumer to pause.
*/
Stream.prototype._readFromBuffer = function () {
// snapshot the length: items appended while draining wait for the next pass
var len = this._incoming.length;
var i = 0;
// stop as soon as delivering an item pauses this stream
while (i < len && !this.paused) {
var x = this._incoming[i];
if (_._isStreamError(x)) {
// unwrap the error; _send receives it as its first argument
this._send(x.error);
}
else if (_._isStreamRedirect(x)) {
this._redirect(x.to);
}
else {
this._send(null, x);
}
i++;
}
// remove processed data from _incoming buffer
this._incoming.splice(0, i);
};
/**
* Starts pulling values out of the outgoing buffer and sending them downstream;
* this will exit early if this causes a downstream consumer to pause.
*/
Stream.prototype._sendOutgoing = function () {
// snapshot the length: items appended while draining wait for the next pass
var len = this._outgoing.length;
var i = 0;
// stop as soon as delivering an item pauses this stream
while (i < len && !this.paused) {
var x = this._outgoing[i];
// The prototype's _send is called explicitly rather than this._send:
// streams created by consume() replace their own _send with a wrapper that
// runs the handler, and these values were already produced by that handler.
if (_._isStreamError(x)) {
Stream.prototype._send.call(this, x.error);
}
else if (_._isStreamRedirect(x)) {
this._redirect(x.to);
}
else {
Stream.prototype._send.call(this, null, x);
}
i++;
}
// remove processed data from _outgoing buffer
this._outgoing.splice(0, i);
};
/**
* Resumes a paused Stream. This will either read from the Stream's incoming
* buffer or request more data from an upstream source.
*
* @id resume
* @section Streams
* @name Stream.resume()
* @api public
*
* var xs = _(generator);
* xs.resume();
*/
// Un-pauses the stream and drains, in order: the outgoing buffer, the
// incoming buffer, then asks for more data (from the source stream, the
// generator, or by emitting 'drain'). Re-entrant calls made while a drain is
// in progress are collapsed into one extra loop iteration.
Stream.prototype.resume = function () {
//console.log([this.id, 'resume']);
if (this._resume_running) {
// already processing _incoming buffer, ignore resume call
// (but remember it, so the loop below runs once more)
this._repeat_resume = true;
return;
}
this._resume_running = true;
do {
// use a repeat flag to avoid recursing resume() calls
this._repeat_resume = false;
this.paused = false;
// send values from outgoing buffer first
this._sendOutgoing();
// send values from incoming buffer before reading from source
this._readFromBuffer();
// we may have paused while reading from buffer
if (!this.paused) {
// ask parent for more data
if (this.source) {
this.source._checkBackPressure();
}
// run _generator to fill up _incoming buffer
else if (this._generator) {
this._runGenerator();
}
else {
// perhaps a node stream is being piped in
this.emit('drain');
}
}
} while (this._repeat_resume);
this._resume_running = false;
};
/**
* Ends a Stream. This is the same as sending a [nil](#nil) value as data.
* You shouldn't need to call this directly, rather it will be called by
* any [Node Readable Streams](http://nodejs.org/api/stream.html#stream_class_stream_readable)
* you pipe in.
*
* @id end
* @section Streams
* @name Stream.end()
* @api public
*
* mystream.end();
*/
Stream.prototype.end = function () {
    // nil travelling on the data channel is the end-of-stream signal
    var endMarker = nil;
    this.write(endMarker);
};
/**
* Pipes a Highland Stream to a [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable)
* (Highland Streams are also Node Writable Streams). This will pull all the
* data from the source Highland Stream and write it to the destination,
* automatically managing flow so that the destination is not overwhelmed
* by a fast source.
*
* This function returns the destination so you can chain together pipe calls.
*
* @id pipe
* @section Streams
* @name Stream.pipe(dest)
* @param {Writable Stream} dest - the destination to write all data to
* @api public
*
* var source = _(generator);
* var dest = fs.createWriteStream('myfile.txt')
* source.pipe(dest);
*
* // chained call
* source.pipe(through).pipe(dest);
*/
Stream.prototype.pipe = function (dest) {
    var self = this;
    // stdout and stderr are special case writables that cannot be closed
    var canClose = dest !== process.stdout && dest !== process.stderr;

    // Forwards each value to dest; asks for the next one only while dest
    // reports (via write's return value) that it can accept more.
    var feeder = self.consume(function (err, x, push, next) {
        if (err) {
            self.emit('error', err);
            return;
        }
        if (x === nil) {
            if (canClose) {
                dest.end();
            }
            return;
        }
        if (dest.write(x) !== false) {
            next();
        }
    });

    // dest signals it has room again: restart the feeder
    var onConsumerDrain = function () {
        feeder.resume();
    };
    dest.on('drain', onConsumerDrain);

    // Since we don't keep a reference to piped-to streams,
    // save a callback that will unbind the event handler.
    this._destructors.push(function () {
        dest.removeListener('drain', onConsumerDrain);
    });

    feeder.resume();
    return dest;
};
/**
* Destroys a stream by unlinking it from any consumers and sources. This will
* stop all consumers from receiving events from this stream and removes this
* stream as a consumer of any source stream.
*
* This function calls end() on the stream and unlinks it from any piped-to streams.
*
* @id destroy
* @section Streams
* @name Stream.destroy()
* @api public
*/
Stream.prototype.destroy = function () {
    var stream = this;
    // signal end-of-stream first
    stream.end();
    // detach everything downstream...
    _(stream._consumers).each(function (downstream) {
        stream._removeConsumer(downstream);
    });
    // ...then detach from upstream
    var upstream = stream.source;
    if (upstream) {
        upstream._removeConsumer(stream);
    }
    // finally run the clean-up callbacks registered by pipe()
    _(stream._destructors).each(function (cleanup) {
        cleanup();
    });
};
/**
* Runs the generator function for this Stream. If the generator is already
* running (it has been called and not called next() yet) then this function
* will do nothing.
*/
Stream.prototype._runGenerator = function () {
    // only start a new run once the previous one has finished
    if (!this._generator_running) {
        this._generator_running = true;
        this._generator(this._generator_push, this._generator_next);
    }
};
/**
* Performs the redirect from one Stream to another. In order for the
* redirect to happen at the appropriate time, it is put on the incoming
* buffer as a StreamRedirect object, and this function is called
* once it is read from the buffer.
*/
Stream.prototype._redirect = function (to) {
// coerce to Stream
to = _(to);
// hand every current consumer over to the new stream and re-point its source
to._consumers = this._consumers.map(function (c) {
c.source = to;
return c;
});
// TODO: copy _observers
this._consumers = [];
// from now on, consumers added to or removed from this stream are
// transparently forwarded to the redirect target
this.consume = function () {
return to.consume.apply(to, arguments);
};
this._removeConsumer = function () {
return to._removeConsumer.apply(to, arguments);
};
// carry the flow state over: a paused stream yields a paused target;
// otherwise this stream stops and the target decides from its consumers
if (this.paused) {
to.pause();
}
else {
this.pause();
to._checkBackPressure();
}
};
/**
* Adds a new consumer Stream, which will accept data and provide backpressure
* to this Stream. Adding more than one consumer will cause an exception to be
* thrown as the backpressure strategy must be explicitly chosen by the
* developer (through calling fork or observe).
*/
Stream.prototype._addConsumer = function (s) {
    var alreadyConsumed = this._consumers.length > 0;
    if (alreadyConsumed) {
        // a second plain consumer is refused
        throw new Error(
            'Stream already being consumed, you must either fork() or observe()'
        );
    }
    this._consumers.push(s);
    s.source = this;
    // the new consumer may change whether this stream should be flowing
    this._checkBackPressure();
};
/**
* Removes a consumer from this Stream.
*/
Stream.prototype._removeConsumer = function (s) {
    // rebuild the list without s (a fresh array, the old one is left untouched)
    var remaining = [];
    for (var i = 0; i < this._consumers.length; i++) {
        if (this._consumers[i] !== s) {
            remaining.push(this._consumers[i]);
        }
    }
    this._consumers = remaining;
    // only unlink s if it still points back at this stream
    if (s.source === this) {
        s.source = null;
    }
    this._checkBackPressure();
};
/**
* Consumes values from a Stream (once resumed) and returns a new Stream for
* you to optionally push values onto using the provided push / next functions.
*
* This function forms the basis of many higher-level Stream operations.
* It will not cause a paused stream to immediately resume, but behaves more
* like a 'through' stream, handling values as they are read.
*
* @id consume
* @section Streams
* @name Stream.consume(f)
* @param {Function} f - the function to handle errors and values
* @api public
*
* var filter = function (f, source) {
* return source.consume(function (err, x, push, next) {
* if (err) {
* // pass errors along the stream and consume next value
* push(err);
* next();
* }
* else if (x === _.nil) {
* // pass nil (end event) along the stream
* push(null, x);
* }
* else {
* // pass on the value only if the value passes the predicate
* if (f(x)) {
* push(null, x);
* }
* next();
* }
* });
* };
*/
Stream.prototype.consume = function (f) {
var self = this;
// the stream handed back to the caller; f pushes its output onto it
var s = new Stream();
// keep the stock _send: s._send is replaced below with a wrapper that runs f
var _send = s._send;
// push(err, x): emit an error or value on s, buffering while s is paused
var push = function (err, x) {
//console.log(['push', err, x, s.paused]);
if (x === nil) {
// ended, remove consumer from source
self._removeConsumer(s);
}
if (s.paused) {
if (err) {
s._outgoing.push(new StreamError(err));
}
else {
s._outgoing.push(x);
}
}
else {
_send.call(s, err, x);
}
};
// async: true once the current f() call has returned
// next_called: set when next() ran synchronously inside f()
var async;
var next_called;
// next(): ready for another value; next(s2): continue from stream s2 instead
var next = function (s2) {
//console.log(['next', async]);
if (s2) {
// we MUST pause to get the redirect object into the _incoming
// buffer otherwise it would be passed directly to _send(),
// which does not handle StreamRedirect objects!
var _paused = s.paused;
if (!_paused) {
s.pause();
}
s.write(new StreamRedirect(s2));
if (!_paused) {
s.resume();
}
}
else if (async) {
// f already returned, so s was paused below; start it again
s.resume();
}
else {
next_called = true;
}
};
// every value written to s is routed through f rather than sent on directly
s._send = function (err, x) {
async = false;
next_called = false;
f(err, x, push, next);
async = true;
// f did not ask for more synchronously: hold back until next() is called
if (!next_called) {
s.pause();
}
};
self._addConsumer(s);
return s;
};
/**
* Consumes a single item from the Stream. Unlike consume, this function will
* not provide a new stream for you to push values onto, and it will unsubscribe
* as soon as it has a single error, value or nil from the source.
*
* You probably won't need to use this directly, but it is used internally by
* some functions in the Highland library.
*
* @id pull
* @section Streams
* @name Stream.pull(f)
* @param {Function} f - the function to handle data
* @api public
*
* xs.pull(function (err, x) {
* // do something
* });
*/
Stream.prototype.pull = function (f) {
    var reader = this.consume(function (err, x) {
        // unsubscribe before handing over the single error, value or nil
        reader.source._removeConsumer(reader);
        f(err, x);
    });
    reader.resume();
};
/**
* Writes a value to the Stream. If the Stream is paused it will go into the
* Stream's incoming buffer, otherwise it will be immediately processed and
* sent to the Stream's consumers (if any). Returns false if the Stream is
* paused, true otherwise. This lets Node's pipe method handle back-pressure.
*
* You shouldn't need to call this yourself, but it may be called by Node
* functions which treat Highland Streams as a [Node Writable Stream](http://nodejs.org/api/stream.html#stream_class_stream_writable).
*
* @id write
* @section Streams
* @name Stream.write(x)
* @param x - the value to write to the Stream
* @api public
*
* var xs = _();
* xs.write(1);
* xs.write(2);
* xs.end();
*
* xs.toArray(function (ys) {
* // ys will be [1, 2]
* });
*/
Stream.prototype.write = function (x) {
    if (!this.paused) {
        // flowing: deliver immediately, unwrapping error markers
        if (_._isStreamError(x)) {
            this._send(x.error);
        }
        else {
            this._send(null, x);
        }
    }
    else {
        // paused: keep the value for the next resume()
        this._incoming.push(x);
    }
    // evaluated after delivery, which may itself have paused the stream
    return !this.paused;
};
/**
* Forks a stream, allowing you to add additional consumers with shared
* back-pressure. A stream forked to multiple consumers will only pull values
* from its source as fast as the slowest consumer can handle them.
*
* @id fork
* @section Streams
* @name Stream.fork()
* @api public
*
* var xs = _([1, 2, 3, 4]);
* var ys = xs.fork();
* var zs = xs.fork();
*
* // no values will be pulled from xs until zs also resumes
* ys.resume();
*
* // now both ys and zs will get values from xs
* zs.resume();
*/
Stream.prototype.fork = function () {
    var branch = new Stream();
    branch.source = this;
    branch.id = 'fork:' + branch.id;
    // registered directly, bypassing _addConsumer's single-consumer check
    this._consumers.push(branch);
    this._checkBackPressure();
    return branch;
};
/**
* Observes a stream, allowing you to handle values as they are emitted, without
* adding back-pressure or causing data to be pulled from the source. This can
* be useful when you are performing two related queries on a stream where one
* would block the other. Just be aware that a slow observer could fill up its
* buffer and cause memory issues. Where possible, you should use [fork](#fork).
*
* @id observe
* @section Streams
* @name Stream.observe()
* @api public
*
* var xs = _([1, 2, 3, 4]);
* var ys = xs.fork();
* var zs = xs.observe();
*
* // now both zs and ys will receive data as fast as ys can handle it
* ys.resume();
*/
Stream.prototype.observe = function () {
    var watcher = new Stream();
    watcher.source = this;
    watcher.id = 'observe:' + watcher.id;
    // observers are kept apart from consumers, so they add no back-pressure
    this._observers.push(watcher);
    return watcher;
};
/**
* Extracts errors from a Stream and applies them to an error handler
* function. Returns a new Stream with the errors removed (unless the error
* handler chooses to rethrow them using `push`). Errors can also be
* transformed and put back onto the Stream as values.
*
* @id errors
* @section Streams
* @name Stream.errors(f)
* @param {Function} f - the function to pass all errors to
* @api public
*
* getDocument.errors(function (err, push) {
* if (err.statusCode === 404) {
* // not found, return empty doc
* push(null, {});
* }
* else {
* // otherwise, re-throw the error
* push(err);
* }
* });
*/
Stream.prototype.errors = function (f) {
    return this.consume(function (err, x, push, next) {
        if (err) {
            // the handler decides whether to re-push or swallow the error
            f(err, push);
            next();
            return;
        }
        // values and the end marker pass through unchanged
        push(null, x);
        if (x !== nil) {
            next();
        }
    });
};
exposeMethod('errors');
/**
* Like the [errors](#errors) method, but emits a Stream end marker after
* an Error is encountered.
*
* @id stopOnError
* @section Streams
* @name Stream.stopOnError(f)
* @param {Function} f - the function to handle an error
* @api public
*
* brokenStream.stopOnError(function (err) {
* console.error('Something broke: ' + err);
* });
*/
Stream.prototype.stopOnError = function (f) {
    return this.consume(function (err, x, push, next) {
        if (err) {
            // report the error, then end the stream
            f(err, push);
            push(null, nil);
            return;
        }
        // values and the end marker pass through unchanged
        push(null, x);
        if (x !== nil) {
            next();
        }
    });
};
exposeMethod('stopOnError');
/**
* Iterates over every value from the Stream, calling the iterator function
* on each of them. This function causes a **thunk**.
*
* If an error from the Stream reaches the `each` call, it will emit an
* error event (which will cause it to throw if unhandled).
*
* @id each
* @section Streams
* @name Stream.each(f)
* @param {Function} f - the iterator function
* @api public
*
* _([1, 2, 3, 4]).each(function (x) {
* // will be called 4 times with x being 1, 2, 3 and 4
* });
*/
Stream.prototype.each = function (f) {
    var self = this;
    var sink = self.consume(function (err, x, push, next) {
        if (err) {
            // surfaced as an 'error' event on the source stream
            self.emit('error', err);
            return;
        }
        if (x === nil) {
            // end of stream: nothing more to iterate
            return;
        }
        f(x);
        next();
    });
    // start pulling immediately
    return sink.resume();
};
exposeMethod('each');
/**
* Applies results from a Stream as arguments to a function
*
* @id apply
* @section Streams
* @name Stream.apply(f)
* @param {Function} f - the function to apply arguments to
* @api public
*
* _([1, 2, 3]).apply(function (a, b, c) {
* // a === 1
* // b === 2
* // c === 3
* });
*/
Stream.prototype.apply = function (f) {
    // Collect every value first, then spread them as positional arguments.
    var spread = function (args) {
        f.apply(null, args);
    };
    return this.toArray(spread);
};
exposeMethod('apply');
/**
* Collects all values from a Stream into an Array and calls a function
* once with the result. This function causes a **thunk**.
*
* If an error from the Stream reaches the `toArray` call, it will emit an
* error event (which will cause it to throw if unhandled).
*
* @id toArray
* @section Streams
* @name Stream.toArray(f)
* @param {Function} f - the callback to provide the completed Array to
* @api public
*
* _([1, 2, 3, 4]).toArray(function (x) {
* // parameter x will be [1,2,3,4]
* });
*/
// TODO: implement using collect()?
Stream.prototype.toArray = function (f) {
    var self = this;
    var collected = [];
    var consumer = this.consume(function (err, x, push, next) {
        if (err) {
            // surface the error as an event on the source stream
            self.emit('error', err);
            return;
        }
        if (x === nil) {
            // source finished: deliver everything gathered so far
            f(collected);
            return;
        }
        collected.push(x);
        next();
    });
    return consumer.resume();
};
/**
* Creates a new Stream of transformed values by applying a function to each
* value from the source. The transformation function can be replaced with
* a non-function value for convenience, and it will emit that value
* for every data event on the source Stream.
*
* @id map
* @section Streams
* @name Stream.map(f)
* @param f - the transformation function or value to map to
* @api public
*
* var doubled = _([1, 2, 3, 4]).map(function (x) {
* return x * 2;
* });
*
* _([1, 2, 3]).map('hi') // => 'hi', 'hi', 'hi'
*/
Stream.prototype.map = function (f) {
    // A non-function argument is treated as a constant to emit for every value.
    var transform = f;
    if (!_.isFunction(f)) {
        transform = function () {
            return f;
        };
    }
    return this.consume(function (err, x, push, next) {
        if (err) {
            push(err);
            next();
            return;
        }
        if (x === nil) {
            push(err, x);
            return;
        }
        push(null, transform(x));
        next();
    });
};
exposeMethod('map');
/**
* Creates a new Stream of values by applying each item in a Stream to an
* iterator function which may return a Stream. Each item on these result
* Streams are then emitted on a single output Stream.
*
* The same as calling `stream.map(f).flatten()`.
*
* @id flatMap
* @section Streams
* @name Stream.flatMap(f)
* @param {Function} f - the iterator function
* @api public
*
* filenames.flatMap(readFile)
*/
Stream.prototype.flatMap = function (f) {
    // map first, then flatten whatever streams/arrays the iterator returned
    var mapped = this.map(f);
    return mapped.flatten();
};
exposeMethod('flatMap');
/**
* Retrieves values associated with a given property from all elements in
* the collection.
*
* @id pluck
* @section Streams
* @name Stream.pluck(property)
* @param {String} prop - the name of the property to read from each element
* @api public
*
* var docs = [
* {type: 'blogpost', title: 'foo'},
* {type: 'blogpost', title: 'bar'},
* {type: 'comment', title: 'baz'}
* ];
*
* _(docs).pluck('title').toArray(function (xs) {
* // xs is now ['foo', 'bar', 'baz']
* });
*/
Stream.prototype.pluck = function (prop) {
    return this.consume(function (err, x, push, next) {
        if (err) {
            push(err);
            next();
            return;
        }
        if (x === nil) {
            push(err, x);
            return;
        }
        if (_.isObject(x)) {
            push(null, x[prop]);
        }
        else {
            // non-object values cannot be plucked from: report and carry on
            push(new Error(
                'Expected Object, got ' + (typeof x)
            ));
        }
        next();
    });
};
exposeMethod('pluck');
/**
* Creates a new Stream including only the values which pass a truth test.
*
* @id filter
* @section Streams
* @name Stream.filter(f)
* @param f - the truth test function
* @api public
*
* var evens = _([1, 2, 3, 4]).filter(function (x) {
* return x % 2 === 0;
* });
*/
Stream.prototype.filter = function (f) {
    return this.consume(function (err, x, push, next) {
        if (err) {
            push(err);
            next();
            return;
        }
        if (x === nil) {
            push(err, x);
            return;
        }
        // only values passing the truth test are forwarded
        var keep = f(x);
        if (keep) {
            push(null, x);
        }
        next();
    });
};
exposeMethod('filter');
/**
* Filters using a predicate which returns a Stream. If you need to check
* against an asynchronous data source when filtering a Stream, this can
* be convenient. The Stream returned from the filter function should have
* a Boolean as its first value (all other values on the Stream will be
* disregarded).
*
* @id flatFilter
* @section Streams
* @name Stream.flatFilter(f)
* @param {Function} f - the truth test function which returns a Stream
* @api public
*
* var checkExists = _.wrapCallback(fs.exists);
* filenames.flatFilter(checkExists)
*/
Stream.prototype.flatFilter = function (f) {
    // One branch observes the raw values, the other resolves the first
    // result of the predicate stream for each value.
    var originals = this.observe();
    var verdicts = this.flatMap(function (x) {
        return f(x).take(1);
    });
    // pair each value with its verdict, keep the truthy ones, unwrap
    var paired = originals.zip(verdicts);
    var passing = paired.filter(function (pair) {
        return pair[1];
    });
    return passing.map(function (pair) {
        return pair[0];
    });
};
exposeMethod('flatFilter');
/**
* The inverse of [filter](#filter).
*
* @id reject
* @section Streams
* @name Stream.reject(f)
* @param {Function} f - the truth test function
* @api public
*
* var odds = _([1, 2, 3, 4]).reject(function (x) {
* return x % 2 === 0;
* });
*/
Stream.prototype.reject = function (f) {
    // filter on the negation of the supplied truth test
    var negated = _.compose(_.not, f);
    return this.filter(negated);
};
exposeMethod('reject');
/**
* A convenient form of filter, which returns the first object from a
* Stream that passes the provided truth test
*
* @id find
* @section Streams
* @name Stream.find(f)
* @param {Function} f - the truth test function
* @api public
*
* var docs = [
* {type: 'blogpost', title: 'foo'},
* {type: 'blogpost', title: 'bar'},
* {type: 'comment', title: 'foo'}
* ];
*
* var f = function (x) {
* return x.type == 'blogpost';
* };
*
* _(docs).find(f);
* // => [{type: 'blogpost', title: 'foo'}]
*
* // example with partial application
* var firstBlogpost = _.find(f);
*
* firstBlogpost(docs)
* // => [{type: 'blogpost', title: 'foo'}]
*/
Stream.prototype.find = function (f) {
    // Emits the first value for which `f` returns truthy, then ends the
    // stream. Errors seen before a match are passed downstream.
    return this.consume(function (err, x, push, next) {
        if (err) {
            push(err);
            next();
        }
        else if (x === nil) {
            // source ended without a match
            push(null, nil);
        }
        else if (f(x)) {
            push(null, x);
            // the stream is finished once a match is found, so next() must
            // not be called after the end marker has been pushed
            push(null, nil);
        }
        else {
            next();
        }
    });
};
exposeMethod('find');
/**
* A convenient form of reduce, which groups items based on a function or property name
*
* @id group
* @section Streams
* @name Stream.group(f)
* @param {Function|String} f - the function or property name on which to group,
* toString() is called on the result of a function.
* @api public
*
* var docs = [
* {type: 'blogpost', title: 'foo'},
* {type: 'blogpost', title: 'bar'},
* {type: 'comment', title: 'foo'}
* ];
*
* var f = function (x) {
* return x.type;
* };
*
* _(docs).group(f); OR _(docs).group('type');
* // => {
* // => 'blogpost': [{type: 'blogpost', title: 'foo'}, {type: 'blogpost', title: 'bar'}]
* // => 'comment': [{type: 'comment', title: 'foo'}]
* // => }
*
*/
Stream.prototype.group = function (f) {
    // A string argument is shorthand for "group by this property".
    var lambda = _.isString(f) ? _.get(f) : f;
    var hasOwn = Object.prototype.hasOwnProperty;
    return this.reduce({}, function (m, o) {
        var key = lambda(o);
        // Use the prototype's hasOwnProperty rather than m.hasOwnProperty:
        // a group key named 'hasOwnProperty' would otherwise shadow the
        // method on the accumulator and make the next lookup throw.
        if (!hasOwn.call(m, key)) { m[key] = []; }
        m[key].push(o);
        return m;
    });
};
exposeMethod('group');
/**
* Filters a Stream to drop all non-truthy values.
*
* @id compact
* @section Streams
* @name Stream.compact()
* @api public
*
* var compacted = _([0, 1, false, 3, null, undefined, 6]).compact();
* // => [1, 3, 6]
*/
Stream.prototype.compact = function () {
    // the value itself serves as the truth test
    var identity = function (x) {
        return x;
    };
    return this.filter(identity);
};
exposeMethod('compact');
/**
* A convenient form of filter, which returns all objects from a Stream
* which match a set of property values.
*
* @id where
* @section Streams
* @name Stream.where(props)
* @param {Object} props - the properties to match against
* @api public
*
* var docs = [
* {type: 'blogpost', title: 'foo'},
* {type: 'blogpost', title: 'bar'},
* {type: 'comment', title: 'foo'}
* ];
*
* _(docs).where({title: 'foo'})
* // => {type: 'blogpost', title: 'foo'}
* // => {type: 'comment', title: 'foo'}
*
* // example with partial application
* var getBlogposts = _.where({type: 'blogpost'});
*
* getBlogposts(docs)
* // => {type: 'blogpost', title: 'foo'}
* // => {type: 'blogpost', title: 'bar'}
*/
Stream.prototype.where = function (props) {
    // An object matches when every property in `props` is strictly equal
    // to the same-named property on the object.
    var matchesAll = function (x) {
        var name;
        for (name in props) {
            if (x[name] !== props[name]) {
                return false;
            }
        }
        return true;
    };
    return this.filter(matchesAll);
};
exposeMethod('where');
/**
* Takes two Streams and returns a Stream of corresponding pairs.
*
* @id zip
* @section Streams
* @name Stream.zip(ys)
* @param {Array | Stream} ys - the other stream to combine values with
* @api public
*
* _(['a', 'b', 'c']).zip([1, 2, 3]) // => ['a', 1], ['b', 2], ['c', 3]
*/
Stream.prototype.zip = function (ys) {
    ys = _(ys);
    var xs = this;
    // `returned` counts how many slots of the current pair are filled;
    // `z` is the pair being assembled. Both are reset for each pair.
    var returned = 0;
    var z = [];
    // Pulls one value from `src` into slot `index` of the current pair and
    // emits the pair once `max` slots have been filled.
    function nextValue(index, max, src, push, next) {
        src.pull(function (err, x) {
            if (err) {
                // errors belong on the error channel, not in the data;
                // then retry this slot
                push(err);
                nextValue(index, max, src, push, next);
            }
            else if (x === _.nil) {
                // either source ending ends the zipped stream
                push(null, nil);
            }
            else {
                returned++;
                z[index] = x;
                if (returned === max) {
                    push(null, z);
                    next();
                }
            }
        });
    }
    return _(function (push, next) {
        returned = 0;
        z = [];
        nextValue(0, 2, xs, push, next);
        nextValue(1, 2, ys, push, next);
    });
};
exposeMethod('zip');
/**
* Takes one Stream and batches incoming data into arrays of given length
*
* @id batch
* @section Streams
* @name Stream.batch(n)
* @param {Number} n - length of the array to batch
* @api public
*
* _([1, 2, 3, 4, 5]).batch(2) // => [1, 2], [3, 4], [5]
*/
Stream.prototype.batch = function (n) {
    // values accumulated for the batch currently being filled
    var batched = [];
    return this.consume(function (err, x, push, next) {
        if (err) {
            // errors pass straight through and are not part of any batch
            push(err);
            next();
        }
        else if (x === nil) {
            // flush a trailing partial batch before ending
            if (batched.length > 0) {
                push(null, batched);
            }
            push(null, nil);
        }
        else {
            batched.push(x);
            if (batched.length === n) {
                push(null, batched);
                batched = [];
            }
            next();
        }
    });
};
exposeMethod('batch');
/**
* Creates a new Stream with the first `n` values from the source.
*
* @id take
* @section Streams
* @name Stream.take(n)
* @param {Number} n - integer representing number of values to read from source
* @api public
*
* _([1, 2, 3, 4]).take(2) // => 1, 2
*/
// TODO: test that errors don't count in take 'n' calls
Stream.prototype.take = function (n) {
    if (n === 0) {
        return _([]);
    }
    // Ask for more while values are still owed, otherwise end the stream.
    var continueOrEnd = function (push, next) {
        if (n > 0) {
            next();
        }
        else {
            push(null, nil);
        }
    };
    return this.consume(function (err, x, push, next) {
        if (err) {
            // errors are forwarded and do not use up the allowance
            push(err);
            continueOrEnd(push, next);
            return;
        }
        if (x === nil) {
            push(null, nil);
            return;
        }
        n--;
        push(null, x);
        continueOrEnd(push, next);
    });
};
exposeMethod('take');
/**
* Creates a new Stream with only the first value from the source.
*
* @id head
* @section Streams
* @name Stream.head()
* @api public
*
* _([1, 2, 3, 4]).head() // => 1
*/
Stream.prototype.head = function () {
    // the head is simply a take of length one
    var firstOnly = this.take(1);
    return firstOnly;
};
exposeMethod('head');
/**
* Drops all values from the Stream apart from the last one (if any).
*
* @id last
* @section Streams
* @name Stream.last()
* @api public
*
* _([1, 2, 3, 4]).last() // => 4
*/
Stream.prototype.last = function () {
    // unique sentinel meaning "no value seen yet"
    var EMPTY = {};
    var latest = EMPTY;
    return this.consume(function (err, x, push, next) {
        if (err) {
            push(err);
            next();
            return;
        }
        if (x !== nil) {
            // remember the value and keep reading
            latest = x;
            next();
            return;
        }
        // end of source: emit the remembered value, if there was one
        if (latest !== EMPTY) {
            push(null, latest);
        }
        push(null, nil);
    });
};
exposeMethod('last');
/**
* Passes the current Stream to a function, returning the result. Can also
* be used to pipe the current Stream through another Stream. It will always
* return a Highland Stream (instead of the piped to target directly as in
* Node.js).
*
* @id through
* @section Streams
* @name Stream.through(target)
* @api public
*
* function oddDoubler(s) {
* return s.filter(function (x) {
* return x % 2; // odd numbers only
* })
* .map(function (x) {
* return x * 2;
* });
* }
*
* _([1, 2, 3, 4]).through(oddDoubler).toArray(function (xs) {
* // xs will be [2, 6]
* });
*
* // Can also be used with Node Through Streams
* _(filenames).through(jsonParser).map(function (obj) {
* // ...
* });
*/
Stream.prototype.through = function (target) {
    if (!_.isFunction(target)) {
        // a stream target: pipe this stream into it and its output into a
        // fresh Highland stream, which is what the caller receives
        var output = _();
        target.pause();
        this.pipe(target).pipe(output);
        return output;
    }
    // a function target is simply called with this stream
    return target(this);
};
exposeMethod('through');
/**
* Creates a 'Through Stream', which passes data through a pipeline
* of functions or other through Streams. This is particularly useful
* when combined with partial application of Highland functions to expose a
* Node-compatible Through Stream.
*
* This is not a method on a Stream, and it is only exposed at the top-level
* as `_.pipeline`. It takes an arbitrary number of arguments.
*
* @id pipeline
* @section Streams
* @name _.pipeline(...)
* @api public
*
* var through = _.pipeline(
* _.map(parseJSON),
* _.filter(isBlogpost),
* _.reduce(collectCategories),
* _.through(otherPipeline)
* );
*
* readStream.pipe(through).pipe(outStream);
*
* // Alternatively, you can use pipeline to manipulate a stream in
* // the chained method call style:
*
* var through2 = _.pipeline(function (s) {
* return s.map(parseJSON).filter(isBlogpost); // etc.
* });
*/
_.pipeline = function (/*through...*/) {
// With no arguments there is nothing to chain: return an empty stream.
if (!arguments.length) {
return _();
}
var start = arguments[0], rest;
if (!_.isStream(start) && !_.isFunction(start.resume)) {
// not a Highland stream or Node stream, start with empty stream
start = _();
rest = slice.call(arguments);
}
else {
// got a stream as first argument, co-erce to Highland stream
start = _(start);
rest = slice.call(arguments, 1);
}
// Chain each remaining argument onto the previous stage using through().
var end = rest.reduce(function (src, dest) {
return src.through(dest);
}, start);
// The wrapper pulls from the final stage on demand and re-sends each
// error, value or end marker via its own _send.
var wrapper = _(function (push, next) {
end.pull(function (err, x) {
if (err) {
wrapper._send(err);
next();
}
else if (x === nil) {
// end marker: forward it and do not ask for more
wrapper._send(null, nil);
}
else {
wrapper._send(null, x);
next();
}
});
});
// Writes to the wrapper are forwarded to the first stage of the pipeline.
wrapper.write = function (x) {
start.write(x);
};
return wrapper;
};
/**
* Reads values from a Stream of Streams, emitting them on a Single output
* Stream. This can be thought of as a flatten, just one level deep. Often
* used for resolving asynchronous actions such as a HTTP request or reading
* a file.
*
* @id sequence
* @section Streams
* @name Stream.sequence()
* @api public
*
* var nums = _([
* _([1, 2, 3]),
* _([4, 5, 6])
* ]);
*
* nums.sequence() // => 1, 2, 3, 4, 5, 6
*
* // using sequence to read from files in series
* filenames.map(readFile).sequence()
*/
Stream.prototype.sequence = function () {
    var original = this;
    // the stream currently being read: the outer stream or one inner stream
    var curr = this;
    return _(function (push, next) {
        curr.pull(function (err, x) {
            var atTopLevel = (curr === original);
            if (err) {
                push(err);
                return next();
            }
            if (_.isArray(x)) {
                // just send all values from array directly
                x.forEach(function (y) {
                    push(null, y);
                });
                return next();
            }
            if (_.isStream(x)) {
                if (atTopLevel) {
                    // switch to reading new stream
                    curr = x;
                }
                else {
                    // sequence only goes 1 level deep
                    push(null, x);
                }
                return next();
            }
            if (x === nil) {
                if (atTopLevel) {
                    // outer stream finished: we are done
                    push(null, nil);
                    return;
                }
                // resume reading from original
                curr = original;
                return next();
            }
            if (atTopLevel) {
                // we shouldn't be getting non-stream (or array)
                // values from the top-level stream
                push(new Error(
                    'Expected Stream, got ' + (typeof x)
                ));
            }
            else {
                push(null, x);
            }
            return next();
        });
    });
};
exposeMethod('sequence');
/**
* An alias for the [sequence](#sequence) method.
*
* @id series
* @section Streams
* @name Stream.series()
* @api public
*
* filenames.map(readFile).series()
*/
// series is the same function object as sequence under a second name.
Stream.prototype.series = Stream.prototype.sequence;
// NOTE(review): presumably _.sequence was installed by exposeMethod('sequence') — confirm.
_.series = _.sequence;
/**
* Recursively reads values from a Stream which may contain nested Streams
* or Arrays. As values or errors are encountered, they are emitted on a
* single output Stream.
*
* @id flatten
* @section Streams
* @name Stream.flatten()
* @api public
*
* _([1, [2, 3], [[4]]]).flatten(); // => 1, 2, 3, 4
*
* var nums = _([
* _([1, 2, 3]),
* _([4, _([5, 6]) ])
* ]);
*
* nums.flatten(); // => 1, 2, 3, 4, 5, 6
*/
Stream.prototype.flatten = function () {
    // the stream currently being read
    var curr = this;
    // parent streams to return to once a nested stream ends
    var stack = [];
    return _(function (push, next) {
        curr.pull(function (err, x) {
            if (err) {
                push(err);
                return next();
            }
            // arrays are treated as nested streams
            var item = _.isArray(x) ? _(x) : x;
            if (_.isStream(item)) {
                // descend into the nested stream
                stack.push(curr);
                curr = item;
                next();
                return;
            }
            if (item !== nil) {
                push(null, item);
                next();
                return;
            }
            // current stream ended: go back up a level, or finish
            if (stack.length) {
                curr = stack.pop();
                next();
            }
            else {
                push(null, nil);
            }
        });
    });
};
exposeMethod('flatten');
/**
* Takes a Stream of Streams and reads from them in parallel, buffering
* the results until they can be returned to the consumer in their original
* order.
*
* @id parallel
* @section Streams
* @name Stream.parallel(n)
* @param {Number} n - the maximum number of concurrent reads/buffers
* @api public
*
* var readFile = _.wrapCallback(fs.readFile);
* var filenames = _(['foo.txt', 'bar.txt', 'baz.txt']);
*
* // read from up to 10 files at once
* filenames.map(readFile).parallel(10);
*/
Stream.prototype.parallel = function (n) {
// One buffer (an array of [err, x] pairs) per source stream that has been
// started, in the order the sources were taken; buffers[0] is the one
// currently being emitted downstream.
var buffers = [];
// Number of source streams started and not yet ended.
var running = 0;
var self = this;
// Callback a source invokes when it adds to the first buffer, used to
// wake the generator when it had nothing to send.
var ondata = null;
return _(function (push, next) {
ondata = null;
// make sure we're reading from 'n' streams
var len = buffers.length;
if (!self.ended && running < n) {
var i = 0;
self.take(n - running).each(function (x) {
running++;
// each newly started source gets the next buffer slot
var target = buffers[len + i] = [];
i++;
x.consume(function (err, x, _push, _next) {
if (x === nil) {
running--;
target.push([null, nil]);
}
else {
target.push([err, x]);
_next();
}
// only data for the first buffer can be emitted right away
if (target === buffers[0] && ondata) {
ondata();
}
}).resume();
});
}
// check if we have buffered data we can send
if (buffers.length) {
if (buffers[0].length) {
var args = buffers[0].shift();
var err = args[0];
var x = args[1];
if (x === nil) {
// stream finished, move on to next one
buffers.shift();
}
else {
push(err, x);
}
next();
}
else {
// waiting for more data from first stream
ondata = function () {
ondata = null;
next();
};
}
}
else if (self.ended && running === 0) {
// totally done
push(null, nil);
return;
}
else {
// waiting for more streams to read
ondata = function () {
ondata = null;
next();
};
}
});
};
exposeMethod('parallel');
/**
* Switches source to an alternate Stream if the current Stream is empty.
*
* @id otherwise
* @section Streams
* @name Stream.otherwise(ys)
* @param {Stream} ys - alternate stream to use if this stream is empty
* @api public
*
* _([1,2,3]).otherwise(['foo']) // => 1, 2, 3
* _([]).otherwise(['foo']) // => 'foo'
*
* _.otherwise(_(['foo']), _([1,2,3])) // => 1, 2, 3
* _.otherwise(_(['foo']), _([])) // => 'foo'
*/
Stream.prototype.otherwise = function (ys) {
    // Emits this stream's values; if it ends before emitting any value,
    // switches to the alternative `ys` instead.
    var xs = this;
    return xs.consume(function (err, x, push, next) {
        if (err) {
            // got an error, just keep going
            push(err);
            next();
        }
        else if (x === nil) {
            // hit the end without having seen a value, use alternative
            next(ys);
        }
        else {
            // got a value, push it, then redirect to xs so the rest of
            // the source is passed through without this handler
            push(null, x);
            next(xs);
        }
    });
};
exposeMethod('otherwise');
/**
* Adds a value to the end of a Stream.
*
* @id append
* @section Streams
* @name Stream.append(y)
* @param y - the value to append to the Stream
* @api public
*
* _([1, 2, 3]).append(4) // => 1, 2, 3, 4
*/
Stream.prototype.append = function (y) {
    return this.consume(function (err, x, push, next) {
        if (x !== nil) {
            // errors and values alike are passed along unchanged
            push(err, x);
            next();
            return;
        }
        // source ended: emit the extra value, then the end marker
        push(null, y);
        push(null, _.nil);
    });
};
exposeMethod('append');
/**
* Boils down a Stream to a single value. The memo is the initial state
* of the reduction, and each successive step of it should be returned by
* the iterator function. The iterator is passed two arguments:
* the memo and the next value.
*
* @id reduce
* @section Streams
* @name Stream.reduce(memo, iterator)
* @param memo - the initial state of the reduction
* @param {Function} iterator - the function which reduces the values
* @api public
*
* var add = function (a, b) {
* return a + b;
* };
*
* _([1, 2, 3, 4]).reduce(0, add) // => 10
*/
// TODO: convert this to this.scan(z, f).last()
Stream.prototype.reduce = function (z, f) {
    // `z` holds the running result and is only emitted when the source ends.
    return this.consume(function (err, x, push, next) {
        if (x === nil) {
            push(null, z);
            push(null, _.nil);
            return;
        }
        if (err) {
            push(err);
        }
        else {
            z = f(z, x);
        }
        next();
    });
};
exposeMethod('reduce');
/**
* Same as [reduce](#reduce), but uses the first element as the initial
* state instead of passing in a `memo` value.
*
* @id reduce1
* @section Streams
* @name Stream.reduce1(iterator)
* @param {Function} iterator - the function which reduces the values
* @api public
*
* _([1, 2, 3, 4]).reduce1(add) // => 10
*/
Stream.prototype.reduce1 = function (f) {
    // Uses the first value of the stream as the memo, then delegates the
    // rest of the stream to reduce().
    var self = this;
    return _(function (push, next) {
        self.pull(function (err, x) {
            if (err) {
                // forward the error and keep looking for the first value
                push(err);
                next();
            }
            else if (x === nil) {
                // empty stream: nothing to reduce
                push(null, nil);
            }
            else {
                next(self.reduce(x, f));
            }
        });
    });
};
exposeMethod('reduce1');
/**
* Groups all values into an Array and passes down the stream as a single
* data event. This is a bit like doing [toArray](#toArray), but instead
* of accepting a callback and causing a *thunk*, it passes the value on.
*
* @id collect
* @section Streams
* @name Stream.collect()
* @api public
*
* _(['foo', 'bar']).collect().toArray(function (xs) {
* // xs will be [['foo', 'bar']]
* });
*/
Stream.prototype.collect = function () {
    var gathered = [];
    return this.consume(function (err, x, push, next) {
        if (err) {
            push(err);
            next();
            return;
        }
        if (x !== nil) {
            gathered.push(x);
            next();
            return;
        }
        // source ended: emit the whole array as one value, then end
        push(null, gathered);
        push(null, nil);
    });
};
exposeMethod('collect');
/**
* Like [reduce](#reduce), but emits each intermediate value of the
* reduction as it is calculated.
*
* @id scan
* @section Streams
* @name Stream.scan(memo, iterator)
* @param memo - the initial state of the reduction
* @param {Function} iterator - the function which reduces the values
* @api public
*
* _([1, 2, 3, 4]).scan(0, add) // => 0, 1, 3, 6, 10
*/
Stream.prototype.scan = function (z, f) {
    var self = this;
    // the initial memo is emitted first...
    var seed = _([z]);
    // ...followed by each intermediate result of the reduction
    var steps = self.consume(function (err, x, push, next) {
        if (x === nil) {
            push(null, _.nil);
            return;
        }
        if (err) {
            push(err);
        }
        else {
            z = f(z, x);
            push(null, z);
        }
        next();
    });
    return seed.concat(steps);
};
exposeMethod('scan');
/**
* Same as [scan](#scan), but uses the first element as the initial
* state instead of passing in a `memo` value.
*
* @id scan1
* @section Streams
* @name Stream.scan1(iterator)
* @param {Function} iterator - the function which reduces the values
* @api public
*
* _([1, 2, 3, 4]).scan1(add) // => 1, 3, 6, 10
*/
Stream.prototype.scan1 = function (f) {
    // Uses the first value of the stream as the memo, then delegates the
    // rest of the stream to scan().
    var self = this;
    return _(function (push, next) {
        self.pull(function (err, x) {
            if (err) {
                // forward the error and keep looking for the first value
                push(err);
                next();
            }
            else if (x === nil) {
                // empty stream: nothing to scan
                push(null, nil);
            }
            else {
                next(self.scan(x, f));
            }
        });
    });
};
exposeMethod('scan1');
/**
* Concatenates a Stream to the end of this Stream.
*
* Be aware that in the top-level export, the args may be in the reverse
* order to what you'd expect `_.concat([a], [b]) => [b, a]`, as this follows the
* convention of other top-level exported functions which do `x` to `y`.
*
* @id concat
* @section Streams
* @name Stream.concat(ys)
* @param {Stream | Array} ys - the values to concatenate onto this Stream
* @api public
*
* _([1, 2]).concat([3, 4]) // => 1, 2, 3, 4
* _.concat([3, 4], [1, 2]) // => 1, 2, 3, 4
*/
Stream.prototype.concat = function (ys) {
    return this.consume(function (err, x, push, next) {
        if (x !== nil) {
            // errors and values alike are passed along unchanged
            push(err, x);
            next();
            return;
        }
        // this stream ended: continue reading from `ys`
        next(ys);
    });
};
exposeMethod('concat');
/**
* Takes a Stream of Streams and merges their values and errors into a
* single new Stream. The merged stream ends when all source streams have
* ended.
*
* Note that no guarantee is made with respect to the order in which
* values for each stream end up in the merged stream. Values in the
* merged stream will, however, respect the order they were emitted from
* their respective streams.
*
* @id merge
* @section Streams
* @name Stream.merge()
* @api public
*
* var txt = _(['foo.txt', 'bar.txt']).map(readFile)
* var md = _(['baz.md']).map(readFile)
*
* _([txt, md]).merge();
* // => contents of foo.txt, bar.txt and baz.txt in the order they were read
*/
Stream.prototype.merge = function () {
var self = this;
// number of source streams that have ended
var ended = 0;
// number of source streams received from the outer stream so far
var total = 0;
// source streams waiting to have their next value pulled
var toread = [];
// true while a pull on the outer stream is outstanding
var reading_srcs = false;
return _(function (push, next) {
// Fetch the next source stream from the outer stream, one pull at a time.
if (!self.ended && !reading_srcs) {
reading_srcs = true;
self.pull(function (err, x) {
if (err) {
push(err);
}
else if (x !== nil) {
total++;
toread.push(x);
}
reading_srcs = false;
next();
});
}
// Pull one value from every source currently queued for reading.
while (toread.length) {
(function (src) {
src.pull(function (err, x) {
if (err) {
// re-queue the source so it is read again, then forward the error
toread.push(src);
push(err);
next();
}
else if (x === nil) {
ended++;
// finish only when the outer stream and every source have ended
if (self.ended && ended === total) {
push(null, nil);
}
else {
next();
}
}
else {
toread.push(src);
push(null, x);
next();
}
});
})(toread.shift());
}
});
};
exposeMethod('merge');
/**
* Calls a named method on each object from the Stream - returning
* a new stream with the result of those calls.
*
* @id invoke
* @section Streams
* @name Stream.invoke(method, args)
* @param {String} method - the method name to call
* @param {Array} args - the arguments to call the method with
* @api public
*
* _(['foo', 'bar']).invoke('toUpperCase', []) // => FOO, BAR
*
* filenames.map(readFile).sequence().invoke('toString', ['utf8']);
*/
Stream.prototype.invoke = function (method, args) {
    // call the named method on each value, with the value as `this`
    var callMethod = function (x) {
        return x[method].apply(x, args);
    };
    return this.map(callMethod);
};
exposeMethod('invoke');
/**
* Ensures that only one data event is pushed downstream (or into the buffer)
* every `ms` milliseconds, any other values are dropped.
*
* @id throttle
* @section Streams
* @name Stream.throttle(ms)
* @param {Number} ms - the minimum milliseconds between each value
* @api public
*
* _('mousemove', document).throttle(1000);
*/
Stream.prototype.throttle = function (ms) {
    var throttled = new Stream();
    // timestamp of the last value let through; starts far enough back
    // that the very first value always passes
    var last = 0 - ms;
    var originalWrite = throttled.write;
    throttled.write = function (x) {
        var now = new Date().getTime();
        // errors and the end marker are never dropped
        var alwaysForward = _._isStreamError(x) || x === nil;
        if (alwaysForward) {
            return originalWrite.apply(this, arguments);
        }
        var due = now - ms >= last;
        if (!due) {
            // too soon after the previous value: drop this one
            return;
        }
        last = now;
        return originalWrite.apply(this, arguments);
    };
    this._addConsumer(throttled);
    return throttled;
};
exposeMethod('throttle');
/**
* Holds off pushing data events downstream until there has been no more
* data for `ms` milliseconds. Sends the last value that occurred before
* the delay, discarding all other values.
*
* @id debounce
* @section Streams
* @name Stream.debounce(ms)
* @param {Number} ms - the milliseconds to wait before sending data
* @api public
*
* // sends last keyup event after user has stopped typing for 1 second
* _('keyup', textbox).debounce(1000);
*/
Stream.prototype.debounce = function (ms) {
var s = new Stream();
// handle of the pending timer, if any
var t = null;
// unique sentinel meaning "no value received yet"
var nothing = {};
var last = nothing;
// keep the stream's own write so the override below can delegate to it
var _write = s.write;
s.write = function (x) {
if (_._isStreamError(x)) {
// let errors through regardless
return _write.apply(this, arguments);
}
else if (x === nil) {
// end of source: cancel any pending timer and flush the last value
if (t) {
clearTimeout(t);
}
// NOTE(review): `last` is not reset when the timer fires, so a value the
// timer already wrote looks like it is written again here — confirm intended.
if (last !== nothing) {
_write.call(s, last);
}
return _write.apply(this, arguments);
}
else {
// remember the value and restart the quiet-period timer
last = x;
if (t) {
clearTimeout(t);
}
t = setTimeout(function () {
_write.call(s, last);
}, ms);
return !this.paused;
}
};
this._addConsumer(s);
return s;
};
exposeMethod('debounce');
/**
* Creates a new Stream, which when read from, only returns the last
* seen value from the source. The source stream does not experience
* back-pressure. Useful if you're using a Stream to model a changing
* property which you need to query periodically.
*
* @id latest
* @section Streams
* @name Stream.latest()
* @api public
*
* // slowThing will always get the last known mouse position
* // when it asks for more data from the mousePosition stream
* mousePosition.latest().map(slowThing)
*/
Stream.prototype.latest = function () {
var s = new Stream();
// keep the stream's own write so the override below can delegate to it
var _write = s.write;
// Replacement pause that only records the paused flag.
s.pause = function () {
this.paused = true;
// do not force parent to checkBackpressure
};
s.write = function (x) {
if (_._isStreamError(x)) {
// pass errors straight through
_write.call(this, x);
}
else if (x === nil) {
_write.call(this, x);
}
else {
if (this.paused) {
// while paused, keep only the newest value: drop buffered values
// (errors and the end marker stay) and buffer this one
this._incoming = this._incoming.filter(function (x) {
// remove any existing values from buffer
return _._isStreamError(x) || x === nil;
});
this._incoming.push(x);
}
else {
_write.call(this, x);
}
}
// never push back
return true;
};
this._addConsumer(s);
s.resume();
return s;
};
exposeMethod('latest');
/**
* Returns values from an Object as a Stream. Reads properties
* lazily, so if you don't read from all keys on an object, not
* all properties will be read from (may have an effect where getters
* are used).
*
* @id values
* @section Objects
* @name _.values(obj)
* @param {Object} obj - the object to return values from
* @api public
*
* _.values({foo: 1, bar: 2, baz: 3}) // => 1, 2, 3
*/
_.values = function (obj) {
    // map each key of the object to the value stored under it
    var readValue = function (k) {
        return obj[k];
    };
    return _.keys(obj).map(readValue);
};
/**
* Returns keys from an Object as a Stream.
*
* @id keys
* @section Objects
* @name _.keys(obj)
* @param {Object} obj - the object to return keys from
* @api public
*
* _.keys({foo: 1, bar: 2, baz: 3}) // => 'foo', 'bar', 'baz'
*/
_.keys = function (obj) {
    var own = [];
    var name;
    for (name in obj) {
        // skip properties inherited through the prototype chain
        if (!obj.hasOwnProperty(name)) {
            continue;
        }
        own.push(name);
    }
    return _(own);
};
/**
* Returns key/value pairs for an Object as a Stream. Reads properties
* lazily, so if you don't read from all keys on an object, not
* all properties will be read from (may have an effect where getters
* are used).
*
* @id pairs
* @section Objects
* @name _.pairs(obj)
* @param {Object} obj - the object to return key/value pairs from
* @api public
*
* _.pairs({foo: 1, bar: 2}) // => ['foo', 1], ['bar', 2]
*/
_.pairs = function (obj) {
    // map each key of the object to a [key, value] array
    var toPair = function (k) {
        return [k, obj[k]];
    };
    return _.keys(obj).map(toPair);
};
/**
* Extends one object with the properties of another. **Note:** The
* arguments are in the reverse order of other libraries such as
* underscore. This is so it follows the convention of other functions in
* this library and so you can more meaningfully partially apply it.
*
* @id extend
* @section Objects
* @name _.extend(a, b)
* @param {Object} a - the properties to extend b with
* @param {Object} b - the original object to extend
* @api public
*
* _.extend({name: 'bar'}, {name: 'foo', price: 20})
* // => {name: 'bar', price: 20}
*
* // example of partial application
* var publish = _.extend({published: true});
*
* publish({title: 'test post'})
* // => {title: 'test post', published: true}
*/
_.extend = _.curry(function (extensions, target) {
    var key;
    for (key in extensions) {
        // copy own properties only
        if (!extensions.hasOwnProperty(key)) {
            continue;
        }
        target[key] = extensions[key];
    }
    // the target is modified in place and returned
    return target;
});
/**
* Returns a property from an object.
*
* @id get
* @section Objects
* @name _.get(prop, obj)
* @param {String} prop - the property to return
* @param {Object} obj - the object to read properties from
* @api public
*
* var obj = {foo: 'bar', baz: 123};
* _.get('foo', obj) // => 'bar'
*
* // making use of partial application
* var posts = [
* {title: 'one'},
* {title: 'two'},
* {title: 'three'}
* ];
*
* _(posts).map(_.get('title')) // => 'one', 'two', 'three'
*/
_.get = _.curry(function (prop, obj) {
    // plain property lookup, curried so the property can be fixed first
    var value = obj[prop];
    return value;
});
/**
* Updates a property on an object, returning the updated object.
*
* @id set
* @section Objects
* @name _.set(prop, value, obj)
* @param {String} prop - the property to set
* @param value - the value to set the property to
* @param {Object} obj - the object to set properties on
* @api public
*
* var obj = {foo: 'bar', baz: 123};
* _.set('foo', 'wheeee', obj) // => {foo: 'wheeee', baz: 123}
*
* // making use of partial application
* var publish = _.set('published', true);
*
* publish({title: 'example'}) // => {title: 'example', published: true}
*/
_.set = _.curry(function (prop, val, obj) {
    // assigns on the object passed in and hands that same object back
    var target = obj;
    target[prop] = val;
    return target;
});
/**
* Logs values to the console, a simple wrapper around `console.log` that
* is suitable for passing to other functions by reference without having to
* call `bind`.
*
* @id log
* @section Utils
* @name _.log(args..)
* @api public
*
* _.log('Hello, world!');
*
* _([1, 2, 3, 4]).each(_.log);
*/
_.log = function () {
    // forward every argument to console.log with console as the receiver
    var args = arguments;
    console.log.apply(console, args);
};
/**
* Wraps a node-style async function which accepts a callback, transforming
* it to a function which accepts the same arguments minus the callback and
* returns a Highland Stream instead. Only the first argument to the
* callback (or an error) will be pushed onto the Stream.
*
* @id wrapCallback
* @section Utils
* @name _.wrapCallback(f)
* @param {Function} f - the node-style function to wrap
* @api public
*
* var fs = require('fs');
*
* var readFile = _.wrapCallback(fs.readFile);
*
* readFile('example.txt').apply(function (data) {
* // data is now the contents of example.txt
* });
*/
_.wrapCallback = function (f) {
    return function () {
        var callArgs = slice.call(arguments);
        return _(function (push) {
            // node-style callback: emit the error or the single result,
            // then end the stream
            function done(err, x) {
                if (err) {
                    push(err);
                }
                else {
                    push(null, x);
                }
                push(null, nil);
            }
            // call the wrapped function with the callback appended
            f.apply(null, callArgs.concat([done]));
        });
    };
};
/**
* Add two values. Can be partially applied.
*
* @id add
* @section Operators
* @name _.add(a, b)
* @api public
*
* _.add(1, 2) === 3
* _.add(1)(5) === 6
*/
_.add = _.curry(function (a, b) {
    // Uses the `+` operator as-is, so strings concatenate just like with
    // a hand-written `a + b`.
    var sum = a + b;
    return sum;
});
/**
* Perform logical negation on a value. If `x` is truthy then returns false,
* otherwise returns true.
*
* @id not
* @section Operators
* @name _.not(x)
* @param x - the value to negate
* @api public
*
* _.not(true) // => false
* _.not(false) // => true
*/
_.not = function (x) {
    // Truthy input -> false, falsy input -> true.
    return x ? false : true;
};
}).call(this,_dereq_("/home/caolan/projects/highland/node_modules/browserify/node_modules/insert-module-globals/node_modules/process/browser.js"),typeof self !== "undefined" ? self : typeof window !== "undefined" ? window : {})
},{"/home/caolan/projects/highland/node_modules/browserify/node_modules/insert-module-globals/node_modules/process/browser.js":4,"events":2,"util":6}],2:[function(_dereq_,module,exports){
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
// Constructor: guarantees the instance carries its own `_events` and
// `_maxListeners` slots, keeping any truthy value that is already present.
function EventEmitter() {
  var existingEvents = this._events;
  this._events = existingEvents ? existingEvents : {};

  var existingLimit = this._maxListeners;
  this._maxListeners = existingLimit ? existingLimit : undefined;
}
// The constructor itself is the module's export.
module.exports = EventEmitter;
// Backwards-compat with node 0.10.x
EventEmitter.EventEmitter = EventEmitter;
// Prototype-level placeholders; the constructor replaces `_events` with an
// object on each instance.
EventEmitter.prototype._events = undefined;
EventEmitter.prototype._maxListeners = undefined;
// By default EventEmitters will print a warning if more than 10 listeners are
// added to it. This is a useful default which helps finding memory leaks.
EventEmitter.defaultMaxListeners = 10;
// Obviously not all Emitters should be limited to 10. This function allows
// that to be increased. Set to zero for unlimited.
EventEmitter.prototype.setMaxListeners = function(n) {
  // Accept only real, non-negative, non-NaN numbers.
  var acceptable = isNumber(n) && !(n < 0) && !isNaN(n);
  if (!acceptable) {
    throw TypeError('n must be a positive number');
  }
  this._maxListeners = n;
  return this;
};
// Synchronously invokes the listener(s) registered for `type`, passing along
// every argument after `type`. Returns true when a listener entry existed,
// false otherwise. Emitting 'error' with no listener throws instead.
EventEmitter.prototype.emit = function(type) {
  var er, handler, len, args, i, listeners;

  if (!this._events)
    this._events = {};

  // If there is no 'error' event listener then throw.
  if (type === 'error') {
    if (!this._events.error ||
        (isObject(this._events.error) && !this._events.error.length)) {
      er = arguments[1];
      if (er instanceof Error) {
        throw er; // Unhandled 'error' event
      }
      // Both paths throw, so nothing after this point in the branch can run.
      throw TypeError('Uncaught, unspecified "error" event.');
    }
  }

  handler = this._events[type];

  if (isUndefined(handler))
    return false;

  if (isFunction(handler)) {
    // A single listener is stored bare (not in an array).
    switch (arguments.length) {
      // fast cases
      case 1:
        handler.call(this);
        break;
      case 2:
        handler.call(this, arguments[1]);
        break;
      case 3:
        handler.call(this, arguments[1], arguments[2]);
        break;
      // slower
      default:
        len = arguments.length;
        args = new Array(len - 1);
        for (i = 1; i < len; i++)
          args[i - 1] = arguments[i];
        handler.apply(this, args);
    }
  } else if (isObject(handler)) {
    len = arguments.length;
    args = new Array(len - 1);
    for (i = 1; i < len; i++)
      args[i - 1] = arguments[i];

    // Iterate over a copy so listeners that add/remove listeners while
    // running do not disturb this dispatch.
    listeners = handler.slice();
    len = listeners.length;
    for (i = 0; i < len; i++)
      listeners[i].apply(this, args);
  }

  return true;
};
// Registers `listener` for `type` and returns the emitter for chaining.
// Throws a TypeError when `listener` is not a function. Emits 'newListener'
// first when someone listens for it, and prints a one-time warning once the
// number of listeners for `type` exceeds the configured maximum.
EventEmitter.prototype.addListener = function(type, listener) {
  var m;

  if (!isFunction(listener))
    throw TypeError('listener must be a function');

  if (!this._events)
    this._events = {};

  // To avoid recursion in the case that type === "newListener"! Before
  // adding it to the listeners, first emit "newListener".
  if (this._events.newListener)
    this.emit('newListener', type,
              isFunction(listener.listener) ?
              listener.listener : listener);

  if (!this._events[type])
    // Optimize the case of one listener. Don't need the extra array object.
    this._events[type] = listener;
  else if (isObject(this._events[type]))
    // If we've already got an array, just append.
    this._events[type].push(listener);
  else
    // Adding the second element, need to change to array.
    this._events[type] = [this._events[type], listener];

  // Check for listener leak
  if (isObject(this._events[type]) && !this._events[type].warned) {
    if (!isUndefined(this._maxListeners)) {
      m = this._maxListeners;
    } else {
      m = EventEmitter.defaultMaxListeners;
    }

    if (m && m > 0 && this._events[type].length > m) {
      this._events[type].warned = true;
      console.error('(node) warning: possible EventEmitter memory ' +
                    'leak detected. %d listeners added. ' +
                    'Use emitter.setMaxListeners() to increase limit.',
                    this._events[type].length);
      // console.trace is not available in every browser console; skip the
      // stack trace rather than throwing from inside addListener.
      if (typeof console.trace === 'function') {
        console.trace();
      }
    }
  }

  return this;
};
EventEmitter.prototype.on = EventEmitter.prototype.addListener;
// Registers a listener that unregisters itself on its first invocation.
EventEmitter.prototype.once = function(type, listener) {
  if (!isFunction(listener)) {
    throw TypeError('listener must be a function');
  }

  var alreadyFired = false;

  var onceWrapper = function() {
    this.removeListener(type, onceWrapper);
    if (alreadyFired) {
      return;
    }
    alreadyFired = true;
    listener.apply(this, arguments);
  };

  // Expose the wrapped function so removeListener(type, listener) can match it.
  onceWrapper.listener = listener;
  this.on(type, onceWrapper);

  return this;
};
// emits a 'removeListener' event iff the listener was removed
// Removes the most recently added occurrence of `listener` for `type`.
// Emits 'removeListener' only when something was actually removed.
EventEmitter.prototype.removeListener = function(type, listener) {
  if (!isFunction(listener)) {
    throw TypeError('listener must be a function');
  }

  var registered = this._events && this._events[type];
  if (!registered) {
    return this;
  }

  // Case 1: a single listener stored bare (possibly a once() wrapper).
  var isSoleMatch = registered === listener ||
      (isFunction(registered.listener) && registered.listener === listener);
  if (isSoleMatch) {
    delete this._events[type];
    if (this._events.removeListener) {
      this.emit('removeListener', type, listener);
    }
    return this;
  }

  if (!isObject(registered)) {
    return this;
  }

  // Case 2: an array of listeners; search from the end.
  var foundAt = -1;
  for (var idx = registered.length - 1; idx >= 0; idx--) {
    var candidate = registered[idx];
    if (candidate === listener ||
        (candidate.listener && candidate.listener === listener)) {
      foundAt = idx;
      break;
    }
  }

  if (foundAt < 0) {
    return this;
  }

  if (registered.length === 1) {
    registered.length = 0;
    delete this._events[type];
  } else {
    registered.splice(foundAt, 1);
  }

  if (this._events.removeListener) {
    this.emit('removeListener', type, listener);
  }

  return this;
};
// Removes every listener for `type`, or for all events when called without
// arguments. When a 'removeListener' listener is registered, each removal is
// announced through it. Returns the emitter for chaining.
EventEmitter.prototype.removeAllListeners = function(type) {
  var key, listeners;

  if (!this._events)
    return this;

  // not listening for removeListener, no need to emit
  if (!this._events.removeListener) {
    if (arguments.length === 0)
      this._events = {};
    else if (this._events[type])
      delete this._events[type];
    return this;
  }

  // emit removeListener for all listeners on all events
  if (arguments.length === 0) {
    for (key in this._events) {
      if (key === 'removeListener') continue;
      this.removeAllListeners(key);
    }
    // 'removeListener' itself goes last so it observes all other removals.
    this.removeAllListeners('removeListener');
    this._events = {};
    return this;
  }

  listeners = this._events[type];

  if (isFunction(listeners)) {
    this.removeListener(type, listeners);
  } else if (listeners) {
    // Guarded: `type` may have no listeners at all, in which case
    // `listeners` is undefined and reading `.length` would throw.
    // LIFO order
    while (listeners.length)
      this.removeListener(type, listeners[listeners.length - 1]);
  }
  delete this._events[type];

  return this;
};
// Returns a fresh array of the listeners registered for `type`.
EventEmitter.prototype.listeners = function(type) {
  var registered = this._events ? this._events[type] : undefined;
  if (!registered) {
    return [];
  }
  if (isFunction(registered)) {
    return [registered];
  }
  return registered.slice();
};
// Static helper: number of listeners `emitter` holds for `type`.
EventEmitter.listenerCount = function(emitter, type) {
  var registered = emitter._events ? emitter._events[type] : undefined;
  if (!registered) {
    return 0;
  }
  if (isFunction(registered)) {
    return 1;
  }
  return registered.length;
};
// Small typeof-based predicates used throughout the emitter.
function isFunction(arg) {
  return 'function' === typeof arg;
}

function isNumber(arg) {
  return 'number' === typeof arg;
}

// True for any non-null value whose typeof is 'object' (arrays included).
function isObject(arg) {
  if (arg === null) {
    return false;
  }
  return 'object' === typeof arg;
}

function isUndefined(arg) {
  return void 0 === arg;
}
},{}],3:[function(_dereq_,module,exports){
if (typeof Object.create === 'function') {
// implementation from standard node.js 'util' module
module.exports = function inherits(ctor, superCtor) {
ctor.super_ = superCtor
ctor.prototype = Object.create(superCtor.prototype, {
constructor: {
value: ctor,
enumerable: false,
writable: true,
configurable: true
}
});
};
} else {
// old school shim for old browsers
module.exports = function inherits(ctor, superCtor) {
ctor.super_ = superCtor
var TempCtor = function () {}
TempCtor.prototype = superCtor.prototype
ctor.prototype = new TempCtor()
ctor.prototype.constructor = ctor
}
}
},{}],4:[function(_dereq_,module,exports){
// shim for using process in browser
var process = module.exports = {};

// Picks a deferral strategy once, at load time: setImmediate when the
// window offers it, otherwise a postMessage-driven queue, otherwise
// setTimeout(fn, 0).
process.nextTick = (function () {
    var hasWindow = typeof window !== 'undefined';
    var canSetImmediate = hasWindow && window.setImmediate;
    var canPost = hasWindow && window.postMessage && window.addEventListener;

    if (canSetImmediate) {
        return function (f) { return window.setImmediate(f) };
    }

    if (!canPost) {
        return function nextTick(fn) {
            setTimeout(fn, 0);
        };
    }

    var pending = [];

    window.addEventListener('message', function (ev) {
        var origin = ev.source;
        var fromThisWindow = origin === window || origin === null;
        if (!fromThisWindow || ev.data !== 'process-tick') {
            return;
        }
        ev.stopPropagation();
        // One queued callback is run per received message.
        if (pending.length > 0) {
            var task = pending.shift();
            task();
        }
    }, true);

    return function nextTick(fn) {
        pending.push(fn);
        window.postMessage('process-tick', '*');
    };
})();
// Static stand-ins for the process fields that browser code may read.
process.title = 'browser';
process.browser = true;
process.env = {};
process.argv = [];
// Always throws: there is no binding layer in this shim.
process.binding = function (name) {
throw new Error('process.binding is not supported');
}
// TODO(shtylman)
// The working directory is reported as a fixed '/'.
process.cwd = function () { return '/' };
// Always throws: the working directory cannot be changed in this shim.
process.chdir = function (dir) {
throw new Error('process.chdir is not supported');
};
},{}],5:[function(_dereq_,module,exports){
module.exports = function isBuffer(arg) {
return arg && typeof arg === 'object'
&& typeof arg.copy === 'function'
&& typeof arg.fill === 'function'
&& typeof arg.readUInt8 === 'function';
}
},{}],6:[function(_dereq_,module,exports){
(function (process,global){
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
// Matches the placeholders understood by format(): %s, %d, %j and the
// literal-percent escape %%.
var formatRegExp = /%[sdj%]/g;
/**
 * printf-like string formatting.
 *
 * When the first argument is not a string, every argument is run through
 * inspect() and the results are joined with single spaces. Otherwise each
 * placeholder in `f` consumes the next argument, and any arguments left
 * over are appended, separated by spaces.
 *
 * @param {*} f - format string, or the first of several values to inspect
 * @returns {String} the formatted string
 */
exports.format = function(f) {
if (!isString(f)) {
var objects = [];
for (var i = 0; i < arguments.length; i++) {
objects.push(inspect(arguments[i]));
}
return objects.join(' ');
}
// `i` is the index of the next unconsumed argument; it is shared with the
// replace callback below, which advances it for every placeholder filled.
var i = 1;
var args = arguments;
var len = args.length;
var str = String(f).replace(formatRegExp, function(x) {
if (x === '%%') return '%';
// More placeholders than arguments: leave the placeholder text in place.
if (i >= len) return x;
switch (x) {
case '%s': return String(args[i++]);
case '%d': return Number(args[i++]);
case '%j':
try {
return JSON.stringify(args[i++]);
} catch (_) {
// Any stringify failure is reported as a circular structure.
return '[Circular]';
}
default:
return x;
}
});
// Append the arguments no placeholder consumed: non-objects (and null) via
// plain string concatenation, objects via inspect().
for (var x = args[i]; i < len; x = args[++i]) {
if (isNull(x) || !isObject(x)) {
str += ' ' + x;
} else {
str += ' ' + inspect(x);
}
}
return str;
};
// Mark that a method should not be used.
// Returns a modified function which warns once by default.
// If --no-deprecation is set, then it is a no-op.
exports.deprecate = function(fn, msg) {
// Allow for deprecating things in the process of starting up.
if (isUndefined(global.process)) {
return function() {
return exports.deprecate(fn, msg).apply(this, arguments);
};
}
if (process.noDeprecation === true) {
return fn;
}
var warned = false;
function deprecated() {
if (!warned) {
if (process.throwDeprecation) {
throw new Error(msg);
} else if (process.traceDeprecation) {
console.trace(msg);
} else {
console.error(msg);
}
warned = true;
}
return fn.apply(this, arguments);
}
return deprecated;
};
var debugs = {};
var debugEnviron;
exports.debuglog = function(set) {
if (isUndefined(debugEnviron))
debugEnviron = process.env.NODE_DEBUG || '';
set = set.toUpperCase();
if (!debugs[set]) {
if (new RegExp('\\b' + set + '\\b', 'i').test(debugEnviron)) {
var pid = process.pid;
debugs[set] = function() {
var msg = exports.format.apply(exports, arguments);
console.error('%s %d: %s', set, pid, msg);
};
} else {
debugs[set] = function() {};
}
}
return debugs[set];
};
/**
* Echoes the value of a value. Tries to print the value out
* in the best way possible given the different types.
*
* @param {Object} obj The object to print out.
* @param {Object} opts Optional options object that alters the output.
*/
/* legacy: obj, showHidden, depth, colors*/
function inspect(obj, opts) {
  var argc = arguments.length;

  // Start from the bare minimum; everything else is filled in below.
  var ctx = {
    seen: [],
    stylize: stylizeNoColor
  };

  // Legacy positional form: inspect(obj, showHidden, depth, colors)
  if (argc >= 3) {
    ctx.depth = arguments[2];
  }
  if (argc >= 4) {
    ctx.colors = arguments[3];
  }

  if (isBoolean(opts)) {
    // Legacy: second argument is the showHidden flag.
    ctx.showHidden = opts;
  } else if (opts) {
    // Modern: second argument is an options object.
    exports._extend(ctx, opts);
  }

  // Fill in whatever is still unset.
  var fallbacks = {
    showHidden: false,
    depth: 2,
    colors: false,
    customInspect: true
  };
  Object.keys(fallbacks).forEach(function(option) {
    if (isUndefined(ctx[option])) {
      ctx[option] = fallbacks[option];
    }
  });

  if (ctx.colors) {
    ctx.stylize = stylizeWithColor;
  }

  return formatValue(ctx, obj, ctx.depth);
}
exports.inspect = inspect;
// http://en.wikipedia.org/wiki/ANSI_escape_code#graphics
// Each entry is a pair of numeric codes: stylizeWithColor emits the first
// before the text and the second after it.
inspect.colors = {
'bold' : [1, 22],
'italic' : [3, 23],
'underline' : [4, 24],
'inverse' : [7, 27],
'white' : [37, 39],
'grey' : [90, 39],
'black' : [30, 39],
'blue' : [34, 39],
'cyan' : [36, 39],
'green' : [32, 39],
'magenta' : [35, 39],
'red' : [31, 39],
'yellow' : [33, 39]
};
// Maps a style type (as passed to ctx.stylize) to a key of inspect.colors.
// Don't use 'blue' not visible on cmd.exe
inspect.styles = {
'special': 'cyan',
'number': 'yellow',
'boolean': 'yellow',
'undefined': 'grey',
'null': 'bold',
'string': 'green',
'date': 'magenta',
// "name": intentionally not styling
'regexp': 'red'
};
// Wraps `str` in the escape codes configured for `styleType`; text with an
// unconfigured style type is returned untouched.
function stylizeWithColor(str, styleType) {
  var styleName = inspect.styles[styleType];
  if (!styleName) {
    return str;
  }
  var codes = inspect.colors[styleName];
  var open = '\u001b[' + codes[0] + 'm';
  var close = '\u001b[' + codes[1] + 'm';
  return open + str + close;
}

// Colourless counterpart: the style type is ignored.
function stylizeNoColor(str, styleType) {
  var unchanged = str;
  return unchanged;
}
// Builds a lookup object whose keys are the array's elements, each mapped
// to `true`.
function arrayToHash(array) {
  return array.reduce(function(lookup, entry) {
    lookup[entry] = true;
    return lookup;
  }, {});
}
/**
 * Produces the display string for a single value.
 *
 * Tries, in order: a custom `inspect` method on the value, primitive
 * formatting, shortcuts for property-less functions/regexps/dates/errors,
 * and finally a key-by-key rendering of the object or array.
 *
 * @param {Object} ctx - inspection context (seen, stylize, showHidden, ...)
 * @param {*} value - the value to render
 * @param {Number|null} recurseTimes - remaining depth; below zero the
 *   value is abbreviated instead of expanded
 * @returns {String} the rendered value
 */
function formatValue(ctx, value, recurseTimes) {
// Provide a hook for user-specified inspect functions.
// Check that value is an object with an inspect function on it
if (ctx.customInspect &&
value &&
isFunction(value.inspect) &&
// Filter out the util module, its inspect function is special
value.inspect !== exports.inspect &&
// Also filter out any prototype objects using the circular check.
!(value.constructor && value.constructor.prototype === value)) {
var ret = value.inspect(recurseTimes, ctx);
// A custom inspect may return a non-string; format that result in turn.
if (!isString(ret)) {
ret = formatValue(ctx, ret, recurseTimes);
}
return ret;
}
// Primitive types cannot have properties
var primitive = formatPrimitive(ctx, value);
if (primitive) {
return primitive;
}
// Look up the keys of the object.
var keys = Object.keys(value);
// Enumerable keys; formatProperty brackets any key missing from this set.
var visibleKeys = arrayToHash(keys);
if (ctx.showHidden) {
keys = Object.getOwnPropertyNames(value);
}
// IE doesn't make error fields non-enumerable
// http://msdn.microsoft.com/en-us/library/ie/dww52sbt(v=vs.94).aspx
if (isError(value)
&& (keys.indexOf('message') >= 0 || keys.indexOf('description') >= 0)) {
return formatError(value);
}
// Some type of object without properties can be shortcutted.
if (keys.length === 0) {
if (isFunction(value)) {
var name = value.name ? ': ' + value.name : '';
return ctx.stylize('[Function' + name + ']', 'special');
}
if (isRegExp(value)) {
return ctx.stylize(RegExp.prototype.toString.call(value), 'regexp');
}
if (isDate(value)) {
return ctx.stylize(Date.prototype.toString.call(value), 'date');
}
if (isError(value)) {
return formatError(value);
}
}
// `base` is a prefix printed right after the opening brace.
var base = '', array = false, braces = ['{', '}'];
// Make Array say that they are Array
if (isArray(value)) {
array = true;
braces = ['[', ']'];
}
// Make functions say that they are functions
if (isFunction(value)) {
var n = value.name ? ': ' + value.name : '';
base = ' [Function' + n + ']';
}
// Make RegExps say that they are RegExps
if (isRegExp(value)) {
base = ' ' + RegExp.prototype.toString.call(value);
}
// Make dates with properties first say the date
if (isDate(value)) {
base = ' ' + Date.prototype.toUTCString.call(value);
}
// Make error with message first say the error
if (isError(value)) {
base = ' ' + formatError(value);
}
// Nothing to list: emit just the braces (plus any prefix).
if (keys.length === 0 && (!array || value.length == 0)) {
return braces[0] + base + braces[1];
}
// Depth budget exhausted: abbreviate instead of descending further.
if (recurseTimes < 0) {
if (isRegExp(value)) {
return ctx.stylize(RegExp.prototype.toString.call(value), 'regexp');
} else {
return ctx.stylize('[Object]', 'special');
}
}
// Track the value while its children are rendered so formatProperty can
// flag references back to it as circular.
ctx.seen.push(value);
var output;
if (array) {
output = formatArray(ctx, value, recurseTimes, visibleKeys, keys);
} else {
output = keys.map(function(key) {
return formatProperty(ctx, value, recurseTimes, visibleKeys, key, array);
});
}
ctx.seen.pop();
return reduceToSingleString(output, base, braces);
}
// Renders undefined, strings, numbers, booleans and null through
// ctx.stylize. Any other value yields undefined so the caller can fall
// through to object formatting.
function formatPrimitive(ctx, value) {
  if (isUndefined(value)) {
    return ctx.stylize('undefined', 'undefined');
  }

  if (isString(value)) {
    // Re-quote the JSON form with single quotes instead of double quotes.
    var jsonForm = JSON.stringify(value);
    var unwrapped = jsonForm.replace(/^"|"$/g, '');
    var requoted = unwrapped.replace(/'/g, "\\'").replace(/\\"/g, '"');
    var simple = '\'' + requoted + '\'';
    return ctx.stylize(simple, 'string');
  }

  if (isNumber(value)) {
    return ctx.stylize('' + value, 'number');
  }

  if (isBoolean(value)) {
    return ctx.stylize('' + value, 'boolean');
  }

  // For some reason typeof null is "object", so special case here.
  if (isNull(value)) {
    return ctx.stylize('null', 'null');
  }
}
// Renders an error as its Error.prototype.toString() text in square brackets.
function formatError(value) {
  var text = Error.prototype.toString.call(value);
  return '[' + text + ']';
}
// Renders every index of the array (holes become empty strings) and then
// any extra, non-numeric keys.
function formatArray(ctx, value, recurseTimes, visibleKeys, keys) {
  var output = [];
  var total = value.length;
  var index = 0;

  while (index < total) {
    var slot = String(index);
    if (hasOwnProperty(value, slot)) {
      output.push(formatProperty(ctx, value, recurseTimes, visibleKeys,
          slot, true));
    } else {
      output.push('');
    }
    index += 1;
  }

  keys.forEach(function(key) {
    // Index keys were already handled by the loop above.
    if (key.match(/^\d+$/)) {
      return;
    }
    output.push(formatProperty(ctx, value, recurseTimes, visibleKeys,
        key, true));
  });

  return output;
}
/**
 * Renders one property of `value` as "name: rendered-value" (or just the
 * rendered value for numeric array indices).
 *
 * @param {Object} ctx - inspection context
 * @param {Object} value - the object owning the property
 * @param {Number|null} recurseTimes - remaining depth; null is passed on
 *   unchanged, numbers are decremented for the nested value
 * @param {Object} visibleKeys - hash of enumerable key names
 * @param {String} key - the property name to render
 * @param {Boolean} array - whether `value` is being rendered as an array
 * @returns {String} the rendered property
 */
function formatProperty(ctx, value, recurseTimes, visibleKeys, key, array) {
var name, str, desc;
desc = Object.getOwnPropertyDescriptor(value, key) || { value: value[key] };
// Accessor properties are described, never invoked.
if (desc.get) {
if (desc.set) {
str = ctx.stylize('[Getter/Setter]', 'special');
} else {
str = ctx.stylize('[Getter]', 'special');
}
} else {
if (desc.set) {
str = ctx.stylize('[Setter]', 'special');
}
}
// Keys outside the enumerable set are shown in square brackets.
if (!hasOwnProperty(visibleKeys, key)) {
name = '[' + key + ']';
}
if (!str) {
// Only descend into values that are not already being rendered further up.
if (ctx.seen.indexOf(desc.value) < 0) {
if (isNull(recurseTimes)) {
str = formatValue(ctx, desc.value, null);
} else {
str = formatValue(ctx, desc.value, recurseTimes - 1);
}
// Multi-line nested output gets each line prefixed before being embedded.
if (str.indexOf('\n') > -1) {
if (array) {
str = str.split('\n').map(function(line) {
return ' ' + line;
}).join('\n').substr(2);
} else {
str = '\n' + str.split('\n').map(function(line) {
return ' ' + line;
}).join('\n');
}
}
} else {
str = ctx.stylize('[Circular]', 'special');
}
}
if (isUndefined(name)) {
// Numeric array indices are rendered without a name.
if (array && key.match(/^\d+$/)) {
return str;
}
name = JSON.stringify('' + key);
// Identifier-like keys are shown bare; anything else is single-quoted.
if (name.match(/^"([a-zA-Z_][a-zA-Z_0-9]*)"$/)) {
name = name.substr(1, name.length - 2);
name = ctx.stylize(name, 'name');
} else {
name = name.replace(/'/g, "\\'")
.replace(/\\"/g, '"')
.replace(/(^"|"$)/g, "'");
name = ctx.stylize(name, 'string');
}
}
return name + ': ' + str;
}
/**
 * Joins the rendered entries of an object/array into its final string.
 *
 * Entries are placed on one line when their combined length (ignoring
 * colour escape codes, counting one separator per entry) is at most 60;
 * otherwise each entry goes on its own line.
 *
 * @param {String[]} output - the rendered entries
 * @param {String} base - prefix printed after the opening brace ('' if none)
 * @param {String[]} braces - two-element array: opening and closing brace
 * @returns {String} the assembled string
 */
function reduceToSingleString(output, base, braces) {
  // The per-entry line estimate that used to be accumulated here was never
  // read, so only the total length is computed.
  var length = output.reduce(function(total, entry) {
    return total + entry.replace(/\u001b\[\d\d?m/g, '').length + 1;
  }, 0);

  if (length > 60) {
    return braces[0] +
           (base === '' ? '' : base + '\n ') +
           ' ' +
           output.join(',\n ') +
           ' ' +
           braces[1];
  }

  return braces[0] + base + ' ' + output.join(', ') + ' ' + braces[1];
}
// NOTE: These type checking functions intentionally don't use `instanceof`
// because it is fragile and can be easily faked with `Object.create()`.
// Type predicates, each also published on `exports` under the same name.
function isArray(ar) {
  var result = Array.isArray(ar);
  return result;
}
exports.isArray = isArray;

function isBoolean(arg) {
  return 'boolean' === typeof arg;
}
exports.isBoolean = isBoolean;

function isNull(arg) {
  return null === arg;
}
exports.isNull = isNull;

// Loose comparison on purpose: matches both null and undefined.
function isNullOrUndefined(arg) {
  return null == arg;
}
exports.isNullOrUndefined = isNullOrUndefined;

function isNumber(arg) {
  return 'number' === typeof arg;
}
exports.isNumber = isNumber;

function isString(arg) {
  return 'string' === typeof arg;
}
exports.isString = isString;

function isSymbol(arg) {
  return 'symbol' === typeof arg;
}
exports.isSymbol = isSymbol;

function isUndefined(arg) {
  return void 0 === arg;
}
exports.isUndefined = isUndefined;

function isRegExp(re) {
  if (!isObject(re)) {
    return false;
  }
  return objectToString(re) === '[object RegExp]';
}
exports.isRegExp = isRegExp;

// Non-null values whose typeof is 'object'.
function isObject(arg) {
  if (arg === null) {
    return false;
  }
  return 'object' === typeof arg;
}
exports.isObject = isObject;

function isDate(d) {
  if (!isObject(d)) {
    return false;
  }
  return objectToString(d) === '[object Date]';
}
exports.isDate = isDate;

function isError(e) {
  if (!isObject(e)) {
    return false;
  }
  return objectToString(e) === '[object Error]' || e instanceof Error;
}
exports.isError = isError;

function isFunction(arg) {
  return 'function' === typeof arg;
}
exports.isFunction = isFunction;

function isPrimitive(arg) {
  if (arg === null) {
    return true;
  }
  switch (typeof arg) {
    case 'boolean':
    case 'number':
    case 'string':
    case 'symbol': // ES6 symbol
    case 'undefined':
      return true;
    default:
      return false;
  }
}
exports.isPrimitive = isPrimitive;
exports.isBuffer = _dereq_('./support/isBuffer');
// Returns the built-in "[object Type]" tag for a value.
function objectToString(o) {
  var describe = Object.prototype.toString;
  return describe.call(o);
}
// Left-pads a number below 10 with a single zero.
function pad(n) {
  var digits = n.toString(10);
  return n < 10 ? '0' + digits : digits;
}

var months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep',
              'Oct', 'Nov', 'Dec'];

// 26 Feb 16:19:34
function timestamp() {
  var now = new Date();
  var clock = pad(now.getHours()) + ':' +
              pad(now.getMinutes()) + ':' +
              pad(now.getSeconds());
  return now.getDate() + ' ' + months[now.getMonth()] + ' ' + clock;
}
// log is just a thin wrapper to console.log that prepends a timestamp
exports.log = function() {
console.log('%s - %s', timestamp(), exports.format.apply(exports, arguments));
};
/**
* Inherit the prototype methods from one constructor into another.
*
* The Function.prototype.inherits from lang.js rewritten as a standalone
* function (not on Function.prototype). NOTE: If this file is to be loaded
* during bootstrapping this function needs to be rewritten using some native
* functions as prototype setup using normal JavaScript does not work as
* expected during bootstrapping (see mirror.js in r114903).
*
* @param {function} ctor Constructor function which needs to inherit the
* prototype.
* @param {function} superCtor Constructor function to inherit prototype from.
*/
exports.inherits = _dereq_('inherits');
exports._extend = function(origin, add) {
// Don't do anything if add isn't an object
if (!add || !isObject(add)) return origin;
var keys = Object.keys(add);
var i = keys.length;
while (i--) {
origin[keys[i]] = add[keys[i]];
}
return origin;
};
// Own-property check that goes through Object.prototype, so it does not
// depend on `obj` having a hasOwnProperty method of its own.
function hasOwnProperty(obj, prop) {
  var ownCheck = Object.prototype.hasOwnProperty;
  return ownCheck.call(obj, prop);
}
}).call(this,_dereq_("/home/caolan/projects/highland/node_modules/browserify/node_modules/insert-module-globals/node_modules/process/browser.js"),typeof self !== "undefined" ? self : typeof window !== "undefined" ? window : {})
},{"./support/isBuffer":5,"/home/caolan/projects/highland/node_modules/browserify/node_modules/insert-module-globals/node_modules/process/browser.js":4,"inherits":3}]},{},[1])
(1)
});