Skip to content

Commit fff698e

Browse files
committed
add dead letter exchanges
1 parent 939c5aa commit fff698e

File tree

9 files changed

+141
-93
lines changed

9 files changed

+141
-93
lines changed

asset-manager/README.md

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ RabbitMQ(RabbitMQ)
2525
%% Database
2626
PostgreSQL[(PostgreSQL)]
2727
28+
%% Queues
29+
Queue[image-processing queue]
30+
RetryQueue[image-processing.retry queue]
31+
2832
%% User
2933
User([User])
3034
@@ -42,7 +46,10 @@ WebApp -->|Retrieve Images| LocalFS
4246
WebApp -->|Retrieve Metadata| PostgreSQL
4347
4448
%% RabbitMQ Flow
45-
RabbitMQ -->|Push Message| Worker
49+
RabbitMQ -->|Push Message| Queue
50+
Queue -->|Processing Failed| RetryQueue
51+
RetryQueue -->|After 1 min delay| Queue
52+
Queue -->|Consume Message| Worker
4653
4754
%% Worker Flow
4855
Worker -->|Download Original| S3
@@ -93,6 +100,10 @@ ServiceBus(Azure Service Bus)
93100
%% Azure Database
94101
AzPostgreSQL[(Azure PostgreSQL)]
95102
103+
%% Queues
104+
Queue[image-processing queue]
105+
RetryQueue[retry queue]
106+
96107
%% User
97108
User([User])
98109
@@ -110,7 +121,10 @@ WebApp -->|Retrieve Images| LocalFS
110121
WebApp -->|Retrieve Metadata| AzPostgreSQL
111122
112123
%% Service Bus Flow
113-
ServiceBus -->|Push Message| Worker
124+
ServiceBus -->|Push Message| Queue
125+
Queue -->|Processing Failed| RetryQueue
126+
RetryQueue -->|After 1 min delay| Queue
127+
Queue -->|Consume Message| Worker
114128
115129
%% Worker Flow
116130
Worker -->|Download Original| AzBlob
@@ -125,7 +139,7 @@ classDef app fill:#90caf9,stroke:#0d47a1,color:#0d47a1
125139
classDef storage fill:#68B3A1,stroke:#006064,color:#006064
126140
classDef broker fill:#B39DDB,stroke:#4527A0,color:#4527A0
127141
classDef db fill:#90CAF9,stroke:#1565C0,color:#1565C0
128-
classDef queue fill:#fff59d,stroke:#f57f17,color:#f57f17
142+
classDef queue fill:#81C784,stroke:#2E7D32,color:#2E7D32
129143
classDef user fill:#ef9a9a,stroke:#b71c1c,color:#b71c1c
130144
131145
class WebApp,Worker app

asset-manager/scripts/start.cmd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ if not exist "%PROJECT_ROOT%\pids" mkdir "%PROJECT_ROOT%\pids"
2222

2323
echo Starting web module...
2424
cd /d "%PROJECT_ROOT%\web"
25-
start "Web Module" cmd /c "%PROJECT_ROOT%\mvnw.cmd spring-boot:run -Dspring-boot.run.jvmArguments=-Dspring.pid.file=%PROJECT_ROOT%\pids\web.pid -Dspring-boot.run.profiles=dev"
25+
start "Web Module" cmd /k "%PROJECT_ROOT%\mvnw.cmd spring-boot:run -Dspring-boot.run.jvmArguments=-Dspring.pid.file=%PROJECT_ROOT%\pids\web.pid -Dspring-boot.run.profiles=dev"
2626

2727
echo Starting worker module...
2828
cd /d "%PROJECT_ROOT%\worker"

asset-manager/web/src/main/java/com/microsoft/migration/assets/config/RabbitConfig.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package com.microsoft.migration.assets.config;
22

33
import org.springframework.amqp.core.AcknowledgeMode;
4+
import org.springframework.amqp.core.Binding;
5+
import org.springframework.amqp.core.BindingBuilder;
6+
import org.springframework.amqp.core.DirectExchange;
47
import org.springframework.amqp.core.Queue;
58
import org.springframework.amqp.core.QueueBuilder;
69
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
@@ -13,11 +16,44 @@
1316

