9. Implement Redis Queue
- Trong phần này, chúng ta sẽ tìm hiểu cách triển khai hàng đợi sử dụng Redis trong ứng dụng Spring Boot của chúng ta. Hàng đợi Redis cho phép chúng ta xử lý các tác vụ bất đồng bộ một cách hiệu quả và linh hoạt, đồng thời cung cấp khả năng mở rộng tốt hơn so với việc sử dụng hàng đợi trong bộ nhớ.
Cài đặt (thêm dependency cho Maven hoặc Gradle — chỉ chọn một trong hai)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
Cấu hình kết nối Redis
# Redis Configuration
# Each ${NAME:default} placeholder reads NAME and falls back to the text after the colon
# when NAME is not defined.
spring.data.redis.host=${SPRING_REDIS_HOST:localhost}
spring.data.redis.port=${SPRING_REDIS_PORT:6379}
# NOTE(review): the fallback password is a plain-text placeholder; set SPRING_REDIS_PASSWORD
# explicitly outside of local development.
spring.data.redis.password=${SPRING_REDIS_PASSWORD:redis_password}
# NOTE(review): no unit is given; presumably milliseconds (i.e. 60 s) - confirm.
spring.data.redis.timeout=60000
# Index of the Redis logical database to use.
spring.data.redis.database=0
## Khai báo hàng đợi
package com.example.demo.models;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
/**
 * Payload describing one e-mail waiting in the queue.
 *
 * <p>The class is written to Redis and read back as JSON, and is also bound from HTTP request
 * bodies, so it must be instantiable without arguments. A class-level {@code @Builder} alone
 * only produces a package-private all-args constructor, which is why both constructors are
 * declared explicitly and the builder is attached to the all-args one.
 */
@Setter
@Getter
public class EmailMessage {
    /** Recipient address. */
    private String to;
    /** Subject line. */
    private String subject;
    /** Message body. */
    private String body;

    /**
     * Creates an empty message whose fields are filled in through the setters.
     * Needed by JSON deserialization.
     */
    public EmailMessage() {
    }

    /**
     * Creates a fully populated message; this constructor also backs
     * {@code EmailMessage.builder()}.
     *
     * @param to      recipient address
     * @param subject subject line
     * @param body    message body
     */
    @Builder
    public EmailMessage(String to, String subject, String body) {
        this.to = to;
        this.subject = subject;
        this.body = body;
    }
}
package com.example.demo.config;
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import com.example.demo.models.EmailMessage;
/**
 * Wiring for the Redis-backed e-mail queue: the typed {@link RedisTemplate} used to store
 * {@link EmailMessage} entries and the thread pool used for asynchronous sending.
 * Also switches on {@code @Async} and {@code @Scheduled} processing.
 */
@Configuration
@EnableAsync
@EnableScheduling
public class EmailRedisQueueConfig {

    /**
     * Creates the template that reads and writes queued e-mails.
     *
     * @param connectionFactory factory supplying the Redis connections
     * @return a template with string keys and JSON-encoded values
     */
    @Bean("emailRedisTemplate")
    public RedisTemplate<String, EmailMessage> emailRedisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, EmailMessage> emailTemplate = new RedisTemplate<>();
        emailTemplate.setConnectionFactory(connectionFactory);
        applySerializers(emailTemplate);
        emailTemplate.afterPropertiesSet();
        return emailTemplate;
    }

    /**
     * Creates the pool that runs the e-mail sending tasks.
     *
     * @return an initialized executor with 2 core threads, at most 5 threads and a backlog of 100
     */
    @Bean("emailRedisTaskExecutor")
    public Executor emailRedisTaskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("email-redis-sender-");
        pool.setCorePoolSize(2);
        pool.setMaxPoolSize(5);
        pool.setQueueCapacity(100);
        pool.initialize();
        return pool;
    }

    /** Installs string serializers for keys and Jackson JSON serializers for values. */
    private void applySerializers(RedisTemplate<String, EmailMessage> emailTemplate) {
        // Keys and hash keys are stored as plain strings.
        emailTemplate.setKeySerializer(new StringRedisSerializer());
        emailTemplate.setHashKeySerializer(new StringRedisSerializer());
        // Values and hash values are stored as JSON.
        emailTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        emailTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
    }
}
Áp dụng
package com.example.demo.services;
import com.example.demo.models.EmailMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
 * E-mail queue stored in a Redis list. Producers push with LPUSH and the worker pops with
 * RPOP, so messages leave the list in the order they were added.
 */
@Service
public class EmailRedisQueueService {
    private static final Logger logger = LoggerFactory.getLogger(EmailRedisQueueService.class);
    private static final String EMAIL_QUEUE_KEY = "email:queue";
    private final RedisTemplate<String, EmailMessage> redisTemplate;

