Observer Pattern: Core Subscription Mechanics
Overview
The Observer Pattern is a foundational pattern in event-driven and reactive programming. It defines a one-to-many dependency between objects so that when one object (the Subject) changes state, all its dependents (Observers) are notified and updated automatically.
Core Concepts
- Subject Registry: A central list of "subscribers" that the subject manages.
- Loose Coupling: The Subject doesn't need to know the specific classes of its observers, only that they implement a common interface.
- Broadcast Communication: The subject pushes each update to every registered observer in a single notification pass.
The Engineering Story & Problem
The Villain (The Problem)
Imagine a "Rigid Weather Station." It measures temperature and needs to update a PhoneApp, a WebDashboard, and a PublicDisplay.
In the bad version, the WeatherStation class holds hardcoded references to all three:
class WeatherStation:
    def measurements_changed(self):
        # Every display is called by name, so adding a new one means editing this method.
        self.phone_app.update(self.temp)
        self.web_dashboard.update(self.temp)
        self.public_display.update(self.temp)
Adding or removing a display means changing the WeatherStation code. This is a "Rigid Broadcaster" that is tightly coupled and fragile.
The Hero (The Solution)
The Observer Pattern introduces a "Subscription Desk." The WeatherStation doesn't care who is listening. It just maintains a list of Observers.
If a new device wants updates, it "attaches" itself to the station.
When the temperature changes, the station simply loops through its list and calls update() on each observer. New devices can be added without changing the WeatherStation.
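The "Subscription Desk" described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the original implementation: the method names `attach`, `detach`, and `set_temperature`, and the `SmartWatch` class, are hypothetical names chosen for the example.

```python
class WeatherStation:
    """Subject: owns the observer list and notifies it on every change."""

    def __init__(self) -> None:
        self._observers = []  # anything with an update(temp) method
        self.temp = 0.0

    def attach(self, observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def set_temperature(self, temp: float) -> None:
        # Hypothetical setter: store the reading, then broadcast it.
        self.temp = temp
        self.measurements_changed()

    def measurements_changed(self) -> None:
        # Loop over a copy so an observer may detach itself while being notified.
        for observer in list(self._observers):
            observer.update(self.temp)


class PhoneApp:
    def __init__(self) -> None:
        self.last_temp = None

    def update(self, temp: float) -> None:
        self.last_temp = temp


class SmartWatch:
    """Hypothetical new device: added without touching WeatherStation."""

    def __init__(self) -> None:
        self.last_temp = None

    def update(self, temp: float) -> None:
        self.last_temp = temp


station = WeatherStation()
phone, watch = PhoneApp(), SmartWatch()
station.attach(phone)
station.attach(watch)
station.set_temperature(21.5)
print(phone.last_temp, watch.last_temp)  # 21.5 21.5
```

The station only assumes that each attached object has an `update` method, which is what lets `SmartWatch` join without any change to `WeatherStation`.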
Requirements & Constraints
- (Functional): Allow any number of observers to subscribe and unsubscribe at runtime.
- (Technical): The Subject must not depend on concrete Observer classes.
- (Technical): Observers must implement a standard `update()` method.
Structure & Blueprint
Class Diagram
classDiagram
    direction TB
    class Subject {
        <<interface>>
        +attach(observer)
        +detach(observer)
        +notify()
    }
    class ConcreteSubject {
        -observers: List
        -state
        +get_state()
        +set_state()
    }
    class Observer {
        <<interface>>
        +update(state)
    }
    class ConcreteObserver {
        +update(state)
    }
    Subject <|.. ConcreteSubject
    Observer <|.. ConcreteObserver
    ConcreteSubject o-- Observer : notifies
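One way the class diagram might translate into Python is sketched below, using `abc` for the two interfaces. The diagram does not say when `notify()` is called, so having `set_state()` trigger it is an assumption, as is the `seen` list used to record what each observer received.

```python
from abc import ABC, abstractmethod
from typing import Any


class Observer(ABC):
    @abstractmethod
    def update(self, state: Any) -> None:
        """Receive the subject's new state."""


class Subject(ABC):
    @abstractmethod
    def attach(self, observer: Observer) -> None: ...

    @abstractmethod
    def detach(self, observer: Observer) -> None: ...

    @abstractmethod
    def notify(self) -> None: ...


class ConcreteSubject(Subject):
    def __init__(self) -> None:
        self._observers: list[Observer] = []
        self._state: Any = None

    def attach(self, observer: Observer) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Observer) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self) -> None:
        # Depends only on the abstract Observer.update contract.
        for observer in list(self._observers):
            observer.update(self._state)

    def get_state(self) -> Any:
        return self._state

    def set_state(self, state: Any) -> None:
        # Assumption: every state change is broadcast immediately.
        self._state = state
        self.notify()


class ConcreteObserver(Observer):
    def __init__(self) -> None:
        self.seen: list[Any] = []  # hypothetical log of received states

    def update(self, state: Any) -> None:
        self.seen.append(state)
```

Because `ConcreteSubject` is typed against `Observer` rather than `ConcreteObserver`, the aggregation arrow in the diagram points at the interface, not at any implementation.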
Runtime Context (Sequence)
sequenceDiagram
    participant S as Subject
    participant O1 as Observer 1
    participant O2 as Observer 2
    O1->>S: attach()
    O2->>S: attach()
    Note over S: State Changes
    S->>O1: update(data)
    S->>O2: update(data)
Implementation & Code
SOLID Principles Applied
- Open/Closed: You can add new Observer types without modifying the Subject.
- Dependency Inversion: The Subject depends on an abstract `Observer` interface, not concrete classes.
The Code
The Villain's Code (Without Pattern)
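As a rough sketch of what a smoke sensor without the pattern could look like, the version below hardwires its devices. All names here (`RigidSmokeSensor`, `Siren.sound`, `Light.flash`) are hypothetical and chosen only to illustrate the coupling problem.

```python
class Siren:
    def sound(self, level: int) -> str:
        return f"Siren: smoke level {level}"


class Light:
    def flash(self, level: int) -> str:
        return f"Light: smoke level {level}"


class RigidSmokeSensor:
    """Anti-pattern: the sensor creates and calls each device by name."""

    def __init__(self) -> None:
        self.alert_level = 5
        self.siren = Siren()
        self.light = Light()

    def trigger(self, level: int) -> list[str]:
        if level < self.alert_level:
            return []
        # Adding a Phone would mean editing this class:
        # a new attribute in __init__ and a new call here.
        return [self.siren.sound(level), self.light.flash(level)]


sensor = RigidSmokeSensor()
print(sensor.trigger(9))
```

Each device also exposes a different method name, so the sensor has to know the details of every class it talks to.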
The Hero's Code (With Pattern)
from typing import Optional, Protocol


class Subscriber(Protocol):
    """Interface that every observer implements."""

    def alert(self, msg: Optional[int]) -> None:
        ...

    def critical(self, msg: Optional[int]) -> None:
        ...


class Siren(Subscriber):
    def alert(self, msg: Optional[int]) -> None:
        print(f"The smoke level is {msg}")

    def critical(self, msg: Optional[int]) -> None:
        print("Loud Sounding Siren")


class Light(Subscriber):
    def alert(self, msg: Optional[int]) -> None:
        print("Yellow lights flashing")

    def critical(self, msg: Optional[int]) -> None:
        print("Red lights flashing")


class Phone(Subscriber):
    def alert(self, msg: Optional[int]) -> None:
        print(f"Sent Alert notification to the user for smoke level reached: {msg}")

    def critical(self, msg: Optional[int]) -> None:
        print(f"Sent Critical notification to the user for smoke level reached: {msg}")


class SubscriberList:
    """Registry of subscribers that broadcasts each signal to all of them."""

    def __init__(self) -> None:
        self.subs: list[Subscriber] = []

    def getSubscriber(self) -> list[Subscriber]:
        return self.subs

    def addSubscriber(self, subscriber: Subscriber) -> None:
        if subscriber not in self.subs:
            self.subs.append(subscriber)

    def deleteSubscriber(self, subscriber: Subscriber) -> None:
        if subscriber in self.subs:
            self.subs.remove(subscriber)

    def alertSignal(self, level: int) -> None:
        for subscriber in self.subs:
            subscriber.alert(msg=level)

    def criticalSignal(self, level: int) -> None:
        for subscriber in self.subs:
            subscriber.critical(msg=level)


class SmokeSensor:
    """Subject: decides which signal to send and delegates the broadcast."""

    def __init__(self) -> None:
        self.alertSmokeLevel = 5
        self.criticalSmokeLevel = 10
        self.subsList = SubscriberList()

    def addSubscriber(self, subscriber: Subscriber) -> None:
        self.subsList.addSubscriber(subscriber)

    def removeSubscriber(self, subscriber: Subscriber) -> None:
        self.subsList.deleteSubscriber(subscriber)

    def trigger(self, level: int) -> None:
        if level >= self.criticalSmokeLevel:
            self.subsList.criticalSignal(level)
        elif level >= self.alertSmokeLevel:
            self.subsList.alertSignal(level)
        else:
            print("Safe Smoke Level")
        print("\n")


sensor = SmokeSensor()
# Keep a reference to the siren: removing Siren() would create a new,
# unregistered instance and leave the original subscribed.
siren = Siren()
sensor.addSubscriber(siren)
sensor.addSubscriber(Light())
sensor.trigger(3)
sensor.trigger(9)
sensor.addSubscriber(Phone())
sensor.trigger(11)
sensor.trigger(6)
sensor.removeSubscriber(siren)
sensor.trigger(13)
Trade-offs & Testing
| Pros (Why it works) | Cons (The Twist / Pitfalls) |
|---|---|
| Loose Coupling: Subject and Observers are independent. | Notification Order: Not guaranteed by the pattern, so observers should not depend on it. |
| Flexibility: Add/remove observers at runtime. | Memory Leaks: The Subject holds a reference to each observer, so observers that never "detach" cannot be garbage-collected. |
| Broadcast: Efficiently update many objects at once. | Cascade Updates: One update might trigger a chain reaction of many more. |
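
One common mitigation for the memory-leak pitfall in the table is to hold observers through weak references. The sketch below uses the standard library's `weakref.WeakSet`. The `Subject`, `Display`, and `observer_count` names are hypothetical, and this is one possible approach rather than a required part of the pattern.

```python
import gc
import weakref


class Subject:
    def __init__(self) -> None:
        # A WeakSet does not keep its members alive on its own.
        self._observers = weakref.WeakSet()

    def attach(self, observer) -> None:
        self._observers.add(observer)

    def notify(self, state) -> None:
        # Copy to a list so the set cannot change size mid-iteration.
        for observer in list(self._observers):
            observer.update(state)

    def observer_count(self) -> int:
        return len(self._observers)


class Display:
    def __init__(self) -> None:
        self.last_state = None

    def update(self, state) -> None:
        self.last_state = state


subject = Subject()
display = Display()
subject.attach(display)
subject.notify(42)
print(subject.observer_count())  # 1

del display   # the last strong reference goes away
gc.collect()  # make collection explicit on non-refcounting interpreters
print(subject.observer_count())  # 0
```

The trade-off is that a `WeakSet` is unordered and requires observers to be hashable and weak-referenceable, so notification order becomes even less predictable, and an observer with no other strong reference silently disappears.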
Testing Strategy
- Unit Test Subject: Verify `attach` increases the observer count and `detach` decreases it.
- Test Notification: Mock two observers, trigger a change in the subject, and verify both mocks' `update` methods were called with the correct data.
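The two tests above could be written with `unittest` and `unittest.mock.Mock` roughly as follows. The `Subject` class here is a minimal stand-in defined only for the test, and its `observers` attribute and `notify(data)` signature are assumptions.

```python
import unittest
from unittest.mock import Mock


class Subject:
    """Minimal hypothetical subject used only to demonstrate the tests."""

    def __init__(self) -> None:
        self.observers = []

    def attach(self, observer) -> None:
        if observer not in self.observers:
            self.observers.append(observer)

    def detach(self, observer) -> None:
        if observer in self.observers:
            self.observers.remove(observer)

    def notify(self, data) -> None:
        for observer in list(self.observers):
            observer.update(data)


class SubjectTests(unittest.TestCase):
    def test_attach_and_detach_change_observer_count(self) -> None:
        subject, observer = Subject(), Mock()
        subject.attach(observer)
        self.assertEqual(len(subject.observers), 1)
        subject.detach(observer)
        self.assertEqual(len(subject.observers), 0)

    def test_notify_calls_every_observer_with_the_data(self) -> None:
        subject = Subject()
        first, second = Mock(), Mock()
        subject.attach(first)
        subject.attach(second)

        subject.notify(42)

        # Each mock records its calls, so no real observer class is needed.
        first.update.assert_called_once_with(42)
        second.update.assert_called_once_with(42)
```

Saved as a module, these tests can be run with `python -m unittest`.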
Interview Toolkit
- Interview Signal: Demonstrates mastery of event-driven design and decoupling.
- When to Use:
- "A change in one object requires changing others, but you don't know how many..."
- "Implement an event listener system..."
- "Keep multiple views of data in sync (MVC)..."
- Scalability Probe: "How to handle 10,000 observers?" (Answer: Use an asynchronous message queue like RabbitMQ or a 'Fan-out' pattern to prevent the Subject from blocking.)
- Design Alternatives:
- Pub-Sub: A variation where a "Message Broker" sits between the Subject and Observers.
- Mediator: Centralizes communication; Observer distributes it.
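The scalability answer and the Pub-Sub alternative above can be illustrated together with a small in-process broker: publishers enqueue events and return immediately, while a worker thread delivers them. This is a toy sketch built on the standard library's `queue` and `threading`, standing in for a real message broker such as RabbitMQ. The `Broker` class, its methods, and the `"smoke"` topic are all hypothetical.

```python
import queue
import threading
from collections import defaultdict
from typing import Callable


class Broker:
    """Toy broker: sits between publishers and subscribers."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[int], None]]] = defaultdict(list)
        self._events: queue.Queue = queue.Queue()
        worker = threading.Thread(target=self._deliver, daemon=True)
        worker.start()

    def subscribe(self, topic: str, handler: Callable[[int], None]) -> None:
        self._handlers[topic].append(handler)

    def publish(self, topic: str, payload: int) -> None:
        # Returns immediately; delivery happens on the worker thread.
        self._events.put((topic, payload))

    def _deliver(self) -> None:
        while True:
            topic, payload = self._events.get()
            try:
                for handler in list(self._handlers[topic]):
                    handler(payload)
            finally:
                self._events.task_done()

    def drain(self) -> None:
        # Block until every queued event has been delivered (handy in tests).
        self._events.join()


received: list[tuple[str, int]] = []
broker = Broker()
broker.subscribe("smoke", lambda level: received.append(("siren", level)))
broker.subscribe("smoke", lambda level: received.append(("phone", level)))
broker.publish("smoke", 11)
broker.drain()
print(received)  # [('siren', 11), ('phone', 11)]
```

The publisher never holds a reference to its subscribers, only to the broker and a topic name. A production setup would also need error handling for failing handlers, which this sketch leaves out: an exception in a handler stops the worker thread.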