1417
@Configuration
1518
public class RabbitConfig {
16-
public static final String QUEUE_NAME = "image-processing";
19+
public static final String IMAGE_PROCESSING_QUEUE = "image-processing";
20+
21+
// Dead letter queue configuration for the retry mechanism
22+
public static final String RETRY_EXCHANGE = "image-processing.retry";
23+
public static final String RETRY_QUEUE = "image-processing.retry";
24+
public static final String RETRY_ROUTING_KEY = "retry";
25+
public static final int RETRY_DELAY_MS = 60000; // 1 minute delay
1726

1827
@Bean
1928
public Queue imageProcessingQueue() {
20-
return QueueBuilder.durable(QUEUE_NAME).build();
29+
return QueueBuilder.durable(IMAGE_PROCESSING_QUEUE
30+
)
31+
.withArgument("x-dead-letter-exchange", RETRY_EXCHANGE)
32+
.withArgument("x-dead-letter-routing-key", RETRY_ROUTING_KEY)
33+
.build();
34+
}
35+
36+
@Bean
37+
public Queue retryQueue() {
38+
return QueueBuilder.durable(RETRY_QUEUE)
39+
.withArgument("x-dead-letter-exchange", "")
40+
.withArgument("x-dead-letter-routing-key", IMAGE_PROCESSING_QUEUE
41+
)
42+
.withArgument("x-message-ttl", RETRY_DELAY_MS)
43+
.build();
44+
}
45+
46+
@Bean
47+
public DirectExchange retryExchange() {
48+
return new DirectExchange(RETRY_EXCHANGE);
49+
}
50+
51+
@Bean
52+
public Binding retryBinding() {
53+
return BindingBuilder
54+
.bind(retryQueue())
55+
.to(retryExchange())
56+
.with(RETRY_ROUTING_KEY);
2157
}
2258

2359
@Bean
@@ -33,6 +69,7 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
3369
configurer.configure(factory, connectionFactory);
3470
factory.setMessageConverter(jsonMessageConverter());
3571
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
72+
factory.setDefaultRequeueRejected(false);
3673
return factory;
3774
}
3875
}

asset-manager/web/src/main/java/com/microsoft/migration/assets/service/AwsS3Service.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.UUID;
2222
import java.util.stream.Collectors;
2323

24-
import static com.microsoft.migration.assets.config.RabbitConfig.QUEUE_NAME;
24+
import static com.microsoft.migration.assets.config.RabbitConfig.IMAGE_PROCESSING_QUEUE;
2525

2626
@Service
2727
@RequiredArgsConstructor
@@ -83,7 +83,7 @@ public void uploadObject(MultipartFile file) throws IOException {
8383
getStorageType(),
8484
file.getSize()
8585
);
86-
rabbitTemplate.convertAndSend(QUEUE_NAME, message);
86+
rabbitTemplate.convertAndSend(IMAGE_PROCESSING_QUEUE, message);
8787

8888
// Create and save metadata to database
8989
ImageMetadata metadata = new ImageMetadata();

asset-manager/web/src/main/java/com/microsoft/migration/assets/service/BackupMessageProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import org.springframework.context.annotation.Profile;
1010
import org.springframework.stereotype.Component;
1111

12-
import static com.microsoft.migration.assets.config.RabbitConfig.QUEUE_NAME;
12+
import static com.microsoft.migration.assets.config.RabbitConfig.IMAGE_PROCESSING_QUEUE;
1313

1414
import java.io.IOException;
1515

