Abp vNext利用Rebus模块连接Azure Service Bus(服务总线)

Azure上创建Service Bus

在Service上创建bus时,Rebus官网规定定价策略必须选择标准以上,因为需要使用其topic功能

Azure上点击“新建”-“集成”-“Service Bus”

选择资源组-输入命名空间名称-选择定价层(标准以上)-创建

获取访问连接字符串

依次点击“共享访问策略”-“RootManageSharedAccessKey”-主/辅助连接字符串,复制连接字符串用来访问service bus

vNext中安装并配置Rebus组件

Volo.Abp.EventBus.Rebus组件的安装可以按照官网文档进行操作已经很详细,下面介绍下其他安装与配置,我们想要连接azure service bus首先要安装Rebus.AzureServiceBus包,在Nuget中搜索安装到您的Host中即可,在你项目的Module模块中增加如下代码(重写PreConfigureServices,并配置Rebus):

public override void PreConfigureServices(ServiceConfigurationContext context)
        {
            base.PreConfigureServices(context);

            var configuration = context.Services.GetConfiguration();

            ConfigureRebus(configuration);

            
        }

private void ConfigureRebus(IConfiguration configuration)
{
    PreConfigure<AbpRebusEventBusOptions>(options =>
    {
       options.InputQueueName = configuration["Rebus:InputQueueAddress"];
       options.Configurer = rebusConfigurer =>
       {
        rebusConfigurer.Logging(l => l.ColoredConsole(minLevel: LogLevel.Debug))
          .Transport(t => t.UseAzureServiceBus(configuration["Rebus:ConnectionString"], configuration["Rebus:SubscribeAddress"]));
           //.Options(t => t.EnableEncryption(configuration["Rebus:EncryptionKey"]));
         };
    });
}

appsetting.json增加如下配置

"Rebus": {
"ConnectionString": "Endpoint=sb://xxxxxx",//此处替换成上面的主/辅助连接字符串
"InputQueueAddress": "PublicQueue",//这是当前服务的发布队列名
"SubscribeAddress": "SubscribeQueue",//这是当前服务要订阅的队列名
"EncryptionKey": ""//用于加密传输的加密字符串,此次代码没有进行配置
}

巨坑来了!

配置代码中options.InputQueueName参数是只该应用发布事件的队列名称!而UseAzureServiceBus方法的第二个参数是指该应用要订阅的队列名称!如果像我把发布和订阅应用的这两个参数都写成一样的,就会出现你发布两次或更多次,而你订阅的应用只能收到一次!!原因就是发布应用自己也订阅到了自己的发布消息!!!我被这个弱智坑坑了两三天啊。。。只是因为这方面文档太少,最后还是rebus官方git库文档里面给了我提示!!!

添加测试代码

创建TestBusAppService类用于发布事件,代码如下

public class TestBusAppService: CoreAppService
    {
        private readonly IDistributedEventBus _distributedEventBus;

        public TestBusAppService(IDistributedEventBus distributedEventBus)
        {
            _distributedEventBus = distributedEventBus;
        }

        public virtual async Task ChangeStockCountAsync(int newCount)
        {
            await _distributedEventBus.PublishAsync(
                new StockCountChangedEto
                {
                    NewCount = newCount
                }
            );
        }
    }

在服务层发布事件需要注入IDistributedEventBus,如果在领域层中的聚合根实体可以直接使用AddDistributedEvent方法发布实体,但是要切记,在EF Core中领域层使用AddDistributedEvent时只有调用DbContext.SaveChanges方法时才会发布,这点写测试或者调试的时候可能会忽略!!!

StockCountChangedEto是事件传输对象,负责传输与事件相关的参数,即使事件不需要传输参数也要创建该类,此时该类为空,如果事件发布在application层我一般把此类放在Contracts层中:

public class StockCountChangedEto
    {
        public int NewCount { get; set; }
    }

添加订阅事件类:

 public class TestBusSub
    : IDistributedEventHandler<StockCountChangedEto>,
      ITransientDependency
    {
        public Task HandleEventAsync(StockCountChangedEto eventData)
        {
            var _count = eventData.NewCount;

            Console.WriteLine("订阅到的值为:" + _count);

            return Task.CompletedTask;
        }
    }

运行项目,swagger中找到测试的api,输入参数执行:

控制台会得到如下结果:

在azure的主题中也会添加该服务发布和订阅的两个队列,还有一个error记录错误消息的队列

到此vNext连接Azure Service Bus就成功了,我使用的是abp vnext 4.2.1 基于net 5的框架。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注