8. Implement queue
- Trong phần này, chúng ta sẽ tìm hiểu cách triển khai hàng đợi trong ứng dụng Spring Boot của chúng ta. Hàng đợi cho phép chúng ta xử lý các tác vụ bất đồng bộ một cách hiệu quả và linh hoạt. Ví dụ này, sẽ giả lập việc gửi email bằng cách sử dụng hàng đợi.
Cấu hình
package com.example.demo.config;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import com.example.demo.models.EmailMessage;
@Configuration
@EnableAsync
@EnableScheduling
public class EmailQueueConfig {
@Bean("emailQueue")
public BlockingQueue<EmailMessage> emailQueue() {
return new LinkedBlockingQueue<>(1000); // Max 1000 emails trong queue
}
@Bean("emailTaskExecutor")
public Executor emailTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("email-sender-");
executor.initialize();
return executor;
}
}
Khai báo hàng đợi
package com.example.demo.models;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
@Setter
@Getter
@Builder
public class EmailMessage {
private String to;
private String subject;
private String body;
}
Khai báo hàng đợi
package com.example.demo.config;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import com.example.demo.models.EmailMessage;
@Configuration
@EnableAsync
@EnableScheduling
public class EmailQueueConfig {
@Bean("emailQueue")
public BlockingQueue<EmailMessage> emailQueue() {
return new LinkedBlockingQueue<>(1000); // Max 1000 emails trong queue
}
@Bean("emailTaskExecutor")
public Executor emailTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("email-sender-");
executor.initialize();
return executor;
}
}
Áp dụng
package com.example.demo.services;
import com.example.demo.models.EmailMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.concurrent.BlockingQueue;
@Service
public class EmailQueueService {
private static final Logger logger = LoggerFactory.getLogger(EmailQueueService.class);
// Đây là loại queue (có tính chất FIFO - First In First Out)
@Qualifier("emailQueue")
private final BlockingQueue<EmailMessage> emailQueue;
public EmailQueueService(@Qualifier("emailQueue") BlockingQueue<EmailMessage> emailQueue) {
this.emailQueue = emailQueue;
}
// Lấy kích thước queue hiện tại
public int getQueueSize() {
return emailQueue.size();
}
// Process email queue - chạy mỗi 5 giây
@Scheduled(fixedDelay = 5000)
public void processEmailQueue() {
processNextEmail();
}
@Async("emailTaskExecutor")
public void processNextEmail() {
// Lấy email tiếp theo từ queue và gửi
logger.info("Processing next email from the queue...");
if (emailQueue.isEmpty()) {
logger.info("Email queue is empty.");
return;
}
EmailMessage emailMessage = emailQueue.poll();
if (emailMessage != null) {
// Giả lập gửi email
// Thực hiện gửi email ở đây (ví dụ: sử dụng JavaMailSender)
// ...
// Dùng JavaMailSender để gửi email
// ...
try {
Thread.sleep(1000); // Giả lập thời gian gửi email
} catch (InterruptedException e) {
logger.error("Error while sending email: {}", e.getMessage());
}
logger.info("🚀 Email sent successfully to: {}", emailMessage.getTo());
} else {
logger.info("No emails to process in the queue.");
}
}
// Thêm email vào queue
public boolean addToQueue(EmailMessage emailMessage) {
try {
boolean added = emailQueue.offer(emailMessage);
if (added) {
logger.info("Email added to queue: {}", emailMessage);
} else {
logger.warn("Failed to add email to queue (queue might be full): {}", emailMessage);
}
return added;
} catch (Exception e) {
logger.error("Error adding email to queue: {}", emailMessage, e);
return false;
}
}
}
Thử nghiệm
package com.example.demo.controllers;
import java.util.HashMap;
import java.util.Map;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.demo.models.EmailMessage;
import com.example.demo.services.EmailQueueService;
@RestController
@RequestMapping("/api/email")
public class EmailController {
private final EmailQueueService emailQueueService;
public EmailController(EmailQueueService emailQueueService) {
this.emailQueueService = emailQueueService;
}
@PostMapping("/send")
public ResponseEntity<Map<String, Object>> sendEmail(@RequestBody EmailMessage emailMessage) {
Map<String, Object> response = new HashMap<>();
boolean queued = emailQueueService.addToQueue(emailMessage);
if (queued) {
response.put("success", true);
response.put("message", "Email added to queue successfully");
response.put("queueSize", emailQueueService.getQueueSize());
} else {
response.put("success", false);
response.put("message", "Failed to add email to queue");
}
return ResponseEntity.ok(response);
}
}