Azure – Topic with Service Bus

By | 28/04/2021

In this post, we will see how to create and read Topics, using Azure Service Bus.
Topics and subscriptions provide a one-to-many form of communication in a publish and subscribe pattern.
It’s useful for scaling to large numbers of recipients.
For more information, go to the Microsoft Web Site.

First of all, we go to Azure Portal and we create a Service Bus
(It is important to remember that we cannot create Topics with the Basic pricing tier):

Now, clicking on the Topic button, we will create two Topics:

Then, in every topic we will create at least one subscription:

Finally, we take the connection string that we will use in our application:

Now, we will create a Console application for writing in our two Topics.

First of all, using Manage NuGet Packages, we will install the Azure Messaging ServiceBus libraries:

Then, we create a class called Topic:

[TOPIC.CS]

using Azure.Messaging.ServiceBus;
using System.Threading.Tasks;

namespace TestTopics
{
    /// <summary>
    /// Minimal helper that publishes text messages to Azure Service Bus topics.
    /// </summary>
    public class Topic
    {
        // Connection string handed in by the caller; used to open a client on every send.
        private string _connection;

        /// <summary>
        /// Creates the helper and remembers the connection string for later sends.
        /// </summary>
        /// <param name="connect">Connection string of the Service Bus namespace.</param>
        public Topic(string connect) => _connection = connect;

        /// <summary>
        /// Sends one message with the given text body to the named topic.
        /// </summary>
        /// <param name="topicName">Name of the topic to publish to.</param>
        /// <param name="message">Text used as the message body.</param>
        /// <returns>A task that completes once the send call has finished.</returns>
        public async Task WriteTopic(string topicName, string message)
        {
            // The client lives only for this call and is disposed when the method exits.
            await using var busClient = new ServiceBusClient(_connection);

            // Sender bound to the target topic.
            var topicSender = busClient.CreateSender(topicName);
            var payload = new ServiceBusMessage(message);

            await topicSender.SendMessageAsync(payload);
        }
    }
}



Finally, we modify the Program.cs file and we run the application:

[PROGRAM.CS]

using System;
using System.Threading.Tasks;

namespace TestTopics
{
    /// <summary>
    /// Console entry point that publishes two messages to topic "test1" and one to "test2".
    /// </summary>
    class Program
    {
        /// <summary>
        /// Sends the sample messages one after another and then waits for Enter.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        /// <returns>A task that completes when all messages are sent and Enter was pressed.</returns>
        static async Task Main(string[] args)
        {
            Console.WriteLine("Start Application");
            // NOTE(review): a connection string containing a shared access key should not live in
            // source code; load it from configuration or an environment variable in real projects.
            string connection = "Endpoint=sb://damianotesttopics.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xr1FPyXUvsYwsYPmrettvjR8Fh/DeShYIY+y+gA24rY=";
            string nameTopic1 = "test1";
            string nameTopic2 = "test2";
            string message1Topic1 = "Message1 Topic1";
            string message2Topic1 = "Message2 Topic1";
            string message1Topic2 = "Message1 Topic2";

            Topic objTopic = new Topic(connection);

            // Each send is awaited instead of being fired through Task.Run and forgotten:
            // failures surface as exceptions, the sends happen in the printed order, and the
            // program cannot reach the end of Main while a send is still in flight.
            Console.WriteLine("Write message1 in test1");
            await objTopic.WriteTopic(nameTopic1, message1Topic1);

            Console.WriteLine("Write message2 in test1");
            await objTopic.WriteTopic(nameTopic1, message2Topic1);

            Console.WriteLine("Write message1 in test2");
            await objTopic.WriteTopic(nameTopic2, message1Topic2);

            Console.ReadLine();
        }
    }
}



If we go to the Azure portal, we can check that in test1 there are two messages and in test2 only one:

Now, we will implement the method ReceiveMessagesFromSubscriptionAsync used to read the message:

[TOPIC.CS]

