Posts tagged with: griffin.networking

Easy and performant client/server communication with protobuf-net & Griffin.Framework

ProtoBuf is Google's open serialization format which can be used to serialize objects in a standardized way. With it, different platforms can communicate using a format that is much more efficient than XML. Combine the most popular .NET implementation of it, protobuf-net, with Griffin.Framework and you get an easy and fast way of sending information between processes.

Continue Reading



Griffin.Networking – Added support for HTTP ranges

Added support for partial file downloads (content ranges)

Someone requested that I add support for the Range header in the HTTP protocol. I've done that now. But let's back up a bit first.

In Griffin.Networking you can send files no matter the file size (GB files are no problem). You simply invoke Send() in your service class. Here is an example:

public class YourService : IServiceService
{
	public void HandleReceive(object message)
	{
		var buffer = (byte[])message;
		// check the incoming message here

		var fileStream = new FileStream(@"C:\MyFiles\SomeVeryLargeFile.zip", FileMode.Open,
		                                FileAccess.Read, FileShare.ReadWrite);

		Context.Send(fileStream);
	}
}

That's it. The framework will take care of sending the file, one piece at a time (the size of each piece is determined by the buffer pool size that you defined when creating the server).

Dealing with ranges manually

In a custom tailored server (i.e. you just use the HTTP protocol implementation from Griffin.Networking) you do it like this:

var rangeHeader = request.Headers["Range"];
if (rangeHeader != null && !string.IsNullOrEmpty(rangeHeader.Value))
{
	var response = request.CreateResponse(HttpStatusCode.PartialContent, "Roger, you'll get partial content");
	response.AddHeader("Accept-Ranges", "bytes");
	var fileStream = new FileStream(@"SomeFilePath", FileMode.Open, FileAccess.Read, FileShare.ReadWrite);

	// parse requested ranges.
	// we can parse all ranges, but the ByteRangeStream currently only supports one
	var ranges = new RangeCollection();
	ranges.Parse(rangeHeader.Value, (int)fileStream.Length);
	response.AddHeader("Content-Range", ranges.ToHtmlHeaderValue((int)fileStream.Length));

	// move to the correct position in the inner stream.
	// will also report the correct range size instead of the entire file size
	// will support serving multiple ranges in the same response in the future.
	response.Body = new ByteRangeStream(ranges, fileStream); 
	Send(response);
}

// all other request processing here.
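For reference, a typical range exchange on the wire looks something like this (the file name and sizes are just examples):

```
GET /SomeFile.zip HTTP/1.1
Range: bytes=0-499

HTTP/1.1 206 Partial Content
Accept-Ranges: bytes
Content-Range: bytes 0-499/1234
Content-Length: 500
```

The client asks for the first 500 bytes, and the Content-Range header tells it which slice it got and how large the full file is.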

A full example can be found in the HTTP protocol sample folder on GitHub.

Using the webserver

I've moved the WebServer implementation from the Griffin.Networking repository to a separate one on GitHub called Griffin.WebServer. It contains a file module which supports caching, partial file downloads and listing files (if enabled).

You can configure an HTTP server which serves static files like this:

// Module manager handles all modules in the server
var moduleManager = new ModuleManager();

// Let's serve our downloaded files (Windows 7 users)
var fileService = new DiskFileService("/", string.Format(@"C:\Users\{0}\Downloads", Environment.UserName));

// Create the file module and allow files to be listed.
var module = new FileModule(fileService) {ListFiles = true};

// Add the module
moduleManager.Add(module);

// And start the server.
var server = new HttpServer(moduleManager);
server.Start(IPAddress.Any, 8080);

The WebServer is still work in progress. The pieces are however coming together pretty well.


New version of Griffin.Networking – A networking library for .NET

I've refactored Griffin.Networking quite a lot (I did say that it was still an early release). The new version is called v0.5 (the library is taking shape, but there are still some things to take care of).

Highlights:

1. The pipeline is no longer mandatory
2. I’m using the Async methods instead of Begin/EndXxxx
3. The buffer management is more efficient
4. It’s easier to work with the buffers
5. Introducing messaging support

Continue Reading


Griffin.Networking – A somewhat performant networking library for .NET

Disclaimer: The current framework release is a beta. It should be reasonably stable, but don't blame me if it blows up your computer.

Introduction

Griffin.Networking is a networking library written in C# whose purpose is to:

a) abstract away the repetitive tasks which you have to do with vanilla .NET
b) create a more structured way of processing the inbound and outbound data

Those two goals should lower the time it takes to develop networking applications and also improve the performance thanks to a (hopefully) well-designed networking layer.

The framework also has Inversion Of Control support built in from the start (be careful, slow containers hurt performance a lot). The IoC support is provided by a service location interface (which should not be exposed outside the framework).

The goal of this article is to describe the framework and show you how to develop an application with it. You will have a working JSON RPC implementation (server side) when done. The client side can be created quite easily afterwards: simply create a RequestEncoder and a ResponseDecoder for the framework; everything else can be reused from the server implementation.

Background

I’ve built several networking applications in the past. Everything from small client applications in C# to performant socket servers in C++ utilizing IO Completion Ports.

They have worked well, but I always seemed to repeat myself when implementing my applications. The biggest problem is that it’s hard to get an extendable architecture where you can inject handlers into the protocol handling. My C# WebServer (Google “C# webserver” and click on the first search result) illustrates this well. It’s not easy to follow the communication flow.

I therefore decided to try to create a networking library which is easy to use and extend. During my research I stumbled upon Netty for Java, which my library is heavily inspired by.

Architecture

The purpose of this section is to give you a brief overview of the architecture and the terms which will be used throughout the article. Some things in this section may not make sense until you have read the entire article.

Channel

The channel is the IO layer. In most cases it's a socket implementation, but it could be anything used for communication. The default socket implementation uses the classic Begin/End style of methods. They will probably be replaced by the new Async methods later on.

There are two types of channels: server channels, whose responsibility is to accept new connections and build the correct client channel (and its pipeline), and client channels, whose responsibility is to send and receive information from the remote peer.

    public interface IChannel
    {
        void HandleDownstream(IPipelineMessage message);
    }

As you can see, the channel interface is quite small. The reason for this is that the entire framework is asynchronous. All communication is made with messages. The contract of a channel says that it should only be able to receive and process messages. A message (read more below) can for instance be Connect, SendBuffer or Close. All channel implementations take a pipeline (see below) in the constructor and use it to send messages to your application.

Pipeline

The pipeline is the most central part of the library. All the action happens in the pipeline. It's in the pipeline that you authorize the users, transform the incoming byte[] array into something more usable like an HttpRequest, and so on.

The pipeline has two directions (compare with a road with two lanes). The lane from the channel to the application is called upstream, since the message travels up from the channel to your application. The other direction is called downstream since the message travels down to the channel.

A pipeline can contain an arbitrary number of handlers, and each direction has its own set of handlers. An HTTP streaming server might only contain the HttpHeaderDecoder in the upstream and a HttpFileStreamer in the downstream to gain performance, while a complete HttpServer would include session management, authentication, logging, error handling etc as upstream handlers.

public interface IPipeline
{
	/// <summary>
	/// Send something from the channel to all handlers.
	/// </summary>
	/// <param name="message">Message to send to the client</param>
	void SendUpstream(IPipelineMessage message);

	/// <summary>
	/// Set down stream end point
	/// </summary>
	/// <param name="channel">channel which will handle all down stream messages</param>
	void SetChannel(IChannel channel);

	/// <summary>
	/// Send a message from the client and downwards.
	/// </summary>
	/// <param name="message">Message to send to the channel</param>
	void SendDownstream(IPipelineMessage message);
}

The architecture allows you to have full control over how the incoming and outgoing data is processed before it arrives in your application or in the channel.

Messages

As mentioned in the previous section, the pipeline is used to send messages to/from your application. These messages are small classes which contain the information to process. A message can be compared with the EventArgs classes in the .NET event mechanism: POCO classes which implement the IPipelineMessage interface.

Messages that requires actions should be named as verbs (Send) while event messages should be named past tense (Received).

The general guideline is that each message may only contain one type of information. You may not have a message called Received with an object property which is a byte[] in one case and a SuperDeluxeObject in another. Instead, create a new message named ReceivedSuperDeluxe which contains the SuperDeluxeObject object. It makes the processing cleaner and easier to follow.

Example message:

public class Connect : IPipelineMessage
{
    private readonly EndPoint _remoteEndPoint;

    public Connect(EndPoint remoteEndPoint)
    {
        if (remoteEndPoint == null)
            throw new ArgumentNullException("remoteEndPoint");

        _remoteEndPoint = remoteEndPoint;
    }

    public EndPoint RemoteEndPoint
    {
        get { return _remoteEndPoint; }
    }
} 

Pipeline handlers

Pipeline handlers are used to process the messages which are sent through the pipeline. They can either be singletons (shared among channels) or be created per channel. Handlers that are constructed together with the pipeline can store state information since they are used by one channel only.

Example upstream handler which traces the received information:

public class BufferTracer : IUpstreamHandler
{
    private readonly ILogger _logger = LogManager.GetLogger<BufferTracer>();

    public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
    {
        var msg = message as Received;
        if (msg != null)
        {
            var str = Encoding.UTF8.GetString(msg.BufferSlice.Buffer, msg.BufferSlice.Position, msg.BufferSlice.RemainingLength);
            _logger.Trace(str);
        }

        context.SendUpstream(message);
    }
}

Notice how it sends all messages to the next handler using context.SendUpstream(message). This is quite important. Each handler gets to decide whether the message should be propagated up the call stack or not. It's also how messages are transformed into something more usable. Let's look at the HTTP HeaderDecoder handler.

public class HeaderDecoder : IUpstreamHandler
{
    private readonly IHttpParser _parser;
    private int _bodyBytesLeft = 0;

    public HeaderDecoder(IHttpParser parser)
    {
        if (parser == null) throw new ArgumentNullException("parser");
        _parser = parser;
    }

    public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
    {
        if (message is Closed)
        {
            _bodyBytesLeft = 0;
            _parser.Reset();
        }
        else if (message is Received)
        {
            var msg = (Received) message;

            // complete the body
            if (_bodyBytesLeft > 0)
            {
                _bodyBytesLeft -= msg.BufferSlice.Count;
                context.SendUpstream(message);
                return;
            }

            var httpMsg = _parser.Parse(msg.BufferSlice);
            if (httpMsg != null)
            {
                var receivedHttpMsg = new ReceivedHttpRequest((IRequest) httpMsg);
                _bodyBytesLeft = receivedHttpMsg.HttpRequest.ContentLength;
                _parser.Reset();

                // send up the message to let someone else handle the body
                context.SendUpstream(receivedHttpMsg);
                msg.BytesHandled = msg.BufferSlice.Count;
                context.SendUpstream(msg);
            }

            return;
        }

        context.SendUpstream(message);
    }
} 

Two things are important here:

It follows Single Responsibility Principle

It doesn't actually parse the HTTP message but uses an external parser for that. It's easy to follow what the handler does since it does not violate the Single Responsibility Principle, and we can at any time switch parsers if we find a more performant one.

It transforms the Received message into a ReceivedHttpRequest

All messages should be considered immutable. Don't change their contents unless you have a really good reason to. Don't propagate the original message upstream; create a new message instead.

Switching sides

A pipeline handler can at any time switch from the downstream to the upstream (or vice versa). Switching sides will always invoke the first handler on the other side. This allows us to streamline the process and to avoid confusion.

public class AuthenticationHandler : IUpstreamHandler
{
	private readonly IAuthenticator _authenticator;
	private readonly IPrincipalFactory _principalFactory;

	public AuthenticationHandler(IAuthenticator authenticator, IPrincipalFactory principalFactory)
	{
		_authenticator = authenticator;
		_principalFactory = principalFactory;
	}

	public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
	{
		var msg = message as ReceivedHttpRequest;
		if (msg == null)
		{
			context.SendUpstream(message);
			return;
		}

		var authHeader = msg.HttpRequest.Headers["Authorization"];
		if (authHeader == null)
		{
			context.SendUpstream(message);
			return;
		}

		var user = _authenticator.Authenticate(msg.HttpRequest);
		if (user == null)
		{
			//Not authenticated, send error downstream and abort handling
			var response = msg.HttpRequest.CreateResponse(HttpStatusCode.Unauthorized,
				"Invalid username or password.");
			context.SendDownstream(new SendHttpResponse(msg.HttpRequest, response));
		}
		else
		{
			var principal =
				_principalFactory.Create(new PrincipalFactoryContext {Request = msg.HttpRequest, User = user});
			Thread.CurrentPrincipal = principal;
		}
	}
}

Pipeline factories

A pipeline (and all of its handlers) needs to be constructed each time a new channel is created. There are two built-in factories in the framework.

One uses an interface called IServiceLocator, which allows you to add support for your favorite IoC container. The other uses delegates to create stateful handlers.

var factory = new DelegatePipelineFactory();
factory.AddDownstreamHandler(() => new ResponseEncoder());

factory.AddUpstreamHandler(() => new HeaderDecoder(new HttpParser()));
factory.AddUpstreamHandler(new HttpErrorHandler(new SimpleErrorFormatter())); //singleton
factory.AddUpstreamHandler(() => new BodyDecoder(new CompositeBodyDecoder(), 65535, 6000000));
factory.AddUpstreamHandler(() => new FileHandler());
factory.AddUpstreamHandler(() => new MessageHandler());
factory.AddUpstreamHandler(new PipelineFailureHandler()); //singleton

Buffers

A fundamental part of a performant networking library is how the data is handled. All large allocations hurt performance. We don't want to create a new byte[65535] each time we read or send a packet. It takes time to do the allocation, the garbage collector has to work more and the memory ends up fragmented.

The framework solves this by using buffer pools and a class called BufferSlice. We can allocate a buffer which is 5 MB large and slice it into smaller pieces which we use in the processing. We can either make the buffer pool a singleton or let each handler allocate its own buffer pool (it's still just five allocations instead of 5000 if you have five handlers).

The BufferSlice class returns its buffer to the pool when it's disposed. It's therefore important that all messages that use the BufferSlice class implement IDisposable, since the channel will dispose all messages when it's done with them.
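As a rough sketch of the lifecycle (the constructor arguments here mirror the snippets later in this article, but the exact meaning of each argument is my assumption):

```csharp
// Sketch only: BufferPool/PopSlice usage as seen elsewhere in this article.
// The arguments (slice size, initial count, max count) are assumed.
var pool = new BufferPool(65535, 50, 100);

// Pop a slice of the pre-allocated buffer instead of doing new byte[65535].
var slice = pool.PopSlice();
try
{
    // work with slice.Buffer between slice.StartOffset and slice.Count ...
}
finally
{
    // Disposing the slice returns it to the pool.
    slice.Dispose();
}
```

In practice you rarely dispose slices yourself: the channel disposes the messages (and thereby the slices) once it's done with them.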

Performance

The framework is still quite new (about one month =)). The performance is not at its peak yet.

However, I've used Apache's ab tool to throw 5000 requests at the HTTP listener. The framework handled about 280 HTTP requests per second (localhost), which I consider to be OK this early in the project. The memory consumption was about 80 MB (working set). (Note that the numbers don't really prove anything.) Feel free to help improve the performance or do your own benchmarks. I would like to get a sample application which I can use for performance tuning (and to compare the performance with other frameworks).
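If you want to reproduce a run like that, the ab invocation looks something like this (the port and concurrency level are just examples):

```shell
# 5000 requests in total, 10 concurrent, against a local HTTP listener
ab -n 5000 -c 10 http://localhost:8080/
```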

Building a JSON RPC server

It's time to start building a JSON RPC server. Create a new console application named something like JsonRpcServer. Start the NuGet Package Manager Console and run install-package griffin.networking to install the framework.

The specification for JSON RPC can be found at the official website. This article will not help you understand it, but only show how you can implement it. The specification does not say anything about how the messages are transferred, so we'll create a simple envelope which will be used to wrap the messages. The envelope is a simple binary header with a version (byte) and a length (int) field.

Decoding / Encoding

The first thing we need to do is to process the incoming bytes. We have to decode them into something that we can work with. As mentioned we’ll use a simple envelope. Something like:

public class SimpleHeader
{
	public int Length { get; set; }
	public byte Version { get; set; }
}

But to be able to use that class we need to decode the incoming bytes in some way. So let’s create our first pipeline handler which we’ll use for just that:

public class HeaderDecoder : IUpstreamHandler
{
	public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
	{
		var msg = message as Received;
		if (msg == null)
		{
			context.SendUpstream(message);
			return;
		}

		// byte + int
		if (msg.BufferSlice.RemainingLength < 5)
		{
			return;
		}

		var header = new SimpleHeader
						 {
							 Version = msg.BufferSlice.Buffer[msg.BufferSlice.Position++],
							 Length = BitConverter.ToInt32(msg.BufferSlice.Buffer, msg.BufferSlice.Position)
						 };
		msg.BufferSlice.Position += 4;
		context.SendUpstream(new ReceivedHeader(header));

		if (msg.BufferSlice.RemainingLength > 0)
			context.SendUpstream(msg);
	}
}

Pretty straightforward. We don't process anything until we've got at least five bytes (the channel will continue to fill the buffer at the end until we handle something). Then we just decode the header, send a ReceivedHeader message and pass on the remaining bytes. Notice that I put the version byte first. By doing so we can change the header as much as we like in future versions without breaking everything.
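To make the envelope layout concrete, here is a small self-contained sketch of writing the five header bytes and reading them back, mirroring the decoder above (the body length is an arbitrary example value):

```csharp
using System;

class EnvelopeDemo
{
    static void Main()
    {
        // Write the envelope: 1 version byte followed by a 4-byte length.
        var bodyLength = 512;
        var header = new byte[5];
        header[0] = 1; // version first, so the layout can evolve later
        Buffer.BlockCopy(BitConverter.GetBytes(bodyLength), 0, header, 1, 4);

        // Read it back, just like HeaderDecoder does.
        var version = header[0];
        var length = BitConverter.ToInt32(header, 1);

        Console.WriteLine("version={0}, length={1}", version, length);
    }
}
```

Note that BitConverter uses the machine's endianness; since the same framework runs on both ends here, that's fine for this protocol.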

The header doesn't say anything more than the size of the actual JSON message. So we need something to parse the JSON with. Let's create another upstream handler for that (and thereby comply with the Single Responsibility Principle). It will be called… BodyDecoder ;) (I've cheated and created the Request/Response/Error objects which the JSON RPC specification describes.)

public class BodyDecoder : IUpstreamHandler
{
	private static readonly BufferPool _bufferPool = new BufferPool(65535, 50, 50);
	private readonly BufferPoolStream _stream;
	private SimpleHeader _header;

	public BodyDecoder()
	{
		var slice = _bufferPool.PopSlice();
		_stream = new BufferPoolStream(_bufferPool, slice);
	}

	public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
	{
		var headerMsg = message as ReceivedHeader;
		if (headerMsg != null)
		{
			_header = headerMsg.Header;
			if (_header.Length > 65535)
			{
				var error = new ErrorResponse("-9999", new RpcError
														   {
															   Code = RpcErrorCode.InvalidRequest,
															   Message =
																   "Only requests of at most 65535 bytes are supported.",
														   });
				context.SendDownstream(new SendResponse(error));
			}

			return;
		}

		var received = message as Received;
		if (received != null)
		{
			var count = Math.Min(received.BufferSlice.RemainingLength, _header.Length);
			_stream.Write(received.BufferSlice.Buffer, received.BufferSlice.Position, count);
			received.BufferSlice.Position += count;

			if (_stream.Length == _header.Length)
			{
				_stream.Position = 0;
				var request = DeserializeRequest(_stream);
				context.SendUpstream(new ReceivedRequest(request));
			}

			return;
		}

		context.SendUpstream(message);
	}

	protected virtual Request DeserializeRequest(BufferPoolStream body)
	{
		var reader = new StreamReader(body);
		var json = reader.ReadToEnd();
		return JsonConvert.DeserializeObject<Request>(json);
	}
}

Here we use the BufferPool instead of creating a new buffer each time: a quite large performance gain and a lot less fragmented memory if the server runs for a while. Also notice that the framework has a BufferPoolStream which uses the BufferPool to get byte[] buffers. Future versions of the stream will most likely be able to use several buffers behind the scenes (and therefore handle larger amounts of data without creating overly large buffers).

Before we continue with the actual application, let's add the only downstream handler: the response encoder.

public class ResponseEncoder : IDownstreamHandler
{
	private static readonly BufferPool _bufferPool = new BufferPool(65535, 50, 100);

	public void HandleDownstream(IPipelineHandlerContext context, IPipelineMessage message)
	{
		var msg =  message as SendResponse;
		if (msg == null)
		{
			context.SendDownstream(message);
			return;
		}

		var result = JsonConvert.SerializeObject(msg.Response, Formatting.None);

		// send header (version byte + length)
		var header = new byte[5];
		header[0] = 1;

		// use the UTF8 byte count; it can differ from the string length
		var byteCount = Encoding.UTF8.GetByteCount(result);
		var lengthBuffer = BitConverter.GetBytes(byteCount);
		Buffer.BlockCopy(lengthBuffer, 0, header, 1, lengthBuffer.Length);
		context.SendDownstream(new SendBuffer(header, 0, 5));

		// send JSON
		var slice = _bufferPool.PopSlice();
		Encoding.UTF8.GetBytes(result, 0, result.Length, slice.Buffer, slice.StartOffset);
		slice.Position = slice.StartOffset;
		slice.Count = byteCount;
		context.SendDownstream(new SendSlice(slice));
	}
}

Now we've only got one thing left to do in the pipeline, and that's to handle the requests. Let's start by creating a very simple handler:

class MyApplication : IUpstreamHandler
{
	public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
	{
		var msg = message as ReceivedRequest;
		if (msg == null)
			return;


		var parray = msg.Request.Parameters as object[];
		if (parray == null)
			return; // muhahaha, violating the API specification

		object result;
		switch (msg.Request.Method)
		{
			case "add":
				result = int.Parse(parray[0].ToString()) + int.Parse(parray[1].ToString());
				break;
			case "subtract":
				result = int.Parse(parray[0].ToString()) - int.Parse(parray[1].ToString());
				break;
			default:
				result = "Nothing useful.";
				break;
		}

		var response = new Response(msg.Request.Id, result);
		context.SendDownstream(new SendResponse(response));
	}
}

How do we run the application then? We need to create a server channel and define the client pipeline. I usually do it in a class called XxxxListener to follow the .NET standard. So let’s create a JsonRpcListener.

public class JsonRpcListener : IUpstreamHandler, IDownstreamHandler
{
	private TcpServerChannel _serverChannel;
	private Pipeline _pipeline;


	public JsonRpcListener(IPipelineFactory clientFactory)
	{
		_pipeline = new Pipeline();
		_pipeline.AddDownstreamHandler(this);
		_pipeline.AddUpstreamHandler(this);
		_serverChannel = new TcpServerChannel(_pipeline, clientFactory, 2000);

	}

	public void Start(IPEndPoint endPoint)
	{
		_pipeline.SendDownstream(new BindSocket(endPoint));
	}

	public void Stop()
	{
		_pipeline.SendDownstream(new Close());
	}

	public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
	{
		var msg = message as PipelineFailure;
		if (msg != null)
			throw new TargetInvocationException("Pipeline failed", msg.Exception);
	}

	public void HandleDownstream(IPipelineHandlerContext context, IPipelineMessage message)
	{
		context.SendDownstream(message);
	}
}

So now we can define the client pipeline in Program.cs and inject it into the JsonRpcListener:

class Program
{
	static void Main(string[] args)
	{
		LogManager.Assign(new SimpleLogManager<ConsoleLogger>());

		var factory = new DelegatePipelineFactory();
		factory.AddUpstreamHandler(() => new HeaderDecoder());
		factory.AddUpstreamHandler(() => new BodyDecoder());
		factory.AddUpstreamHandler(new MyApplication());
		factory.AddDownstreamHandler(new ResponseEncoder());

		JsonRpcListener listener = new JsonRpcListener(factory);
		listener.Start(new IPEndPoint(IPAddress.Any, 3322));

		Console.ReadLine();
	}
}

The first two upstream handlers are stateful, so we need to create those for every channel which is created. That's why we use a delegate. The last two are not stateful and can therefore be singletons.
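To try the server out, a minimal client only needs to frame its request with the same envelope. This sketch uses raw sockets and a hand-written JSON string; the method/parameter layout follows the JSON RPC specification, while the helper method and the blocking read loop are just illustrative:

```csharp
using System;
using System.IO;
using System.Net.Sockets;
using System.Text;

class JsonRpcClientSketch
{
    static void Main()
    {
        var json = "{\"jsonrpc\":\"2.0\",\"method\":\"add\",\"params\":[1,2],\"id\":1}";
        var body = Encoding.UTF8.GetBytes(json);

        using (var client = new TcpClient("localhost", 3322))
        {
            var stream = client.GetStream();

            // Envelope: version byte + body length (int), same as the server expects.
            stream.WriteByte(1);
            stream.Write(BitConverter.GetBytes(body.Length), 0, 4);
            stream.Write(body, 0, body.Length);

            // Read the response envelope, then the response body.
            var header = new byte[5];
            ReadExactly(stream, header, 5);
            var responseBody = new byte[BitConverter.ToInt32(header, 1)];
            ReadExactly(stream, responseBody, responseBody.Length);
            Console.WriteLine(Encoding.UTF8.GetString(responseBody));
        }
    }

    static void ReadExactly(NetworkStream stream, byte[] buffer, int count)
    {
        var offset = 0;
        while (offset < count)
        {
            var read = stream.Read(buffer, offset, count - offset);
            if (read == 0) throw new EndOfStreamException();
            offset += read;
        }
    }
}
```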

That's it. You've now got a working JSON RPC server. Sure, it's pretty basic, but the actual remoting layer doesn't have much to do with the networking layer. I did however take some time to create a proof of concept RPC system. Let's define our RPC service first:

public class MathModule
{
	[OperationContract]
	public int Sum(int x, int y)
	{
		return x + y;
	}
}

Then we need to redefine the client pipeline:

var invoker = new RpcServiceInvoker(new DotNetValueConverter(), new SimpleServiceLocator());
invoker.Map<MathModule>();

factory.AddUpstreamHandler(() => new HeaderDecoder());
factory.AddUpstreamHandler(() => new BodyDecoder());
factory.AddUpstreamHandler(new RequestHandler(invoker));
factory.AddDownstreamHandler(new ResponseEncoder());

That's it. From here we could include the HTTP protocol implementation and swap our simple header decoder for the HeaderDecoder in the HTTP implementation, and thereby get an implementation which works over HTTP instead of our basic binary header. Only a few minor changes would be needed to achieve that, keeping most of the JSON RPC implementation intact.

Summary

I hope that I’ve managed to demonstrate how to develop networking applications with Griffin.Networking and show the power that it gives you compared to vanilla .NET socket servers.

The code is available as a NuGet package, griffin.networking, and the HTTP implementation is available as griffin.networking.http. The JSON RPC implementation is still just a concept and is therefore not released yet. Feel free to contribute to complete it.

All code is also available on GitHub.