|
|
//filter will reemit the data if cb(err,pass) pass is truthy
// reduce is more tricky
// maybe we want to group the reductions or emit progress updates occasionally
// the most basic reduce just emits one 'data' event after it has received 'end'
var Stream = require('stream').Stream
//create an event stream and apply function to each .write
//emitting each response as data
//unless it's an empty callback
module.exports = function (mapper, opts) {
var stream = new Stream() , self = this , inputs = 0 , outputs = 0 , ended = false , paused = false , destroyed = false , lastWritten = 0 , inNext = false
this.opts = opts || {}; var errorEventName = this.opts.failures ? 'failure' : 'error';
// Items that are not ready to be written yet (because they would come out of
// order) get stuck in a queue for later.
var writeQueue = {}
stream.writable = true stream.readable = true
function queueData (data, number) { var nextToWrite = lastWritten + 1
if (number === nextToWrite) { // If it's next, and its not undefined write it
if (data !== undefined) { stream.emit.apply(stream, ['data', data]) } lastWritten ++ nextToWrite ++ } else { // Otherwise queue it for later.
writeQueue[number] = data }
// If the next value is in the queue, write it
if (writeQueue.hasOwnProperty(nextToWrite)) { var dataToWrite = writeQueue[nextToWrite] delete writeQueue[nextToWrite] return queueData(dataToWrite, nextToWrite) }
outputs ++ if(inputs === outputs) { if(paused) paused = false, stream.emit('drain') //written all the incoming events
if(ended) end() } }
function next (err, data, number) { if(destroyed) return inNext = true
if (!err || self.opts.failures) { queueData(data, number) }
if (err) { stream.emit.apply(stream, [ errorEventName, err ]); }
inNext = false; }
// Wrap the mapper function by calling its callback with the order number of
// the item in the stream.
function wrappedMapper (input, number, callback) { return mapper.call(null, input, function(err, data){ callback(err, data, number) }) }
stream.write = function (data) { if(ended) throw new Error('map stream is not writable') inNext = false inputs ++
try { //catch sync errors and handle them like async errors
var written = wrappedMapper(data, inputs, next) paused = (written === false) return !paused } catch (err) { //if the callback has been called syncronously, and the error
//has occured in an listener, throw it again.
if(inNext) throw err next(err) return !paused } }
function end (data) { //if end was called with args, write it,
ended = true //write will emit 'end' if ended is true
stream.writable = false if(data !== undefined) { return queueData(data, inputs) } else if (inputs == outputs) { //wait for processing
stream.readable = false, stream.emit('end'), stream.destroy() } }
stream.end = function (data) { if(ended) return end() }
stream.destroy = function () { ended = destroyed = true stream.writable = stream.readable = paused = false process.nextTick(function () { stream.emit('close') }) } stream.pause = function () { paused = true }
stream.resume = function () { paused = false }
return stream }
|