Friday, September 26, 2014

Task parallel library - Quick notes

Task Vs ThreadPool-
Tasks internally uses Threadpool only, but it has few feature which makes it better than traditional threading. Here are few advantage of using Task over ThreadPool -
1) Along with traditional thread queue, it has internal local queue as well. Task creating another task get queued up in the local queue, rather than going into common queue. It helps in optimizing the execution.
Here is a good article which discuss about optimization -
http://www.danielmoth.com/Blog/New-And-Improved-CLR-4-Thread-Pool-Engine.aspx
2) Cancelling a task is much easier than stopping any executing code in a thread.
3) Exception handling is easier in case of Task
4) Getting result from task is much easier than thread.

Task creation -
There are multiple ways to create and start the Task -
1) var taskA = new Task(() => Console.WriteLine("Hello from taskA."));//create
   taskA.Start();//start
2) var taskA = Task.Factory.StartNew(() => Console.WriteLine("Hello from taskA."));//create + start
3) Task task1 = Task.Factory.StartNew((object obj) => Console.WriteLine(obj),123); //create + start with value passed in it.
4) Task> t = Task.Factory.StartNew>(() => { return new List() { 1,2,3 }; }); // return some value from task

Here is a good article on comparison between StartNew and Task.start - http://blogs.msdn.com/b/pfxteam/archive/2010/06/13/10024153.aspx

Exception Handling -
Task provide AggregateException, which is consolidate list of all exceptions occurred in tasks. It has property InnerException, by which we can access individual exceptions.
We can capture the exception, if we surround the task's wait/result method with try catch block.
Handle method available on aggregateException, which takes predicate. predicate takes exception as input and returns true/false if exception is handled successfully or not.
There is a property named as IsFaulted. We can use it to identify if there was any unhandled exception. We can use it if we are not using wait/result property.

Cancellation -
The requesting object creates a CancellationTokenSource object, and then passes its Token property to the cancellable operation. The operation that receives the request monitors the value of the IsCancellationRequested property of the token by polling/wait/waithandle. When the value becomes true, the listener can terminate in whatever manner is appropriate. It is advised to throw OperationCancelledException from the cancelled task, so that the state of the task can be moved to cancelled, and also it the the way to tell that task has responded to the cancellation request.
CancellationTokenSource tokenSource = new CancellationTokenSource();          
            CancellationToken token = tokenSource.Token;          
            Task taskWithFactoryAndState =
            Task.Factory.StartNew(() =>
            {              
                token.ThrowIfCancellationRequested();                                                  
            }, token);

            // cancel the second token source
            tokenSource.Cancel();
            try
            {
                taskWithFactoryAndState.Wait();
            }
            catch (AggregateException aggEx)
            {
            }

SynchronizationContext -
Using TaskScheduler.FromCurrentSynchronizationContext() we can marshal any task on UI thread. We can pass it as one of the parameter to task and .NET will execute the task on given context.
Here is a good article on the same- http://blogs.msdn.com/b/pfxteam/archive/2009/09/22/9898090.aspx

Continuation -
We can use continueWith API to chain any task. There are different flavor available of ContinueWith like WhenAny,WhenAll, OnlyonFaulted, OnlyOnRanToCompletion etc, which we can use to chain as per our need. It is one of the powerful feature of Task library as we can implement pipe execution model with it.
Continue task wont execute in case of exception or cancellation.

Parallel.For and Parallel.Foreach -
Parallel.For(0, 10, (i) => {
                Console.WriteLine(i);
            });

            List list = new List() {"one", "two","three","four","five"};
            Parallel.ForEach(list, (str) => {
                Console.WriteLine(str);
            });


There is no guarantee of the sequence.
We can terminate the loop in between using stop or break API in the following manner -
List list = new List() {"one", "two","three","four","five"};
            Parallel.ForEach(list, (str,state) => {
                Console.WriteLine(str);
                state.Break(); // or state.Stop();
            });

There is slight difference between stop and break -
1) breakindex - In case of break, we can identify at which point does the loop break -
List list = new List() {"one", "two","three","four","five"};
            var result = Parallel.ForEach(list, (str,state) => {
                Console.WriteLine(str);
                state.Break(); // or state.Stop();
            });
            long? breakIndex = result.LowestBreakIteration;

2) Stop notifies that all iteration will immediately stop but in case of break all higher iterations will be stopped, the lower iteration will still execute.-
Parallel.For(0, 100, (i, loopState) =>
            {
                if (i == 85)
                {
                    loopState.Break();
                }
                Thread.Sleep(20);
                list.Add(i);
            }
            );
In this code, if we are using Break, then all values including 85 will be added into list, but we use Stop, there might be some values >85 will be added, and some of the values less than 85 that will not be added.

TaskCompletionSource
This is another mechanism of creating Task. In this case, we create a task which is not assiciated with any delegate. There are other ways like APM or events available in .NET, which can give us asynchronous way of handling function. Using TCS, we need not to care what underlying model we have used, we can just wrapped it around TAsk and use it power.
It has SetResult and SetException kind of methods availble which set the result/exception to the given task. Every TCS can have only 1 task associated with it.
TaskCompletionSource tcs = new TaskCompletionSource();
            ThreadPool.QueueUserWorkItem(_ => {
                int data = MakeServiceCall();
                tcs.SetResult(data);
            });
            tcs.Task.Wait();
            Console.WriteLine(tcs.Task.Result);

Good article on the same - http://blogs.msdn.com/b/pfxteam/archive/2009/06/02/9685804.aspx