Azure Event Hub & Stream Analytics–Part 1 : publier des évènements

Nous allons nous placer dans la première partie de notre architecture globale. Nous allons donc créer un site web avec une api et un composant middleware pour intercepter l’ensemble des appels et créer des évènements qui seront utilisés pour les statistiques

Nous sommes sur cette partie :

image

La première étape va donc être de créer une nouvelle solution avec un projet Web :

image

Nous allons héberger ce site dans Windows Azure Web Site, assurez vous d’avoir les éléments suivants à minima :

  • Web Api
  • Authentication: Individual User Accounts (nous nous servirons de cela pour identifier des utilisateurs dans nos statistiques)

Petit point, actuellement dans le Template, si vous n’utilisez pas encore VNext, vous avez encore les API qui sont définies de l’ancienne manière. Il faut donc faire un petit changement dès le début pour s’orienter dans l’architecture OWIN. (c’est hors scope du sujet EventHub & StreamAnalytics, mais c’est toujours utile). J’ai détaillé cela dans le billet précédent.

Créer un client eventHub :

la mise en place consiste dans un premier temps à récupérer la chaine de connexion au service bus en précisant que l’on souhaite utiliser un protocole AMQP. (AMQP est un protocole ouvert pour les applications de type messagerie, plus d’info ici).

//Define connection String
ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(connectionString);
builder.TransportType = TransportType.Amqp; //Only this protocol is supported
var connectionString = builder.ToString();
var eventHubName = "MyEventHub";

//Create eventHubClient
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString,eventHubName);

Définir le format de message

il faut ensuite définir le format de message que nous allons utiliser dans nos évènements. Je souhaite récupérer les informations suivantes dans le cadre de la récupération des métriques :

  • scope (nom de l’api)
  • username
  • request length
  • response length

cela me permettrait alors de pouvoir capter lorsqu’un utilisateur utilise trop de bande passante entrée/sortie sur une API en particulier. Cela correspond à la classe suivante :

[DataContract]
    public class MetricEvent
    {
        [DataMember]
        public String Scope { get; set; }

        [DataMember]
        public String UserName { get; set; }

        [DataMember]
        public long RequestLength { get; set; }

        [DataMember]
        public long ResponseLength { get; set; }

    }

Publier un évènement

la publication d’un évènement dans l’eventHub consiste alors à utiliser le client créé au préalable et à lui fournir une sérialisation de notre objet Metric pour l’envoi sur le service bus. La Partition Key permet ici au service EventHub de créer un hash afin d’effectuer une répartition des messages sur les différentes partition du hub.

var metricEvent = new MetricEvent()
                {
                    Scope = scope,
                    RequestLength = requestLength,
                    ResponseLength = responseLength,
                    UserName = userName,
                }

var serializedString = JsonConvert.SerializeObject(metricEvent);
EventData data = new EventData(Encoding.UTF8.GetBytes(serializedString))
{
	PartitionKey = metricEvent.Scope
};

// Send the metric to Event Hub
await client_.SendAsync(data);

Créer le middleware d’interception :

ensuite, nous pouvons intégrer cette logique dans le middleware

public class EventStatisticMiddleware : OwinMiddleware
    {
        Sender sender_;

        public EventStatisticMiddleware(OwinMiddleware next, Sender sender)
            : base(next)
        {
            sender_ = sender;
        }
        public async override Task Invoke(IOwinContext context)
        {
            long requestLength = 0;
            long responseLength = 0;
            String scope = null;
            String userName = "Anonymous";

            scope = ExtractScope(context.Request.Path);
            if (scope != null)
            {
                requestLength = context.Request.Body.Length;

                if (context.Authentication.User != null
                    && context.Authentication.User.Identity.IsAuthenticated)
                    userName = context.Authentication.User.Identity.Name;
            }

            await Next.Invoke(context);

            if (scope != null)
            {
                if (context.Response.ContentLength.HasValue)
                    responseLength = context.Response.ContentLength.Value;

                await sender_.SendEventAsync(new MetricEvent()
                {
                    Scope = scope,
                    RequestLength = requestLength,
                    ResponseLength = responseLength,
                    UserName = userName,
                });

            }
        }

        private String ExtractScope(PathString path)
        {
            if (path.HasValue)
            {
                var splitted = path.Value.Split(new[] { '/' });
                if (splitted.Length > 2)
                {
                    if (String.Compare(splitted[1], "api", true) == 0)
                        return splitted[2];
                }
                return null;
            }
            return null;
        }
    }

ou nous vérifions via ExtractScope que nous faisons bien appel à une API (tout chemin commençant par api/) et ou Sender est une classe qui implémente les appels à l’event hub:

 public class Sender
    {
        private EventHubClient client_;
        private String eventHubName;

        public Sender(String connectionString, string eventHubName)
        {
            this.eventHubName = eventHubName;
            this.client_ = EventHubClient
                         .CreateFromConnectionString(connectionString,this.eventHubName);
        }

        public async Task SendEventAsync(MetricEvent metricEvent)
        {
            try
            {
                var serializedString = JsonConvert.SerializeObject(metricEvent);
                EventData data = new EventData(Encoding.UTF8.GetBytes(serializedString))
                {
                    PartitionKey = metricEvent.Scope
                };

          // Send the metric to Event Hub
                await client_.SendAsync(data);
            }
            catch (Exception exp)
            {
                Console.WriteLine("Error on send: " + exp.Message);
            }
        }

        static void OutputMessageInfo(string action, EventData data, MetricEvent info)
        {
            if (data == null)
            {
                return;
            }
            if (info != null)
            {
                Console.WriteLine("{0}{1} - Scope {2}, RequestLength {3}, ResponseLength {4}.", action, data, info.Scope, info.RequestLength, info.ResponseLength);
            }
        }
    }

enfin, après la configuration de notre fichier startup.cs, nous publierons donc une évènement à chaque appel de l’API


app.Use<Middleware.EventStatisticMiddleware>(new Sender(connectionString, eventHubName));

This entry was posted in Azure, Français, Microsoft, Non classé and tagged , , . Bookmark the permalink.

One Response to Azure Event Hub & Stream Analytics–Part 1 : publier des évènements

  1. Pingback: Azure Event Hub & Stream Analytics–Part 2: analyse des évènements | Jérémie – an other BizTalker

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s