streampipe.js
2.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
const stream = require('stream')
const Gun = require('gun')
const randomString = length => {
const possible = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
let text = ''
for (var i = 0; i < length; i++) {
text += possible[Math.floor(Math.random() * possible.length)]
}
return text
}
// Generator
// generates stream of json {id: "", name: "", description: ""}
class Generator extends stream.Readable {
constructor(n) {
super()
this._n = n
this._i = 0
}
_read(size) {
if (this._i < this._n) {
this.push(
JSON.stringify({
id: `${this._i}`,
name: randomString(20),
description: randomString(200)
})
)
this.push('\n')
this._i++
} else {
this.push(null)
}
}
}
// Line
// read line by line as stream comes through it
class Line extends stream.Transform {
constructor() {
super()
this.buff = ''
}
trySendLine() {
let index = this.buff.indexOf(`\n`)
while (index !== -1) {
const line = this.buff.slice(0, index)
this.buff = this.buff.slice(index + 1)
this.push(line)
index = this.buff.indexOf(`\n`)
}
}
_transform(chunk, enc, cb) {
this.buff += chunk.toString()
this.trySendLine()
cb()
}
end() {
this.trySendLine()
if (this.buff.length > 0) {
this.push(this.buff)
}
}
}
// Graph
// parse the chunk and tries to add it to table
class Graph extends stream.Transform {
constructor() {
super()
this.db = new Gun({
//file: 'graph.json'
localStorage: false
})
this.items = this.db.get('items')
}
_transform(chunk, enc, cb) {
const json = JSON.parse(chunk.toString())
const item = this.db.get(json.id)
item.put(
{
id: json.id,
name: json.name,
description: json.description
},
() => {
this.items.set(item, () => {
this.push(chunk)
cb()
})
}
)
}
}
// Report
// shows how many item has pass through the system
class Report extends stream.Transform {
constructor() {
super()
this._count = 0
}
_transform(chunk, enc, cb) {
this.push(`count: ${this._count++}\r`)
cb()
}
}
const generator = new Generator(5000)
const line = new Line()
const graph = new Graph()
const report = new Report()
generator
.pipe(line)
.pipe(graph)
.pipe(report)
.pipe(process.stdout)