streampipe.js 2.37 KB
const stream = require('stream')
const Gun = require('gun')

const randomString = length => {
  const possible = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
  let text = ''
  for (var i = 0; i < length; i++) {
    text += possible[Math.floor(Math.random() * possible.length)]
  }
  return text
}

// Generator
// generates stream of json {id: "", name: "", description: ""}
class Generator extends stream.Readable {
  constructor(n) {
    super()

    this._n = n
    this._i = 0
  }

  _read(size) {
    if (this._i < this._n) {
      this.push(
        JSON.stringify({
          id: `${this._i}`,
          name: randomString(20),
          description: randomString(200)
        })
      )
      this.push('\n')
      this._i++
    } else {
      this.push(null)
    }
  }
}

// Line
// read line by line as stream comes through it
class Line extends stream.Transform {
  constructor() {
    super()
    this.buff = ''
  }

  trySendLine() {
    let index = this.buff.indexOf(`\n`)

    while (index !== -1) {
      const line = this.buff.slice(0, index)
      this.buff = this.buff.slice(index + 1)

      this.push(line)

      index = this.buff.indexOf(`\n`)
    }
  }

  _transform(chunk, enc, cb) {
    this.buff += chunk.toString()

    this.trySendLine()

    cb()
  }

  end() {
    this.trySendLine()

    if (this.buff.length > 0) {
      this.push(this.buff)
    }
  }
}

// Graph
// parse the chunk and tries to add it to table
class Graph extends stream.Transform {
  constructor() {
    super()

    this.db = new Gun({
      //file: 'graph.json'
      localStorage: false
    })
    this.items = this.db.get('items')
  }

  _transform(chunk, enc, cb) {
    const json = JSON.parse(chunk.toString())
    const item = this.db.get(json.id)

    item.put(
      {
        id: json.id,
        name: json.name,
        description: json.description
      },
      () => {
        this.items.set(item, () => {
          this.push(chunk)
          cb()
        })
      }
    )
  }
}

// Report
// shows how many item has pass through the system
class Report extends stream.Transform {
  constructor() {
    super()
    this._count = 0
  }

  _transform(chunk, enc, cb) {
    this.push(`count: ${this._count++}\r`)
    cb()
  }
}

const generator = new Generator(5000)
const line = new Line()
const graph = new Graph()
const report = new Report()

generator
  .pipe(line)
  .pipe(graph)
  .pipe(report)
  .pipe(process.stdout)