
Checkpoint
./gun-ws.js:13690106/3222 ./server.js:13690106/104
Showing
3 changed files
with
128 additions
and
4 deletions
gun-ws.js
0 → 100644
1 | ;(function(){ | ||
2 | var Gun = (typeof window !== "undefined")? window.Gun : require('gun'); | ||
3 | var url = require('url'); | ||
4 | |||
5 | Gun.on('opt', function mount(ctx){ | ||
6 | this.to.next(ctx); | ||
7 | var opt = ctx.opt; | ||
8 | if( !opt.peers ) | ||
9 | if( typeof( opt == "string" ) ) | ||
10 | opt.peers = [opt]; | ||
11 | |||
12 | if(ctx.once){ return } | ||
13 | if(false === opt.ws){ return } | ||
14 | var ws = opt.ws || (opt.ws = {}), batch; | ||
15 | |||
16 | if(opt.web){ | ||
17 | |||
18 | console.log('Initializing Gun WS socket!', ws.path); | ||
19 | |||
20 | ws.server = ws.server || opt.web; | ||
21 | ws.path = ws.path || '/gun'; | ||
22 | |||
23 | if (!ws.web) ws.web = new WebSocket.Server(ws); | ||
24 | |||
25 | ws.web.on('connection', function(wire){ | ||
26 | wire.upgradeReq = wire.upgradeReq || {}; | ||
27 | wire.url = url.parse(wire.upgradeReq.url||'', true); | ||
28 | wire.id = wire.id || Gun.text.random(6); | ||
29 | var peer = opt.peers[wire.id] = {wire: wire}; | ||
30 | wire.peer = function(){ return peer }; | ||
31 | ctx.on('hi', peer); | ||
32 | wire.on('message', function(msg){ | ||
33 | //console.log("MESSAGE", msg); | ||
34 | receive(msg, wire, ctx); // diff: wire is wire. | ||
35 | }); | ||
36 | wire.on('close', function(){ | ||
37 | ctx.on('bye', peer); | ||
38 | Gun.obj.del(opt.peers, wire.id); | ||
39 | }); | ||
40 | wire.on('error', function(e){}); | ||
41 | }); | ||
42 | } | ||
43 | |||
44 | ctx.on('out', function(at){ | ||
45 | this.to.next(at); | ||
46 | batch = JSON.stringify(at); | ||
47 | if(ws.drain){ | ||
48 | ws.drain.push(batch); | ||
49 | return; | ||
50 | } | ||
51 | ws.drain = []; | ||
52 | setTimeout(function(){ | ||
53 | if(!ws.drain){ return } | ||
54 | var tmp = ws.drain; | ||
55 | ws.drain = null; | ||
56 | if(!tmp.length){ return } | ||
57 | batch = JSON.stringify(tmp); | ||
58 | Gun.obj.map(opt.peers, send, ctx); | ||
59 | }, opt.gap || opt.wait || 1); | ||
60 | Gun.obj.map(opt.peers, send, ctx); | ||
61 | }); | ||
62 | |||
63 | // EVERY message taken care of. The "extra" ones are from in-memory not having "asked" for it yet - which we won't want it to do for foreign requests. Likewise, lots of chattyness because the put/ack replies happen before the `get` syncs so everybody now has it in-memory already to reply with. | ||
64 | function send(peer){ | ||
65 | var ctx = this, msg = batch; | ||
66 | var wire = peer.wire || open(peer, ctx); | ||
67 | if(!wire){ return } | ||
68 | if(wire.readyState === wire.OPEN){ | ||
69 | wire.send(msg); | ||
70 | return; | ||
71 | } | ||
72 | (peer.queue = peer.queue || []).push(msg); | ||
73 | } | ||
74 | function receive(msg, wire, ctx){ | ||
75 | if(!ctx){ return } | ||
76 | try{msg = JSON.parse(msg.data || msg); | ||
77 | }catch(e){} | ||
78 | if(msg instanceof Array){ | ||
79 | var i = 0, m; | ||
80 | while(m = msg[i++]){ | ||
81 | receive(m, wire, ctx); // wire not peer! | ||
82 | } | ||
83 | return; | ||
84 | } | ||
85 | msg.peer = wire.peer; | ||
86 | ctx.on('in', msg); | ||
87 | } | ||
88 | function open(peer, as){ | ||
89 | if(!peer || !peer.url){ return } | ||
90 | var url = peer.url.replace('http', 'ws'); | ||
91 | var wire = peer.wire = new WebSocket(url); | ||
92 | wire.on('close', function(){ | ||
93 | reconnect(peer, as); | ||
94 | }); | ||
95 | wire.on('error', function(error){ | ||
96 | if(!error){ return } | ||
97 | if(error.code === 'ECONNREFUSED'){ | ||
98 | reconnect(peer, as); // placement? | ||
99 | } | ||
100 | }); | ||
101 | wire.on('open', function(){ | ||
102 | var queue = peer.queue; | ||
103 | peer.queue = []; | ||
104 | Gun.obj.map(queue, function(msg){ | ||
105 | batch = msg; | ||
106 | send.call(as, peer); | ||
107 | }); | ||
108 | }); | ||
109 | wire.on('message', function(msg){ | ||
110 | receive(msg, wire, as); // diff: wire not peer! | ||
111 | }); | ||
112 | return wire; | ||
113 | } | ||
114 | |||
115 | function reconnect(peer, as){ | ||
116 | clearTimeout(peer.defer); | ||
117 | peer.defer = setTimeout(function(){ | ||
118 | open(peer, as); | ||
119 | }, 2 * 1000); | ||
120 | } | ||
121 | }); | ||
122 | var noop = function(){}; | ||
123 | }()); | ||
... | \ No newline at end of file | ... | \ No newline at end of file |
1 | {"jack\u001bname":{"":{":":"Jack",">":1586991984290}}} | ||
... | \ No newline at end of file | ... | \ No newline at end of file |
1 | {"j":{"ack\u001bname":{"":{":":"Jack",">":1587032540203}},"ill\u001bname":{"":{":":"Jill",">":1587032540204.002}}}} | ||
... | \ No newline at end of file | ... | \ No newline at end of file | ... | ... |
1 | const url = require('url'); | 1 | const url = require('url'); |
2 | const Gun = require('gun'); | 2 | const Gun = require('gun/gun'); |
3 | require('./gun-ws.js'); | ||
3 | const http = require('http'); | 4 | const http = require('http'); |
4 | const WebSocket = require('ws'); | 5 | const WebSocket = require('ws'); |
5 | var server = http.createServer(); | 6 | var server = http.createServer(); |
... | @@ -24,8 +25,8 @@ server.on('upgrade', async function (request, socket, head) { | ... | @@ -24,8 +25,8 @@ server.on('upgrade', async function (request, socket, head) { |
24 | console.log('Create id',pathname); | 25 | console.log('Create id',pathname); |
25 | // HOW HOW HOW ? IS it even possible to attach the WS to the Gun Object alone? | 26 | // HOW HOW HOW ? IS it even possible to attach the WS to the Gun Object alone? |
26 | // Works only when passing "server" to the web parameter, no WS/WSS) | 27 | // Works only when passing "server" to the web parameter, no WS/WSS) |
27 | //gun.server = new WebSocket.Server({ noServer: true}); | 28 | gun.server = new WebSocket.Server({ noServer: true}); |
28 | gun.gun = new Gun({peers:[], ws: { noServer: true }, web: undefined }); | 29 | gun.gun = new Gun({peers:[], ws: { noServer: true, path: pathname, web: gun.server }, web: gun.server }); |
29 | lru.set(pathname,gun); | 30 | lru.set(pathname,gun); |
30 | } | 31 | } |
31 | } | 32 | } | ... | ... |
-
Please register or sign in to post a comment