Recurrent time series extrapolation

The temporal evolution of recurrent chaotic systems is marked by self-similar patterns that reoccur in finite time. Due to the sensitivity of such systems to both initial conditions and the precision of the numerical scheme, simply integrating their governing equations over long periods of time adds little additional information. Under this circumstance, such a systems temporal evolution can be extrapolated from a given point on by simply replaying sequences from the systems history.

In this post, I will apply a procedure adapted from literature to the Lorentz systems. First, a dataset is generated from its governing equations, then a recurrence plot generated and finally extrapolated.

The Lorentz system

The famous Lorentz system is given by: \begin{align} \frac{dX}{dt} = \begin{bmatrix} \sigma (X_2-X_1) \\ X_1 (\rho-X_3) - X_2 \\ X_1*X_2 - \beta X_3 \end{bmatrix} \end{align} Where $\sigma=10$, $\beta=8/3$ and $\rho=28$ are parameter choices that give recurrent behavior.

In Python, this can be implemented in a few lines:

def lorentz_attractor(t, W, sigma, rho, beta):
    x, y, z = W
    return [
        sigma*(y-x),
        x*(rho-z)-y,
        x*y - beta*z,
    ]
And integrated over time using the excellent solve_ivp function of scipy:
import numpy as np
import pandas as pd
from scipy.integrate import solve_ivp

t_span = [0, 50]
integration_result = solve_ivp(
    lambda t, W: lorentz_attractor(t, W, sigma=10, beta=8 / 3, rho=28),
    t_span=t_span,
    y0=[1, 0, 0],
    t_eval=np.linspace(*t_span, 5000),
)
trajectory = pd.DataFrame(
    np.transpose([integration_result.t, *integration_result.y]),
    columns="t x y z".split(),
)

This trajectory can be plotted like this:

import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(ncols=2, sharey=True)
trajectory.plot('t', ['y', 'z', 'x'], ax=ax1, label='$X_2$ $X_3$ $X_1$'.split())
trajectory.plot('x', ['y', 'z'], ax=ax2)
ax2.set(aspect='equal', xlabel='$X_1$')
ax2.get_legend().remove()
Trajectory of the Lorentz system in space and time

The dataframe trajectory now contains a sample of the state space of the Lorentz system. After a spin-up period that runs up to $t=18$, the $X_1$ and $X_2$ coordinates oscillate around either $+9$ or $-9$, with attraction point switch interval of $1–5$, and $X_3$ oscillating around $28$.

Recurrence plot generation

The recurrence plot $\mathcal{R}(t_i, t_j)$ quantifies the difference between two states of the system. Here, we use the $\mathcal{l}^2$-norm to compare the states (= coordinates $X$) of the system at different times: \begin{align} \mathcal{R}(t_i, t_j) = \frac{1}{3}\sqrt{\sum^3_{k=1}{\left(X_k(t_i)-X_k(t_j)\right)^2}} \end{align} The result of this is a symmetric matrix, meaning that only the half of the entries need to be calculated:

from numpy.linalg import norm # l2 - norm

def recurrence_matrix(trajectory, normed=False):
    rec_matrix = np.zeros((len(trajectory), len(trajectory)))
    np_traj_values = trajectory[["x", "y", "z"]].to_numpy()
    for i in range(1, rec_matrix.shape[0]):
        values = norm(
            np_traj_values[i, :] - np_traj_values[(i + 1) :, :],
            axis=-1,
        )
        rec_matrix[i, (i + 1) :] = values
        rec_matrix[(i + 1) :, i] = values
    if normed:
        rec_matrix /= rec_matrix.max()
    return trajectory["t"], rec_matrix
_, rec_matrix = recurrence_matrix(trajectory, normed=True)
For visualization recurrence plot can be treated as an image:
t_range = (trajectory.t.min(), trajectory.t.max())
plt.imshow(
    rec_matrix,
    vmin=0, vmax=1,
    aspect="equal",
    origin="lower",
    extent=(*t_range, *t_range),
)
plt.colorbar(label='$\\mathcal{R}(t_i,t_j)$')
plt.xlabel('$t_i$')
plt.ylabel('$t_j$')
Recurrence plot of the Lorentz system
Here, the startup period is easily identifiable as very distinct, self-similar period. It is, however not very similar to about half of the other states. Many regions with a high degree of recurrence can be identified by off-diagonals with low values for $\mathcal{R}$.

