Drill — Solutions Hidden by Default

Coding Problems

Drillable problems for full-stack DS loops — heavy on feature engineering, evaluation, and production patterns; lighter on classical LeetCode. Multiple approaches per problem.

🎯 10 problems ⏱ ~20 min each

A · Feature engineering

P1. Compute entity-velocity features

Given events(user_id, ssn, device_id, applied_at), for each row compute: distinct SSNs touched by this device in the trailing 24h; distinct devices touched by this SSN in the trailing 24h.

Show solution
velocity_features.py
import pandas as pd

def velocity_features(events: pd.DataFrame) -> pd.DataFrame:
    events = events.sort_values('applied_at').reset_index(drop=True)
    results = events.copy()
    results['distinct_ssns_per_device_24h'] = 0
    results['distinct_devices_per_ssn_24h'] = 0

    # For each row, find rows within 24h trailing per entity
    for i, row in events.iterrows():
        window_start = row['applied_at'] - pd.Timedelta('24h')
        in_window = events[
            (events['applied_at'] >= window_start) &
            (events['applied_at'] <= row['applied_at'])
        ]
        same_device = in_window[in_window['device_id'] == row['device_id']]
        same_ssn = in_window[in_window['ssn'] == row['ssn']]
        results.at[i, 'distinct_ssns_per_device_24h'] = same_device['ssn'].nunique()
        results.at[i, 'distinct_devices_per_ssn_24h'] = same_ssn['device_id'].nunique()
    return results

Production discussion: the naive solution is O(n²). At scale, push the computation to SQL (windowed self-joins or partitioned analytics) or to a precomputed feature store. Leakage: for serving correctness the window should cover events strictly before the current one; the code above includes the current row, which is acceptable for training but means the event counts against itself in production.
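Between the O(n²) loop and pushing to SQL there is an O(n log n) middle ground worth being able to sketch on a whiteboard: sort once, then sweep each entity's events with two pointers and a value counter. The helper below is illustrative (the name `trailing_distinct` and its parameters are not from the problem statement); it computes one direction, e.g. distinct SSNs per device.

```python
import pandas as pd
from collections import Counter

def trailing_distinct(events: pd.DataFrame, partition: str, counted: str,
                      ts: str = 'applied_at', window: str = '24h') -> pd.Series:
    # Hypothetical helper: for each row, count distinct `counted` values seen
    # by the same `partition` entity in the trailing window, current row
    # included. One sort plus an amortized two-pointer sweep per entity.
    events = events.sort_values([partition, ts])
    delta = pd.Timedelta(window)
    out = pd.Series(0, index=events.index, dtype=int)
    for _, grp in events.groupby(partition, sort=False):
        times = grp[ts].to_numpy()
        vals = grp[counted].to_numpy()
        counts: Counter = Counter()
        left = 0
        for right in range(len(grp)):
            counts[vals[right]] += 1
            # Shrink the window until it spans at most `delta`
            while times[right] - times[left] > delta:
                counts[vals[left]] -= 1
                if counts[vals[left]] == 0:
                    del counts[vals[left]]
                left += 1
            out.loc[grp.index[right]] = len(counts)
    return out
```

Calling it twice, with `('device_id', 'ssn')` and `('ssn', 'device_id')`, reproduces both features from the problem without the quadratic per-row filtering.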

P2. Time-since-last-event

Given events(user_id, event_at), compute time_since_last_event_minutes for each row (NULL for first event per user).

Show solution
time_since_last.py
import pandas as pd

def time_since_last(df: pd.DataFrame) -> pd.DataFrame:
    df = df.sort_values(['user_id', 'event_at']).reset_index(drop=True)
    prev = df.groupby('user_id')['event_at'].shift(1)
    df['time_since_last_event_minutes'] = (df['event_at'] - prev).dt.total_seconds() / 60
    return df

One-liner if you're fluent with groupby + shift. The interviewer will probably ask you to extend it: rolling-window stats (events in the last hour), or a pandas-free version for streaming.

P3. Detect stuck-sensor runs

Given a series of sensor readings (timestamp, value), flag any run of 10+ consecutive samples with identical value.

Show solution
stuck_detection.py
import pandas as pd

def flag_stuck(series: pd.Series, run_threshold: int = 10) -> pd.Series:
    # Label each run of identical values: the id increments at every change point
    run_id = (series != series.shift()).cumsum()
    # Size of the run each sample belongs to
    run_size = series.groupby(run_id).transform('size')
    # Flag every sample in any run of run_threshold+ identical values
    return run_size >= run_threshold

Trap: the tempting diff-based "same as previous" counter is off by one (a run of 10 identical samples produces only 9 "same" flags) and marks only the tail of the run, not the leading samples. Labeling runs with a change-point cumsum and taking each run's size flags the entire run correctly.
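The change-point/run-id idiom is worth having at your fingertips; here it is in isolation on a toy series:

```python
import pandas as pd

s = pd.Series([7, 7, 7, 3, 3, 7])
# Run id increments wherever the value changes (the first diff is NaN,
# which compares unequal and starts run 1)
run_id = (s != s.shift()).cumsum()               # [1, 1, 1, 2, 2, 3]
# Broadcast each run's length back onto every member of the run
run_size = s.groupby(run_id).transform('size')   # [3, 3, 3, 2, 2, 1]
```

The same pattern answers many interview follow-ups: longest streak per user, consecutive failed logins, flat-lined metrics.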

B · Evaluation

P4. Lift at top K%

Given scores and labels (0/1), compute the lift at the top 5%: ratio of positive rate among top 5% scored to overall positive rate.

Show solution
lift_at_k.py
import numpy as np

def lift_at_k(scores: np.ndarray, labels: np.ndarray, k_pct: float = 0.05) -> float:
    n = len(scores)
    k = int(np.ceil(n * k_pct))
    # Indices of top k by score (descending)
    top_idx = np.argpartition(-scores, k - 1)[:k]
    top_rate = labels[top_idx].mean()
    base_rate = labels.mean()
    return top_rate / base_rate if base_rate > 0 else float('inf')

argpartition is O(n) for picking top k; argsort would be O(n log n) — both work, but argpartition matters at scale.
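A quick check of the argpartition idiom, and the one property that trips people up, that unlike argsort it gives no ordering guarantee within the selected k:

```python
import numpy as np

scores = np.array([0.1, 0.9, 0.4, 0.8, 0.2])
k = 2
# Negate so the partition puts the k largest scores first
top_idx = np.argpartition(-scores, k - 1)[:k]
# top_idx contains the indices of the two largest scores (1 and 3),
# but in no guaranteed order
```

For lift the order inside the top k is irrelevant, which is exactly why the cheaper O(n) selection suffices.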

P5. Pick threshold for fixed precision

Given scores and labels, find the lowest threshold such that precision ≥ 0.8 among flagged.

Show solution
fixed_precision_threshold.py
import numpy as np

def threshold_for_precision(scores, labels, target_precision=0.8):
    # Sort descending by score
    order = np.argsort(-scores)
    sorted_labels = labels[order]
    sorted_scores = scores[order]
    # Cumulative TP at each cutoff
    cum_tp = np.cumsum(sorted_labels)
    n_flagged = np.arange(1, len(scores) + 1)
    precision = cum_tp / n_flagged
    # Find the largest n_flagged where precision >= target (lowest threshold)
    valid = precision >= target_precision
    if not valid.any():
        return None  # impossible
    last_valid_idx = np.where(valid)[0].max()
    return float(sorted_scores[last_valid_idx])

Edge case: when no threshold achieves target precision, return None. Trap: with ties in scores, the threshold isn't unique — pick the lower one and document.
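A concrete instance of the tie trap, assuming the target precision of 0.8: the cumulative scan sees precision 1.0 after two rows and would report a threshold of 0.5, but deploying `score >= 0.5` flags the tied rows too.

```python
import numpy as np

scores = np.array([0.9, 0.5, 0.5, 0.5])
labels = np.array([1, 1, 0, 0])
# The scan's view at the second row: 2 flagged, 2 true positives, precision 1.0.
# The deployed rule `score >= 0.5` flags all four rows instead:
flagged = scores >= 0.5
deployed_precision = labels[flagged].mean()  # 0.5, well below the scan's 1.0
```

This is why production thresholding should be validated by re-applying the rule to held-out data rather than trusting the cutoff-index precision.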

P6. PSI between two distributions

Compute the Population Stability Index between a baseline distribution and a current distribution of a feature.

Show solution
psi.py
import numpy as np

def psi(baseline: np.ndarray, current: np.ndarray, n_bins: int = 10) -> float:
    # Use baseline's quantiles to define bin edges
    edges = np.quantile(baseline, np.linspace(0, 1, n_bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf
    base_counts, _ = np.histogram(baseline, bins=edges)
    curr_counts, _ = np.histogram(current,  bins=edges)
    # Avoid log(0)
    base_pct = (base_counts + 1) / (base_counts.sum() + n_bins)
    curr_pct = (curr_counts + 1) / (curr_counts.sum() + n_bins)
    return float(np.sum((curr_pct - base_pct) * np.log(curr_pct / base_pct)))

Common thresholds: PSI < 0.1 = no shift; 0.1–0.25 = moderate; > 0.25 = significant shift requiring attention. The +1 Laplace smoothing prevents the log(0) trap on sparse bins.
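One edge case worth raising unprompted: on a spiky feature, many baseline quantiles collapse onto the same value, producing duplicate (zero-width) bin edges. A sketch of the symptom and a common fix, deduplicating edges with `np.unique` before histogramming:

```python
import numpy as np

# A spiky feature: 90% of the baseline mass sits at a single value
baseline = np.concatenate([np.zeros(90), np.arange(1, 11)])
raw_edges = np.quantile(baseline, np.linspace(0, 1, 11))
# Most of the 11 quantile edges are 0; keep only the distinct ones
edges = np.unique(raw_edges)
edges[0], edges[-1] = -np.inf, np.inf
```

With fewer, distinct bins the smoothed proportions stay meaningful; the cost is that the effective bin count varies with the baseline's shape.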

C · Production patterns

P7. LRU feature cache with TTL

Implement a cache that evicts the least-recently-used entry when capacity is exceeded, and treats entries older than TTL as missing.

Show solution
ttl_lru.py
from collections import OrderedDict
import time

class TTLLRUCache:
    def __init__(self, capacity: int, ttl_seconds: float):
        self.capacity = capacity
        self.ttl = ttl_seconds
        self.data: OrderedDict = OrderedDict()

    def get(self, key):
        if key not in self.data:
            return None
        value, ts = self.data[key]
        if time.time() - ts > self.ttl:
            del self.data[key]
            return None
        self.data.move_to_end(key)
        return value

    def set(self, key, value):
        self.data[key] = (value, time.time())
        self.data.move_to_end(key)
        if len(self.data) > self.capacity:
            self.data.popitem(last=False)  # evict LRU

P8. Rate limiter for an inference service

Implement a per-customer rate limiter that allows N requests per minute, sliding window.

Show solution
rate_limiter.py
from collections import defaultdict, deque
import time

class SlidingWindowLimiter:
    def __init__(self, max_per_minute: int):
        self.limit = max_per_minute
        self.windows: dict[str, deque] = defaultdict(deque)

    def allow(self, customer_id: str) -> bool:
        now = time.time()
        window = self.windows[customer_id]
        # Drop entries older than 60s
        while window and window[0] < now - 60:
            window.popleft()
        if len(window) >= self.limit:
            return False
        window.append(now)
        return True

Tradeoff discussion to mention unprompted: this is in-memory per-process. At scale you'd want Redis with sorted sets, or a token bucket — both more memory-efficient at high QPS. The sliding-window-log is exact but uses O(N) memory per customer.
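A hedged sketch of the token-bucket alternative mentioned above: O(1) state per customer instead of a timestamp log. The `rate_per_sec` and `burst` knobs are illustrative, not from the problem statement.

```python
import time

class TokenBucket:
    # Sketch: tokens refill continuously at `rate_per_sec`, capped at `burst`;
    # each request spends one token. Memory per customer is constant.
    def __init__(self, rate_per_sec: float, burst: int):
        self.rate = rate_per_sec
        self.burst = burst
        self.tokens = float(burst)
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, never exceeding the burst cap
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

The tradeoff versus the sliding-window log: the bucket is approximate about window boundaries but allows controlled bursts and constant memory, which is usually the right call for a high-QPS inference tier.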

D · Classic algorithms (lighter)

P9. Two-sum

Given an array of ints and a target, return indices of two elements summing to target.

Show solution
two_sum.py
def two_sum(nums: list[int], target: int) -> tuple[int, int] | None:
    seen = {}
    for i, n in enumerate(nums):
        if target - n in seen:
            return (seen[target - n], i)
        seen[n] = i
    return None

P10. Subarray sum equals K

Return the number of contiguous subarrays whose sum equals K. Negative numbers allowed.

Show solution
subarray_sum.py
def subarray_sum_eq_k(nums: list[int], k: int) -> int:
    seen = {0: 1}
    running = 0
    answer = 0
    for n in nums:
        running += n
        answer += seen.get(running - k, 0)
        seen[running] = seen.get(running, 0) + 1
    return answer

Prefix-sum + hashmap trick. O(n) time, O(n) space. Handles negatives — that's the difference from a sliding-window approach.
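The identity behind the hashmap trick, checked brute-force on a small case with negatives (where a sliding window would fail):

```python
from itertools import accumulate

nums = [1, -1, 1, -1]
k = 0
prefix = [0] + list(accumulate(nums))   # prefix[j] == sum(nums[:j])
# sum(nums[i:j]) == k  iff  prefix[j] - prefix[i] == k; the one-pass hashmap
# counts exactly these (i, j) pairs. Brute-force over all pairs to confirm:
count = sum(1 for j in range(len(prefix)) for i in range(j)
            if prefix[j] - prefix[i] == k)
# count == 4: [1,-1], [-1,1], [1,-1], and the full array
```

Walking an interviewer through this identity before writing the one-pass version is usually worth the thirty seconds.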

Drill protocol

How to use this page

Enable drill mode. Read each problem, give yourself 20 minutes on paper or in a notebook. Speak through approach + complexity out loud before writing. Reveal the solution. Compare. Mark "practiced." Aim for 6–7 cleared before any onsite — particularly the feature-engineering ones if you're targeting fraud-domain roles.