.NET Zone is brought to you in partnership with:

Robert is a South African technology specialist with focus on Microsoft technologies. He is very passionate about teaching and sharing and is a Microsoft MVP & Ranger. Robert is a DZone MVB and is not an employee of DZone and has posted 75 posts at DZone. You can read more from them at their website. View Full User Profile

.NET 4.5 Baby Steps, Part 3: IDataFlowBlock

05.10.2012
| 3256 views |
  • submit to reddit

Introduction

A new interface in .NET is the IDataFlowBlock, which is implemented in many interesting ways, so to look at those we will start off with the simplest implementation. ActionBlock<TInput> is a completely new class in .NET 4.5 and provides a way of working with data in a very task orientated way. I simplistically think of this as the implementation of the IObserver interface we got in .NET 4. but do not limit your thinking to just that.

To use it, you first must add a reference to System.Threading.Tasks.DataFlow

image

In this simple first example I am doing a fairly simple Pub/Sub demo:

var subscriber = new ActionBlock<string>(input =>
    {
        Console.WriteLine("Got: {0}", input);
    });

for (int i = 0; i < 10; i++)
{
    Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            subscriber.Post(DateTime.Now.ToLongTimeString());
        });
}

Console.ReadLine();
CropperCapture[3]

As IObserver<T>

So the first fantastic feature is that it does have the ability (via extension method) to be an IObsserver<T> so it really solves the need to build up your own subscriber classes when implementing a pub/sub model.

First is the code for the publisher class – this is normal for the IObservable<T> as we had in .NET 4. This just means our new code can play well with our existing code.

public class Publisher : IObservable<string>
{
    List<IObserver<string>> subscribers = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        subscribers.Add(observer);
        return null;
    }

    public void Send()
    {
        foreach (var item in subscribers)
        {
            item.OnNext(DateTime.Now.ToLongTimeString());
        }
    }
}

For our demo code, which produces the same as above:

var publisher = new Publisher();

var subscriber = new ActionBlock<string>(input =>
    {
        Console.WriteLine("Got: {0}", input);
    });

publisher.Subscribe(subscriber.AsObserver());

for (int i = 0; i < 10; i++)
{
    Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            publisher.Send();
        });
}

Complete

The next awesome feature is the Complete method which can be used to stop accepting of input when called – this is great for services where you want to shut down.

In this demo code it will run until you press enter:

var subscriber = new ActionBlock<string>(input =>
{
    Console.WriteLine("Got: {0}", input);
});


Task.Factory.StartNew(() =>
{
    while (true)
    {

        Thread.Sleep(new Random().Next(200, 1000));
        subscriber.Post(DateTime.Now.ToLongTimeString());
    }
});

Console.WriteLine("Press any key to stop input");
Console.ReadLine();
subscriber.Complete();
AttachmentSize
ActionBlock<T> demos8.36 KB
Published at DZone with permission of Robert Maclean, author and DZone MVB. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)