mesh.js
11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
var Type = require('../type');
function Mesh(root){
var mesh = function(){};
var opt = root.opt || {};
opt.log = opt.log || console.log;
opt.gap = opt.gap || opt.wait || 0;
opt.pack = opt.pack || (opt.memory? (opt.memory * 1000 * 1000) : 1399000000) * 0.3; // max_old_space_size defaults to 1400 MB.
opt.puff = opt.puff || 9; // IDEA: do a start/end benchmark, divide ops/result.
var puff = setTimeout.puff || setTimeout;
var dup = root.dup, dup_check = dup.check, dup_track = dup.track;
var hear = mesh.hear = function(raw, peer){
if(!raw){ return }
if(opt.pack <= raw.length){ return mesh.say({dam: '!', err: "Message too big!"}, peer) }
var msg, id, hash, tmp = raw[0], DBG;
if(mesh === this){ hear.d += raw.length||0 ; ++hear.c } // STATS!
if('[' === tmp){
try{msg = JSON.parse(raw)}catch(e){opt.log('DAM JSON parse error', e)}
raw = '';
if(!msg){ return }
console.STAT && console.STAT(+new Date, msg.length, '# on hear batch');
var P = opt.puff;
(function go(){
var S = +new Date;
//var P = peer.puff || opt.puff, s = +new Date; // TODO: For future, but in mix?
var i = 0, m; while(i < P && (m = msg[i++])){ hear(m, peer) }
//peer.puff = Math.ceil((+new Date - s)? P * 1.1 : P * 0.9);
msg = msg.slice(i); // slicing after is faster than shifting during.
console.STAT && console.STAT(S, +new Date - S, 'hear loop');
flush(peer); // force send all synchronously batched acks.
if(!msg.length){ return }
puff(go, 0);
}());
return;
}
if('{' === tmp || ((raw['#'] || obj_is(raw)) && (msg = raw))){
try{msg = msg || JSON.parse(raw);
}catch(e){return opt.log('DAM JSON parse error', e)}
if(!msg){ return }
if(msg.DBG){ msg.DBG = DBG = {DBG: msg.DBG} }
DBG && (DBG.hp = +new Date);
if(!(id = msg['#'])){ id = msg['#'] = Type.text.random(9) }
if(tmp = dup_check(id)){ return }
/*if(!(hash = msg['##']) && u !== msg.put){ hash = msg['##'] = Type.obj.hash(msg.put) }
if(hash && (tmp = msg['@'] || (msg.get && id))){ // Reduces backward daisy in case varying hashes at different daisy depths are the same.
if(dup.check(tmp+hash)){ return }
dup.track(tmp+hash, true).it = it(msg); // GUN core also dedups, so `true` is needed. // Does GUN core need to dedup anymore?
}
if(tmp = msg['><']){ (msg._).to = Type.obj.map(tmp.split(','), tomap) }
*/ // TOOD: COME BACK TO THIS LATER!!! IMPORTANT MESH STUFF!!
(msg._ = function(){}).via = mesh.leap = peer;
if(tmp = msg.dam){
if(tmp = mesh.hear[tmp]){
tmp(msg, peer, root);
}
dup_track(id);
return;
}
var S = +new Date, ST;
DBG && (DBG.is = S);
root.on('in', msg);
//ECHO = msg.put || ECHO; !(msg.ok !== -3740) && mesh.say({ok: -3740, put: ECHO, '@': msg['#']}, peer);
DBG && (DBG.hd = +new Date);
console.STAT && (ST = +new Date - S) > 9 && console.STAT(S, ST, 'msg'); // TODO: PERF: caught one > 1.5s on tgif
dup_track(id).via = peer;
mesh.leap = null; // warning! mesh.leap could be buggy.
}
}
var tomap = function(k,i,m){m(k,true)};
var noop = function(){};
hear.c = hear.d = 0;
;(function(){
var SMIA = 0;
var message, loop;
function each(peer){ mesh.say(message, peer) }
var say = mesh.say = function(msg, peer){
if(this && this.to){ this.to.next(msg) } // compatible with middleware adapters.
if(!msg){ return false }
var id, hash, tmp, raw;
var DBG = msg.DBG, S; if(!peer){ S = +new Date ; DBG && (DBG.y = S) }
var meta = msg._||(msg._=function(){});
if(!(id = msg['#'])){ id = msg['#'] = Type.text.random(9) }
//if(!(hash = msg['##']) && u !== msg.put){ hash = msg['##'] = Type.obj.hash(msg.put) }
if(!(raw = meta.raw)){
raw = mesh.raw(msg);
/*if(hash && (tmp = msg['@'])){
dup.track(tmp+hash).it = it(msg);
if(tmp = (dup.s[tmp]||ok).it){
if(hash === tmp['##']){ return false }
tmp['##'] = hash;
}
}*/
}
S && console.STAT && console.STAT(S, +new Date - S, 'say prep');
!loop && dup_track(id);//.it = it(msg); // track for 9 seconds, default. Earth<->Mars would need more! // always track, maybe move this to the 'after' logic if we split function.
//console.log("SEND!", JSON.parse(JSON.stringify(msg)));
if(!peer && (tmp = msg['@'])){ peer = ((tmp = dup.s[tmp]) && (tmp.via || ((tmp = tmp.it) && (tmp = tmp._) && tmp.via))) || mesh.leap } // warning! mesh.leap could be buggy!
if(!peer && msg['@']){
console.STAT && console.STAT(+new Date, ++SMIA, 'total no peer to ack to');
return false;
} // TODO: Temporary? If ack via trace has been lost, acks will go to all peers, which trashes browser bandwidth. Not relaying the ack will force sender to ask for ack again. Note, this is technically wrong for mesh behavior.
if(!peer && mesh.way){ return mesh.way(msg) }
if(!peer || !peer.id){ message = msg;
if(!Type.obj.is(peer || opt.peers)){ return false }
var P = opt.puff, ps = opt.peers, pl = Object.keys(peer || opt.peers || {}); // TODO: BETTER PERF? No object.keys? It is polyfilled by Type.js tho.
;(function go(){
var S = +new Date;
//Type.obj.map(peer || opt.peers, each); // in case peer is a peer list.
loop = 1; var wr = meta.raw; meta.raw = raw; // quick perf hack
var i = 0, p; while(i < 9 && (p = (pl||'')[i++])){
if(!(p = ps[p])){ continue }
say(msg, p);
}
meta.raw = wr; loop = 0;
pl = pl.slice(i); // slicing after is faster than shifting during.
console.STAT && console.STAT(S, +new Date - S, 'say loop');
if(!pl.length){ return }
puff(go, 0);
dup_track(msg['@']); // keep for later
}());
return;
}
// TODO: PERF: consider splitting function here, so say loops do less work.
if(!peer.wire && mesh.wire){ mesh.wire(peer) }
if(id === peer.last){ return } peer.last = id; // was it just sent?
if(peer === meta.via){ return false } // don't send back to self.
if((tmp = meta.to) && (tmp[peer.url] || tmp[peer.pid] || tmp[peer.id]) /*&& !o*/){ return false }
if(peer.batch){
peer.tail = (tmp = peer.tail || 0) + raw.length;
if(peer.tail <= opt.pack){
//peer.batch.push(raw);
peer.batch += (tmp?',':'')+raw; // TODO: Prevent double JSON! // FOR v1.0 !?
return;
}
flush(peer);
}
//peer.batch = [];
peer.batch = '['; // TODO: Prevent double JSON!
var S = +new Date, ST;
setTimeout(function(){
console.STAT && (ST = +new Date - S) > 9 && console.STAT(S, ST, '0ms TO', id, peer.id);
flush(peer);
}, opt.gap);
send(raw, peer);
}
mesh.say.c = mesh.say.d = 0;
}());
function flush(peer){
var tmp = peer.batch, t = 'string' == typeof tmp, l;
if(t){ tmp += ']' }// TODO: Prevent double JSON!
peer.batch = peer.tail = null;
if(!tmp){ return }
if(t? 3 > tmp.length : !tmp.length){ return } // TODO: ^
if(!t){try{tmp = (1 === tmp.length? tmp[0] : JSON.stringify(tmp));
}catch(e){return opt.log('DAM JSON stringify error', e)}}
if(!tmp){ return }
send(tmp, peer);
}
// for now - find better place later.
function send(raw, peer){ try{
var wire = peer.wire;
if(peer.say){
peer.say(raw);
} else
if(wire.send){
wire.send(raw);
}
mesh.say.d += raw.length||0; ++mesh.say.c; // STATS!
}catch(e){
(peer.queue = peer.queue || []).push(raw);
}}
;(function(){
// TODO: this caused a out-of-memory crash!
mesh.raw = function(msg){ // TODO: Clean this up / delete it / move logic out!
if(!msg){ return '' }
var meta = (msg._) || {}, put, hash, tmp;
if(tmp = meta.raw){ return tmp }
if('string' == typeof msg){ return msg }
/*if(!msg.dam){ // TOOD: COME BACK TO THIS LATER!!! IMPORTANT MESH STUFF!!
var i = 0, to = []; Type.obj.map(opt.peers, function(p){
to.push(p.url || p.pid || p.id); if(++i > 3){ return true } // limit server, fast fix, improve later! // For "tower" peer, MUST include 6 surrounding ids. // REDUCED THIS TO 3 for temporary relay peer performance, towers still should list neighbors.
}); if(i > 1){ msg['><'] = to.join() }
}*/ // TOOD: COME BACK TO THIS LATER!!! IMPORTANT MESH STUFF!!
var raw = $(msg); // optimize by reusing put = the JSON.stringify from .hash?
/*if(u !== put){
tmp = raw.indexOf(_, raw.indexOf('put'));
raw = raw.slice(0, tmp-1) + put + raw.slice(tmp + _.length + 1);
//raw = raw.replace('"'+ _ +'"', put); // NEVER USE THIS! ALSO NEVER DELETE IT TO NOT MAKE SAME MISTAKE! https://github.com/amark/gun/wiki/@$$ Heisenbug
}*/
// TODO: PERF: tgif, CPU way too much on re-JSONifying ^ it.
/*
// NOTE TO SELF: Switch NTS to DAM now.
*/
if(meta && (raw||'').length < (1000 * 100)){ meta.raw = raw } // HNPERF: If string too big, don't keep in memory.
return raw;
}
var $ = JSON.stringify, _ = ':])([:';
}());
mesh.hi = function(peer){
var tmp = peer.wire || {};
if(peer.id){
opt.peers[peer.url || peer.id] = peer;
} else {
tmp = peer.id = peer.id || Type.text.random(9);
mesh.say({dam: '?', pid: root.opt.pid}, opt.peers[tmp] = peer);
delete dup.s[peer.last]; // IMPORTANT: see https://gun.eco/docs/DAM#self
}
peer.met = peer.met || +(new Date);
if(!tmp.hied){ root.on(tmp.hied = 'hi', peer) }
// @rogowski I need this here by default for now to fix go1dfish's bug
tmp = peer.queue; peer.queue = [];
Type.obj.map(tmp, function(msg){
send(msg, peer);
});
Type.obj.native && Type.obj.native(); // dirty place to check if other JS polluted.
}
mesh.bye = function(peer){
root.on('bye', peer);
var tmp = +(new Date); tmp = (tmp - (peer.met||tmp));
mesh.bye.time = ((mesh.bye.time || tmp) + tmp) / 2;
}
mesh.hear['!'] = function(msg, peer){ opt.log('Error:', msg.err) }
mesh.hear['?'] = function(msg, peer){
if(msg.pid){
if(!peer.pid){ peer.pid = msg.pid }
if(msg['@']){ return }
}
mesh.say({dam: '?', pid: opt.pid, '@': msg['#']}, peer);
delete dup.s[peer.last]; // IMPORTANT: see https://gun.eco/docs/DAM#self
}
root.on('create', function(root){
root.opt.pid = root.opt.pid || Type.text.random(9);
this.to.next(root);
root.on('out', mesh.say);
});
root.on('bye', function(peer, tmp){
peer = opt.peers[peer.id || peer] || peer;
this.to.next(peer);
peer.bye? peer.bye() : (tmp = peer.wire) && tmp.close && tmp.close();
Type.obj.del(opt.peers, peer.id);
peer.wire = null;
});
var gets = {};
root.on('bye', function(peer, tmp){ this.to.next(peer);
if(!(tmp = peer.url)){ return } gets[tmp] = true;
setTimeout(function(){ delete gets[tmp] },opt.lack || 9000);
});
root.on('hi', function(peer, tmp){ this.to.next(peer);
if(!(tmp = peer.url) || !gets[tmp]){ return } delete gets[tmp];
if(opt.super){ return } // temporary (?) until we have better fix/solution?
Type.obj.map(root.next, function(node, soul){
tmp = {}; tmp[soul] = root.graph[soul];
mesh.say({'##': Type.obj.hash(tmp), get: {'#': soul}}, peer);
})
});
return mesh;
}
;(function(){
var $ = JSON.stringify, u;
Type.obj.hash = function(obj, hash){
if(!hash && u === (obj = $(obj, sort))){ return }
return Type.text.hash(hash || obj || '');
}
function sort(k, v){ var tmp;
if(!(v instanceof Object)){ return v }
var S = +new Date;
Type.obj.map(Object.keys(v).sort(), map, {to: tmp = {}, on: v});
console.STAT && console.STAT(S, +new Date - S, 'sort');
return tmp;
}
Type.obj.hash.sort = sort;
function map(k){
this.to[k] = this.on[k];
}
}());
function it(msg){ return msg || {_: msg._, '##': msg['##']} } // HNPERF: Only need some meta data, not full reference (took up too much memory). // HNPERF: Garrrgh! We add meta data to msg over time, copying the object happens to early.
var empty = {}, ok = true, u;
var obj_is = Type.obj.is, obj_map = Type.obj.map;
try{ module.exports = Mesh }catch(e){}