357 changes: 357 additions & 0 deletions machine_learning/arima.py
@@ -0,0 +1,357 @@
import matplotlib.pyplot as plt
import numpy as np
from numpy.typing import NDArray

"""
This program implements an ARIMA (AutoRegressive Integrated Moving Average) model
from scratch in Python.

References:
Wikipedia page on ARIMA:
https://en.wikipedia.org/wiki/Autoregressive_integrated_moving_average
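
Typical usage (a sketch; the __main__ block below runs a full demo,
and train_series stands for any 1-D sequence of floats):
model = ARIMA(p=2, d=1, q=2)
model.fit(train_series)
predictions = model.forecast(steps=10)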

Author: Debanuj Roy
Email: debanujroy1234@gmail.com
"""


class ARIMA:
def __init__(self, p: int = 1, d: int = 1, q: int = 1, lr: float = 0.001, epochs: int = 1000) -> None:
"""
Initializes the ARIMA model.

Args:
p: AR lag order (uses past y values).
d: Differencing order (makes data stationary).
q: MA lag order (uses past errors).
lr: Learning rate for Gradient Descent.
epochs: Number of training cycles.
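
Example (invalid orders are rejected):
>>> ARIMA(p=-1)
Traceback (most recent call last):
...
ValueError: p, d, and q must be non-negative integers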
"""
# We need to make sure p, d, and q are sensible numbers (positive integers).
if not all(isinstance(x, int) and x >= 0 for x in [p, d, q]):
raise ValueError("p, d, and q must be non-negative integers")
# Learning rate and epochs should also be positive.
if lr <= 0 or epochs <= 0:
raise ValueError("lr and epochs must be positive")

self.p = p
self.d = d
self.q = q
self.lr = lr
self.epochs = epochs

# These are the parameters our model will learn. We'll initialize them at zero.
# phi -> The weights for the AR (past values) part.
self.phi = np.zeros(p)
# theta -> The weights for the MA (past errors) part.
self.theta = np.zeros(q)
# c -> A constant or intercept, like a baseline value.
self.c = 0.0

# Store info after model training
self.is_fitted = False # Flag for training status
self.train_last = 0.0 # Last value from training data
# Type hints for optional attributes
self.diff_data: NDArray[np.float64] | None = None
self.errors: NDArray[np.float64] | None = None
self.n_train: int | None = None
self.sigma_err: float | None = None

def difference(self, data: list[float] | NDArray[np.float64]) -> NDArray[np.float64]:
"""
Makes the time series stationary by differencing.

Args:
data: Original time series data.
Returns:
Differenced data.
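
Example (one differencing pass, d=1):
>>> ARIMA(p=0, d=1, q=0).difference([1.0, 2.0, 4.0, 7.0])
array([1., 2., 3.])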
"""
diff = np.copy(data)
# We loop 'd' times, applying the differencing each time.
for _ in range(self.d):
diff = np.diff(diff) # np.diff is a handy function that does exactly this.
return diff

def inverse_difference(
self, last_obs: float, diff_forecast: list[float]
) -> list[float]:
"""
Converts differenced forecasts back to the original scale by adding them cumulatively to the last observed value. Note: this inverts a single differencing step, so it is exact for d = 1.

Args:
last_obs: Last value from the original data.
diff_forecast: Predictions on differenced data.
Returns:
Forecasts in the original scale.
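
Example:
>>> ARIMA(p=0, d=1, q=0).inverse_difference(10.0, [1.0, 2.0])
[11.0, 13.0]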
"""
forecast = []
# We start with the last known value from the original data.
prev = last_obs
# For each predicted *change*, we add it to the last value to get the next one.
for val in diff_forecast:
next_val = prev + val
forecast.append(next_val)
# Store current value for next iteration
prev = next_val
return forecast

def _compute_residuals(
self,
diff_data: NDArray[np.float64],
phi: NDArray[np.float64],
theta: NDArray[np.float64],
c: float,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""
Computes residuals for given parameters.

Args:
diff_data: Differenced data.
phi, theta, c: Model parameters.
Returns:
Tuple of predictions and residuals.
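
Example (AR(1) with phi = 0.5 on already-differenced data):
>>> model = ARIMA(p=1, d=0, q=0)
>>> preds, errs = model._compute_residuals(
...     np.array([1.0, 2.0, 3.0]), np.array([0.5]), np.zeros(0), 0.0
... )
>>> preds
array([0. , 0.5, 1. ])
>>> errs
array([0. , 1.5, 2. ])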
"""
n = len(diff_data)
# We need initial values to get started, so we begin after max(p, q).
start = max(self.p, self.q)
preds = np.zeros(n)
errors = np.zeros(n)

# Loop through the data from the starting point.
for t in range(start, n):
# AR part: a weighted sum of the last 'p' actual values.
# We reverse the data slice [::-1] so that the most recent value is first.
ar_term = (
np.dot(phi, diff_data[t - self.p : t][::-1]) if self.p > 0 else 0.0
)

# MA part: a weighted sum of the last 'q' prediction errors.
ma_term = np.dot(theta, errors[t - self.q : t][::-1]) if self.q > 0 else 0.0

# Combine everything to make the one-step-ahead prediction.
preds[t] = c + ar_term + ma_term
# Calculate error
errors[t] = diff_data[t] - preds[t]

return preds, errors

def fit(self, data: list[float] | NDArray[np.float64]) -> "ARIMA":
"""
Trains the ARIMA model.

Args:
data: Time series data to train on.
Returns:
The fitted ARIMA model.
"""
data = np.asarray(data, dtype=float)
# Check if we have enough data to even build the model.
if len(data) < max(self.p, self.q) + self.d + 5:
raise ValueError("Not enough data to train. You need more samples.")

# Store the last value of the original data. We'll need it to forecast later.
self.train_last = float(data[-1])

# Step 1: Make the data stationary.
diff_data = self.difference(data)
n = len(diff_data)
start = max(self.p, self.q)

# Another check to make sure we have enough data AFTER differencing.
if n <= start:
raise ValueError("Not enough data after differencing. Try reducing 'd'.")

# Step 2: Run gradient descent to find the best parameters.
# (It also stores diff_data, errors, and n_train on the model.)
self._fit_gradient_descent(diff_data, start)
# All done! Mark the model as fitted and ready to forecast.
self.is_fitted = True
return self

def _fit_gradient_descent(self, diff_data: NDArray[np.float64], start: int) -> None:
"""
Trains the model using Gradient Descent.

Args:
diff_data: Differenced data.
start: Starting index for training.
"""
n = len(diff_data)
errors = np.zeros(n)
preds = np.zeros(n)
m = max(1, n - start) # Number of points we can calculate error on.

# The main training loop. We repeat this 'epochs' times.
for epoch in range(self.epochs):
# First, calculate the predictions and errors with the current parameters.
for t in range(start, n):
ar_term = (
np.dot(self.phi, diff_data[t - self.p : t][::-1])
if self.p > 0
else 0.0
)
ma_term = (
np.dot(self.theta, errors[t - self.q : t][::-1])
if self.q > 0
else 0.0
)
preds[t] = self.c + ar_term + ma_term
errors[t] = diff_data[t] - preds[t]

# Calculate the "gradient"-which direction we should nudge our parameters
# to reduce the error. It's based on the partial derivatives of the MSE.
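# Concretely, with MSE = (1/m) * sum_t e_t**2 and
# e_t = y_t - (c + sum_i phi_i * y_{t-i} + sum_j theta_j * e_{t-j}),
# treating the lagged errors as constants gives:
#   dMSE/dc       = -(2/m) * sum_t e_t
#   dMSE/dphi_i   = -(2/m) * sum_t e_t * y_{t-i}
#   dMSE/dtheta_j = -(2/m) * sum_t e_t * e_{t-j}
# (This is an approximation: the exact MA gradient is recursive, since
# past errors themselves depend on theta.)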
dc = -2 * np.sum(errors[start:]) / m
dphi = np.zeros_like(self.phi)

# Calculate AR gradients
for i in range(self.p):
lag_idx = slice(start - i - 1, n - i - 1)  # lagged values y_{t-i}
error_term = errors[start:] * diff_data[lag_idx]
dphi[i] = -2 * np.sum(error_term) / m

# Calculate MA gradients
dtheta = np.zeros_like(self.theta)
for j in range(self.q):
err_idx = slice(start - j - 1, n - j - 1)
error_term = errors[start:] * errors[err_idx]
dtheta[j] = -2 * np.sum(error_term) / m

# Update parameters by taking steps
# in the opposite direction of the gradient.
self.phi -= self.lr * dphi
self.theta -= self.lr * dtheta
self.c -= self.lr * dc

if epoch % 100 == 0:
mse = np.mean(errors[start:] ** 2)
print(f"Epoch {epoch}: MSE={mse:.6f}, c={self.c:.6f}")

# After training, store the final results for forecasting.
self.errors = errors
self.diff_data = diff_data
self.n_train = n
sigma_term = np.sum(self.errors[start:] ** 2) / m
self.sigma_err = float(np.sqrt(sigma_term))
msg = f"Fitted params (GD): phi={self.phi},theta={self.theta},c={self.c:.6f}\n"
print(msg)

def forecast(
self, steps: int, simulate_errors: bool = False, random_state: int | None = None
) -> NDArray[np.float64]:
"""
Generates future predictions.

Args:
steps: Number of steps to predict.
simulate_errors: Adds randomness to forecasts if True.
random_state: Seed for reproducibility.
Returns:
Forecasted values.
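
Example (after fit; a sketch rather than a doctest, since training
prints progress messages):
path = model.forecast(steps=5)
simulated = model.forecast(steps=5, simulate_errors=True, random_state=0)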
"""
msg = "Model must be fitted before forecasting. Call fit() first."
if not self.is_fitted:
raise ValueError(msg)

if self.diff_data is None or self.errors is None or self.sigma_err is None:
raise ValueError(msg)

# We'll be appending to these arrays, so work on copies.
diff_data = np.copy(self.diff_data)
errors = np.copy(self.errors)
forecasts_diff = []

# A tool for generating random numbers if we need them.
rng = np.random.default_rng(random_state)

# Generate one forecast at a time.
for _ in range(steps):
# AR part: Use the last 'p' values (from previous y-values).
ar_slice = slice(-self.p, None)
ar_term = np.dot(self.phi, diff_data[ar_slice][::-1]) if self.p > 0 else 0.0
# MA part: use the last 'q' prediction errors.
ma_slice = slice(-self.q, None)
ma_term = np.dot(self.theta, errors[ma_slice][::-1]) if self.q > 0 else 0.0

# The next predicted value (on the differenced scale).
next_diff_forecast = self.c + ar_term + ma_term
forecasts_diff.append(next_diff_forecast)

# For the next loop, we need a value for the "next error".
# If we're simulating, we draw a random error from a normal distribution
# with the same standard deviation as our past errors.
next_error = rng.normal(0.0, self.sigma_err) if simulate_errors else 0.0

# Now, append our new prediction and error to the history.
# This is crucial: our next forecast will use these values we just made up!
diff_data = np.append(diff_data, next_diff_forecast)
errors = np.append(errors, next_error)

# Finally, "un-difference" the forecasts to get them back to the original scale.
final_forecasts = self.inverse_difference(self.train_last, forecasts_diff)
return np.array(final_forecasts, dtype=np.float64)


# This part of the code only runs if you execute this script directly.
if __name__ == "__main__":
# Let's create some fake data that follows an ARIMA(2,1,2) pattern.
rng = np.random.default_rng(42)  # Seeded for reproducibility.
n = 500
ar_params = [0.5, -0.3]
ma_params = [0.4, 0.2]

# Start with some random noise.
noise = rng.standard_normal(n)
data = np.zeros(n)

# Build the ARMA part first.
for t in range(2, n):
data[t] = (
ar_params[0] * data[t - 1]
+ ar_params[1] * data[t - 2]
+ noise[t]
+ ma_params[0] * noise[t - 1]
+ ma_params[1] * noise[t - 2]
)
# Now, "integrate" it by taking the cumulative sum.
data = np.cumsum(data)

# Train the model on the first 400 data points.
train_end = 400
# Match the ARIMA(2,1,2) process that generated the data; d=1 also keeps
# inverse_difference exact.
model = ARIMA(p=2, d=1, q=2)
model.fit(data[:train_end])

# Forecast the next 100 steps.
forecast_steps = 100
forecast = model.forecast(forecast_steps, simulate_errors=True)

# Compare forecast to the actual data.
true_future = data[train_end : train_end + forecast_steps]
rmse = np.sqrt(np.mean((forecast - true_future) ** 2))
print(f"Forecast RMSE: {rmse:.6f}\n")

# Visualize results.
plt.figure(figsize=(14, 6))
plt.plot(range(len(data)), data, label="Original Data", linewidth=1.5)
plt.plot(
range(train_end, train_end + forecast_steps),
forecast,
label="Forecast",
color="red",
linewidth=1.5,
)
split_label = "Split"
plt.axvline(
train_end - 1,
color="gray",
linestyle=":",
alpha=0.7,
label=f"Train/Test {split_label}",
)
plt.xlabel("Time Step")
plt.ylabel("Value")
plt.title("ARIMA(2,1,2) Model: Training Data and Forecast")
plt.legend()
plt.tight_layout()
plt.show()