程序地带

.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 异常处理)--学习笔记


2.6.8 RabbitMQ -- Masstransit 异常处理
异常处理
其他
高级功能
异常处理
异常与重试
重试配置
重试条件
重新投递信息
信箱
异常与重试

Exception


public class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
public Task Consume(ConsumeContext<SubmitOrder> context)
{
throw new Exception("Very bad things happened");
}
}

UseMessageRetry


var sessionFactory = CreateSessionFactory();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("rabbitmq://localhost/");
cfg.ReceiveEndpoint("submit-order", e =>
{
e.UseMessageRetry(r => r.Immediate(5));
e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});
});
重试配置


// 立即重试:一共连续重试10次
ep.UseMessageRetry(r => r.Immediate(10));
// 间隔重试:一共重试10次,每次间隔10秒
ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10)));
// 多个间隔重试:5秒后第一次,5+10秒后第二次,5+10+15秒后第三次
ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15)));
// 指数级间隔重试:共10次,每次间隔:当前重试次数 * 60秒
ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60)));
// 每次叠加50秒
ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));
重试条件
e.UseMessageRetry(r =>
{
r.Handle<ArgumentNullException>();
r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal");
});

过滤某些异常类型不进行重试


重新投递信息
cfg.ReceiveEndpoint("submit-order", e =>
{
e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
e.UseMessageRetry(r => r.Immediate(5));
e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

消息冲队列移除之后,在一定时间之后重新投入消息队列。需要配置调度模块(scheduling)


信箱
cfg.ReceiveEndpoint("submit-order", e =>
{
e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
e.UseMessageRetry(r => r.Immediate(5));
e.UseInMemoryOutbox();
e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

有些消息是在 consume 方法中发送或发布的,如果在发送之后 consume 中产生了异常,那原来发出去的消息就需要撤回,如果使用信箱之后,在 consume 中要发布/发送的消息就会先暂存在内存中直到 consume 方法成功之后才真正发出去


其他
Fault
Consuming Faults
Error Pipe
Dead-Letter Pipe
Fault
public interface Fault<T>
where T : class
{
Guid FaultId { get; }
Guid? FaultedMessageId { get; }
DateTime Timestamp { get; }
ExceptionInfo[] Exceptions { get; }
HostInfo Host { get; }
T Message { get; }
}

Fault 消息在异常的时候会发布出来


Consuming Faults
public class DashboardFaultConsumer :
IConsumer<Fault<SubmitOrder>>
{
public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
{
// update the dashboard
}
}

Fault 消息也是可以进行订阅的


Error Pipe
cfg.ReceiveEndpoint("input-queue", ec =>
{
ec.DiscardFaultedMessages();
});

默认情况下错误的消息会被投递到了 _error 队列,可以配置直接抛弃错误信息


Dead-Letter Pipe
cfg.ReceiveEndpoint("input-queue", ec =>
{
ec.DiscardSkippedMessages();
});

死信队列:没有消费者的消息会被移到 _skipped 队列,但可以配置为不移到 _skipped 队列


高级功能
持久化
Saga 事件串
调度
Courier 最终一致性
监控

知识共享许可协议


本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。


欢迎转载、使用、重新发布,但务必保留文章署名 郑子铭 (包含链接: http://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。


如有任何疑问,请与我联系 ([email protected]) 。


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/MingsonZheng/p/14280129.html

随机推荐

基础类型与包装类

基础类型与包装类区别默认值不同:int的初值为0,Ingeter的初值为null;数据类型不同:int是基本数据类型,Integer是引用数据类型(对象)&#...

脑神 阅读(631)

函数成功,全变

#include<iostream>usingnamespacestd;voidswap(int&x,int&y){inttemp;temp=x;x=y;y=tem...

humble 小贾 阅读(365)

宝尚快讯动力源炒作的是“储能”

军工板块:现在依然强势,板块一向没高潮也就不存在结束,持续重视即可,这个方向是1月份的主线之一。飞龙股份昨日盘之感要点提示我们的,...

宝尚配资 阅读(355)

Linux下若没有SPI控制器,GPIO火速来救主!

Linux下若没有SPI控制器,GPIO火速来救主!

[导读]干过单片机的盆友或许都拿IO口对着时序模拟过SPI主控制器,在做嵌入式Linux设备开发时,发现SPI对应的脚都被用了,或者被当成别的用途了ÿ...

嵌入式资讯精选 阅读(483)