Concurrency Helpers

This is a collection of utilities that help with development in concurrent environments with C#.

Available as a NuGet package.

The Coroutine

Coroutines are computer program components that generalize subroutines to allow multiple entry points for suspending and resuming execution at certain locations. (from Wikipedia).

They are "light" threads, based on the yield return syntax, which is stretched here to control the execution flow.

Invoking a coroutine

To use a coroutine, a CoroutineThread must be present. The thread executes one part of each coroutine in turn, following a round-robin schedule.

    var maxCycleLengthInMilliseconds = 10;

    //Create the coroutine thread
    var coroutineThread = new CoroutineThread(maxCycleLengthInMilliseconds);
    coroutineThread.Start();

    //Adding a coroutine
    coroutineThread.AddCoroutine(new MyCoroutine());

    //Stopping the coroutine thread
    coroutineThread.Stop();
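
To make the round-robin idea concrete, here is a minimal single-threaded sketch of how iterator-based coroutines can be interleaved. It is not the library's CoroutineThread: the names MiniScheduler, RunAll and Worker are hypothetical, and plain strings stand in for Step.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: NOT the library's CoroutineThread.
public static class MiniScheduler
{
    // Runs every coroutine one step at a time, in round-robin order,
    // until all of them have finished. Returns the log of executed steps.
    public static List<string> RunAll(params IEnumerable<string>[] coroutines)
    {
        var log = new List<string>();
        var running = new Queue<IEnumerator<string>>();
        foreach (var coroutine in coroutines)
        {
            running.Enqueue(coroutine.GetEnumerator());
        }

        while (running.Count > 0)
        {
            var current = running.Dequeue();
            // MoveNext executes the coroutine body up to its next "yield return".
            if (current.MoveNext())
            {
                log.Add(current.Current);
                running.Enqueue(current); // not finished: back to the end of the queue
            }
            else
            {
                current.Dispose(); // finished: drop it
            }
        }
        return log;
    }

    // A sample coroutine that yields control after each unit of work.
    public static IEnumerable<string> Worker(string name, int steps)
    {
        for (var i = 1; i <= steps; i++)
        {
            yield return name + i;
        }
    }
}
```

Calling MiniScheduler.RunAll(MiniScheduler.Worker("A", 2), MiniScheduler.Worker("B", 3)) interleaves the two workers and logs A1, B1, A2, B2, B3.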

A simple example

A base class, "Coroutine", already exists to make implementing a coroutine easy. For example, here is a simple coroutine that executes two operations and then terminates.

public class SimpleCoroutine : Coroutine
{
    public override IEnumerable<Step> Run()
    {
        SomeOperation();
        //Leave control to the other coroutines
        yield return Step.Current;

        AnotherOperation();
        yield return Step.Current;
    }

    public override void OnError(Exception ex)
    {
        //Execute the cleanup
        CleanupOperations();
        Console.WriteLine(ex);
        ShouldTerminate = true;
    }
}

Note the usage of "Step.Current": it is a singleton object used as a marker. "Step" is the object that the various coroutines always return to the main thread. A simple "null" could have been used, but I used "Step" to make the code more readable.

There is even a Coroutine.NullCoroutine instance for coroutines that do nothing :)

Calling default .NET async functions

The various functions supporting async and await return Tasks. These tasks mostly call functions that are already controlled through I/O completion ports, meaning that they simply wait for the data returned by the drivers and do almost nothing by themselves.

For example, writing data to a stream essentially copies the data to the network card buffer and waits for the response. In this situation the calling coroutine is paused until the WriteAsync function completes.

    yield return InvokeTaskAndWait(tcpStream.WriteAsync(bytes, 0, bytes.Length));

The signature is

    Step InvokeTaskAndWait(Task task);

For example, suppose we want to write to a file stream asynchronously inside a coroutine.

public override IEnumerable<Step> Run()
{
    //Prepare the file stream
    FileStream sourceStream = File.Open("myFile.bin", FileMode.OpenOrCreate);
    sourceStream.Seek(0, SeekOrigin.End);

    //Invoke the task
    yield return InvokeTaskAndWait(sourceStream.WriteAsync(result, 0, result.Length));

    //Close the stream
    sourceStream.Close();
}

In this example we call the InvokeTaskAndWait function. It sets the ICoroutine.IsReady flag to false, telling the main thread that the coroutine is in a kind of wait state, and control is left to the main thread. Until IsReady becomes true, the coroutine is paused and uses no processor cycles. Then, inside the ContinueWith of the task (added inside InvokeTaskAndWait), the IsReady flag is set to true. The main thread now calls the coroutine again, which resumes running and closes the sourceStream.
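
The flag handling described above can be sketched as follows. This is only an illustration of the idea, assuming nothing about the library's internals beyond what the paragraph states: ReadyFlagSketch and PauseUntilCompleted are hypothetical names, not the library's Coroutine or InvokeTaskAndWait.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the IsReady mechanism; not the library's Coroutine class.
public class ReadyFlagSketch
{
    private int _isReady = 1; // 1 = ready, 0 = paused

    // A scheduler would poll this flag and skip the coroutine while it is false.
    public bool IsReady
    {
        get { return Volatile.Read(ref _isReady) == 1; }
    }

    // Marks the coroutine as paused and makes it ready again when the task ends.
    public void PauseUntilCompleted(Task task)
    {
        Volatile.Write(ref _isReady, 0);
        // The continuation runs whether the task succeeded, failed or was cancelled.
        task.ContinueWith(
            _ => Volatile.Write(ref _isReady, 1),
            TaskContinuationOptions.ExecuteSynchronously);
    }
}
```

No thread sleeps while the task is pending: the flag is the only thing the scheduler has to look at, which is why the wait costs almost nothing.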

No "real" wait has been used. In fact this wait is really a "pause".

With return values

The following code calls the ReadText function, whose definition is shown below. When it finishes, it returns a string with the content of the file. The function calls ReadAsync on a file stream, so it returns null until it has read the whole file content.

    IEnumerable<string> ReadText(string path);

When ReadText completes, the result is stored in the Container.RawData field. No wait is made, only a poll to check whether the asynchronous operation has finished.

    var container = new Container();
    yield return InvokeLocalAndWait(() => _globalPathProvider.ReadText(path), container);
    var data = container.RawData as string;

The signature is

    Step InvokeLocalAndWait<T>(Func<IEnumerable<T>> func, Container result = null)
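
The polling convention ("null means not finished yet") can be illustrated with a small sketch. ResultBox, PollOnce and FakeReadText are hypothetical names introduced for this example; ResultBox only stands in for the library's Container, and the real InvokeLocalAndWait may work differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the library's Container.
public class ResultBox
{
    public object RawData;
}

public static class PollingSketch
{
    // Advances the enumerator by one element and reports whether polling is over.
    // A null element means "not finished yet, poll again on the next cycle".
    public static bool PollOnce<T>(IEnumerator<T> pending, ResultBox box) where T : class
    {
        if (!pending.MoveNext())
        {
            return true; // the sequence ended without producing further values
        }
        if (pending.Current == null)
        {
            return false; // still working
        }
        box.RawData = pending.Current;
        return true;
    }

    // A fake "ReadText" that needs three polls before the text is available.
    public static IEnumerable<string> FakeReadText()
    {
        yield return null;
        yield return null;
        yield return "file content";
    }
}
```

A scheduler would call PollOnce once per cycle and resume the calling coroutine only after it returns true, at which point box.RawData holds the string.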

As tasks

It is possible to call any Action as a task. For example, when an operation is very complex, needs to be kept together in a single "context" and would block the thread for too long, we can invoke the action as a Task and pause the execution of the caller until the task completes.

    yield return InvokeAsTaskAndWait(() => context.Initialize());

The signature is

    Step InvokeAsTaskAndWait(Action action)

Calling another coroutine

To call a different coroutine:

    var childCoroutine = new ChildCoroutine();
    yield return InvokeCoroutineAndWait(childCoroutine);

The signature is the following. The added parameter is reserved for future use, such as running the coroutine directly as a child instead of as a new coroutine running totally independently.

    Step InvokeCoroutineAndWait(ICoroutine action, bool added = true)

Please don't call them corutine or corootine; co-routine would be ok :P

Containers

A series of lock-free, real-time containers (real-time means that operations complete within a certain amount of time).

LockFreeItem

This class is used to store an object shared between threads. Its operations are based on the CAS (Compare And Swap) operation; you can find the definition on Wikipedia. All gets and sets can be made without any need for locks.

There are of course restrictions on the usage of the object:

Now a usage sample:

public void Do()
{
    var lockFreeItem = new LockFreeItem<object>();
    object content = lockFreeItem.Data;
    var newContent = new object();
    lockFreeItem.Data = newContent;
}
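
For readers unfamiliar with CAS, here is a minimal sketch of a holder built on Interlocked.CompareExchange. CasBox, TrySet and Update are hypothetical names; this illustrates the technique only and is not the library's LockFreeItem implementation.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of a CAS-based holder; not the library's LockFreeItem<T>.
public class CasBox<T> where T : class
{
    private T _value;

    public CasBox(T initial)
    {
        _value = initial;
    }

    public T Value
    {
        get { return Volatile.Read(ref _value); }
    }

    // Replaces the value only if nobody changed it since "expected" was read.
    public bool TrySet(T expected, T replacement)
    {
        var previous = Interlocked.CompareExchange(ref _value, replacement, expected);
        return ReferenceEquals(previous, expected);
    }

    // Retries until the update function has been applied to an unchanged value.
    public T Update(Func<T, T> update)
    {
        while (true)
        {
            var current = Value;
            var next = update(current);
            if (TrySet(current, next))
            {
                return next;
            }
        }
    }
}
```

The retry loop in Update is the usual price of avoiding locks: a thread that loses the race simply reads the fresh value and tries again.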

LockFreeQueue

This queue is based on a paper by Maged M. Michael and Michael L. Scott, plus some material found on the Internet. If you feel you should be cited here, please tell me and I'll add you!

The implementation is based on a linked list and on atomic operations, like LockFreeItem. Enqueue and dequeue operations can be made without any lock, being based on the CAS operation.

Note that the "LockFreeQueue.Count" getter is intended as a "guess". The only safe way to know if the queue is empty is to issue a Dequeue and check that it returns null.

Now some sample usage:

public void Do()
{
    var lockFreeQueue = new LockFreeQueue<object>();
    for(var i =0;i<10;i++){
        lockFreeQueue.Enqueue(new object());
    }
    lockFreeQueue.Enqueue(new List<object>{new object(),new object()});

    var obj = lockFreeQueue.DequeueSingle();
    Console.WriteLine("Items in queue: "+lockFreeQueue.Count);
    int maxItemToDequeue = 100;
    //Assumed call: the original snippet omitted the method name
    foreach(var item in lockFreeQueue.Dequeue(maxItemToDequeue)){
        Console.WriteLine("Items : "+item);
    }
}
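
As a rough illustration of the linked-list-plus-CAS approach, here is a compact sketch of the Michael-Scott algorithm. MsQueueSketch and TryDequeue are hypothetical names, and the sketch has no Count and no bulk operations; the library's LockFreeQueue may differ in every detail.

```csharp
using System.Threading;

// Sketch of the Michael-Scott algorithm; not the library's LockFreeQueue<T>.
public class MsQueueSketch<T>
{
    private class Node
    {
        public T Value;
        public Node Next;
    }

    private Node _head; // always points at a dummy node
    private Node _tail; // last node, or the one before it while an enqueue is in progress

    public MsQueueSketch()
    {
        _head = _tail = new Node();
    }

    public void Enqueue(T value)
    {
        var node = new Node { Value = value };
        while (true)
        {
            var tail = Volatile.Read(ref _tail);
            var next = Volatile.Read(ref tail.Next);
            if (next != null)
            {
                // The tail is lagging behind: help move it forward, then retry.
                Interlocked.CompareExchange(ref _tail, next, tail);
                continue;
            }
            // Link the new node after the current last node.
            if (Interlocked.CompareExchange(ref tail.Next, node, null) == null)
            {
                // Try to swing the tail; a failure means another thread already did it.
                Interlocked.CompareExchange(ref _tail, node, tail);
                return;
            }
        }
    }

    public bool TryDequeue(out T value)
    {
        while (true)
        {
            var head = Volatile.Read(ref _head);
            var tail = Volatile.Read(ref _tail);
            var next = Volatile.Read(ref head.Next);
            if (next == null)
            {
                value = default(T);
                return false; // only the dummy node is left: the queue is empty
            }
            if (head == tail)
            {
                // The tail is lagging behind: help it before removing a node.
                Interlocked.CompareExchange(ref _tail, next, tail);
                continue;
            }
            // Advance the head; the removed node's successor becomes the new dummy.
            if (Interlocked.CompareExchange(ref _head, next, head) == head)
            {
                value = next.Value;
                return true;
            }
        }
    }
}
```

Because the .NET garbage collector keeps a node alive while any thread still holds a reference to it, this sketch does not need the counted pointers used in the original paper.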

AsyncLockFreeDictionary<TKey,TValue>

This object is a standard IDictionary. It works exactly like a standard dictionary. The behaviours are the following:

AsyncLockFreeList

This is just a small proof of concept. Use at your own risk! It's hard to deal with a lock-free list :P

This object is based on a timer, a LockFreeQueue and a LockFreeItem. It guarantees safe enumerators and safe copies of the underlying collection.

The underlying List is updated (unless differently specified) every 10 milliseconds.
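
One plausible reading of this design is "queue the changes, publish immutable snapshots". The sketch below illustrates that reading only: SnapshotListSketch, Snapshot and Flush are hypothetical names, ConcurrentQueue stands in for LockFreeQueue, and Flush is called by hand where the real class would rely on its timer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch of the "queue changes, publish snapshots" idea.
public class SnapshotListSketch<T>
{
    private readonly ConcurrentQueue<T> _pendingAdds = new ConcurrentQueue<T>();
    private List<T> _snapshot = new List<T>();

    // Writers never touch the published list: they only enqueue the change.
    public void Add(T item)
    {
        _pendingAdds.Enqueue(item);
    }

    // Readers get the current snapshot, which is never modified once published.
    public IReadOnlyList<T> Snapshot
    {
        get { return Volatile.Read(ref _snapshot); }
    }

    // Applies the pending changes to a copy and publishes it atomically.
    // Assumption: a single caller (for example a timer callback) runs this at a time.
    public void Flush()
    {
        if (_pendingAdds.IsEmpty)
        {
            return;
        }
        var copy = new List<T>(Volatile.Read(ref _snapshot));
        T item;
        while (_pendingAdds.TryDequeue(out item))
        {
            copy.Add(item);
        }
        Volatile.Write(ref _snapshot, copy);
    }
}
```

With this approach an enumerator obtained before a Flush keeps iterating over the old snapshot, at the cost of changes becoming visible only after the next update cycle.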

Utils

CounterInt64

An Interlocked-based Int64 value. Its purpose is to allow passing Interlocked-controlled values between functions.

Must be initialized explicitly with "new CounterInt64()" and NOT directly with a value. It masks the complexities of the Interlocked infrastructure.
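
To show why wrapping the value in a class helps, here is a minimal sketch of an Interlocked-based counter. AtomicCounter and its members are hypothetical names; the members of the real CounterInt64 are not documented here and may differ.

```csharp
using System.Threading;

// Hypothetical sketch; not the library's CounterInt64.
public sealed class AtomicCounter
{
    private long _value;

    public long Value
    {
        get { return Interlocked.Read(ref _value); }
        set { Interlocked.Exchange(ref _value, value); }
    }

    // Atomically adds one and returns the new value.
    public long Increment()
    {
        return Interlocked.Increment(ref _value);
    }

    // Atomically adds delta and returns the new value.
    public long Add(long delta)
    {
        return Interlocked.Add(ref _value, delta);
    }
}
```

Being a reference type, the counter can be stored in fields, captured by lambdas and handed to other functions, and every holder updates the same storage. A plain long would be copied on each call unless passed with ref.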

ITimer

A wrapper around System.Timers.Timer or System.Threading.Timer.

The idea was to hide the implementation details of the two different kinds of timers. Everything is based on the ITimer interface:

public interface ITimer: IDisposable
{
    void Start(int period = 0, int delay = 0);
    void Stop();
    event ElapsedTimerEventHandler Elapsed;
    int TimesRun { get; }
    bool Running { get; }
}

The event handler is based on ElapsedTimerEventArgs, a custom EventArgs that contains the DateTime SignalTime field, like the event args of the default timers.

Note that the Start method can be invoked at any moment to change the running period of the timer. But note that when Start is called again, the value of the delay parameter will be honoured!
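
As an illustration of how such a wrapper could sit on top of System.Threading.Timer, here is a minimal sketch. ThreadingTimerSketch and TickEventArgs are hypothetical stand-ins for the library's ITimer implementation and ElapsedTimerEventArgs, and unlike ITimer.Start the period has no default value here.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the library's ElapsedTimerEventArgs.
public class TickEventArgs : EventArgs
{
    public DateTime SignalTime;
}

// Hypothetical sketch of a wrapper around System.Threading.Timer.
public sealed class ThreadingTimerSketch : IDisposable
{
    private readonly Timer _timer;
    private int _timesRun;
    private int _running;

    public event EventHandler<TickEventArgs> Elapsed;

    public ThreadingTimerSketch()
    {
        // Created stopped: both the due time and the period are infinite.
        _timer = new Timer(OnTick, null, Timeout.Infinite, Timeout.Infinite);
    }

    public int TimesRun { get { return Volatile.Read(ref _timesRun); } }

    public bool Running { get { return Volatile.Read(ref _running) == 1; } }

    // Can be called again while running to change the period (in milliseconds);
    // the delay is applied again on every call.
    public void Start(int period, int delay = 0)
    {
        Volatile.Write(ref _running, 1);
        _timer.Change(delay, period);
    }

    public void Stop()
    {
        Volatile.Write(ref _running, 0);
        _timer.Change(Timeout.Infinite, Timeout.Infinite);
    }

    private void OnTick(object state)
    {
        if (!Running)
        {
            return; // ignore callbacks that were already queued when Stop was called
        }
        Interlocked.Increment(ref _timesRun);
        var handler = Elapsed;
        if (handler != null)
        {
            handler(this, new TickEventArgs { SignalTime = DateTime.Now });
        }
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}
```

Calling Start(50) and later Start(10, 100) on the same instance would switch to a 10 ms period after waiting 100 ms, which mirrors the note about the delay being honoured on every call.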

Licensing

Copyright (C) 2013-2014 Kendar.org

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


Last modified on: March 12, 2014