Event handling patterns form the foundation of modern responsive applications in Python. I've worked extensively with these patterns across various projects, and here I'll share how I implement them effectively.
The Observer Pattern
The Observer pattern enables objects to notify other objects about state changes automatically. I find this pattern particularly useful in GUI applications and distributed systems.
class Subject:
    def __init__(self):
        self._observers = []
        self._state = None

    def attach(self, observer):
        self._observers.append(observer)

    def detach(self, observer):
        self._observers.remove(observer)

    def notify(self):
        for observer in self._observers:
            observer.update(self._state)

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        # Assigning a new state notifies every attached observer
        self._state = value
        self.notify()


class Observer:
    def update(self, state):
        pass
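As a usage sketch, the snippet below attaches a concrete observer to a compact copy of the Subject shown above. `RecordingObserver` is a hypothetical name introduced for illustration, and notifying over a copy of the observer list is a small assumption on my part so that observers can detach during a notification.

```python
class Subject:
    """Compact copy of the Subject above so the sketch runs on its own."""

    def __init__(self):
        self._observers = []
        self._state = None

    def attach(self, observer):
        self._observers.append(observer)

    def detach(self, observer):
        self._observers.remove(observer)

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state = value
        # Notify a copy of the list so observers may detach during the loop
        for observer in list(self._observers):
            observer.update(value)


class RecordingObserver:
    """Hypothetical observer that records every state it is told about."""

    def __init__(self):
        self.seen = []

    def update(self, state):
        self.seen.append(state)


subject = Subject()
recorder = RecordingObserver()
subject.attach(recorder)

subject.state = "loading"
subject.state = "ready"

subject.detach(recorder)
subject.state = "closed"  # no longer delivered to recorder

print(recorder.seen)
```

Running it prints `['loading', 'ready']`: the third assignment happens after `detach`, so the recorder never sees it.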
Event Class Implementation
Creating a custom event system provides fine-grained control over event handling. Here's a robust implementation I've used in production:
from typing import Callable, Dict, List
import asyncio


class Event:
    def __init__(self):
        self._handlers = []
        self._async_handlers = []

    def add_handler(self, handler: Callable):
        self._handlers.append(handler)

    def add_async_handler(self, handler: Callable):
        self._async_handlers.append(handler)

    def remove_handler(self, handler: Callable):
        if handler in self._handlers:
            self._handlers.remove(handler)
        if handler in self._async_handlers:
            self._async_handlers.remove(handler)

    def fire(self, *args, **kwargs):
        # Synchronous handlers run one after another, in registration order
        for handler in self._handlers:
            handler(*args, **kwargs)

    async def fire_async(self, *args, **kwargs):
        # Calling each async handler creates a coroutine; gather runs them concurrently
        tasks = [handler(*args, **kwargs) for handler in self._async_handlers]
        await asyncio.gather(*tasks)
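To show how the two handler lists are used together, here is a minimal sketch that fires one event synchronously and once more through `asyncio.run`. The handler names `on_saved` and `on_saved_async` and the `"report.txt"` payload are made up for the example.

```python
import asyncio


class Event:
    """Compact copy of the Event class above so the sketch runs on its own."""

    def __init__(self):
        self._handlers = []
        self._async_handlers = []

    def add_handler(self, handler):
        self._handlers.append(handler)

    def add_async_handler(self, handler):
        self._async_handlers.append(handler)

    def fire(self, *args, **kwargs):
        for handler in self._handlers:
            handler(*args, **kwargs)

    async def fire_async(self, *args, **kwargs):
        # Each call creates a coroutine; gather runs them concurrently
        await asyncio.gather(*(h(*args, **kwargs) for h in self._async_handlers))


log = []


def on_saved(filename):
    log.append(f"sync: {filename}")


async def on_saved_async(filename):
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    log.append(f"async: {filename}")


saved = Event()
saved.add_handler(on_saved)
saved.add_async_handler(on_saved_async)

saved.fire("report.txt")                      # runs only the sync handlers
asyncio.run(saved.fire_async("report.txt"))   # runs only the async handlers

print(log)
```

Note that `fire` and `fire_async` are independent: each one touches only its own handler list, so a caller that wants both kinds of handlers to run has to call both.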
Signal Handling
System-level event handling requires careful signal management. The signal module provides this capability:
import signal
import sys


def signal_handler(sig, frame):
    print('Handling shutdown...')
    sys.exit(0)


# Handle Ctrl+C (SIGINT) and termination requests (SIGTERM) the same way
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
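Exiting from inside the handler stops the program at whatever point the signal happens to arrive. A common alternative, sketched here as an assumption rather than as the approach above, is to have the handler only record the request and let the main loop finish its current unit of work. `ShutdownFlag`, `install_handlers`, and `run` are hypothetical names.

```python
import signal


class ShutdownFlag:
    """Hypothetical holder for the 'please stop' request."""

    def __init__(self):
        self.requested = False

    def handle(self, sig, frame):
        # Keep the handler tiny: just record the request and return
        self.requested = True


flag = ShutdownFlag()


def install_handlers():
    # signal.signal() may only be called from the main thread
    signal.signal(signal.SIGINT, flag.handle)
    signal.signal(signal.SIGTERM, flag.handle)


def run(max_iterations=5):
    """Process work items until done or until shutdown is requested."""
    done = 0
    while done < max_iterations and not flag.requested:
        done += 1  # stand-in for one unit of real work
    return done
```

In a real program you would call `install_handlers()` once at startup and then `run()`; cleanup code placed after the loop then executes on both normal completion and shutdown.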
Publisher-Subscriber Pattern
The PubSub pattern enables loosely coupled communication between components:
from collections import defaultdict
from typing import Any, Callable


class PubSub:
    def __init__(self):
        # Maps each event type to the callbacks subscribed to it
        self.subscribers = defaultdict(list)

    def subscribe(self, event_type: str, callback: Callable):
        self.subscribers[event_type].append(callback)

    def publish(self, event_type: str, data: Any):
        for callback in self.subscribers[event_type]:
            callback(data)

    def unsubscribe(self, event_type: str, callback: Callable):
        if callback in self.subscribers[event_type]:
            self.subscribers[event_type].remove(callback)
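The loose coupling is easiest to see with two independent subscribers on one topic. This is a minimal sketch with a compact copy of the class; the `"order.created"` topic and the `send_email` and `write_audit` callbacks are invented for illustration.

```python
from collections import defaultdict
from typing import Any, Callable


class PubSub:
    """Compact copy of the PubSub class above so the sketch runs on its own."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, event_type: str, callback: Callable):
        self.subscribers[event_type].append(callback)

    def publish(self, event_type: str, data: Any):
        # Iterate over a copy so callbacks may unsubscribe while running
        for callback in list(self.subscribers[event_type]):
            callback(data)

    def unsubscribe(self, event_type: str, callback: Callable):
        if callback in self.subscribers[event_type]:
            self.subscribers[event_type].remove(callback)


bus = PubSub()
emails, audit = [], []


def send_email(order):
    emails.append(order["id"])


def write_audit(order):
    audit.append(order["id"])


bus.subscribe("order.created", send_email)
bus.subscribe("order.created", write_audit)
bus.publish("order.created", {"id": 1})

bus.unsubscribe("order.created", send_email)
bus.publish("order.created", {"id": 2})

print(emails, audit)
```

The publisher never references either callback directly, so subscribers can come and go without any change to the publishing code.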
Event Loops and Async Integration
Modern Python applications often require asynchronous event handling. Here's how I implement it:
import asyncio
from collections import defaultdict
from typing import Dict, List, Callable


class AsyncEventEmitter:
    def __init__(self):
        self.events: Dict[str, List[Callable]] = defaultdict(list)

    async def emit(self, event: str, *args, **kwargs):
        # Callbacks must be coroutine functions; they run concurrently
        callbacks = self.events[event]
        await asyncio.gather(
            *(callback(*args, **kwargs) for callback in callbacks)
        )

    def on(self, event: str, callback: Callable):
        self.events[event].append(callback)

    def remove_listener(self, event: str, callback: Callable):
        if callback in self.events[event]:
            self.events[event].remove(callback)
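A short usage sketch may help here. It registers two coroutine listeners on a compact copy of the emitter and drives it with `asyncio.run`; the event name `"user.signup"` and both listener functions are hypothetical.

```python
import asyncio
from collections import defaultdict
from typing import Callable, Dict, List


class AsyncEventEmitter:
    """Compact copy of the emitter above so the sketch runs on its own."""

    def __init__(self):
        self.events: Dict[str, List[Callable]] = defaultdict(list)

    def on(self, event: str, callback: Callable):
        self.events[event].append(callback)

    async def emit(self, event: str, *args, **kwargs):
        await asyncio.gather(*(cb(*args, **kwargs) for cb in self.events[event]))


received = []


async def update_cache(user):
    await asyncio.sleep(0.01)  # stand-in for an I/O call
    received.append(("cache", user))


async def notify_admin(user):
    await asyncio.sleep(0.01)  # stand-in for an I/O call
    received.append(("admin", user))


async def main():
    emitter = AsyncEventEmitter()
    emitter.on("user.signup", update_cache)
    emitter.on("user.signup", notify_admin)
    # Both listeners wait concurrently, so emit takes about one sleep, not two
    await emitter.emit("user.signup", "alice")


asyncio.run(main())
print(sorted(received))
```

Because the listeners run concurrently, the order in which they finish is not guaranteed, which is why the sketch sorts the results before printing.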
Event Queue Management
Managing event priorities and queues is crucial for complex applications:
import itertools
import queue
from dataclasses import dataclass
from enum import Enum
from threading import Thread
from typing import Any


class Priority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3


@dataclass
class Event:
    type: str
    data: Any
    priority: Priority


class EventQueue:
    def __init__(self):
        self.queue = queue.PriorityQueue()
        # Tie-breaker so equal-priority entries never compare Event objects
        self._counter = itertools.count()
        self.running = True
        self.worker = Thread(target=self._process_events, daemon=True)
        self.worker.start()

    def push_event(self, event: Event):
        # Negate the priority so HIGH (3) is dequeued before LOW (1)
        self.queue.put((-event.priority.value, next(self._counter), event))

    def _process_events(self):
        while self.running:
            try:
                _, _, event = self.queue.get(timeout=1)
                self._handle_event(event)
            except queue.Empty:
                continue

    def _handle_event(self, event: Event):
        print(f"Processing event: {event.type} with priority {event.priority}")

    def stop(self):
        # The worker notices the flag within the 1-second get() timeout
        self.running = False
        self.worker.join()
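One detail deserves a closer look: `queue.PriorityQueue` compares whole tuples, so two entries with the same priority value fall through to comparing the next tuple element. If that element is a plain dataclass instance, the comparison raises `TypeError`. The sketch below, which leaves out the worker thread to stay deterministic, assumes an insertion counter as a tie-breaker; `pending`, `counter`, and `push` are hypothetical names.

```python
import itertools
import queue
from dataclasses import dataclass
from enum import Enum
from typing import Any


class Priority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3


@dataclass
class Event:
    type: str
    data: Any
    priority: Priority


pending = queue.PriorityQueue()
counter = itertools.count()  # hypothetical tie-breaker: insertion order


def push(event):
    # (negated priority, insertion order, event): the event is never compared
    pending.put((-event.priority.value, next(counter), event))


push(Event("cleanup", None, Priority.LOW))
push(Event("payment", None, Priority.HIGH))
push(Event("refund", None, Priority.HIGH))
push(Event("email", None, Priority.MEDIUM))

order = []
while not pending.empty():
    _, _, event = pending.get()
    order.append(event.type)

print(order)
```

This prints `['payment', 'refund', 'email', 'cleanup']`: higher priorities come first, and the two HIGH events keep the order in which they were pushed.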
Error Handling in Event Systems
Robust error handling is essential in event-driven applications:
from typing import Callable


class EventHandler:
    def __init__(self):
        self.error_handlers = []

    def add_error_handler(self, handler: Callable):
        self.error_handlers.append(handler)

    def handle_event(self, event: str, *args, **kwargs):
        try:
            self._process_event(event, *args, **kwargs)
        except Exception as e:
            self._handle_error(e, event, *args, **kwargs)

    def _process_event(self, event: str, *args, **kwargs):
        # Placeholder (not shown in the original): override with the real logic
        pass

    def _handle_error(self, error: Exception, event: str, *args, **kwargs):
        context = {
            'event': event,
            'args': args,
            'kwargs': kwargs,
            'error': error
        }
        for handler in self.error_handlers:
            try:
                handler(context)
            except Exception as e:
                # A failing error handler must not stop the remaining ones
                print(f"Error in error handler: {e}")
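To see the error path in action, the sketch below uses a compact copy of the handler with a subclass whose processing can fail. `DivisionHandler` and the `"divide"` event are hypothetical, and the no-op `_process_event` hook in the base class is an assumption about how the class is meant to be extended.

```python
from typing import Callable


class EventHandler:
    """Compact copy of the handler above so the sketch runs on its own."""

    def __init__(self):
        self.error_handlers = []

    def add_error_handler(self, handler: Callable):
        self.error_handlers.append(handler)

    def handle_event(self, event: str, *args, **kwargs):
        try:
            self._process_event(event, *args, **kwargs)
        except Exception as e:
            context = {"event": event, "args": args, "kwargs": kwargs, "error": e}
            for handler in self.error_handlers:
                try:
                    handler(context)
                except Exception as handler_error:
                    print(f"Error in error handler: {handler_error}")

    def _process_event(self, event: str, *args, **kwargs):
        pass  # assumed hook: subclasses supply the real work


class DivisionHandler(EventHandler):
    """Hypothetical subclass whose processing can fail."""

    def _process_event(self, event, numerator, denominator):
        return numerator / denominator


failures = []
handler = DivisionHandler()
handler.add_error_handler(
    lambda ctx: failures.append((ctx["event"], type(ctx["error"]).__name__))
)

handler.handle_event("divide", 4, 2)  # succeeds, no error handler runs
handler.handle_event("divide", 1, 0)  # ZeroDivisionError is routed to handlers

print(failures)
```

The caller of `handle_event` never sees the exception; it ends up in `failures` as `[('divide', 'ZeroDivisionError')]`, together with enough context to log or retry.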
Memory Management
Holding only weak references to callbacks lets listeners be garbage-collected once nothing else uses them, which prevents memory leaks in long-running event systems:
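import inspect
import weakref
from collections import defaultdict
from typing import Callable


class WeakEventEmitter:
    def __init__(self):
        self._events = defaultdict(list)

    def on(self, event: str, callback: Callable):
        # Bound methods need WeakMethod; a plain ref to one dies immediately
        if inspect.ismethod(callback):
            ref = weakref.WeakMethod(callback)
        else:
            ref = weakref.ref(callback)
        self._events[event].append(ref)

    def emit(self, event: str, *args, **kwargs):
        for ref in self._events[event][:]:
            callback = ref()
            if callback is None:
                # The listener was garbage-collected; drop the dead reference
                self._events[event].remove(ref)
            else:
                callback(*args, **kwargs)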
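One caveat is worth illustrating: `obj.method` creates a fresh bound-method object on every access, so a plain `weakref.ref` to it goes dead at once and the listener would never fire. The sketch below assumes `weakref.WeakMethod` for bound methods and shows a listener being pruned after its owner is deleted. `Widget` and `listener_count` are hypothetical additions for the demonstration.

```python
import gc
import inspect
import weakref
from collections import defaultdict


class WeakEventEmitter:
    def __init__(self):
        self._events = defaultdict(list)

    def on(self, event, callback):
        # WeakMethod tracks the instance behind a bound method
        if inspect.ismethod(callback):
            ref = weakref.WeakMethod(callback)
        else:
            ref = weakref.ref(callback)
        self._events[event].append(ref)

    def emit(self, event, *args, **kwargs):
        for ref in self._events[event][:]:
            callback = ref()
            if callback is None:
                self._events[event].remove(ref)  # listener was collected
            else:
                callback(*args, **kwargs)

    def listener_count(self, event):
        return len(self._events[event])


class Widget:
    """Hypothetical listener object."""

    def __init__(self):
        self.clicks = 0

    def on_click(self):
        self.clicks += 1


emitter = WeakEventEmitter()
widget = Widget()
emitter.on("click", widget.on_click)

emitter.emit("click")
clicks_while_alive = widget.clicks

del widget
gc.collect()           # make collection explicit on any interpreter
emitter.emit("click")  # the dead reference is pruned here

print(clicks_while_alive, emitter.listener_count("click"))
```

The emitter never keeps the widget alive: once the last strong reference is gone, the next `emit` quietly removes the stale entry.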
Event Propagation
Controlling event propagation helps manage complex event hierarchies:
class EventNode:
    def __init__(self, name: str, parent=None):
        self.name = name
        self.parent = parent
        self.children = []
        self.handlers = []

    def add_child(self, child):
        child.parent = self
        self.children.append(child)

    def add_handler(self, handler: Callable):
        self.handlers.append(handler)

    def handle_event(self, event: str, *args, propagate=True, _source=None, **kwargs):
        for handler in self.handlers:
            handler(event, *args, **kwargs)
        if propagate and self.parent:
            # Bubble up, telling the parent which child the event came from
            self.parent.handle_event(event, *args, propagate=True, _source=self, **kwargs)
        for child in self.children:
            # Skip the originating child so its handlers do not run twice
            if child is not _source:
                child.handle_event(event, *args, propagate=False, **kwargs)
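Propagation order is easier to follow on a small tree. The sketch below uses a compact copy of the node class with one assumption made explicit: a private `_source` argument lets a parent skip the child that bubbled the event up, so each node's handlers run exactly once. The node names and `make_handler` are invented for the example.

```python
class EventNode:
    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.children = []
        self.handlers = []

    def add_child(self, child):
        child.parent = self
        self.children.append(child)

    def add_handler(self, handler):
        self.handlers.append(handler)

    def handle_event(self, event, *args, propagate=True, _source=None, **kwargs):
        for handler in self.handlers:
            handler(event, *args, **kwargs)
        if propagate and self.parent:
            # Bubble up, telling the parent which child the event came from
            self.parent.handle_event(event, *args, propagate=True, _source=self, **kwargs)
        for child in self.children:
            if child is not _source:  # skip the child that already handled it
                child.handle_event(event, *args, propagate=False, **kwargs)


visited = []


def make_handler(node_name):
    # Each handler records which node it ran on
    return lambda event: visited.append(node_name)


window = EventNode("window")
panel = EventNode("panel")
button = EventNode("button")
sidebar = EventNode("sidebar")
window.add_child(panel)
window.add_child(sidebar)
panel.add_child(button)

for node in (window, panel, button, sidebar):
    node.add_handler(make_handler(node.name))

panel.handle_event("click")
print(visited)
```

This prints `['panel', 'window', 'sidebar', 'button']`: the event is handled locally, bubbles to the parent, reaches the sibling through the parent, and finally descends to the panel's own child. Calling `panel.handle_event("click", propagate=False)` instead would reach only `panel` and `button`.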
These patterns form a comprehensive toolkit for building responsive applications. The key is choosing the right pattern for your specific use case and implementing it with careful consideration for error handling, memory management, and performance.