Skip to content

Commit 20300c6

Browse files
committed
Genric message
1 parent c15f24e commit 20300c6

File tree

5 files changed

+89
-143
lines changed

5 files changed

+89
-143
lines changed

example-application/entry-points/message-queue-starter.js

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class MessageQueueStarter {
1414
}
1515

1616
async consumeUserDeletionQueue() {
17+
console.log("consume register", this.queueName)
1718
// Let's now register to new delete messages from the queue
1819
return await this.messageQueueClient.consume(
1920
this.queueName,

example-application/libraries/message-queue-client.js

+47-10
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ class MessageQueueClient extends EventEmitter {
1414
// It can get one in the constructor here or even change by environment variables
1515
if (customMessageQueueProvider) {
1616
this.messageQueueProvider = customMessageQueueProvider;
17-
} else if (process.env.MESSAGE_QUEUE_PROVIDER === 'real') {
18-
this.messageQueueProvider = amqplib;
1917
} else {
20-
this.messageQueueProvider = new FakeMessageQueueProvider();
18+
this.messageQueueProvider = amqplib;
2119
}
20+
21+
this.countEvents();
2222
}
2323

2424
async connect() {
@@ -59,7 +59,7 @@ class MessageQueueClient extends EventEmitter {
5959
return sendResponse;
6060
}
6161

62-
async publish(exchangeName, routingKey, message) {
62+
async publish(exchangeName, routingKey, message, messageId) {
6363
if (!this.channel) {
6464
await this.connect();
6565
}
@@ -68,7 +68,8 @@ class MessageQueueClient extends EventEmitter {
6868
const sendResponse = await this.channel.publish(
6969
exchangeName,
7070
routingKey,
71-
Buffer.from(JSON.stringify(message))
71+
Buffer.from(JSON.stringify(message)),
72+
{ messageId }
7273
);
7374

7475
return sendResponse;
@@ -117,19 +118,19 @@ class MessageQueueClient extends EventEmitter {
117118
await this.connect();
118119
}
119120
this.channel.assertQueue(queueName);
121+
console.log('consume start', queueName);
120122

121-
const consumerTag = await this.channel.consume(queueName, async (theNewMessage) => {
122-
console.log('new message', theNewMessage, consumerTag);
123+
await this.channel.consume(queueName, async (theNewMessage) => {
123124
//Not awaiting because some MQ client implementation get back to fetch messages again only after handling a message
124125
onMessageCallback(theNewMessage.content.toString())
125126
.then(() => {
126127
console.log('ack');
127-
this.emit('ack');
128+
this.emit('ack', theNewMessage);
128129
this.channel.ack(theNewMessage);
129130
})
130131
.catch((error) => {
131-
this.channel.nack(theNewMessage);
132-
this.emit('nack');
132+
this.channel.nack(theNewMessage, false, true);
133+
this.emit('nack', theNewMessage);
133134
console.log('nack', error.message);
134135
error.isTrusted = true; //Since it's related to a single message, there is no reason to let the process crash
135136
//errorHandler.handleError(error);
@@ -138,6 +139,42 @@ class MessageQueueClient extends EventEmitter {
138139

139140
return;
140141
}
142+
143+
countEvents() {
144+
const eventsToListen = ['nack', 'ack'];
145+
if (this.eventsCounter === undefined) {
146+
this.eventsCounter = {};
147+
eventsToListen.forEach((eventToListenTo) => {
148+
this.eventsCounter[eventToListenTo] = 0;
149+
this.on(eventToListenTo, (eventData) => {
150+
this.eventsCounter[eventToListenTo]++;
151+
console.log('events counting', this.eventsCounter);
152+
this.emit('event-counted', {
153+
name: eventToListenTo,
154+
lastEventData: eventData,
155+
count: this.eventsCounter[eventToListenTo],
156+
});
157+
});
158+
});
159+
}
160+
}
161+
162+
// Helper methods for testing
163+
async waitFor(eventName, howMuch) {
164+
return new Promise((resolve, reject) => {
165+
this.on('event-counted', (eventInfo) => {
166+
if (eventInfo.name !== eventName) {
167+
return;
168+
}
169+
if (eventInfo.count >= howMuch) {
170+
resolve({
171+
lastEventData: eventInfo.lastEventData,
172+
count: eventInfo.count,
173+
});
174+
}
175+
});
176+
});
177+
}
141178
}
142179

143180
module.exports = MessageQueueClient;
Original file line numberDiff line numberDiff line change
@@ -1,123 +0,0 @@
1-
const axios = require('axios');
2-
const sinon = require('sinon');
3-
const nock = require('nock');
4-
const { once } = require('events');
5-
const amqplib = require('amqplib');
6-
const messageQueueClient = require('../../example-application/libraries/message-queue-client');
7-
const testHelpers = require('./test-helpers');
8-
const orderRepository = require('../../example-application/data-access/order-repository');
9-
10-
const {
11-
getNextMQConfirmation,
12-
startFakeMessageQueue,
13-
getMQMessageOrTimeout,
14-
getShortUnique,
15-
} = require('./test-helpers');
16-
const {
17-
FakeMessageQueueProvider,
18-
} = require('../../example-application/libraries/fake-message-queue-provider');
19-
20-
const {
21-
initializeWebServer,
22-
stopWebServer,
23-
} = require('../../example-application/entry-points/api');
24-
25-
let axiosAPIClient, mqClient, activeQueue;
26-
27-
beforeAll(async (done) => {
28-
// ️️️✅ Best Practice: Place the backend under test within the same process
29-
const apiConnection = await initializeWebServer();
30-
31-
// ️️️✅ Best Practice: Ensure that this component is isolated by preventing unknown calls
32-
nock.disableNetConnect();
33-
nock.enableNetConnect('127.0.0.1');
34-
const axiosConfig = {
35-
baseURL: `http://127.0.0.1:${apiConnection.port}`,
36-
validateStatus: () => true, //Don't throw HTTP exceptions. Delegate to the tests to decide which error is acceptable
37-
};
38-
axiosAPIClient = axios.create(axiosConfig);
39-
40-
mqClient = new messageQueueClient(amqplib);
41-
42-
done();
43-
});
44-
45-
beforeEach(async () => {
46-
nock('http://localhost/user/').get(`/1`).reply(200, {
47-
id: 1,
48-
name: 'John',
49-
});
50-
nock('http://mail.com').post('/send').reply(202);
51-
});
52-
53-
afterEach(async () => {
54-
nock.cleanAll();
55-
sinon.restore();
56-
console.log('After each about delete queue', activeQueue);
57-
58-
if (activeQueue) {
59-
//await mqClient.deleteQueue(activeQueue);
60-
}
61-
});
62-
63-
afterAll(async (done) => {
64-
// ️️️✅ Best Practice: Clean-up resources after each run
65-
await stopWebServer();
66-
//await messageQueueClient.close();
67-
nock.enableNetConnect();
68-
done();
69-
});
70-
71-
// Playground
72-
test.skip('playground 2 When a message is poisoned, then its rejected and put back to queue', async () => {
73-
// Arrange
74-
const userDeletedQueueName = `user-deleted-${getShortUnique()}`;
75-
console.time('queue-creation');
76-
await mqClient.assertQueue(userDeletedQueueName);
77-
await mqClient.bindQueue(userDeletedQueueName, 'user-events', 'user.deleted');
78-
console.timeEnd('queue-creation');
79-
80-
// Act
81-
82-
console.log('before publish');
83-
await mqClient.publish('user-events', 'user.deleted', {
84-
invalidField: 'invalid-value',
85-
});
86-
console.log('after publish');
87-
88-
// Assert
89-
});
90-
91-
test('When a delete message fails ONCE, than thanks to retry the order is deleted', async () => {
92-
// Arrange
93-
const orderToAdd = {
94-
userId: 1,
95-
productId: 2,
96-
mode: 'approved',
97-
};
98-
const addedOrderId = (await axiosAPIClient.post('/order', orderToAdd)).data
99-
.id;
100-
console.time('putq');
101-
activeQueue = `user-deleted-${getShortUnique()}`;
102-
const exchangeName = `user-events-${getShortUnique()}`;
103-
mqClient.assertExchange(exchangeName, 'topic');
104-
await mqClient.assertQueue(activeQueue);
105-
await mqClient.bindQueue(activeQueue, exchangeName, 'user.deleted');
106-
const mqClient2 = await testHelpers.startMQSubscriber(activeQueue);
107-
const waitForAck = once(mqClient2, 'ack');
108-
const deleteOrderStub = sinon.stub(orderRepository.prototype, 'deleteOrder');
109-
deleteOrderStub.onFirstCall().rejects(new Error('Cant delete order'));
110-
orderRepository.prototype.deleteOrder.callThrough();
111-
112-
// Act
113-
await mqClient.publish(exchangeName, 'user.deleted', { id: addedOrderId });
114-
115-
// Assert
116-
await waitForAck;
117-
console.timeEnd('putq');
118-
const aQueryForDeletedOrder = await axiosAPIClient.get(
119-
`/order/${addedOrderId}`
120-
);
121-
expect(aQueryForDeletedOrder.status).toBe(404);
122-
console.log('final', aQueryForDeletedOrder.status);
123-
});

recipes/message-queue/real-message-queue.test.js

+39-8
Original file line numberDiff line numberDiff line change
@@ -85,28 +85,59 @@ test.skip('playground 2 When a message is poisoned, then its rejected and put ba
8585

8686
test('When a delete message fails ONCE, than thanks to retry the order is deleted', async () => {
8787
// Arrange
88-
const orderToDelete = testHelpers.addNewOrder(axiosAPIClient);
89-
perTestQueue = testHelpers.createQueueForTest('user.delete');
88+
const addedOrderId = await testHelpers.addNewOrder(axiosAPIClient);
89+
perTestQueue = await testHelpers.createQueueForTest('user.deleted');
9090
const messageQueueClient = await testHelpers.startMQSubscriber(
9191
'real',
9292
perTestQueue.queueName
9393
);
94-
// Replace with our own promise
95-
const waitForAck = once(messageQueueClient, 'ack');
9694
const deleteOrderStub = sinon.stub(orderRepository.prototype, 'deleteOrder');
97-
deleteOrderStub.onFirstCall().rejects(new Error('Cant delete order'));
98-
orderRepository.prototype.deleteOrder.callThrough();
95+
deleteOrderStub.onFirstCall().rejects(new Error('Cant delete order')); // Fail only once
96+
orderRepository.prototype.deleteOrder.callThrough(); // Then on retry succeed
9997

10098
// Act
10199
await messageQueueClient.publish(perTestQueue.exchangeName, 'user.deleted', {
102100
id: addedOrderId,
103101
});
104102

105103
// Assert
106-
await waitForAck;
104+
await messageQueueClient.waitFor('ack', 1);
107105
const aQueryForDeletedOrder = await axiosAPIClient.get(
108106
`/order/${addedOrderId}`
109107
);
110108
expect(aQueryForDeletedOrder.status).toBe(404);
111-
console.log('final', aQueryForDeletedOrder.status);
109+
});
110+
111+
test('When a batch of messages has ONE poisoned message, than only one is rejected (nack)', async () => {
112+
// Arrange
113+
const addedOrderId = await testHelpers.addNewOrder(axiosAPIClient);
114+
perTestQueue = await testHelpers.createQueueForTest('user.deleted');
115+
const messageQueueClient = await testHelpers.startMQSubscriber(
116+
'real',
117+
perTestQueue.queueName
118+
);
119+
const badMessageId = getShortUnique();
120+
const goodMessageId = getShortUnique();
121+
122+
// Act
123+
await messageQueueClient.publish(
124+
perTestQueue.exchangeName,
125+
'user.deleted',
126+
{
127+
id: addedOrderId,
128+
},
129+
goodMessageId
130+
); //good message
131+
await messageQueueClient.publish(
132+
perTestQueue.exchangeName,
133+
'user.deleted',
134+
{
135+
nonExisting: 'invalid',
136+
},
137+
badMessageId
138+
); // bad message
139+
140+
// Assert
141+
const lastNackEvent = await messageQueueClient.waitFor('nack', 1);
142+
expect(lastNackEvent.lastEventData.properties.messageId).toBe(badMessageId);
112143
});

recipes/message-queue/test-helpers.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ module.exports.createQueueForTest = async (bindingPattern) => {
2323
const exchangeName = `user-events-${this.getShortUnique()}`;
2424
mqClient.assertExchange(exchangeName, 'topic');
2525
await mqClient.assertQueue(queueName);
26-
await mqClient.bindQueue(activeQueue, exchangeName, bindingPattern);
26+
await mqClient.bindQueue(queueName, exchangeName, bindingPattern);
2727
return { queueName, exchangeName };
2828
};
2929

@@ -57,7 +57,7 @@ module.exports.startMQSubscriber = async (fakeOrReal, queueName) => {
5757
return messageQueueClient;
5858
};
5959

60-
module.exports.addNewOrder = async (axiosInstance) => {
60+
module.exports.addNewOrder = async (axiosAPIClient) => {
6161
const orderToAdd = {
6262
userId: 1,
6363
productId: 2,

0 commit comments

Comments
 (0)