using Azure.Messaging.ServiceBus;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace TestTopics
{
    /// <summary>
    /// Small helper around Azure Service Bus that publishes messages to a topic and
    /// reads the messages waiting on a topic subscription.
    /// </summary>
    public class Topic
    {
        // How long the processor is left running on each read before it is stopped.
        private const int ReceiveWindowMilliseconds = 5000;

        // Connection string handed in by the caller; used to open a client on every call.
        private readonly string _connection;

        // Bodies collected by the most recent read. Initialized up front so that
        // GetListMessage never returns null, even before the first read.
        private List<string> lstMessage = new List<string>();

        /// <summary>
        /// Creates the helper and remembers the connection string for later calls.
        /// </summary>
        /// <param name="connect">Connection string of the Service Bus namespace.</param>
        public Topic(string connect)
        {
            _connection = connect;
        }

        /// <summary>
        /// Sends one message with the given text body to the named topic.
        /// </summary>
        /// <param name="topicName">Name of the topic to publish to.</param>
        /// <param name="message">Text used as the message body.</param>
        /// <returns>A task that completes once the send call has finished.</returns>
        public async Task WriteTopic(string topicName, string message)
        {
            await using (ServiceBusClient client = new ServiceBusClient(_connection))
            {
                // create a sender for the topic
                ServiceBusSender sender = client.CreateSender(topicName);
                await sender.SendMessageAsync(new ServiceBusMessage(message));
            }
        }

        /// <summary>
        /// Runs a message processor on the given subscription for about five seconds and
        /// stores the body of every message received during that window.
        /// The collected bodies are available through <see cref="GetListMessage"/>.
        /// </summary>
        /// <param name="topicName">Name of the topic that owns the subscription.</param>
        /// <param name="subscriptionName">Name of the subscription to read from.</param>
        /// <returns>A task that completes after the processor has been stopped.</returns>
        public async Task ReceiveMessagesFromSubscriptionAsync(string topicName, string subscriptionName)
        {
            // Start from an empty list so the result only reflects this call.
            lstMessage = new List<string>();

            await using (ServiceBusClient client = new ServiceBusClient(_connection))
            // The processor is disposed as well, before the client that created it.
            await using (ServiceBusProcessor processor = client.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions()))
            {
                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing
                await processor.StartProcessingAsync();

                try
                {
                    // Give the processor a fixed window to receive messages. Awaiting the delay
                    // (instead of blocking with .Wait()) keeps the calling thread free.
                    await Task.Delay(ReceiveWindowMilliseconds);
                }
                finally
                {
                    // Always stop the processor, even if the wait above fails.
                    await processor.StopProcessingAsync();
                }
            }
        }

        /// <summary>
        /// Stores the body of a received message and completes the message.
        /// </summary>
        /// <param name="args">Event data carrying the received message.</param>
        private async Task MessageHandler(ProcessMessageEventArgs args)
        {
            // NOTE(review): List<string> is not thread-safe; this relies on the default processor
            // options not invoking the handler concurrently - confirm before raising concurrency.
            lstMessage.Add(args.Message.Body.ToString());

            // Complete the message so that it is removed from the subscription.
            await args.CompleteMessageAsync(args.Message);
        }

        /// <summary>
        /// Reports a processor error on standard error instead of silently dropping it.
        /// </summary>
        /// <param name="args">Event data describing the failure.</param>
        private Task ErrorHandler(ProcessErrorEventArgs args)
        {
            // Without this, a wrong topic or subscription name would just look like "no messages".
            System.Console.Error.WriteLine($"Service Bus error ({args.ErrorSource}) on {args.EntityPath}: {args.Exception}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Returns the message bodies collected by the most recent read.
        /// </summary>
        /// <returns>The list of bodies; empty if nothing has been read yet.</returns>
        public List<string> GetListMessage()
        {
            return lstMessage;
        }
    }
}



Finally, we modify Program.cs and we run the application:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace TestTopics
{
    /// <summary>
    /// Console entry point that reads the subscriptions of topics "test1" and "test2"
    /// and prints every message body that was received.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Reads both subscriptions in turn, prints their messages and then waits for Enter.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            Console.WriteLine("Start Application");

            const string connection = "Endpoint=sb://damianotesttopics.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xr1FPyXUvsYwsYPmrettvjR8Fh/DeShYIY+y+gA24rY=";
            const string nameTopic1 = "test1";
            const string nameTopic2 = "test2";
            const string nameSubTopic1 = "Subtest1";
            const string nameSubTopic2 = "Subtest2";

            var reader = new Topic(connection);

            // Prints the banner, reads one subscription, then prints what was collected.
            async Task DumpSubscriptionAsync(string banner, string topic, string subscription)
            {
                Console.WriteLine(banner);
                await reader.ReceiveMessagesFromSubscriptionAsync(topic, subscription);
                ReadMessages(reader.GetListMessage());
            }

            await DumpSubscriptionAsync("Read messages in test1", nameTopic1, nameSubTopic1);
            await DumpSubscriptionAsync("Read messages in test2", nameTopic2, nameSubTopic2);

            Console.ReadLine();
        }

        /// <summary>
        /// Writes each message on its own line between a start and an end marker,
        /// followed by a blank line.
        /// </summary>
        /// <param name="lstMessages">Message bodies to print.</param>
        private static void ReadMessages(List<string> lstMessages)
        {
            Console.WriteLine("Start list messages");

            foreach (string body in lstMessages)
            {
                Console.WriteLine(body);
            }

            Console.WriteLine("End list messages");
            Console.WriteLine();
        }
    }
}



If we go in Azure portal, we can check that in test1 and test2 there aren’t any messages:



Leave a Reply

Your email address will not be published. Required fields are marked *