Coding Fundamentals
The Python patterns full-stack DS interviewers actually reach for — production-flavored code, testability, vectorization, NumPy / pandas fluency, and the Big-O intuition that wins live-coding rounds.
What to expect
For staff-level full-stack DS, coding rounds skew toward "production-flavored" problems: write a function that another engineer could review, handle inputs cleanly, and be ready to discuss extensions. Staff-DS JDs name "production code and tests" explicitly.
You should expect:
- 30–60 minute live coding sessions.
- Problems framed in domain language (fraud, scoring, features) rather than pure algorithms.
- Follow-ups that add wrinkles — latency budget, streaming variant, scale requirement.
- An expectation that your code includes types, small functions, and at least one example test.
Classes & testability
When the function has state (a model, a threshold, a feature pipeline), wrap it in a class with explicit dependencies. That makes mocking and testing easy.
```python
from typing import Protocol

class Model(Protocol):
    def predict_proba(self, X: list[list[float]]) -> list[list[float]]: ...

class Scorer:
    def __init__(self, model: Model, threshold: float):
        self.model = model
        self.threshold = threshold

    def score(self, features: list[float]) -> dict:
        prob = self.model.predict_proba([features])[0][1]
        return {"prob": prob, "decision": "flag" if prob >= self.threshold else "pass"}

# Test with a fake model — no need to load a real one
class FakeModel:
    def predict_proba(self, X):
        return [[0.4, 0.6] for _ in X]

def test_scorer_flags_above_threshold():
    s = Scorer(FakeModel(), threshold=0.5)
    assert s.score([1.0, 2.0])["decision"] == "flag"
```
The senior signal: writing the test alongside the class without being asked.
Generators & streaming
For data that doesn't fit in memory or arrives over time, use generators. They yield one item at a time and never load the whole stream.
```python
import json
from collections import defaultdict

def read_events(path):
    """Stream events from a large JSONL file one at a time."""
    with open(path) as f:
        for line in f:
            yield json.loads(line)

def aggregate_by_user(events):
    counts = defaultdict(int)
    for e in events:
        counts[e["user_id"]] += 1
    return counts

counts = aggregate_by_user(read_events("huge_log.jsonl"))
```
The interview tell: when the problem says "a large file," reach for generators. When the problem says "streaming," reach for generators or async iteration.
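When the follow-up is "events arrive over the network," the same shape carries over to async iteration. A minimal sketch, where `event_stream` is a stand-in for a real async source such as a message-queue consumer:

```python
import asyncio

async def event_stream(events, delay=0.0):
    """Stand-in async source: yields events as they 'arrive'."""
    for e in events:
        await asyncio.sleep(delay)  # simulate network latency
        yield e

async def count_events(stream):
    """Consume an async iterator one item at a time, never buffering the stream."""
    n = 0
    async for _ in stream:
        n += 1
    return n

total = asyncio.run(count_events(event_stream(range(5))))
print(total)  # 5
```

The consumer is identical in structure to the synchronous generator version; only `async for` and the event loop change.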
Vectorization vs loops
Python loops over NumPy arrays or pandas Series are typically 10–100× slower than the equivalent vectorized operation. For any data of meaningful size, vectorize.
```python
import numpy as np

x = np.random.rand(1_000_000)
y = np.random.rand(1_000_000)

# Slow — Python loop
result_slow = [a * b for a, b in zip(x, y)]

# Fast — vectorized
result_fast = x * y

# Conditional logic: np.where, not Python if/else
flags = np.where(x > 0.95, "high", np.where(x > 0.5, "mid", "low"))
```
NumPy patterns
Broadcasting
Operations between arrays of different shapes follow broadcasting rules: shapes are aligned from the trailing dimension, and a size-1 or missing dimension is stretched to match the other array. Master this and a lot of code shrinks.
```python
import numpy as np

# Per-feature standardization without loops
X = np.random.randn(1000, 20)   # 1000 rows, 20 features
mean = X.mean(axis=0)           # shape (20,)
std = X.std(axis=0)             # shape (20,)
X_normalized = (X - mean) / std # broadcast (1000, 20) - (20,)
```
Indexing tricks
- Boolean masking: `X[mask]` selects rows where mask is True.
- Fancy indexing: `X[indices]` selects rows in arbitrary order.
- argsort / argpartition: get indices for sorting, no need to sort the actual array.
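The three patterns in miniature, on a made-up array:

```python
import numpy as np

x = np.array([30, 10, 50, 20, 40])

# Boolean masking: keep elements where the condition holds
mask = x > 25
high = x[mask]                          # array([30, 50, 40])

# Fancy indexing: pick elements in an arbitrary order
picked = x[[2, 0]]                      # array([50, 30])

# argsort: indices that would sort the array, array itself untouched
order = np.argsort(x)                   # array([1, 3, 0, 4, 2])

# argpartition: indices of the top-2 without a full sort
top2_idx = np.argpartition(x, -2)[-2:]
top2 = x[top2_idx]                      # contains 40 and 50, order not guaranteed
```

`argpartition` is the one interviewers rarely see: for top-K on a large array it is O(n) versus O(n log n) for a full sort.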
Pandas patterns
Group-by-aggregate
```python
import pandas as pd

# Multiple aggregations per group
df.groupby('user_id').agg(
    n_events=('event_id', 'count'),
    total_spend=('amount', 'sum'),
    first_event=('event_at', 'min'),
)

# Window functions
df['cumulative_spend'] = df.groupby('user_id')['amount'].cumsum()
df['rank_within_user'] = df.groupby('user_id')['event_at'].rank(method='dense')
df['days_since_prev'] = df.groupby('user_id')['event_at'].diff().dt.days
```
Merge / join
pd.merge defaults to an inner join. Always specify `how=`. Always check row counts before and after a merge — silent row duplication is a common bug.
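A sketch of the row-count check on toy frames (names are illustrative):

```python
import pandas as pd

events = pd.DataFrame({"user_id": [1, 2, 2, 3], "amount": [10, 20, 30, 40]})
users = pd.DataFrame({"user_id": [1, 2, 2], "segment": ["a", "b", "b"]})  # duplicate key

n_before = len(events)
merged = events.merge(users, on="user_id", how="left")
n_after = len(merged)
print(n_before, n_after)  # 4 6: user 2's rows fanned out against the duplicate key

# validate= turns the silent fan-out into a loud error
try:
    events.merge(users, on="user_id", how="left", validate="many_to_one")
except pd.errors.MergeError:
    print("right side is not unique on user_id")
```

`validate="many_to_one"` is the cheap insurance: it asserts the merge keys on the right side are unique and raises `MergeError` when they are not.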
The asof merge
For "what was the value of X at the time of event Y" — the standard time-series alignment problem. Use `pd.merge_asof` with `direction='backward'` and a `tolerance`.
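A minimal merge_asof sketch on made-up frames (column names are illustrative):

```python
import pandas as pd

events = pd.DataFrame({
    "event_at": pd.to_datetime(["2024-01-01 10:05", "2024-01-01 10:20"]),
    "user_id": [1, 1],
})
scores = pd.DataFrame({
    "scored_at": pd.to_datetime(["2024-01-01 10:00", "2024-01-01 10:15"]),
    "user_id": [1, 1],
    "risk": [0.2, 0.7],
})

# Both sides must be sorted by their time key
out = pd.merge_asof(
    events.sort_values("event_at"),
    scores.sort_values("scored_at"),
    left_on="event_at",
    right_on="scored_at",
    by="user_id",                     # match only within the same user
    direction="backward",             # last score at or before the event
    tolerance=pd.Timedelta("30min"),  # no match older than 30 minutes -> NaN
)
print(out["risk"].tolist())  # [0.2, 0.7]
```

Without the `tolerance`, an event would happily pick up a score from days earlier — usually a leakage or staleness bug.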
Use pandas for analysis and prototyping. For production paths that need to scale past a few GB or run in real-time, either push to SQL or use a more performant alternative (Polars, DuckDB). Knowing when to leave pandas is part of the staff signal.
Testing & reproducibility
- Pytest for unit and integration tests.
- Fixtures for shared setup. Don't duplicate the model-loading boilerplate across tests.
- Parameterize tests over cases. One test, many cases, clean output on failure.
- Set random seeds wherever a model fit, sample, or train/test split happens. Reproducibility for free.
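The fixture-plus-parametrize pattern, sketched with a toy `score` function standing in for whatever is under test (run with pytest):

```python
import pytest

def score(prob: float, threshold: float = 0.5) -> str:
    """Toy decision function standing in for the real thing under test."""
    return "flag" if prob >= threshold else "pass"

@pytest.fixture
def default_threshold() -> float:
    # Shared setup lives in one place, not copy-pasted per test
    return 0.5

@pytest.mark.parametrize(
    "prob, expected",
    [
        (0.9, "flag"),
        (0.5, "flag"),   # boundary: >= threshold flags
        (0.1, "pass"),
        (0.0, "pass"),   # edge case: zero probability
    ],
)
def test_score(prob, expected, default_threshold):
    assert score(prob, default_threshold) == expected
```

One test function, four cases, and a failure report that names the exact failing case.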
Big-O cheat sheet
| Operation | Complexity |
|---|---|
| Dict / set lookup, insert | O(1) amortized |
| Sort | O(n log n) |
| Heap push/pop | O(log n) |
| Top-K via heap | O(n log K) |
| NumPy vectorized op | O(n), constant ~100× smaller than Python loop |
| Pandas groupby aggregate | O(n) for built-ins, much slower for .apply with Python function |
| Boolean mask on array of size n | O(n) |
| String concat in loop | O(n²) in Python — use list + join |
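Two rows of the table in code — top-K via a heap and string building via join:

```python
import heapq

scores = [0.31, 0.97, 0.05, 0.88, 0.64, 0.99, 0.12]

# Top-K via heap: O(n log K), no full sort of the n items
top3 = heapq.nlargest(3, scores)
print(top3)  # [0.99, 0.97, 0.88]

# String building: accumulate in a list, join once — O(n), not O(n^2)
parts = [str(s) for s in scores]
csv_line = ",".join(parts)
```

`heapq.nlargest` (and `nsmallest`) is the one-liner interviewers expect before you hand-roll a heap.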
Live-coding tips for full-stack DS rounds
- Clarify the input: types, shape, edge cases. Fraud-domain-flavored problems often have NULL/missing data buried in the spec.
- State approach + complexity before coding.
- Write tests first or alongside. Especially for transformation functions.
- Use types: `def f(x: list[int]) -> dict[str, float]:` — interviewers notice.
- Talk through tradeoffs out loud: "I'm using a dict for O(1) lookup; could also sort and binary-search."
- Handle edge cases explicitly: empty input, single element, ties.
- Anticipate the production follow-up: "What would change at 1B rows?" Have an answer.
- When stuck, narrate — silence is the enemy.