forked from meteor/meteor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlivedata_connection.js
1657 lines (1481 loc) · 60.8 KB
/
livedata_connection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
if (Meteor.isServer) {
var path = Npm.require('path');
var Fiber = Npm.require('fibers');
var Future = Npm.require(path.join('fibers', 'future'));
}
// @param url {String|Object} URL to Meteor app,
// or an object as a test hook (see code)
// Options:
// reloadWithOutstanding: is it OK to reload if there are outstanding methods?
// headers: extra headers to send on the websockets connection, for
// server-to-server DDP only
// _sockjsOptions: Specifies options to pass through to the sockjs client
// onDDPVersionNegotiationFailure: callback when version negotiation fails.
//
// XXX There should be a way to destroy a DDP connection, causing all
// outstanding method calls to fail.
//
// XXX Our current way of handling failure and reconnection is great
// for an app (where we want to tolerate being disconnected as an
// expected state, and keep trying forever to reconnect) but cumbersome
// for something like a command line tool that wants to make a
// connection, call a method, and print an error if connection
// fails. We should have better usability in the latter case (while
// still transparently reconnecting if it's just a transient failure
// or the server migrating us).
var Connection = function (url, options) {
var self = this;
// Constructor for one DDP client connection: fills in option defaults, opens
// (or adopts) the underlying stream, initializes all bookkeeping state, and
// wires the stream's 'message', 'reset' and 'disconnect' events to the
// handlers defined at the bottom of this function.
//
// Caller-supplied options are merged over the defaults below, so any key the
// caller passes wins.
options = _.extend({
onConnected: function () {},
onDDPVersionNegotiationFailure: function (description) {
Meteor._debug(description);
},
// NOTE(review): heartbeat values are presumably milliseconds; they are only
// stored here and consumed elsewhere -- confirm against the heartbeat code.
heartbeatInterval: 17500,
heartbeatTimeout: 15000,
// These options are only for testing.
reloadWithOutstanding: false,
supportedDDPVersions: DDPCommon.SUPPORTED_DDP_VERSIONS,
retry: true,
respondToPings: true
}, options);
// If set, called when we reconnect, queuing method calls _before_ the
// existing outstanding ones. This is the only data member that is part of the
// public API!
self.onReconnect = null;
// as a test hook, allow passing a stream instead of a url.
if (typeof url === "object") {
self._stream = url;
} else {
self._stream = new LivedataTest.ClientStream(url, {
retry: options.retry,
headers: options.headers,
_sockjsOptions: options._sockjsOptions,
// Used to keep some tests quiet, or for other cases in which
// the right thing to do with connection errors is to silently
// fail (e.g. sending package usage stats). At some point we
// should have a real API for handling client-stream-level
// errors.
_dontPrintErrors: options._dontPrintErrors,
connectTimeoutMs: options.connectTimeoutMs
});
}
// Session id sent back in the 'connect' message on reset (see onReset).
// It is only read in this constructor; it is assigned elsewhere.
self._lastSessionId = null;
self._versionSuggestion = null; // The last proposed DDP version.
self._version = null; // The DDP version agreed on by client and server.
self._stores = {}; // name -> object with methods
self._methodHandlers = {}; // name -> func
// Counter from which method IDs are allocated.
self._nextMethodId = 1;
self._supportedDDPVersions = options.supportedDDPVersions;
self._heartbeatInterval = options.heartbeatInterval;
self._heartbeatTimeout = options.heartbeatTimeout;
// Tracks methods which the user has tried to call but which have not yet
// called their user callback (ie, they are waiting on their result or for all
// of their writes to be written to the local cache). Map from method ID to
// MethodInvoker object.
self._methodInvokers = {};
// Tracks methods which the user has called but whose result messages have not
// arrived yet.
//
// _outstandingMethodBlocks is an array of blocks of methods. Each block
// represents a set of methods that can run at the same time. The first block
// represents the methods which are currently in flight; subsequent blocks
// must wait for previous blocks to be fully finished before they can be sent
// to the server.
//
// Each block is an object with the following fields:
// - methods: a list of MethodInvoker objects
// - wait: a boolean; if true, this block had a single method invoked with
// the "wait" option
//
// There will never be adjacent blocks with wait=false, because the only thing
// that makes methods need to be serialized is a wait method.
//
// Methods are removed from the first block when their "result" is
// received. The entire first block is only removed when all of the in-flight
// methods have received their results (so the "methods" list is empty) *AND*
// all of the data written by those methods are visible in the local cache. So
// it is possible for the first block's methods list to be empty, if we are
// still waiting for some objects to quiesce.
//
// Example:
// _outstandingMethodBlocks = [
// {wait: false, methods: []},
// {wait: true, methods: [<MethodInvoker for 'login'>]},
// {wait: false, methods: [<MethodInvoker for 'foo'>,
// <MethodInvoker for 'bar'>]}]
// This means that there were some methods which were sent to the server and
// which have returned their results, but some of the data written by
// the methods may not be visible in the local cache. Once all that data is
// visible, we will send a 'login' method. Once the login method has returned
// and all the data is visible (including re-running subs if userId changes),
// we will send the 'foo' and 'bar' methods in parallel.
self._outstandingMethodBlocks = [];
// method ID -> array of objects with keys 'collection' and 'id', listing
// documents written by a given method's stub. keys are associated with
// methods whose stub wrote at least one document, and whose data-done message
// has not yet been received.
self._documentsWrittenByStub = {};
// collection -> IdMap of "server document" object. A "server document" has:
// - "document": the version of the document according the
// server (ie, the snapshot before a stub wrote it, amended by any changes
// received from the server)
// It is undefined if we think the document does not exist
// - "writtenByStubs": a set of method IDs whose stubs wrote to the document
// whose "data done" messages have not yet been processed
self._serverDocuments = {};
// Array of callbacks to be called after the next update of the local
// cache. Used for:
// - Calling methodInvoker.dataVisible and sub ready callbacks after
// the relevant data is flushed.
// - Invoking the callbacks of "half-finished" methods after reconnect
// quiescence. Specifically, methods whose result was received over the old
// connection (so we don't re-send it) but whose data had not been made
// visible.
self._afterUpdateCallbacks = [];
// In two contexts, we buffer all incoming data messages and then process them
// all at once in a single update:
// - During reconnect, we buffer all data messages until all subs that had
// been ready before reconnect are ready again, and all methods that are
// active have returned their "data done message"; then
// - During the execution of a "wait" method, we buffer all data messages
// until the wait method gets its "data done" message. (If the wait method
// occurs during reconnect, it doesn't get any special handling.)
// all data messages are processed in one update.
//
// The following fields are used for this "quiescence" process.
// This buffers the messages that aren't being processed yet.
self._messagesBufferedUntilQuiescence = [];
// Map from method ID -> true. Methods are removed from this when their
// "data done" message is received, and we will not quiesce until it is
// empty.
self._methodsBlockingQuiescence = {};
// map from sub ID -> true for subs that were ready (ie, called the sub
// ready callback) before reconnect but haven't become ready again yet
self._subsBeingRevived = {}; // map from sub._id -> true
// if true, the next data update should reset all stores. (set during
// reconnect.)
self._resetStores = false;
// name -> array of updates for (yet to be created) collections
self._updatesForUnknownStores = {};
// if we're blocking a migration, the retry func
self._retryMigrate = null;
// metadata for subscriptions. Map from sub ID to object with keys:
// - id
// - name
// - params
// - inactive (if true, will be cleaned up if not reused in re-run)
// - ready (has the 'ready' message been received?)
// - readyCallback (an optional callback to call when ready)
// - errorCallback (an optional callback to call if the sub terminates with
// an error, XXX COMPAT WITH 1.0.3.1)
// - stopCallback (an optional callback to call when the sub terminates
// for any reason, with an error argument if an error triggered the stop)
self._subscriptions = {};
// Reactive userId.
self._userId = null;
self._userIdDeps = new Tracker.Dependency;
// Block auto-reload while we're waiting for method responses.
// The hook is only installed on the client, when the reload package is
// present, and when the caller has not opted into reloadWithOutstanding.
if (Meteor.isClient && Package.reload && !options.reloadWithOutstanding) {
Package.reload.Reload._onMigrate(function (retry) {
// Not ready: remember `retry` in self._retryMigrate and answer false.
// Ready: answer [true].
// NOTE(review): the meaning of these return values is defined by the
// reload package, which is not visible here -- confirm.
if (!self._readyToMigrate()) {
if (self._retryMigrate)
throw new Error("Two migrations in progress?");
self._retryMigrate = retry;
return false;
} else {
return [true];
}
});
}
// Stream 'message' handler: parses one raw DDP message, notifies the
// heartbeat (when one exists) that traffic was seen, then dispatches on
// msg.msg. Unparseable messages are logged and dropped.
var onMessage = function (raw_msg) {
try {
var msg = DDPCommon.parseDDP(raw_msg);
} catch (e) {
Meteor._debug("Exception while parsing DDP", e);
return;
}
// Any message counts as receiving a pong, as it demonstrates that
// the server is still alive.
if (self._heartbeat) {
self._heartbeat.messageReceived();
}
if (msg === null || !msg.msg) {
// XXX COMPAT WITH 0.6.6. ignore the old welcome message for back
// compat. Remove this 'if' once the server stops sending welcome
// messages (stream_server.js).
if (! (msg && msg.server_id))
Meteor._debug("discarding invalid livedata message", msg);
return;
}
if (msg.msg === 'connected') {
// The version we last proposed becomes the agreed version.
self._version = self._versionSuggestion;
self._livedata_connected(msg);
options.onConnected();
}
else if (msg.msg === 'failed') {
// The server proposed a different version. If we support it, reconnect
// suggesting that version; otherwise disconnect permanently and report
// the failure through the option callback.
if (_.contains(self._supportedDDPVersions, msg.version)) {
self._versionSuggestion = msg.version;
self._stream.reconnect({_force: true});
} else {
var description =
"DDP version negotiation failed; server requested version " + msg.version;
self._stream.disconnect({_permanent: true, _error: description});
options.onDDPVersionNegotiationFailure(description);
}
}
else if (msg.msg === 'ping' && options.respondToPings) {
self._send({msg: "pong", id: msg.id});
}
else if (msg.msg === 'pong') {
// noop, as we assume everything's a pong
}
else if (_.include(['added', 'changed', 'removed', 'ready', 'updated'], msg.msg))
self._livedata_data(msg);
else if (msg.msg === 'nosub')
self._livedata_nosub(msg);
else if (msg.msg === 'result')
self._livedata_result(msg);
else if (msg.msg === 'error')
self._livedata_error(msg);
else
Meteor._debug("discarding unknown livedata message type", msg);
};
// Stream 'reset' handler: sends the 'connect' handshake, then re-sends
// outstanding methods and every known subscription on the fresh stream.
var onReset = function () {
// Send a connect message at the beginning of the stream.
// NOTE: reset is called even on the first connection, so this is
// the only place we send this message.
var msg = {msg: 'connect'};
if (self._lastSessionId)
msg.session = self._lastSessionId;
// Propose the previously suggested version if there is one, otherwise the
// first entry of the supported list; remember what was proposed.
msg.version = self._versionSuggestion || self._supportedDDPVersions[0];
self._versionSuggestion = msg.version;
msg.support = self._supportedDDPVersions;
self._send(msg);
// Now, to minimize setup latency, go ahead and blast out all of
// our pending methods and subscriptions before we've even taken
// the necessary RTT to know if we successfully reconnected. (1)
// They're supposed to be idempotent; (2) even if we did
// reconnect, we're not sure what messages might have gotten lost
// (in either direction) since we were disconnected (TCP being
// sloppy about that.)
// If the current block of methods all got their results (but didn't all get
// their data visible), discard the empty block now.
if (! _.isEmpty(self._outstandingMethodBlocks) &&
_.isEmpty(self._outstandingMethodBlocks[0].methods)) {
self._outstandingMethodBlocks.shift();
}
// Mark all messages as unsent, they have not yet been sent on this
// connection.
_.each(self._methodInvokers, function (m) {
m.sentMessage = false;
});
// If an `onReconnect` handler is set, call it first. Go through
// some hoops to ensure that methods that are called from within
// `onReconnect` get executed _before_ ones that were originally
// outstanding (since `onReconnect` is used to re-establish auth
// certificates)
if (self.onReconnect)
self._callOnReconnectAndSendAppropriateOutstandingMethods();
else
self._sendOutstandingMethods();
// add new subscriptions at the end. this way they take effect after
// the handlers and we don't see flicker.
_.each(self._subscriptions, function (sub, id) {
self._send({
msg: 'sub',
id: id,
name: sub.name,
params: sub.params
});
});
};
// Stream 'disconnect' handler: stops and clears the heartbeat if one is
// running. Note that self._heartbeat is never assigned in this constructor;
// it is expected to be created elsewhere.
var onDisconnect = function () {
if (self._heartbeat) {
self._heartbeat.stop();
self._heartbeat = null;
}
};
// On the server the handlers are wrapped with Meteor.bindEnvironment (the
// second argument is a description string); on the client they are
// registered directly.
// NOTE(review): presumably the wrapping makes the handlers run with Meteor's
// environment/Fiber context and uses the description in error reports --
// confirm against Meteor.bindEnvironment.
if (Meteor.isServer) {
self._stream.on('message', Meteor.bindEnvironment(onMessage, "handling DDP message"));
self._stream.on('reset', Meteor.bindEnvironment(onReset, "handling DDP reset"));
self._stream.on('disconnect', Meteor.bindEnvironment(onDisconnect, "handling DDP disconnect"));
} else {
self._stream.on('message', onMessage);
self._stream.on('reset', onReset);
self._stream.on('disconnect', onDisconnect);
}
};
// MethodInvoker: bookkeeping object for a single method call on a connection.
// It holds the wire message, the user's callbacks, and two pieces of progress
// state (the result, and whether the method's writes are visible locally).
//
// Constructing one registers it in `options.connection._methodInvokers` under
// `options.methodId`; the prototype's _maybeInvokeCallback removes it again
// once both the result has arrived and the data is visible.
//
// options:
//   methodId          - key under which the invoker is registered
//   callback          - invoked with (error, result) when fully finished
//   connection        - owning connection; must expose `_methodInvokers`
//   message           - the message handed to connection._send
//   onResultReceived  - optional; called as soon as the result arrives
//   wait              - truthy if this is a "wait" method
var MethodInvoker = function (options) {
  var invoker = this;

  // Fields read by other code in this file.
  invoker.methodId = options.methodId;
  invoker.sentMessage = false;

  // What to send, where to send it, and whom to tell.
  invoker._callback = options.callback;
  invoker._connection = options.connection;
  invoker._message = options.message;
  if (options.onResultReceived) {
    invoker._onResultReceived = options.onResultReceived;
  } else {
    // Default to a no-op so callers never need to null-check it.
    invoker._onResultReceived = function () {};
  }
  invoker._wait = options.wait;

  // Progress state: nothing received, nothing visible yet.
  invoker._methodResult = null;
  invoker._dataVisible = false;

  // Make this invoker discoverable by its method ID.
  invoker._connection._methodInvokers[invoker.methodId] = invoker;
};
_.extend(MethodInvoker.prototype, {
  // Hands the stored message to the connection. Can run more than once for
  // the same method: it is called again when the connection is re-established
  // before a result came back. Throws if a result has already been received,
  // since such a method must not be (re)sent.
  sendMessage: function () {
    var invoker = this;
    if (invoker.gotResult()) {
      throw new Error("sendingMethod is called on method with result");
    }

    // A resend starts over: whatever data was visible from an earlier attempt
    // no longer counts.
    invoker._dataVisible = false;
    invoker.sentMessage = true;

    // A wait method makes the connection buffer data messages until it is
    // done, by registering itself as blocking quiescence.
    if (invoker._wait) {
      invoker._connection._methodsBlockingQuiescence[invoker.methodId] = true;
    }

    invoker._connection._send(invoker._message);
  },

  // Fires the user callback once BOTH conditions hold: a result is recorded
  // and the method's data is visible. Otherwise does nothing.
  _maybeInvokeCallback: function () {
    var invoker = this;
    if (!invoker._methodResult || !invoker._dataVisible) {
      return;
    }

    // Per the original note, the callback was wrapped with bindEnvironment by
    // whoever supplied it and is therefore not expected to throw.
    var outcome = invoker._methodResult;
    invoker._callback(outcome[0], outcome[1]);

    // Unregister, then tell the connection so it can consider advancing to
    // its next block of methods.
    delete invoker._connection._methodInvokers[invoker.methodId];
    invoker._connection._outstandingMethodFinished();
  },

  // Records the server's (err, result) for this method. Throws if a result
  // was already recorded. Calls the onResultReceived hook right away, then
  // the main callback if the data is already visible. After this,
  // sendMessage must not be called again.
  receiveResult: function (err, result) {
    var invoker = this;
    if (invoker.gotResult()) {
      throw new Error("Methods should only receive results once");
    }
    invoker._methodResult = [err, result];
    invoker._onResultReceived(err, result);
    invoker._maybeInvokeCallback();
  },

  // Marks the method's writes as visible in the local cache, i.e. its
  // "data is done" message has been returned and the server documents
  // buffered at that time have been written. Fires the main callback if the
  // result is already in.
  dataVisible: function () {
    var invoker = this;
    invoker._dataVisible = true;
    invoker._maybeInvokeCallback();
  },

  // Reports whether receiveResult has run for this method.
  gotResult: function () {
    return Boolean(this._methodResult);
  }
});
_.extend(Connection.prototype, {
// 'name' is the name of the data on the wire that should go in the
// store. 'wrappedStore' should be an object with methods beginUpdate, update,
// endUpdate, saveOriginals, retrieveOriginals. see Collection for an example.
registerStore: function (name, wrappedStore) {
var self = this;
if (name in self._stores)
return false;
// Wrap the input object in an object which makes any store method not
// implemented by 'store' into a no-op.
var store = {};
_.each(['update', 'beginUpdate', 'endUpdate', 'saveOriginals',
'retrieveOriginals', 'getDoc'], function (method) {
store[method] = function () {
return (wrappedStore[method]
? wrappedStore[method].apply(wrappedStore, arguments)
: undefined);
};
});
self._stores[name] = store;
var queued = self._updatesForUnknownStores[name];
if (queued) {
store.beginUpdate(queued.length, false);
_.each(queued, function (msg) {
store.update(msg);
});
store.endUpdate();
delete self._updatesForUnknownStores[name];
}
return true;
},
/**
* @memberOf Meteor
* @summary Subscribe to a record set. Returns a handle that provides
* `stop()` and `ready()` methods.
* @locus Client
* @param {String} name Name of the subscription. Matches the name of the
* server's `publish()` call.
* @param {Any} [arg1,arg2...] Optional arguments passed to publisher
* function on server.
* @param {Function|Object} [callbacks] Optional. May include `onStop`
* and `onReady` callbacks. If there is an error, it is passed as an
* argument to `onStop`. If a function is passed instead of an object, it
* is interpreted as an `onReady` callback.
*/
subscribe: function (name /* .. [arguments] .. (callback|callbacks) */) {
var self = this;
var params = Array.prototype.slice.call(arguments, 1);
var callbacks = {};
if (params.length) {
var lastParam = params[params.length - 1];
if (_.isFunction(lastParam)) {
callbacks.onReady = params.pop();
} else if (lastParam &&
// XXX COMPAT WITH 1.0.3.1 onError used to exist, but now we use
// onStop with an error callback instead.
_.any([lastParam.onReady, lastParam.onError, lastParam.onStop],
_.isFunction)) {
callbacks = params.pop();
}
}
// Is there an existing sub with the same name and param, run in an
// invalidated Computation? This will happen if we are rerunning an
// existing computation.
//
// For example, consider a rerun of:
//
// Tracker.autorun(function () {
// Meteor.subscribe("foo", Session.get("foo"));
// Meteor.subscribe("bar", Session.get("bar"));
// });
//
// If "foo" has changed but "bar" has not, we will match the "bar"
// subcribe to an existing inactive subscription in order to not
// unsub and resub the subscription unnecessarily.
//
// We only look for one such sub; if there are N apparently-identical subs
// being invalidated, we will require N matching subscribe calls to keep
// them all active.
var existing = _.find(self._subscriptions, function (sub) {
return sub.inactive && sub.name === name &&
EJSON.equals(sub.params, params);
});
var id;
if (existing) {
id = existing.id;
existing.inactive = false; // reactivate
if (callbacks.onReady) {
// If the sub is not already ready, replace any ready callback with the
// one provided now. (It's not really clear what users would expect for
// an onReady callback inside an autorun; the semantics we provide is
// that at the time the sub first becomes ready, we call the last
// onReady callback provided, if any.)
if (!existing.ready)
existing.readyCallback = callbacks.onReady;
}
// XXX COMPAT WITH 1.0.3.1 we used to have onError but now we call
// onStop with an optional error argument
if (callbacks.onError) {
// Replace existing callback if any, so that errors aren't
// double-reported.
existing.errorCallback = callbacks.onError;
}
if (callbacks.onStop) {
existing.stopCallback = callbacks.onStop;
}
} else {
// New sub! Generate an id, save it locally, and send message.
id = Random.id();
self._subscriptions[id] = {
id: id,
name: name,
params: EJSON.clone(params),
inactive: false,
ready: false,
readyDeps: new Tracker.Dependency,
readyCallback: callbacks.onReady,
// XXX COMPAT WITH 1.0.3.1 #errorCallback
errorCallback: callbacks.onError,
stopCallback: callbacks.onStop,
connection: self,
remove: function() {
delete this.connection._subscriptions[this.id];
this.ready && this.readyDeps.changed();
},
stop: function() {
this.connection._send({msg: 'unsub', id: id});
this.remove();
if (callbacks.onStop) {
callbacks.onStop();
}
}
};
self._send({msg: 'sub', id: id, name: name, params: params});
}
// return a handle to the application.
var handle = {
stop: function () {
if (!_.has(self._subscriptions, id))
return;
self._subscriptions[id].stop();
},
ready: function () {
// return false if we've unsubscribed.
if (!_.has(self._subscriptions, id))
return false;
var record = self._subscriptions[id];
record.readyDeps.depend();
return record.ready;
},
subscriptionId: id
};
if (Tracker.active) {
// We're in a reactive computation, so we'd like to unsubscribe when the
// computation is invalidated... but not if the rerun just re-subscribes
// to the same subscription! When a rerun happens, we use onInvalidate
// as a change to mark the subscription "inactive" so that it can
// be reused from the rerun. If it isn't reused, it's killed from
// an afterFlush.
Tracker.onInvalidate(function (c) {
if (_.has(self._subscriptions, id))
self._subscriptions[id].inactive = true;
Tracker.afterFlush(function () {
if (_.has(self._subscriptions, id) &&
self._subscriptions[id].inactive)
handle.stop();
});
});
}
return handle;
},
// options:
// - onLateError {Function(error)} called if an error was received after the ready event.
// (errors received before ready cause an error to be thrown)
_subscribeAndWait: function (name, args, options) {
var self = this;
var f = new Future();
var ready = false;
var handle;
args = args || [];
args.push({
onReady: function () {
ready = true;
f['return']();
},
onError: function (e) {
if (!ready)
f['throw'](e);
else
options && options.onLateError && options.onLateError(e);
}
});
handle = self.subscribe.apply(self, [name].concat(args));
f.wait();
return handle;
},
methods: function (methods) {
var self = this;
_.each(methods, function (func, name) {
if (typeof func !== 'function')
throw new Error("Method '" + name + "' must be a function");
if (self._methodHandlers[name])
throw new Error("A method named '" + name + "' is already defined");
self._methodHandlers[name] = func;
});
},
/**
* @memberOf Meteor
* @summary Invokes a method passing any number of arguments.
* @locus Anywhere
* @param {String} name Name of method to invoke
* @param {EJSONable} [arg1,arg2...] Optional method arguments
* @param {Function} [asyncCallback] Optional callback, which is called asynchronously with the error or result after the method is complete. If not provided, the method runs synchronously if possible (see below).
*/
call: function (name /* .. [arguments] .. callback */) {
// if it's a function, the last argument is the result callback,
// not a parameter to the remote method.
var args = Array.prototype.slice.call(arguments, 1);
if (args.length && typeof args[args.length - 1] === "function")
var callback = args.pop();
return this.apply(name, args, callback);
},
// @param options {Optional Object}
// wait: Boolean - Should we wait to call this until all current methods
// are fully finished, and block subsequent method calls
// until this method is fully finished?
// (does not affect methods called from within this method)
// onResultReceived: Function - a callback to call as soon as the method
// result is received. the data written by
// the method may not yet be in the cache!
// returnStubValue: Boolean - If true then in cases where we would have
// otherwise discarded the stub's return value
// and returned undefined, instead we go ahead
// and return it. Specifically, this is any
// time other than when (a) we are already
// inside a stub or (b) we are in Node and no
// callback was provided. Currently we require
// this flag to be explicitly passed to reduce
// the likelihood that stub return values will
// be confused with server return values; we
// may improve this in future.
// @param callback {Optional Function}
/**
* @memberOf Meteor
* @summary Invoke a method passing an array of arguments.
* @locus Anywhere
* @param {String} name Name of method to invoke
* @param {EJSONable[]} args Method arguments
* @param {Object} [options]
* @param {Boolean} options.wait (Client only) If true, don't send this method until all previous method calls have completed, and don't send any subsequent method calls until this one is completed.
* @param {Function} options.onResultReceived (Client only) This callback is invoked with the error or result of the method (just like `asyncCallback`) as soon as the error or result is available. The local cache may not yet reflect the writes performed by the method.
* @param {Function} [asyncCallback] Optional callback; same semantics as in [`Meteor.call`](#meteor_call).
*/
apply: function (name, args, options, callback) {
var self = this;
// We were passed 3 arguments. They may be either (name, args, options)
// or (name, args, callback)
if (!callback && typeof options === 'function') {
callback = options;
options = {};
}
options = options || {};
if (callback) {
// XXX would it be better form to do the binding in stream.on,
// or caller, instead of here?
// XXX improve error message (and how we report it)
callback = Meteor.bindEnvironment(
callback,
"delivering result of invoking '" + name + "'"
);
}
// Keep our args safe from mutation (eg if we don't send the message for a
// while because of a wait method).
args = EJSON.clone(args);
// Lazily allocate method ID once we know that it'll be needed.
var methodId = (function () {
var id;
return function () {
if (id === undefined)
id = '' + (self._nextMethodId++);
return id;
};
})();
var enclosing = DDP._CurrentInvocation.get();
var alreadyInSimulation = enclosing && enclosing.isSimulation;
// Lazily generate a randomSeed, only if it is requested by the stub.
// The random streams only have utility if they're used on both the client
// and the server; if the client doesn't generate any 'random' values
// then we don't expect the server to generate any either.
// Less commonly, the server may perform different actions from the client,
// and may in fact generate values where the client did not, but we don't
// have any client-side values to match, so even here we may as well just
// use a random seed on the server. In that case, we don't pass the
// randomSeed to save bandwidth, and we don't even generate it to save a
// bit of CPU and to avoid consuming entropy.
var randomSeed = null;
var randomSeedGenerator = function () {
if (randomSeed === null) {
randomSeed = DDPCommon.makeRpcSeed(enclosing, name);
}
return randomSeed;
};
// Run the stub, if we have one. The stub is supposed to make some
// temporary writes to the database to give the user a smooth experience
// until the actual result of executing the method comes back from the
// server (whereupon the temporary writes to the database will be reversed
// during the beginUpdate/endUpdate process.)
//
// Normally, we ignore the return value of the stub (even if it is an
// exception), in favor of the real return value from the server. The
// exception is if the *caller* is a stub. In that case, we're not going
// to do a RPC, so we use the return value of the stub as our return
// value.
var stub = self._methodHandlers[name];
if (stub) {
var setUserId = function(userId) {
self.setUserId(userId);
};
var invocation = new DDPCommon.MethodInvocation({
isSimulation: true,
userId: self.userId(),
setUserId: setUserId,
randomSeed: function () { return randomSeedGenerator(); }
});
if (!alreadyInSimulation)
self._saveOriginals();
try {
// Note that unlike in the corresponding server code, we never audit
// that stubs check() their arguments.
var stubReturnValue = DDP._CurrentInvocation.withValue(invocation, function () {
if (Meteor.isServer) {
// Because saveOriginals and retrieveOriginals aren't reentrant,
// don't allow stubs to yield.
return Meteor._noYieldsAllowed(function () {
// re-clone, so that the stub can't affect our caller's values
return stub.apply(invocation, EJSON.clone(args));
});
} else {
return stub.apply(invocation, EJSON.clone(args));
}
});
}
catch (e) {
var exception = e;
}
if (!alreadyInSimulation)
self._retrieveAndStoreOriginals(methodId());
}
// If we're in a simulation, stop and return the result we have,
// rather than going on to do an RPC. If there was no stub,
// we'll end up returning undefined.
if (alreadyInSimulation) {
if (callback) {
callback(exception, stubReturnValue);
return undefined;
}
if (exception)
throw exception;
return stubReturnValue;
}
// If an exception occurred in a stub, and we're ignoring it
// because we're doing an RPC and want to use what the server
// returns instead, log it so the developer knows
// (unless they explicitly ask to see the error).
//
// Tests can set the 'expected' flag on an exception so it won't
// go to log.
if (exception) {
if (options.throwStubExceptions) {
throw exception;
} else if (!exception.expected) {
Meteor._debug("Exception while simulating the effect of invoking '" +
name + "'", exception, exception.stack);
}
}
// At this point we're definitely doing an RPC, and we're going to
// return the value of the RPC to the caller.
// If the caller didn't give a callback, decide what to do.
if (!callback) {
if (Meteor.isClient) {
// On the client, we don't have fibers, so we can't block. The
// only thing we can do is to return undefined and discard the
// result of the RPC. If an error occurred then print the error
// to the console.
callback = function (err) {
err && Meteor._debug("Error invoking Method '" + name + "':",
err.message);
};
} else {
// On the server, make the function synchronous. Throw on
// errors, return on success.
var future = new Future;
callback = future.resolver();
}
}
// Send the RPC. Note that on the client, it is important that the
// stub have finished before we send the RPC, so that we know we have
// a complete list of which local documents the stub wrote.
var message = {
msg: 'method',
method: name,
params: args,
id: methodId()
};
// Send the randomSeed only if we used it
if (randomSeed !== null) {
message.randomSeed = randomSeed;
}
var methodInvoker = new MethodInvoker({
methodId: methodId(),
callback: callback,
connection: self,
onResultReceived: options.onResultReceived,
wait: !!options.wait,
message: message
});
if (options.wait) {
// It's a wait method! Wait methods go in their own block.
self._outstandingMethodBlocks.push(
{wait: true, methods: [methodInvoker]});
} else {
// Not a wait method. Start a new block if the previous block was a wait
// block, and add it to the last block of methods.
if (_.isEmpty(self._outstandingMethodBlocks) ||
_.last(self._outstandingMethodBlocks).wait)
self._outstandingMethodBlocks.push({wait: false, methods: []});
_.last(self._outstandingMethodBlocks).methods.push(methodInvoker);
}
// If we added it to the first block, send it out now.
if (self._outstandingMethodBlocks.length === 1)
methodInvoker.sendMessage();
// If we're using the default callback on the server,
// block waiting for the result.
if (future) {
return future.wait();
}
return options.returnStubValue ? stubReturnValue : undefined;
},
// Before calling a method stub, prepare all stores to track changes and allow
// _retrieveAndStoreOriginals to get the original versions of changed
// documents.
_saveOriginals: function () {
var self = this;
_.each(self._stores, function (s) {
s.saveOriginals();
});
},
// Retrieves the original versions of all documents modified by the stub for
// method 'methodId' from all stores and saves them to _serverDocuments (keyed
// by document) and _documentsWrittenByStub (keyed by method ID).
_retrieveAndStoreOriginals: function (methodId) {
var self = this;
if (self._documentsWrittenByStub[methodId])
throw new Error("Duplicate methodId in _retrieveAndStoreOriginals");
var docsWritten = [];
_.each(self._stores, function (s, collection) {
var originals = s.retrieveOriginals();
// not all stores define retrieveOriginals
if (!originals)
return;
originals.forEach(function (doc, id) {
docsWritten.push({collection: collection, id: id});
if (!_.has(self._serverDocuments, collection))
self._serverDocuments[collection] = new MongoIDMap;
var serverDoc = self._serverDocuments[collection].setDefault(id, {});
if (serverDoc.writtenByStubs) {
// We're not the first stub to write this doc. Just add our method ID
// to the record.
serverDoc.writtenByStubs[methodId] = true;
} else {
// First stub! Save the original value and our method ID.
serverDoc.document = doc;
serverDoc.flushCallbacks = [];
serverDoc.writtenByStubs = {};
serverDoc.writtenByStubs[methodId] = true;
}
});
});
if (!_.isEmpty(docsWritten)) {
self._documentsWrittenByStub[methodId] = docsWritten;
}
},
// This is very much a private function we use to make the tests
// take up fewer server resources after they complete.
_unsubscribeAll: function () {
var self = this;
_.each(_.clone(self._subscriptions), function (sub, id) {
// Avoid killing the autoupdate subscription so that developers
// still get hot code pushes when writing tests.
//
// XXX it's a hack to encode knowledge about autoupdate here,
// but it doesn't seem worth it yet to have a special API for
// subscriptions to preserve after unit tests.
if (sub.name !== 'meteor_autoupdate_clientVersions') {
self._subscriptions[id].stop();
}
});
},
// Sends the DDP stringification of the given message object
_send: function (obj) {
var self = this;
self._stream.send(DDPCommon.stringifyDDP(obj));
},
// Called when DDP-level heartbeats show that the connection has been lost.
// In contrast to `disconnect` or `close`, a lost connection is retried
// automatically. The error is simply forwarded to the stream.
_lostConnection: function (error) {
  var stream = this._stream;
  stream._lostConnection(error);
},
/**
* @summary Get the current connection status. A reactive data source.
* @locus Client
* @memberOf Meteor
*/
status: function (/*passthrough args*/) {
var self = this;
return self._stream.status.apply(self._stream, arguments);
},