ASP.NET Core 6: Communication across the API Applications using RabbitMQ and Background Service
In this article, we will look at how two APIs can communicate with each other using a background service and a messaging service. REST APIs are familiar to most developers, and many modern applications use a microservices architecture in which the server-side logic is written and deployed as small REST APIs. These microservices are autonomous and isolated from each other. Because they are deployed separately, they are not aware of each other's existence. In such a scenario, what if one microservice needs to send or share data with another? This is where the application architecture can use a messaging service that runs in the background.
Consider an e-commerce application with two services: one that accepts and validates orders, and another that processes them. If the Order Acceptance service validates a received order, the order must be passed to the Order Processing service. To do this, the Order Acceptance service writes the validated order information to the messaging service, and the Order Processing service reads it from there and processes it further. Figure 1 explains this scenario.
Figure 1: The Scenario of using Background Messaging Service
Figure 1 explains the scenario as follows:
- The Submit Order App sends order information to the Accept Order Service. This is the publisher, or sender, service.
- The sender service validates the order and saves the order data in the database.
- Once the order is validated, it is passed to the messaging service.
- The Order Process Service, which is the subscriber, subscribes to the messaging service through a background hosted service. The hosted service runs continuously in the background, so an order is received as soon as it is placed in the messaging service.
- The Order Process Service receives the message through the background hosted service and processes it.
- The processed order is saved in the database.
- The Order Process Service then reads the data from the database so that it can be presented to the Order Process App.
- Finally, the order data is presented to the Order Process App.
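The flow in the steps above can be sketched in a single process. This is not RabbitMQ: an in-memory channel from System.Threading.Channels only stands in for the messaging service, and the names InMemoryFlow and RunAsync as well as the order identifiers are hypothetical, chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical order identifiers, used only for this demonstration.
var processed = await InMemoryFlow.RunAsync(new[] { "order-1", "order-2" });
Console.WriteLine(string.Join(", ", processed)); // order-1, order-2

public static class InMemoryFlow
{
    public static async Task<List<string>> RunAsync(IEnumerable<string> validatedOrders)
    {
        // Stand-in for the messaging service: an unbounded in-memory queue.
        var queue = Channel.CreateUnbounded<string>();

        // Subscriber: started first, it waits in the background for messages.
        var subscriber = Task.Run(async () =>
        {
            var handled = new List<string>();
            await foreach (var order in queue.Reader.ReadAllAsync())
            {
                handled.Add(order); // "process" the order
            }
            return handled;
        });

        // Publisher: writes each validated order to the queue and moves on.
        foreach (var order in validatedOrders)
        {
            await queue.Writer.WriteAsync(order);
        }
        queue.Writer.Complete(); // no more orders; lets the subscriber loop end

        return await subscriber;
    }
}
```

The point of the sketch is the decoupling: the publisher never calls the subscriber directly, it only writes to the queue. A real broker adds what the in-memory channel cannot, namely delivery across separate processes and machines.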
Some commonly used messaging services are:
- RabbitMQ
- Azure Service Bus
- AWS SQS
- Azure Queue Storage
- Apache Kafka
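In this article, we will use RabbitMQ, which can be run locally in Docker. The following commands pull the image that includes the management plugin and start a container, exposing port 5672 for AMQP and port 15672 for the management UI:
- docker pull rabbitmq:3-management
- docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3-management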
namespace Core_SenderAPI.Models
{
    /// <summary>
    /// Order Object
    /// </summary>
    public class Order
    {
        public string? CustomerName { get; set; }
        public string? OrderedItem { get; set; }
        public int Quantity { get; set; }
        public double AdvanceAmount { get; set; }
    }

    /// <summary>
    /// The Item Information
    /// </summary>
    public class Item
    {
        public string? ItemName { get; set; }
        public double UnitPrice { get; set; }
    }

    /// <summary>
    /// The Response Object to return response to the Client
    /// </summary>
    public class ResponseObject
    {
        public long ApproveId { get; set; }
        public long RejectionId { get; set; }
        public string? Message { get; set; }
    }

    /// <summary>
    /// Message to write into the RabbitMQ
    /// </summary>
    public class Message
    {
        public Order Order { get; set; }
        public double TotalAmount { get; set; }
        public double Advance { get; set; }
    }

    /// <summary>
    /// Items Data
    /// </summary>
    public class ItemDb : List<Item>
    {
        public ItemDb()
        {
            Add(new Item() { ItemName = "Laptop", UnitPrice = 145000 });
            Add(new Item() { ItemName = "RAM", UnitPrice = 14000 });
            Add(new Item() { ItemName = "Router", UnitPrice = 15000 });
            Add(new Item() { ItemName = "SSD", UnitPrice = 18000 });
            Add(new Item() { ItemName = "HDD", UnitPrice = 4000 });
            Add(new Item() { ItemName = "Adapter", UnitPrice = 9000 });
            Add(new Item() { ItemName = "Keyboard", UnitPrice = 1000 });
            Add(new Item() { ItemName = "Mouse", UnitPrice = 200 });
            Add(new Item() { ItemName = "Headphone", UnitPrice = 1600 });
            Add(new Item() { ItemName = "Speaker", UnitPrice = 13000 });
        }
    }
}
The code above defines the following classes:
- Order: used for posting order data to the REST API.
- Item and ItemDb: used for storing item information so that the unit price of the ordered item can be read.
- ResponseObject: used to send the order approval or rejection to the client.
- Message: defines the structure of the message to be written to RabbitMQ.
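In RabbitMQ, a publisher sends messages to an exchange, and the exchange routes them to queues. RabbitMQ provides the following exchange types:
- A direct exchange delivers messages to queues based on the message routing key.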
- A fanout exchange routes messages to all of the queues that are bound to it and the routing key is ignored.
- Topic exchanges route messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to an exchange.
- A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key.
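To make the topic exchange rule concrete, here is a minimal sketch of how a binding pattern is matched against a routing key. It is an illustration only, not the broker's implementation. It assumes the usual AMQP conventions: words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The TopicMatcher class and the sample routing keys are hypothetical; the pattern message.queue.* is the one this article uses for its queue binding.

```csharp
using System;

// The binding pattern used in this article, checked against sample routing keys.
Console.WriteLine(TopicMatcher.IsMatch("message.queue.*", "message.queue.order")); // True
Console.WriteLine(TopicMatcher.IsMatch("message.queue.*", "message.queue"));       // False
Console.WriteLine(TopicMatcher.IsMatch("message.#", "message.queue.order.eu"));    // True

public static class TopicMatcher
{
    // Returns true when the dot-separated routing key satisfies the binding pattern.
    public static bool IsMatch(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to compare against.
        if (ki == k.Length) return false;

        // '*' matches any single word; otherwise the words must be equal.
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);
        return false;
    }
}
```

Note that under these rules a routing key whose last word is a literal `*` also matches message.queue.*, because `*` in the pattern accepts any single word.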
using Core_SenderAPI.Models;
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;

namespace Core_SenderAPI.Logic
{
    /// <summary>
    /// This class contains the logic to publish the approved order message
    /// </summary>
    public class QueuePublisher
    {
        public void PublishMessage(Order order, double totalAmount, double advance)
        {
            // create a connection factory
            var factory = new ConnectionFactory()
            {
                Uri = new Uri("amqp://guest:guest@localhost:5672")
            };
            // create a connection and a channel; both are disposed when the method returns
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            // time-to-live argument (60000 ms = 60 seconds)
            // Note: RabbitMQ documents x-message-ttl as a queue argument, so passing it to
            // an exchange declaration, as done here, is not expected to expire messages.
            var ttl = new Dictionary<string, object>
            {
                { "x-message-ttl", 60000 }
            };
            // exchange name, exchange type, optional arguments
            channel.ExchangeDeclare("message.exchange", ExchangeType.Topic, arguments: ttl);
            // the message to serialize
            var messageObject = new Message()
            {
                Order = order,
                TotalAmount = totalAmount,
                Advance = advance
            };
            var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(messageObject));
            // The routing key is the literal string "message.queue.*". It matches the
            // receiver's binding pattern "message.queue.*" because '*' in a pattern matches
            // any single word, so the message is delivered to the bound queue.
            channel.BasicPublish("message.exchange", "message.queue.*", null, body);
        }
    }
}
using Core_SenderAPI.Models;

namespace Core_SenderAPI.Logic
{
    public class ProcessOrder
    {
        private readonly ItemDb items;
        private double expectedAdvance, totalBillAmount;
        private QueuePublisher? publisher;

        public ProcessOrder()
        {
            items = new ItemDb();
        }

        private bool ApproveOrder(Order order, double unitPrice)
        {
            // Calculate the total amount
            totalBillAmount = unitPrice * order.Quantity;
            // Calculate the expected advance (20% of the total) against the order
            expectedAdvance = totalBillAmount * 0.2;
            // The order is approved only if the received advance covers the expected advance
            return order.AdvanceAmount >= expectedAdvance;
        }

        public ResponseObject ManageOrder(Order order)
        {
            var response = new ResponseObject();
            Random rnd = new Random();
            // Get the ordered item; reject the order if the item is unknown
            // (the original lookup threw a NullReferenceException in this case)
            var item = items.FirstOrDefault(i => i.ItemName == order.OrderedItem);
            if (item is null)
            {
                response.RejectionId = rnd.NextInt64(long.MaxValue);
                response.Message = $"Sorry! Your order is rejected because the item '{order.OrderedItem}' was not found.";
                return response;
            }
            if (ApproveOrder(order, item.UnitPrice))
            {
                response.ApproveId = rnd.NextInt64(long.MaxValue);
                response.Message = "Congratulations! Your order is approved and sent for processing.";
                // Publish the message to the queue
                publisher = new QueuePublisher();
                publisher.PublishMessage(order, totalBillAmount, expectedAdvance);
            }
            else
            {
                response.RejectionId = rnd.NextInt64(long.MaxValue);
                response.Message = $"Sorry! Your order is rejected because the total amount to pay is {totalBillAmount} and the expected advance is greater than or equal to {expectedAdvance}, but the received advance is {order.AdvanceAmount}.";
            }
            return response;
        }
    }
}
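The approval rule in ProcessOrder is easier to follow with numbers. The sketch below isolates the arithmetic: the total is the unit price multiplied by the quantity, the expected advance is 20% of the total, and the order is approved when the advance paid is at least that amount. The AdvanceRule class and its Evaluate method are hypothetical helpers written for this illustration; they are not part of the article's project.

```csharp
using System;

// Laptop at 145000 per unit, quantity 2: the total is 290000 and the expected advance is 58000.
var (approved, total, expected) = AdvanceRule.Evaluate(unitPrice: 145000, quantity: 2, advancePaid: 60000);
Console.WriteLine($"Approved: {approved}, total: {total}, expected advance: {expected}");

public static class AdvanceRule
{
    // Same rate as the ProcessOrder class: 20% of the total bill.
    public const double AdvanceRate = 0.2;

    public static (bool Approved, double Total, double ExpectedAdvance) Evaluate(
        double unitPrice, int quantity, double advancePaid)
    {
        double total = unitPrice * quantity;
        double expectedAdvance = total * AdvanceRate;
        // Approved only when the advance paid covers the expected advance.
        return (advancePaid >= expectedAdvance, total, expectedAdvance);
    }
}
```

With an advance of 60000 the order is approved, since 60000 is at least 58000. With an advance of 50000 the same order would be rejected. The sketch keeps double to mirror the article's models; for real monetary amounts, decimal is generally the safer choice.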
using Microsoft.AspNetCore.Mvc;
using Core_SenderAPI.Models;
using Core_SenderAPI.Logic;

namespace Core_SenderAPI.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrdersController : ControllerBase
    {
        private readonly ProcessOrder process;

        public OrdersController()
        {
            process = new ProcessOrder();
        }

        [HttpPost]
        public IActionResult Post(Order order)
        {
            // Validate the order and, if approved, publish it to RabbitMQ
            var response = process.ManageOrder(order);
            return Ok(response);
        }
    }
}
// The namespace matches the "using Core_ReceiverAPI.Models;" directive in the receiver's background service
namespace Core_ReceiverAPI.Models
{
    public class Order
    {
        public string? CustomerName { get; set; }
        public string? OrderedItem { get; set; }
        public int Quantity { get; set; }
        public double AdvanceAmount { get; set; }
    }

    public class Message
    {
        public Order Order { get; set; }
        public double TotalAmount { get; set; }
        public double Advance { get; set; }
    }
}
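The receiver repeats the Order and Message classes because the two APIs share no code: the only contract between them is the JSON written to the queue. The following sketch shows that round trip with System.Text.Json, without RabbitMQ in between. The classes mirror the ones in this article, except that Order is marked nullable here to avoid compiler warnings. The sample values, including the customer name, are made up for the demonstration.

```csharp
using System;
using System.Text.Json;

// Sender side: build the message and serialize it, as QueuePublisher does.
var outgoing = new Message
{
    Order = new Order { CustomerName = "Alice", OrderedItem = "Laptop", Quantity = 2, AdvanceAmount = 60000 },
    TotalAmount = 290000,
    Advance = 58000
};
string json = JsonSerializer.Serialize(outgoing);

// Receiver side: deserialize the same text into its own copy of the classes.
Message? incoming = JsonSerializer.Deserialize<Message>(json);
Console.WriteLine(incoming?.Order?.OrderedItem); // Laptop

public class Order
{
    public string? CustomerName { get; set; }
    public string? OrderedItem { get; set; }
    public int Quantity { get; set; }
    public double AdvanceAmount { get; set; }
}

public class Message
{
    public Order? Order { get; set; }
    public double TotalAmount { get; set; }
    public double Advance { get; set; }
}
```

Because property names are matched by name during deserialization, the two copies of the classes have to stay in sync. A renamed property on one side would be silently left at its default value on the other.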
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using Core_ReceiverAPI.Models;

namespace Core_ReceiverAPI
{
    public class BackgroundServiceOp : BackgroundService
    {
        private readonly ILogger _logger;
        private IConnection _connection;
        private IModel _channel;

        public BackgroundServiceOp(ILoggerFactory loggerFactory, IServiceProvider service)
        {
            this._logger = loggerFactory.CreateLogger<BackgroundServiceOp>();
            InitializeRabbitMQ();
        }

        private void InitializeRabbitMQ()
        {
            // create a factory
            var factory = new ConnectionFactory()
            {
                Uri = new Uri("amqp://guest:guest@localhost:5672")
            };
            // create a connection
            _connection = factory.CreateConnection();
            // create a channel
            _channel = _connection.CreateModel();
            // declare the exchange, declare the queue, and bind the queue to the exchange
            _channel.ExchangeDeclare("message.exchange", ExchangeType.Topic);
            _channel.QueueDeclare("message.queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
            _channel.QueueBind("message.queue", "message.exchange", "message.queue.*", null);
            // prefetch size 0 (no limit), prefetch count 1 (one unacknowledged message at a time)
            _channel.BasicQos(0, 1, false);
            _connection.ConnectionShutdown += _connection_ConnectionShutdown;
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            stoppingToken.ThrowIfCancellationRequested();
            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += (ch, message) =>
            {
                // received body
                var content = Encoding.UTF8.GetString(message.Body.ToArray());
                // handle the received message
                HandleReceivedMessage(content);
                // acknowledge the message
                _channel.BasicAck(message.DeliveryTag, false);
            };
            consumer.Shutdown += Consumer_Shutdown;
            consumer.Registered += Consumer_Registered;
            consumer.Unregistered += Consumer_Unregistered;
            consumer.ConsumerCancelled += Consumer_ConsumerCancelled;
            // autoAck is false because the message is acknowledged explicitly above
            _channel.BasicConsume("message.queue", false, consumer);
            return Task.CompletedTask;
        }

        public override void Dispose()
        {
            _channel.Close();
            _connection.Close();
            base.Dispose();
        }

        // The following event handlers are intentionally left empty
        private void _connection_ConnectionShutdown(object? sender, ShutdownEventArgs e) { }
        private void Consumer_ConsumerCancelled(object? sender, ConsumerEventArgs e) { }
        private void Consumer_Unregistered(object? sender, ConsumerEventArgs e) { }
        private void Consumer_Registered(object? sender, ConsumerEventArgs e) { }
        private void Consumer_Shutdown(object? sender, ShutdownEventArgs e) { }

        private void HandleReceivedMessage(string content)
        {
            // deserialize the JSON body into the Message object and log it
            var message = System.Text.Json.JsonSerializer.Deserialize<Message>(content);
            _logger.LogInformation("Received message for item {Item}: {Content}", message?.Order?.OrderedItem, content);
        }
    }
}
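A class derived from BackgroundService starts running only after it is registered with the host, and the listings above do not show that step. The following is a minimal sketch of the receiver's Program.cs, assuming the default ASP.NET Core 6 minimal hosting template. Only the AddHostedService line is specific to this scenario, and the rest of the actual project's startup code may differ.

```csharp
using Core_ReceiverAPI;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();
// Register the background service so that the host starts it with the application.
builder.Services.AddHostedService<BackgroundServiceOp>();

var app = builder.Build();

app.MapControllers();
app.Run();
```

Because the service connects to RabbitMQ in its constructor, the broker is expected to be reachable when the receiver starts, for example through the Docker container started earlier.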