Python Backend Production Patterns — Real Code for Real Systems
These are the patterns that show up in system design rounds, backend coding challenges, and senior engineer interviews. Each pattern includes the theory, the code, and why it matters in production.
LRU Cache
Where it's used: database query caches, API response caches, CDN edge nodes.
Requirements: get(key) in O(1), put(key, value) in O(1), evict the Least Recently Used item when at capacity.
Data structures needed: HashMap for O(1) lookup + Doubly Linked List for O(1) move-to-front and evict-from-back.
Python's OrderedDict gives you both in one:
from collections import OrderedDict
class LRUCache:
def __init__(self, capacity: int):
self.cache = OrderedDict()
self.capacity = capacity
def get(self, key: int) -> int:
if key not in self.cache:
return -1
self.cache.move_to_end(key) # mark as recently used
return self.cache[key]
def put(self, key: int, value: int) -> None:
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
self.cache.popitem(last=False) # evict least recently used (first item)Manual Implementation (for deep understanding)
The OrderedDict approach is what you write in interviews. The manual version shows you understand what's underneath:
class DLLNode:
def __init__(self, key=0, val=0):
self.key = key
self.val = val
self.prev = self.next = None
class LRUCacheManual:
"""
DLL Layout:
HEAD ↔ [LRU] ↔ ... ↔ [MRU] ↔ TAIL
HEAD.next = least recently used
TAIL.prev = most recently used
Dummy HEAD/TAIL eliminate edge case checks.
"""
def __init__(self, capacity: int):
self.capacity = capacity
self.cache = {}
self.head = DLLNode()
self.tail = DLLNode()
self.head.next = self.tail
self.tail.prev = self.head
def _remove(self, node):
node.prev.next = node.next
node.next.prev = node.prev
def _add_to_tail(self, node):
prev = self.tail.prev
prev.next = node
node.prev = prev
node.next = self.tail
self.tail.prev = node
def get(self, key: int) -> int:
if key not in self.cache: return -1
node = self.cache[key]
self._remove(node)
self._add_to_tail(node)
return node.val
def put(self, key: int, value: int) -> None:
if key in self.cache:
node = self.cache[key]
node.val = value
self._remove(node)
self._add_to_tail(node)
else:
node = DLLNode(key, value)
self.cache[key] = node
self._add_to_tail(node)
if len(self.cache) > self.capacity:
lru = self.head.next
self._remove(lru)
del self.cache[lru.key]Rate Limiter
Where it's used: API gateways, authentication endpoints, payment APIs.
Sliding Window Rate Limiter
import time
from collections import defaultdict, deque
class SlidingWindowRateLimiter:
"""
Allow at most max_requests per window_seconds per user.
Uses a deque to store timestamps of recent requests.
Production equivalent: Redis sorted sets replace the deque.
"""
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window = window_seconds
self.requests = defaultdict(deque) # user_id → deque of timestamps
def is_allowed(self, user_id: str) -> bool:
now = time.time()
window_start = now - self.window
user_requests = self.requests[user_id]
# Remove timestamps outside current window
while user_requests and user_requests[0] < window_start:
user_requests.popleft()
if len(user_requests) < self.max_requests:
user_requests.append(now)
return True
return FalseToken Bucket Rate Limiter
Handles traffic bursts more gracefully — tokens accumulate up to a max capacity:
class TokenBucketRateLimiter:
"""
Tokens refill at a constant rate.
Allows bursts up to bucket capacity.
"""
def __init__(self, rate: float, capacity: int):
self.rate = rate # tokens added per second
self.capacity = capacity
self.tokens = defaultdict(lambda: capacity)
self.last_refill = defaultdict(float)
def _refill(self, user_id: str) -> None:
now = time.time()
elapsed = now - self.last_refill[user_id]
new_tokens = elapsed * self.rate
self.tokens[user_id] = min(self.capacity, self.tokens[user_id] + new_tokens)
self.last_refill[user_id] = now
def is_allowed(self, user_id: str, cost: int = 1) -> bool:
self._refill(user_id)
if self.tokens[user_id] >= cost:
self.tokens[user_id] -= cost
return True
return FalseSliding Window vs Token Bucket:
| Sliding Window | Token Bucket | |
|---|---|---|
| Burst handling | Hard cutoff | Allows bursts up to capacity |
| Memory | O(requests in window) | O(users) |
| Complexity | Simple | Slightly more complex |
| Use case | Strict per-second limits | API with burst tolerance |
TTL Cache
Where it's used: API response caching, session storage, computed result caching.
As a Class
import time
from threading import Lock
class TTLCache:
"""
Thread-safe cache with per-key time-to-live expiry.
Expired keys are removed lazily on access.
"""
def __init__(self, default_ttl: float = 300):
self.store = {} # key → (value, expiry_timestamp)
self.default_ttl = default_ttl
self.lock = Lock()
def get(self, key: str):
with self.lock:
if key not in self.store:
return None
value, expiry = self.store[key]
if time.time() > expiry:
del self.store[key] # lazy deletion
return None
return value
def set(self, key: str, value, ttl: float = None) -> None:
with self.lock:
ttl = ttl or self.default_ttl
self.store[key] = (value, time.time() + ttl)
def delete(self, key: str) -> bool:
with self.lock:
if key in self.store:
del self.store[key]
return True
return False
def cleanup_expired(self) -> int:
"""Call periodically to free memory from expired keys."""
with self.lock:
now = time.time()
expired = [k for k, (_, exp) in self.store.items() if now > exp]
for k in expired:
del self.store[k]
return len(expired)As a Decorator
import functools
def ttl_cache(ttl_seconds: float = 300):
"""Cache function results with TTL. Arguments must be hashable."""
def decorator(func):
cache = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = (args, tuple(sorted(kwargs.items())))
now = time.time()
if key in cache:
result, expiry = cache[key]
if now < expiry:
return result # cache hit
result = func(*args, **kwargs)
cache[key] = (result, now + ttl_seconds)
return result
def invalidate(*args, **kwargs):
key = (args, tuple(sorted(kwargs.items())))
cache.pop(key, None)
wrapper.cache = cache
wrapper.invalidate = invalidate
return wrapper
return decorator
@ttl_cache(ttl_seconds=60)
def fetch_user(user_id: int) -> dict:
print(f"DB query for user {user_id}")
return {"id": user_id, "name": f"User_{user_id}"}
fetch_user(42) # DB query for user 42
fetch_user(42) # cache hit — no print
fetch_user.invalidate(42) # bust cache
fetch_user(42) # DB query againGenerator Pipelines for Large Data
Where it's used: log processing, ETL pipelines, streaming data, CSV imports.
The key idea: each stage is a generator. Data flows one record at a time — total memory is O(batch_size) regardless of file size.
import json
from typing import Iterator, Generator
def read_lines(filepath: str) -> Generator:
"""Stage 1: Read file one line at a time — O(1) memory."""
with open(filepath, 'r') as f:
for line in f:
yield line.rstrip('\n')
def parse_json_lines(lines: Iterator) -> Generator:
"""Stage 2: Parse JSON, skip malformed lines."""
for line in lines:
if not line.strip():
continue
try:
yield json.loads(line)
except json.JSONDecodeError:
continue
def filter_by_level(records: Iterator, level: str) -> Generator:
"""Stage 3: Keep only records matching log level."""
for record in records:
if record.get('level', '').upper() == level.upper():
yield record
def enrich(records: Iterator) -> Generator:
"""Stage 4: Add computed fields."""
for record in records:
record['is_critical'] = record.get('level') in ('ERROR', 'CRITICAL')
yield record
def batch(records: Iterator, size: int = 1000) -> Generator:
"""Stage 5: Group records for bulk DB inserts."""
chunk = []
for record in records:
chunk.append(record)
if len(chunk) >= size:
yield chunk
chunk = []
if chunk:
yield chunk
# The pipeline — no data loaded until iteration starts
def process_log(filepath: str) -> int:
lines = read_lines(filepath)
records = parse_json_lines(lines)
errors = filter_by_level(records, 'ERROR')
enriched = enrich(errors)
batches = batch(enriched, size=500)
total = 0
for b in batches:
bulk_insert_to_db(b)
total += len(b)
return totalChunking Any Iterable
import itertools
def chunked(iterable, size: int):
"""Split any iterable into chunks — works with generators."""
it = iter(iterable)
while True:
chunk = list(itertools.islice(it, size))
if not chunk:
break
yield chunk
# Process 10,000 DB rows 100 at a time
def stream_users():
for i in range(1, 10001):
yield {"id": i, "name": f"User_{i}"}
for batch in chunked(stream_users(), 100):
send_to_email_service(batch) # 100 users at a time, constant memoryPriority Task Queue
Where it's used: job schedulers, async task managers, event processing.
import heapq
import itertools
from dataclasses import dataclass, field
from typing import Callable
from enum import IntEnum
class Priority(IntEnum):
CRITICAL = 0 # processed first
HIGH = 1
MEDIUM = 2
LOW = 3
@dataclass(order=True)
class Task:
priority: Priority
sequence: int # tiebreaker — FIFO within same priority
name: str = field(compare=False)
func: Callable = field(compare=False)
args: tuple = field(default_factory=tuple, compare=False)
kwargs: dict = field(default_factory=dict, compare=False)
class PriorityTaskQueue:
def __init__(self):
self._heap = []
self._counter = itertools.count()
def submit(self, func: Callable, *args, priority=Priority.MEDIUM, name="", **kwargs):
task = Task(
priority=priority,
sequence=next(self._counter), # ensures FIFO at same priority
name=name or func.__name__,
func=func,
args=args,
kwargs=kwargs
)
heapq.heappush(self._heap, task)
def execute_next(self):
if not self._heap:
raise IndexError("Queue is empty")
task = heapq.heappop(self._heap)
return task.func(*task.args, **task.kwargs)
def execute_all(self) -> list:
results = []
while self._heap:
try:
results.append(self.execute_next())
except Exception as e:
print(f"Task failed: {e}")
return results
def size(self) -> int:
return len(self._heap)
# Usage
queue = PriorityTaskQueue()
queue.submit(send_alert, "ceo@co.com", priority=Priority.CRITICAL)
queue.submit(generate_report, 42, priority=Priority.LOW)
queue.submit(send_welcome, "user@co.com", priority=Priority.HIGH)
queue.execute_all()
# Execution order: send_alert → send_welcome → generate_reportRetry with Exponential Backoff
Where it's used: HTTP clients, database connections, payment APIs, any external call that can fail transiently.
import time
import random
import functools
class RetryExhausted(Exception):
pass
def retry(
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
backoff_factor: float = 2.0,
jitter: bool = True,
exceptions: tuple = (Exception,)
):
"""
Decorator: retry with exponential backoff.
Delay schedule: base_delay * backoff_factor^(attempt-1)
Jitter adds ±20% randomness — prevents thundering herd problem
when many clients retry simultaneously.
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exc = None
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exc = e
if attempt == max_attempts:
break
delay = min(base_delay * (backoff_factor ** (attempt - 1)), max_delay)
if jitter:
delay *= (0.8 + random.random() * 0.4) # ±20%
print(f"Attempt {attempt} failed: {e}. Retry in {delay:.2f}s...")
time.sleep(delay)
raise RetryExhausted(f"All {max_attempts} attempts failed") from last_exc
return wrapper
return decorator
@retry(max_attempts=4, base_delay=0.5, exceptions=(ConnectionError, TimeoutError))
def call_payment_api(amount: float) -> dict:
# ... actual API call
passDelay schedule example with base_delay=1.0, backoff_factor=2.0:
| Attempt | Delay before retry |
|---|---|
| 1 | 1s |
| 2 | 2s |
| 3 | 4s |
| 4 | raise exception |
Pagination Patterns
Where it's used: REST APIs, database cursors, search results.
Offset-Based (Simple, Common)
def paginate(data: list, page: int, page_size: int) -> dict:
"""page is 1-indexed."""
total = len(data)
start = (page - 1) * page_size
items = data[start:start + page_size]
return {
"items": items,
"page": page,
"page_size": page_size,
"total": total,
"total_pages": (total + page_size - 1) // page_size,
"has_next": start + page_size < total,
"has_prev": page > 1
}Cursor-Based (Scalable, Stable)
More reliable when data is inserted/deleted between page requests:
def cursor_paginate(data: list, cursor: int, limit: int) -> dict:
"""
cursor = ID of last seen item (0 for first page).
More stable than offset — unaffected by concurrent inserts/deletes.
More efficient for DB — uses index seek instead of full scan.
"""
if cursor == 0:
items = data[:limit]
else:
pos = next((i for i, item in enumerate(data) if item['id'] == cursor), -1)
items = data[pos + 1:pos + 1 + limit]
return {
"items": items,
"next_cursor": items[-1]['id'] if len(items) == limit else None,
"has_more": len(items) == limit
}
# Usage
page1 = cursor_paginate(data, cursor=0, limit=10)
page2 = cursor_paginate(data, cursor=page1["next_cursor"], limit=10)Generator-Based (Memory Efficient)
import itertools
def paginate_generator(iterable, page_size: int, page_num: int) -> dict:
"""
Paginate any iterable — works with DB cursors and generators.
Does not load entire dataset into memory.
"""
start = page_size * (page_num - 1)
items = list(itertools.islice(iterable, start, start + page_size))
return {
"page": page_num,
"items": items,
"has_more": len(items) == page_size
}Event System (Pub-Sub)
Where it's used: microservices event buses, plugin systems, UI state management, decoupled module communication.
from collections import defaultdict
from typing import Callable
import functools
class EventEmitter:
"""
Synchronous pub-sub event system.
Listeners at lower priority numbers fire first.
"""
def __init__(self):
self._listeners = defaultdict(list) # event → [(priority, callback)]
self._once = defaultdict(set)
def on(self, event: str, callback: Callable, priority: int = 0):
self._listeners[event].append((priority, callback))
self._listeners[event].sort(key=lambda x: x[0])
return self
def once(self, event: str, callback: Callable):
"""Fire callback only once, then auto-remove."""
self._once[event].add(callback)
return self.on(event, callback)
def off(self, event: str, callback: Callable = None):
if callback is None:
self._listeners[event] = []
else:
self._listeners[event] = [(p, cb) for p, cb in self._listeners[event] if cb != callback]
return self
def emit(self, event: str, *args, **kwargs) -> int:
called = 0
for _, callback in self._listeners.get(event, []):
try:
callback(*args, **kwargs)
called += 1
except Exception as e:
print(f"Listener error on '{event}': {e}")
if event in self._once:
for cb in self._once[event]:
self.off(event, cb)
del self._once[event]
return called
# Usage: order processing system
emitter = EventEmitter()
emitter.on('order.created', lambda **kw: print(f"Log: order {kw['order_id']}"), priority=0)
emitter.on('order.created', lambda **kw: print(f"Email: {kw['email']}"), priority=1)
emitter.on('order.created', lambda **kw: print(f"Inventory update"), priority=2)
emitter.emit('order.created', order_id='ORD-001', email='alice@example.com')
# Log: order ORD-001
# Email: alice@example.com
# Inventory updateLayered Config with ChainMap
Where it's used: application config with defaults → config file → env vars → CLI args.
import os
from collections import ChainMap
class ConfigManager:
"""
Priority (highest to lowest):
1. CLI / runtime overrides
2. Environment variables
3. Config file values
4. Default values
ChainMap searches layers in order — first match wins.
Writing always goes to the first map (runtime layer).
"""
def __init__(self, defaults: dict = None):
self.defaults = defaults or {}
self.file_config = {}
self.env_config = {}
self.runtime_config = {}
self._build()
def _build(self):
self.config = ChainMap(
self.runtime_config,
self.env_config,
self.file_config,
self.defaults
)
def load_env(self, prefix: str = "APP_") -> None:
self.env_config = {
key[len(prefix):].lower(): value
for key, value in os.environ.items()
if key.startswith(prefix)
}
self._build()
def load_file(self, config_dict: dict) -> None:
self.file_config = config_dict
self._build()
def set(self, key: str, value) -> None:
self.runtime_config[key] = value # goes into highest priority layer
def get(self, key: str, default=None):
return self.config.get(key, default)
def __getitem__(self, key: str):
return self.config[key]
# Usage
config = ConfigManager(defaults={'host': 'localhost', 'port': 8080, 'debug': False})
config.load_file({'port': 9000, 'cache_ttl': 600})
config.load_env(prefix="APP_")
config.set('debug', True) # highest priority — overrides all
config.get('host') # 'localhost' — from defaults
config.get('port') # 9000 — from file (overrides default)
config.get('debug') # True — from runtime (highest priority)Frequency Analysis
Where it's used: analytics dashboards, recommendation systems, anomaly detection, A/B testing.
from collections import Counter
class FrequencyAnalyzer:
def __init__(self, top_k: int = 10):
self.counter = Counter()
self.total = 0
self.top_k = top_k
def record(self, item: str, count: int = 1) -> None:
self.counter[item] += count
self.total += count
def record_batch(self, items: list) -> None:
self.counter.update(items)
self.total += len(items)
def top_items(self, k: int = None):
k = k or self.top_k
return [
(item, count, round(count / self.total * 100, 2))
for item, count in self.counter.most_common(k)
]
def frequency(self, item: str) -> float:
return self.counter[item] / self.total if self.total else 0.0
def merge(self, other: 'FrequencyAnalyzer') -> None:
self.counter += other.counter
self.total += other.total
def anomalies(self, multiplier: float = 3.0) -> list:
"""Items with frequency > multiplier × average."""
if not self.counter: return []
avg = self.total / len(self.counter)
return [(item, count) for item, count in self.counter.items() if count > avg * multiplier]
# Usage
analyzer = FrequencyAnalyzer(top_k=5)
analyzer.record_batch(["python", "java", "python", "go", "python", "java", "rust"])
for item, count, pct in analyzer.top_items():
print(f"{item:15} {count:4} ({pct}%)")
# python 3 (42.86%)
# java 2 (28.57%)
# go 1 (14.29%)
# rust 1 (14.29%)Python Internals — Common Interview Questions
How does Python's garbage collection work?
Python uses two mechanisms:
Reference counting (primary): every object has an ob_refcnt. When it hits zero, the object is immediately deallocated. Fast, but can't handle circular references (a → b → a).
Cyclic garbage collector (secondary): runs periodically, finds unreachable reference cycles. Uses generational collection — three generations where generation 0 (youngest) is collected most frequently.
import gc
gc.collect() # manually trigger GC
gc.get_count() # (gen0, gen1, gen2) object counts
gc.disable() # disable GC (e.g., when using ctypes)What is the GIL?
The Global Interpreter Lock is a mutex that allows only one thread to execute Python bytecode at a time. It exists because CPython's reference counting isn't thread-safe without it.
Practical impact:
| Workload | Threading | Fix |
|---|---|---|
| CPU-bound | GIL prevents parallelism | multiprocessing |
| I/O-bound | GIL released during I/O waits | threading or asyncio |
C extensions like NumPy release the GIL manually, which is why they can be fast in multi-threaded code.
How are dicts implemented in CPython?
CPython 3.6+ uses a "compact dict" — two arrays:
- indices array (sparse):
hash(key) % table_size→ index into entries array - entries array (dense):
[(hash, key, value)]— append-only
The entries array being append-only is why dicts preserve insertion order since Python 3.7.
Collision resolution: open addressing with perturbation — slot = (5*slot + 1 + perturb) % size.
Resizes when load factor exceeds 2/3.
__repr__ vs __str__
class Temperature:
def __init__(self, c): self.c = c
def __str__(self): return f"{self.c}°C" # for end users
def __repr__(self): return f"Temperature({self.c})" # for developers
t = Temperature(25)
print(str(t)) # 25°C — calls __str__
print(repr(t)) # Temperature(25) — calls __repr__
print([t]) # [Temperature(25)] — containers always use __repr__Rule: __repr__ should ideally satisfy eval(repr(obj)) == obj. __str__ is for human display.
What is __slots__?
By default, object attributes live in a __dict__ (a hash table). With __slots__, Python allocates a fixed-size array instead — smaller memory footprint, slightly faster attribute access, but no dynamic attribute creation.
class Point:
__slots__ = ('x', 'y') # replaces __dict__ with fixed slots
def __init__(self, x, y):
self.x, self.y = x, y
import sys
class RegularPoint:
def __init__(self, x, y): self.x = x; self.y = y
p1 = Point(1, 2)
p2 = RegularPoint(1, 2)
sys.getsizeof(p1) # ~56 bytes
sys.getsizeof(p2) # ~48 bytes + __dict__ (~232 bytes)Use __slots__ when creating millions of small objects (e.g., graph nodes, data records).
deepcopy vs pickle
deepcopy | pickle | |
|---|---|---|
| Purpose | In-process deep clone | Serialize to bytes (storage/network) |
| Speed | Faster | Slower |
| Circular refs | Handles | Handles |
| Security | Safe | Never unpickle untrusted data |
| Use when | You need independent copy in same process | Persistence or IPC |
Quick Reference
# LRU Cache
from collections import OrderedDict
cache.move_to_end(key) # mark recently used
cache.popitem(last=False) # evict LRU
# Rate Limiter (sliding window)
while requests[user][0] < now - window: requests[user].popleft()
if len(requests[user]) < limit: requests[user].append(now)
# TTL Cache
cache[key] = (value, time.time() + ttl)
if time.time() > expiry: del cache[key] # lazy expiry check
# Generator pipeline
lines = read_lines(file) # lazy
records = parse(lines) # lazy
errors = filter_fn(records) # lazy
for batch in chunked(errors, 1000): # materialized 1000 at a time
process(batch)
# Priority Queue
heapq.heappush(heap, (priority, counter, task))
# Retry with backoff
delay = min(base * (factor ** attempt), max_delay)
if jitter: delay *= (0.8 + random.random() * 0.4)
# Layered config
config = ChainMap(runtime, env, file, defaults)
config['key'] # first match wins — highest priority layerThis document was created with ❤️ for Python Developers who want to crack backend interviews, by Anubhaw. Feel free to suggest improvements or connect @ ContactMe