Skip to content

Commit 8476ea8

Browse files
Yoni GoldbergYoni Goldberg
Yoni Goldberg
authored and
Yoni Goldberg
committed
Call it consumer, not subscriber
1 parent ca5ee04 commit 8476ea8

File tree

7 files changed

+50
-72
lines changed

7 files changed

+50
-72
lines changed

example-application/business-logic/order-service.js

-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ async function getUserFromUsersService(userId) {
6666
},
6767
}
6868
);
69-
console.log(getUserResponse.data);
7069
return getUserResponse.data;
7170
} catch (error) {
7271
if (error?.code === 'ECONNABORTED') {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
const MessageQueueClient = require('../libraries/message-queue-client');
2+
const { errorHandler, AppError } = require('../error-handling');
3+
const orderService = require('../business-logic/order-service');
4+
5+
// This is message queue entry point. Like API routes but for message queues.
6+
class QueueConsumer {
7+
constructor(messageQueueClient) {
8+
this.messageQueueClient = messageQueueClient;
9+
}
10+
11+
async start() {
12+
await this.consumeUserDeletionQueue();
13+
}
14+
15+
async consumeUserDeletionQueue() {
16+
await this.messageQueueClient.consume('user.deleted', async (message) => {
17+
// ️️️Validate message
18+
const newMessageAsObject = JSON.parse(message);
19+
if (!newMessageAsObject.id) {
20+
throw new AppError('invalid-message', 'Unknown message schema');
21+
}
22+
23+
await orderService.deleteOrder(newMessageAsObject.id);
24+
});
25+
}
26+
}
27+
28+
process.on('uncaughtException', (error) => {
29+
errorHandler.handleError(error);
30+
});
31+
32+
process.on('unhandledRejection', (reason) => {
33+
errorHandler.handleError(reason);
34+
});
35+
36+
module.exports = { QueueConsumer };

example-application/entry-points/message-queue-starter.js

-45
This file was deleted.

example-application/libraries/message-queue-client.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
const { EventEmitter } = require('events');
22
const amqplib = require('amqplib');
3-
43
const { FakeMessageQueueProvider } = require('./fake-message-queue-provider');
4+
const { errorHandler } = require('../error-handling');
55

66
// This is a simplistic client for a popular message queue product - RabbitMQ
77
// It's generic in order to be used by any service in the organization
@@ -24,6 +24,7 @@ class MessageQueueClient extends EventEmitter {
2424
this.messageQueueProvider = new FakeMessageQueueProvider();
2525
}
2626

27+
this.setMaxListeners(50);
2728
this.countEvents();
2829
}
2930

@@ -113,7 +114,7 @@ class MessageQueueClient extends EventEmitter {
113114
this.channel.nack(theNewMessage, false, this.requeue);
114115
this.emit('nack', { queueName, message: theNewMessage });
115116
error.isTrusted = true; //Since it's related to a single message, there is no reason to let the process crash
116-
//errorHandler.handleError(error);
117+
errorHandler.handleError(error);
117118
});
118119
});
119120
}

example-application/start.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
const { initializeWebServer } = require('./entry-points/api');
2-
const { MessageQueueStarter } = require('./entry-points/message-queue-starter');
2+
const { QueueConsumer } = require('./entry-points/message-queue-consumer');
33

44
async function start() {
55
await initializeWebServer();
6-
await new MessageQueueStarter().start();
6+
await new QueueConsumer().start();
77
}
88

99
start()

recipes/message-queue/fake-message-queue.test.js

+5-12
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ const {
1212
FakeMessageQueueProvider,
1313
} = require('../../example-application/libraries/fake-message-queue-provider');
1414
const {
15-
QueueSubscriber,
16-
} = require('../../example-application/entry-points/message-queue-starter');
15+
QueueConsumer,
16+
} = require('../../example-application/entry-points/message-queue-consumer');
1717

1818
let axiosAPIClient;
1919

@@ -64,10 +64,7 @@ test('Whenever a user deletion message arrive, then his orders are deleted', asy
6464
};
6565
const addedOrderId = (await axiosAPIClient.post('/order', orderToAdd)).data
6666
.id;
67-
const messageQueueClient = await testHelpers.startMQSubscriber(
68-
'fake',
69-
'user.deleted'
70-
);
67+
const messageQueueClient = await testHelpers.startMQConsumer('fake');
7168

7269
// Act
7370
await messageQueueClient.publish('user.events', 'user.deleted', {
@@ -85,10 +82,7 @@ test('Whenever a user deletion message arrive, then his orders are deleted', asy
8582
test('When a poisoned message arrives, then it is being rejected back', async () => {
8683
// Arrange
8784
const messageWithInvalidSchema = { nonExistingProperty: 'invalid❌' };
88-
const messageQueueClient = await testHelpers.startMQSubscriber(
89-
'fake',
90-
'user.deleted'
91-
);
85+
const messageQueueClient = await testHelpers.startMQConsumer('fake');
9286

9387
// Act
9488
await messageQueueClient.publish(
@@ -109,7 +103,7 @@ test('When user deleted message arrives, then all corresponding orders are delet
109103
const messageQueueClient = new MessageQueueClient(
110104
new FakeMessageQueueProvider()
111105
);
112-
await new QueueSubscriber(messageQueueClient, 'user.deleted').start();
106+
await new QueueConsumer(messageQueueClient, 'user.deleted').start();
113107

114108
// Act
115109
await messageQueueClient.publish('user.events', 'user.deleted', {
@@ -158,4 +152,3 @@ test.todo(
158152
test.todo(
159153
'When multiple user deletion message arrives and one fails, then only the failed message is not acknowledged'
160154
);
161-

recipes/message-queue/test-helpers.js

+4-10
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@ const {
22
FakeMessageQueueProvider,
33
} = require('../../example-application/libraries/fake-message-queue-provider');
44
const {
5-
QueueSubscriber: MessageQueueStarter,
6-
} = require('../../example-application/entry-points/message-queue-starter');
5+
QueueConsumer,
6+
} = require('../../example-application/entry-points/message-queue-consumer');
77
const amqplib = require('amqplib');
88
const MessageQueueClient = require('../../example-application/libraries/message-queue-client');
99

10-
module.exports.startMQSubscriber = async (
10+
module.exports.startMQConsumer = async (
1111
fakeOrReal,
12-
queueName,
13-
deadLetterQueueName = undefined,
1412
messageQueueClient = undefined
1513
) => {
1614
if (!messageQueueClient) {
@@ -19,11 +17,7 @@ module.exports.startMQSubscriber = async (
1917
messageQueueClient = new MessageQueueClient(messageQueueProvider);
2018
}
2119

22-
await new MessageQueueStarter(
23-
messageQueueClient,
24-
queueName,
25-
deadLetterQueueName
26-
).start();
20+
await new QueueConsumer(messageQueueClient).start();
2721

2822
return messageQueueClient;
2923
};

0 commit comments

Comments
 (0)