Skip to content

Commit c15f24e

Browse files
committed
Genric message
1 parent 665db48 commit c15f24e

8 files changed

+310
-76
lines changed

example-application/data-access/order-repository.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ module.exports = class OrderRepository {
4444
async addOrder(orderDetails) {
4545
const addingResponse = await orderModel.create(orderDetails);
4646

47-
return addingResponse.dataValues
47+
return addingResponse.dataValues;
4848
}
4949

5050
async deleteOrder(orderToDelete) {
51+
console.log('About to delete', orderToDelete);
5152
await orderModel.destroy({ where: { id: orderToDelete } });
5253
return;
5354
}

example-application/entry-points/message-queue-starter.js

+18-22
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,32 @@ const OrderRepository = require('../data-access/order-repository');
44

55
// This is message queue entry point. Like API routes but for message queues.
66
class MessageQueueStarter {
7-
constructor(customMessageQueueProvider) {
8-
this.messageQueueClient = new MessageQueueClient(
9-
customMessageQueueProvider
10-
);
7+
constructor(messageQueueClient, queueName) {
8+
this.messageQueueClient = messageQueueClient;
9+
this.queueName = queueName;
1110
}
1211

1312
async start() {
14-
return await this.consumeUserDeletionQueue();
13+
await this.consumeUserDeletionQueue();
1514
}
1615

1716
async consumeUserDeletionQueue() {
18-
// This function is what handles a new message. Like API route handler, but for MQ
19-
const deletedOrderMessageHandler = async (message) => {
20-
// Validate to ensure it is not a poisoned message (invalid) that will loop into the queue
21-
const newMessageAsObject = JSON.parse(message);
22-
// ️️️✅ Best Practice: Validate incoming MQ messages using your validator framework (simplistic implementation below)
23-
if (!newMessageAsObject.id) {
24-
return reject(new AppError('invalid-message', true));
25-
}
26-
27-
const orderRepository = new OrderRepository();
28-
await orderRepository.deleteOrder(newMessageAsObject.id);
29-
};
30-
3117
// Let's now register to new delete messages from the queue
32-
await this.messageQueueClient.consume(
33-
'deleted-user',
34-
deletedOrderMessageHandler
18+
return await this.messageQueueClient.consume(
19+
this.queueName,
20+
async (message) => {
21+
// Validate to ensure it is not a poisoned message (invalid) that will loop into the queue
22+
const newMessageAsObject = JSON.parse(message);
23+
24+
// ️️️✅ Best Practice: Validate incoming MQ messages using your validator framework (simplistic implementation below)
25+
if (!newMessageAsObject.id) {
26+
throw new AppError('invalid-message', true);
27+
}
28+
29+
const orderRepository = new OrderRepository();
30+
await orderRepository.deleteOrder(newMessageAsObject.id);
31+
}
3532
);
36-
return;
3733
}
3834
}
3935

example-application/libraries/fake-message-queue-provider.js

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ class FakeMessageQueueProvider extends EventEmitter {
2525
async consume(queueName, messageHandler) {
2626
// We just save the callback (handler) locally, whenever a message will put into this queue
2727
// we will fire this handler
28+
console.info(`Received request to listen to the queue ${queueName}`);
29+
console.log("6");
2830
this.messageHandler = messageHandler;
2931
}
3032

example-application/libraries/message-queue-client.js

+10-3
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class MessageQueueClient extends EventEmitter {
6464
await this.connect();
6565
}
6666
console.log('publish', exchangeName, routingKey);
67+
6768
const sendResponse = await this.channel.publish(
6869
exchangeName,
6970
routingKey,
@@ -77,7 +78,8 @@ class MessageQueueClient extends EventEmitter {
7778
if (!this.channel) {
7879
await this.connect();
7980
}
80-
await this.channel.deleteQueue(queueName);
81+
const queueDeletionResult = await this.channel.deleteQueue(queueName);
82+
console.log(queueDeletionResult);
8183

8284
return;
8385
}
@@ -116,16 +118,21 @@ class MessageQueueClient extends EventEmitter {
116118
}
117119
this.channel.assertQueue(queueName);
118120

119-
await this.channel.consume(queueName, async (theNewMessage) => {
121+
const consumerTag = await this.channel.consume(queueName, async (theNewMessage) => {
122+
console.log('new message', theNewMessage, consumerTag);
120123
//Not awaiting because some MQ client implementation get back to fetch messages again only after handling a message
121124
onMessageCallback(theNewMessage.content.toString())
122125
.then(() => {
126+
console.log('ack');
127+
this.emit('ack');
123128
this.channel.ack(theNewMessage);
124129
})
125130
.catch((error) => {
126131
this.channel.nack(theNewMessage);
132+
this.emit('nack');
133+
console.log('nack', error.message);
127134
error.isTrusted = true; //Since it's related to a single message, there is no reason to let the process crash
128-
errorHandler.handleError(error);
135+
//errorHandler.handleError(error);
129136
});
130137
});
131138

recipes/message-queue/message-queue.test.js recipes/message-queue/fake-message-queue.test.js

+10-47
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,9 @@ const axios = require('axios');
22
const sinon = require('sinon');
33
const nock = require('nock');
44
const { once } = require('events');
5-
const amqplib = require('amqplib');
6-
7-
const messageQueueClient = require('../../example-application/libraries/message-queue-client');
85
const {
96
getNextMQConfirmation,
107
startFakeMessageQueue,
11-
getMQMessageOrTimeout,
12-
getShortUnique,
138
} = require('./test-helpers');
149
const {
1510
FakeMessageQueueProvider,
@@ -20,7 +15,7 @@ const {
2015
stopWebServer,
2116
} = require('../../example-application/entry-points/api');
2217

23-
let axiosAPIClient, mqClient;
18+
let axiosAPIClient;
2419

2520
beforeAll(async (done) => {
2621
// ️️️✅ Best Practice: Place the backend under test within the same process
@@ -35,9 +30,6 @@ beforeAll(async (done) => {
3530
};
3631
axiosAPIClient = axios.create(axiosConfig);
3732

38-
mqClient = new messageQueueClient(amqplib);
39-
mqClient.assertExchange('user-events', 'topic');
40-
4133
done();
4234
});
4335

@@ -62,45 +54,16 @@ afterAll(async (done) => {
6254
done();
6355
});
6456

65-
// Playground
66-
test('playground', async () => {
67-
// Arrange
68-
const userDeletedQueueName = `user-deleted-${getShortUnique()}`;
69-
console.time('queue-creation');
70-
await mqClient.assertQueue(userDeletedQueueName);
71-
await mqClient.bindQueue(userDeletedQueueName, 'user-events', 'user.deleted');
72-
console.timeEnd('queue-creation');
73-
const orderToAdd = {
74-
userId: 1,
75-
productId: 2,
76-
mode: 'approved',
77-
};
78-
79-
const addedOrderId = (await axiosAPIClient.post('/order', orderToAdd)).data
80-
.id;
81-
const newMessagePromise = getMQMessageOrTimeout(userDeletedQueueName, 3000);
82-
83-
// Act
84-
85-
console.log('before publish');
86-
await mqClient.publish('user-events', 'user.deleted', { id: addedOrderId });
87-
console.log('after publish');
88-
89-
// Assert
90-
const newMessage = await newMessagePromise;
91-
console.log('msg arrived', newMessage);
92-
});
93-
9457
// ️️️✅ Best Practice: Test a flow that starts via a queue message and ends with removing/confirming the message
9558
test('Whenever a user deletion message arrive, then his orders are deleted', async () => {
9659
// Arrange
97-
const orderToAdd = {
98-
userId: 1,
99-
productId: 2,
100-
mode: 'approved',
101-
};
102-
const addedOrderId = (await axiosAPIClient.post('/order', orderToAdd)).data
103-
.id;
60+
const orderToAdd = {
61+
userId: 1,
62+
productId: 2,
63+
mode: 'approved',
64+
};
65+
const addedOrderId = (await axiosAPIClient.post('/order', orderToAdd)).data
66+
.id;
10467
const fakeMessageQueue = await startFakeMessageQueue();
10568
const getNextMQEvent = getNextMQConfirmation(fakeMessageQueue); //Store the MQ actions in a promise
10669

@@ -116,7 +79,7 @@ test('Whenever a user deletion message arrive, then his orders are deleted', asy
11679
expect(aQueryForDeletedOrder.status).toBe(404);
11780
});
11881

119-
test('When a poisoned message arrives, then it is being rejected back', async () => {
82+
test.only('When a poisoned message arrives, then it is being rejected back', async () => {
12083
// Arrange
12184
const messageWithInvalidSchema = { nonExistingProperty: 'invalid' };
12285
const fakeMessageQueue = await startFakeMessageQueue();
@@ -166,4 +129,4 @@ test.todo(
166129
);
167130
test.todo(
168131
'When multiple user deletion message arrives and one fails, then only the failed message is not acknowledged'
169-
);
132+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
const axios = require('axios');
2+
const sinon = require('sinon');
3+
const nock = require('nock');
4+
const { once } = require('events');
5+
const amqplib = require('amqplib');
6+
const messageQueueClient = require('../../example-application/libraries/message-queue-client');
7+
const testHelpers = require('./test-helpers');
8+
const orderRepository = require('../../example-application/data-access/order-repository');
9+
10+
const {
11+
getNextMQConfirmation,
12+
startFakeMessageQueue,
13+
getMQMessageOrTimeout,
14+
getShortUnique,
15+
} = require('./test-helpers');
16+
const {
17+
FakeMessageQueueProvider,
18+
} = require('../../example-application/libraries/fake-message-queue-provider');
19+
20+
const {
21+
initializeWebServer,
22+
stopWebServer,
23+
} = require('../../example-application/entry-points/api');
24+
25+
let axiosAPIClient, mqClient, activeQueue;
26+
27+
beforeAll(async (done) => {
28+
// ️️️✅ Best Practice: Place the backend under test within the same process
29+
const apiConnection = await initializeWebServer();
30+
31+
// ️️️✅ Best Practice: Ensure that this component is isolated by preventing unknown calls
32+
nock.disableNetConnect();
33+
nock.enableNetConnect('127.0.0.1');
34+
const axiosConfig = {
35+
baseURL: `http://127.0.0.1:${apiConnection.port}`,
36+
validateStatus: () => true, //Don't throw HTTP exceptions. Delegate to the tests to decide which error is acceptable
37+
};
38+
axiosAPIClient = axios.create(axiosConfig);
39+
40+
mqClient = new messageQueueClient(amqplib);
41+
42+
done();
43+
});
44+
45+
beforeEach(async () => {
46+
nock('http://localhost/user/').get(`/1`).reply(200, {
47+
id: 1,
48+
name: 'John',
49+
});
50+
nock('http://mail.com').post('/send').reply(202);
51+
});
52+
53+
afterEach(async () => {
54+
nock.cleanAll();
55+
sinon.restore();
56+
console.log('After each about delete queue', activeQueue);
57+
58+
if (activeQueue) {
59+
//await mqClient.deleteQueue(activeQueue);
60+
}
61+
});
62+
63+
afterAll(async (done) => {
64+
// ️️️✅ Best Practice: Clean-up resources after each run
65+
await stopWebServer();
66+
//await messageQueueClient.close();
67+
nock.enableNetConnect();
68+
done();
69+
});
70+
71+
// Playground
72+
test.skip('playground 2 When a message is poisoned, then its rejected and put back to queue', async () => {
73+
// Arrange
74+
const userDeletedQueueName = `user-deleted-${getShortUnique()}`;
75+
console.time('queue-creation');
76+
await mqClient.assertQueue(userDeletedQueueName);
77+
await mqClient.bindQueue(userDeletedQueueName, 'user-events', 'user.deleted');
78+
console.timeEnd('queue-creation');
79+
80+
// Act
81+
82+
console.log('before publish');
83+
await mqClient.publish('user-events', 'user.deleted', {
84+
invalidField: 'invalid-value',
85+
});
86+
console.log('after publish');
87+
88+
// Assert
89+
});
90+
91+
test('When a delete message fails ONCE, than thanks to retry the order is deleted', async () => {
92+
// Arrange
93+
const orderToAdd = {
94+
userId: 1,
95+
productId: 2,
96+
mode: 'approved',
97+
};
98+
const addedOrderId = (await axiosAPIClient.post('/order', orderToAdd)).data
99+
.id;
100+
console.time('putq');
101+
activeQueue = `user-deleted-${getShortUnique()}`;
102+
const exchangeName = `user-events-${getShortUnique()}`;
103+
mqClient.assertExchange(exchangeName, 'topic');
104+
await mqClient.assertQueue(activeQueue);
105+
await mqClient.bindQueue(activeQueue, exchangeName, 'user.deleted');
106+
const mqClient2 = await testHelpers.startMQSubscriber(activeQueue);
107+
const waitForAck = once(mqClient2, 'ack');
108+
const deleteOrderStub = sinon.stub(orderRepository.prototype, 'deleteOrder');
109+
deleteOrderStub.onFirstCall().rejects(new Error('Cant delete order'));
110+
orderRepository.prototype.deleteOrder.callThrough();
111+
112+
// Act
113+
await mqClient.publish(exchangeName, 'user.deleted', { id: addedOrderId });
114+
115+
// Assert
116+
await waitForAck;
117+
console.timeEnd('putq');
118+
const aQueryForDeletedOrder = await axiosAPIClient.get(
119+
`/order/${addedOrderId}`
120+
);
121+
expect(aQueryForDeletedOrder.status).toBe(404);
122+
console.log('final', aQueryForDeletedOrder.status);
123+
});

0 commit comments

Comments
 (0)