ASP.NET Core 6: Communication across the API Applications using RabbitMQ and Background Service

In this article, we will look at a scenario where two APIs communicate with each other using a background service and a messaging service. REST APIs are not new to most of you, and most modern applications use a microservices architecture in which the server-side logic is written and deployed as small REST APIs. These microservices are autonomous and isolated from each other. Since they are deployed separately, they do not know of each other's existence. In such a scenario, what if one microservice wants to send or share data with another microservice? This is where the application architecture can use a messaging service that runs in the background.

Consider an e-commerce application that has two services: one for accepting and validating orders, and another for processing orders. If a received order is validated by the Order Acceptance service, it must be passed to the Order Processing service. The Order Acceptance service writes the validated order information into the messaging service so that the Order Processing service can read it from there and process it further. Figure 1 explains this scenario.





Figure 1: The Scenario of using Background Messaging Service              

Figure 1 explains the scenario as follows:

  1. The Submit Order App sends order information to the Accept Order Service. This is the publisher or sender service.
  2. The sender service saves the order data in the database after validating it.
  3. Once the order is validated, it is passed to the messaging service.
  4. The Order Process Service (the subscriber service) subscribes to the messaging service using a background hosted service. The background hosted service runs continuously, so order information is received as soon as it is put in the messaging service.
  5. The receiving Order Process Service gets the message through the background hosted service and processes it.
  6. The processed order is saved in the database.
  7. The Order Process Service then reads the data from the database so that it can be presented to the Order Process App.
  8. Finally, the order data is presented to the Order Process App.
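The decoupling described in the steps above can be illustrated in-process before any broker is involved. The following is only an analogy, assuming System.Threading.Channels as a stand-in for the messaging service; the class name InProcessQueueSketch and the order strings are hypothetical and are not part of the article's projects.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class InProcessQueueSketch
{
    // Returns the list of orders handled by the background consumer.
    public static async Task<List<string>> RunAsync()
    {
        // The channel plays the role of the messaging service (an assumption for illustration).
        var queue = Channel.CreateUnbounded<string>();
        var processed = new List<string>();

        // Subscriber: keeps running in the background and handles each order as it arrives.
        var consumer = Task.Run(async () =>
        {
            await foreach (var order in queue.Reader.ReadAllAsync())
            {
                processed.Add($"processed:{order}");
            }
        });

        // Publisher: writes validated orders without knowing who will read them.
        await queue.Writer.WriteAsync("order-1");
        await queue.Writer.WriteAsync("order-2");

        // Signal that no more orders will be written, then wait for the consumer to drain the queue.
        queue.Writer.Complete();
        await consumer;
        return processed;
    }
}
```

The publisher and the subscriber only share the queue, which is the same property that RabbitMQ provides across separate processes and machines.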
When services communicate like this, we can use any technology to design and develop the REST APIs, e.g. ASP.NET Core, Node.js + Express, Java Spring Boot, etc. We can use any of the following messaging services:
  • RabbitMQ
  • Azure Service Bus
  • AWS SQS
  • Azure Queue Storage
  • Apache Kafka
In this implementation, I will be using ASP.NET Core 6 for building the REST APIs and RabbitMQ for messaging. RabbitMQ is one of the most widely deployed open-source message brokers. More information about RabbitMQ can be read from this link.

We can use RabbitMQ by installing it on the local machine or by using its Docker image. I have used the RabbitMQ Docker image. Run the commands shown in Listing 1 to use it:
  • docker pull rabbitmq:3-management
  • docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Listing 1: Commands to use the RabbitMQ Docker Image

Port 15672 is used by the RabbitMQ management web UI, and port 5672 is used by the message broker to exchange messages. We can open the management UI at http://localhost:15672. Figure 2 shows the management UI.



Figure 2: The RabbitMQ management UI

Use guest as both the username and the password to access the management UI, as shown in Figure 3.



Figure 3: The RabbitMQ management UI

We can use the following URL in the application to send and receive messages:

amqp://guest:guest@localhost:5672
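To make the parts of this URL explicit, here is a small sketch that splits it with the standard System.Uri class. The helper name AmqpUrlParts is hypothetical and exists only for illustration; RabbitMQ.Client does its own parsing when the URL is assigned to ConnectionFactory.Uri.

```csharp
using System;

public static class AmqpUrlParts
{
    // Splits an AMQP URL into scheme, user, password, host, and port.
    public static (string Scheme, string User, string Password, string Host, int Port) Parse(string url)
    {
        var uri = new Uri(url);
        // UserInfo has the form "user:password"; split only on the first colon.
        var userInfo = uri.UserInfo.Split(':', 2);
        var password = userInfo.Length > 1 ? userInfo[1] : "";
        return (uri.Scheme, userInfo[0], password, uri.Host, uri.Port);
    }
}
```

For amqp://guest:guest@localhost:5672 this yields the amqp scheme, the guest user and password, the localhost host, and port 5672, which is the broker port published by the Docker command in Listing 1.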

Creating REST APIs using ASP.NET Core 6 applications

Step 1: Open Visual Studio 2022 and create a blank solution. Name this solution Using_RabbitMQ. In this solution, add a new ASP.NET Core Web API project and name it Core_SenderAPI. In this project, add the following package to use the classes for working with RabbitMQ (the listings in this article use the synchronous IModel API that is available in the 6.x versions of this package):

RabbitMQ.Client

Step 2: In this project add a new folder and name it Models. In this folder add a new class file and name it Models.cs. In this class file, add classes as shown in Listing 2

namespace Core_SenderAPI.Models
{
    /// <summary>
    /// Order Object
    /// </summary>
    public class Order
    {
        public string? CustomerName { get; set; }
        public string? OrderedItem { get; set; }
        public int Quantity { get; set; }
        public double AdvanceAmount { get; set; }
      
    }
    /// <summary>
    /// The Item Information
    /// </summary>
    public class Item
    {
        public string? ItemName { get; set; }
        public double UnitPrice { get; set; }
    }
    /// <summary>
    /// The Response Object to return response to the Client
    /// </summary>
    public class ResponseObject
    {
        public long ApproveId { get; set; }
        public long RejectionId { get; set; }
        public string? Message { get; set; }
    }
    /// <summary>
    /// Message to write into the RabbitMQ
    /// </summary>
    public class Message
    {
        public Order Order { get; set; }
        public double TotalAmount { get; set; }
        public double Advance { get; set; }
    }
    /// <summary>
    /// Items Data
    /// </summary>
    public class ItemDb : List<Item>
    {
        public ItemDb()
        {
            Add(new Item() { ItemName = "Laptop", UnitPrice = 145000 });
            Add(new Item() { ItemName = "RAM", UnitPrice = 14000 });
            Add(new Item() { ItemName = "Router", UnitPrice = 15000 });
            Add(new Item() { ItemName = "SSD", UnitPrice = 18000 });
            Add(new Item() { ItemName = "HDD", UnitPrice = 4000 });
            Add(new Item() { ItemName = "Adapter", UnitPrice = 9000 });
            Add(new Item() { ItemName = "Keyboard", UnitPrice = 1000 });
            Add(new Item() { ItemName = "Mouse", UnitPrice = 200 });
            Add(new Item() { ItemName = "Headphone", UnitPrice = 1600 });
            Add(new Item() { ItemName = "Speaker", UnitPrice = 13000 });
        }
    }
}

Listing 2: Model classes
In Listing 2, we have the following classes:
  • The Order class is used for posting order data to the REST API.
  • The Item and ItemDb classes store item information so that the unit price of the ordered item can be read.
  • The ResponseObject class is used to send the order approval or rejection to the client.
  • The Message class defines the structure of the message to be written to RabbitMQ.
Step 3: We need to define the logic to publish a message to RabbitMQ. This logic is responsible for connecting to RabbitMQ using the URL. To connect, we use the ConnectionFactory class. This class has a Uri property that accepts the URI of the RabbitMQ endpoint running on port 5672. We need to define a message exchange so that RabbitMQ can route the message once it is received from the sender application. It is recommended to set a Time To Live (TTL) for messages so that they do not stay in a queue indefinitely. Note that RabbitMQ defines the x-message-ttl argument as a queue argument, so it takes effect when it is supplied to a queue declaration. The ExchangeDeclare() method of the channel (the IModel object returned by CreateModel()) is used to define an exchange with parameters like the exchange name, the exchange type, and optional arguments. The exchange type can be direct, fanout, topic, or headers. Messages are sent using a routing key. The routing key is a message attribute the exchange looks at when deciding how to route the message to queues. The exchange types behave as follows:
  • A direct exchange delivers messages to queues based on the message routing key. 
  • A fanout exchange routes messages to all of the queues that are bound to it and the routing key is ignored. 
  • Topic exchanges route messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to an exchange. 
  • A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key.    
More information about the exchange types can be read from this link. The BasicPublish() method of the channel is used to publish messages to the exchange, which routes them to the bound queues. This method takes parameters such as the exchange name, the routing key, and the JSON-serialized message. In the Core_SenderAPI project, add a new folder and name it Logic. In this folder, add a class file and name it QueuePublisher.cs. Add the code shown in Listing 3 to this file.


using Core_SenderAPI.Models;
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

namespace Core_SenderAPI.Logic
{
    /// <summary>
    /// This class contains logic to write approved message in the queue
    /// </summary>
    public class QueuePublisher
    {
        public void PublishMessage(Order order, double totalAmount, double advance)
        {
            // creating a connection factory
            var factory = new ConnectionFactory()
            {
                Uri = new Uri("amqp://guest:guest@localhost:5672")
            };

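            // create a connection (we are using the default one);
            // 'using' closes the connection and the channel when the method returns
            using var connection = factory.CreateConnection();
            // create a channel
            using var channel = connection.CreateModel();

            // time to live for the message, in milliseconds
            // note: RabbitMQ treats x-message-ttl as a queue argument, so to enforce it
            // the same argument has to be supplied where the queue is declared
            var ttl = new Dictionary<string, object>
            {
                { "x-message-ttl", 60000} // message will live for 60 seconds
            };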


            //                      Exchange name           ,   Exchange type   , optional argument
            channel.ExchangeDeclare("message.exchange",
                ExchangeType.Topic, arguments: ttl);
            // The message to serialize
            var MessageObject = new Message()
            {
               Order = order,
               TotalAmount = totalAmount,
               Advance = advance
            };

            var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(MessageObject));
            // the routing key message.queue.* matches the binding pattern message.queue.*
            // used by the receiver, so the message is routed to the bound queue
            channel.BasicPublish("message.exchange", "message.queue.*", null, body);

        }
    }
}

Listing 3: The Message publishing to the Queue
The code in Listing 3 shows that we have created an exchange named message.exchange and published the message with the routing key message.queue.*.
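To see why this routing key reaches a queue bound with a topic pattern, the following sketch re-implements the topic matching rules locally: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The TopicMatcher class is a hypothetical helper written for illustration; it is not part of RabbitMQ.Client, and the broker performs this matching itself.

```csharp
using System;

public static class TopicMatcher
{
    // Returns true when routingKey satisfies the binding pattern under topic-exchange rules.
    public static bool IsMatch(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: it matches only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words; try every possible split point.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs one key word to compare against.
        if (ki == k.Length) return false;

        // '*' accepts any single word; otherwise the words must be identical.
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);
        return false;
    }
}
```

With the pattern message.queue.*, the keys message.queue.* and message.queue.orders both match, while message.queue (too few words) and message.queue.orders.eu (too many words) do not. The literal * in the published routing key is treated as an ordinary word, so a more descriptive key such as message.queue.orders would be routed the same way.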

Step 4: In the Logic folder, add a new class file and name it ProcessOrder.cs. In this file, we will write the logic to validate and approve the received order. The logic checks whether the advance in the order is greater than or equal to 20% of the total bill amount. If it is, the order is written to the queue for further processing; otherwise it is rejected. Listing 4 shows the code for this logic.


using Core_SenderAPI.Models;
namespace Core_SenderAPI.Logic
{
    public class ProcessOrder
    {
        ItemDb items;
        private double expectedAdvance, totalBillAmount;
        QueuePublisher publisher;
        public ProcessOrder()
        {
            items = new ItemDb();   
        }
        private bool ApproveOrder(Order order)
        {
            // Get the ordered item so that its unit price can be read
            var orderedItem = items.FirstOrDefault(item => item.ItemName == order.OrderedItem);
            if (orderedItem is null)
            {
                // Unknown item: reject the order instead of failing with a NullReferenceException
                totalBillAmount = 0;
                expectedAdvance = 0;
                return false;
            }
            // Calculate the Total Amount
            totalBillAmount = orderedItem.UnitPrice * order.Quantity;
            // Calculate the expected Advance (20% of the total) against the Order
            expectedAdvance = totalBillAmount * 0.2;
            // Approve only when the received advance covers the expected advance
            return order.AdvanceAmount >= expectedAdvance;
        }

        public ResponseObject ManageOrder(Order order)
        {
            var response = new ResponseObject();
            Random rnd = new Random();
            if (ApproveOrder(order))
            {
                response.ApproveId = rnd.NextInt64(long.MaxValue);
                response.Message = "Congratulations!!! Your Order is Approved and sent for Processing.";
                // Publish Message to Queue
                publisher = new QueuePublisher();
                publisher.PublishMessage(order, totalBillAmount, expectedAdvance);
            }
            else
            {
                response.RejectionId = rnd.NextInt64(long.MaxValue);
                response.Message = "Sorry!!! Your Order is Rejected because the " +
                    $"Total Amount to pay is {totalBillAmount} and the expected advance is greater than or equal " +
                    $"to {expectedAdvance}, but the received advance is {order.AdvanceAmount}";
            }
            return response;
        }
    }
}

Listing 4: The Order Logic Code
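As a worked example of the approval rule, the sketch below isolates the arithmetic from Listing 4. The AdvanceRule class and its member names are hypothetical; it uses double only to mirror the listing, and a real billing system would more likely use decimal for money.

```csharp
public static class AdvanceRule
{
    // The order must carry at least 20% of the total bill as an advance.
    public const double RequiredAdvanceRate = 0.2;

    // Expected advance = unit price x quantity x 20%.
    public static double ExpectedAdvance(double unitPrice, int quantity)
    {
        return unitPrice * quantity * RequiredAdvanceRate;
    }

    // The order is approved when the advance paid covers the expected advance.
    public static bool IsApproved(double unitPrice, int quantity, double advancePaid)
    {
        return advancePaid >= ExpectedAdvance(unitPrice, quantity);
    }
}
```

For two laptops at 145000 each, the total is 290000 and the expected advance is 58000. An order with an advance of 60000 is approved, while an order with an advance of 50000 is rejected.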
 
Step 4: In the Controllers folder, add a new empty API Controller and name it OrdersController. In this controller add the code as shown in  Listing 5


using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Core_SenderAPI.Models;
using Core_SenderAPI.Logic;
namespace Core_SenderAPI.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrdersController : ControllerBase
    {
        ProcessOrder process;

        public OrdersController()
        {
            process = new ProcessOrder();
        }

        [HttpPost]
        public IActionResult Post(Order order)
        {
            // validate the order and, when approved, publish it to the queue
            var response = process.ManageOrder(order);
            return Ok(response);
        }
    }
}

Listing 5: The OrdersController 

Our sender application is ready. Run the sender application and post the order information. Once the order is validated by the logic, the information is added to the queue as shown in Figure 4.


Figure 4: Posted Order in Queue
We can get the message by clicking on the Get Message(s) button, as shown in Figure 5.


Figure 5: The message in Queue

Step 5: Once the sender application is ready, we need to create a receiver application that will read data from the queue. In the same solution, add a new ASP.NET Core Web API project that targets .NET 6. Name this project Core_ReceiverAPI. In this project, add the RabbitMQ.Client NuGet package. Then add a new folder and name it Models. In this folder, add a new class file and name it Order.cs. In this class file, add the code shown in Listing 6.

// the namespace matches the 'using Core_ReceiverAPI.Models;' directive in Listing 7
namespace Core_ReceiverAPI.Models
{
    public class Order
    {
        public string? CustomerName { get; set; }
        public string? OrderedItem { get; set; }
        public int Quantity { get; set; }
        public double AdvanceAmount { get; set; }
    }

    public class Message
    {
        public Order Order { get; set; }
        public double TotalAmount { get; set; }
        public double Advance { get; set; }
    }
}
Listing 6: The Model classes for reading data from Queue  
We need these classes in the receiver project so that data received from the queue can be deserialized into the Message class.
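The following sketch shows the round trip that connects the two projects: the sender turns a message object into UTF-8 encoded JSON bytes, and the receiver turns those bytes back into an object of the same shape. The SketchOrder, SketchMessage, and MessageRoundTrip names are hypothetical stand-ins for the model classes, used so that the example is self-contained.

```csharp
using System;
using System.Text;
using System.Text.Json;

public class SketchOrder
{
    public string CustomerName { get; set; } = "";
    public string OrderedItem { get; set; } = "";
    public int Quantity { get; set; }
    public double AdvanceAmount { get; set; }
}

public class SketchMessage
{
    public SketchOrder Order { get; set; } = new SketchOrder();
    public double TotalAmount { get; set; }
    public double Advance { get; set; }
}

public static class MessageRoundTrip
{
    // Sender side: object -> JSON text -> UTF-8 bytes (the message body).
    public static byte[] ToBody(SketchMessage message)
    {
        return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
    }

    // Receiver side: UTF-8 bytes -> JSON text -> object.
    public static SketchMessage FromBody(byte[] body)
    {
        var json = Encoding.UTF8.GetString(body);
        return JsonSerializer.Deserialize<SketchMessage>(json)
               ?? throw new InvalidOperationException("The message body was JSON null.");
    }
}
```

The property names must be the same on both sides because System.Text.Json matches JSON property names to class properties case-sensitively by default, which is why the receiver repeats the sender's model classes.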

Step 6: Since the receiver application reads messages from the queue using a background hosted service, we need to create a new background service class in the application. In the receiver application, add a new class file and name it BackgroundServiceOp.cs. In this class file, add the code shown in Listing 7.

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using Core_ReceiverAPI.Models;
using System.Diagnostics;

namespace Core_ReceiverAPI
{
    public class BackgroundServiceOp : BackgroundService
    {
        private readonly ILogger _logger;
        private IConnection _connection;
        private IModel _channel;
        

        public BackgroundServiceOp(ILoggerFactory loggerFactory, IServiceProvider service)
        {
            this._logger = loggerFactory.CreateLogger<BackgroundServiceOp>();
 
            InitializeRabbitMQ();
        }

        private void InitializeRabbitMQ()
        {
            // create a factory
            var factory = new ConnectionFactory()
            {
                 
                 Uri = new Uri("amqp://guest:guest@localhost:5672") 
            };

            // create a connection
            _connection = factory.CreateConnection();

            // create channel
            _channel = _connection.CreateModel();

            // channel exchange declare
            _channel.ExchangeDeclare("message.exchange", ExchangeType.Topic);
            _channel.QueueDeclare("message.queue",
                durable: false, exclusive: false, autoDelete: false, arguments: null);
            _channel.QueueBind("message.queue", "message.exchange", "message.queue.*", null);
            // prefetch size, prefetch count
            _channel.BasicQos(0, 1, false);

            _connection.ConnectionShutdown += _connection_ConnectionShutdown;

        }

        private void _connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            //throw new NotImplementedException();
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            stoppingToken.ThrowIfCancellationRequested();
            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += (ch, message) =>
            {
                // received body
                var content = Encoding.UTF8.GetString(message.Body.ToArray());
                // handled the received Message
                HandleReceivedMessage(content);
                // acknowledge the message
                _channel.BasicAck(message.DeliveryTag, false);
            };

            consumer.Shutdown += Consumer_Shutdown;
            consumer.Registered += Consumer_Registered;
            consumer.Unregistered += Consumer_Unregistered;
            consumer.ConsumerCancelled += Consumer_ConsumerCancelled;

            _channel.BasicConsume("message.queue", false, consumer);

            return Task.CompletedTask;
        }


        public override void Dispose()
        {
            _channel.Close();
            _connection.Close();
            base.Dispose();
        }

        private void Consumer_ConsumerCancelled(object sender, ConsumerEventArgs e)
        {
            //throw new NotImplementedException();
        }

        private void Consumer_Unregistered(object sender, ConsumerEventArgs e)
        {
            //throw new NotImplementedException();
        }

        private void Consumer_Registered(object sender, ConsumerEventArgs e)
        {
            //throw new NotImplementedException();
        }

        private void Consumer_Shutdown(object sender, ShutdownEventArgs e)
        {
            // throw new NotImplementedException();
        }

        private void HandleReceivedMessage(string content)
        {
            // deserialize the JSON body into the Message model
            var message = System.Text.Json.JsonSerializer.Deserialize<Message>(content);
            Debug.WriteLine($"Received Message {content}");
        }
    }
}

Listing 7: The Background Service class
   
The class in Listing 7 is the heart of the application. The BackgroundServiceOp class is derived from the BackgroundService base class. The constructor calls the InitializeRabbitMQ() method. This method contains the logic to connect to RabbitMQ using an instance of the ConnectionFactory class. The Uri property of this class points to RabbitMQ on port 5672. The QueueDeclare() method of the channel declares the queue named message.queue. The QueueBind() method of the channel binds the message.queue queue to the message.exchange exchange using message.queue.* as the binding pattern. The HandleReceivedMessage() method processes a message received from the queue by deserializing it. The BackgroundService base class has the ExecuteAsync() abstract method, which the current class overrides. This method uses the EventingBasicConsumer class to attach a consumer to the queue so that messages can be read from it. The Received event of the EventingBasicConsumer class is raised for each message delivered from the queue once the BasicConsume() method of the channel has been called. The EventingBasicConsumer class also exposes other events, e.g. Registered, Unregistered, Shutdown, and ConsumerCancelled.

Step 7: To use the BackgroundServiceOp class as a background service, we need to register it in the dependency injection container of the application in Program.cs, as shown in Listing 8.

builder.Services.AddHostedService<BackgroundServiceOp>();

Listing 8: Registration of the BackgroundServiceOp class as a background hosted service
 
Run the receiver application. In the output window, we can see the message received from the queue, as shown in Figure 6.


Figure 6: Message read from the queue
Figure 6 shows the message received from the queue. Since the background hosted service runs continuously, it receives a message as soon as the message is added to the queue.

If you now try to get messages from the queue in the management UI, as was done for Figure 5, the Queue is empty message is displayed, as shown in Figure 7, because the receiver has already consumed and acknowledged the message.


Figure 7: Queue is Empty message
 
Since the scope of the article is kept limited to establishing communication across API using messaging, I have not added code for the database to store Orders information. Please note that you can further modify the code of the Receiver application by adding Database support to store order information.  

The code for this article can be downloaded from this link.       

Conclusion: In a modern application architecture with several REST APIs, a messaging service lets the APIs communicate with each other in a disconnected manner, without knowing about each other.