    /**
     * @param redisTemplate template used for every operation on the queue list
     */
    public EmailRedisQueueService(@Qualifier("emailRedisTemplate") RedisTemplate<String, EmailMessage> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Adds an e-mail to the Redis queue.
     *
     * @param emailMessage e-mail to send; {@code null} is rejected
     * @return {@code true} if the message was stored, {@code false} on any failure
     */
    public boolean addToQueue(EmailMessage emailMessage) {
        if (emailMessage == null) {
            logger.error("Failed to add email to Redis queue: email message is null");
            return false;
        }
        try {
            // LPUSH adds at the head of the list; combined with RPOP this gives FIFO order.
            Long queueSize = redisTemplate.opsForList().leftPush(EMAIL_QUEUE_KEY, emailMessage);
            // Log the recipient only: the message type has no toString() and the body may be
            // sensitive.
            logger.info("Email added to Redis queue: {}. Current queue size: {}", emailMessage.getTo(), queueSize);
            return true;
        } catch (Exception e) {
            logger.error("Failed to add email to Redis queue: {}", e.getMessage(), e);
            return false;
        }
    }

    /**
     * Returns the current queue length.
     *
     * @return number of e-mails in the queue, or 0 if the size could not be read
     */
    public long getQueueSize() {
        try {
            Long size = redisTemplate.opsForList().size(EMAIL_QUEUE_KEY);
            return size != null ? size : 0;
        } catch (Exception e) {
            logger.error("Failed to get queue size from Redis: {}", e.getMessage(), e);
            return 0;
        }
    }

    /**
     * Scheduled entry point, triggered every 60 seconds: drains the queue until it is empty,
     * a message fails, or the worker thread is interrupted.
     *
     * <p>{@code @Async} is declared on this method because the calls made from here to other
     * methods of this class are plain Java calls that bypass the Spring proxy, so an
     * {@code @Async} placed only on the callee would never take effect for scheduled runs.
     */
    @Async("emailRedisTaskExecutor")
    @Scheduled(fixedDelay = 60000)
    public void processEmailQueue() {
        while (!Thread.currentThread().isInterrupted() && pollAndSend()) {
            // Keep going while messages are being delivered successfully.
        }
    }

    /**
     * Processes the next e-mail in the queue, if any. Asynchronous when invoked from another
     * bean through the Spring proxy.
     */
    @Async("emailRedisTaskExecutor")
    public void processNextEmail() {
        pollAndSend();
    }

    /**
     * Pops one message and sends it.
     *
     * @return {@code true} if a message was sent and more may be waiting; {@code false} if the
     *         queue was empty or processing failed
     */
    private boolean pollAndSend() {
        EmailMessage emailMessage = null;
        try {
            logger.info("Processing next email from Redis queue...");
            // RPOP takes from the tail of the list, i.e. the oldest entry.
            emailMessage = redisTemplate.opsForList().rightPop(EMAIL_QUEUE_KEY);
            if (emailMessage == null) {
                logger.info("Redis email queue is empty.");
                return false;
            }
            sendEmail(emailMessage);
            logger.info("🚀 Email sent successfully from Redis queue to: {}", emailMessage.getTo());
            logger.info("Remaining emails in Redis queue: {}", getQueueSize());
            return true;
        } catch (InterruptedException e) {
            logger.error("Email processing interrupted: {}", e.getMessage());
            // The message was already removed from Redis but not delivered. Put it back
            // BEFORE restoring the interrupt flag, because a blocking Redis call on an
            // interrupted thread may fail immediately.
            requeue(emailMessage);
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            logger.error("Error processing email from Redis queue: {}", e.getMessage(), e);
            return false;
        }
    }

    /**
     * Delivers one message. Currently a stand-in that only waits one second; the real
     * delivery (for example through JavaMailSender) belongs here.
     */
    private void sendEmail(EmailMessage emailMessage) throws InterruptedException {
        Thread.sleep(1000);
    }

    /**
     * Returns an undelivered message to the tail of the list so that it is the next one
     * popped and the original order is kept.
     */
    private void requeue(EmailMessage emailMessage) {
        if (emailMessage == null) {
            return;
        }
        try {
            redisTemplate.opsForList().rightPush(EMAIL_QUEUE_KEY, emailMessage);
            logger.warn("Email to {} returned to Redis queue after interrupted send", emailMessage.getTo());
        } catch (Exception e) {
            logger.error("Failed to return email to Redis queue: {}", e.getMessage(), e);
        }
    }
}
Thử nghiệm
package com.example.demo.controllers;
import com.example.demo.models.EmailMessage;
import com.example.demo.services.EmailRedisQueueService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
 * REST endpoint that accepts e-mails and places them on the Redis queue.
 */
@RestController
@RequestMapping("/api/email-redis")
public class EmailRedisController {
    private final EmailRedisQueueService emailRedisQueueService;

    /**
     * @param emailRedisQueueService service that owns the Redis queue
     */
    public EmailRedisController(EmailRedisQueueService emailRedisQueueService) {
        this.emailRedisQueueService = emailRedisQueueService;
    }

    /**
     * Queues an e-mail for later delivery.
     *
     * @param emailMessage e-mail taken from the request body
     * @return HTTP 200 with {@code success=true} and the current queue size when the message
     *         was queued; HTTP 500 with {@code success=false} when it could not be queued, so
     *         that clients checking only the status code do not mistake a failure for success
     */
    @PostMapping("/send")
    public ResponseEntity<Map<String, Object>> sendEmail(@RequestBody EmailMessage emailMessage) {
        Map<String, Object> response = new HashMap<>();
        boolean queued = emailRedisQueueService.addToQueue(emailMessage);
        if (!queued) {
            response.put("success", false);
            response.put("message", "Failed to add email to Redis queue");
            return ResponseEntity.internalServerError().body(response);
        }
        response.put("success", true);
        response.put("message", "Email added to Redis queue successfully");
        response.put("queueSize", emailRedisQueueService.getQueueSize());
        return ResponseEntity.ok(response);
    }
}