程序地带

.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 详解)--学习笔记


2.6.7 RabbitMQ -- Masstransit 详解
Consumer 消费者
Producer 生产者
Request-Response 请求-响应
Consumer 消费者

在 MassTransit 中,一个消费者可以消费一种或多种消息


消费者的类型包括:普通消费者,saga,saga 状态机,路由活动(分布式追踪),处理器 handlers,工作消费者 job comsumers


Consumer
Instance
Handler
Others
Consumer
public class Program
{
public static async Task Main()
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint("order-service", e =>
{
e.Consumer<SubmitOrderConsumer>();
});
});
}
}

继承 IConsumer,实现 Consume 方法


class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
public async Task Consume(ConsumeContext<SubmitOrder> context)
{
await context.Publish<OrderSubmitted>(new
{
context.Message.OrderId
});
}
}

三个原则:


拥抱 The Hollywood Principle, which states, "Dont"t call us, we"ll call you."
Consume 方法是一个被等待的方法,在执行中时其他消费者无法接收到这个消息,当这个方法完成的时候,消息被 ack,并且从队列中移除
Task 方法异常会导致消息触发 retry,如果没有配置重试,消息将被投递到失败队列
Instance
public class Program
{
public static async Task Main()
{
var submitOrderConsumer = new SubmitOrderConsumer();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint("order-service", e =>
{
e.Instance(submitOrderConsumer);
});
});
}
}

所有接收到的消息都由一个消费者来实例来处理(请确保这个消费者类是线程安全)


Consumer 每次接收到消息都会 new 一个实例


Handler
public class Program
{
public static async Task Main()
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint("order-service", e =>
{
e.Handler<SubmitOrder>(async context =>
{
await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
});
});
});
}
}

通过一个委托 Lambda 方法,来消费消息


Others
Saga<>
StateMachineSaga<>
Producer 生产者

消息的生产可以通过两种方式产生:发送和发布


发送的时候需要指定一个具体的地址 DestinationAddress,发布的时候消息会被广播给所有订阅了这个消息类型的消费者


基于这两种规则,消息被定义为:命令 command 和事件 event


send
publish
send

可以调用以下对象的 send 方法来发送 command:


ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
ISendEndpointProvider(可以从 DI 中获取)
IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)
ConsumeContext
public class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
private readonly IOrderSubmitter _orderSubmitter;
public SubmitOrderConsumer(IOrderSubmitter submitter)
=> _orderSubmitter = submitter;
public async Task Consume(IConsumeContext<SubmitOrder> context)
{
await _orderSubmitter.Process(context.Message);
await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
}
}
ISendEndpointProvider
public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);
await endpoint.Send(new SubmitOrder { OrderId = "123" });
}
publish
发送地址
短地址
Convention Map
发送地址
rabbitmq://localhost/input-queue
rabbitmq://localhost/input-queue?durable=false
短地址
GetSendEndpoint(new Uri("queue:input-queue"))


Convention Map

在配置文件中指定 map 规则


EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));

直接发送


public class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
private readonly IOrderSubmitter _orderSubmitter;
public SubmitOrderConsumer(IOrderSubmitter submitter)
=> _orderSubmitter = submitter;
public async Task Consume(IConsumeContext<SubmitOrder> context)
{
await _orderSubmitter.Process(context.Message);
await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
}
}

可以调用以下对象的 publish 方法来发送 event:


ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
IPublishEndpoint(可以从 DI 中获取)
IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

IPublishEndpoint


public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
await publishEndpoint.Publish<OrderSubmitted>(new
{
OrderId = "27",
OrderDate = DateTime.UtcNow,
});
}
Request-Response 请求-响应

Request-Response 模式让应用程序之间解耦之后,依然采用同步的方式


Consumer
IClientFactory
IRequestClient
Send a request
Consumer
public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
var order = await _orderRepository.Get(context.Message.OrderId);
if (order == null)
throw new InvalidOperationException("Order not found");
await context.RespondAsync<OrderStatusResult>(new
{
OrderId = order.Id,
order.Timestamp,
order.StatusCode,
order.StatusText
});
}

需要处理返回类型 OrderStatusResult,异步方式模拟同步,实际上同样有消息队列,消费者处理过程


IClientFactory
public interface IClientFactory
{
IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);
IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);
RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
}

通过 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory


IRequestClient
public interface IRequestClient<TRequest>
where TRequest : class
{
RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}

RequestClient 可以创建请求,或者直接获得响应


Send a request
var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);
var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});

知识共享许可协议


本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。


欢迎转载、使用、重新发布,但务必保留文章署名 郑子铭 (包含链接: http://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。


如有任何疑问,请与我联系 ([email protected]) 。


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/MingsonZheng/p/14274903.html

随机推荐

电脑屏幕变黄如何调整_【科普】如何在线翻转视频?

二、更稳定安全的方法:使用都叫兽™视频编辑软件翻转视频1、什么是都叫兽™视频编辑软件?都叫兽™视频编辑软件是一款集电脑屏幕录制、视频后期制作与格式转换功能为一体的视频录制编...

weixin_39855568 阅读(630)

一文搞定所有TCP/HTTP面试题

在秋招过程中看了大量面经,将常见的计算机网络面试题总结如下,并按照面试中提问的频率做了标注(星数越高,面试中提问频率越高)...

路人zhang 阅读(113)

leetcode刷sql(1543)-trim去除空格的用法

今天这道题让我学习了trim()的用法先介绍trim():trim()去除前后空格(保留中间空格)ltrim()去除左边空格rtrim()去除右边空格repl...

陈菜菜想当生信工程师! 阅读(639)

Linux设置项目Jar开机启动

文章为自己工作过程中学习总结记录,如有错误,请指正很多时候,服务器运行了多个项目,一旦发生意外情况服务停止了,就需要我们一个一个去...

淡若如初 阅读(420)

15张图 | 掌握程序是如何在线升级的...

15张图 | 掌握程序是如何在线升级的...

关注、星标公众号,直达精彩内容作者| IOT小胡来源|网络素材最近其中一个项目快搞完,也就涉及到了在线升级的问题,于是便动起手来,这是网上看到的...

李肖遥 阅读(580)