Python Backend Production Patterns — Real Code for Real Systems

These are the patterns that show up in system design rounds, backend coding challenges, and senior engineer interviews. Each pattern includes the theory, the code, and why it matters in production.


LRU Cache

Where it's used: database query caches, API response caches, CDN edge nodes.

Requirements: get(key) in O(1), put(key, value) in O(1), evict the Least Recently Used item when at capacity.

Data structures needed: HashMap for O(1) lookup + Doubly Linked List for O(1) move-to-front and evict-from-back.

Python's OrderedDict gives you both in one:

from collections import OrderedDict
 
class LRUCache:
    def __init__(self, capacity: int):
        self.cache = OrderedDict()
        self.capacity = capacity
 
    def get(self, key: int) -> int:
        if key not in self.cache:
            return -1
        self.cache.move_to_end(key)        # mark as recently used
        return self.cache[key]
 
    def put(self, key: int, value: int) -> None:
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict least recently used (first item)
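
To see the two OrderedDict calls in isolation, here is a minimal trace on a bare OrderedDict. The keys and values are made up for illustration.

```python
from collections import OrderedDict

# Insertion order doubles as recency order: oldest entry first, newest last.
od = OrderedDict()
od["a"] = 1
od["b"] = 2
od["c"] = 3

od.move_to_end("a")                 # "a" was just used, so it becomes the most recent
order_after_touch = list(od)        # keys from least to most recently used

evicted = od.popitem(last=False)    # removes the least recently used entry
order_after_evict = list(od)

print(order_after_touch)            # ['b', 'c', 'a']
print(evicted)                      # ('b', 2)
print(order_after_evict)            # ['c', 'a']
```

Both calls are O(1), which is what lets the class above meet its complexity requirements.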

Manual Implementation (for deep understanding)

The OrderedDict approach is what you write in interviews. The manual version shows you understand what's underneath:

class DLLNode:
    def __init__(self, key=0, val=0):
        self.key = key
        self.val = val
        self.prev = self.next = None
 
class LRUCacheManual:
    """
    DLL Layout:
    HEAD ↔ [LRU] ↔ ... ↔ [MRU] ↔ TAIL
 
    HEAD.next = least recently used
    TAIL.prev = most recently used
    Dummy HEAD/TAIL eliminate edge case checks.
    """
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = {}
        self.head = DLLNode()
        self.tail = DLLNode()
        self.head.next = self.tail
        self.tail.prev = self.head
 
    def _remove(self, node):
        node.prev.next = node.next
        node.next.prev = node.prev
 
    def _add_to_tail(self, node):
        prev = self.tail.prev
        prev.next = node
        node.prev = prev
        node.next = self.tail
        self.tail.prev = node
 
    def get(self, key: int) -> int:
        if key not in self.cache: return -1
        node = self.cache[key]
        self._remove(node)
        self._add_to_tail(node)
        return node.val
 
    def put(self, key: int, value: int) -> None:
        if key in self.cache:
            node = self.cache[key]
            node.val = value
            self._remove(node)
            self._add_to_tail(node)
        else:
            node = DLLNode(key, value)
            self.cache[key] = node
            self._add_to_tail(node)
            if len(self.cache) > self.capacity:
                lru = self.head.next
                self._remove(lru)
                del self.cache[lru.key]
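
When the goal is only to memoize a function, the standard library's functools.lru_cache may be enough. The sketch below uses a made-up square function and a deliberately tiny maxsize so the eviction is visible.

```python
from functools import lru_cache

@lru_cache(maxsize=2)
def square(n: int) -> int:
    return n * n

square(1)   # miss
square(2)   # miss
square(1)   # hit: 1 becomes the most recently used entry
square(3)   # miss: cache is full, so 2 (least recently used) is evicted
square(2)   # miss again, because 2 was evicted

info = square.cache_info()
print(info)   # CacheInfo(hits=1, misses=4, maxsize=2, currsize=2)
```

lru_cache keys on the call arguments, so they must be hashable, and it offers no per-key invalidation beyond cache_clear().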

Rate Limiter

Where it's used: API gateways, authentication endpoints, payment APIs.

Sliding Window Rate Limiter

import time
from collections import defaultdict, deque
 
class SlidingWindowRateLimiter:
    """
    Allow at most max_requests per window_seconds per user.
    Uses a deque to store timestamps of recent requests.
 
    Production equivalent: Redis sorted sets replace the deque.
    """
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests = defaultdict(deque)   # user_id → deque of timestamps
 
    def is_allowed(self, user_id: str) -> bool:
        now = time.monotonic()             # monotonic clock: unaffected by system clock changes
        window_start = now - self.window
        user_requests = self.requests[user_id]
 
        # Remove timestamps outside current window
        while user_requests and user_requests[0] < window_start:
            user_requests.popleft()
 
        if len(user_requests) < self.max_requests:
            user_requests.append(now)
            return True
 
        return False
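
The window logic is easier to check when the clock is passed in. The helper below is a hypothetical standalone version of the same algorithm (the name allow and its signature are assumptions, not part of the class above), driven by hand-picked timestamps.

```python
from collections import deque

def allow(timestamps: deque, now: float, max_requests: int, window: float) -> bool:
    """Same sliding-window check as above, but the caller supplies `now`."""
    # Drop timestamps that have fallen out of the window
    while timestamps and timestamps[0] < now - window:
        timestamps.popleft()
    if len(timestamps) < max_requests:
        timestamps.append(now)
        return True
    return False

log = deque()
decisions = [allow(log, t, max_requests=2, window=10) for t in (0, 1, 2, 10.5, 11.5)]
print(decisions)   # [True, True, False, True, True]
```

The third request is rejected because two requests already sit inside the 10-second window. By t=10.5 the request from t=0 has expired, so capacity is available again.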

Token Bucket Rate Limiter

Handles traffic bursts more gracefully — tokens accumulate up to a max capacity:

import time
from collections import defaultdict

class TokenBucketRateLimiter:
    """
    Tokens refill at a constant rate.
    Allows bursts up to bucket capacity.
    """
    def __init__(self, rate: float, capacity: int):
        self.rate = rate                    # tokens added per second
        self.capacity = capacity
        self.tokens = defaultdict(lambda: capacity)
        self.last_refill = defaultdict(float)
 
    def _refill(self, user_id: str) -> None:
        now = time.monotonic()             # monotonic clock: unaffected by system clock changes
        elapsed = now - self.last_refill[user_id]
        new_tokens = elapsed * self.rate
        self.tokens[user_id] = min(self.capacity, self.tokens[user_id] + new_tokens)
        self.last_refill[user_id] = now
 
    def is_allowed(self, user_id: str, cost: int = 1) -> bool:
        self._refill(user_id)
        if self.tokens[user_id] >= cost:
            self.tokens[user_id] -= cost
            return True
        return False
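
The refill arithmetic can be worked through without a clock. This sketch assumes a rate of 2 tokens per second and a capacity of 5. The refill helper is a hypothetical pure-function version of _refill.

```python
def refill(tokens: float, elapsed: float, rate: float, capacity: float) -> float:
    """Tokens after `elapsed` seconds, capped at the bucket capacity."""
    return min(capacity, tokens + elapsed * rate)

rate, capacity = 2.0, 5       # assumed values: 2 tokens/second, bursts of up to 5
tokens = float(capacity)      # a new user starts with a full bucket

# Burst: six back-to-back requests with no time passing in between
burst = []
for _ in range(6):
    if tokens >= 1:
        tokens -= 1
        burst.append(True)
    else:
        burst.append(False)

after_one_second = refill(tokens, elapsed=1.0, rate=rate, capacity=capacity)
after_long_idle = refill(tokens, elapsed=60.0, rate=rate, capacity=capacity)

print(burst)              # [True, True, True, True, True, False]
print(after_one_second)   # 2.0
print(after_long_idle)    # 5
```

The first five requests drain the bucket, the sixth is rejected, and a long idle period never banks more than the capacity.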

Sliding Window vs Token Bucket:

                  Sliding Window                   Token Bucket
Burst handling    Hard cutoff                      Allows bursts up to capacity
Memory            O(requests in window) per user   O(1) per user
Complexity        Simple                           Slightly more complex
Use case          Strict per-window limits         API with burst tolerance

TTL Cache

Where it's used: API response caching, session storage, computed result caching.

As a Class

import time
from threading import Lock
 
class TTLCache:
    """
    Thread-safe cache with per-key time-to-live expiry.
    Expired keys are removed lazily on access.
    """
    def __init__(self, default_ttl: float = 300):
        self.store = {}                    # key → (value, expiry_timestamp)
        self.default_ttl = default_ttl
        self.lock = Lock()
 
    def get(self, key: str):
        with self.lock:
            if key not in self.store:
                return None
            value, expiry = self.store[key]
            if time.time() > expiry:
                del self.store[key]        # lazy deletion
                return None
            return value
 
    def set(self, key: str, value, ttl: float = None) -> None:
        with self.lock:
            if ttl is None:                # an explicit ttl=0 must not fall back to the default
                ttl = self.default_ttl
            self.store[key] = (value, time.time() + ttl)
 
    def delete(self, key: str) -> bool:
        with self.lock:
            if key in self.store:
                del self.store[key]
                return True
            return False
 
    def cleanup_expired(self) -> int:
        """Call periodically to free memory from expired keys."""
        with self.lock:
            now = time.time()
            expired = [k for k, (_, exp) in self.store.items() if now > exp]
            for k in expired:
                del self.store[k]
            return len(expired)
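
One way to call a cleanup method periodically is a small daemon thread. The helper below is a sketch under assumptions: run_periodically is a hypothetical name, and the demo uses a fake cleanup function and a very short interval so it finishes quickly.

```python
import threading

def run_periodically(fn, interval: float) -> threading.Event:
    """Call fn() every `interval` seconds on a daemon thread until the returned event is set."""
    stop = threading.Event()

    def loop():
        # Event.wait returns False on timeout and True once stop.set() is called
        while not stop.wait(interval):
            fn()

    threading.Thread(target=loop, daemon=True).start()
    return stop

# Demo with a stand-in for cache.cleanup_expired
calls = []
done = threading.Event()

def fake_cleanup():
    calls.append(1)
    if len(calls) >= 3:
        done.set()

stop = run_periodically(fake_cleanup, interval=0.01)
done.wait(timeout=5)    # block until three sweeps have run
stop.set()              # ask the worker to exit
```

With a real cache this would look like stop = run_periodically(cache.cleanup_expired, 60), with stop.set() called during shutdown.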

As a Decorator

import functools
import time
 
def ttl_cache(ttl_seconds: float = 300):
    """Cache function results with TTL. Arguments must be hashable."""
    def decorator(func):
        cache = {}
 
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            now = time.time()
            if key in cache:
                result, expiry = cache[key]
                if now < expiry:
                    return result          # cache hit
 
            result = func(*args, **kwargs)
            cache[key] = (result, now + ttl_seconds)
            return result
 
        def invalidate(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            cache.pop(key, None)
 
        wrapper.cache = cache
        wrapper.invalidate = invalidate
        return wrapper
    return decorator
 
@ttl_cache(ttl_seconds=60)
def fetch_user(user_id: int) -> dict:
    print(f"DB query for user {user_id}")
    return {"id": user_id, "name": f"User_{user_id}"}
 
fetch_user(42)              # DB query for user 42
fetch_user(42)              # cache hit — no print
fetch_user.invalidate(42)  # bust cache
fetch_user(42)              # DB query again
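
One gotcha with this key scheme: fetch_user(42) and fetch_user(user_id=42) produce different keys, so they are cached (and must be invalidated) separately. A possible fix, sketched here with hypothetical helper names, is to normalize the call through inspect.signature before building the key.

```python
import inspect

def fetch_user(user_id: int, verbose: bool = False) -> dict:
    return {"id": user_id}

def raw_key(args, kwargs):
    """Key as built by the decorator above."""
    return (args, tuple(sorted(kwargs.items())))

def normalized_key(func, args, kwargs):
    """Bind the call to the signature so positional and keyword forms match."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return tuple(bound.arguments.items())

positional = raw_key((42,), {})
keyword = raw_key((), {"user_id": 42})
print(positional == keyword)    # False: two cache entries for the same logical call

print(normalized_key(fetch_user, (42,), {}) ==
      normalized_key(fetch_user, (), {"user_id": 42}))   # True
```

Normalizing costs a signature bind on every call, so it is a trade-off rather than a free improvement.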

Generator Pipelines for Large Data

Where it's used: log processing, ETL pipelines, streaming data, CSV imports.

The key idea: each stage is a generator. Data flows one record at a time — total memory is O(batch_size) regardless of file size.

import json
from typing import Iterator, Generator
 
def read_lines(filepath: str) -> Generator:
    """Stage 1: Read file one line at a time — O(1) memory."""
    with open(filepath, 'r') as f:
        for line in f:
            yield line.rstrip('\n')
 
def parse_json_lines(lines: Iterator) -> Generator:
    """Stage 2: Parse JSON, skip malformed lines."""
    for line in lines:
        if not line.strip():
            continue
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue
 
def filter_by_level(records: Iterator, level: str) -> Generator:
    """Stage 3: Keep only records matching log level."""
    for record in records:
        if record.get('level', '').upper() == level.upper():
            yield record
 
def enrich(records: Iterator) -> Generator:
    """Stage 4: Add computed fields."""
    for record in records:
        record['is_critical'] = record.get('level', '').upper() in ('ERROR', 'CRITICAL')  # case-insensitive, like Stage 3
        yield record
 
def batch(records: Iterator, size: int = 1000) -> Generator:
    """Stage 5: Group records for bulk DB inserts."""
    chunk = []
    for record in records:
        chunk.append(record)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk
 
# The pipeline — no data loaded until iteration starts
def process_log(filepath: str) -> int:
    lines    = read_lines(filepath)
    records  = parse_json_lines(lines)
    errors   = filter_by_level(records, 'ERROR')
    enriched = enrich(errors)
    batches  = batch(enriched, size=500)
 
    total = 0
    for b in batches:
        bulk_insert_to_db(b)        # placeholder: supply your own bulk-insert function
        total += len(b)
    return total
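
The laziness is easy to verify on a small in-memory input. This sketch uses two simplified stages (parse and only_errors are stand-ins for the stages above) and logs which stage touched which record.

```python
import json

events = []   # log of which stage touched which record, in order

def parse(lines):
    for line in lines:
        record = json.loads(line)
        events.append(f"parse {record['id']}")
        yield record

def only_errors(records):
    for record in records:
        if record["level"] == "ERROR":
            events.append(f"keep {record['id']}")
            yield record

raw = [
    '{"id": 1, "level": "INFO"}',
    '{"id": 2, "level": "ERROR"}',
    '{"id": 3, "level": "ERROR"}',
]

pipeline = only_errors(parse(raw))
events_before = list(events)    # snapshot taken before any iteration

first = next(pipeline)          # pulls just enough input to yield one match
events_after_first = list(events)

print(events_before)            # []
print(events_after_first)       # ['parse 1', 'parse 2', 'keep 2']
```

Building the pipeline runs nothing. Asking for one result parses only the first two lines, and record 3 is untouched until the next item is requested.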

Chunking Any Iterable

import itertools
 
def chunked(iterable, size: int):
    """Split any iterable into chunks — works with generators."""
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk
 
# Process 10,000 DB rows 100 at a time
def stream_users():
    for i in range(1, 10001):
        yield {"id": i, "name": f"User_{i}"}
 
for user_batch in chunked(stream_users(), 100):   # avoids shadowing the batch() stage defined earlier
    send_to_email_service(user_batch)   # placeholder call; 100 users at a time, constant memory
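
On Python 3.12 and later, itertools.batched covers the same need, with the difference that it yields tuples instead of lists. The sketch below falls back to an equivalent helper on older versions.

```python
import itertools

try:
    from itertools import batched          # available in Python 3.12+
except ImportError:
    def batched(iterable, n):
        """Fallback with the same output shape: tuples of up to n items."""
        it = iter(iterable)
        while chunk := tuple(itertools.islice(it, n)):
            yield chunk

chunks = list(batched(range(7), 3))
print(chunks)   # [(0, 1, 2), (3, 4, 5), (6,)]
```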

Priority Task Queue

Where it's used: job schedulers, async task managers, event processing.

import heapq
import itertools
from dataclasses import dataclass, field
from typing import Callable
from enum import IntEnum
 
class Priority(IntEnum):
    CRITICAL = 0    # processed first
    HIGH     = 1
    MEDIUM   = 2
    LOW      = 3
 
@dataclass(order=True)
class Task:
    priority: Priority
    sequence: int                       # tiebreaker — FIFO within same priority
    name: str  = field(compare=False)
    func: Callable = field(compare=False)
    args: tuple = field(default_factory=tuple, compare=False)
    kwargs: dict = field(default_factory=dict, compare=False)
 
class PriorityTaskQueue:
    def __init__(self):
        self._heap = []
        self._counter = itertools.count()
 
    def submit(self, func: Callable, *args, priority=Priority.MEDIUM, name="", **kwargs):
        task = Task(
            priority=priority,
            sequence=next(self._counter),  # ensures FIFO at same priority
            name=name or func.__name__,
            func=func,
            args=args,
            kwargs=kwargs
        )
        heapq.heappush(self._heap, task)
 
    def execute_next(self):
        if not self._heap:
            raise IndexError("Queue is empty")
        task = heapq.heappop(self._heap)
        return task.func(*task.args, **task.kwargs)
 
    def execute_all(self) -> list:
        results = []
        while self._heap:
            try:
                results.append(self.execute_next())
            except Exception as e:
                print(f"Task failed: {e}")
        return results
 
    def size(self) -> int:
        return len(self._heap)
 
# Usage (send_alert, generate_report, and send_welcome stand in for your own callables)
queue = PriorityTaskQueue()
queue.submit(send_alert, "ceo@co.com", priority=Priority.CRITICAL)
queue.submit(generate_report, 42,      priority=Priority.LOW)
queue.submit(send_welcome, "user@co.com", priority=Priority.HIGH)
queue.execute_all()
# Execution order: send_alert → send_welcome → generate_report
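
The sequence field is not only about fairness. Without a tiebreaker, two entries with the same priority force Python to compare the payloads, and functions are not orderable. A minimal sketch with plain tuples and two made-up jobs:

```python
import heapq
import itertools

def job_a(): return "a"
def job_b(): return "b"

# Without a tiebreaker: equal priorities fall through to comparing the functions
naive = []
heapq.heappush(naive, (1, job_a))
try:
    heapq.heappush(naive, (1, job_b))
    naive_error = None
except TypeError as exc:
    naive_error = exc

# With a monotonically increasing counter as the second tuple element
counter = itertools.count()
heap = []
for priority, func in [(1, job_a), (0, job_b), (1, job_b), (1, job_a)]:
    heapq.heappush(heap, (priority, next(counter), func))

order = [heapq.heappop(heap)[2]() for _ in range(4)]
print(type(naive_error).__name__)   # TypeError
print(order)                        # ['b', 'a', 'b', 'a']
```

The counter values are unique, so the comparison never reaches the function, and entries with equal priority come out in submission order.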

Retry with Exponential Backoff

Where it's used: HTTP clients, database connections, payment APIs, any external call that can fail transiently.

import time
import random
import functools
 
class RetryExhausted(Exception):
    pass
 
def retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
    jitter: bool = True,
    exceptions: tuple = (Exception,)
):
    """
    Decorator: retry with exponential backoff.
 
    Delay schedule: base_delay * backoff_factor^(attempt-1)
    Jitter adds ±20% randomness — prevents thundering herd problem
    when many clients retry simultaneously.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exc = e
                    if attempt == max_attempts:
                        break
                    delay = min(base_delay * (backoff_factor ** (attempt - 1)), max_delay)
                    if jitter:
                        delay *= (0.8 + random.random() * 0.4)  # ±20%
                    print(f"Attempt {attempt} failed: {e}. Retry in {delay:.2f}s...")
                    time.sleep(delay)
            raise RetryExhausted(f"All {max_attempts} attempts failed") from last_exc
        return wrapper
    return decorator
 
@retry(max_attempts=4, base_delay=0.5, exceptions=(ConnectionError, TimeoutError))
def call_payment_api(amount: float) -> dict:
    # ... actual API call
    pass

Delay schedule example with base_delay=1.0, backoff_factor=2.0, max_attempts=4 (before jitter):

Attempt    Delay before retry
1          1s
2          2s
3          4s
4          none (raises RetryExhausted)
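
The schedule can be reproduced with a few lines. backoff_delays is a hypothetical helper that applies the same formula as the decorator, with jitter left out so the numbers are deterministic.

```python
def backoff_delays(max_attempts: int, base_delay: float = 1.0,
                   backoff_factor: float = 2.0, max_delay: float = 60.0) -> list:
    """Delays slept after each failed attempt except the last (jitter omitted)."""
    return [
        min(base_delay * backoff_factor ** (attempt - 1), max_delay)
        for attempt in range(1, max_attempts)
    ]

schedule = backoff_delays(4)
capped = backoff_delays(9)

print(schedule)   # [1.0, 2.0, 4.0]
print(capped)     # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The second call shows max_delay taking over: from the seventh attempt on, the uncapped value (64s, 128s) is clamped to 60s.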

Pagination Patterns

Where it's used: REST APIs, database cursors, search results.

Offset-Based (Simple, Common)

def paginate(data: list, page: int, page_size: int) -> dict:
    """page is 1-indexed."""
    total = len(data)
    start = (page - 1) * page_size
    items = data[start:start + page_size]
    return {
        "items": items,
        "page": page,
        "page_size": page_size,
        "total": total,
        "total_pages": (total + page_size - 1) // page_size,
        "has_next": start + page_size < total,
        "has_prev": page > 1
    }

Cursor-Based (Scalable, Stable)

More reliable when data is inserted/deleted between page requests:

def cursor_paginate(data: list, cursor: int, limit: int) -> dict:
    """
    cursor = ID of last seen item (0 for first page). Assumes data is sorted by positive 'id'.
    More stable than offset: inserts/deletes between requests don't shift page boundaries.
    In a database this becomes WHERE id > cursor ORDER BY id LIMIT n (an index seek);
    this in-memory version still scans the list to find the starting position.
    """
    # First index whose id is greater than the cursor; works even if the cursor row was deleted
    start = next((i for i, item in enumerate(data) if item['id'] > cursor), len(data))
    items = data[start:start + limit]
    has_more = start + limit < len(data)

    return {
        "items": items,
        "next_cursor": items[-1]['id'] if has_more else None,
        "has_more": has_more
    }
 
# Usage
page1 = cursor_paginate(data, cursor=0, limit=10)
page2 = cursor_paginate(data, cursor=page1["next_cursor"], limit=10)
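
Against a real database, the cursor becomes a WHERE clause on an indexed column. Here is a minimal sketch using the standard library's sqlite3 with an in-memory table. The users table, its columns, and fetch_page are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (id, name) VALUES (?, ?)",
                 [(i, f"User_{i}") for i in range(1, 26)])

def fetch_page(conn, cursor: int, limit: int) -> dict:
    # Ask for one extra row so has_more is known without a second query
    rows = conn.execute(
        "SELECT id, name FROM users WHERE id > ? ORDER BY id LIMIT ?",
        (cursor, limit + 1),
    ).fetchall()
    items = rows[:limit]
    has_more = len(rows) > limit
    return {
        "items": items,
        "next_cursor": items[-1][0] if has_more else None,
        "has_more": has_more,
    }

page1 = fetch_page(conn, cursor=0, limit=10)
page2 = fetch_page(conn, cursor=page1["next_cursor"], limit=10)
page3 = fetch_page(conn, cursor=page2["next_cursor"], limit=10)

print(page1["next_cursor"], page2["next_cursor"], page3["next_cursor"])   # 10 20 None
```

Because the query filters on id > cursor, a row deleted or inserted behind the cursor does not shift later pages, which is the stability property described above.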

Generator-Based (Memory Efficient)

import itertools
 
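def paginate_generator(iterable, page_size: int, page_num: int) -> dict:
    """
    Paginate any iterable, including DB cursors and generators.
    Holds only one page in memory, but still consumes and discards the
    first `start` items, so the cost grows with the page number.
    A generator is partially consumed by the call, so create a fresh one per page.
    """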
    start = page_size * (page_num - 1)
    items = list(itertools.islice(iterable, start, start + page_size))
    return {
        "page": page_num,
        "items": items,
        "has_more": len(items) == page_size
    }

Event System (Pub-Sub)

Where it's used: microservices event buses, plugin systems, UI state management, decoupled module communication.

from collections import defaultdict
from typing import Callable
import functools
 
class EventEmitter:
    """
    Synchronous pub-sub event system.
    Listeners at lower priority numbers fire first.
    """
    def __init__(self):
        self._listeners = defaultdict(list)  # event → [(priority, callback)]
        self._once = defaultdict(set)
 
    def on(self, event: str, callback: Callable, priority: int = 0):
        self._listeners[event].append((priority, callback))
        self._listeners[event].sort(key=lambda x: x[0])
        return self
 
    def once(self, event: str, callback: Callable):
        """Fire callback only once, then auto-remove."""
        self._once[event].add(callback)
        return self.on(event, callback)
 
    def off(self, event: str, callback: Callable = None):
        if callback is None:
            self._listeners[event] = []
        else:
            self._listeners[event] = [(p, cb) for p, cb in self._listeners[event] if cb != callback]
        return self
 
    def emit(self, event: str, *args, **kwargs) -> int:
        called = 0
        for _, callback in list(self._listeners.get(event, [])):  # copy: a listener may call on() during emit
            try:
                callback(*args, **kwargs)
                called += 1
            except Exception as e:
                print(f"Listener error on '{event}': {e}")
 
        if event in self._once:
            for cb in self._once[event]:
                self.off(event, cb)
            del self._once[event]
 
        return called
 
# Usage: order processing system
emitter = EventEmitter()
 
emitter.on('order.created', lambda **kw: print(f"Log: order {kw['order_id']}"), priority=0)
emitter.on('order.created', lambda **kw: print(f"Email: {kw['email']}"), priority=1)
emitter.on('order.created', lambda **kw: print("Inventory update"), priority=2)
 
emitter.emit('order.created', order_id='ORD-001', email='alice@example.com')
# Log: order ORD-001
# Email: alice@example.com
# Inventory update

Layered Config with ChainMap

Where it's used: application config with defaults → config file → env vars → CLI args.

import os
from collections import ChainMap
 
class ConfigManager:
    """
    Priority (highest to lowest):
    1. CLI / runtime overrides
    2. Environment variables
    3. Config file values
    4. Default values
 
    ChainMap searches layers in order — first match wins.
    Writing always goes to the first map (runtime layer).
    """
    def __init__(self, defaults: dict = None):
        self.defaults = defaults or {}
        self.file_config = {}
        self.env_config = {}
        self.runtime_config = {}
        self._build()
 
    def _build(self):
        self.config = ChainMap(
            self.runtime_config,
            self.env_config,
            self.file_config,
            self.defaults
        )
 
    def load_env(self, prefix: str = "APP_") -> None:
        self.env_config = {
            key[len(prefix):].lower(): value   # note: env values are always strings, e.g. '9000'
            for key, value in os.environ.items()
            if key.startswith(prefix)
        }
        self._build()
 
    def load_file(self, config_dict: dict) -> None:
        self.file_config = config_dict
        self._build()
 
    def set(self, key: str, value) -> None:
        self.runtime_config[key] = value    # goes into highest priority layer
 
    def get(self, key: str, default=None):
        return self.config.get(key, default)
 
    def __getitem__(self, key: str):
        return self.config[key]
 
# Usage
config = ConfigManager(defaults={'host': 'localhost', 'port': 8080, 'debug': False})
config.load_file({'port': 9000, 'cache_ttl': 600})
config.load_env(prefix="APP_")
config.set('debug', True)               # highest priority — overrides all
 
config.get('host')     # 'localhost': from defaults (assuming APP_HOST is not set)
config.get('port')     # 9000: from file, overrides default (assuming APP_PORT is not set)
config.get('debug')    # True — from runtime (highest priority)
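
The ChainMap behavior the class relies on can be seen on three plain dicts. The layer names and values here are illustrative.

```python
from collections import ChainMap

defaults = {"host": "localhost", "port": 8080}
file_cfg = {"port": 9000}
runtime = {}

config = ChainMap(runtime, file_cfg, defaults)
port_before = config["port"]        # found in file_cfg, the first layer that has it

config["port"] = 9999               # writes always land in the first mapping
port_after = config["port"]

del config["port"]                  # deletes also touch only the first mapping
port_restored = config["port"]

defaults["host"] = "db.internal"    # ChainMap keeps references, so in-place changes show through
host_live = config["host"]

print(port_before, port_after, port_restored)   # 9000 9999 9000
print(host_live)                                # db.internal
```

Because ChainMap stores references, ConfigManager only needs to call _build() when a layer is replaced with a new dict, as load_env and load_file do.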

Frequency Analysis

Where it's used: analytics dashboards, recommendation systems, anomaly detection, A/B testing.

from collections import Counter
 
class FrequencyAnalyzer:
    def __init__(self, top_k: int = 10):
        self.counter = Counter()
        self.total = 0
        self.top_k = top_k
 
    def record(self, item: str, count: int = 1) -> None:
        self.counter[item] += count
        self.total += count
 
    def record_batch(self, items: list) -> None:
        self.counter.update(items)
        self.total += len(items)
 
    def top_items(self, k: int = None):
        k = k or self.top_k
        return [
            (item, count, round(count / self.total * 100, 2))
            for item, count in self.counter.most_common(k)
        ]
 
    def frequency(self, item: str) -> float:
        return self.counter[item] / self.total if self.total else 0.0
 
    def merge(self, other: 'FrequencyAnalyzer') -> None:
        self.counter += other.counter
        self.total += other.total
 
    def anomalies(self, multiplier: float = 3.0) -> list:
        """Items with frequency > multiplier × average."""
        if not self.counter: return []
        avg = self.total / len(self.counter)
        return [(item, count) for item, count in self.counter.items() if count > avg * multiplier]
 
# Usage
analyzer = FrequencyAnalyzer(top_k=5)
analyzer.record_batch(["python", "java", "python", "go", "python", "java", "rust"])
 
for item, count, pct in analyzer.top_items():
    print(f"{item:15} {count:4}  ({pct}%)")
 
# python             3  (42.86%)
# java               2  (28.57%)
# go                 1  (14.29%)
# rust               1  (14.29%)
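
The merge method relies on Counter addition, which has one behavior worth knowing: it drops any key whose resulting count is zero or negative. A short sketch with made-up shard data:

```python
from collections import Counter

shard_a = Counter(["python", "go", "python"])
shard_b = Counter(["go", "rust"])

merged = shard_a + shard_b
top_two = merged.most_common(2)     # ties keep first-seen order

corrections = Counter({"go": -2})
with_plus = merged + corrections    # "go" reaches 0 and is dropped from the result

print(top_two)              # [('python', 2), ('go', 2)]
print("go" in with_plus)    # False
```

With only positive counts, as in the class above, this never triggers. It matters only if negative adjustments are ever recorded.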

Python Internals — Common Interview Questions

How does Python's garbage collection work?

Python uses two mechanisms:

Reference counting (primary): every object has an ob_refcnt. When it hits zero, the object is immediately deallocated. Fast, but can't handle circular references (a → b → a).

Cyclic garbage collector (secondary): runs when allocation thresholds are exceeded and frees unreachable reference cycles. It is generational: CPython has traditionally used three generations, with generation 0 (the youngest) collected most often.

import gc
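gc.collect()       # manually trigger a full collection; returns the number of unreachable objects found
gc.get_count()     # (count0, count1, count2): current collection counters, compared against gc.get_threshold()
gc.disable()       # turn off the cyclic collector only (reference counting still runs), e.g. around a latency-critical section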
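
The split between the two mechanisms can be observed directly. This sketch is CPython-specific and uses a made-up Node class. Automatic collection is switched off during the demo so that only the explicit gc.collect() call can free the cycle.

```python
import gc
import weakref

class Node:
    def __init__(self):
        self.other = None

gc.disable()                     # keep the automatic collector out of the demo
try:
    a, b = Node(), Node()
    a.other, b.other = b, a      # reference cycle: a -> b -> a
    probe = weakref.ref(a)       # lets us observe a without keeping it alive

    del a, b                     # refcounts never reach zero because of the cycle
    alive_after_del = probe() is not None

    gc.collect()                 # the cycle detector finds and frees the pair
    alive_after_collect = probe() is not None
finally:
    gc.enable()

print(alive_after_del)       # True
print(alive_after_collect)   # False
```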

What is the GIL?

The Global Interpreter Lock is a mutex that allows only one thread to execute Python bytecode at a time. It exists because CPython's reference counting isn't thread-safe without it.

Practical impact:

Workload     Threading                        Fix
CPU-bound    GIL prevents parallelism         multiprocessing
I/O-bound    GIL released during I/O waits    threading or asyncio
C extensions like NumPy release the GIL manually, which is why they can be fast in multi-threaded code.
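
The I/O-bound case can be demonstrated with time.sleep standing in for a blocking network call (fake_io is a placeholder). Four 0.2-second waits on four threads finish in roughly 0.2 seconds, not 0.8, because a sleeping thread does not hold the GIL.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(_):
    time.sleep(0.2)      # sleeping releases the GIL, like a blocking socket read
    return True

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fake_io, range(4)))
elapsed = time.perf_counter() - start

print(f"{elapsed:.2f}s for 4 overlapping waits")
```

Replacing the sleep with a pure-Python CPU loop would bring the total back to roughly the sum of the individual runtimes, which is when multiprocessing becomes the better tool.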

How are dicts implemented in CPython?

CPython 3.6+ uses a "compact dict" — two arrays:

  1. indices array (sparse): hash(key) % table_size → index into entries array
  2. entries array (dense): [(hash, key, value)] — append-only

The entries array being append-only is why dicts preserve insertion order since Python 3.7.

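Collision resolution: open addressing with perturbation. Each probe computes slot = (5*slot + 1 + perturb) % size, where perturb starts as the hash and is shifted right by 5 bits before every probe, so the high bits of the hash also influence the sequence.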

Resizes when load factor exceeds 2/3.
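
The probing rule can be simulated in a few lines. This is an illustration of the recurrence described above, not CPython's actual code: probe_sequence is a hypothetical helper, and the 64-bit mask is an assumption about the platform.

```python
PERTURB_SHIFT = 5

def probe_sequence(h: int, table_size: int, count: int) -> list:
    """First `count` slots probed for hash h; table_size must be a power of two."""
    mask = table_size - 1
    perturb = h & 0xFFFFFFFFFFFFFFFF   # treat the hash as unsigned (assumes 64-bit)
    slot = perturb & mask
    slots = [slot]
    while len(slots) < count:
        perturb >>= PERTURB_SHIFT
        slot = (slot * 5 + perturb + 1) & mask
        slots.append(slot)
    return slots

seq = probe_sequence(h=0, table_size=8, count=8)
print(seq)                          # [0, 1, 6, 7, 4, 5, 2, 3]
print(probe_sequence(32, 8, 2))     # [0, 2]
```

Once perturb reaches zero, the recurrence visits every slot of the table, so a free slot is always found. Hashes 0 and 32 start in the same slot but diverge on the second probe, because the higher bits of 32 feed into perturb.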

__repr__ vs __str__

class Temperature:
    def __init__(self, c): self.c = c
    def __str__(self):  return f"{self.c}°C"           # for end users
    def __repr__(self): return f"Temperature({self.c})" # for developers
 
t = Temperature(25)
print(str(t))    # 25°C       — calls __str__
print(repr(t))   # Temperature(25) — calls __repr__
print([t])       # [Temperature(25)] — containers always use __repr__

Rule: __repr__ should ideally satisfy eval(repr(obj)) == obj. __str__ is for human display.
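
A dataclass gives the round-trip property with little effort, since it generates both __repr__ and __eq__. A minimal sketch, reusing the Temperature name with a dataclass definition:

```python
from dataclasses import dataclass

@dataclass
class Temperature:
    c: float

    def __str__(self) -> str:
        return f"{self.c}°C"

t = Temperature(25)
round_trip = eval(repr(t))      # only safe because we control the string being evaluated

print(repr(t))            # Temperature(c=25)
print(round_trip == t)    # True
print(f"{t} / {t!r}")     # 25°C / Temperature(c=25)
```

The last line shows that f-strings use __str__ by default and __repr__ with the !r conversion.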

What is __slots__?

By default, object attributes live in a __dict__ (a hash table). With __slots__, Python allocates a fixed-size array instead — smaller memory footprint, slightly faster attribute access, but no dynamic attribute creation.

class Point:
    __slots__ = ('x', 'y')    # replaces __dict__ with fixed slots
    def __init__(self, x, y):
        self.x, self.y = x, y
 
import sys
class RegularPoint:
    def __init__(self, x, y): self.x = x; self.y = y
 
p1 = Point(1, 2)
p2 = RegularPoint(1, 2)
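sys.getsizeof(p1)            # ~56 bytes (illustrative; exact sizes vary by CPython version and platform)
sys.getsizeof(p2)            # ~48 bytes, not counting the instance __dict__
sys.getsizeof(p2.__dict__)   # ~232 bytes more for the __dict__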

Use __slots__ when creating millions of small objects (e.g., graph nodes, data records).
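
The trade-off is easy to observe: a slotted instance has no __dict__, and assigning an undeclared attribute fails. A short sketch:

```python
class Point:
    __slots__ = ("x", "y")

    def __init__(self, x, y):
        self.x, self.y = x, y

p = Point(1, 2)
has_dict = hasattr(p, "__dict__")

try:
    p.z = 3                      # "z" is not declared in __slots__
    error = None
except AttributeError as exc:
    error = exc

print(has_dict)                  # False
print(type(error).__name__)      # AttributeError
```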

deepcopy vs pickle

                 deepcopy                                    pickle
Purpose          In-process deep clone                       Serialize to bytes (storage/network)
Speed            Workload-dependent; pure-Python traversal   Workload-dependent; a round trip is often faster on plain data
Circular refs    Handles                                     Handles
Security         Safe                                        Never unpickle untrusted data
Use when         You need independent copy in same process   Persistence or IPC
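
Both tools preserve self-references, which can be checked with a structure that contains itself. The dict layout is made up for the example.

```python
import copy
import pickle

original = {"name": "root", "children": []}
original["children"].append(original)     # the structure refers to itself

cloned = copy.deepcopy(original)
restored = pickle.loads(pickle.dumps(original))

print(cloned is not original)                 # True: a separate object
print(cloned["children"][0] is cloned)        # True: the cycle points at the copy
print(restored["children"][0] is restored)    # True: same after a pickle round trip
```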

Quick Reference

# LRU Cache
from collections import OrderedDict
cache.move_to_end(key)              # mark recently used
cache.popitem(last=False)           # evict LRU
 
# Rate Limiter (sliding window)
while requests[user] and requests[user][0] < now - window: requests[user].popleft()
if len(requests[user]) < limit: requests[user].append(now)
 
# TTL Cache
cache[key] = (value, time.time() + ttl)
if time.time() > expiry: del cache[key]   # lazy expiry check
 
# Generator pipeline
lines   = read_lines(file)          # lazy
records = parse(lines)              # lazy
errors  = filter_fn(records)       # lazy
for batch in chunked(errors, 1000): # materialized 1000 at a time
    process(batch)
 
# Priority Queue
heapq.heappush(heap, (priority, next(counter), task))   # counter = itertools.count()
 
# Retry with backoff
delay = min(base * (factor ** (attempt - 1)), max_delay)   # attempt is 1-indexed
if jitter: delay *= (0.8 + random.random() * 0.4)
 
# Layered config
config = ChainMap(runtime, env, file, defaults)
config['key']   # first match wins — highest priority layer

This document was created with ❤️ by Anubhaw for Python developers who want to crack backend interviews. Suggestions for improvement are welcome.