1
1
const amqplib = require ( 'amqplib' ) ;
2
2
const { EventEmitter } = require ( 'events' ) ;
3
+ const { throwIfEmpty } = require ( 'rxjs/operators' ) ;
3
4
const { AppError, errorHandler } = require ( '../error-handling' ) ;
4
5
const { FakeMessageQueueProvider } = require ( './fake-message-queue-provider' ) ;
5
6
@@ -9,12 +10,17 @@ class MessageQueueClient extends EventEmitter {
9
10
constructor ( customMessageQueueProvider ) {
10
11
super ( ) ;
11
12
this . isReady = false ;
13
+ this . requeue = true ; // Tells whether to return failed messages to the queue
12
14
13
15
// To facilitate testing, the client allows working with a fake MQ provider
14
16
// It can get one in the constructor here or even change by environment variables
15
17
if ( customMessageQueueProvider ) {
16
18
this . messageQueueProvider = customMessageQueueProvider ;
17
- } else {
19
+ }
20
+ else if ( process . env . USE_FAKE_MQ === 'true' ) {
21
+ this . messageQueueProvider = new FakeMessageQueueProvider ( ) ;
22
+ }
23
+ else {
18
24
this . messageQueueProvider = amqplib ;
19
25
}
20
26
@@ -63,14 +69,14 @@ class MessageQueueClient extends EventEmitter {
63
69
if ( ! this . channel ) {
64
70
await this . connect ( ) ;
65
71
}
66
- console . log ( 'publish' , exchangeName , routingKey ) ;
67
72
68
73
const sendResponse = await this . channel . publish (
69
74
exchangeName ,
70
75
routingKey ,
71
76
Buffer . from ( JSON . stringify ( message ) ) ,
72
77
{ messageId }
73
78
) ;
79
+ this . emit ( 'publish' , { exchangeName, routingKey, message } ) ;
74
80
75
81
return sendResponse ;
76
82
}
@@ -80,7 +86,6 @@ class MessageQueueClient extends EventEmitter {
80
86
await this . connect ( ) ;
81
87
}
82
88
const queueDeletionResult = await this . channel . deleteQueue ( queueName ) ;
83
- console . log ( queueDeletionResult ) ;
84
89
85
90
return ;
86
91
}
@@ -118,20 +123,17 @@ class MessageQueueClient extends EventEmitter {
118
123
await this . connect ( ) ;
119
124
}
120
125
this . channel . assertQueue ( queueName ) ;
121
- console . log ( 'consume start' , queueName ) ;
122
126
123
127
await this . channel . consume ( queueName , async ( theNewMessage ) => {
124
128
//Not awaiting because some MQ client implementation get back to fetch messages again only after handling a message
125
129
onMessageCallback ( theNewMessage . content . toString ( ) )
126
130
. then ( ( ) => {
127
- console . log ( 'ack' ) ;
128
131
this . emit ( 'ack' , theNewMessage ) ;
129
132
this . channel . ack ( theNewMessage ) ;
130
133
} )
131
134
. catch ( ( error ) => {
132
- this . channel . nack ( theNewMessage , false , true ) ;
135
+ this . channel . nack ( theNewMessage , false , this . requeue ) ;
133
136
this . emit ( 'nack' , theNewMessage ) ;
134
- console . log ( 'nack' , error . message ) ;
135
137
error . isTrusted = true ; //Since it's related to a single message, there is no reason to let the process crash
136
138
//errorHandler.handleError(error);
137
139
} ) ;
@@ -140,38 +142,51 @@ class MessageQueueClient extends EventEmitter {
140
142
return ;
141
143
}
142
144
145
+ setRequeue ( newValue ) {
146
+ this . requeue = newValue ;
147
+ }
148
+
149
+ // This function stores all the MQ events in a local data structure so later
150
+ // one query this
143
151
countEvents ( ) {
144
- const eventsToListen = [ 'nack' , 'ack' ] ;
145
- if ( this . eventsCounter === undefined ) {
146
- this . eventsCounter = { } ;
147
- eventsToListen . forEach ( ( eventToListenTo ) => {
148
- this . eventsCounter [ eventToListenTo ] = 0 ;
149
- this . on ( eventToListenTo , ( eventData ) => {
150
- this . eventsCounter [ eventToListenTo ] ++ ;
151
- console . log ( 'events counting' , this . eventsCounter ) ;
152
- this . emit ( 'event-counted' , {
153
- name : eventToListenTo ,
154
- lastEventData : eventData ,
155
- count : this . eventsCounter [ eventToListenTo ] ,
156
- } ) ;
152
+ const eventsToListen = [ 'nack' , 'ack' , 'publish' ] ;
153
+ if ( this . eventsRecorder !== undefined ) {
154
+ return ; // Already initialized and set up
155
+ }
156
+ this . eventsRecorder = { } ;
157
+ eventsToListen . forEach ( ( eventToListenTo ) => {
158
+ this . eventsRecorder [ eventToListenTo ] = {
159
+ count : 0 ,
160
+ lastEventData : null ,
161
+ name : eventToListenTo ,
162
+ } ;
163
+ this . on ( eventToListenTo , ( eventData ) => {
164
+ this . eventsRecorder [ eventToListenTo ] . count ++ ;
165
+ this . eventsRecorder [ eventToListenTo ] . lastEventData = eventData ;
166
+ this . emit ( 'message-queue-event' , {
167
+ name : eventToListenTo ,
168
+ eventsRecorder : this . eventsRecorder ,
157
169
} ) ;
158
170
} ) ;
159
- }
171
+ } ) ;
160
172
}
161
173
162
- // Helper methods for testing
174
+ resolveIfEventExceededThreshold ( eventName , threshold , resolve ) {
175
+ if ( this . eventsRecorder [ eventName ] . count >= threshold ) {
176
+ resolve ( {
177
+ name : eventName ,
178
+ lastEventData : this . eventsRecorder [ eventName ] . lastEventData ,
179
+ count : this . eventsRecorder [ eventName ] . count ,
180
+ } ) ;
181
+ }
182
+ }
183
+ // Helper methods for testing - Resolves/fires when some event happens
163
184
async waitFor ( eventName , howMuch ) {
164
185
return new Promise ( ( resolve , reject ) => {
165
- this . on ( 'event-counted' , ( eventInfo ) => {
166
- if ( eventInfo . name !== eventName ) {
167
- return ;
168
- }
169
- if ( eventInfo . count >= howMuch ) {
170
- resolve ( {
171
- lastEventData : eventInfo . lastEventData ,
172
- count : eventInfo . count ,
173
- } ) ;
174
- }
186
+ // The first resolve is for cases where the caller has approached AFTER the event has already happen
187
+ this . resolveIfEventExceededThreshold ( eventName , howMuch , resolve ) ;
188
+ this . on ( 'message-queue-event' , ( eventInfo ) => {
189
+ this . resolveIfEventExceededThreshold ( eventName , howMuch , resolve ) ;
175
190
} ) ;
176
191
} ) ;
177
192
}
0 commit comments