@@ -27,7 +27,7 @@ public class BackupMessageProcessor {
2727
* Processes image messages from a backup queue for monitoring and resilience purposes.
2828
* Uses the same RabbitMQ API pattern as the worker module.
2929
*/
30-
@RabbitListener(queues = QUEUE_NAME)
30+
@RabbitListener(queues = IMAGE_PROCESSING_QUEUE)
3131
public void processBackupMessage(final ImageProcessingMessage message,
3232
Channel channel,
3333
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

asset-manager/web/src/main/java/com/microsoft/migration/assets/service/LocalFileStorageService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.List;
2020
import java.util.stream.Collectors;
2121

22-
import static com.microsoft.migration.assets.config.RabbitConfig.QUEUE_NAME;
22+
import static com.microsoft.migration.assets.config.RabbitConfig.IMAGE_PROCESSING_QUEUE;
2323

2424
@Service
2525
@Profile("dev") // Only active when dev profile is active
@@ -102,7 +102,7 @@ public void uploadObject(MultipartFile file) throws IOException {
102102
getStorageType(),
103103
file.getSize()
104104
);
105-
rabbitTemplate.convertAndSend(QUEUE_NAME, message);
105+
rabbitTemplate.convertAndSend(IMAGE_PROCESSING_QUEUE, message);
106106
}
107107

108108
@Override
Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package com.microsoft.migration.assets.worker.config;
22

33
import org.springframework.amqp.core.AcknowledgeMode;
4+
import org.springframework.amqp.core.Binding;
5+
import org.springframework.amqp.core.BindingBuilder;
6+
import org.springframework.amqp.core.DirectExchange;
47
import org.springframework.amqp.core.Queue;
58
import org.springframework.amqp.core.QueueBuilder;
69
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
@@ -10,19 +13,45 @@
1013
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
1114
import org.springframework.context.annotation.Bean;
1215
import org.springframework.context.annotation.Configuration;
13-
import org.springframework.retry.backoff.FixedBackOffPolicy;
14-
import org.springframework.retry.policy.SimpleRetryPolicy;
15-
import org.springframework.retry.support.RetryTemplate;
1616

1717
@Configuration
1818
public class RabbitConfig {
19-
public static final String QUEUE_NAME = "image-processing";
19+
public static final String IMAGE_PROCESSING_QUEUE = "image-processing";
20+
21+
// Dead letter queue configuration for the retry mechanism
22+
public static final String RETRY_EXCHANGE = "image-processing.retry";
23+
public static final String RETRY_QUEUE = "image-processing.retry";
24+
public static final String RETRY_ROUTING_KEY = "retry";
2025
public static final int RETRY_DELAY_MS = 60000; // 1 minute delay
21-
public static final int MAX_ATTEMPTS = 3; // Maximum number of retry attempts
2226

2327
@Bean
2428
public Queue imageProcessingQueue() {
25-
return QueueBuilder.durable(QUEUE_NAME).build();
29+
return QueueBuilder.durable(IMAGE_PROCESSING_QUEUE)
30+
.withArgument("x-dead-letter-exchange", RETRY_EXCHANGE)
31+
.withArgument("x-dead-letter-routing-key", RETRY_ROUTING_KEY)
32+
.build();
33+
}
34+
35+
@Bean
36+
public Queue retryQueue() {
37+
return QueueBuilder.durable(RETRY_QUEUE)
38+
.withArgument("x-dead-letter-exchange", "")
39+
.withArgument("x-dead-letter-routing-key", IMAGE_PROCESSING_QUEUE)
40+
.withArgument("x-message-ttl", RETRY_DELAY_MS)
41+
.build();
42+
}
43+
44+
@Bean
45+
public DirectExchange retryExchange() {
46+
return new DirectExchange(RETRY_EXCHANGE);
47+
}
48+
49+
@Bean
50+
public Binding retryBinding() {
51+
return BindingBuilder
52+
.bind(retryQueue())
53+
.to(retryExchange())
54+
.with(RETRY_ROUTING_KEY);
2655
}
2756

2857
@Bean
@@ -38,24 +67,8 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
3867
configurer.configure(factory, connectionFactory);
3968
factory.setMessageConverter(jsonMessageConverter());
4069
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
70+
factory.setDefaultRequeueRejected(false);
4171
return factory;
4272
}
43-
44-
@Bean
45-
public RetryTemplate retryTemplate() {
46-
RetryTemplate retryTemplate = new RetryTemplate();
47-
48-
// Configure retry policy (number of attempts)
49-
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
50-
retryPolicy.setMaxAttempts(MAX_ATTEMPTS);
51-
retryTemplate.setRetryPolicy(retryPolicy);
52-
53-
// Configure backoff policy (delay between retries)
54-
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
55-
backOffPolicy.setBackOffPeriod(RETRY_DELAY_MS);
56-
retryTemplate.setBackOffPolicy(backOffPolicy);
57-
58-
return retryTemplate;
59-
}
6073

6174
}

asset-manager/worker/src/main/java/com/microsoft/migration/assets/worker/service/AbstractFileProcessingService.java

Lines changed: 21 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,7 @@
66
import lombok.extern.slf4j.Slf4j;
77
import org.springframework.amqp.rabbit.annotation.RabbitListener;
88
import org.springframework.amqp.support.AmqpHeaders;
9-
import org.springframework.beans.factory.annotation.Autowired;
109
import org.springframework.messaging.handler.annotation.Header;
11-
import org.springframework.retry.RetryCallback;
12-
import org.springframework.retry.RetryContext;
13-
import org.springframework.retry.support.RetryTemplate;
1410

