const { createReadStream } = require('fs')
const { Transform } = require('stream')
const { through } = require('mississippi')

class GetEmail extends Transform {
  constructor(options) {
    super(options)
  }
  _transform(chunk, enc, cb) {
    cb(null, chunk['email'])
  }
}

const map = f => through.obj((chunk, enc, cb) =>
                             cb(null, f(chunk)))

const stream = createReadStream('data.json')
  .pipe(map(x => JSON.parse(x.toString())))
  .pipe(new GetEmail({ objectMode: true }))

stream.on('error', err => console.error(err))
stream.on('readable', () => {
  while (null !== (chunk = stream.read())) {
    console.log(chunk)
  }
})