Recurrent time series extrapolation
The temporal evolution of recurrent chaotic systems is marked by self-similar patterns that reoccur in finite time. Due to the sensitivity of such systems to both initial conditions and the precision of the numerical scheme, simply integrating their governing equations over long periods of time adds little additional information. Under this circumstance, such a systems temporal evolution can be extrapolated from a given point on by simply replaying sequences from the systems history.
In this post, I will apply a procedure adapted from literature to the Lorentz systems. First, a dataset is generated from its governing equations, then a recurrence plot generated and finally extrapolated.
The Lorentz system
The famous Lorentz system is given by: \begin{align} \frac{dX}{dt} = \begin{bmatrix} \sigma (X_2-X_1) \\ X_1 (\rho-X_3) - X_2 \\ X_1*X_2 - \beta X_3 \end{bmatrix} \end{align} Where $\sigma=10$, $\beta=8/3$ and $\rho=28$ are parameter choices that give recurrent behavior.
In Python, this can be implemented in a few lines:
def lorentz_attractor(t, W, sigma, rho, beta):
x, y, z = W
return [
sigma*(y-x),
x*(rho-z)-y,
x*y - beta*z,
]
import numpy as np
import pandas as pd
from scipy.integrate import solve_ivp
t_span = [0, 50]
integration_result = solve_ivp(
lambda t, W: lorentz_attractor(t, W, sigma=10, beta=8 / 3, rho=28),
t_span=t_span,
y0=[1, 0, 0],
t_eval=np.linspace(*t_span, 5000),
)
trajectory = pd.DataFrame(
np.transpose([integration_result.t, *integration_result.y]),
columns="t x y z".split(),
)
This trajectory can be plotted like this:
import matplotlib.pyplot as plt
fig, (ax1, ax2) = plt.subplots(ncols=2, sharey=True)
trajectory.plot('t', ['y', 'z', 'x'], ax=ax1, label='$X_2$ $X_3$ $X_1$'.split())
trajectory.plot('x', ['y', 'z'], ax=ax2)
ax2.set(aspect='equal', xlabel='$X_1$')
ax2.get_legend().remove()
The dataframe trajectory
now contains a sample of the state space of the Lorentz system.
After a spin-up period that runs up to $t=18$, the $X_1$ and $X_2$ coordinates oscillate around either $+9$ or $-9$, with attraction point switch interval of $1–5$, and $X_3$ oscillating around $28$.
Recurrence plot generation
The recurrence plot $\mathcal{R}(t_i, t_j)$ quantifies the difference between two states of the system. Here, we use the $\mathcal{l}^2$-norm to compare the states (= coordinates $X$) of the system at different times: \begin{align} \mathcal{R}(t_i, t_j) = \frac{1}{3}\sqrt{\sum^3_{k=1}{\left(X_k(t_i)-X_k(t_j)\right)^2}} \end{align} The result of this is a symmetric matrix, meaning that only the half of the entries need to be calculated:
from numpy.linalg import norm # l2 - norm
def recurrence_matrix(trajectory, normed=False):
rec_matrix = np.zeros((len(trajectory), len(trajectory)))
np_traj_values = trajectory[["x", "y", "z"]].to_numpy()
for i in range(1, rec_matrix.shape[0]):
values = norm(
np_traj_values[i, :] - np_traj_values[(i + 1) :, :],
axis=-1,
)
rec_matrix[i, (i + 1) :] = values
rec_matrix[(i + 1) :, i] = values
if normed:
rec_matrix /= rec_matrix.max()
return trajectory["t"], rec_matrix
_, rec_matrix = recurrence_matrix(trajectory, normed=True)
t_range = (trajectory.t.min(), trajectory.t.max())
plt.imshow(
rec_matrix,
vmin=0, vmax=1,
aspect="equal",
origin="lower",
extent=(*t_range, *t_range),
)
plt.colorbar(label='$\\mathcal{R}(t_i,t_j)$')
plt.xlabel('$t_i$')
plt.ylabel('$t_j$')
For extrapolation, we recalculate the recurrence matrix sans startup period.
trajectory_shortened = trajectory.query('t > 20')
_, rec_matrix = recurrence_matrix(trajectory_shortened, normed=True)
Extrapolation using the recurrence matrix
To discuss the extrapolation procedure, we have to define two terms:
- sequence: a set of consecutive time points from the original trajectory
- jump: a transition between two time points from the original trajectory that are not consecutive To extrapolate the original trajectory, we connect sequences using jumps that minimize the discontinuity between the endpoint of the previous sequence and the starting point of the next sequence. The information about similarity between two time points in the original signal is found in the recurrence matrix. Therefore, the jump target can by
- retrieving a given row corresponding to the point and
- identifying the maximum recurrence (minimum of the recurrence matrix).
This approach is prone to re-running similar sequences.
By splitting the recurrence matrix into
n_sections_recmatrix
sections, randomizing the length of the sequence and requiring the next sequence to start in another section, this problem is mitigated.
from numpy.random import default_rng
def extrapolate(
t_extrapolation,
trajectory,
rec_matrix,
max_sequence_length=500,
min_sequence_length=50,
rng=default_rng(42),
start_timei=0,
n_sections_recmatrix=2,
):
dt_extrapolation = trajectory.iloc[1, 0] - trajectory.iloc[0, 0]
current_step = 0
current_timei = start_timei
extrapolated_signal = []
# keep jumping while target signal length is not reached
while current_step <= t_extrapolation // dt_extrapolation:
# 1. identify section of the signal to jump to
current_section = current_timei // length_section_recmatrix
next_section = rng.integers(n_sections_recmatrix)
while next_section is current_section:
next_section = rng.integers(n_sections_recmatrix)
next_section_start = next_section * length_section_recmatrix
# 2. shorten section end to avoid running into it
next_section_end = (
next_section + 1
) * length_section_recmatrix - max_sequence_length
# 3. identify jump target (= sequence start) in the next section
next_section_recmatrix_row = rec_matrix[
current_timei, next_section_start:next_section_end
]
next_sequence_start = np.argmin(next_section_recmatrix_row) + next_section_start
# 4. calculate end of sequence
next_sequence_end = (
rng.integers(min_sequence_length, max_sequence_length) + next_sequence_start
)
# 5. copy next sequence from signal/trajectory to list
next_sequence = trajectory.iloc[next_sequence_start:next_sequence_end]
extrapolated_signal.append(next_sequence)
current_timei = next_sequence_end
current_step += next_sequence_end - next_sequence_start
# concatenate sequences
extrapolated_signal = pd.concat(extrapolated_signal).reset_index(drop=True)
extrapolated_signal["t"] = np.arange(len(extrapolated_signal)) * dt_extrapolation
return extrapolated_signal
extrapolated_signal = extrapolate(
50, # s
trajectory_shortened,
rec_matrix,
max_sequence_length=500,
min_sequence_length=50,
n_sections_recmatrix=2,
)
The resulting extrapolated trajectory shows very similar behavior to the original signal:
fig, (ax1, ax2) = plt.subplots(ncols=2, sharey=True)
extrapolated_signal.plot('t', ['y', 'z', 'x'], ax=ax1, label='$X_2$ $X_3$ $X_1$'.split())
extrapolated_signal.plot('x', ['y', 'z'], ax=ax2)
ax2.set(aspect='equal', xlabel='$X_1$')
ax2.get_legend().remove()
max_sequence_length
, max_sequence_length
, n_sections_recmatrix
, and of course extending the original signal.