For extrapolation, we recalculate the recurrence matrix sans startup period.

trajectory_shortened = trajectory.query('t > 20')
_, rec_matrix = recurrence_matrix(trajectory_shortened, normed=True)

Extrapolation using the recurrence matrix

To discuss the extrapolation procedure, we have to define two terms:

  • sequence: a set of consecutive time points from the original trajectory
  • jump: a transition between two time points from the original trajectory that are not consecutive To extrapolate the original trajectory, we connect sequences using jumps that minimize the discontinuity between the endpoint of the previous sequence and the starting point of the next sequence. The information about similarity between two time points in the original signal is found in the recurrence matrix. Therefore, the jump target can by
  • retrieving a given row corresponding to the point and
  • identifying the maximum recurrence (minimum of the recurrence matrix). This approach is prone to re-running similar sequences. By splitting the recurrence matrix into n_sections_recmatrix sections, randomizing the length of the sequence and requiring the next sequence to start in another section, this problem is mitigated.
from numpy.random import default_rng

def extrapolate(
    t_extrapolation,
    trajectory,
    rec_matrix,
    max_sequence_length=500,
    min_sequence_length=50,
    rng=default_rng(42),
    start_timei=0,
    n_sections_recmatrix=2,
):
    dt_extrapolation = trajectory.iloc[1, 0] - trajectory.iloc[0, 0]

    current_step = 0
    current_timei = start_timei
    extrapolated_signal = []
    # keep jumping while target signal length is not reached
    while current_step <= t_extrapolation // dt_extrapolation:
        # 1. identify section of the signal to jump to
        current_section = current_timei // length_section_recmatrix
        next_section = rng.integers(n_sections_recmatrix)
        while next_section is current_section:
            next_section = rng.integers(n_sections_recmatrix)
        next_section_start = next_section * length_section_recmatrix

        # 2. shorten section end to avoid running into it
        next_section_end = (
            next_section + 1
        ) * length_section_recmatrix - max_sequence_length

        # 3. identify jump target (= sequence start) in the next section
        next_section_recmatrix_row = rec_matrix[
            current_timei, next_section_start:next_section_end
        ]
        next_sequence_start = np.argmin(next_section_recmatrix_row) + next_section_start

        # 4. calculate end of sequence
        next_sequence_end = (
            rng.integers(min_sequence_length, max_sequence_length) + next_sequence_start
        )

        # 5. copy next sequence from signal/trajectory to list
        next_sequence = trajectory.iloc[next_sequence_start:next_sequence_end]
        extrapolated_signal.append(next_sequence)

        current_timei = next_sequence_end
        current_step += next_sequence_end - next_sequence_start

    # concatenate sequences
    extrapolated_signal = pd.concat(extrapolated_signal).reset_index(drop=True)
    extrapolated_signal["t"] = np.arange(len(extrapolated_signal)) * dt_extrapolation
    return extrapolated_signal

extrapolated_signal = extrapolate(
    50, # s
    trajectory_shortened,
    rec_matrix,
    max_sequence_length=500,
    min_sequence_length=50,
    n_sections_recmatrix=2,
)

The resulting extrapolated trajectory shows very similar behavior to the original signal:

fig, (ax1, ax2) = plt.subplots(ncols=2, sharey=True)
extrapolated_signal.plot('t', ['y', 'z', 'x'], ax=ax1, label='$X_2$ $X_3$ $X_1$'.split())
extrapolated_signal.plot('x', ['y', 'z'], ax=ax2)
ax2.set(aspect='equal', xlabel='$X_1$')
ax2.get_legend().remove()
Extrapolated trajectory of the Lorentz attractor
The overall dynamics of the Lorentz system is preserved, with a few artifacts that are jumps between close orbits. These artifacts are a product of the lack of truly similar states in the other sections of the original signal This can be tuned by adjusting the max_sequence_length, max_sequence_length, n_sections_recmatrix, and of course extending the original signal.