From-scratch designs and implementations of important and influential algorithms, ML models, data structures, and systems.
Algorithms
Boyer-Moore Majority Vote Algorithm
from numbers import Real
def majority_element(nums: list[Real]) -> Real | None:
"""Boyer-Moore Majority Vote Algorithm for finding the majority element."""
if not nums:
return None
candidate = 0
count = 0
for num in nums:
candidate = candidate if count else num
count += 1 if candidate == num else -1
    # NOTE: The candidate is the majority element only if one actually exists; when that is
    # not guaranteed, verify the candidate with a second pass over `nums`.
    return candidate
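A minimal usage sketch with illustrative inputs:
if __name__ == "__main__":
    assert majority_element([2, 1, 2, 2, 3]) == 2
    assert majority_element([]) is None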
Bresenham's Circle Drawing Algorithm
import matplotlib.pyplot as plt
def bresenham(radius: int) -> None:
    """Draws a circle of the given integer radius using Bresenham's Circle Drawing Algorithm."""
# Initial parameters
x = 0
y = radius
d = 3 - 2 * radius
# Consider the upper half of the first quadrant
while x <= y:
# Put eight points via eight-way symmetry
plt.scatter([x, x, -x, -x, y, y, -y, -y], [y, -y, y, -y, x, -x, x, -x])
# Make decisions and update parameters
if d <= 0:
d += 4 * x + 6
else:
d += 4 * (x - y) + 10
y -= 1
x += 1
# Show the drawing
plt.title("Circle Drawing Using Bresenham's Circle Drawing Algorithm")
plt.show()
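A minimal usage sketch; an integer radius keeps the decision parameter exact:
if __name__ == "__main__":
    bresenham(radius=20)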
Floyd's Cycle-Finding Algorithm
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class LinkedListNode:
    """A minimal singly linked list node, assumed here so the example is self-contained."""
    val: int = 0
    next: LinkedListNode | None = None
def floyd(head: LinkedListNode | None) -> bool:
    """Checks for a cycle using Floyd's cycle-finding (tortoise and hare) algorithm."""
slow = head
fast = head
while fast and fast.next:
slow = slow.next
fast = fast.next.next
        # Compare by identity: the pointers meet at the very same node iff there is a cycle
        if slow is fast:
return True
return False
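A quick self-check on a hand-built list (the node values are illustrative):
if __name__ == "__main__":
    a, b, c = LinkedListNode(1), LinkedListNode(2), LinkedListNode(3)
    a.next, b.next, c.next = b, c, a  # 1 -> 2 -> 3 -> back to 1, i.e., a cycle
    assert floyd(a)
    c.next = None  # Break the cycle
    assert not floyd(a)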
Kadane's Algorithm
from numbers import Real
def max_subarray_sum(nums: list[Real]) -> Real | None:
"""Finds the maximum subarray sum using Kadane's algorithm."""
if not nums:
return None
cur = nums[0]
res = nums[0]
for idx in range(1, len(nums)):
cur = max(cur + nums[idx], nums[idx])
res = max(res, cur)
return res
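A minimal usage sketch:
if __name__ == "__main__":
    assert max_subarray_sum([-2, 1, -3, 4, -1, 2, 1, -5, 4]) == 6  # Subarray [4, -1, 2, 1]
    assert max_subarray_sum([]) is None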
Levenshtein Distance
def levenshtein_distance(word1: str, word2: str) -> int:
"""Computes the Levenshtein distance between two words."""
m, n = len(word1), len(word2)
dp = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(m + 1):
for j in range(n + 1):
            if i * j == 0:
                # Base case: converting between a prefix and the empty string costs i + j edits
                dp[i][j] = i + j
elif word1[i - 1] == word2[j - 1]:
dp[i][j] = dp[i - 1][j - 1]
else:
dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1])
return dp[m][n]
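A minimal usage sketch:
if __name__ == "__main__":
    assert levenshtein_distance("kitten", "sitting") == 3  # Two substitutions, one insertion
    assert levenshtein_distance("", "abc") == 3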
Longest Common Subsequence
def longest_common_subsequence(word1: str, word2: str) -> int:
"""Computes the Longest Common Subsequence (LCS) between two words."""
m, n = len(word1), len(word2)
dp = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(1, m + 1):
for j in range(1, n + 1):
if word1[i - 1] == word2[j - 1]:
dp[i][j] = 1 + dp[i - 1][j - 1]
else:
dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])
return dp[m][n]
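A minimal usage sketch:
if __name__ == "__main__":
    assert longest_common_subsequence("abcde", "ace") == 3  # "ace"
    assert longest_common_subsequence("abc", "xyz") == 0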
Fair Coin from Unfair Coin
def unfair_flip() -> int:
"""An unfair flip: assume it is provided."""
...
def fair_flip() -> int:
"""Simulates a fair coin from an unfair coin (0 is heads and 1 is tails)."""
flip1, flip2 = unfair_flip(), unfair_flip()
    # (Heads, tails) and (tails, heads) are equally likely, so when the flips differ the first
    # flip is fair; when they match, discard the pair and flip again (von Neumann's trick).
    return flip1 if flip1 ^ flip2 else fair_flip()
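A quick simulation sketch; the 90%-tails coin below is a hypothetical stand-in for the provided unfair_flip:
if __name__ == "__main__":
    import random
    def unfair_flip() -> int:
        return int(random.random() < 0.9)  # Lands tails (1) 90% of the time
    flips = [fair_flip() for _ in range(100_000)]
    print(sum(flips) / len(flips))  # Expected to be close to 0.5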
System Design
Cache (LFU)
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
@dataclass
class Node:
"""Doubly linked list node."""
key: int | None = None
val: int | None = None
next: Node | None = None
prev: Node | None = None
freq: int = 1
class List:
"""Doubly linked list."""
def __init__(self) -> None:
"""Initializes the doubly linked list."""
self.head = Node()
self.tail = Node()
self.head.next = self.tail
self.tail.prev = self.head
    def is_empty(self) -> bool:
        """Checks whether the doubly linked list is empty."""
        # Compare by identity: dataclass `==` would compare nodes field by field
        return self.head.next is self.tail
def push_front(self, node: Node) -> None:
"""Moves a node to the front."""
head_next = self.head.next
self.head.next = node
head_next.prev = node
node.next = head_next
node.prev = self.head
    def pop(self, node: Node | None = None) -> Node | None:
        """Removes and returns the given node, or the last node if none is given."""
if self.is_empty():
return None
if node is None:
node = self.tail.prev
node.next.prev = node.prev
node.prev.next = node.next
return node
class LFUCache:
"""List Frequently Used (LFU) cache."""
def __init__(self, capacity: int) -> None:
"""Initializes the cache with the given capacity."""
self.capacity = capacity
self.key_to_node = {}
self.freqmap = defaultdict(List)
self.min_freq = 1
def _update(self, node: Node) -> None:
"""Performs operations needed to comply with the LFU cache requirements."""
self.freqmap[node.freq].pop(node)
if self.min_freq == node.freq and self.freqmap[node.freq].is_empty():
self.min_freq += 1
node.freq += 1
self.freqmap[node.freq].push_front(node)
    def put(self, key: int, val: int) -> None:
        """Puts a key-value pair into the cache."""
        # Guard against a zero-capacity cache, which can never hold a pair
        if self.capacity == 0:
            return
if key in self.key_to_node:
node = self.key_to_node[key]
self._update(node)
node.val = val
else:
if len(self.key_to_node) == self.capacity:
node = self.freqmap[self.min_freq].pop()
del self.key_to_node[node.key]
new_node = Node(key, val)
self.key_to_node[key] = new_node
self.freqmap[new_node.freq].push_front(new_node)
self.min_freq = new_node.freq
def get(self, key: int) -> int | None:
"""Gets a value by key from the cache."""
if key in self.key_to_node:
node = self.key_to_node[key]
self._update(node)
return node.val
return None
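A minimal usage sketch:
if __name__ == "__main__":
    cache = LFUCache(capacity=2)
    cache.put(1, 10)
    cache.put(2, 20)
    cache.get(1)      # Key 1 is now used more frequently than key 2
    cache.put(3, 30)  # Evicts key 2, the least frequently used
    assert cache.get(2) is None and cache.get(1) == 10 and cache.get(3) == 30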
Cache (LRU)
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class Node:
"""Doubly linked list node."""
key: int | None = None
val: int | None = None
next: Node | None = None
prev: Node | None = None
class List:
"""Doubly linked list."""
def __init__(self) -> None:
"""Initializes the doubly linked list."""
self.head = Node()
self.tail = Node()
self.head.next = self.tail
self.tail.prev = self.head
    def is_empty(self) -> bool:
        """Checks whether the doubly linked list is empty."""
        # Compare by identity: dataclass `==` would compare nodes field by field
        return self.head.next is self.tail
def push_front(self, node: Node) -> None:
"""Moves a node to the front."""
head_next = self.head.next
self.head.next = node
head_next.prev = node
node.next = head_next
node.prev = self.head
    def pop(self, node: Node | None = None) -> Node | None:
        """Removes and returns the given node, or the last node if none is given."""
if self.is_empty():
return None
if node is None:
node = self.tail.prev
node.next.prev = node.prev
node.prev.next = node.next
return node
class LRUCache:
"""Least Recently Used (LRU) cache."""
def __init__(self, capacity: int) -> None:
"""Initializes the cache with the given capacity."""
self.capacity = capacity
self.container = List()
self.key_to_node = {}
def _update(self, node: Node) -> None:
"""Performs operations needed to comply with the LRU cache requirements."""
self.container.pop(node)
self.container.push_front(node)
    def put(self, key: int, val: int) -> None:
        """Puts a key-value pair into the cache."""
        # Guard against a zero-capacity cache, which can never hold a pair
        if self.capacity == 0:
            return
if key in self.key_to_node:
node = self.key_to_node[key]
self._update(node)
node.val = val
else:
if len(self.key_to_node) == self.capacity:
node = self.container.pop()
del self.key_to_node[node.key]
new_node = Node(key, val)
self.key_to_node[key] = new_node
self.container.push_front(new_node)
def get(self, key: int) -> int | None:
"""Gets a value by key from the cache."""
if key in self.key_to_node:
node = self.key_to_node[key]
self._update(node)
return node.val
return None
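A minimal usage sketch:
if __name__ == "__main__":
    cache = LRUCache(capacity=2)
    cache.put(1, 10)
    cache.put(2, 20)
    cache.get(1)      # Key 1 is now the most recently used
    cache.put(3, 30)  # Evicts key 2, the least recently used
    assert cache.get(2) is None and cache.get(1) == 10 and cache.get(3) == 30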
Cache (RR)
import random
from dataclasses import dataclass, field
from typing import Any
@dataclass
class RRCache:
"""Random Replacement (RR) cache."""
    # NOTE: A mutable dataclass default must use `field(default_factory=...)`
    container: list[tuple[Any, Any]] = field(default_factory=list)
capacity: int = 16
def put(self, key: Any, val: Any) -> None:
"""Puts a key-value pair."""
        if len(self.container) == self.capacity:
            # Evict a uniformly random pair, per the random-replacement policy
            self.pop()
self.container.append((key, val))
def get(self) -> tuple[Any, Any] | None:
"""Gets a random key-value pair."""
return random.choice(self.container) if self.container else None
def pop(self) -> tuple[Any, Any] | None:
"""Removes and returns a random key-value pair."""
if not self.container:
return None
idx = random.randrange(len(self.container))
self.container[0], self.container[idx] = self.container[idx], self.container[0]
return self.container.pop()
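A minimal usage sketch:
if __name__ == "__main__":
    cache = RRCache(capacity=2)
    cache.put("a", 1)
    cache.put("b", 2)
    cache.put("c", 3)  # Evicts a uniformly random pair to stay within capacity
    assert len(cache.container) == 2
    print(cache.get())  # A random surviving key-value pair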
Data Stream Statistics
import random
from dataclasses import dataclass
@dataclass
class DataStreamStats:
"""Data Stream Statistics."""
size: float = 0
mean: float = 0
sum_of_square_diffs: float = 0
var: float = 0
std: float = 0
random: float = 0
def add(self, val: float) -> None:
"""Adds a value to the stream."""
self.size += 1
        # Welford's online algorithm: a numerically stable running mean and variance
        old_mean = self.mean
self.mean += (val - self.mean) / self.size
self.sum_of_square_diffs += (val - old_mean) * (val - self.mean)
self.var = self.sum_of_square_diffs / self.size
self.std = self.var**0.5
if random.random() < 1 / self.size:
self.random = val
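A minimal usage sketch on a classic textbook sample (population variance 4, std 2):
if __name__ == "__main__":
    stats = DataStreamStats()
    for val in [2, 4, 4, 4, 5, 5, 7, 9]:
        stats.add(val)
    assert abs(stats.mean - 5.0) < 1e-9 and abs(stats.std - 2.0) < 1e-9
    print(stats.random)  # A uniformly random element of the stream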
In-Memory File System
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class FileSystemNode:
"""File system node."""
subdirs: dict[str, FileSystemNode] = field(default_factory=dict)
content: str = ""
class FileSystem:
"""An in-memory file system."""
def __init__(self):
self.root = FileSystemNode()
def _find_or_create(self, path: str, create: bool) -> FileSystemNode | None:
"""Finds the node at path and optionally, creates intermediate directories."""
if len(path) <= 1:
return self.root
ptr = self.root
for word in path.split("/")[1:]:
if word not in ptr.subdirs:
if not create:
return None
ptr.subdirs[word] = FileSystemNode()
ptr = ptr.subdirs[word]
return ptr
    def ls(self, path: str) -> list[str]:
        """Lists a directory's entries, or the file's own name if the path points to a file."""
if ptr := self._find_or_create(path, create=False):
return path.split("/")[-1:] if ptr.content else sorted(ptr.subdirs.keys())
return []
def mkdir(self, path: str) -> None:
"""Makes a new directory or new directories."""
self._find_or_create(path, create=True)
def add_content_to_file(self, filepath: str, content: str) -> None:
"""Appends the provided content to the given filepath."""
        if ptr := self._find_or_create(filepath, create=True):
            ptr.content += content
def read_content_from_file(self, filepath: str) -> str | None:
"""Reads content from the given filepath."""
if ptr := self._find_or_create(filepath, create=False):
return ptr.content
return None
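A minimal usage sketch:
if __name__ == "__main__":
    fs = FileSystem()
    fs.mkdir("/a/b")
    fs.add_content_to_file("/a/b/file.txt", "hello")
    assert fs.ls("/a") == ["b"]
    assert fs.ls("/a/b/file.txt") == ["file.txt"]
    assert fs.read_content_from_file("/a/b/file.txt") == "hello"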
Prefix Tree
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class PrefixTreeNode:
"""Prefix tree node."""
children: dict[str, PrefixTreeNode] = field(default_factory=dict)
is_end: bool = False
class PrefixTree:
"""Prefix tree."""
def __init__(self) -> None:
"""Initializes a prefix tree as a node."""
self.root = PrefixTreeNode()
def insert(self, word: str) -> None:
"""Inserts a word into the prefix tree."""
ptr = self.root
for char in word:
if char not in ptr.children:
ptr.children[char] = PrefixTreeNode()
ptr = ptr.children[char]
ptr.is_end = True
def search(self, word: str) -> bool:
"""Searches a word in the prefix tree."""
ptr = self.root
for char in word:
if char not in ptr.children:
return False
ptr = ptr.children[char]
return ptr.is_end
def starts_with(self, prefix: str) -> bool:
"""Searches a prefix in the prefix tree."""
ptr = self.root
for char in prefix:
if char not in ptr.children:
return False
ptr = ptr.children[char]
return True
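A minimal usage sketch:
if __name__ == "__main__":
    tree = PrefixTree()
    tree.insert("apple")
    assert tree.search("apple") and not tree.search("app")
    assert tree.starts_with("app")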
Stack (Maximum Frequency)
from collections import defaultdict
class MaximumFrequencyStack:
"""Maximum frequency stack."""
def __init__(self) -> None:
"""Initializes the stack."""
self.freqmap = {}
self.groups = defaultdict(list)
self.maxfreq = 0
def push(self, val: int) -> None:
"""Pushes a value onto the stack."""
freq = self.freqmap.get(val, 0) + 1
self.freqmap[val] = freq
self.groups[freq].append(val)
self.maxfreq = max(self.maxfreq, freq)
def pop(self) -> int:
"""Pops a value from the stack."""
val = self.groups[self.maxfreq].pop()
self.freqmap[val] -= 1
if not self.groups[self.maxfreq]:
self.maxfreq -= 1
return val
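A minimal usage sketch:
if __name__ == "__main__":
    stack = MaximumFrequencyStack()
    for val in [5, 7, 5, 7, 4, 5]:
        stack.push(val)
    assert stack.pop() == 5  # 5 is the most frequent (three pushes)
    assert stack.pop() == 7  # 5 and 7 are now tied; 7 was pushed more recently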
Stack (Minimum)
from dataclasses import dataclass, field
from numbers import Real
@dataclass
class MinStack:
    """Minimum stack: each entry stores the value together with the running minimum."""
    # NOTE: A mutable dataclass default must use `field(default_factory=...)`
    stack: list[tuple[Real, Real]] = field(default_factory=list)
def push(self, val: Real) -> None:
"""Pushes a value onto the stack."""
if not self.stack:
self.stack.append((val, val))
else:
self.stack.append((val, min(val, self.stack[-1][1])))
def pop(self) -> Real | None:
"""Pops a value from the stack."""
return self.stack.pop()[0] if self.stack else None
def top(self) -> Real | None:
"""Gets the top value from the stack."""
return self.stack[-1][0] if self.stack else None
def min(self) -> Real | None:
"""Gets the minimum value from the stack."""
return self.stack[-1][1] if self.stack else None
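A minimal usage sketch:
if __name__ == "__main__":
    stack = MinStack()
    for val in [3, 5, 2, 4]:
        stack.push(val)
    assert stack.min() == 2 and stack.top() == 4
    stack.pop()
    stack.pop()
    assert stack.min() == 3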
Machine Learning and Statistical Models
Convolutional Neural Network (CNN)
import numpy as np
def conv2d(image: np.ndarray, kernel: np.ndarray, stride: int = 1, padding: int = 0) -> np.ndarray:
"""Computes a 2D Convolution of a single-channel image via given kernel, stride, and padding."""
    # Flipping the kernel is necessary: otherwise we would compute cross-correlation, which is a
    # related but different operation. Convolution has some nice mathematical properties (e.g.,
    # commutativity) that cross-correlation lacks.
    kernel = np.flipud(np.fliplr(kernel))
    if padding:
        # Zero-pad the image on all sides
        image = np.pad(image, padding)
    image_height, image_width = image.shape
    kernel_height, kernel_width = kernel.shape
    out_height = (image_height - kernel_height) // stride + 1
    out_width = (image_width - kernel_width) // stride + 1
    out = np.zeros((out_height, out_width))
    for h in range(out_height):
        for w in range(out_width):
            # Slide the kernel by `stride` pixels per output position
            ih, iw = h * stride, w * stride
            out[h][w] = (image[ih : ih + kernel_height, iw : iw + kernel_width] * kernel).sum()
return out
if __name__ == "__main__":
    # NOTE: `get_data` is assumed to be provided elsewhere in the repository;
    # `LogisticRegression` is the from-scratch implementation below.
    images, labels = get_data()
kernel = np.array([[1, 1, 1], [0, 0, 0], [-1, -1, -1]])
features = np.array([conv2d(image, kernel) for image in images])
    # Illustrative hyperparameters; `learning_rate` and `threshold` are required fields
    lr = LogisticRegression(learning_rate=0.01, epochs=1_000, threshold=0.5, logging=True)
    # Apply ReLU to the feature maps and flatten them before the linear classifier
    lr.fit(np.maximum(0, features).reshape(len(features), -1), labels)
Gaussian Naive Bayes
import numpy as np
class GaussianNaiveBayes:
"""Gaussian Naive Bayes."""
def fit(self, features: np.ndarray, labels: np.ndarray) -> None:
"""Fits the Gaussian Naive Bayes model."""
self.labels = labels
self.unique_labels = np.unique(labels)
self.params = []
# For the given label, calculate the mean and variance of all features
for label in self.unique_labels:
label_features = features[self.labels == label]
self.params.append([(col.mean(), col.var()) for col in label_features.T])
def likelihood(self, data: float, mean: float, var: float) -> float:
"""Calculates the Gaussian likelihood of the data with the given mean and variance."""
        # NOTE: A small epsilon is added in the denominators to prevent division by zero
        eps = 1e-4
coeff = 1 / np.sqrt(2 * np.pi * var + eps)
exponent = np.exp(-((data - mean) ** 2 / (2 * var + eps)))
return coeff * exponent
def predict(self, features: np.ndarray) -> np.ndarray:
"""Performs inference using Bayes' Theorem: P(A | B) = P(B | A) * P(A) / P(B)."""
num_samples, _ = features.shape
predictions = np.empty(num_samples)
for idx, feature in enumerate(features):
posteriors = []
for label_idx, label in enumerate(self.unique_labels):
# Prior is the mean of what we have
prior = np.log((self.labels == label).mean())
# Naive assumption (independence):
# P(a0, a1, a2 | B) = P(a0 | B) * P(a1 | B) * P(a2 | B)
pairs = zip(feature, self.params[label_idx])
likelihood = np.sum([np.log(self.likelihood(f, m, v)) for f, (m, v) in pairs])
# Posterior = Prior * Likelihood / Scaling Factor (ignoring scaling factor)
posteriors.append(prior + likelihood)
# Store the label with the largest posterior probability
predictions[idx] = self.unique_labels[np.argmax(posteriors)]
return predictions
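A quick sanity check on synthetic, well-separated Gaussian blobs (the data below is illustrative):
if __name__ == "__main__":
    rng = np.random.default_rng(seed=0)
    features = np.vstack([rng.normal(0.0, 1.0, size=(50, 2)), rng.normal(5.0, 1.0, size=(50, 2))])
    labels = np.array([0] * 50 + [1] * 50)
    model = GaussianNaiveBayes()
    model.fit(features, labels)
    assert (model.predict(features) == labels).mean() > 0.95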
K-Means Clustering (KMeans)
import numpy as np
from dataclasses import dataclass
@dataclass
class KMeans:
    """K-Means clustering."""
    k: int
iterations: int
rtol: float = 1e-05
atol: float = 1e-08
def fit(self, features: np.ndarray) -> None:
"""Clusters the data."""
n_samples, _ = features.shape
self.centroids = features[np.random.choice(n_samples, size=self.k, replace=False)]
        # Start from an invalid assignment so the first convergence check cannot pass spuriously
        self.closest = np.full(n_samples, -1)
for _ in range(self.iterations):
old_closest = self.closest.copy()
distances = np.linalg.norm(features[:, None] - self.centroids, axis=2)
self.closest = np.argmin(distances, axis=1)
one_hot = np.eye(self.closest.max() + 1)[self.closest]
self.centroids = one_hot.T @ features / one_hot.sum(axis=0)[:, None]
if np.allclose(self.closest, old_closest, rtol=self.rtol, atol=self.atol):
break
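A quick sanity check on synthetic blobs; with well-separated clusters, the centroids typically land near the blob centers:
if __name__ == "__main__":
    rng = np.random.default_rng(seed=0)
    blobs = np.vstack([rng.normal(loc, 0.5, size=(50, 2)) for loc in (0.0, 5.0, 10.0)])
    kmeans = KMeans(k=3, iterations=100)
    kmeans.fit(blobs)
    print(kmeans.centroids)  # Typically near (0, 0), (5, 5), and (10, 10)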
K-Nearest Neighbors (k-NN)
import numpy as np
from dataclasses import dataclass
@dataclass
class KNN:
    """K-Nearest Neighbors (k-NN) classifier."""
    features: np.ndarray
labels: np.ndarray
k: int
def predict(self, features: np.ndarray) -> np.ndarray:
"""Performs inference using the given features."""
num_samples, _ = features.shape
predictions = np.empty(num_samples)
for idx, feature in enumerate(features):
distances = ((self.features - feature) ** 2).sum(axis=1) ** 0.5
            k_nearest_idxs = np.argsort(distances)[: self.k]
            # Majority vote over the labels (not the indices) of the k nearest neighbors
            k_nearest_labels = self.labels[k_nearest_idxs].astype(int)
            predictions[idx] = np.bincount(k_nearest_labels).argmax()
return predictions
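A minimal usage sketch with two illustrative clusters:
if __name__ == "__main__":
    train = np.array([[0.0, 0.0], [0.1, 0.2], [5.0, 5.0], [5.2, 4.9]])
    train_labels = np.array([0, 0, 1, 1])
    knn = KNN(features=train, labels=train_labels, k=3)
    assert (knn.predict(np.array([[0.2, 0.1], [4.8, 5.1]])) == np.array([0, 1])).all()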
Linear Regression
import numpy as np
from dataclasses import dataclass
@dataclass
class LinearRegression:
"""Linear Regression."""
epochs: int
learning_rate: float
logging: bool
def fit(self, features: np.ndarray, labels: np.ndarray) -> None:
"""Fits the Linear Regression model."""
num_samples, num_features = features.shape
self.weights, self.bias = np.zeros(num_features), 0
for epoch in range(self.epochs):
residuals = labels - self.predict(features)
d_weights = -2 / num_samples * features.T.dot(residuals)
d_bias = -2 / num_samples * residuals.sum()
self.weights -= self.learning_rate * d_weights
self.bias -= self.learning_rate * d_bias
if self.logging:
print(f"MSE Loss [{epoch}]: {(residuals ** 2).mean():.3f}")
def predict(self, features: np.ndarray) -> np.ndarray:
"""Performs inference using the given features."""
return features.dot(self.weights) + self.bias
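A quick sanity check on noiseless synthetic data with known coefficients:
if __name__ == "__main__":
    rng = np.random.default_rng(seed=0)
    features = rng.uniform(-1, 1, size=(200, 2))
    labels = features @ np.array([2.0, -3.0]) + 1.0
    lr = LinearRegression(epochs=1_000, learning_rate=0.1, logging=False)
    lr.fit(features, labels)
    print(lr.weights, lr.bias)  # Expected to approach [2, -3] and 1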
Logistic Regression
import numpy as np
from dataclasses import dataclass
@dataclass
class LogisticRegression:
"""Logistic Regression."""
learning_rate: float
epochs: int
threshold: float
logging: bool
def sigmoid(self, predictions: np.ndarray) -> np.ndarray:
"""The numerically stable implementation of the Sigmoid activation function."""
neg_mask = predictions < 0
pos_mask = ~neg_mask
zs = np.empty_like(predictions)
zs[neg_mask] = np.exp(predictions[neg_mask])
zs[pos_mask] = np.exp(-predictions[pos_mask])
res = np.ones_like(predictions)
res[neg_mask] = zs[neg_mask]
return res / (1 + zs)
def mean_log_loss(self, predictions: np.ndarray, labels: np.ndarray) -> np.float32:
"""Computes the mean Cross Entropy Loss (in binary classification, also called Log-loss)."""
return -(labels * np.log(predictions) + (1 - labels) * np.log(1 - predictions)).mean()
def fit(self, features: np.ndarray, labels: np.ndarray) -> None:
"""Fits the Logistic Regression model."""
num_samples, num_features = features.shape
self.weights, self.bias = np.zeros(num_features), 0
for epoch in range(self.epochs):
prediction = self.sigmoid(features.dot(self.weights) + self.bias)
difference = prediction - labels # type: ignore
d_weights = features.T.dot(difference) / num_samples
d_bias = difference.sum() / num_samples
self.weights -= self.learning_rate * d_weights
self.bias -= self.learning_rate * d_bias
if self.logging:
print(f"Mean Log-loss [{epoch}]: {self.mean_log_loss(prediction, labels):.3f}")
def predict(self, features: np.ndarray) -> np.ndarray:
"""Performs inference using the given features."""
return np.where(self.sigmoid(features.dot(self.weights) + self.bias) < self.threshold, 0, 1)
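A quick sanity check on linearly separable synthetic data:
if __name__ == "__main__":
    rng = np.random.default_rng(seed=0)
    features = rng.normal(size=(200, 2))
    labels = (features.sum(axis=1) > 0).astype(float)
    lr = LogisticRegression(learning_rate=0.1, epochs=1_000, threshold=0.5, logging=False)
    lr.fit(features, labels)
    assert (lr.predict(features) == labels).mean() > 0.9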
Principal Component Analysis
import numpy as np
def pca(
data: np.ndarray,
n_components: int,
standardize: bool = True,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Performs dimensionality reduction via Principal Component Analysis."""
if standardize:
data = (data - data.mean(axis=0)) / data.std(axis=0)
cov = np.cov(data.T)
eigval, eigvec = np.linalg.eigh(cov)
    # eigh returns eigenvalues in ascending order, so take the last n_components in descending order
    idx = np.arange(len(eigval) - 1, len(eigval) - 1 - n_components, -1)
eigval, eigvec = eigval[idx], eigvec[:, idx]
feat = data @ eigvec
return feat, eigval, eigvec
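A minimal usage sketch; the third feature below is a linear combination of the first two, so two components capture virtually all the variance:
if __name__ == "__main__":
    rng = np.random.default_rng(seed=0)
    data = rng.normal(size=(100, 3))
    data[:, 2] = data[:, 0] + data[:, 1]  # Redundant third feature
    projected, eigval, eigvec = pca(data, n_components=2)
    print(projected.shape)  # (100, 2)
    print(eigval)  # The two leading eigenvalues dominate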
Byte Pair Encoding
import collections
def bpe(text: str, num_merges: int = 3, start_token: int = 90) -> tuple[str, list[tuple[str, str]]]:
"""Byte-Pair Encoding (BPE) algorithm."""
replacements = []
for _ in range(num_merges):
if pair_counts := collections.Counter(zip(text, text[1:])):
max_freq_pair = max(pair_counts, key=pair_counts.get)
            # NOTE: Assumes chr(start_token) and the code points counting down from it are absent from the text
            start, target = "".join(max_freq_pair), chr(start_token)
text = text.replace(start, target)
start_token -= 1
replacements.append((start, target))
return text, replacements
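A minimal usage sketch on the classic BPE example string:
if __name__ == "__main__":
    encoded, replacements = bpe("aaabdaaabac", num_merges=2)
    print(encoded)       # 'YbdYbac'
    print(replacements)  # [('aa', 'Z'), ('Za', 'Y')]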