Simple (c#) message broker using Nanomsg

I was attempting to do some CanOpen development using CanOpenNode in a 100% windows simulated environment where different apps all pretend to be different nodes and talk to each other via a simulated bus. Now message busses in software are not uncommon things but many are quite heavy going and for the magnitude of the product a think like DBUS was just not needed. Enter Nanomsgnanomsg is a socket library that provides several common communication patterns. It aims to make the networking layer fast, scalable, and easy to use. Implemented in C, it works on a wide range of operating systems with no further dependencies.” It also has support for a BUS type socket so seemed awesome on the surface.

First problem is the exact implementation of the bus socket is not exactly the same as one would expect when creating an electrical bus. Its simply not possible to just wire all the test apps together and have them talking with out some kind of extra glue in the middle. With nanomsg it is necessary to manually connect every node to every other node to form the bus. Some info is given here but the implementation detail is missed and stumped me for a while.

If we consider 3 apps all trying to form a bus via IPC then we have to do the following to make the system work :-

App 1

s.Bind("ipc://can_id1");

App 2

s1.Bind("ipc://can_id2");
s2.Connect("ipc://can_id1");

App 3

s1.Connect("ipc://can_id1");
s2.Connect("ipc://can_id2");

 

App1 has nothing to connect to so we create a socket with bind, App2 Can connect to app1 but also creates a socket. App3 just binds to app1 and app2’s socket. Nanomsg will take care of the rest and ensure that no matter who broadcasts all nodes receive. Clearly this is only suitable for a fixed implementation where number of nodes and node locations is hard coded. Not really what i had in mind. Enter the message broker. Now i’m not claiming this is efficient or the most elegant way to solve this problem, but its simple and flexible which are the key goals here for the testing i wish to do.

The idea with the simple message broker is to open a number of sockets (in the example 10) then each app will just connect to the appropriate socket. We can decide which one to connect to as i’m simulating a canopen node bus so each node/app has an ID from 1-127 so that makes a perfect sufix to use on the IPC eg can_id1 can_id2 etc.. If we wanted to be more elegant and remove hardcoded things to the next level we could add an interface to the broker that can be contacted by a new node that allocates a new socket on demand and returns its id. But this is good enough for the my requirements… The broker will listen to each of the multiple sockets then rebroadcast anything it gets to all OTHER sockets. As the sockets are all bi-directional it works as expected for a bus topology

using System;
using System.Threading.Tasks;
using NNanomsg;
using NNanomsg.Protocols;


namespace MessageBroker
{
    class Program
    {
        const int nosockets = 10;
        static BusSocket[] s = new BusSocket[nosockets];
        static NanomsgListener[] l = new NanomsgListener[nosockets];

        static void Main(string[] args)
        {
            Console.WriteLine("Starting message broker");

            for(int x=0;x< nosockets;x++) { s[x] = new BusSocket(); s[x].Bind(string.Format("ipc://can_id{0}",x)); l[x] = new NanomsgListener(); l[x].AddSocket(s[x]); l[x].ReceivedMessage += Listener_ReceivedMessage; } while (true) { Parallel.ForEach(l, (lx) =>
                 {
                     lx.Listen(new TimeSpan(0));
                 });
            }    
        }

        private static void Listener_ReceivedMessage(int socketID)
        {
            //New data on socket
            byte[] payload = s[socketID].ReceiveImmediate();

            //Send new data to everyone other than the sender
            Parallel.ForEach(s, (sx) =>
            {
                if (sx.SocketID == socketID)
                    return;
                sx.Send(payload);
            });         

        }

    }
}

Leave a Reply