The .NET Framework 4.0 introduces a set of new features built around the Observer design pattern, which can be seen as a subset of the Publish/Subscribe pattern. Patterns of this kind aim to provide a mechanism for push-based notifications.
The idea revolves around two new generic interfaces, IObserver<T> and IObservable<T>. IObservable<T> provides the functionality for the publisher, while IObserver<T> does the same for the subscriber. You will find that the publisher and the subscriber are also known as the provider and the observer. Whichever names are used, keep in mind what each party is supposed to do; otherwise it is easy to get confused.
- Publisher –> Provider –> Observable
- Subscriber –> Observer
Notice that both interfaces are generic in T. IObservable<out T> is covariant in T, whereas IObserver<in T> is contravariant in T. Covariance means that a provider of a more derived type can be used where a provider of a base type is expected: if T' derives from T, an IObservable<T'> can be assigned to an IObservable<T>, because every T' it pushes is also a T. Contravariance works in the opposite direction: an observer of a less derived type can be used where an observer of a more derived type is expected, so an IObserver<T> can be assigned to an IObserver<T'>, because an observer that handles any T can also handle a T'. The contracts of both interfaces are declared as follows:
public interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
}

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
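The variance rules can be checked with a small sketch. Vehicle, Truck, VehicleLogger, TruckSource and VarianceDemo are hypothetical names invented for this illustration; only IObserver<T> and IObservable<T> come from the framework.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only; they are not part of the race example.
class Vehicle { public string Name = "vehicle"; }
class Truck : Vehicle { public Truck() { Name = "truck"; } }

// Observer written against the base type.
class VehicleLogger : IObserver<Vehicle>
{
    public readonly List<string> Seen = new List<string>();
    public void OnNext(Vehicle value) { Seen.Add(value.Name); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Provider that pushes the derived type.
class TruckSource : IObservable<Truck>
{
    public IDisposable Subscribe(IObserver<Truck> observer)
    {
        observer.OnNext(new Truck());
        observer.OnCompleted();
        return new NoOp();
    }

    private sealed class NoOp : IDisposable { public void Dispose() { } }
}

static class VarianceDemo
{
    public static List<string> Run()
    {
        VehicleLogger logger = new VehicleLogger();

        // Contravariance (in T): an IObserver<Vehicle> is accepted as an IObserver<Truck>.
        IObserver<Truck> truckObserver = logger;
        truckObserver.OnNext(new Truck());

        // Covariance (out T): an IObservable<Truck> is accepted as an IObservable<Vehicle>.
        IObservable<Vehicle> vehicles = new TruckSource();
        vehicles.Subscribe(logger);

        return logger.Seen;
    }
}
```

Calling VarianceDemo.Run() leaves two entries in the logger, one pushed through each conversion. Neither assignment needs a cast; removing the in or out modifier from the interface would make both assignments fail to compile.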
I wish to be observed
That said, the main purpose of an observable object is to be observed by a number of observers. IObservable<T> has just one method, Subscribe, which registers an observer. Each time the observable object pushes a new notification, it is received by the observer's OnNext method. Let's see an example.
Suppose we have a simple class called Car, which describes a racing car.
class Car
{
    //common properties
    public string Name { get; set; }
    public int Id { get; set; }
    public int Position { get; set; }
    public long BestTime { get; private set; }
    public long LastTime { get; private set; }
    public TimeSpan TotalTime { get; set; }

    public Car(string name, int id)
    {
        Name = name;
        Id = id;
        BestTime = long.MaxValue;
        TotalTime = new TimeSpan();
    }

    //use this method for updating stats
    public void LastLap(long lastlaptime)
    {
        //saving the last lap time
        LastTime = lastlaptime;
        //is the lastlap the best time?
        BestTime = (lastlaptime < BestTime) ? lastlaptime : BestTime;
        //total time
        TotalTime = TotalTime.Add(new TimeSpan(lastlaptime));
    }

    public override string ToString()
    {
        return string.Format("Car {0} -\t TotalTime: {3}\n- LastTime: {1}\n- BestTime: {2}",
            this.Id.ToString("00"), new TimeSpan(this.LastTime),
            new TimeSpan(this.BestTime), this.TotalTime);
    }
}
We also need a simple class that describes a Race.
class Race
{
    public int TotalLaps { get; set; }
    public int CurrentLap { get; set; }
    public List<Car> Cars { get; set; }

    public Race(int totalLaps)
    {
        Cars = new List<Car>();
        TotalLaps = totalLaps;
    }
}
Our goal is to monitor a car race. Let's say the race is a NASCAR race, which holds a Race object, and that Race object in turn holds a generic list of the participating Cars. Since the NASCAR race is what we want to monitor, we create a class NascarRace that is observable by motor racing journalists, say NBC and BBC. Those journalists thus become NascarRace's observers.
The NASCAR race could be described as follows:
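class NascarRace : IObservable<Race>
{
    List<IObserver<Race>> display = new List<IObserver<Race>>();
    //40 laps, matching the complete listing at the end
    Race race = new Race(40);

    public void Start()
    {
        //start up cars
        Car car1 = new Car("Car1", 1);
        Car car2 = new Car("Car2", 46);
        Car car3 = new Car("Car3", 99);
        race.Cars.Add(car1);
        race.Cars.Add(car2);
        race.Cars.Add(car3);
        ThreadStart threadDelegate = new ThreadStart(Running);
        Thread newThread = new Thread(threadDelegate);
        newThread.Start();
    }

    private void Running()
    {
        //one generator for the whole race
        Random rnd = new Random();
        //lap loop
        for (int lap = 0; lap < race.TotalLaps; lap++)
        {
            race.Cars.ToList().ForEach(r =>
            {
                //setting the last lap time randomly
                r.LastLap(rnd.Next(1000000000, int.MaxValue));
            });
            //increasing num of laps done
            race.CurrentLap++;
            //observers notification
            display.ToList().ForEach(d => d.OnNext(race));
            //waiting for a second
            Thread.Sleep(1000);
        }
        //completed. observers notification
        display.ToList().ForEach(d => d.OnCompleted());
    }

    public IDisposable Subscribe(IObserver<Race> observer)
    {
        //new observer
        if (!display.Contains(observer))
            display.Add(observer);
        //"observer as IDisposable" would be null here, since RaceMonitor
        //is not disposable; return a token that unsubscribes instead
        return new Unsubscriber(display, observer);
    }

    //token returned by Subscribe(); disposing it removes the observer
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<Race>> observers;
        private readonly IObserver<Race> observer;

        public Unsubscriber(List<IObserver<Race>> observers, IObserver<Race> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}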
Note that the Start() method sets up the three cars that take part in the competition and then creates and launches a new thread. The ThreadStart delegate points to a method called Running(), which holds the race timeline. The total number of laps is set to 40, so over 40 laps the three cars accumulate elapsed time, and after each lap all the observers are notified. RaceMonitor displays the race results ordered by ascending total time. Before the next lap, the thread sleeps for one second to add some suspense to the simulated NASCAR race.
Now, I wish to observe
A journalist, or anyone else who wants to be an observer, is represented by the RaceMonitor class, declared as follows:
class RaceMonitor : IObserver<Race>
{
    public void OnCompleted()
    {
        Console.WriteLine("Finished");
    }

    public void OnError(Exception error)
    {
        Console.Write(error.Message);
    }

    public void OnNext(Race value)
    {
        Console.Clear();
        Console.WriteLine(string.Format("Race Lap {0}", value.CurrentLap));
        value.Cars.OrderBy(c => c.TotalTime)
            .ToList()
            .ForEach(car => Console.WriteLine(car.ToString()));
    }
}
The observers are instances of the RaceMonitor class and are subscribed to NascarRace through the Subscribe() method in the program's Main method.
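RaceMonitor implements OnError, but the race simulation in this article never calls it. As a rough sketch of how a provider might report a failure, the following uses hypothetical types (LapFeed, LapSubscription, RecordingObserver, ErrorDemo) and an assumed failAtLap parameter that simulates a fault; none of them belong to the article's code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical provider: pushes lap numbers and reports a fault through OnError.
class LapFeed : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return new LapSubscription(observers, observer);
    }

    // failAtLap is an assumed parameter that simulates a fault on that lap.
    public void Run(int totalLaps, int failAtLap)
    {
        for (int lap = 1; lap <= totalLaps; lap++)
        {
            try
            {
                // Stand-in for the real work that produces the next value.
                if (lap == failAtLap)
                    throw new InvalidOperationException("Timing system failed on lap " + lap);
            }
            catch (Exception ex)
            {
                // A failure ends the sequence: observers get OnError, not OnCompleted.
                foreach (IObserver<int> o in observers.ToArray())
                    o.OnError(ex);
                return;
            }
            foreach (IObserver<int> o in observers.ToArray())
                o.OnNext(lap);
        }
        foreach (IObserver<int> o in observers.ToArray())
            o.OnCompleted();
    }

    private sealed class LapSubscription : IDisposable
    {
        private readonly List<IObserver<int>> list;
        private readonly IObserver<int> item;

        public LapSubscription(List<IObserver<int>> list, IObserver<int> item)
        {
            this.list = list;
            this.item = item;
        }

        public void Dispose() { list.Remove(item); }
    }
}

// Hypothetical observer that records what it receives.
class RecordingObserver : IObserver<int>
{
    public readonly List<string> Events = new List<string>();
    public void OnNext(int value) { Events.Add("next " + value); }
    public void OnError(Exception error) { Events.Add("error " + error.Message); }
    public void OnCompleted() { Events.Add("completed"); }
}

static class ErrorDemo
{
    public static List<string> Run()
    {
        LapFeed feed = new LapFeed();
        RecordingObserver observer = new RecordingObserver();
        feed.Subscribe(observer);
        feed.Run(3, 2); // three laps, simulated fault on lap 2
        return observer.Events;
    }
}
```

In this sketch the observer records lap 1 and then the error, and never sees a completion. Treating OnError as terminal is a design choice that matches the usual reading of the interface: after OnError or OnCompleted, no further notifications are sent.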
The main program would be:
class Program
{
    static void Main(string[] args)
    {
        NascarRace nascarRace = new NascarRace();
        //observers
        RaceMonitor nbcsports = new RaceMonitor();
        RaceMonitor bbcsports = new RaceMonitor();
        nascarRace.Subscribe(nbcsports);
        nascarRace.Subscribe(bbcsports);
        //let race start
        nascarRace.Start();
        Console.Read();
    }
}
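The main program ignores the IDisposable that Subscribe returns. The usual intent of that return value is to let the caller cancel the subscription. The following is a minimal sketch of the caller's side, using hypothetical types (TickSource, TickSubscription, CountingObserver, UnsubscribeDemo) rather than the race classes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical provider that hands out a disposable subscription token.
class TickSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return new TickSubscription(observers, observer);
    }

    public void Publish(int value)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (IObserver<int> o in observers.ToArray())
            o.OnNext(value);
    }

    private sealed class TickSubscription : IDisposable
    {
        private readonly List<IObserver<int>> list;
        private readonly IObserver<int> item;

        public TickSubscription(List<IObserver<int>> list, IObserver<int> item)
        {
            this.list = list;
            this.item = item;
        }

        public void Dispose() { list.Remove(item); }
    }
}

// Hypothetical observer that remembers every value it was given.
class CountingObserver : IObserver<int>
{
    public readonly List<int> Received = new List<int>();
    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class UnsubscribeDemo
{
    public static List<int> Run()
    {
        TickSource source = new TickSource();
        CountingObserver observer = new CountingObserver();

        // Keep the token so the subscription can be cancelled later.
        IDisposable subscription = source.Subscribe(observer);
        source.Publish(1);

        // After Dispose the observer is no longer in the provider's list.
        subscription.Dispose();
        source.Publish(2);

        return observer.Received;
    }
}
```

Here the observer receives only the value published before Dispose was called. This sketch is single-threaded; a provider that notifies from a background thread, as the race does, would also need to synchronize access to its observer list.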
And the final standings!!
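Call flow
The following sequence diagram of the NascarRace.Running method shows how push notifications reach the IObserver<Race> objects. Everything takes place inside the lap loop.
Note that the calls in Running are not deferred. ToList() is a LINQ extension method that copies the collection immediately, and ForEach is a method of List<T> that invokes the delegate synchronously for each car and each observer (the RaceMonitor objects). The only deferred operation in the example is OrderBy in RaceMonitor.OnNext, which does its sorting when ToList() enumerates it.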
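Which LINQ calls are deferred and which run at once can be checked with a small sketch. DeferredDemo and the lap time values are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class DeferredDemo
{
    public static int[] Run()
    {
        List<int> lapTimes = new List<int> { 30, 10, 20 };

        // OrderBy only builds a query; nothing is sorted yet.
        IEnumerable<int> deferred = lapTimes.OrderBy(t => t);

        // ToList enumerates immediately and returns an independent copy.
        List<int> snapshot = lapTimes.ToList();

        // Change the source after both calls.
        lapTimes.Add(5);

        // The deferred query sees the new element; the snapshot does not.
        return new[] { deferred.First(), deferred.Count(), snapshot.Count };
    }
}
```

Run() returns 5, 4 and 3: the ordered query is evaluated only when First() and Count() enumerate it, so it includes the value added afterwards, while the snapshot still holds the three original items. This is also why iterating over display.ToList() is safe against a Subscribe call that arrives while the observers are being notified on the same thread.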
Here is the complete code listing:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace Observables
{
    class Program
    {
        static void Main(string[] args)
        {
            NascarRace nascarRace = new NascarRace();
            //observers
            RaceMonitor nbcsports = new RaceMonitor();
            RaceMonitor bbcsports = new RaceMonitor();
            nascarRace.Subscribe(nbcsports);
            nascarRace.Subscribe(bbcsports);
            //let race start
            nascarRace.Start();
            Console.Read();
        }
    }

    class Car
    {
        //common properties
        public string Name { get; set; }
        public int Id { get; set; }
        public int Position { get; set; }
        public long BestTime { get; private set; }
        public long LastTime { get; private set; }
        public TimeSpan TotalTime { get; set; }

        public Car(string name, int id)
        {
            Name = name;
            Id = id;
            BestTime = long.MaxValue;
            TotalTime = new TimeSpan();
        }

        //use this method for updating stats
        public void LastLap(long lastlaptime)
        {
            //saving the last lap time
            LastTime = lastlaptime;
            //is the lastlap the best time?
            BestTime = (lastlaptime < BestTime) ? lastlaptime : BestTime;
            //total time
            TotalTime = TotalTime.Add(new TimeSpan(lastlaptime));
        }

        public override string ToString()
        {
            return string.Format("Car {0} -\t TotalTime: {3}\n- LastTime: {1}\n- BestTime: {2}",
                this.Id.ToString("00"), new TimeSpan(this.LastTime),
                new TimeSpan(this.BestTime), this.TotalTime);
        }
    }

    class Race
    {
        public int TotalLaps { get; set; }
        public int CurrentLap { get; set; }
        public List<Car> Cars { get; set; }

        public Race(int totalLaps)
        {
            Cars = new List<Car>();
            TotalLaps = totalLaps;
        }
    }

    class RaceMonitor : IObserver<Race>
    {
        public void OnCompleted()
        {
            Console.WriteLine("Finished");
        }

        public void OnError(Exception error)
        {
            Console.Write(error.Message);
        }

        public void OnNext(Race value)
        {
            Console.Clear();
            Console.WriteLine(string.Format("Race Lap {0}", value.CurrentLap));
            value.Cars.OrderBy(c => c.TotalTime)
                .ToList()
                .ForEach(car => Console.WriteLine(car.ToString()));
        }
    }

    class NascarRace : IObservable<Race>
    {
        List<IObserver<Race>> display = new List<IObserver<Race>>();
        Race race = new Race(40);

        public void Start()
        {
            //start up cars
            Car car1 = new Car("Car1", 1);
            Car car2 = new Car("Car2", 46);
            Car car3 = new Car("Car3", 99);
            race.Cars.Add(car1);
            race.Cars.Add(car2);
            race.Cars.Add(car3);
            ThreadStart threadDelegate = new ThreadStart(Running);
            Thread newThread = new Thread(threadDelegate);
            newThread.Start();
        }

        private void Running()
        {
            //one generator for the whole race
            Random rnd = new Random();
            //lap loop
            for (int lap = 0; lap < race.TotalLaps; lap++)
            {
                race.Cars.ToList().ForEach(r =>
                {
                    //setting the last lap time randomly
                    r.LastLap(rnd.Next(1000000000, int.MaxValue));
                });
                //increasing num of laps done
                race.CurrentLap++;
                //observers notification
                display.ToList().ForEach(d => d.OnNext(race));
                //waiting for a second
                Thread.Sleep(1000);
            }
            //completed. observers notification
            display.ToList().ForEach(d => d.OnCompleted());
        }

        public IDisposable Subscribe(IObserver<Race> observer)
        {
            //new observer
            if (!display.Contains(observer))
                display.Add(observer);
            //"observer as IDisposable" would be null here, since RaceMonitor
            //is not disposable; return a token that unsubscribes instead
            return new Unsubscriber(display, observer);
        }

        //token returned by Subscribe(); disposing it removes the observer
        private sealed class Unsubscriber : IDisposable
        {
            private readonly List<IObserver<Race>> observers;
            private readonly IObserver<Race> observer;

            public Unsubscriber(List<IObserver<Race>> observers, IObserver<Race> observer)
            {
                this.observers = observers;
                this.observer = observer;
            }

            public void Dispose()
            {
                observers.Remove(observer);
            }
        }
    }
}

