Introduction
In the Observer design pattern, a subject holds a list of interested parties – the observers – which it will notify about changes in status. Simply put, it’s a form of subscription, and this design comes up in all sorts of places (which is one of the definitions of the term ‘design pattern‘). It’s well suited for handling asynchronous events, like user interaction in a GUI, sensor information, and so on.
There is, however, often a need to re-synchronise asynchronous events. For instance, you might keep the latest status update until it’s actually needed for display, storage or some calculation. By doing this, you disregard the asynchronous nature of its source, and treat it as just another variable, as if it had been read from the subject right then. In other words, you synchronise a status from the past with the present. Sometimes, though, you don’t want the last value, but the next, which is a bit more complex, as it requires you to wait for the future to happen before we can say it’s the present.
In this article, we will write a simple multi-threaded example implementation of the Observer pattern, and show how to re-synchronise a past event to look current. Then we’ll demonstrate a technique to treat future events like they’re current, too.
Thread safety
If, as is often the case, the observed subject and the part of the system that manages the observers run in different threads, we need to make sure that they can co-exist in a friendly manner. Specifically, we must make sure that adding or removing an observer – activities that takes place outside the observed thread – does not interfere with the reporting of events.
In other words, accessing the list of observers is a critical section of the code, which may not be interrupted. Since this is a quite common situation, operating systems that support multi-threading also provide tools to handle it. On Windows, this is done with a CRITICAL_SECTION
object.
(If you use MFC, there is an eponymous wrapper for it. However, the implementations of the MFC synchronisation objects have been badly flawed in the past, and I believe they still are.)
Checking the documentation, we see that there are functions available to create and destroy CRITICAL_SECTION
, and to enter and leave a locked state. We also see that it can be entered and left recursively, as long as it’s the same thread. Knowing all this, we can write a C++ class to manage it.
Why not use the Windows API directly? The same reason for almost all C++ wrappers of OS objects – lifetime management. Instead of having to remember to call DeleteCriticalSection
everywhere it might be needed, we can do that in the destructor, as per the Resource Acquisition Is Initialization idiom.
// Need CRITICAL_SECTION declaration #include <Windows.h> // Class wrapping Win32 CRITICAL_SECTION class CriticalSection { public: // Constructor CriticalSection() { ::InitializeCriticalSection(&cs_); } // Destructor ~CriticalSection() { ::DeleteCriticalSection(&cs_); } // Enter critical section void Enter() { ::EnterCriticalSection(&cs_); } // Leave critical section void Leave() { ::LeaveCriticalSection(&cs_); } private: // Hide copy operations CriticalSection(const CriticalSection&); CriticalSection& operator=(const CriticalSection&); // Data member CRITICAL_SECTION cs_; };
Quite simple, really, with little overhead. And because every Enter
must be matched by a Leave
, the sensible thing to do is to write a RAII wrapper for that, too. If we don’t, odds are that at some point, we’ll alter the code using the CriticalSection
and introduce a new exit point, via a function return or exception, which won’t get the Leave
function called. A RAII wrapper helps code robustness.
// RAII Critical section lock class CSLock { public: // Constructor CSLock(CriticalSection& section) : section_(section) { section_.Enter(); } // Destructor ~CSLock() { section_.Leave(); } private: // Hide copy operations CSLock(const CSLock&); CSLock& operator=(const CSLock&); // Data member CriticalSection& section_; };
This automates the locking, so that we only need declare a CSLock
at the beginning of the scope we wish to lock for interruption, and will automatically unlock as we leave the scope and the destructor is called. We’ll see examples of how these are used below.
Something to see
Now that we have the tools to support a multi-threaded application, let’s write a simple Observer system. This requires something to observe, and something to do the observing. For this example, we’ll make a Subject
class, which declares an internal, abstract Subject::Observer
class, from which we’ll derive our observers. The Subject
notification in this example will send an integer to the observers.
#include <set> #include "CSLock.h" // Subject to be observed class Subject { public: // Abstract observer class Observer { // The subject we're observing Subject* subject_; // The subject is a friend, so it can help manage our relationship friend class Subject; // Assign a subject we've been registered with void SetSubject(Subject* subject) { // Is it the one we have already? if (subject != subject_) { // Unregister from current subject, if any if (0 != subject_) { subject_->Unregister(*this); } // Remember new subject subject_ = subject; } } // Kicked out by the subject void ReleaseFromSubject(Subject* subject) { if (subject == subject_) { subject_ = 0; } } protected: // Constructor only available to derived classes Observer() : subject_(0) {} // Derived classes decide what to do with notifications // (Still available to Subject class, as it's a friend) virtual void Notify(int) = 0; public: // Base classes need virtual destructor virtual ~Observer() { SetSubject(0); } }; // End of internal class definition // Destructor ~Subject() { // Lock, as we're using the set of observers CSLock lock(criticalsection_); // Release all observers for (std::set<Observer*>::iterator i = observers_.begin(); i != observers_.end(); ++i) { (*i)->ReleaseFromSubject(this); } } // Add observer void Register(Observer& observer) { // Lock, as we're manipulating the set of observers CSLock lock(criticalsection_); // Add to the set of observers observers_.insert(&observer); // Let it know we've accepted the registration observer.SetSubject(this); } // Remove observer void Unregister(Observer& observer) { // Lock, as we're manipulating the set of observers CSLock lock(criticalsection_); // Remove from the set of observers observers_.erase(&observer); // Let it know we've accepted the unregistration observer.ReleaseFromSubject(this); } // Notify observers about new data void Notify(int val) const { // Lock, as we're using the set of observers CSLock lock(criticalsection_); // Notify all for (std::set<Observer*>::const_iterator i = observers_.begin(); i != observers_.end(); ++i) { (*i)->Notify(val); } } // Check how many observers we have size_t ObserverCount() const { return observers_.size(); } private: // The registered observers std::set<Observer*> observers_; // A critical section to guard the observers mutable CriticalSection criticalsection_; };
The first thing to note here is that the Subject
and Observer
are tightly coupled, which somewhat paradoxically is to help de-couple the derived observers from the Subject
. The logic and responsibility of maintaining the relationship is kept private, thanks to the friendship between the Subject
and Observer
, so that derived classes can’t affect it. This tight coupling is also the reason for making the Observer
an internal class, to emphasise this is not any old observer, but one for this particular Subject
.
Another thing worth noting is that just as the Subject::Observer
class leaves the actual handling of a Notify
call to a derived class, the Subject
class here isn’t concerned with the generation of values to notify observers with. That’s for someone else, this Subject
only handles its observers and getting notifications out to them. (Indeed, it would be a relatively trivial task to make the notification type (int
in this example) a template type, and make this a generic and re-usable Observer pattern implementation. To do so is left as an exercise to the reader. Just mind whether you notify by value, reference, or pointer.)
A final point worth making is that the CriticalSection
is declared to be mutable
. The reason for this is that it’s only altered during a function call, by the CSLock
, but at the end of the function call it will have been restored to its previous state. By indicating it’s mutable, we can make the Notify
function const
.
So, let’s put it all together, with a custom observer that saves the latest value, a function to produce values, a thread, and a complete program.
#include <iostream> class PastObserver : public Subject::Observer { // Last observed value int value_; // A critical section to guard the value mutable CriticalSection criticalsection_; public: // Constructor, PastObserver() : value_(0) {} // Access last value int GetLastValue() const { // Lock to prevent the value being modified CSLock lock(criticalsection_); return value_; } // Function called by observed Subject virtual void Notify(int value) { { // Lock to prevent the value being read while we're assigning CSLock lock(criticalsection_); // Store the value value_ = value; } // Print it out std::cout << "PastObserver notified: " << value_ << std::endl; } }; // A thread function to generate values // Takes a Subject* as thread parameter DWORD WINAPI ValueFunction(void* pParam) { Subject* subject = (Subject*)pParam; std::cout << "Thread started with " << subject->ObserverCount() << " observers" << std::endl; // Put a seed value in for the random number generator int val = (int)time(0); // Run until there are no more observers while (0 < subject->ObserverCount()) { // Get a random value to report srand(val); val = rand(); // Report it subject->Notify(val); // Take a little break Sleep(100 * (val & 0x7)); } std::cout << "Thread ended" << std::endl; return 0; } void main() { // Create subject and observer Subject subject; PastObserver observer; subject.Register(observer); // Start the thread CreateThread(NULL, NULL, &ValueFunction, &subject, NULL, NULL); // Let it work for a bit Sleep(1000); // Close down and report subject.Unregister(observer); std::cout << "Last value: " << observer.GetLastValue() << std::endl; // Wait for thread to terminate ::WaitForSingleObject(thread, INFINITE); }
And that’s it. A reasonably small and clear illustration of how the Observer pattern works. In this example, we synchronise with the past, by reading the last value. Now, let’s synchronise with the future!
Reading the future
So how do you read the future? Well, obviously, our observer has to wait for it to happen, so we’ll need another synchronisation object: the Event. This is a boolean object which can be set or reset, and waited for with WaitForSingleObject
. As it turns out, we’ll need two of those – one to indicate we’re waiting for data, and one to indicate we’ve received it.
class FutureObserver : public Subject::Observer { // Observed value int value_; // Events HANDLE waiting_; HANDLE newValue_; public: // Constructor FutureObserver() : value_(0), waiting_(0), newValue_(0) { // No security attributes, automatic reset, initially reset, no name waiting_ = ::CreateEvent(0, 0, 0, 0); newValue_ = ::CreateEvent(0, 0, 0, 0); } // Destructor ~FutureObserver() { CloseHandle(waiting_); CloseHandle(newValue_); } // Wait for next value int GetNextValue() const { // Indicate we're waiting SetEvent(waiting_); // Wait for a new value if (WAIT_OBJECT_0 == ::WaitForSingleObject(newValue_, INFINITE)) { // Success return value_; } else { // Failures, could be WAIT_FAILED or WAIT_TIMEOUT throw std::exception("Failed waiting for next value"); } } // Function called by observed Subject virtual void Notify(int value) { // Print it out std::cout << "FutureObserver notified: " << value << std::endl; // Check to see if we're waiting if (WAIT_OBJECT_0 == ::WaitForSingleObject(waiting_, 0)) { // We were waiting, so keep the value... value_ = value; // ... and flag we have it ::SetEvent(newValue_); } } };
This observer is a bit more complex, as it has to manage the two Event
objects, but the principle is simple enough. In the GetNextValue()
function, we set one event, and wait for the other. The next time Notify()
is called, it will see the waiting_
flag is set, so it will store the value and signal that newValue_
is ready. The events are created to reset automatically, as soon as a WaitForSingleObject
call is successful (eg when the event it is waiting for has been set).
The GetNextValue()
function waits infinitely here – it will not continue until it’s found a value – and so the exception should never happen, unless the FutureObserver
has been deleted in another thread. If you’d prefer a timeout, just overload the GetNextValue()
function:
// Access next value, with timeout and success indicator int GetNextValue(DWORD millisecondTimeout, bool& timedOut) const { // Indicate we're waiting SetEvent(waiting_); // Wait for a new value switch (::WaitForSingleObject(newValue_, millisecondTimeout)) { case WAIT_OBJECT_0: // Success timedOut = false; return value_; case WAIT_TIMEOUT: timedOut = true; return 0; default: // WAIT_FAILED throw std::exception("Failed waiting for next value"); } }
The Notify()
function, in contrast, doesn’t wait at all. When WaitForSingleObject
is called with a timeout of zero milliseconds, it returns immediately, so we have to check the return value to see if we were successful. This means we’re not holding up the Subject::Nofify()
more than necessary.
Finally, let’s put it all together:
void main() { // Create subject and observers Subject subject; PastObserver past; subject.Register(past); FutureObserver future; subject.Register(future); // Start the thread HANDLE thread = CreateThread(NULL, NULL, &ValueFunction, &subject, NULL, NULL); // Let it work for a bit Sleep(1000); // Get the last and next value, twice std::cout << "Last value: " << past.GetLastValue() << std::endl; std::cout << "Next value: " << future.GetNextValue() << std::endl; std::cout << "Last value: " << past.GetLastValue() << std::endl; std::cout << "Next value: " << future.GetNextValue() << std::endl; // Close down subject.Unregister(past); subject.Unregister(future); // Wait for thread time to terminate ::WaitForSingleObject(thread, INFINITE); }
As always, if you found this interesting or useful, or have suggestions for improvements, please let me know.