Added ARIMA #13444
@@ -0,0 +1,357 @@
import matplotlib.pyplot as plt
import numpy as np
from numpy.typing import NDArray

""" | ||
This program implements an ARIMA (AutoRegressive Integrated Moving Average) model | ||
from scratch in Python. | ||
|
||
References: | ||
Wikipedia page on ARIMA: | ||
https://en.wikipedia.org/wiki/Autoregressive_integrated_moving_average | ||
|
||
Author:-Debanuj Roy | ||
Email:- debanujroy1234@gmail.com | ||
""" | ||
|
||
|
||
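# The class below follows the standard ARIMA(p, d, q) formulation from the
# Wikipedia page cited above: with y'_t denoting the series after d rounds
# of differencing and e_t the one-step prediction error,
#
#     y'_t = c + phi_1 * y'_{t-1} + ... + phi_p * y'_{t-p}
#              + theta_1 * e_{t-1} + ... + theta_q * e_{t-q} + e_t
#
# Gradient descent (below) estimates c, the phi_i and the theta_j.

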
class ARIMA:
    def __init__(
        self,
        p: int = 1,
        d: int = 1,
        q: int = 1,
        lr: float = 0.001,
        epochs: int = 1000,
    ) -> None:
        """
        Initializes the ARIMA model.

        Args:
            p: AR lag order (uses past y values).
            d: Differencing order (makes data stationary).
            q: MA lag order (uses past errors).
            lr: Learning rate for Gradient Descent.
            epochs: Number of training cycles.
        """
        # p, d, and q must be sensible numbers (non-negative integers).
        if not all(isinstance(x, int) and x >= 0 for x in [p, d, q]):
            raise ValueError("p, d, and q must be non-negative integers")
        # The learning rate and number of epochs must both be positive.
        if lr <= 0 or epochs <= 0:
            raise ValueError("lr and epochs must be positive")

        self.p = p
        self.d = d
        self.q = q
        self.lr = lr
        self.epochs = epochs

        # These are the parameters the model will learn, initialized at zero.
        # phi -> the weights for the AR (past values) part.
        self.phi = np.zeros(p)
        # theta -> the weights for the MA (past errors) part.
        self.theta = np.zeros(q)
        # c -> a constant or intercept, like a baseline value.
        self.c = 0.0

        # State stored during/after training.
        self.is_fitted = False  # Flag for training status
        self.train_last = 0.0  # Last value from the training data
        # Last observation at each differencing level, recorded by
        # difference(); needed to invert every round of differencing later.
        self.level_lasts: list[float] = []
        # Type hints for optional attributes
        self.diff_data: NDArray[np.float64] | None = None
        self.errors: NDArray[np.float64] | None = None
        self.n_train: int | None = None
        self.sigma_err: float | None = None
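
        # Example (illustrative): ARIMA(p=2, d=1, q=2) configures two AR
        # lags, one round of differencing and two MA lags; nothing is
        # estimated until fit() is called.
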
    def difference(self, data: NDArray[np.float64]) -> NDArray[np.float64]:
        """
        Makes the time series stationary by differencing.

        Args:
            data: Original time series data.
        Returns:
            Differenced data.
        """
        diff = np.copy(data)
        # Remember the last observation at each differencing level so the
        # forecasts can later be un-differenced back through every level.
        self.level_lasts = []
        # Loop 'd' times, applying one round of differencing each time.
        for _ in range(self.d):
            self.level_lasts.append(float(diff[-1]))
            diff = np.diff(diff)  # np.diff computes diff[t] = x[t + 1] - x[t].
        return diff

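    # Example: with d == 1, difference([1.0, 3.0, 6.0]) returns
    # array([2.0, 3.0]); applying np.diff d times removes polynomial
    # trends of degree d from the series.
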
    def inverse_difference(
        self, last_obs: float, diff_forecast: list[float]
    ) -> list[float]:
        """
        Converts differenced data back to the original scale.

        Args:
            last_obs: Last value from the original data.
            diff_forecast: Predictions on differenced data.
        Returns:
            Forecasts in the original scale.
        """
        forecast = []
        # We start with the last known value from the original data.
        prev = last_obs
        # For each predicted *change*, we add it to the last value to get the next one.
        for val in diff_forecast:
            next_val = prev + val
            forecast.append(next_val)
            # Store current value for next iteration
            prev = next_val
        return forecast

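    # Example: inverse_difference(6.0, [2.0, 3.0]) returns [8.0, 11.0],
    # i.e. the cumulative sum of the predicted changes anchored at the last
    # observed value; this is the exact inverse of one np.diff pass.
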
    def _compute_residuals(
        self,
        diff_data: NDArray[np.float64],
        phi: NDArray[np.float64],
        theta: NDArray[np.float64],
        c: float,
    ) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
        """
        Computes residuals for given parameters.

        Args:
            diff_data: Differenced data.
            phi, theta, c: Model parameters.
        Returns:
            Tuple of predictions and residuals.
        """
        n = len(diff_data)
        # Initial values are needed to get started, so computation begins
        # after the first max(p, q) observations.
        start = max(self.p, self.q)
        preds = np.zeros(n)
        errors = np.zeros(n)

        # Loop through the data from the starting point.
        for t in range(start, n):
            # AR part: a weighted sum of the last 'p' actual values.
            # The slice is reversed ([::-1]) so the most recent value is first.
            ar_term = (
                np.dot(phi, diff_data[t - self.p : t][::-1]) if self.p > 0 else 0.0
            )

            # MA part: a weighted sum of the last 'q' prediction errors.
            ma_term = (
                np.dot(theta, errors[t - self.q : t][::-1]) if self.q > 0 else 0.0
            )

            # Combine everything to make the one-step-ahead prediction.
            preds[t] = c + ar_term + ma_term
            # The residual is the gap between the actual value and the prediction.
            errors[t] = diff_data[t] - preds[t]

        return preds, errors

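    # Example (illustrative): after fitting, calling
    #     preds, errs = model._compute_residuals(
    #         model.diff_data, model.phi, model.theta, model.c
    #     )
    # recomputes the in-sample predictions and residuals under the learned
    # parameters, which is handy for diagnostics such as residual plots.
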
    def fit(self, data: list[float] | NDArray[np.float64]) -> "ARIMA":
        """
        Trains the ARIMA model.

        Args:
            data: Time series data to train on.
        Returns:
            The fitted ARIMA model.
        """
        data = np.asarray(data, dtype=float)
        # Check that there is enough data to build the model at all.
        if len(data) < max(self.p, self.q) + self.d + 5:
            raise ValueError("Not enough data to train. You need more samples.")

        # Store the last value of the original data for reference.
        self.train_last = float(data[-1])

        # Step 1: Make the data stationary.
        diff_data = self.difference(data)
        n = len(diff_data)
        start = max(self.p, self.q)

        # Check again that enough data remains AFTER differencing.
        if n <= start:
            raise ValueError("Not enough data after differencing. Try reducing 'd'.")

        # Step 2: Run gradient descent to find the best parameters.
        # (This also stores diff_data, errors and n_train on the model, so
        # they must not be overwritten here.)
        self._fit_gradient_descent(diff_data, start)
        # All done! Mark the model as fitted and ready to forecast.
        self.is_fitted = True
        return self

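    # Example: fit() returns self, so training and forecasting can be chained:
    #     predictions = ARIMA(p=2, d=1, q=2).fit(series).forecast(steps=10)
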
    def _fit_gradient_descent(
        self, diff_data: NDArray[np.float64], start: int
    ) -> None:
        """
        Trains the model using Gradient Descent.

        Args:
            diff_data: Differenced data.
            start: Starting index for training.
        """
        n = len(diff_data)
        errors = np.zeros(n)
        preds = np.zeros(n)
        m = max(1, n - start)  # Number of points the error is computed on.

        # The main training loop, repeated 'epochs' times.
        for epoch in range(self.epochs):
            # First, compute predictions and errors with the current parameters.
            for t in range(start, n):
                ar_term = (
                    np.dot(self.phi, diff_data[t - self.p : t][::-1])
                    if self.p > 0
                    else 0.0
                )
                ma_term = (
                    np.dot(self.theta, errors[t - self.q : t][::-1])
                    if self.q > 0
                    else 0.0
                )
                preds[t] = self.c + ar_term + ma_term
                errors[t] = diff_data[t] - preds[t]

            # Compute the gradient: the direction in which each parameter
            # should be nudged to reduce the error, taken from the partial
            # derivatives of the MSE.
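            # Derivation sketch (with the common simplification that past
            # errors are treated as constants w.r.t. the parameters): for
            # L = (1/m) * sum_t e_t^2 and
            # e_t = y'_t - (c + sum_i phi_i * y'_{t-i} + sum_j theta_j * e_{t-j}),
            #     dL/dc       = -(2/m) * sum_t e_t
            #     dL/dphi_i   = -(2/m) * sum_t e_t * y'_{t-i}
            #     dL/dtheta_j = -(2/m) * sum_t e_t * e_{t-j}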
            dc = -2 * np.sum(errors[start:]) / m
            dphi = np.zeros_like(self.phi)

            # AR gradients: correlate each error with the value i+1 steps back.
            for i in range(self.p):
                err_idx = slice(start - i - 1, n - i - 1)
                error_term = errors[start:] * diff_data[err_idx]
                dphi[i] = -2 * np.sum(error_term) / m

            # MA gradients: correlate each error with the error j+1 steps back.
            dtheta = np.zeros_like(self.theta)
            for j in range(self.q):
                err_idx = slice(start - j - 1, n - j - 1)
                error_term = errors[start:] * errors[err_idx]
                dtheta[j] = -2 * np.sum(error_term) / m

            # Update the parameters by stepping in the opposite direction
            # of the gradient.
            self.phi -= self.lr * dphi
            self.theta -= self.lr * dtheta
            self.c -= self.lr * dc

            if epoch % 100 == 0:
                mse = np.mean(errors[start:] ** 2)
                print(f"Epoch {epoch}: MSE={mse:.6f}, c={self.c:.6f}")

        # After training, store the final state for forecasting.
        self.errors = errors
        self.diff_data = diff_data
        self.n_train = n
        sigma_term = np.sum(self.errors[start:] ** 2) / m
        self.sigma_err = float(np.sqrt(sigma_term))
        print(
            f"Fitted params (GD): phi={self.phi}, "
            f"theta={self.theta}, c={self.c:.6f}\n"
        )

    def forecast(
        self, steps: int, simulate_errors: bool = False, random_state: int | None = None
    ) -> NDArray[np.float64]:
        """
        Generates future predictions.

        Args:
            steps: Number of steps to predict.
            simulate_errors: Adds randomness to forecasts if True.
            random_state: Seed for reproducibility.
        Returns:
            Forecasted values.
        """
        msg = "Model must be fitted before forecasting. Call fit() first."
        if not self.is_fitted:
            raise ValueError(msg)

        if self.diff_data is None or self.errors is None or self.sigma_err is None:
            raise ValueError(msg)

        # Values are appended below, so work on copies of the stored arrays.
        diff_data = np.copy(self.diff_data)
        errors = np.copy(self.errors)
        forecasts_diff = []

        # Random number generator, used only when errors are simulated.
        rng = np.random.default_rng(random_state)

        # Generate one forecast at a time.
        for _ in range(steps):
            # AR part: use the last 'p' differenced values.
            ar_slice = slice(-self.p, None)
            ar_term = (
                np.dot(self.phi, diff_data[ar_slice][::-1]) if self.p > 0 else 0.0
            )
            # MA part: use the last 'q' prediction errors.
            ma_slice = slice(-self.q, None)
            ma_term = (
                np.dot(self.theta, errors[ma_slice][::-1]) if self.q > 0 else 0.0
            )

            # The next predicted value (on the differenced scale).
            next_diff_forecast = self.c + ar_term + ma_term
            forecasts_diff.append(next_diff_forecast)

            # The next iteration needs a value for the "next error". When
            # simulating, it is drawn from a normal distribution with the
            # same standard deviation as the past errors; otherwise it is 0.
            next_error = rng.normal(0.0, self.sigma_err) if simulate_errors else 0.0

            # Append the new prediction and error to the history: the next
            # forecast is conditioned on these just-generated values.
            diff_data = np.append(diff_data, next_diff_forecast)
            errors = np.append(errors, next_error)

        # Finally, "un-difference" the forecasts back through every level of
        # differencing to return them to the original scale.
        forecasts: list[float] = list(forecasts_diff)
        for last_obs in reversed(self.level_lasts):
            forecasts = self.inverse_difference(last_obs, forecasts)
        return np.array(forecasts, dtype=np.float64)
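
    # Note: calling forecast() repeatedly with simulate_errors=True and
    # different random_state seeds yields a Monte Carlo sample of future
    # paths, from which percentile-based prediction intervals can be read.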


# This part of the code only runs if the script is executed directly.
if __name__ == "__main__":
    # Create some synthetic data that follows an ARIMA(2,1,2) process.
    rng = np.random.default_rng(42)  # Seeded generator for reproducibility.
    n = 500
    ar_params = [0.5, -0.3]
    ma_params = [0.4, 0.2]

    # Start with Gaussian noise.
    noise = rng.standard_normal(n)
    data = np.zeros(n)

    # Build the ARMA(2,2) part first.
    for t in range(2, n):
        data[t] = (
            ar_params[0] * data[t - 1]
            + ar_params[1] * data[t - 2]
            + noise[t]
            + ma_params[0] * noise[t - 1]
            + ma_params[1] * noise[t - 2]
        )
    # Now "integrate" it once by taking the cumulative sum.
    data = np.cumsum(data)

    # Train the model on the first 400 data points.
    train_end = 400
    model = ARIMA(p=4, d=2, q=4)
    model.fit(data[:train_end])

    # Forecast the next 100 steps.
    forecast_steps = 100
    forecast = model.forecast(forecast_steps, simulate_errors=True)

    # Compare the forecast with the actual data.
    true_future = data[train_end : train_end + forecast_steps]
    rmse = np.sqrt(np.mean((np.array(forecast) - true_future) ** 2))
    print(f"Forecast RMSE: {rmse:.6f}\n")

    # Visualize the results.
    plt.figure(figsize=(14, 6))
    plt.plot(range(len(data)), data, label="Original Data", linewidth=1.5)
    plt.plot(
        range(train_end, train_end + forecast_steps),
        forecast,
        label="Forecast",
        color="red",
        linewidth=1.5,
    )
    split_label = "Split"
    plt.axvline(
        train_end - 1,
        color="gray",
        linestyle=":",
        alpha=0.7,
        label=f"Train/Test {split_label}",
    )
    plt.xlabel("Time Step")
    plt.ylabel("Value")
    plt.title("ARIMA(2,1,2) Data: Training and Forecast")
    plt.legend()
    plt.tight_layout()
    plt.show()

Review comments:
As there is no test file in this pull request nor any test function or class in the file:
Please provide type hint for the parameter: data
Please provide descriptive name for the parameter: c
Please provide type hint for the parameter: p
Please provide descriptive name for the parameter: p
Please provide type hint for the parameter: d
Please provide descriptive name for the parameter: d
Please provide type hint for the parameter: q
Please provide descriptive name for the parameter: q
Please provide type hint for the parameter: lr
Please provide type hint for the parameter: epochs
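
A quick way to sanity-check the implementation (not part of the PR; assumes the third-party statsmodels package is installed) is to fit the same data with the reference library and compare forecasts:

    from statsmodels.tsa.arima.model import ARIMA as SmARIMA

    # Fit an ARIMA(2,1,2), the true order of the synthetic data, on the
    # same training slice, then forecast the same horizon.
    sm_model = SmARIMA(data[:train_end], order=(2, 1, 2)).fit()
    sm_forecast = sm_model.forecast(steps=forecast_steps)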