98 changes: 66 additions & 32 deletions demos/data_assimilation/data_assimilation.py.rst
@@ -194,23 +194,24 @@ As we will be generating some random noise, we set the random number generator s
from firedrake.adjoint import *
np.random.seed(13)

We use the advection-diffusion equation in one spatial dimension :math:`z`, with a spatially varying advection velocity :math:`c(z)`, a time-dependent forcing term :math:`g(t)`, and periodic boundary conditions.
We use the advection-diffusion equation in two spatial dimensions :math:`x,y`, with a spatially varying advection velocity :math:`c(x, y)`,
a time-dependent forcing term :math:`g(t)`, and periodic boundary conditions in the :math:`x` direction.

.. math::

\partial_{t}u + \vec{c}(x, y)\cdot\nabla u - \nu\nabla^{2}u = g(t) &

t \in [0, T], \quad z \in \Omega = [0, 1) &
t \in [0, T], \quad (x, y) \in \Omega = [0, 1) \times [0, 1] &

u(0, t) = u(1, t) &
u(0, y, t) = u(1, y, t) &

c(z) = 1 + \overline{c}\cos(2\pi z)
c(x, y) = (1 + \overline{c}\cos(2\pi x), 0)^{T}

The reference state :math:`\hat{u}` that we will use to generate the "ground-truth" trajectory :math:`x^{t}` is just a simple sinusoid.

.. math::

\hat{u} = \overline{u}\sin(2\pi z)
\hat{u} = \overline{u}\sin(2\pi x)

For the time integration we use the implicit midpoint rule with the semi-discrete weak form:
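
Concretely, the implicit midpoint rule advances from :math:`u^{n}` to :math:`u^{n+1}` by evaluating the residual at the midpoint average. As a sketch in strong form (the full weak form is elided from this diff):

.. math::

   \frac{u^{n+1} - u^{n}}{\Delta t} + \vec{c}\cdot\nabla u^{n+\frac{1}{2}} - \nu\nabla^{2}u^{n+\frac{1}{2}} = g(t^{n+\frac{1}{2}}), \qquad u^{n+\frac{1}{2}} = \frac{1}{2}\left(u^{n} + u^{n+1}\right)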

@@ -237,9 +238,10 @@ We create the CG1 function space for :math:`V`, and the space of real numbers to
ensemble_rank = ensemble.ensemble_rank
ensemble_size = ensemble.ensemble_size

mesh = PeriodicUnitIntervalMesh(100, comm=ensemble.comm)
mesh = PeriodicUnitSquareMesh(20, 20, direction="x", comm=ensemble.comm)

V = FunctionSpace(mesh, "CG", 1)
Vv = VectorFunctionSpace(mesh, "CG", 1)
Vr = FunctionSpace(mesh, "R", 0)

The control :math:`\mathbf{x}` is a timeseries distributed in time over the ``Ensemble``, with each timestep :math:`x_{j}` being a Firedrake ``Function``.
@@ -271,10 +273,10 @@ The observations are taken at intervals of :math:`T_{\textrm{stage}}=n_{t}\Delta

dt = Function(Vr).assign(Tstage/nt)
t = Function(Vr).zero()
z, = SpatialCoordinate(mesh)
x_, y_ = SpatialCoordinate(mesh)

cbar = Constant(0.2)
c = Function(V).project(1 + cbar*cos(2*pi*z))
c = Function(Vv).project(as_vector([1 + cbar*cos(2*pi*x_), 0.0]))

reynolds = 100
nu = Constant(1/reynolds)
@@ -283,18 +285,18 @@ The observations are taken at intervals of :math:`T_{\textrm{stage}}=n_{t}\Delta
v = TestFunction(V)

ubar = Constant(0.3)
reference_ic = Function(V).project(ubar*sin(2*pi*z))
reference_ic = Function(V).project(ubar*sin(2*pi*x_))

g = (
    ubar*cos(2*pi*z)*(
        - sin(2*pi*(z + 0.1*sin(2*pi*t)))
        + ubar*cos(2*pi*t + 1)*sin(2*pi*(3*z - 2*t))
    ubar*cos(2*pi*x_)*(
        - sin(2*pi*(y_ + 0.1*sin(2*pi*t)))
        + ubar*cos(2*pi*t + 1)*sin(2*pi*(3*x_ - 2*t))
    )
)

F = (
    inner(Dt(u), v)*dx
    + inner(c, u.dx(0))*v*dx
    + inner(dot(c, grad(u)), v)*dx
    + inner(nu*grad(u), grad(v))*dx
    - inner(g, v)*dx(degree=4)
)
@@ -325,19 +327,48 @@ For convenience we make a Python function for the propagator :math:`\mathcal{M}(
t.assign(t + dt)
return stepper.u0.copy(deepcopy=True)

**Define the observation operator.**
**Define the observation operators.**

Our observations will be point evaluations at a set of random locations in the domain, which are defined using a :class:`~firedrake.mesh.VertexOnlyMesh`.
Here we will demonstrate different possibilities for the observation operator :math:`\mathcal{H}`.
For this example, our first observation will be a line integral across the domain, and the remaining observations will be
point evaluations at a set of random locations in the domain.
To compute the line integral in Firedrake, we define the line as a separate mesh and create a function space on this mesh.
We then cross-mesh interpolate our function onto the line.

::

line = UnitIntervalMesh(10, comm=ensemble.comm)
x, = SpatialCoordinate(line)
lfs = VectorFunctionSpace(line, "CG", 1, dim=2)
new_coords = Function(lfs).interpolate(as_vector([x, x]))
new_line = Mesh(new_coords)
CG_line = FunctionSpace(new_line, "CG", 2)

U_line = FunctionSpace(new_line, "R", 0)

def H_line(x):
    c = assemble(interpolate(x, CG_line))
    # Project into the real space on the line
    a = inner(TestFunction(U_line), TrialFunction(U_line)) * dx(domain=new_line)
    b = inner(TestFunction(U_line), c) * dx(domain=new_line)
    u = Function(U_line)
    solve(a == b, u)
    return u
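
Note that projecting into the space of reals solves :math:`\int_{\ell} u\,v = \int_{\ell} c\,v` with constant :math:`u` and :math:`v`, so ``H_line`` returns the line integral normalised by the line length, i.e. the mean value of the field along the line. A minimal sketch of the same computation with composite trapezoidal quadrature, outside Firedrake (the sampled field here is purely illustrative):

```python
import numpy as np

# Sample a field at points along a line, parametrised by arc length.
xs = np.linspace(0.0, 1.0, 101)
c = np.sin(2 * np.pi * xs)

# Constant test/trial functions reduce the weak problem to two scalars:
# a = length of the line, b = integral of c along the line.
dx_ = np.diff(xs)
a = np.sum(dx_)
b = np.sum(0.5 * (c[1:] + c[:-1]) * dx_)

# The R-space "projection" is the mean value of c along the line.
u = b / a
```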


For the point evaluations, we construct a :class:`~firedrake.mesh.VertexOnlyMesh` with vertices at the observation locations.
The observation operator :math:`\mathcal{H}` is then simply interpolating onto this mesh.

::

stations = np.random.random_sample((20, 1))
stations = np.random.random_sample((20, 2))
vom = VertexOnlyMesh(mesh, stations)
U = FunctionSpace(vom, "DG", 0)
U_point = FunctionSpace(vom, "DG", 0)

def H_point(x):
    return assemble(interpolate(x, U_point))

def H(x):
    return assemble(interpolate(x, U))

**Define the error covariance operators.**

@@ -360,12 +391,13 @@ The variance of the model error is made proportional to the length of the observ
sigma_q = sqrt(1e-3*Tstage)
Q = AutoregressiveCovariance(V, L=0.05, sigma=sigma_q, m=2, seed=17)

The observations are treated as uncorrelated, i.e. a diagonal covariance operator, which is created by setting :math:`m=0`.
The observations are treated as uncorrelated, i.e. diagonal covariance operators, which are created by setting :math:`m=0`.

::

sigma_r = sqrt(1e-3)
R = AutoregressiveCovariance(U, L=0, sigma=sigma_r, m=0, seed=18)
R_line = AutoregressiveCovariance(U_line, L=0, sigma=sigma_r, m=0, seed=18)
R_point = AutoregressiveCovariance(U_point, L=0, sigma=sigma_r, m=0, seed=18)

Firedrake provides an abstract base class :class:`~firedrake.adjoint.covariance_operator.CovarianceOperatorBase` for implementing new covariance operators.

@@ -402,7 +434,7 @@ After running the local part of the timeseries on each ensemble member, this all
truth_ic = ensemble.bcast(xt, root=0).copy(deepcopy=True)

if ensemble_rank == 0:
    y = [Function(U).assign(H(xt) + R.sample())]
    y = [Function(U_line).assign(H_line(xt) + R_line.sample())]
else:
    y = []

@@ -413,7 +445,7 @@

for _ in range(nlocal_stages):
    xt.assign(M(xt) + Q.sample())
    y.append(Function(U).assign(H(xt) + R.sample()))
    y.append(Function(U_point).assign(H_point(xt) + R_point.sample()))

ctx.state.assign(xt)

@@ -425,7 +457,9 @@ Now that we have the "ground-truth" observations, we can create a function to ge
::

def observation_error(i):
    return lambda x: Function(U).assign(H(x) - y[i])
    if ensemble_rank == i:
        return lambda x: Function(U_line).assign(H_line(x) - y[i])
    return lambda x: Function(U_point).assign(H_point(x) - y[i])

**Define the reduced functional.**

@@ -445,7 +479,7 @@ To initialise it, it needs an :class:`~firedrake.ensemble.ensemble_function.Ense
Control(control),
background=xb,
background_covariance=B,
observation_covariance=R,
observation_covariance=R_line,
observation_error=observation_error(0),
gauss_newton=True)

@@ -465,7 +499,7 @@ For each ``stage``, we integrate forward from ``stage.control`` (i.e. :math:`x_{
stage.set_observation(
    state=xn1,
    observation_error=obs_error,
    observation_covariance=R,
    observation_covariance=R_line if ensemble_rank == stage.observation_index else R_point,
    forward_model_covariance=Q)

pause_annotation()
Expand Down Expand Up @@ -655,14 +689,14 @@ At the initial and final conditions the optimised solution matches the ground tr
::

# Errors at initial timestep:
# prior_error = 6.723e-01
# xopts_error = 4.925e-02
# Error reduction factor = 7.326e-02
# prior_error = 6.682e-01
# xopts_error = 3.306e-01
# Error reduction factor = 4.947e-01

# Errors at final timestep:
# prior_error = 8.843e-01
# xopts_error = 4.333e-02
# Error reduction factor = 4.900e-02
# prior_error = 6.025e-01
# xopts_error = 1.813e-01
# Error reduction factor = 3.009e-01
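
The reported reduction factors are simply the ratio of the optimised error to the prior error; checking the final-timestep numbers above:

```python
prior_error = 6.025e-01
xopts_error = 1.813e-01
factor = xopts_error / prior_error
# Agrees with the reported 3.009e-01 to the printed precision.
assert abs(factor - 3.009e-01) < 1e-3
```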

A runnable Python version of this demo can be found :demo:`here<data_assimilation.py>`.
This demo can be run in parallel as long as the number of observation stages :math:`N` is divisible by the number of MPI ranks.
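
The divisibility requirement can be checked with simple arithmetic before launching; a small sketch (the values of ``N`` and ``nranks`` are hypothetical):

```python
N = 8       # hypothetical number of observation stages
nranks = 4  # hypothetical number of MPI ranks

# Each ensemble member must own an equal number of stages.
assert N % nranks == 0, "N must be divisible by the number of MPI ranks"
nlocal_stages = N // nranks  # stages owned by each ensemble member
```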
14 changes: 7 additions & 7 deletions firedrake/ensemble/ensemble_functionspace.py
@@ -2,7 +2,7 @@
from typing import Collection

from ufl.duals import is_primal, is_dual
from pyop2.mpi import MPI

Check failure on line 5 in firedrake/ensemble/ensemble_functionspace.py (GitHub Actions / test / Lint codebase): F401 'pyop2.mpi.MPI' imported but unused
from firedrake.petsc import PETSc
from firedrake.ensemble.ensemble import Ensemble
from firedrake.functionspace import MixedFunctionSpace
@@ -93,13 +93,13 @@
.ensemble_function.EnsembleCofunction
"""
def __init__(self, local_spaces: Collection, ensemble: Ensemble):
    meshes = set(V.mesh().unique() for V in local_spaces)
    nlocal_meshes = len(meshes)
    max_local_meshes = ensemble.ensemble_comm.allreduce(nlocal_meshes, MPI.MAX)
    if max_local_meshes > 1:
        raise ValueError(
            f"{self.__class__.__name__} local_spaces must all be defined on the same mesh.")
    self._mesh = meshes.pop()
    # meshes = set(V.mesh().unique() for V in local_spaces)
    # nlocal_meshes = len(meshes)
    # max_local_meshes = ensemble.ensemble_comm.allreduce(nlocal_meshes, MPI.MAX)
    # if max_local_meshes > 1:
    #     raise ValueError(
    #         f"{self.__class__.__name__} local_spaces must all be defined on the same mesh.")
    # self._mesh = meshes.pop()
    self._ensemble = ensemble
    self._local_spaces = tuple(local_spaces)