1511
import javax.imageio.ImageIO;
1612
import java.awt.Graphics2D;
@@ -20,49 +16,16 @@
2016
import java.nio.file.Files;
2117
import java.nio.file.Path;
2218

23-
import static com.microsoft.migration.assets.worker.config.RabbitConfig.QUEUE_NAME;
19+
import static com.microsoft.migration.assets.worker.config.RabbitConfig.IMAGE_PROCESSING_QUEUE;
2420

2521
@Slf4j
2622
public abstract class AbstractFileProcessingService implements FileProcessor {
2723

28-
@Autowired
29-
private RetryTemplate retryTemplate;
30-
31-
@RabbitListener(queues = QUEUE_NAME)
24+
@RabbitListener(queues = IMAGE_PROCESSING_QUEUE)
3225
public void processImage(final ImageProcessingMessage message,
3326
Channel channel,
3427
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
35-
try {
36-
retryTemplate.execute(new RetryCallback<Void, Exception>() {
37-
@Override
38-
public Void doWithRetry(RetryContext context) throws Exception {
39-
if (context.getRetryCount() > 0) {
40-
log.info("Retry attempt {} for image: {}", context.getRetryCount(), message.getKey());
41-
}
42-
43-
processImageWithRetry(message);
44-
return null;
45-
}
46-
});
47-
48-
// Success - acknowledge the message
49-
log.debug("Acknowledging message after successful processing: {}", message.getKey());
50-
channel.basicAck(deliveryTag, false);
51-
} catch (Exception e) {
52-
log.error("All retry attempts failed for image: " + message.getKey(), e);
53-
54-
try {
55-
// After all retries are exhausted, reject the message
56-
// to retry later, use basicNack with requeue=true
57-
log.debug("Rejecting message after all retry attempts failed: {}", message.getKey());
58-
channel.basicNack(deliveryTag, false, true);
59-
} catch (IOException ackEx) {
60-
log.error("Error handling RabbitMQ acknowledgment for: {}", message.getKey(), ackEx);
61-
}
62-
}
63-
}
64-
65-
private void processImageWithRetry(ImageProcessingMessage message) {
28+
boolean processingSuccess = false;
6629
Path tempDir = null;
6730
Path originalFile = null;
6831
Path thumbnailFile = null;
@@ -87,13 +50,17 @@ private void processImageWithRetry(ImageProcessingMessage message) {
8750
uploadThumbnail(thumbnailFile, thumbnailKey, message.getContentType());
8851

8952
log.info("Successfully processed image: {}", message.getKey());
53+
54+
// Mark processing as successful
55+
processingSuccess = true;
9056
} else {
9157
log.debug("Skipping message with storage type: {} (we handle {})",
9258
message.getStorageType(), getStorageType());
59+
// This is not an error, just not for this service, so we can acknowledge
60+
processingSuccess = true;
9361
}
9462
} catch (Exception e) {
9563
log.error("Failed to process image: " + message.getKey(), e);
96-
throw new RuntimeException("Failed to process image: " + message.getKey(), e);
9764
} finally {
9865
try {
9966
// Cleanup temporary files
@@ -106,12 +73,23 @@ private void processImageWithRetry(ImageProcessingMessage message) {
10673
if (tempDir != null) {
10774
Files.deleteIfExists(tempDir);
10875
}
76+
77+
if (processingSuccess) {
78+
// Acknowledge the message if processing was successful
79+
channel.basicAck(deliveryTag, false);
80+
log.debug("Message acknowledged for: {}", message.getKey());
81+
} else {
82+
// Reject the message with requeue=false to trigger dead letter exchange
83+
// This will route the message to the retry queue with delay
84+
channel.basicNack(deliveryTag, false, false);
85+
log.debug("Message rejected and sent to dead letter exchange for delayed retry: {}", message.getKey());
86+
}
10987
} catch (IOException e) {
110-
log.error("Error cleaning up temporary files for: {}", message.getKey(), e);
88+
log.error("Error handling RabbitMQ acknowledgment for: {}", message.getKey(), e);
11189
}
11290
}
11391
}
114-
92+
11593
protected abstract String generateUrl(String key);
11694

11795
protected void generateThumbnail(Path input, Path output) throws IOException {

0 commit comments

Comments
 (0)