Microservice combat (3): Landing the microservice architecture to the direct selling system (building a message bus based on RabbitMq)

Microservice combat (3): Landing the microservice architecture to the direct selling system (building a message bus based on RabbitMq)

As can be seen from the previous article, the message bus is the core component of EDA (Event Driven Architecture) and microservice architecture. Without the message bus, the decoupling and communication between microservices cannot be well realized. Usually we can build our own message bus by using existing mature message agent products or message services provided by cloud platforms; we can also write a message agent product ourselves, and then build our own message bus based on it. Usually we don't need to reinvent the wheel (unless the company has special requirements, such as some large Internet companies considering autonomous and controllable white boxes), we can use mature message broker products such as RabbitMq as the underlying support for the message bus.

RabbitMq core components explained:

Connection: The sender or subscriber of the message connects to the RabbitMq server through it.

Channel: After the message sender or subscriber connects to the RabbitMq server through Connection, a session channel is established through Channel.

Exchange: The sender of the message sends the message to Exchange. Through the binding relationship between Exchange and Queue in the RabbitMq server, Exchange will route the message to the matching Queue.

Queue: The carrier of the message. The message of the sender of the message is finally routed to the matching Queue through the Exchange. The receiver of the message receives the message from the Queue and processes it.

Exchange mode: When a message is sent to Exchange, it needs to be routed to the matching Queue. As for how to route, it is determined by the Exchange mode.

1. Direct mode: a specific routing key (message type) is forwarded to the designated Queue of the Exchange.

2. Fanout mode: The message sent to the Exchange is sent to all Queues bound under the Exchange at the same time.

3. Topic mode: messages with certain characteristics are forwarded to the designated Queue of the Exchange. Our most common use is the Direct mode. If the message is to be consumed by multiple consumers, you can use the Fanout mode.

Implement a message bus based on RabbitMq:

We first need to install Erlang and RabbitMq on the server, and then we can develop the message bus based on RabbitMq. The overall development ideas and steps are as follows:

1. First establish a project as a message bus, and then introduce the nuget package Rabbitmq.Client, so that there is support for RabbitMq development.

2. The basic message bus has been implemented in the previous section. All RabbitMq-based message buses are inherited from it, and specific parameters need to be passed in to the message bus constructor:

public RabbitMqEB(IConnectionFactory connectionFactory,IEventHandlerExecutionContext context,
        string exchangeName,string exchangeType,string queueName,int publisherorconsumer,
        bool autoAck = true) : base(context)
    {
        this.connectionFactory = connectionFactory;
        this.connection = this.connectionFactory.CreateConnection();
        this.exchangeName = exchangeName;
        this.exchangeType = exchangeType;
        this.autoAck = autoAck;
        this.queueName = queueName;
        if (publisherorconsumer == 2)
        {
            this.channel = CreateComsumerChannel();
        }
    }
 

connectionFactory: The type in RabbitMq.Client, which is used to establish a connection with the RabbitMq server.

context: The object of the association relationship between the message and the message processor.

exchangeName: The name of the Exchange that the producer or consumer needs to connect to.

exchangeType: The Exchange mode described above.

queueName: The name of the Queue when the producer or consumer sends or receives messages.

publisherorconsumer: Specify whether the component connected to the message bus is the producer or consumer of the message bus. Consumers and producers will be different. Consumers (publisherorconsumer==2) will build a consumer channel for receiving messages from Queue and calling The HandleAsync method of the iventHandlerExecutionContext of the parent class handles the message.

3. Establish a connection to RabbitMq:

//
public bool IsConnected
    {
        get { return this.connection != null && this.connection.IsOpen; }
    }
    public bool TryConnect()
    {
      //, nuget , 1 5 
        var policy = RetryPolicy.Handle<SocketException>().Or<BrokerUnreachableException>()
            .WaitAndRetry(5, p => TimeSpan.FromSeconds(1),(ex,time)=> {
               //
            });
        policy.Execute(() =>
        {
           //RabbitMq Server 
            this.connection = this.connectionFactory.CreateConnection();
        });
        if (IsConnected)
        {
            return true;
        }
        return false;
    }
 

4. Create a consumer channel:

private IModel CreateComsumerChannel()
    {
        if (!IsConnected)
        {
            TryConnect();
        }
        var channel = this.connection.CreateModel();            
        channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true);
        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false,
            arguments: null);
        var consumer = new EventingBasicConsumer(channel);
     //
        consumer.Received += async (model, ea) =>
            {
                var eventbody = ea.Body;
                var json = Encoding.UTF8.GetString(eventbody);
                var @event = (IEvent)JsonConvert.DeserializeObject(json);
              //
                await this.eventHandlerExecutionContext.HandleAsync(@event);
              //
                channel.BasicAck(ea.DeliveryTag, multiple: false);
            };
        channel.BasicConsume(queue: this.queueName, autoAck: false, consumer: consumer);
        
        channel.CallbackException += (sender, ea) =>
        {
            this.channel.Dispose();
            this.channel = CreateComsumerChannel();
         };
        return channel;
    }
 

5. Support for producers to publish messages to the exchange queue:

public override void Publish<TEvent>(TEvent @event)
    {
        if (!IsConnected)
        {
            TryConnect();
        }
        using(var channel = this.connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true);
            var message = JsonConvert.SerializeObject(@event);
            var body = Encoding.UTF8.GetBytes(message);
         //
            channel.BasicPublish(this.exchangeName, @event.GetType().FullName,null, body);
        }
    }
 

6. Support for subscribers subscribing to messages from the exchange queue:

public override void Subscribe<TEvent, TEventHandler>()
    {
      //
        if (!this.eventHandlerExecutionContext.IsRegisterEventHandler < TEvent,TEventHandler>()){
            this.eventHandlerExecutionContext.RegisterEventHandler<TEvent, TEventHandler>();
         //
            this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
        }
    }
 

From the above 6 steps, we have basically completed the basic functions based on the RabbitMq message bus. It should be noted here that the above code is only a demonstration. In the actual production environment, the above code cannot be used directly, and careful reconstruction is required. This code guarantees reliability and performance.

Please follow the WeChat public account: MSSHCJ for the actual microservice video