I was looking at various implementations of the Message Bus pattern, but I didn't like what I found, so I rolled my own.
There are four significant differences between this implementation and the others that I've seen.
1. Subscribing, unsubscribing and publishing are thread-safe. This is important because I want publishers on multiple threads (specifically the ThreadPool, responding to XMPP events).
2. There is a WeakSubscribe model that uses a WeakReference so that a subscription cannot keep the subscriber alive and block garbage collection.
3. Subscribing to a base class -- inclusive of object -- subscribes to all descendant classes as well. If I subscribe to Message, I get events when anything derived from Message is raised as well. If I subscribe to object, I get every event that is raised, because every .NET class derives from object.
4. Subscribers are notified via BeginInvoke, on a ThreadPool thread. This means that the delegate handling the event needs to be thread neutral.
Note: Because Publish is thread safe, the subscribers have to be thread neutral anyway. So the fact I'm using the pool is secondary.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
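namespace Net.BattleBazaar.AppServices
{
    public class MessageBus
    {
        // Guards _subs: publishers take read locks, subscribe/unsubscribe take the write lock.
        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
        // Maps a message type to the MessageBusCallback<T> delegates subscribed to it.
        private readonly Dictionary<Type, List<object>> _subs = new Dictionary<Type, List<object>>();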
        public void Subscribe<T>(MessageBusCallback<T> callback)
        {
            _lock.EnterWriteLock();
            try
            {
                // Create the handler list for T on first use.
                List<object> handlers;
                if (!_subs.TryGetValue(typeof(T), out handlers))
                {
                    handlers = new List<object>();
                    _subs.Add(typeof(T), handlers);
                }
                handlers.Add(callback);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }
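        public void WeakSubscribe<T>(MessageBusCallback<T> callback)
        {
            // No lock needed here; Subscribe takes the write lock.
            // The bus holds the wrapper strongly, but the wrapper reaches the
            // subscriber only through a WeakReference.
            // Note: the registered delegate is weak.Callback, so Unsubscribe(callback)
            // will not remove a weak subscription.
            var weak = new WeakMessageBusCallback<T>(callback);
            Subscribe<T>(weak.Callback);
        }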
        public void Unsubscribe<T>(MessageBusCallback<T> callback)
        {
            // Check under an upgradeable read lock, which does not block publishers;
            // upgrade to the write lock only if T has subscribers.
            _lock.EnterUpgradeableReadLock();
            try
            {
                List<object> handlers;
                if (!_subs.TryGetValue(typeof(T), out handlers))
                    return;
                _lock.EnterWriteLock();
                try
                {
                    handlers.Remove(callback);
                    // Drop the entry once its last handler is gone.
                    if (handlers.Count == 0)
                        _subs.Remove(typeof(T));
                }
                finally
                {
                    _lock.ExitWriteLock();
                }
            }
            finally
            {
                _lock.ExitUpgradeableReadLock();
            }
        }
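        public void Publish<T>(T msg)
        {
            _lock.EnterReadLock();
            try
            {
                // Walk from T up to object so base-class subscribers are notified too.
                // BaseType is null for object and for interface types, which ends the loop
                // (the original loop passed a null key to the dictionary when T was an interface).
                for (var q = typeof(T); q != null; q = q.BaseType)
                {
                    List<object> handlers;
                    if (!_subs.TryGetValue(q, out handlers))
                        continue;
                    foreach (var h in handlers)
                    {
                        // Each handler is a MessageBusCallback<q>; BeginInvoke runs it on a ThreadPool thread.
                        // Delegate BeginInvoke throws PlatformNotSupportedException on .NET Core and .NET 5+.
                        var beginInvoke = h.GetType().GetMethod("BeginInvoke");
                        beginInvoke.Invoke(h, new object[] { msg, null, null });
                    }
                }
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }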
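    public class WeakMessageBusCallback<T>
    {
        // Weakly reference the subscriber (the delegate's target), not the delegate itself:
        // the delegate is usually referenced by nothing else, so a WeakReference to it
        // would let the subscription vanish at the next garbage collection.
        private readonly WeakReference _target;
        private readonly System.Reflection.MethodInfo _method;
        public WeakMessageBusCallback(MessageBusCallback<T> cb)
        {
            // Target is null for static methods, which have no subscriber object to track.
            _target = cb.Target == null ? null : new WeakReference(cb.Target);
            _method = cb.Method;
        }
        public void Callback(T msg)
        {
            object target = null;
            if (_target != null)
            {
                // Copy Target to a local; testing IsAlive first races with the garbage collector.
                target = _target.Target;
                if (target == null)
                    return; // the subscriber has been collected
            }
            _method.Invoke(target, new object[] { msg });
        }
    }
    public delegate void MessageBusCallback<T>(T msg);
}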
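One caveat on the dispatch step: delegate BeginInvoke is only available on the .NET Framework. As a rough sketch of an alternative, assuming you still want fire-and-forget delivery on a pool thread, the same effect can be had with ThreadPool.QueueUserWorkItem. The PoolDispatch class and its Post method are hypothetical names for illustration and are not part of the listing above.

```csharp
using System;
using System.Threading;

public delegate void MessageBusCallback<T>(T msg);

public static class PoolDispatch
{
    // Hypothetical helper: queues one handler call on the ThreadPool
    // without relying on Delegate.BeginInvoke.
    public static void Post<T>(MessageBusCallback<T> handler, T msg)
    {
        ThreadPool.QueueUserWorkItem(delegate(object state)
        {
            try
            {
                handler(msg);
            }
            catch (Exception ex)
            {
                // An unhandled exception on a pool thread would take down the process.
                Console.Error.WriteLine("Handler failed: " + ex.Message);
            }
        });
    }
}

public static class DispatchDemo
{
    public static void Main()
    {
        using (var done = new ManualResetEventSlim(false))
        {
            string received = null;
            PoolDispatch.Post<string>(delegate(string m) { received = m; done.Set(); }, "hello");
            done.Wait(); // block until the pool thread has run the handler
            Console.WriteLine(received);
        }
    }
}
```

Inside Publish the handlers are stored as object, so one way to adapt this would be to cast each handler to Delegate and call DynamicInvoke(msg) from the queued work item.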
So anyway -- that's my take on the problem. You use this pattern when you have an event source (in my case an XMPP stream) that you want to be loosely coupled with some services (in my case, the MUC group chat object).
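To make the loose coupling concrete, here is a small usage sketch. It assumes it is compiled together with the MessageBus listing above and run on the .NET Framework, where delegate BeginInvoke is available. Message and ChatMessage are hypothetical types invented for the example; they are not the real XMPP or MUC classes.

```csharp
using System;
using System.Threading;
using Net.BattleBazaar.AppServices;

// Hypothetical message types, for illustration only.
public class Message { public string Body; }
public class ChatMessage : Message { }

public static class BusDemo
{
    public static void Main()
    {
        var bus = new MessageBus();
        using (var seen = new CountdownEvent(2))
        {
            // Keep the delegates in locals so the same instances can be unsubscribed later.
            MessageBusCallback<Message> onMessage = m =>
            {
                Console.WriteLine("Message subscriber got: " + m.Body);
                seen.Signal();
            };
            MessageBusCallback<object> onAnything = o =>
            {
                Console.WriteLine("object subscriber got a " + o.GetType().Name);
                seen.Signal();
            };

            bus.Subscribe<Message>(onMessage);
            bus.Subscribe<object>(onAnything);

            // T is inferred as ChatMessage; the bus walks ChatMessage -> Message -> object,
            // so both subscribers are notified, each on a ThreadPool thread.
            bus.Publish(new ChatMessage { Body = "hi" });

            seen.Wait(); // handlers run asynchronously, so wait for both

            bus.Unsubscribe<Message>(onMessage);
            bus.Unsubscribe<object>(onAnything);
        }
    }
}
```

The publisher only knows about the bus and the message type, and the two subscribers never reference the publisher. Because the handlers run on pool threads, the order of the two output lines is not guaranteed.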