Advanced Azure Queuing with the Sessions and the Service Bus

Monday Oct 15th 2012 by Jeffrey Juday
Share:

Microsoft Azure offers two queuing options: Azure Queues and Azure Service Bus. The first step to understanding Service Bus Sessions, an advanced feature of Service Bus, is understanding the need.

Microsoft Azure offers two queuing options: Azure Queues and Azure Service Bus.  Azure Queues offer basic queuing features.  Service Bus offers basic queuing features along with more advanced features.  Topics and Subscriptions are one of those more advanced features.  Sessions are another advanced feature and the topic (pun-intended) of this article.  The first step to understanding Service Bus Sessions is; understanding the need.

    Windows Azure 90 day free trial. Try it Now!    

Grouping Messages with a Key

When a business process is driven by, for example, single self-contained messages a simple publisher and subscriber model works just fine.  Processes driven by a set of messages grouped together by a sort of key require some special handling.  Messages must be tagged with the key (usually in the message header).  As each message is processed; some process result is generally updated.  Often there is an expected number of messages to process so the updated result may be, for example, the processed count.  The result is stored in either the queuing mechanism or in a consumer’s database.

Pushing the processing results storage onto the client consumer is certainly possible.  The processing result storage should be durable.  Spreading the result storage across the queuing mechanism and the client may, for example, complicate transactions.  It’s easier to compartmentalize a transaction within a closed system like a SQL Server database than it is to separate the transaction parts across multiple systems.

Service Bus handles both queuing and processing results storage.  Service Bus Queues and Topics store messages.  Service Bus Sessions store processing results and handle retrieval based on a key or other sort of Identifier.

Sessions Explained

Sessions have two parts. 

First, messages traveling together may not arrive together.  Often messages are interleaved.  Complicating matters there may be Competing Consumers http://www.eaipatterns.com/CompetingConsumers.html; multiple consumers to speed processing.  It’s often important for a single Consumer to process a group of messages and then update the process state.  Unless the two Consumers are coordinating, the updated state will be that of the last Consumer update.  Interleaving the processing can corrupt the process if separate consumers were updating a count.  Processing two messages on two different consumers may update the count to one when, in fact, the true value should be two. 

Second, processing must often store process state after messages have been consumed.  Since all messages may not be in the Queue at once; current state must be available later in the processing.

Service Bus Sessions handle all of these issues.

Sample Overview

Sample code demonstrating Service Bus Sessions can be found here http://servicebus.codeplex.com/.  The sample is aptly named SessionMessagesSample.  By default, all Service Bus Client code runs with the NetTcp Binding.  NetTcp requires open ports 9350 and 9351.  If you are running the sample from somewhere with fairly tight firewall security, I recommend changing the code to run in HTTP mode.   The change is Client global and should be done before logging in and performing queuing activities.  The following code configures for HTTP mode.

ServiceBusEnvironment.SystemConnectivity.Mode = ConnectivityMode.Http;

Sample code message consumption is changed from the default behavior that requires a method Complete call.  Instead the queue is set to remove the message as soon as it has been received.  The following code changes the default queuing Receive behavior.

messagingFactory.CreateQueueClient(queueName, ReceiveMode.ReceiveAndDelete);

The sample first executes without Sessions and then with Sessions.  Four color-coded Competing Consumers processes executing with Sessions gather messages and store a process state TimeSpan value.  All queuing starts with a Sender and Sessions require the sender to configure a Message appropriately.

Sending and Receiving Messages

Like all Service Bus Queuing, Sessions utilize the BrokeredMessage.  Following Creates a BrokeredMessage supporting Sessions.

BrokeredMessage message = new BrokeredMessage();
message.SessionId = sessionId;
message.MessageId = "Order_" + Guid.NewGuid().ToString().Substring(0,5);
 

The only difference between Session Messages and a regular message is that the SessionId property is set.  Likewise Queues and Topics are created in the standard way.  The following code creates a Queue Client to communicate with the Service Bus.

MessagingFactory messagingFactory = MessagingFactory.Create(
                runtimeUri,
                TokenProvider.CreateSharedSecretTokenProvider(ServiceBusIssuerName, ServiceBusIssuerKey));
 
            return messagingFactory.CreateQueueClient(queueName, ReceiveMode.ReceiveAndDelete);
 

Receiving a Session based message looks much different than a normal Receive.  The following code demonstrates receiving a Session based message from a queue.

// demonstrates where it is unknown.

MessageSession sessionReceiver = sessionQueueClient.AcceptMessageSession(TimeSpan.FromSeconds(acceptSessionReceiverTimeout));

AcceptSessionMethod allows receiving the next available SessionId or a specific SessionId.  This almost duplicates the same behavior seen with Topics; however Sessions don’t support the rich Filtering supported by Topics.

As stated earlier, once a message is processed, processing state can be saved back to the Service Bus.

Storing Session State

The sample demonstrates saving state in a TimeSpan, but Service Bus can save any serialized state.  According to the documentation state is limited to 256K because state storage is implemented using the messaging infrastructure.  The following code demonstrates saving state.

static void SetState(MessageSession session, string state)
{
    using (var stream = new MemoryStream())
    {
        using (var writer = new StreamWriter(stream))
        {
            writer.Write(state);
            writer.Flush();
 
            stream.Position = 0;
            session.SetState(stream);
        }
    }
}
 

State is implemented as a Stream.  Developers familiar with Steams should realize that a Stream can be just about anything including: Bytes, XML Structured Text, and strings.  The DataContractSerializer class is recommended for turning a data structure into a Stream and a Stream back into a class.

Retrieving the current State is the reverse process.  GetState returns a Stream.  Following is sample code retrieving the current State.

static string GetState(MessageSession session)
{
    string state = null;
    Stream stream = session.GetState();
 
    if (stream != null)
    {
        using (stream)
        {
            using (var reader = new StreamReader(stream))
            {
                state = reader.ReadToEnd();
            }
        }
    }
 
    return state;
}
 

Though it’s beyond the scope of the sample and this article, Queuing supports ordered delivery.  Message processing order may be important especially if a large data payload is stored across multiple messages.  The Defer method supports pushing a message back into the queue if a message is received out of order.

Conclusion

Service Bus Sessions enable handling groups of BrokeredMessages tying the messages to a single key.  Sessions have two components.  One component ties a consumer to a group of messages with a common SessionId.  A second component enables saving message processing results back into the Service Bus infrastructure and making the result available for later retrieval.

Resources

http://msdn.microsoft.com/en-us/library/windowsazure/hh532012.aspx

http://blogs.msdn.com/b/appfabric/archive/2011/05/25/an-introduction-to-service-bus-topics.aspx

Share:
Home
Mobile Site | Full Site
Copyright 2017 © QuinStreet Inc. All Rights Reserved