Building a simple service relay for Dynamics 365 CE with RabbitMQ and Python - part 4

This is the final post in my series about building a service relay for Dynamics 365 CE with RabbitMQ and Python. In my previous post in this series, I showed the Python code to make the service relay work. In today's post, I will show how you can use Azure Functions to build a consumer service proxy in C# so client applications don't have to access your RabbitMQ broker directly, and I will also discuss some general thoughts on security and scalability for this service relay architecture.

Although this simple service relay allows external consumers to get data from Dynamics 365 CE without needing to connect directly, the examples I've shown so far require that they can connect to a RabbitMQ broker. This may be problematic for a variety of reasons, so you would probably want external consumers to connect to a web service proxy that would write requests to and read responses from the RabbitMQ broker.

Building a service proxy function

You can build an Azure Functions service proxy with Python, but I don't recommend it for three reasons:

  1. Azure Functions Python support is still considered experimental.
  2. Python scripts that use external libraries can run exceedingly slowly.
  3. Getting the environment set up is a bit of a hassle.

On the other hand, building a service proxy function with C# was much easier, and it performed much better than a comparable Python function (~0.5 seconds for C# compared to 5+ seconds for Python).

Here are the steps I took to build my C# service proxy function:

  1. Create a C# HTTP trigger function.
  2. Create and upload a project.json file with a dependency on the RabbitMQ client (see below).
  3. Take the "RpcClient" class from the RabbitMQ .NET RPC tutorial and call it from within my function.

Here's my project.json file:

{
  "frameworks": {
    "net46":{
      "dependencies": {
        "RabbitMQ.Client": "5.0.1"
      }
    }
  }
}

And here's my run.csx file:

using System.Net;
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
{
    log.Info("Processing request");

    // parse query parameter
    string query = req.GetQueryNameValuePairs()
        .FirstOrDefault(q => string.Compare(q.Key, "query", true) == 0)
        .Value;

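    // Get request body
    dynamic data = await req.Content.ReadAsAsync<object>();

    // Fall back to the "query" property of the request body if the query string had none
    query = query ?? data?.query;

    // Without a query there is nothing to send to the broker
    if (string.IsNullOrEmpty(query))
    {
        return req.CreateResponse(HttpStatusCode.BadRequest, "Please pass a query on the query string or in the request body");
    }

    var rpcClient = new RpcClient();
    string response;
    try
    {
        log.Info(string.Format(" [.] query start time {0}", DateTime.Now.ToString("MM/dd/yyyy hh:mm:ss.fff tt")));
        response = rpcClient.Call(query);
        log.Info(string.Format(" [.] query end time {0}", DateTime.Now.ToString("MM/dd/yyyy hh:mm:ss.fff tt")));
    }
    finally
    {
        // Close the RabbitMQ connection even if the call throws
        rpcClient.Close();
    }

    return req.CreateResponse(HttpStatusCode.OK, response);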
}

public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

    public RpcClient()
    {
        var factory = new ConnectionFactory() { HostName = "RABBITHOST", UserName="RABBITUSER", Password="RABBITUSERPASS"  };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var response = Encoding.UTF8.GetString(body);
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };
    }

    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "",
            routingKey: "rpc_queue",
            basicProperties: props,
            body: messageBytes);

        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        // Block until the response with the matching correlation id arrives
        return respQueue.Take();
    }

    public void Close()
    {
        connection.Close();
    }
}

Here's a screenshot showing me calling the C# function with Postman.
[Screenshot: Postman C# function call]
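
The same request can be made from any HTTP client, not just Postman. Here is a minimal sketch using only the Python standard library. The JSON body with a "query" property mirrors what the function above reads from the request body; the function URL and the query text are placeholders you would supply, and the names build_proxy_request and call_proxy are my own.

```python
import json
import urllib.request


def build_proxy_request(function_url, query):
    """Build a POST request carrying the query as a JSON body."""
    payload = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        function_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST")


def call_proxy(function_url, query, timeout=30):
    """Send the query to the proxy function and return the response text."""
    request = build_proxy_request(function_url, query)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")
```

You would call it as call_proxy(your_function_url, your_query). If the function uses function-level authorization, the URL also needs the code parameter shown in the Azure portal.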

Because I did actually build a Python function, I will go ahead and share how I did it if you're interested. Here are the steps I took:

  1. Create a Python HTTP trigger function.
  2. Install Python 3.6 via site extensions (see steps 2.1-2.4 here).
  3. Install the necessary libraries using pip via KUDU.

Here's the Python function code:

import os
import sys
import json
import pika
import uuid
import datetime

class CrmRpcClient(object):
    def __init__(self):
        #RabbitMQ connection details
        self.rabbituser = 'RABBITUSERNAME'
        self.rabbitpass = 'RABBITUSERPASS'
        self.rabbithost = 'RABBITHOST' 
        self.rabbitport = 5672
        self.rabbitqueue = 'rpc_queue'
        rabbitcredentials = pika.PlainCredentials(self.rabbituser, self.rabbitpass)
        rabbitparameters = pika.ConnectionParameters(host=self.rabbithost,
                                    port=self.rabbitport,
                                    virtual_host='/',
                                    credentials=rabbitcredentials)

        self.rabbitconn = pika.BlockingConnection(rabbitparameters)

        self.channel = self.rabbitconn.channel()

        #create an anonymous exclusive callback queue
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    #callback method for when a response is received - note the check for correlation id
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    #method to make the initial request
    def call(self, n):
        self.response = None
        #generate a new correlation id
        self.corr_id = str(uuid.uuid4())

        #publish the message to the rpc_queue - note the reply_to property is set to the callback queue from above
        self.channel.basic_publish(exchange='',
                                   routing_key=self.rabbitqueue,
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=n)
        while self.response is None:
            self.rabbitconn.process_data_events()
        return self.response

#read the query from the HTTP request body
postreqdata = json.loads(open(os.environ['req']).read())
query = postreqdata['query']

#instantiate an rpc client
crm_rpc = CrmRpcClient()
print(" [.] query start time %r" % str(datetime.datetime.now()))
queryresponse = crm_rpc.call(query)
print(" [.] query end time %r" % str(datetime.datetime.now()))
response = open(os.environ['res'], 'w')
response.write(queryresponse.decode())
response.close()

Here's a screenshot showing me calling the Python function with Postman.
[Screenshot: Postman Python function call]

Note the difference in time between the two functions: 5.62 seconds for Python and 0.46 seconds for C#!
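
One thing to be aware of in the Python function: the "while self.response is None" loop in the call method waits forever if the listener never replies. Here is a minimal sketch of a bounded wait. It assumes the response and rabbitconn attributes of CrmRpcClient as shown above; the helper name wait_for_response and the default timeout values are my own choices, not something from the RabbitMQ tutorial.

```python
import time


def wait_for_response(client, timeout_seconds=30.0, poll_seconds=1.0):
    """Pump RabbitMQ events until client.response is set or the timeout expires.

    client is assumed to expose response and rabbitconn like CrmRpcClient.
    """
    deadline = time.monotonic() + timeout_seconds
    while client.response is None:
        if time.monotonic() >= deadline:
            raise TimeoutError("no RPC response within %.1f seconds" % timeout_seconds)
        # process_data_events blocks for at most time_limit seconds
        client.rabbitconn.process_data_events(time_limit=poll_seconds)
    return client.response
```

To use it, the loop at the end of call could be replaced with "return wait_for_response(self)", and the script could catch TimeoutError to write an error message to the response instead of hanging.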

Security and scalability

If you decide to use this approach in production, I'd suggest you carefully consider both security and scalability. Obviously the overall solution will only be as secure as your RabbitMQ broker and communications between the broker and its clients, so you'll want to look at best practices for access control and securing the communications with TLS. Here are some links for further reading on those subjects:
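
As a starting point on the TLS side, here is a rough sketch of how the Python client's connection parameters might look when the broker exposes a TLS listener. It assumes the pika 1.x API (pika.SSLOptions), which differs from the older pika release used in the function code above, and it assumes the broker listens on the conventional AMQPS port 5671. The function names and the ca_file parameter are my own.

```python
import ssl


def build_tls_context(ca_file=None):
    """Return an SSL context that verifies the broker's certificate and hostname."""
    # With ca_file=None the system's default CA bundle is used
    return ssl.create_default_context(cafile=ca_file)


def tls_connection_parameters(host, user, password, ca_file=None, port=5671):
    """Build pika connection parameters for a TLS listener (pika 1.x assumed)."""
    import pika  # imported lazily so build_tls_context works without pika installed

    credentials = pika.PlainCredentials(user, password)
    return pika.ConnectionParameters(
        host=host,
        port=port,  # 5671 is the conventional AMQPS port
        virtual_host='/',
        credentials=credentials,
        ssl_options=pika.SSLOptions(build_tls_context(ca_file), server_hostname=host))
```

The result would be passed to pika.BlockingConnection in place of the plain parameters. This only covers the client side; the broker itself still has to be configured with a certificate and a TLS listener.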

As for scalability, the approach I've shown creates a separate response queue for each consumer, which can cause scaling problems, especially if you are using a RabbitMQ cluster. You may want to look at the "direct reply-to" approach instead. For an interesting real-world overview of using direct reply-to, take a look at this blog post.
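
To give a feel for what changes with direct reply-to, here is a minimal sketch of the Python RPC client adapted to it. Instead of declaring a callback queue, the client consumes from RabbitMQ's amq.rabbitmq.reply-to pseudo-queue in no-ack mode and publishes the request on the same channel. The sketch assumes pika 1.x keyword names (on_message_callback, auto_ack), which differ from the older API used earlier in this post. The class name and the make_properties parameter are my own, and I have not load-tested this.

```python
import uuid

REPLY_TO = "amq.rabbitmq.reply-to"  # RabbitMQ's direct reply-to pseudo-queue


class DirectReplyRpcClient:
    """RPC client that uses direct reply-to instead of a declared callback queue."""

    def __init__(self, connection, make_properties):
        # connection: an open pika.BlockingConnection (pika 1.x assumed)
        # make_properties: callable that builds message properties,
        #                  for example pika.BasicProperties
        self.connection = connection
        self.make_properties = make_properties
        self.channel = connection.channel()
        self.response = None
        self.corr_id = None
        # The consumer must be registered in no-ack mode before publishing
        self.channel.basic_consume(queue=REPLY_TO,
                                   on_message_callback=self.on_response,
                                   auto_ack=True)

    def on_response(self, ch, method, props, body):
        # Only accept the reply that matches the current request
        if props.correlation_id == self.corr_id:
            self.response = body

    def call(self, query):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        # Publish on the same channel that consumes from the pseudo-queue
        self.channel.basic_publish(
            exchange="",
            routing_key="rpc_queue",
            properties=self.make_properties(reply_to=REPLY_TO,
                                            correlation_id=self.corr_id),
            body=query)
        while self.response is None:
            self.connection.process_data_events()
        return self.response
```

With pika installed, it would be created as DirectReplyRpcClient(pika.BlockingConnection(parameters), pika.BasicProperties). The listener should not need to change as long as it replies through the default exchange using the reply_to value as the routing key.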

Wrapping up

I hope you've enjoyed this series and that it has given you some ideas about how to implement service relays in your Dynamics 365 CE projects. As I worked through the examples, I certainly learned a few new things, especially when I created my Python service proxy in Azure Functions.

Here are links to all the previous posts in this series.

  1. Part 1 - Series introduction
  2. Part 2 - Solution prerequisites
  3. Part 3 - Python code for the consumer and listener processes

What do you think about this approach? Is it something you think you'd use in production? Let us know in the comments!
