Observer Pattern: Smart IoT Event Bus
Overview
The Observer Pattern defines a one-to-many dependency between objects. When the Subject (e.g., a Motion Sensor) changes state, all its registered Observers (e.g., Siren, Lights, Phone App) are notified and updated automatically. This is the heart of event-driven architectures.
Core Concepts
- Pub-Sub Decoupling: The Publisher (Subject) doesn't know who the Subscribers (Observers) are.
- Dynamic Subscription: Observers can join or leave the "broadcast" at any time.
- Cascade Updates: A single trigger can result in many independent actions across the system.
The Engineering Story & Problem
The Villain (The Problem)
You're building a smart security system. When a MotionSensor triggers, it needs to sound the Siren, send a PushNotification, and turn on the SmartLights.
In the "Hardcoded Dependency" version, the sensor code looks like this:
class MotionSensor:
    def on_detect(self):
        # The sensor calls every device directly, so it must know each one.
        self.siren.alert()
        self.app.send_push()
        self.lights.turn_on()

Now suppose you need to add a SmartLock to lock the doors when motion is detected. You have to open the MotionSensor class and add self.lock.close(). The sensor becomes a "God Object" that is tightly coupled to every device in the house.
The Hero (The Solution)
The Observer Pattern introduces an "Event Bus."
The MotionSensor is now a Subject. It doesn't know about Sirens or Locks. It just has a list of generic Observers.
When motion is detected, it shouts: "Motion detected at 10:00 PM!" to its list.
The Siren, PhoneApp, and SmartLock are all Observers. They subscribe to the sensor. When they hear the shout, they decide how to react (Siren wails, App notifies, Lock closes). You can add a CoffeeMaker to start brewing when motion is detected in the morning without ever touching the sensor's code.
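The story above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not a definitive implementation: `attach`, `detach`, `notify`, `update`, and `detect_motion` follow the class diagram in this document, while the dictionary payload, the `log` lists, and the exact messages are hypothetical choices made for the example.

```python
from typing import Protocol


class Observer(Protocol):
    def update(self, data: dict) -> None: ...


class MotionSensor:
    """Subject: it only knows that each observer has an update() method."""

    def __init__(self) -> None:
        self._observers: list[Observer] = []

    def attach(self, obs: Observer) -> None:
        if obs not in self._observers:
            self._observers.append(obs)

    def detach(self, obs: Observer) -> None:
        if obs in self._observers:
            self._observers.remove(obs)

    def notify(self, data: dict) -> None:
        # Iterate over a copy so an observer may detach itself during update().
        for obs in list(self._observers):
            obs.update(data)

    def detect_motion(self, location: str, time: str) -> None:
        self.notify({"location": location, "time": time})


class Siren:
    def __init__(self) -> None:
        self.log: list[str] = []  # hypothetical: records reactions for inspection

    def update(self, data: dict) -> None:
        self.log.append(f"WAIL: motion in {data['location']}")


class CoffeeMaker:
    """Added later; MotionSensor did not change to support it."""

    def __init__(self) -> None:
        self.log: list[str] = []

    def update(self, data: dict) -> None:
        self.log.append(f"brewing at {data['time']}")


sensor = MotionSensor()
siren = Siren()
coffee = CoffeeMaker()
sensor.attach(siren)
sensor.attach(coffee)
sensor.detect_motion("Hallway", "10:00 PM")
```

Each observer decides for itself how to react, which is why a new device only needs an `update` method and an `attach` call.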
Requirements & Constraints
- (Functional): Multiple devices must respond to a single sensor trigger.
- (Technical): Pass event data (location, timestamp) to all observers.
- (Technical): Support dynamic registration (e.g., only notify the Phone App if the user is away).
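One possible way to meet the two technical requirements is sketched below. It assumes observers are plain callables rather than objects, and every name in it (`MotionEvent`, `set_user_away`, `phone_app`, `siren`, `received`) is hypothetical and introduced only for illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable


@dataclass(frozen=True)
class MotionEvent:
    """Event data handed to every observer."""
    location: str
    timestamp: datetime


Handler = Callable[[MotionEvent], None]


class MotionSensor:
    def __init__(self) -> None:
        self._handlers: list[Handler] = []

    def attach(self, handler: Handler) -> None:
        if handler not in self._handlers:
            self._handlers.append(handler)

    def detach(self, handler: Handler) -> None:
        if handler in self._handlers:
            self._handlers.remove(handler)

    def detect_motion(self, location: str, timestamp: datetime) -> None:
        event = MotionEvent(location, timestamp)
        for handler in list(self._handlers):
            handler(event)


received: list[tuple[str, str]] = []


def siren(event: MotionEvent) -> None:
    received.append(("siren", event.location))


def phone_app(event: MotionEvent) -> None:
    received.append(("phone", event.location))


sensor = MotionSensor()
sensor.attach(siren)


def set_user_away(away: bool) -> None:
    # Dynamic registration: the phone only listens while the user is away.
    if away:
        sensor.attach(phone_app)
    else:
        sensor.detach(phone_app)


sensor.detect_motion("Hallway", datetime(2024, 1, 1, 22, 0))  # user at home
set_user_away(True)
sensor.detect_motion("Kitchen", datetime(2024, 1, 1, 22, 5))  # user away
set_user_away(False)
sensor.detect_motion("Garage", datetime(2024, 1, 1, 22, 10))  # user back home
```

Freezing the dataclass keeps one observer from mutating the event that the next observer receives.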
Structure & Blueprint
Class Diagram
classDiagram
    direction TB
    class Subject {
        <<interface>>
        +attach(obs)
        +detach(obs)
        +notify(data)
    }
    class MotionSensor {
        -observers: List
        +detect_motion()
    }
    class Observer {
        <<interface>>
        +update(data)
    }
    class AlarmSiren {
        +update(data)
    }
    class PhoneApp {
        +update(data)
    }
    Subject <|.. MotionSensor
    Observer <|.. AlarmSiren
    Observer <|.. PhoneApp
    MotionSensor o-- Observer : broadcasts to
Runtime Context (Sequence)
sequenceDiagram
    participant Sensor
    participant Siren
    participant App
    Siren->>Sensor: attach(self)
    App->>Sensor: attach(self)
    Note over Sensor: Motion Detected!
    Sensor->>Siren: update(location="Hallway")
    Siren->>Siren: Sound Alarm!
    Sensor->>App: update(location="Hallway")
    App->>App: Send Notification!
Implementation & Code
SOLID Principles Applied
- Open/Closed: Add a SmartCamera observer without modifying MotionSensor.
- Single Responsibility: The sensor detects motion; the observers handle the reactions.
The Code
The Villain's Code (Without Pattern)
The Hero's Code (With Pattern)
from collections import defaultdict
from typing import Optional, Protocol


class Subscriber(Protocol):
    """Anything that can react to alert-level and critical-level events."""

    def alert(self, msg: Optional[int]):
        ...

    def critical(self, msg: Optional[int]):
        ...


class Siren(Subscriber):
    def alert(self, msg: Optional[int]):
        print(f"The smoke level is {msg}")

    def critical(self, msg: Optional[int]):
        print("Loud Sounding Siren")


class Light(Subscriber):
    def alert(self, msg: Optional[int]):
        print("Yellow lights flashing")

    def critical(self, msg: Optional[int]):
        print("Red lights flashing")


class Phone(Subscriber):
    def alert(self, msg: Optional[int]):
        print(f"Sent Alert notification to the user for smoke level reached: {msg}")

    def critical(self, msg: Optional[int]):
        print(f"Sent Critical notification to the user for smoke level reached: {msg}")


class EventBus:
    def __init__(self) -> None:
        # Maps each topic name to the subscribers registered for it.
        self._subscribers: dict[str, list[Subscriber]] = defaultdict(list)

    def subscribe(self, topic: str, instance: Subscriber) -> None:
        # Ignore duplicate registrations so a device is notified only once.
        if instance not in self._subscribers[topic]:
            self._subscribers[topic].append(instance)

    def unsubscribe(self, topic: str, instance: Subscriber) -> None:
        # Check the topic first so the defaultdict does not create an empty entry.
        if topic in self._subscribers:
            if instance in self._subscribers[topic]:
                self._subscribers[topic].remove(instance)

    def publish(self, topic: str, msg: int, type: str) -> None:
        # Dispatch to the handler that matches the severity ("critical" or "alert").
        if topic in self._subscribers:
            if type == "critical":
                for subscriber in self._subscribers[topic]:
                    subscriber.critical(msg)
            elif type == "alert":
                for subscriber in self._subscribers[topic]:
                    subscriber.alert(msg)


class SmokeSensor:
    def __init__(self, bus: EventBus) -> None:
        self.alertSmokeLevel = 5
        self.criticalSmokeLevel = 10
        self.bus = bus

    def trigger(self, level: int):
        # The sensor only publishes; it never references a concrete device.
        if level >= self.criticalSmokeLevel:
            self.bus.publish(topic="smoke_warning", msg=level, type="critical")
        elif level >= self.alertSmokeLevel:
            self.bus.publish(topic="smoke_warning", msg=level, type="alert")
        else:
            print("Safe Smoke Level")
        print("\n")  # Visual separator between readings


bus = EventBus()
sensor = SmokeSensor(bus)
my_siren = Siren()
my_light = Light()
my_phone = Phone()

# Start with the siren and the light subscribed.
bus.subscribe(topic="smoke_warning", instance=my_siren)
bus.subscribe(topic="smoke_warning", instance=my_light)
sensor.trigger(3)   # below both thresholds
sensor.trigger(9)   # alert level

# The phone joins at runtime.
bus.subscribe(topic="smoke_warning", instance=my_phone)
sensor.trigger(11)  # critical level
sensor.trigger(6)   # alert level

# The siren leaves; the light and the phone still react.
bus.unsubscribe(topic="smoke_warning", instance=my_siren)
sensor.trigger(13)  # critical level
Trade-offs & Testing
| Pros (Why it works) | Cons (The Twist / Pitfalls) |
|---|---|
| Scalability: Easily add 1 or 100 devices. | Complexity: Can be hard to trace "who is listening" in a large system. |
| Modularity: Devices are independent of the sensor. | Memory Leaks: Forgetting to detach() a listener is a common bug. |
| Real-time: Updates happen immediately. | Cyclic Dependencies: If Observer A updates Subject B which updates Observer A... |
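The memory-leak pitfall in the table comes from the subject holding strong references to its observers. One common mitigation, sketched here as an assumption rather than something this document's code does, is to store observers in the standard library's `weakref.WeakSet`, so that an observer nobody else references simply drops out of the set. The `Sensor` class and `observer_count` helper are hypothetical names for the example.

```python
import gc
import weakref


class Sensor:
    def __init__(self) -> None:
        # A WeakSet does not keep its members alive.
        self._observers = weakref.WeakSet()

    def attach(self, obs) -> None:
        self._observers.add(obs)

    def notify(self, data) -> None:
        # Copy to a list so the set cannot change size mid-iteration.
        for obs in list(self._observers):
            obs.update(data)

    def observer_count(self) -> int:
        return len(self._observers)


class Siren:
    def update(self, data) -> None:
        print(f"Siren reacting to {data}")


sensor = Sensor()
siren = Siren()
sensor.attach(siren)
count_before = sensor.observer_count()

del siren       # the last strong reference goes away without detach()
gc.collect()    # only needed on interpreters without reference counting
count_after = sensor.observer_count()
```

The trade-off is that something else must keep each observer alive, and a `WeakSet` is unordered, so notification order is not guaranteed.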
Testing Strategy
- Unit Test Registration: Verify sensor.attach(obs) adds to the list.
- Integration Test: Trigger motion and verify that all registered devices performed their expected actions.
- Test Unsubscribe: Detach the Siren, trigger motion, and verify the Siren remained silent while other devices reacted.
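The three checks above might look like this with the standard `unittest` module. The `Sensor` class, its read-only `observers` property, and the `RecordingObserver` test double are hypothetical stand-ins written for this sketch; a real suite would import the production classes instead.

```python
import unittest


class Sensor:
    def __init__(self) -> None:
        self._observers = []

    @property
    def observers(self):
        return tuple(self._observers)  # read-only view for assertions

    def attach(self, obs) -> None:
        if obs not in self._observers:
            self._observers.append(obs)

    def detach(self, obs) -> None:
        if obs in self._observers:
            self._observers.remove(obs)

    def notify(self, data) -> None:
        for obs in list(self._observers):
            obs.update(data)


class RecordingObserver:
    """Test double that records every event it receives."""

    def __init__(self) -> None:
        self.events = []

    def update(self, data) -> None:
        self.events.append(data)


class SensorTests(unittest.TestCase):
    def setUp(self) -> None:
        self.sensor = Sensor()
        self.siren = RecordingObserver()
        self.app = RecordingObserver()

    def test_attach_registers_observer(self) -> None:
        self.sensor.attach(self.siren)
        self.assertIn(self.siren, self.sensor.observers)

    def test_all_registered_observers_are_notified(self) -> None:
        self.sensor.attach(self.siren)
        self.sensor.attach(self.app)
        self.sensor.notify({"location": "Hallway"})
        self.assertEqual(self.siren.events, [{"location": "Hallway"}])
        self.assertEqual(self.app.events, [{"location": "Hallway"}])

    def test_detached_observer_stays_silent(self) -> None:
        self.sensor.attach(self.siren)
        self.sensor.attach(self.app)
        self.sensor.detach(self.siren)
        self.sensor.notify({"location": "Hallway"})
        self.assertEqual(self.siren.events, [])
        self.assertEqual(len(self.app.events), 1)


# Run the suite explicitly so the snippet also works when pasted into a REPL.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(SensorTests)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

A recording test double keeps the assertions about what each device received, not about printed output.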
Interview Toolkit
- Interview Signal: Shows mastery of reactive systems and broadcast patterns.
- When to Use:
  - "Build a real-time notification system..."
  - "Design a UI that updates when the underlying data changes..."
  - "Implement a plugin architecture where plugins react to core events..."
- Scalability Probe: "How do you handle slow observers (e.g., one that makes an API call)?" (Answer: Dispatch notifications through a thread pool or an async event loop so one slow observer doesn't block the others.)
- Design Alternatives:
  - Chain of Responsibility: Passes a request along a chain of handlers until one of them handles it; Observer broadcasts the event to every registered observer, and each reacts independently.
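For the Scalability Probe above, the thread-pool answer could be sketched as follows with `concurrent.futures` from the standard library. `AsyncSensor`, `SlowApiObserver`, and `FastObserver` are hypothetical names, and the `time.sleep` call merely stands in for a slow API request.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait


class AsyncSensor:
    def __init__(self, max_workers: int = 4) -> None:
        self._observers = []
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def attach(self, obs) -> None:
        if obs not in self._observers:
            self._observers.append(obs)

    def notify(self, data) -> list[Future]:
        # Each update runs on a worker thread, so notify() returns right away
        # and a slow observer cannot hold up the others.
        return [self._pool.submit(obs.update, data) for obs in list(self._observers)]

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)


class SlowApiObserver:
    def __init__(self) -> None:
        self.received = []

    def update(self, data) -> None:
        time.sleep(0.2)  # stand-in for a slow network call
        self.received.append(data)


class FastObserver:
    def __init__(self) -> None:
        self.received = []

    def update(self, data) -> None:
        self.received.append(data)


sensor = AsyncSensor()
slow = SlowApiObserver()
fast = FastObserver()
sensor.attach(slow)
sensor.attach(fast)

futures = sensor.notify({"location": "Hallway"})
wait(futures)              # block here only because the example needs the results
for future in futures:
    future.result()        # re-raises any exception an observer threw
sensor.shutdown()
```

Exceptions raised inside a worker stay hidden until `result()` is called on the future, so a production version would need some form of error reporting; observers that share state would also need to be thread-safe.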