springboot 结合 rabbitmq 时,消费者这边如果实际业务逻辑消费需要30分钟以上,如何保证消息一直存活

一、

springboot 结合 rabbitmq 时,消费者这边如果实际业务逻辑消费需要30分钟以上,如何保证消息一直存活

为了保证消息不会在消费者逻辑处理时间之前过期并被丢弃,可以采取以下几种方法:

  • 增加消息的过期时间:在发送消息时可以设置消息的过期时间,确保消息在指定的时间内不会过期。具体设置方式可以参考RabbitMQ官方文档,例如在Spring Boot中使用rabbitTemplate.convertAndSend()方法时,可以通过设置MessagePostProcessor来实现。
  • 手动确认消息的消费:在消费者端需要手动确认消息的消费,这样可以确保只有在消费完成后才会将消息标记为已消费,并从队列中删除。在Spring Boot中可以通过设置@RabbitListener注解的ackMode属性为MANUAL来实现。
  • 配置死信队列:当消息过期或者被拒绝消费时,可以将其发送到死信队列中,这样就可以避免消息丢失。在Spring Boot中可以通过配置SimpleRabbitListenerContainerFactory的defaultRequeueRejected和requeueRejected属性来实现。
  • 分布式任务调度系统:如果消费者逻辑处理时间较长,可以考虑使用分布式任务调度系统,将耗时的任务交给任务调度系统去执行,这样可以避免消息过期。常用的任务调度系统有Quartz、Elastic Job等。

二、

在Spring Boot中可以通过设置@RabbitListener注解的ackMode属性为MANUAL来实现

这个需要具体怎么做,消费者端和业务逻辑中分别怎么实现?

在Spring Boot中,通过设置@RabbitListener注解的ackMode属性为MANUAL可以实现手动确认消息。具体步骤分别如下:

消费者端

在消费者类中添加@RabbitListener注解,并设置ackMode属性为MANUAL,示例如下:

@Component
public class MyConsumer {

    @RabbitListener(queues = "myQueue", ackMode = "MANUAL")
    public void handleMessage(Message message, Channel channel) throws IOException {
        try {
            // 处理业务逻辑
            System.out.println("Received message: " + new String(message.getBody()));
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 发生异常时,拒绝消息并将消息重新放回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

}

在handleMessage方法中,处理消息的业务逻辑并手动确认消息。如果业务逻辑处理成功,则调用channel.basicAck方法确认消息;如果发生异常,则调用channel.basicNack方法拒绝消息并将消息重新放回队列。

业务逻辑中

在处理业务逻辑时,需要考虑到消息可能被重复消费的情况。因此,建议在处理消息前先判断消息是否已经被处理过,如果已经处理过,则直接返回并不进行处理。示例代码如下:

@Component
public class MyConsumer {

    @Autowired
    private MessageRepository messageRepository;

    @RabbitListener(queues = "myQueue", ackMode = "MANUAL")
    public void handleMessage(Message message, Channel channel) throws IOException {
        try {
            // 判断消息是否已经处理过
            if (messageRepository.findById(message.getId()).isPresent()) {
                System.out.println("Message has been processed, ignore it.");
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
            // 处理业务逻辑
            System.out.println("Received message: " + new String(message.getBody()));
            // 保存已处理的消息
            messageRepository.save(message);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 发生异常时,拒绝消息并将消息重新放回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

}

在上述代码中,messageRepository是一个用于保存消息的仓库。在处理消息时,先判断消息是否已经被处理过,如果已经处理过,则直接返回并调用channel.basicAck方法确认消息;否则,处理业务逻辑,并在处理完成后保存已处理的消息,并调用channel.basicAck方法确认消息。如果发生异常,则调用channel.basicNack方法拒绝消息并将消息重新放回队列。

需要注意的是,在手动确认消息时,第二个参数指定为false表示只确认当前消息,而不确认之前未确认的消息。如果设置为true则表示确认当前消息以及之前未确认的所有消息。因此,如果处理业务逻辑时发生了异常,应该调用channel.basicNack方法拒绝当前消息并将其重新放回队列,以避免之前未确认的消息也被确认。


三、

例如在Spring Boot中使用rabbitTemplate.convertAndSend()方法时,可以通过设置MessagePostProcessor来实现。这个具体怎么实现?

在 Spring Boot 中使用 rabbitTemplate.convertAndSend() 方法时,可以使用 MessagePostProcessor 来修改消息的属性。MessagePostProcessor 可以在将消息发送到 RabbitMQ 之前修改消息的一些属性,比如设置消息的过期时间、添加自定义的头信息等。

下面是一个使用 MessagePostProcessor 的示例:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange, String routingKey, Object message) {
        MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
        MessageProperties messageProperties = new MessageProperties();
        MessageHeaders messageHeaders = new MessageHeaders(null);
        MessagePostProcessor messagePostProcessor = message -> {
            messageProperties.setExpiration("10000");
            messageProperties.setHeader("customHeader", "customHeaderValue");
            return message;
        };
        Message amqpMessage = messageConverter.toMessage(message, messageProperties);
        rabbitTemplate.convertAndSend(exchange, routingKey, amqpMessage, messagePostProcessor);
    }

}

在上面的代码中,我们创建了一个 MessagePostProcessor 对象,并在其中设置了消息的过期时间和自定义头信息。然后,我们调用 rabbitTemplate.convertAndSend() 方法,并将 MessagePostProcessor 作为第四个参数传递给该方法。

当你运行这段代码并发送一条消息时,RabbitMQ 将接收到带有设置的过期时间和头信息的消息。

rabbitMQSender.send("exchangeName", "routingKey", "messageBody");

需要注意的是,在使用 MessagePostProcessor 时,你需要使用 MessageConverter 将消息对象转换为 AMQP 消息,并使用 MessageProperties 设置消息的属性。这对于将 Java 对象转换为 RabbitMQ 消息并设置其属性非常有帮助。


四、

这个时间的单位是什么 messageProperties.setExpiration("10000");

这个时间的单位是毫秒。

根据代码看,messageProperties.setExpiration("10000"); 意味着消息将会在发送后10秒钟过期并被丢弃,因为设置了一个10,000毫秒的过期时间。


五、

shell 代码中,一行中的开头有个冒号是什么意思?

在 shell 代码中,一行开头有个冒号通常表示一个空语句(null command)。它并不会执行任何操作,只是返回一个 true 值。这个特性通常用于占位符或者在某些条件下需要执行一些无操作的情况。例如:

: # 这是一个注释
: > file.txt # 创建一个空文件
if [ -z "$VAR" ]; then :; else echo "$VAR is set"; fi # 如果 VAR 为空则不做任何事
需要注意的是,在某些情况下,冒号也可以作为命令的参数使用,例如:

echo : # 输出冒号
ls -l : # 列出名为“:”文件的属性

因此,如果您看到一行代码开头有冒号,要根据上下文来确定它的具体含义。

添加新评论