Using RabbitMQ in ASP.NET Core the Right Way

Consumers and Producers in ASP.NET Core

Docs: http://www.rabbitmq.com/ — .NET client: http://www.rabbitmq.com/dotnet.html

Consumer with IHostedService

Run consumers with the app lifecycle using IHostedService.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RabbitListener : IHostedService
{
    private readonly IConnection _conn; private readonly IModel _channel;
    protected string RouteKey; protected string QueueName;

    public RabbitListener(IOptions<AppConfiguration> options)
    {
        var f = new ConnectionFactory {
            HostName = options.Value.RabbitHost,
            UserName = options.Value.RabbitUserName,
            Password = options.Value.RabbitPassword,
            Port = options.Value.RabbitPort,
        };
        _conn = f.CreateConnection();
        _channel = _conn.CreateModel();
    }

    public Task StartAsync(CancellationToken ct) { Register(); return Task.CompletedTask; }
    public Task StopAsync(CancellationToken ct) { _conn.Close(); return Task.CompletedTask; }

    public virtual bool Process(string message) => throw new NotImplementedException();

    void Register()
    {
        _channel.ExchangeDeclare(exchange: "message", type: "topic");
        _channel.QueueDeclare(queue: QueueName, exclusive: false);
        _channel.QueueBind(queue: QueueName, exchange: "message", routingKey: RouteKey);
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (m, ea) =>
        {
            var msg = Encoding.UTF8.GetString(ea.Body);
            if (Process(msg)) _channel.BasicAck(ea.DeliveryTag, false);
        };
        _channel.BasicConsume(queue: QueueName, consumer: consumer);
    }
}

Subclass and resolve scoped services via IServiceProvider.CreateScope() inside Process.

Register: services.AddHostedService<YourListener>();

Producer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RabbitMQClient
{
    private readonly IModel _channel; private readonly ILogger _logger;
    public RabbitMQClient(IOptions<AppConfiguration> opts, ILogger<RabbitMQClient> logger)
    {
        var f = new ConnectionFactory {
            HostName = opts.Value.RabbitHost,
            UserName = opts.Value.RabbitUserName,
            Password = opts.Value.RabbitPassword,
            Port = opts.Value.RabbitPort,
        };
        var conn = f.CreateConnection();
        _channel = conn.CreateModel();
        _logger = logger;
    }
    public void PushMessage(string routingKey, object message)
    {
        _logger.LogInformation($"PushMessage,routingKey:{routingKey}");
        _channel.QueueDeclare(queue: "message", durable: false, exclusive: false, autoDelete: false);
        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
        _channel.BasicPublish(exchange: "message", routingKey: routingKey, basicProperties: null, body: body);
    }
}

Register as singleton: services.AddSingleton<RabbitMQClient>();

使用 Hugo 构建
主题 StackJimmy 设计