Friday, November 20, 2009

Reactive Extensions – First Look


Rx is a library for composing asynchronous and event-based programs using observable collections. The library revolves around the concept of “reactive programming”. Reactive programming is not a new concept: we have already been using it in our applications whenever we register an event handler or a callback on an asynchronous operation.
All these operations are performed by specifying delegates or methods that are called at unpredictable times, that is, in response to an action or a change in existing data. We can also think of these operations as being performed when data is passed to them as an item in a sequence. For example, a sequence of button click events is passed to the method OnButtonClick(), and every time the method receives an input it performs some operation.
Rx is a superset of the standard LINQ sequence operators that exposes asynchronous and event-based computations as push-based observable collections via the .NET 4.0 interfaces IObservable<T> and IObserver<T>. These interfaces form the heart of Rx.
The IObservable<T> interface represents the class that sends notifications (the provider); the IObserver<T> interface represents the class that receives them (the observer). T represents the class that provides the notification information.
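To make the pull versus push distinction concrete, here is a minimal sketch that does not use Rx at all. The ClickSource class and the PullVersusPush helper are hypothetical names invented for this illustration. In the pull case the consumer walks an IEnumerable<string>; in the push case the source raises an event and the handler is called once per item.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event source, used only for this illustration.
public class ClickSource
{
    public event Action<string> Clicked;

    public void Raise(string name)
    {
        var handler = Clicked;
        if (handler != null) handler(name);
    }
}

public static class PullVersusPush
{
    // Pull: the consumer drives the loop and asks the sequence for each item.
    public static List<string> Pull(IEnumerable<string> clicks)
    {
        var handled = new List<string>();
        foreach (var click in clicks)
        {
            handled.Add("pulled " + click);
        }
        return handled;
    }

    // Push: the source drives and calls the registered handler for each item.
    public static List<string> Push(string[] clicks)
    {
        var handled = new List<string>();
        var source = new ClickSource();
        source.Clicked += click => handled.Add("pushed " + click);
        foreach (var click in clicks)
        {
            source.Raise(click);
        }
        return handled;
    }
}

public static class PullVersusPushProgram
{
    public static void Main()
    {
        var clicks = new[] { "click 1", "click 2" };
        foreach (var line in PullVersusPush.Pull(clicks)) Console.WriteLine(line);
        foreach (var line in PullVersusPush.Push(clicks)) Console.WriteLine(line);
    }
}
```

Both methods handle the same items in the same order; what differs is which side decides when the next item is processed.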
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
The provider object implements IObservable<T>, which has a single method, Subscribe, indicating that an observer wants to receive push-based notifications. Callers of the method pass an instance of the observer.
public interface IObserver<in T>
{       
    void OnCompleted();       
    void OnError(Exception error);       
    void OnNext(T value);
}
The provider sends the following three kinds of notifications to the observer by calling IObserver<T> methods:
The current data. The provider can call the IObserver<T>.OnNext method to pass the observer a T object that has current data, changed data, or fresh data.
An error condition. The provider can call the IObserver<T>.OnError method to notify the observer that some error condition has occurred.
No further data. The provider can call the IObserver<T>.OnCompleted method to notify the observer that it has finished sending notifications.
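The three notification kinds can be sketched as follows. This is only an illustration: RecordingObserver and NotificationDemo.Publish are hypothetical names, and the rule that a negative value counts as an error is an assumption made up for the example. The sketch also assumes the usual convention that nothing is sent after OnError or OnCompleted.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every notification it receives.
public class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted()"); }
}

public static class NotificationDemo
{
    // Hypothetical provider logic: pushes each value, then signals either
    // an error or completion, and sends nothing afterwards.
    public static void Publish(IEnumerable<int> values, IObserver<int> observer)
    {
        foreach (var value in values)
        {
            if (value < 0)
            {
                // Assumed error rule for this sketch: negative values are invalid.
                observer.OnError(new ArgumentException("negative value"));
                return;
            }
            observer.OnNext(value);
        }
        observer.OnCompleted();
    }

    public static void Main()
    {
        var ok = new RecordingObserver();
        Publish(new[] { 1, 2 }, ok);
        // Prints: OnNext(1), OnNext(2), OnCompleted()
        Console.WriteLine(string.Join(", ", ok.Log.ToArray()));

        var failed = new RecordingObserver();
        Publish(new[] { 1, -1, 3 }, failed);
        // Prints: OnNext(1), OnError(negative value)
        Console.WriteLine(string.Join(", ", failed.Log.ToArray()));
    }
}
```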

I have created a simple example to demonstrate the usage of IObservable and IObserver interfaces and the observer pattern implementation.
public class Employee
{
    public string Name { get; set; }
    public string Email { get; set; }
    public decimal Bonus { get; set; }
    public DateTime DateOfJoining { get; set; }
}

public class EmployeeSubject : IObservable<Employee>
{
    public EmployeeSubject()
    {
        ___Employee = new Employee { Name = "Prajeesh", Bonus = 90.90M, DateOfJoining = new DateTime(2005, 08, 01), Email = "prajeesh.prathap@gmail.com" };
    }

    public Employee GetEmployee() { return ___Employee; }
   
    public void UpdateEmployee(Employee employee)
    {
        ___Employee.Name = employee.Name;
        ___Employee.Email = employee.Email;
        ___Employee.Bonus = employee.Bonus;
        ___Employee.DateOfJoining = employee.DateOfJoining;
       
        //Notify the observers by pushing the updated employee details.
        ___Observers.ForEach(observer => observer.OnNext(___Employee));           
    }

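    public IDisposable Subscribe(IObserver<Employee> observer)
    {
        ___Observers.Add(observer);
        observer.OnNext(___Employee);
        // Return a token that removes the observer from the list when disposed.
        return new Unsubscriber(___Observers, observer);
    }

    // Removes its observer from the subscriber list on Dispose.
    class Unsubscriber : IDisposable
    {
        public Unsubscriber(List<IObserver<Employee>> observers, IObserver<Employee> observer)
        {
            ___Observers = observers;
            ___Observer = observer;
        }

        public void Dispose() { ___Observers.Remove(___Observer); }

        List<IObserver<Employee>> ___Observers;
        IObserver<Employee> ___Observer;
    }

    Employee ___Employee;
    List<IObserver<Employee>> ___Observers = new List<IObserver<Employee>>();
}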

public class EmployeeObserver : IObserver<Employee>
{
    public void OnCompleted()
    {
        Console.WriteLine("Subscription completed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Unexpected error occurred {0}", error.Message);
    }

    public void OnNext(Employee value)
    {
        Console.WriteLine("Employee Details");
        Console.WriteLine("Name : " + value.Name);
        Console.WriteLine("Date of joining : " + value.DateOfJoining.ToShortDateString());
        Console.WriteLine("Email : " + value.Email);
        Console.WriteLine("Bonus : " + value.Bonus.ToString());
        Console.WriteLine();
    }
}

The console application uses these classes as follows:
var __EmployeeSubject = new EmployeeSubject();
var __Observer = new EmployeeObserver();
__EmployeeSubject.Subscribe(__Observer);

var __Employee = __EmployeeSubject.GetEmployee();
__Employee.Bonus = 55.55M;
__EmployeeSubject.UpdateEmployee(__Employee);

Output:
Employee Details
Name : Prajeesh
Date of joining : 8/1/2005
Email : prajeesh.prathap@gmail.com
Bonus : 90.90

Employee Details
Name : Prajeesh
Date of joining : 8/1/2005
Email : prajeesh.prathap@gmail.com
Bonus : 55.55
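
The IDisposable returned by Subscribe is what lets an observer stop receiving notifications. The following is a minimal, self-contained sketch of that idea, separate from the employee example. BonusFeed, CollectingObserver, and Unsubscriber are hypothetical names chosen for this illustration, and removing the observer from a list on Dispose is just one possible approach.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical provider that pushes bonus values to its subscribers.
public class BonusFeed : IObservable<decimal>
{
    private readonly List<IObserver<decimal>> observers = new List<IObserver<decimal>>();

    public IDisposable Subscribe(IObserver<decimal> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    public void Publish(decimal value)
    {
        // Iterate over a copy so an observer may unsubscribe while being notified.
        foreach (var observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    // Disposing this token removes the observer from the subscriber list.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<decimal>> observers;
        private readonly IObserver<decimal> observer;

        public Unsubscriber(List<IObserver<decimal>> observers, IObserver<decimal> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose() { observers.Remove(observer); }
    }
}

// Hypothetical observer that simply stores the values it receives.
public class CollectingObserver : IObserver<decimal>
{
    public readonly List<decimal> Received = new List<decimal>();

    public void OnNext(decimal value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class UnsubscribeProgram
{
    public static void Main()
    {
        var feed = new BonusFeed();
        var observer = new CollectingObserver();

        IDisposable subscription = feed.Subscribe(observer);
        feed.Publish(90.90M);   // delivered
        subscription.Dispose(); // stop listening
        feed.Publish(55.55M);   // not delivered

        // Prints: Received 1 value(s)
        Console.WriteLine("Received {0} value(s)", observer.Received.Count);
    }
}
```

After Dispose is called, the second published value never reaches the observer.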

In the next post, I’ll show some samples using the Rx framework and how it can be used in our applications.
