forked from meteor/meteor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream_client_nodejs.js
201 lines (165 loc) · 6.33 KB
/
stream_client_nodejs.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
// @param endpoint {String} URL to Meteor app
// "http://subdomain.meteor.com/" or "/" or
// "ddp+sockjs://foo-**.meteor.com/sockjs"
//
// We do some rewriting of the URL to eventually make it "ws://" or "wss://",
// whatever was passed in. At the very least, what Meteor.absoluteUrl() returns
// us should work.
//
// We don't do any heartbeating. (The logic that did this in sockjs was removed,
// because it used a built-in sockjs mechanism. We could do it with WebSocket
// ping frames or with DDP-level messages.)
LivedataTest.ClientStream = function (endpoint, options) {
var self = this;
options = options || {};
self.options = _.extend({
retry: true
}, options);
self.client = null; // created in _launchConnection
self.endpoint = endpoint;
self.headers = self.options.headers || {};
self._initCommon(self.options);
//// Kickoff!
self._launchConnection();
};
_.extend(LivedataTest.ClientStream.prototype, {
// data is a utf8 string. Data sent while not connected is dropped on
// the floor, and it is up the user of this API to retransmit lost
// messages on 'reset'
send: function (data) {
var self = this;
if (self.currentStatus.connected) {
self.client.send(data);
}
},
// Changes where this connection points
_changeUrl: function (url) {
var self = this;
self.endpoint = url;
},
_onConnect: function (client) {
var self = this;
if (client !== self.client) {
// This connection is not from the last call to _launchConnection.
// But _launchConnection calls _cleanup which closes previous connections.
// It's our belief that this stifles future 'open' events, but maybe
// we are wrong?
throw new Error("Got open from inactive client " + !!self.client);
}
if (self._forcedToDisconnect) {
// We were asked to disconnect between trying to open the connection and
// actually opening it. Let's just pretend this never happened.
self.client.close();
self.client = null;
return;
}
if (self.currentStatus.connected) {
// We already have a connection. It must have been the case that we
// started two parallel connection attempts (because we wanted to
// 'reconnect now' on a hanging connection and we had no way to cancel the
// connection attempt.) But this shouldn't happen (similarly to the client
// !== self.client check above).
throw new Error("Two parallel connections?");
}
self._clearConnectionTimer();
// update status
self.currentStatus.status = "connected";
self.currentStatus.connected = true;
self.currentStatus.retryCount = 0;
self.statusChanged();
// fire resets. This must come after status change so that clients
// can call send from within a reset callback.
_.each(self.eventCallbacks.reset, function (callback) { callback(); });
},
_cleanup: function (maybeError) {
var self = this;
self._clearConnectionTimer();
if (self.client) {
var client = self.client;
self.client = null;
client.close();
_.each(self.eventCallbacks.disconnect, function (callback) {
callback(maybeError);
});
}
},
_clearConnectionTimer: function () {
var self = this;
if (self.connectionTimer) {
clearTimeout(self.connectionTimer);
self.connectionTimer = null;
}
},
_getProxyUrl: function (targetUrl) {
var self = this;
// Similar to code in tools/http-helpers.js.
var proxy = process.env.HTTP_PROXY || process.env.http_proxy || null;
// if we're going to a secure url, try the https_proxy env variable first.
if (targetUrl.match(/^wss:/)) {
proxy = process.env.HTTPS_PROXY || process.env.https_proxy || proxy;
}
return proxy;
},
_launchConnection: function () {
var self = this;
self._cleanup(); // cleanup the old socket, if there was one.
// Since server-to-server DDP is still an experimental feature, we only
// require the module if we actually create a server-to-server
// connection.
var FayeWebSocket = Npm.require('faye-websocket');
var deflate = Npm.require('permessage-deflate');
var targetUrl = toWebsocketUrl(self.endpoint);
var fayeOptions = {
headers: self.headers,
extensions: [deflate]
};
var proxyUrl = self._getProxyUrl(targetUrl);
if (proxyUrl) {
fayeOptions.proxy = { origin: proxyUrl };
};
// We would like to specify 'ddp' as the subprotocol here. The npm module we
// used to use as a client would fail the handshake if we ask for a
// subprotocol and the server doesn't send one back (and sockjs doesn't).
// Faye doesn't have that behavior; it's unclear from reading RFC 6455 if
// Faye is erroneous or not. So for now, we don't specify protocols.
var subprotocols = [];
var client = self.client = new FayeWebSocket.Client(
targetUrl, subprotocols, fayeOptions);
self._clearConnectionTimer();
self.connectionTimer = Meteor.setTimeout(
function () {
self._lostConnection(
new DDP.ConnectionError("DDP connection timed out"));
},
self.CONNECT_TIMEOUT);
self.client.on('open', Meteor.bindEnvironment(function () {
return self._onConnect(client);
}, "stream connect callback"));
var clientOnIfCurrent = function (event, description, f) {
self.client.on(event, Meteor.bindEnvironment(function () {
// Ignore events from any connection we've already cleaned up.
if (client !== self.client)
return;
f.apply(this, arguments);
}, description));
};
clientOnIfCurrent('error', 'stream error callback', function (error) {
if (!self.options._dontPrintErrors)
Meteor._debug("stream error", error.message);
// Faye's 'error' object is not a JS error (and among other things,
// doesn't stringify well). Convert it to one.
self._lostConnection(new DDP.ConnectionError(error.message));
});
clientOnIfCurrent('close', 'stream close callback', function () {
self._lostConnection();
});
clientOnIfCurrent('message', 'stream message callback', function (message) {
// Ignore binary frames, where message.data is a Buffer
if (typeof message.data !== "string")
return;
_.each(self.eventCallbacks.message, function (callback) {
callback(message.data);
});
});
}
});