A senior engineer teaching another engineer how to crack backend interviews.
Not a Python encyclopedia. Not competitive programming. Optimized for: interview signal, production thinking, and real backend engineering.
Python passes everything by object reference — not by value, not by C++ reference. Understanding this prevents the most common class of Python bugs.
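A two-line demonstration of the distinction — mutating a passed-in object is visible to the caller, while rebinding the parameter name is not:

```python
def mutate(lst):
    lst.append(4)    # mutates the caller's object

def rebind(lst):
    lst = [9, 9, 9]  # rebinds the local name only; caller unaffected

nums = [1, 2, 3]
mutate(nums)
print(nums)  # [1, 2, 3, 4] — mutation is visible
rebind(nums)
print(nums)  # [1, 2, 3, 4] — rebinding is invisible
```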
```python
# ❌ Mutable default argument — the #1 Python gotcha
def append_item(item, lst=[]):  # lst is created ONCE, at definition time
    lst.append(item)
    return lst

append_item(1)  # [1]
append_item(2)  # [1, 2] — bug! shared across all calls

# ✅ Correct pattern
def append_item(item, lst=None):
    if lst is None:
        lst = []
    lst.append(item)
    return lst
```
```python
# is vs ==
a = [1, 2, 3]
b = a          # same object
c = [1, 2, 3]  # equal but different object
a == c  # True — equality
a is c  # False — identity
a is b  # True — same object
```
Reference counting + cyclic GC — CPython frees an object the moment its refcount drops to 0. Reference cycles, which refcounting alone can never free, are reclaimed by the cyclic garbage collector.
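A quick CPython demonstration (note that sys.getrefcount counts its own argument as one extra reference):

```python
import gc
import sys

a = []
b = a
print(sys.getrefcount(a))  # 3: a, b, and getrefcount's own argument

# A reference cycle — refcounting alone can never reach 0 here
a.append(a)
del a, b
gc.collect()  # the cyclic collector reclaims the orphaned cycle
```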
Level 2 — Strong Candidate Knowledge
Interning: Python interns small integers (-5 to 256) and short strings. This is why a is b can be True for small integers without explicit assignment.
```python
a = 256; b = 256
a is b  # True (interned)
a = 257; b = 257
a is b  # typically False (not interned) — though CPython may fold constants
        # compiled together, so never rely on `is` for numeric comparison
```
__slots__ — Eliminate __dict__ per-instance overhead for high-volume objects.
```python
class Point:
    __slots__ = ['x', 'y']  # No __dict__ created per instance

    def __init__(self, x, y):
        self.x = x
        self.y = y

# 3–5x less memory than a regular class
# Use case: ORM row objects, graph nodes, DTOs at scale (1M+ instances)
```
Level 3 — Deep Internals
CPython uses reference counting + cyclic garbage collector. The GIL (Global Interpreter Lock) ensures only one thread executes Python bytecode at a time:
CPU-bound: threads don't give true parallelism → use multiprocessing
I/O-bound: threads work, asyncio is better for high concurrency — see the sketch below
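A minimal sketch of that decision with the standard executors — cpu_heavy and io_task are synthetic stand-ins for real work:

```python
import math
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_heavy(n: int) -> int:
    return sum(math.isqrt(i) for i in range(n))  # pure computation — GIL-bound

def io_task(seconds: float) -> float:
    time.sleep(seconds)  # stand-in for a network/DB call; releases the GIL
    return seconds

if __name__ == "__main__":
    # CPU-bound: processes give true parallelism (each worker has its own GIL)
    with ProcessPoolExecutor() as pool:
        print(list(pool.map(cpu_heavy, [5_000_000] * 4)))

    # I/O-bound: threads overlap the waits despite the GIL
    with ThreadPoolExecutor(max_workers=4) as pool:
        print(list(pool.map(io_task, [0.5] * 4)))  # ~0.5s total, not 2s
```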
🎯 Interview Reality
What interviewers actually ask:
"Explain Python's memory model" → mention reference counting, GC, GIL
"Why is mutable default argument a bug?" → one object created at function definition time
"What's the difference between is and ==?" → identity vs equality
Strong candidate answer for mutable default: "The default argument is evaluated once when the function is defined, not on each call. All calls share the same list object. Fix: use None as the default and create a new list inside the function body."
Common trap: Saying Python is "pass by reference" — it's pass-by-object-reference (the reference itself is passed by value).
Follow-up: "When would you use __slots__?""When I know attributes at class definition time and I'm creating millions of instances — like ORM row objects, graph nodes, or data transfer objects. Saves the per-instance __dict__ overhead."
⚙️ Production Thinking
```python
# Memory optimization for bulk data processing
# Without __slots__: ~400 bytes/object
# With __slots__:    ~56 bytes/object
class LogEntry:
    __slots__ = ['timestamp', 'level', 'message', 'service']

# Processing 1M log entries: saves ~350MB RAM
```
1.2 Built-in Data Structures
Level 1 — Must Know
| Structure | Best for | Key complexity |
|---|---|---|
| `list` | Ordered, mutable sequence | O(1) append, O(n) insert/search |
| `dict` | Key-value, O(1) lookup | O(1) avg get/set; ordered since 3.7 |
| `set` | Uniqueness, membership tests | O(1) avg lookup |
| `deque` | Queue/stack with O(1) at both ends | O(1) appendleft/popleft |
| `heapq` | Priority queue | O(log n) push/pop |
```python
from collections import deque, defaultdict, Counter
import heapq

# deque — O(1) at both ends (use instead of list for queues)
q = deque([1, 2, 3])
q.appendleft(0)  # O(1) — list.insert(0, x) is O(n)!
q.popleft()      # O(1)

# Counter — frequency counting
c = Counter(['apple', 'banana', 'apple'])
c.most_common(2)  # [('apple', 2), ('banana', 1)]

# defaultdict — clean grouping, no KeyError
graph = defaultdict(list)
graph['a'].append('b')

# heapq — min-heap (negate values for max-heap)
heap = []
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappop(heap)  # 1 (min)
```
Level 2 — Strong Candidate Knowledge
```python
# Use a set, not a list, for membership tests
VALID_STATUSES = {'active', 'inactive', 'pending'}  # O(1)
if user.status in VALID_STATUSES: ...               # vs list: O(n)

# LRU cache using OrderedDict
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity: int):
        self.cache = OrderedDict()
        self.capacity = capacity

    def get(self, key: int) -> int:
        if key not in self.cache:
            return -1
        self.cache.move_to_end(key)  # Mark as recently used
        return self.cache[key]

    def put(self, key: int, value: int) -> None:
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # Evict LRU
```
Level 3 — Deep Internals
CPython's dict is an open-addressing hash table; it resizes once the load factor exceeds ~2/3 (the resize is O(n), but inserts stay amortized O(1)). Dict keys must be hashable, which in practice means immutable. Since Python 3.7, dicts preserve insertion order.
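The practical consequence of the hashability rule:

```python
d = {}
d[(1, 2)] = "ok"   # tuple is hashable — valid key
d[[1, 2]] = "no"   # TypeError: unhashable type: 'list'
```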
🎯 Interview Reality
"How would you count word frequency?" → Counter
"Implement a queue in Python" → deque with append/popleft
"Why is list.insert(0, x) slow?" → O(n) shift; use deque
Strong answer:"I'd use collections.deque. It's a doubly linked list — appendleft and popleft are O(1). A list is O(n) for front inserts because it has to shift all elements."
Follow-up: "When would you use OrderedDict now that dicts are ordered?""Mostly for move_to_end() — useful for implementing LRU cache. Otherwise regular dict is fine."
1.3 Comprehensions & Generators
Level 1 — Must Know
```python
# List comprehension — creates the full list in memory
squares = [x**2 for x in range(10) if x % 2 == 0]

# Dict comprehension — transform dicts (common in APIs)
user_map = {user.id: user for user in users}

# Generator expression — lazy, constant memory
total = sum(x**2 for x in range(1_000_000))  # No list created!

# Rule: generator when iterating once over large data;
# list when you need random access, len(), or multiple passes
```
```python
# Generator function — for streaming/pipeline patterns
def read_large_file(filepath):
    with open(filepath) as f:
        for line in f:
            yield line.strip()  # O(1) memory regardless of file size

# Process line by line — never loads the full file
for line in read_large_file('large_log.txt'):
    process(line)
```
Level 2 — Strong Candidate Knowledge
```python
# Generator pipelines — Unix pipe style
import json

def parse_logs(lines):
    for line in lines:
        yield json.loads(line)

def filter_errors(records):
    for record in records:
        if record['level'] == 'ERROR':
            yield record

# Compose — processes one item at a time, O(1) memory
lines = read_large_file('app.log')
records = parse_logs(lines)
errors = filter_errors(records)

# yield from — delegate to a sub-generator
def flatten(nested):
    for item in nested:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item
```
🎯 Interview Reality
"How do you process a file with 10M lines without loading it all?" → generator function
"What's the difference between [x for x in ...] and (x for x in ...)?" → list vs generator
Common trap: Using list comprehension inside sum() — always use generator expression:
```python
sum(x**2 for x in ...)    # ✅ generator — no intermediate list
sum([x**2 for x in ...])  # ❌ creates the full list first
```
⚙️ Production Thinking
Generators are the right tool for:
Processing multi-GB log files without OOM
Streaming API responses (StreamingResponse in FastAPI)
DB pagination — yield page by page, not all rows at once (sketch after this list)
ETL pipelines — parse → filter → transform → load
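A sketch of the DB-pagination item above; fetch_page is a hypothetical data-layer call that returns up to page_size rows:

```python
from typing import Iterator

def iter_rows(page_size: int = 1000) -> Iterator[dict]:
    """Yield rows page by page — the caller never holds the full result set."""
    offset = 0
    while True:
        page = fetch_page(limit=page_size, offset=offset)  # hypothetical DB call
        if not page:
            return
        yield from page
        offset += page_size
```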
1.4 Decorators & Closures
Level 1 — Must Know
```python
import functools
import time

import requests

# Decorator with arguments — three levels of nesting
def retry(max_attempts=3, exceptions=(Exception,)):
    def decorator(func):
        @functools.wraps(func)  # Preserves __name__, __doc__
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(2 ** attempt)  # Exponential backoff
        return wrapper
    return decorator

@retry(max_attempts=3, exceptions=(ConnectionError, TimeoutError))
def call_external_api(url):
    return requests.get(url)
```
Level 2 — Strong Candidate Knowledge
```python
# Closure — a function that captures outer-scope variables
def make_counter(start=0):
    count = start  # closed-over variable

    def increment():
        nonlocal count  # must declare to modify the outer variable
        count += 1
        return count

    return increment

counter = make_counter(10)
counter()  # 11
counter()  # 12

# Classic closure bug in loops
funcs = [lambda: i for i in range(3)]
[f() for f in funcs]  # [2, 2, 2] — all capture the same i!

# Fix: capture by value via a default argument
funcs = [lambda i=i: i for i in range(3)]
[f() for f in funcs]  # [0, 1, 2] ✅
```
```python
# Class-based decorator — for stateful decorators
class RateLimiter:
    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            self.calls = [c for c in self.calls if now - c < self.period]
            if len(self.calls) >= self.max_calls:
                raise Exception("Rate limit exceeded")
            self.calls.append(now)
            return func(*args, **kwargs)
        return wrapper

@RateLimiter(max_calls=100, period=60)
def api_endpoint():
    pass
```
Level 3 — Deep Internals
Closures capture variables by reference, not value — the cell object in __closure__ stores the shared reference. This is why the loop variable capture bug happens.
🎯 Interview Reality
"Implement a retry decorator" → common Python exercise
"What does @functools.wraps do?" → preserves __name__, __doc__; without it func.__name__ returns 'wrapper'
"What's a closure?" → function that captures variables from its enclosing scope
Strong answer for retry: "A retry decorator wraps the function in a loop. On exception, it catches it, waits with exponential backoff, and retries. In production I'd add jitter to the backoff to avoid a thundering herd when many clients retry simultaneously."
⚙️ Production Thinking
```python
# Auth decorator pattern — common in FastAPI
def require_permission(permission: str):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            request = kwargs.get('request') or args[0]
            if not request.user.has_permission(permission):
                raise HTTPException(403, "Insufficient permissions")
            return await func(*args, **kwargs)
        return wrapper
    return decorator

@router.delete("/users/{id}")
@require_permission("users:delete")
async def delete_user(id: int, request: Request): ...
```
1.5 OOP & Design Patterns
Level 1 — Must Know
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

# Abstract Base Class — enforce interface contracts
class Repository(ABC):
    @abstractmethod
    def get(self, id: int): pass

    @abstractmethod
    def save(self, entity): pass

# Dataclass — use instead of plain dicts in APIs
@dataclass
class UserDTO:
    id: int
    name: str
    email: str
    roles: list[str] = field(default_factory=list)
    active: bool = True
```
@classmethod vs @staticmethod:
@classmethod receives the class as cls → use for alternative constructors: User.from_dict()
@staticmethod is a regular function namespaced to the class → use for utility functions
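A minimal sketch of both on one class:

```python
class User:
    def __init__(self, name: str, email: str):
        self.name = name
        self.email = email

    @classmethod
    def from_dict(cls, data: dict) -> "User":
        # Alternative constructor — receives the class, so subclasses work too
        return cls(data["name"], data["email"])

    @staticmethod
    def is_valid_email(email: str) -> bool:
        # Utility namespaced to the class — no access to cls or self
        return "@" in email

user = User.from_dict({"name": "Alice", "email": "a@b.com"})
```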
"How would you design a plugin system?" → ABC + registry
"Explain Repository pattern" → decouple business logic from data access; makes mocking trivial
Strong answer:"I'd use the Repository pattern. The service layer depends on the abstract interface. In tests I inject a MockRepository — no DB connection needed. In production I inject the SQLAlchemy implementation. This makes unit tests fast and lets me swap the storage layer."
1.6 Error Handling
Level 1 — Must Know
```python
# Custom exceptions — always define them for your domain
class AppError(Exception):
    """Base exception"""

class NotFoundError(AppError):
    def __init__(self, resource: str, id):
        self.resource = resource
        self.id = id
        super().__init__(f"{resource} with id={id} not found")

class ValidationError(AppError):
    def __init__(self, field: str, message: str):
        self.field = field
        super().__init__(f"Validation failed on {field}: {message}")

# Context manager for resource management
from contextlib import contextmanager

@contextmanager
def transaction(db):
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
```
Level 2 — Strong Candidate Knowledge
```python
# Exception chaining — preserve the original cause
try:
    result = external_api.call()
except requests.RequestException as e:
    raise ServiceUnavailableError("Payment service down") from e
    # Original traceback preserved in __cause__
```
⚙️ Production Thinking
```python
# Global exception handler in FastAPI — map domain errors to HTTP
@app.exception_handler(NotFoundError)
async def not_found_handler(request: Request, exc: NotFoundError):
    return JSONResponse(
        status_code=404,
        content={"error": "NOT_FOUND", "message": str(exc), "resource": exc.resource},
    )

# Clean separation: the domain raises domain errors,
# the HTTP layer maps them to HTTP responses
```
1.7 Type Hints
```python
from typing import Protocol, TypedDict, Literal

# Protocol — structural subtyping (duck typing + type safety)
class Saveable(Protocol):
    def save(self) -> None: ...

# TypedDict — typed dicts for JSON structures
class UserResponse(TypedDict):
    id: int
    name: str
    email: str

# Literal — constrain to specific values
Status = Literal['active', 'inactive', 'banned']

def update_status(user_id: int, status: Status) -> None: ...
```
🎯 Interview Reality
"What's the difference between Protocol vs ABC?""Protocol enables structural subtyping — if an object has the right methods, it satisfies the Protocol without explicit inheritance. This is more flexible for third-party libraries. ABCs require explicit inheritance. I prefer Protocol for defining interfaces between services."
Part 2: DSA for Backend Interviews
The Goal: Not competitive programming. Pattern recognition and clear communication under interview pressure.
2.1 Arrays & HashMaps
Pattern Recognition
Arrays + HashMaps cover ~60% of backend interview problems.
Core insight: When you need O(1) lookup, complement, or frequency — reach for a hashmap.
```python
# Pattern 1: Two Sum — hashmap for O(n) complement lookup
def two_sum(nums: list[int], target: int) -> list[int]:
    seen = {}  # value -> index
    for i, num in enumerate(nums):
        complement = target - num
        if complement in seen:
            return [seen[complement], i]
        seen[num] = i
    return []

# Time: O(n)  Space: O(n)
```
```python
# Pattern 2: Frequency map — top-K, duplicates, anagrams
from collections import Counter, defaultdict

def top_k_frequent(nums: list[int], k: int) -> list[int]:
    return [x for x, _ in Counter(nums).most_common(k)]

# Pattern 3: Group Anagrams — canonical key for grouping
def group_anagrams(strs: list[str]) -> list[list[str]]:
    groups = defaultdict(list)
    for s in strs:
        key = tuple(sorted(s))  # Canonical form
        groups[key].append(s)
    return list(groups.values())
```
🎯 Interview Reality
Approach for array problems:
Clarify: sorted? duplicates allowed? return indices or values?
State O(n²) brute force first
Optimize: "I'll trade O(n) space for O(n) time with a hashmap"
Code, then test with edge cases
Common traps:
Forgetting duplicates: [3, 3] with target 6
Off-by-one in indices
Not asking about constraints (empty array, negatives)
⚙️ Production Thinking
```python
# Real-world: idempotency check for API requests
import time

class IdempotencyStore:
    def __init__(self):
        self._store: dict[str, dict] = {}

    def is_duplicate(self, key: str) -> bool:
        entry = self._store.get(key)
        return entry is not None and entry['expires_at'] > time.time()

    def store_result(self, key: str, result: dict, ttl: int = 86400):
        self._store[key] = {'result': result, 'expires_at': time.time() + ttl}

# In production: use Redis for distributed idempotency
```
2.2 Sliding Window
Pattern Recognition
Two pointers maintaining a window. Use for: subarray/substring with constraint, running metrics over a range, rate limiting.
```python
# Fixed-size window — max sum of k elements
def max_sum_k(nums: list[int], k: int) -> int:
    window_sum = sum(nums[:k])
    max_sum = window_sum
    for i in range(k, len(nums)):
        window_sum += nums[i] - nums[i - k]  # Add new, remove old
        max_sum = max(max_sum, window_sum)
    return max_sum
```
```python
# Variable-size window — longest substring without repeating chars
def length_of_longest_substring(s: str) -> int:
    seen = {}  # char -> last index
    left = 0
    max_len = 0
    for right, char in enumerate(s):
        if char in seen and seen[char] >= left:
            left = seen[char] + 1  # Shrink window
        seen[char] = right
        max_len = max(max_len, right - left + 1)
    return max_len

# Template: expand right → update state → shrink left while invalid → update result
```
```python
# Sliding-window rate limiter — DIRECT production usage
from collections import defaultdict, deque
import time

class SlidingWindowRateLimiter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests: dict[str, deque] = defaultdict(deque)

    def is_allowed(self, user_id: str) -> bool:
        now = time.time()
        q = self.requests[user_id]
        while q and q[0] < now - self.window:
            q.popleft()
        if len(q) < self.max_requests:
            q.append(now)
            return True
        return False
```
🎯 Interview Reality
Template to memorize:
```python
left = 0; result = 0; window_state = {}
for right in range(len(arr)):
    # 1. Add arr[right] to the window
    # 2. Shrink left while the constraint is violated
    # 3. Update the result
    result = max(result, right - left + 1)
```
2.3 Stack
```python
# Valid parentheses
def is_valid(s: str) -> bool:
    stack = []
    pairs = {')': '(', '}': '{', ']': '['}
    for char in s:
        if char in '({[':
            stack.append(char)
        elif not stack or stack[-1] != pairs[char]:
            return False
        else:
            stack.pop()
    return len(stack) == 0

# Monotonic stack — next greater element
def next_greater_element(nums: list[int]) -> list[int]:
    result = [-1] * len(nums)
    stack = []  # Indices waiting for their next greater element
    for i, num in enumerate(nums):
        while stack and nums[stack[-1]] < num:
            result[stack.pop()] = num
        stack.append(i)
    return result
```
🎯 Interview Reality
When to think "stack":
"Process X and undo" → stack
"Nested structure validation" → stack
"Previous/next larger element" → monotonic stack
2.4 Queue & BFS
Pattern Recognition
Queue = FIFO. BFS uses a queue. Use for: shortest path (unweighted), level-order traversal, dependency resolution.
```python
from collections import deque

# BFS template
def bfs(graph: dict, start: str) -> list[str]:
    visited = {start}
    queue = deque([start])
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(neighbor)
    return order

# Level-by-level BFS — when depth matters
def bfs_levels(graph: dict, start: str) -> list[list[str]]:
    visited = {start}
    queue = deque([start])
    levels = []
    while queue:
        level = []
        for _ in range(len(queue)):  # Process one level at a time
            node = queue.popleft()
            level.append(node)
            for neighbor in graph.get(node, []):
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append(neighbor)
        levels.append(level)
    return levels
```
⚙️ Production Thinking
```python
from collections import defaultdict, deque

# BFS for dependency/service startup order (Kahn's algorithm)
def topological_sort(dependencies: dict[str, list[str]]) -> list[str]:
    # dependencies: node -> list of nodes it depends on
    in_degree = defaultdict(int)
    dependents = defaultdict(list)  # dep -> nodes that depend on it
    all_nodes = set()
    for node, deps in dependencies.items():
        all_nodes.add(node)
        for dep in deps:
            in_degree[node] += 1
            dependents[dep].append(node)
            all_nodes.add(dep)

    queue = deque([n for n in all_nodes if in_degree[n] == 0])
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for nxt in dependents[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                queue.append(nxt)

    if len(order) != len(all_nodes):
        raise ValueError("Cycle detected")
    return order
```
2.5 Heap / Priority Queue
Pattern Recognition
Heap = efficient repeated min/max retrieval. Python's heapq implements only a min-heap — negate values to simulate a max-heap.
```python
import heapq

# Top-K largest — min-heap of size k, O(n log k)
def top_k_largest(nums: list[int], k: int) -> list[int]:
    heap = []
    for num in nums:
        heapq.heappush(heap, num)
        if len(heap) > k:
            heapq.heappop(heap)  # Remove the smallest
    return sorted(heap, reverse=True)

# K-way merge — merge K sorted lists
def merge_k_sorted(lists: list[list[int]]) -> list[int]:
    result = []
    heap = [(lst[0], i, 0) for i, lst in enumerate(lists) if lst]
    heapq.heapify(heap)
    while heap:
        val, li, ei = heapq.heappop(heap)
        result.append(val)
        if ei + 1 < len(lists[li]):
            heapq.heappush(heap, (lists[li][ei + 1], li, ei + 1))
    return result

# Backend usage: merge paginated results from multiple DB shards

# Median of a stream
class MedianFinder:
    def __init__(self):
        self.lo = []  # max-heap (negated) for the lower half
        self.hi = []  # min-heap for the upper half

    def add_num(self, num: int):
        heapq.heappush(self.lo, -num)
        heapq.heappush(self.hi, -heapq.heappop(self.lo))
        if len(self.hi) > len(self.lo):
            heapq.heappush(self.lo, -heapq.heappop(self.hi))

    def find_median(self) -> float:
        if len(self.lo) > len(self.hi):
            return -self.lo[0]
        return (-self.lo[0] + self.hi[0]) / 2
```
🎯 Interview Reality
Key insight: Python only has min-heap. For max-heap: negate values.
Time complexity:
heappush/heappop: O(log n)
heapify: O(n) — build heap from list
heapq.nlargest(k, iterable) / heapq.nsmallest(k, iterable): O(n log k)
Common mistake: Using max(list) in a loop — O(n²); use heap for repeated min/max queries.
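For one-off top-K queries, heapq.nlargest/nsmallest package the size-k heap pattern:

```python
import heapq

latencies = [120, 45, 300, 87, 950, 12]
heapq.nlargest(3, latencies)           # [950, 300, 120] — O(n log k)
heapq.nsmallest(2, latencies)          # [12, 45]
heapq.nlargest(1, latencies, key=abs)  # key= works like sorted()
```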
2.6 Binary Search
Pattern Recognition
Binary search is not just "find element in sorted array." It's:
Find the boundary where a condition changes
Find the minimum/maximum that satisfies a condition
```python
# Standard binary search
def binary_search(nums: list[int], target: int) -> int:
    left, right = 0, len(nums) - 1
    while left <= right:
        mid = left + (right - left) // 2
        if nums[mid] == target:
            return mid
        elif nums[mid] < target:
            left = mid + 1
        else:
            right = mid - 1
    return -1

# Binary search on the answer — "minimize X such that the condition holds"
def minimize(feasible_fn, lo: int, hi: int) -> int:
    while lo < hi:
        mid = (lo + hi) // 2
        if feasible_fn(mid):
            hi = mid      # mid works, try smaller
        else:
            lo = mid + 1  # mid fails, try larger
    return lo

# Find the first position (left boundary)
def find_first(nums: list[int], target: int) -> int:
    left, right = 0, len(nums)
    result = -1
    while left < right:
        mid = (left + right) // 2
        if nums[mid] == target:
            result = mid
            right = mid  # Keep searching left
        elif nums[mid] < target:
            left = mid + 1
        else:
            right = mid
    return result
```
🎯 Interview Reality
Recognizing binary search:
"Find minimum X such that..." → binary search on answer
Off-by-one: left <= right vs left < right (depends on pattern)
Infinite loop: not updating both left and right
2.7 DFS & Backtracking
```python
# DFS — iterative (avoids stack overflow on deep graphs)
def dfs_iterative(graph, start):
    visited = set()
    stack = [start]
    result = []
    while stack:
        node = stack.pop()
        if node in visited:
            continue
        visited.add(node)
        result.append(node)
        stack.extend(reversed(graph.get(node, [])))
    return result

# Backtracking template — combinations/subsets
def combination_sum(candidates: list[int], target: int) -> list[list[int]]:
    result = []

    def dfs(start: int, current: list[int], remaining: int):
        if remaining == 0:
            result.append(current[:])
            return
        if remaining < 0:
            return  # Prune
        for i in range(start, len(candidates)):
            current.append(candidates[i])
            dfs(i + 1, current, remaining - candidates[i])  # i + 1: each element used at most once
            current.pop()  # Undo choice

    dfs(0, [], target)
    return result

# Pattern: choose → explore → unchoose
```
🎯 Interview Reality
"All combinations/subsets/permutations" → backtracking
Recursion vs iteration → recursion is cleaner; mention Python default limit of 1000
In interviews say: "For production code I'd use iterative DFS to avoid stack overflow on deep graphs. Python's default recursion limit is 1000 — I'd mention that and increase it or use iteration."
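The limit in code — a small sketch:

```python
import sys

sys.getrecursionlimit()        # typically 1000
sys.setrecursionlimit(10_000)  # raise with care — the hard limit is the C stack
```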
⚙️ Production Thinking
```python
# DFS for role/permission hierarchy traversal
def get_all_permissions(role_id: str, role_graph: dict) -> set[str]:
    visited = set()
    all_perms = set()

    def dfs(role: str):
        if role in visited:
            return
        visited.add(role)
        all_perms.update(role_graph[role].get('permissions', []))
        for parent in role_graph[role].get('inherits_from', []):
            dfs(parent)

    dfs(role_id)
    return all_perms
```
2.8 Prefix Sum
Pattern Recognition
Precompute cumulative sums for O(1) range queries.
```python
# Build: O(n)  Query: O(1)
def build_prefix(nums: list[int]) -> list[int]:
    prefix = [0] * (len(nums) + 1)
    for i, num in enumerate(nums):
        prefix[i + 1] = prefix[i] + num
    return prefix

def range_sum(prefix: list[int], left: int, right: int) -> int:
    return prefix[right + 1] - prefix[left]

# Subarray sum equals K — prefix + hashmap
def subarray_sum(nums: list[int], k: int) -> int:
    count = 0
    prefix = 0
    seen = {0: 1}  # prefix_sum -> frequency
    for num in nums:
        prefix += num
        count += seen.get(prefix - k, 0)
        seen[prefix] = seen.get(prefix, 0) + 1
    return count
```
⚙️ Production Thinking
Prefix sum powers: analytics dashboards, billing calculations, SLA monitoring, time-series range queries.
Example: "total page views between day 10 and day 50" → build prefix once, answer any range in O(1).
2.9 Dynamic Programming
When to Recognize DP
Signal words: "minimum/maximum", "number of ways", "can/cannot achieve", "longest/shortest subsequence."
DP = optimal substructure + overlapping subproblems.
```python
# Bottom-up DP template (base_case and transition are problem-specific placeholders)
def dp_template(n: int) -> int:
    dp = [0] * (n + 1)
    dp[0] = base_case
    for i in range(1, n + 1):
        dp[i] = transition(dp, i)
    return dp[n]

# 0/1 Knapsack — resource allocation
def knapsack(weights: list[int], values: list[int], capacity: int) -> int:
    n = len(weights)
    dp = [[0] * (capacity + 1) for _ in range(n + 1)]
    for i in range(1, n + 1):
        for w in range(capacity + 1):
            dp[i][w] = dp[i-1][w]  # Don't take item i
            if weights[i-1] <= w:
                dp[i][w] = max(dp[i][w], dp[i-1][w - weights[i-1]] + values[i-1])
    return dp[n][capacity]

# Backend usage: feature flag allocation, resource scheduling
```
🎯 Interview Reality
DP in backend interviews: Less common than arrays/graphs but shows up for optimization problems. Interviewers care about recognizing it and explaining the recurrence clearly.
Always start with: "For each item I have two choices: include or exclude. The recurrence is dp[i][w] = max(dp[i-1][w], dp[i-1][w-wi] + vi). Base case: 0 items or 0 capacity → 0 value."
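The same recurrence written top-down with memoization — a sketch using functools.cache:

```python
from functools import cache

def knapsack_topdown(weights: list[int], values: list[int], capacity: int) -> int:
    @cache
    def best(i: int, w: int) -> int:
        if i == 0 or w == 0:
            return 0  # base case: no items or no capacity
        skip = best(i - 1, w)
        if weights[i - 1] > w:
            return skip
        take = best(i - 1, w - weights[i - 1]) + values[i - 1]
        return max(skip, take)

    return best(len(weights), capacity)
```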
2.10 Graph Algorithms
```python
import heapq

# Detect a cycle in a directed graph (3-color DFS)
def has_cycle(graph: dict) -> bool:
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {node: WHITE for node in graph}

    def dfs(node) -> bool:
        color[node] = GRAY
        for neighbor in graph.get(node, []):
            if color[neighbor] == GRAY:
                return True  # Back edge = cycle
            if color[neighbor] == WHITE and dfs(neighbor):
                return True
        color[node] = BLACK
        return False

    return any(dfs(n) for n in graph if color[n] == WHITE)

# Dijkstra — weighted shortest path
def dijkstra(graph: dict, start: str) -> dict[str, float]:
    distances = {node: float('inf') for node in graph}
    distances[start] = 0
    heap = [(0, start)]
    while heap:
        dist, node = heapq.heappop(heap)
        if dist > distances[node]:
            continue  # Stale entry
        for neighbor, weight in graph.get(node, []):
            new_dist = dist + weight
            if new_dist < distances[neighbor]:
                distances[neighbor] = new_dist
                heapq.heappush(heap, (new_dist, neighbor))
    return distances

# Union-Find — connected components
class UnionFind:
    def __init__(self, n: int):
        self.parent = list(range(n))
        self.rank = [0] * n

    def find(self, x: int) -> int:
        if self.parent[x] != x:
            self.parent[x] = self.find(self.parent[x])  # Path compression
        return self.parent[x]

    def union(self, x: int, y: int) -> bool:
        px, py = self.find(x), self.find(y)
        if px == py:
            return False
        if self.rank[px] < self.rank[py]:
            px, py = py, px
        self.parent[py] = px
        if self.rank[px] == self.rank[py]:
            self.rank[px] += 1
        return True
```
2.11 Structuring a DSA Answer
Clarify (2 min) — restate the problem; confirm constraints, duplicates, edge cases
Example (2 min) — trace through a small example manually
Approach (3 min) — state brute force, then optimize; get buy-in before coding
Code (10–15 min) — clean variable names, explain choices as you go
Test (3 min) — trace your example, check edge cases
Complexity Template
Always analyze BOTH time AND space:
Time: O(n log n) — one sort O(n log n) + one linear pass O(n)
Space: O(n) — hashmap in worst case (all unique elements)
Part 3: Async Python & FastAPI
3.1 Understanding async/await
Level 1 — Must Know
Core mental model: asyncio is cooperative multitasking on a single thread. Tasks voluntarily yield control at await points. No OS thread switching — just callbacks.
```python
import asyncio

# Sequential — 0.2s total
async def sequential():
    a = await fetch_user(1)  # suspend, wait 0.1s
    b = await fetch_user(2)  # suspend, wait 0.1s
    return a, b

# Concurrent — ~0.1s total
async def concurrent():
    a, b = await asyncio.gather(fetch_user(1), fetch_user(2))
    return a, b

# With error handling
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]
```
Level 2 — Strong Candidate Knowledge
```python
# Semaphore — limit concurrency (protect downstream services)
async def fetch_all(urls: list[str]) -> list[dict]:
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent

    async def fetch_one(url: str) -> dict:
        async with semaphore:
            return await fetch(url)

    return await asyncio.gather(*[fetch_one(u) for u in urls])

# Timeout
result = await asyncio.wait_for(fetch(url), timeout=5.0)

# Task group (Python 3.11+)
async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(fetch_user(1))
    task2 = tg.create_task(fetch_user(2))
# Both done here; any exception propagates
```
Level 3 — Deep Internals
The event loop is a single-threaded selector that polls for I/O events. When a coroutine does await socket.read(), the OS registers a callback for when data arrives. The event loop runs other coroutines until the data is ready. This is why async handles thousands of concurrent connections on one thread.
🎯 Interview Reality
"What happens if you call time.sleep() in an async function?" → blocks the entire event loop; use asyncio.sleep()
"When would you NOT use asyncio?" → CPU-bound work; use multiprocessing
"What's the difference between asyncio.gather and asyncio.wait?" → gather returns results in order; wait gives more control over cancellation
Strong event loop answer: "The event loop is a single-threaded scheduler. When a coroutine hits await on I/O, it registers a callback with the OS and suspends. The loop picks up the next runnable coroutine. When the I/O completes, the OS notifies the loop, which resumes the coroutine. This is why async is great for I/O-bound work — one thread can handle thousands of concurrent connections."
GIL question: "asyncio runs on a single thread, so the GIL isn't a concern. For CPU-bound tasks I'd use multiprocessing to bypass the GIL, or loop.run_in_executor with a ProcessPoolExecutor."
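A sketch of that escape hatch; compress_report is an illustrative CPU-bound function, and a long-lived pool would be reused in real code:

```python
import asyncio
import zlib
from concurrent.futures import ProcessPoolExecutor

def compress_report(data: bytes) -> bytes:  # illustrative CPU-bound work
    return zlib.compress(data, level=9)

async def handler(data: bytes) -> bytes:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:  # in production, create once and reuse
        # Runs in a separate process — the event loop stays free, GIL bypassed
        return await loop.run_in_executor(pool, compress_report, data)
```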
3.2 FastAPI
Level 1 — Must Know
```python
from fastapi import FastAPI, Depends, HTTPException, status, Query, Path
from pydantic import BaseModel, EmailStr, validator

app = FastAPI(title="User API", version="1.0.0")

class UserCreate(BaseModel):
    name: str
    email: EmailStr
    age: int

    @validator('age')  # Pydantic v1 style; use @field_validator in Pydantic v2
    def age_valid(cls, v):
        if not 0 <= v <= 150:
            raise ValueError('age must be 0–150')
        return v

class UserResponse(BaseModel):
    id: int
    name: str
    email: str

    class Config:
        from_attributes = True

@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(payload: UserCreate, db: AsyncSession = Depends(get_db)):
    user = User(**payload.dict())  # .model_dump() in Pydantic v2
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int = Path(..., gt=0),
    db: AsyncSession = Depends(get_db),
):
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
```
Level 2 — Strong Candidate Knowledge
```python
# Dependency injection — the heart of FastAPI
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    payload = verify_token(token)  # raises if invalid
    user = await db.get(User, payload['sub'])
    if not user:
        raise HTTPException(401, "User not found")
    return user

# Chain dependencies
async def get_active_user(user: User = Depends(get_current_user)) -> User:
    if not user.is_active:
        raise HTTPException(403, "Inactive user")
    return user

# Middleware — cross-cutting concerns
@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response

# Background tasks — fire and forget after the response
@app.post("/users")
async def create_user(payload: UserCreate, background_tasks: BackgroundTasks, ...):
    user = await create_user_in_db(db, payload)
    background_tasks.add_task(send_welcome_email, user.email)  # Runs after the response
    background_tasks.add_task(notify_analytics, user.id)
    return user
```
🎯 Interview Reality
BackgroundTasks vs Celery:
| | BackgroundTasks | Celery |
|---|---|---|
| Process | Same process | Separate workers |
| Persistent on crash | No | Yes (broker) |
| Retries | Manual | Built-in |
| Scheduling | No | Yes (beat) |
| Best for | Quick post-request work | Heavy/scheduled jobs |
Strong answer on DI: "FastAPI's DI resolves dependencies before calling the endpoint. I can chain them — get_active_user depends on get_current_user, which depends on get_db. FastAPI handles call order and caches results within a request. The endpoint just declares what it needs."
3.3 Authentication & JWT
```python
# Refresh token rotation — production pattern
async def refresh_tokens(refresh_token: str, db) -> tuple[str, str]:
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        raise InvalidTokenError("Invalid refresh token")

    # Check that it hasn't been revoked
    record = await db.execute(
        select(RefreshToken).where(
            RefreshToken.token == refresh_token,
            RefreshToken.revoked == False,
        )
    )
    if not record.scalar_one_or_none():
        raise InvalidTokenError("Token already used or revoked")

    # Revoke the old token, issue new ones (rotation)
    await db.execute(
        update(RefreshToken)
        .where(RefreshToken.token == refresh_token)
        .values(revoked=True)
    )
    new_access = create_access_token(int(payload["sub"]), payload["roles"])
    new_refresh = create_refresh_token(int(payload["sub"]))
    await db.commit()
    return new_access, new_refresh
```
🎯 Interview Reality
"JWT is signed, NOT encrypted — never put sensitive data in payload"
"How do you invalidate a JWT?" → Redis blocklist of JTIs; short expiry + refresh token rotation
"Why not sessions?" → stateless; works across multiple servers without shared state
Strong answer on JWT tradeoffs: "JWTs can't be invalidated before expiry since they're stateless. I'd keep access tokens short-lived (15–30 min) and use refresh token rotation — each use issues a new refresh token and revokes the old one. For immediate invalidation (logout/ban), I maintain a Redis blocklist of revoked JTIs."
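A sketch of token creation/verification with PyJWT — SECRET_KEY handling and the claim layout are illustrative assumptions:

```python
import uuid
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT

SECRET_KEY = "change-me"  # assumption: loaded from env/secret manager in production

def create_access_token(user_id: int, roles: list[str]) -> str:
    payload = {
        "sub": str(user_id),
        "roles": roles,
        "jti": str(uuid.uuid4()),  # unique token ID — enables a Redis blocklist
        "exp": datetime.now(timezone.utc) + timedelta(minutes=15),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

def verify_token(token: str) -> dict:
    # Raises jwt.ExpiredSignatureError / jwt.InvalidTokenError on failure
    return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
```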
3.4 Database & SQLAlchemy
Level 1 — Must Know
```python
from datetime import datetime

from sqlalchemy import String, func, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    orders: Mapped[list["Order"]] = relationship(back_populates="user")

# The N+1 Problem — the most common DB performance bug
# ❌ WRONG: N+1 queries
users = (await db.execute(select(User))).scalars().all()
for user in users:
    orders = await db.execute(select(Order).where(Order.user_id == user.id))  # N extra queries!

# ✅ CORRECT: eager loading
from sqlalchemy.orm import selectinload, joinedload
users = await db.execute(
    select(User).options(selectinload(User.orders))  # One extra query, not N
)
# joinedload — for one-to-one/few (JOIN instead of a separate query)
```
"Explain the N+1 problem" → 1 query for list + N queries per item; fix with selectinload/joinedload
"Cursor vs offset pagination?" → offset does full O(n) scan; cursor seeks by index O(1)
"How do you handle concurrent writes?" → optimistic locking (version column) or SELECT FOR UPDATE
Strong answer:"N+1 happens when you load a collection, then loop and query each item's relationships. Fix: eager loading — selectinload for one-to-many (separate batch query) or joinedload for one-to-one (JOIN). I'd also add query logging in staging to catch N+1 before production."
3.5 Redis & Caching
```python
# Cache-aside pattern — the standard approach
async def get_user_cached(user_id: int) -> Optional[dict]:
    cache_key = f"user:{user_id}"
    cached = await r.get(cache_key)
    if cached:
        return json.loads(cached)
    user = await db.get(User, user_id)
    if user:
        await r.set(cache_key, json.dumps(user.to_dict()), ex=300)
    return user.to_dict() if user else None

# Distributed lock with Lua (atomic release)
@asynccontextmanager
async def redis_lock(r, lock_name: str, timeout: int = 10):
    lock_value = str(uuid.uuid4())
    lock_key = f"lock:{lock_name}"
    acquired = await r.set(lock_key, lock_value, nx=True, ex=timeout)
    if not acquired:
        raise LockNotAcquiredError(lock_name)
    try:
        yield
    finally:
        # Only delete the lock if we still own it
        release_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        end
        return 0
        """
        await r.eval(release_script, 1, lock_key, lock_value)
```
🎯 Interview Reality
Cache stampede prevention: "When a cache entry expires, many requests hit the DB simultaneously. Solutions: (1) Mutex — only one request fetches, others wait; (2) Probabilistic early expiry — randomly refresh before the TTL expires; (3) Stale-while-revalidate — return stale data while refreshing asynchronously."
Redis vs Memcached: "Redis has more data structures, persistence, pub/sub, and Lua scripting. Memcached is simpler and faster for pure caching with no persistence needs. In most backend stacks today, Redis is the default choice."
3.6 Celery & Background Jobs
Level 1 — Must Know
```python
from celery import Celery

celery = Celery(
    "myapp",
    broker="redis://localhost:6379/0",   # Task queue
    backend="redis://localhost:6379/1",  # Result store
)
celery.conf.update(
    task_soft_time_limit=300,         # 5 min — raises SoftTimeLimitExceeded
    task_time_limit=360,              # 6 min — kills the worker
    worker_max_tasks_per_child=1000,  # Restart workers to prevent memory leaks
)

@celery.task(bind=True, max_retries=3)
def send_email(self, user_id: int, template: str):
    try:
        user = get_user(user_id)
        email_service.send(user.email, template)
    except EmailServiceError as e:
        raise self.retry(exc=e, countdown=2 ** self.request.retries)

# Call tasks
send_email.delay(user_id=1, template="welcome")            # Async, now
send_email.apply_async(args=[1, "welcome"], countdown=60)  # Delayed
```
3.7 Logging & Monitoring
Level 1 — Must Know
```python
# Structured JSON logging — always use in production
import logging, json
from datetime import datetime, timezone
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar('request_id', default='')

class JSONFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": request_id_var.get(),
            "module": record.module,
            "line": record.lineno,
            **({"exception": self.formatException(record.exc_info)} if record.exc_info else {}),
        })

# Health check endpoint — required for load balancers, k8s
@app.get("/health")
async def health_check():
    checks = {}
    status_code = 200
    try:
        await db.execute(text("SELECT 1"))
        checks["database"] = "ok"
    except Exception as e:
        checks["database"] = f"error: {e}"
        status_code = 503
    try:
        await redis.ping()
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {e}"
        status_code = 503
    return JSONResponse(
        content={"status": "ok" if status_code == 200 else "degraded", "checks": checks},
        status_code=status_code,
    )

# /health/live  — is the process running? (don't restart if only the DB is down)
# /health/ready — ready to serve traffic? (remove from LB if degraded)
```
3.8 Testing
Level 1 — Must Know
```python
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

@pytest.fixture
async def db():
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async with async_sessionmaker(engine)() as session:
        yield session
    await engine.dispose()

@pytest.fixture
async def client(db):
    app.dependency_overrides[get_db] = lambda: db
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
        yield ac
    app.dependency_overrides.clear()

@pytest.mark.asyncio
async def test_create_user(client):
    response = await client.post("/users", json={"name": "Alice", "email": "a@b.com", "age": 25})
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "Alice"
    assert "id" in data

# Mocking external services
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_email_sent_on_register(client):
    with patch('myapp.services.email.send_email', new_callable=AsyncMock) as mock_email:
        await client.post("/users", json={"name": "Alice", "email": "a@b.com"})
        mock_email.assert_called_once_with("a@b.com", "welcome")
```
🎯 Interview Reality
Testing pyramid for backend APIs:
Unit tests (~60%): Business logic, validators — no DB, no HTTP
Integration tests (~30%): API endpoints with real test DB, mocked external services
E2E tests (~10%): Full stack, staging environment
What interviewers want to hear: "I write unit tests for business logic in isolation using mocks. Integration tests cover the API layer with a real test database — I use SQLite in-memory or TestContainers. I mock external APIs like Stripe or SendGrid. For critical paths, I have contract tests to verify service boundaries."
Part 4: PostgreSQL, System Design & Architecture
4.1 PostgreSQL for Backend Engineers
Level 1 — Must Know
```sql
-- EXPLAIN ANALYZE — your debugging superpower
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.id, u.name, COUNT(o.id) AS order_count
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
WHERE u.created_at > NOW() - INTERVAL '30 days'
GROUP BY u.id, u.name
ORDER BY order_count DESC LIMIT 20;

-- What to look for:
-- ❌ Seq Scan on a large table → add an index
-- ❌ Large mismatch between estimated and actual rows → run ANALYZE (stale statistics)
-- ✅ Index Scan / Index Only Scan

-- Indexing strategy
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);

-- Composite — equality conditions FIRST
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- Useful for:     WHERE user_id = ? AND status = ?
-- NOT useful for: WHERE status = ?  (no user_id prefix)

-- Partial — index only a subset (smaller, faster)
CREATE INDEX idx_active_users ON users(email) WHERE is_active = true;

-- Covering — include extra columns to avoid heap fetches
CREATE INDEX idx_orders_covering ON orders(user_id) INCLUDE (id, status, created_at);
```
Level 2 — Strong Candidate Knowledge
```sql
-- SELECT FOR UPDATE — pessimistic locking for bank transfers
BEGIN;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;  -- Lock this row
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
COMMIT;

-- SKIP LOCKED — queue processing pattern (no blocking)
SELECT * FROM jobs WHERE status = 'pending'
LIMIT 1 FOR UPDATE SKIP LOCKED;

-- Window functions — analytics without multiple queries
SELECT user_id, amount,
       SUM(amount) OVER (PARTITION BY user_id ORDER BY order_date) AS running_total,
       ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY order_date DESC) AS recency_rank
FROM orders;

-- JSONB for flexible schemas
CREATE TABLE events (
    id SERIAL PRIMARY KEY,
    type VARCHAR(50),
    payload JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_events_payload ON events USING gin(payload);
SELECT * FROM events WHERE payload @> '{"action": "login"}';
```
🎯 Interview Reality
"How do you optimize a slow query?" → EXPLAIN ANALYZE, check Seq Scans, add appropriate indexes
"What isolation level for bank transfers?" → REPEATABLE READ or SERIALIZABLE
"What is MVCC?" → PostgreSQL never overwrites; creates new row versions; VACUUM reclaims dead rows
Strong answer:"First I'd run EXPLAIN ANALYZE to see the plan. I'd look for Seq Scans on large tables, stale statistics (row count mismatch), and expensive sorts. Common fixes: composite index on WHERE/JOIN columns, partial index for common filters, ANALYZE to update statistics."
4.2 System Design Fundamentals
Level 1 — Must Know
| Concept | Definition | When to use |
|---|---|---|
| Horizontal scaling | Add more servers | Stateless services, read replicas |
| Vertical scaling | Bigger server | DB writes before horizontal |
| Sharding | Split DB by key | user_id % N for user data |
| Replication | Copy data across servers | Read replicas, high availability |
| CDN | Cache at edge | Static assets, images, JS/CSS |
| Message queue | Async communication | Decouple services, buffer spikes |
Level 2 — Strong Candidate Knowledge
CAP Theorem: a distributed system can guarantee at most two of Consistency, Availability, and Partition tolerance. Since partitions are unavoidable in practice, the real choice is CP or AP.
CP examples: PostgreSQL, Redis cluster with quorum
AP examples: Cassandra, DynamoDB, CouchDB
QPS estimation template:
10M users × 10% DAU = 1M DAU
1M × 10 requests/day = 10M req/day
10M / 86400 ≈ 115 QPS average, 1150 QPS peak (10x multiplier)
🎯 Interview Reality
System design answer structure:
Clarify requirements and scale (2–3 min)
Estimate QPS, storage, bandwidth (1–2 min)
High-level design: boxes and arrows (3–5 min)
Deep dive on requested component (10–15 min)
Discuss tradeoffs proactively
What separates senior answers:
Mid: "I'd cache this in Redis."
Senior: "I'd cache product listings in Redis with 5-min TTL. On miss, I'd use a mutex to prevent stampede — only one request fetches from DB while others wait. Monitor hit rate — if below 80%, TTL is too short."
4.3 Database Migrations (Alembic)
```python
# Best practices:
# 1. Never delete a column in the same migration as removing the code that uses it
# 2. Always test that downgrade() works
# 3. Use CONCURRENTLY for index creation in production
# 4. Data migrations go in separate scripts, not schema migrations

# Zero-downtime deployment order:
# Phase 1: Deploy migration (add nullable column)
# Phase 2: Deploy app code (reads new column, writes both)
# Phase 3: Backfill data (background job)
# Phase 4: Add NOT NULL constraint, remove old column

def upgrade():
    # Step 1: Add nullable (instant, no lock)
    op.add_column('users', sa.Column('phone', sa.String(20), nullable=True))

def downgrade():
    op.drop_column('users', 'phone')
```
"In development, I use .env files with pydantic-settings — .env is in .gitignore. In production, secrets come from environment variables injected by infrastructure: AWS Secrets Manager, Kubernetes Secrets, or Vault. I never commit secrets. For rotation: secret versioning + zero-downtime deploy."
Part 5: Machine Coding Problems
Problem 1: URL Shortener
```python
import string

ALPHABET = string.ascii_letters + string.digits  # 62 chars

# Base62-encode the DB ID — guaranteed unique, no collisions
def id_to_code(id: int) -> str:
    code = []
    while id > 0:
        code.append(ALPHABET[id % 62])
        id //= 62
    return ''.join(reversed(code)) or ALPHABET[0]

class ShortenRequest(BaseModel):
    url: HttpUrl
    custom_alias: Optional[str] = None
    ttl_days: Optional[int] = None

@app.post("/shorten", response_model=ShortenResponse)
async def shorten_url(request: ShortenRequest, db=Depends(get_db)):
    url_obj = ShortenedURL(
        original_url=str(request.url),
        expires_at=datetime.utcnow() + timedelta(days=request.ttl_days) if request.ttl_days else None,
    )
    db.add(url_obj)
    await db.flush()  # Get the auto-increment ID
    url_obj.short_code = request.custom_alias or id_to_code(url_obj.id)
    await db.commit()
    return ShortenResponse(short_url=f"https://short.ly/{url_obj.short_code}")

@app.get("/{short_code}")
async def redirect(short_code: str, background_tasks: BackgroundTasks, db=Depends(get_db)):
    # Check the Redis cache first
    cached = await redis.get(f"url:{short_code}")
    if cached:
        background_tasks.add_task(increment_click, short_code)
        return RedirectResponse(cached, status_code=301)

    url_obj = (await db.execute(
        select(ShortenedURL).where(
            ShortenedURL.short_code == short_code,
            ShortenedURL.is_active == True,
        )
    )).scalar_one_or_none()
    if not url_obj:
        raise HTTPException(404, "Not found")
    if url_obj.expires_at and url_obj.expires_at < datetime.utcnow():
        raise HTTPException(410, "URL expired")

    await redis.set(f"url:{short_code}", url_obj.original_url, ex=3600)
    background_tasks.add_task(increment_click, short_code)
    return RedirectResponse(url_obj.original_url, status_code=301)
```
Scaling Discussion
Hot URL / Cache stampede:
"Popular URLs will hammer the DB on every redirect. I'd cache short_code → original_url in Redis. Cache hit rate should be 99%+ for viral URLs."
Click count accuracy vs performance:
"For viral URLs, updating click_count on every request creates write contention. I'd batch: Redis INCR per request, background job flushes to DB every 60s. Trades perfect accuracy for scalability."
Problem 2: Rate Limiter
Fixed Window vs Sliding Window
Fixed window has a boundary problem: 100 requests at 00:59 + 100 at 01:01 = 200 requests in 2 seconds. Sliding window tracks exact timestamps.
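For contrast with the sliding-window limiter in section 2.2, a minimal fixed-window sketch — simpler, but it exhibits exactly that boundary burst:

```python
import time
from collections import defaultdict

class FixedWindowRateLimiter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = window_seconds
        self.counts: dict[tuple[str, int], int] = defaultdict(int)

    def is_allowed(self, user_id: str) -> bool:
        bucket = int(time.time()) // self.window  # current window number
        key = (user_id, bucket)
        if self.counts[key] >= self.max_requests:
            return False
        self.counts[key] += 1  # note: old buckets are never cleaned up in this sketch
        return True
```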
Common machine-coding mistake: no indexes on foreign keys and frequently queried columns.
Part 6: Interview Communication & Behavioral
6.1 "Tell Me About Yourself"
90 seconds, structured, ends with why you're here.
Template:
"I'm a backend engineer with [X] years of experience in [domain]. Most recently at [Company], I [one impressive thing]. My primary stack is Python — FastAPI, async SQLAlchemy, Redis. I'm looking for [what you want] because [why this company]."
Strong Example:
"I'm a backend engineer with 4 years of experience, mostly fintech. At my current company I led migration of our payment service from monolith to microservices, reducing P99 from 800ms to 120ms. I own the core auth system serving 2M users. I'm looking for a role where I can own backend architecture end-to-end, which is why I'm excited about this position."
6.2 STAR Stories
Situation: Product listing API timing out at 3–4s. 50k users, 500 req/min peak.
Task: Investigate and fix the performance issue.
Action: EXPLAIN ANALYZE revealed 15 DB queries per request (N+1 on categories). Fixed with selectinload — dropped to 800ms. Added Redis cache-aside with 5-min TTL. Added rate limiting at 100 req/min/user.
Result: P95 dropped from 3.4s to 85ms. Cache hit rate 94%. Zero timeouts for 6 months.
6.3 How to Explain Technical Decisions
Weak:"I used Redis for caching."
Strong:"I used Redis because our product catalog is read-heavy with infrequent writes — ideal cache candidate. Redis specifically because it's already in our stack, supports TTL natively, and gives O(1) gets/sets. Memcached would also work, but Redis gives us sorted sets for leaderboards if needed later."
Rule: Always justify choices, not just state them. Mention the tradeoffs you considered.
6.4 "Tell Me About a Failure"
Template:
What happened (factual, take ownership)
Why it happened (root cause, no excuses)
Immediate action (incident response)
Permanent fix (systemic prevention)
Example:
"I deployed a migration that dropped a column still being read by production — 4-minute outage. Root cause: I ran the schema change before deploying new code (wrong order). Immediately rolled back. Post-mortem: established a deployment runbook (code first, then schema), added schema compatibility checks in CI. Zero migration-related outages since."
6.5 Common Follow-up Questions & Strong Answers
"How would you handle [service going down]?"
"Add circuit breakers — after 5 failures in 10 seconds, open the circuit and fail fast with a fallback. Retries with exponential backoff for transient failures. Health checks and auto-restart in Kubernetes."
"What if the database becomes a bottleneck?"
"Identify: read-heavy or write-heavy? For reads: read replicas + Redis cache. For writes: vertical scaling first (cheaper), then sharding by user_id if needed. For both: CQRS — separate read and write paths."
"How do you debug a performance issue in production?"
"Start with metrics: which endpoint is slow? APM traces to find bottleneck — DB, external service, or computation? For DB: EXPLAIN ANALYZE, check missing indexes, check connection pool exhaustion. Use py-spy for async profiling without stopping the process."
"How do you ensure consistency in distributed systems?"
"Depends on requirement. For financial: saga pattern or 2-phase commit. For non-critical: eventual consistency with idempotent operations. Redis distributed locks for operations that must not run concurrently."
6.6 Questions to Ask Interviewers
Technical:
"What's the biggest technical challenge the team is facing right now?"
"How does the team handle on-call and incident response?"
"How long does a change take to reach production?"
"How do you handle zero-downtime database migrations?"
Team & Culture:
"How much ownership do engineers have over architecture decisions?"
"What does success look like in the first 6 months?"
6.7 Concurrency Quick Reference
| Pattern | When to use | Python tool |
|---|---|---|
| Async I/O | Many concurrent I/O operations | asyncio, FastAPI |
| Thread pool | I/O + existing sync libraries | ThreadPoolExecutor |
| Process pool | CPU-bound parallel work | ProcessPoolExecutor |
| Queue-based workers | Background jobs, retries, scheduling | Celery |
| Rate limiting | Protect downstream services | asyncio.Semaphore |
| Concurrent requests | Fan-out to multiple services | asyncio.gather |
```python
# Decision tree (pseudocode)
if task_is_cpu_bound:
    use(ProcessPoolExecutor)
elif task_needs_retry_and_persistence:
    use(Celery)
elif task_is_io_bound and high_concurrency:
    use(asyncio)
elif task_is_io_bound and existing_sync_library:
    use(ThreadPoolExecutor)
```
Part 7: Quick Reference, Production Patterns & Interview Checklist
7.1 HTTP Status Codes — Must Know
| Code | When to use |
|---|---|
| 200 OK | Successful GET, PUT, PATCH |
| 201 Created | Successful POST that created a resource |
| 204 No Content | Successful DELETE or action with no body |
| 400 Bad Request | Malformed request, missing required fields |
| 401 Unauthorized | Not authenticated (missing/invalid token) |
| 403 Forbidden | Authenticated but not authorized |
| 404 Not Found | Resource does not exist |
| 409 Conflict | Duplicate resource (email already exists) |
| 422 Unprocessable Entity | Validation failed (FastAPI/Pydantic default) |
| 429 Too Many Requests | Rate limited |
| 500 Internal Server Error | Unexpected server error |
| 503 Service Unavailable | Dependency down (DB, external API) |
7.2 Common Python Bugs — Interview Traps
```python
# Bug 1: Mutable default argument
def f(lst=[]): ...    # ❌
def f(lst=None): ...  # ✅

# Bug 2: Late-binding closures
funcs = [lambda: i for i in range(3)]      # [2, 2, 2] ❌
funcs = [lambda i=i: i for i in range(3)]  # [0, 1, 2] ✅

# Bug 3: Modifying a list while iterating
for x in lst:
    if cond(x):
        lst.remove(x)                      # ❌ skips elements
lst = [x for x in lst if not cond(x)]      # ✅

# Bug 4: Bare except catches everything
except:            # ❌ catches KeyboardInterrupt, SystemExit
except Exception:  # ✅

# Bug 5: String concatenation in loops — O(n²)
result = ""
for s in strings:
    result += s            # ❌
result = "".join(strings)  # ✅ O(n)

# Bug 6: Blocking the event loop
async def f(): time.sleep(5)           # ❌ blocks the entire loop
async def f(): await asyncio.sleep(5)  # ✅

# Bug 7: Coroutine not awaited
user = get_user()        # ❌ returns a coroutine object
user = await get_user()  # ✅
```
7.3 Big-O Quick Reference
| Operation | Data structure | Time complexity |
|---|---|---|
| Lookup by key | dict / set | O(1) average |
| Append | list | O(1) amortized |
| Insert at index | list | O(n) |
| Search | list | O(n) |
| heappush / heappop | heap | O(log n) |
| Sort | list.sort() | O(n log n) |
| BFS / DFS | graph | O(V + E) |
| Binary search | sorted array | O(log n) |
| Build prefix sum | array | O(n) |
| Range sum query | prefix array | O(1) |
Space complexity reminders:
Recursion depth = O(depth) stack space
BFS queue = O(width of tree/graph)
Memoization = O(number of subproblems)
7.4 Pre-ship Checklists
Database Performance
EXPLAIN ANALYZE on all new queries
No Seq Scans on tables > 10K rows
Foreign keys have indexes
WHERE clause columns have indexes
No N+1 queries (use selectinload/joinedload)
Connection pool configured (pool_size, max_overflow)
Security
Password reset doesn't leak email existence (always return 200)
Sensitive data not logged
HTTPS enforced
7.5 Interview Day Checklist
DSA Interview
Clarify before coding (duplicates? constraints? return type?)
State brute force, then optimize — get buy-in before coding
Write clean, readable variable names
Test with your own example, then edge cases
State time AND space complexity unprompted
System Design
Ask about scale before designing
Draw high-level diagram first
Justify every choice ("I chose X because...")
Mention tradeoffs proactively
Ask "what component should I dive deeper on?"
Machine Coding
Clarify requirements for 5–10 min
Define data models first
Start with working skeleton, then add features
State what you'd add if you had more time
7.6 The "Senior Engineer" Mindset
What separates senior-level from mid-level answers:
| Mid-level | Senior-level |
|---|---|
| "I'd add an index." | "I'd add a composite index on (user_id, status) — equality condition first, created CONCURRENTLY to avoid locks. I'd also check whether a partial index on active users only would be smaller." |
| "I'd use async." | "Async because this endpoint makes 3 external API calls. With gather(), total time = max of the 3, not the sum. For the CPU-bound step, I'd offload to Celery with ProcessPoolExecutor workers." |
| "I'd cache this in Redis." | "Redis cache-aside with a 5-min TTL. On miss, a mutex to prevent stampede. Event-driven invalidation on write. Monitor the hit rate — if it's below 80%, the TTL is too short." |
7.7 Practice Plan
Machine coding projects + system design + behavioral STAR stories + mock interviews.
Build these from scratch:
URL Shortener with Redis caching + click tracking
Task Manager with JWT auth + role-based access
Rate limiter middleware (sliding window)
Celery-based notification service with retries
Paginated API with cursor pagination + EXPLAIN ANALYZE verification
7.8 Topic Priority by Interview Type
| Interview type | High priority | Medium priority |
|---|---|---|
| Startup (early stage) | FastAPI, PostgreSQL, Docker, auth, deployment | Redis, Celery, testing |
| Product company (Series B+) | DSA (arrays/graphs/DP), system design, SQL optimization | FastAPI, async, Redis |
| Big tech (FAANG) | All DSA patterns, distributed system design | Python internals, concurrency |
| Fintech / Healthcare | Consistency, transactions, security, audit trails | Performance, caching |
7.9 Resources
DSA Practice:
LeetCode — filter by difficulty Medium, with tags: Hash Table, Two Pointers, Sliding Window, BFS/DFS, Heap
Grind 75 list — focused 75 problems covering all patterns
System Design:
Designing Data-Intensive Applications by Kleppmann — the bible
ByteByteGo — visual explanations
High Scalability blog — real architecture stories
Python Backend:
FastAPI official docs
SQLAlchemy 2.0 docs
Architecture Patterns with Python (O'Reilly)
This document was created with ❤️ for Python Developers who want to crack backend interviews, by Anubhaw. Feel free to suggest improvements or connect @ ContactMe