const { createReadStream } = require('fs')
const { Transform, pipeline } = require('stream')
const { through } = require('mississippi')
/**
 * Transform stream that replaces each incoming record with the value of its
 * `email` property.
 *
 * Records are read by property access, so the stream defaults to object mode;
 * a caller can still override that through `options`.
 */
class GetEmail extends Transform {
  /**
   * @param {object} [options] - Options forwarded to `Transform`. `objectMode`
   *   defaults to `true` when the caller does not specify it.
   */
  constructor(options) {
    // Caller-supplied keys are spread last so an explicit `objectMode` wins.
    super({ objectMode: true, ...options })
  }

  /**
   * Emits `chunk.email` downstream.
   *
   * @param {object} chunk - Incoming record.
   * @param {string} enc - Encoding (unused).
   * @param {Function} cb - Transform callback, called with `(null, email)`.
   */
  _transform(chunk, enc, cb) {
    // When `email` is null/undefined the Transform base class pushes nothing,
    // so such a record produces no output.
    cb(null, chunk.email)
  }
}
/**
 * Builds an object-mode through stream that applies `f` to every chunk and
 * emits the result.
 *
 * @param {Function} f - Mapping function called with each chunk.
 * @returns {object} The stream created by `through.obj`.
 */
const map = f => through.obj((chunk, enc, cb) => {
  let mapped
  try {
    mapped = f(chunk)
  } catch (err) {
    // Hand the failure to the stream so it surfaces as an 'error' event
    // instead of an exception thrown out of the write call.
    return cb(err)
  }
  // Called outside the try so an exception raised while delivering the value
  // is never mistaken for a mapping failure (and cb is never invoked twice).
  cb(null, mapped)
})
// Reads data.json, parses each chunk as JSON, extracts the `email` field and
// logs every value that comes out of the last stage.
//
// `pipeline` is used instead of chained `.pipe()` calls because `.pipe()` does
// not forward errors: a failure in the file reader or the parse stage would
// never reach a listener attached only to the last stream. `pipeline` reports
// an error from any stage to the callback and destroys the other stages.
//
// NOTE(review): every raw file chunk is parsed as a complete JSON document.
// That presumably only works while the whole file arrives in a single read
// chunk — confirm the expected size/format of data.json.
const stream = pipeline(
  createReadStream('data.json'),
  map(x => JSON.parse(x.toString())),
  new GetEmail({ objectMode: true }),
  err => {
    // The callback also fires with no error on normal completion.
    if (err) console.error(err)
  }
)
stream.on('readable', () => {
  // Declared locally; the previous undeclared assignment leaked a global.
  let chunk
  while ((chunk = stream.read()) !== null) {
    console.log(chunk)
  }
})