PDE3: PINNs – How Do Neural Networks Solve PDEs? 神经网络如何求解偏微分方程?
PDEs are not so difference from ODEs: 就我个人而言, 处理 ODE 时感觉比较轻松, 但见到 PDE 时总感觉有莫名的心里负担. 直到我意识到: ODE 要求的函数是单变量的函数, 如果是多变量的函数呢? 这就是 PDE 啊! 比如热传导方程, 要求解的函数是 \(u(x,t)\), 可是 \(x\) 和 \(t\) 可以认为是等地位的! PDE 只是 ODE 的自然推广而已 (好像这是废话耶, 被自己蠢到了 (///▽///)).
1 Motivation
- Naive method: 比如求解一维热传导方程时, 需要输入位置和时间 \((x,t)\), 输出温度 \(u\), 直接设计一个 2 输入 1 输出的神经网络, 然后收集很多 \((x,t,u)\) 数据集进行训练. 再比如求解 Navier-Stokes 方程时, 同样需要输入位置和时间 \((x,y,z,t)\), 输出速度和压力 \((u,v,w,p)\), 直接设计一个 4 输入 4 输出的神经网络, 然后收集 \((x,y,z,t,u,v,w,p)\) 数据集进行训练.
- 这样需要大量的测量数据! 但是不同于 LLM, 热量的传导和流体的流动我们已经有非常精确的 PDE 来描述了, 那为什么还要用神经网络? 是因为它们很难解哈哈! 但是设计神经网络的时候, 它是完全不知道这些 PDE 的存在的, 有没有什么方法将 PDE 的知识注入到神经网络?
- PINNs: Idea 很简单, 由于 PDE 里面那些算子恰好都是输出值对输入值的微分 (一阶、二阶或高阶), 而神经网络恰好可以用 autograd 来完成输出对输入的微分!!! Autograd 算出来的结果一般都不满足 PDE, 对吧, 会有一个残差 (residual), 那我们就把这个残差加入 Loss 函数让它去优化, 这样神经网络就会在训练过程中逐渐满足 PDE 的约束了!
- 这样所需的测量数据就会很少 (在下面的 hands-on experiment 里我们甚至都没有用任何测量数据, 只用 PDE 的残差来训练神经网络, 毕竟任何满足 PDE 的函数都是热传导方程的解嘛, 只需要对 PDE 进行 “蒸馏” 就可以了).
- 当然一般求解 PDE 还需要给定初始条件和边界条件, 这些也可以加入 Loss 函数中进行优化, 这样神经网络就会同时满足 PDE、初始条件和边界条件的约束了!
2 Heat Equation
这里简单复习一下 1D 的热传导方程: \[ \boxed{u_t = \alpha u_{xx}} \]
简单起见, 假设初始状态是 \(u(x,0)=\sin \pi x\) (见 Figure 1), 边界条件就是 \(x=1,-1\) 处的温度恒为 \(0\). 由于正弦函数是空间二阶导算子的特征向量, 显然在任意时刻的温度分布都是一个 \(\sin\) 函数, 而且是指数衰减的, 所以我们可以轻松地写出解析解:
\[ u(x,t) = \underbrace{e^{-\alpha \pi^2 t}}_{\mathclap{\scriptsize \text{exponential decay}}} \sin \pi x \]
也就是说, 在任意位置和时间 \((x,t)\), 我们都能解析地算出温度, 我们会用这个结果与 PINN 预测的结果进行对比:
ALPHA = 1.0 # Thermal diffusivity
# Function to compute exact solution for verification
def exact_solution(x, t):
return -np.exp(-ALPHA * np.pi**2 * t) * np.sin(np.pi * x)3 Collocation Points 生成
这里我们所有的训练数据都只用 Collocation Points 而没有用实际测量的数据.
# N_IC = 4: Number of initial state points
# N_BC = 3: Number of boundary condition points
# N_PDE = 100: Number of collocation points for PDE residual
def generate_data(N_IC, N_BC, N_PDE):
# a) Initial Condition Data (u(x, 0) = -sin(\pi * x))
x_ic = np.random.uniform(-1, 1, (N_IC, 1))
t_ic = np.zeros((N_IC, 1))
u_ic = -np.sin(np.pi * x_ic)
# b) Boundary Condition Data (u(\pm 1, t) = 0)
# Left boundary (x = -1)
x_bc_left = -np.ones((N_BC // 2, 1))
t_bc_left = np.random.uniform(0, 1, (N_BC // 2, 1))
u_bc_left = np.zeros((N_BC // 2, 1))
# Right boundary (x = 1)
x_bc_right = np.ones((N_BC // 2, 1))
t_bc_right = np.random.uniform(0, 1, (N_BC // 2, 1))
u_bc_right = np.zeros((N_BC // 2, 1))
# c) Collocation Points for PDE Residual (Latin Hypercube Sampling)
# Define bounds: [x_min, t_min], [x_max, t_max]
l_bounds = [-1.0, 0.0]
u_bounds = [1.0, 1.0]
# Create an LHS sampler
sampler = qmc.LatinHypercube(d=2) # 2 dimensions (x, t)
sample = sampler.random(n=N_PDE)
# Scale samples to the domain bounds
points_pde = qmc.scale(sample, l_bounds, u_bounds)
x_pde = points_pde[:, 0:1]
t_pde = points_pde[:, 1:2]
# Convert everything to PyTorch Tensors and move to device
def to_tensor(arr, requires_grad=False):
return torch.tensor(arr, dtype=torch.float32, device=device, requires_grad=requires_grad)
X_ic = to_tensor(x_ic)
T_ic = to_tensor(t_ic)
U_ic = to_tensor(u_ic)
X_bc = to_tensor(np.vstack([x_bc_left, x_bc_right]))
T_bc = to_tensor(np.vstack([t_bc_left, t_bc_right]))
U_bc = to_tensor(np.vstack([u_bc_left, u_bc_right]))
# PDE points need requires_grad=True for autograd
X_pde = to_tensor(x_pde, requires_grad=True)
T_pde = to_tensor(t_pde, requires_grad=True)
return X_ic, T_ic, U_ic, X_bc, T_bc, U_bc, X_pde, T_pde4 Neural Network 设计
输入位置和时间 \((x,t)\), 输出温度值 \(u\), 不妨就用简单的 MLP 来实现 (具体层数和每层的神经元数量可以通过输入 layers 参数来调整):
class PINN(nn.Module):
def __init__(self, layers):
super(PINN, self).__init__()
# layers is a list of hidden layer sizes, e.g., [2, 20, 20, 20, 1]
self.linears = nn.ModuleList()
for i in range(len(layers) - 1):
self.linears.append(nn.Linear(layers[i], layers[i+1]))
# Weight initialization (helpful for PINNs)
nn.init.xavier_normal_(self.linears[-1].weight)
nn.init.zeros_(self.linears[-1].bias)
# Activation function (Tanh is very common for smooth solutions like heat flow)
self.activation = nn.Tanh()
def forward(self, x, t):
# Input concatenation: [x, t]
inputs = torch.cat([x, t], dim=1)
for i in range(len(self.linears) - 1):
inputs = self.activation(self.linears[i](inputs))
# Last layer doesn't need activation
output = self.linears[-1](inputs)
return output下文称这个神经网络为 \(u_\theta\).
5 Training
定义三种 Loss:
Initial Condition Loss: \[ L_1 = \text{MSE}(u_\theta(\texttt{X\_ic}), \texttt{U\_ic}) \]
Boundary Condition Loss: \[ L_2 = \text{MSE}(u_\theta(\texttt{X\_bc}), \texttt{U\_bc}) \]
PDE Residual Loss: \[ L_3 = \text{MSE}\left(\frac{\partial \texttt{u\_pde}}{\partial t}, \alpha \frac{\partial^2 \texttt{u\_pde}}{\partial x^2}\right) \] where \(\texttt{u\_pde} := u_\theta(\texttt{X\_pde}, \texttt{T\_pde})\).
- 这里我们用自动微分来计算 \(\partial \texttt{u\_pde} / \partial t\) 和 \(\partial^2 \texttt{u\_pde} / \partial x^2\).
Total Loss: 三者的加权和: \[ L = w_1 L_1 + w_2 L_2 + w_3 L_3 \]
- \(w_1, w_2, w_3\) 是用来平衡各项损失权重的超参数.
下面的代码没有用 Mini-batch 来训练而是用了整个数据集 (因为数据量很小).
# 返回 epoch 长度的 loss history
def train(pinn, X_ic, T_ic, U_ic, X_bc, T_bc, U_bc, X_pde, T_pde, epochs=5000, lr=1e-3, w_pde=1.0, w_ic=10.0, w_bc=10.0):
optimizer = optim.Adam(pinn.parameters(), lr=lr)
# Track losses
losses = []
for epoch in range(epochs):
optimizer.zero_grad()
# a) Compute losses from initial condition
u_ic_pred = pinn(X_ic, T_ic)
loss_ic = torch.mean((u_ic_pred - U_ic)**2)
# b) Compute losses from boundary conditions
u_bc_pred = pinn(X_bc, T_bc)
loss_bc = torch.mean((u_bc_pred - U_bc)**2)
# c) Compute losses from PDE residual at collocation points
u_pde = pinn(X_pde, T_pde)
# Compute partial derivatives using torch.autograd.grad
u_t = torch.autograd.grad(u_pde, T_pde, torch.ones_like(u_pde), create_graph=True)[0]
u_x = torch.autograd.grad(u_pde, X_pde, torch.ones_like(u_pde), create_graph=True)[0]
u_xx = torch.autograd.grad(u_x, X_pde, torch.ones_like(u_x), create_graph=True)[0]
# PDE residual f = u_t - \ALPHA * u_xx
residual_pde = u_t - ALPHA * u_xx
loss_pde = torch.mean(residual_pde**2)
# Total Loss with weights
total_loss = w_pde * loss_pde + w_ic * loss_ic + w_bc * loss_bc
losses.append(total_loss.item())
total_loss.backward()
optimizer.step()
# Log progress every 500 epochs
if epoch % 500 == 0:
print(f"Epoch {epoch:5d}: Loss = {total_loss.item():.4e} (IC: {loss_ic.item():.2e}, BC: {loss_bc.item():.2e}, PDE: {loss_pde.item():.2e})")
return losses6 Results
我们在:
N_IC = 50
N_BC = 4
N_PDE = 5000
pinn_layers = [2, 20, 20, 20, 20, 1]且 \[ w_1 = 10, \quad w_2 = 10, \quad w_3 = 1 \]
的配置下训练了 500 个 epoch, 下面是训练结果:
7 Takeaways
为了求解一个有初始条件和边界条件的 PDE, 你甚至不需要任何测量数据, 只需要选取不同的自变量点, 比如有两个自变量 \((x,t)\), 就用 meshgrid 随机生成一些 \((x,t)\) 组合, 让神经网络前向传播, 然后用自动微分算出输出对输入的微分, 计算 PDE 的残差, 把残差加入 Loss 函数进行优化. 特别的, 初始条件其实就是 PDE 在 \(t=0\) 时的一个特殊约束, 边界条件也是 PDE 在 \(x=-1\) 和 \(x=1\) 时的特殊约束, 这些都可以直接加入 Loss 函数进行优化.
注意: 这样训练出来的网络只能用来求固定初始条件和边界条件的热传导方程的解!!! 如果想要训练一个能适用于不同初始条件和边界条件的网络, 就需要在网络和训练数据里面注入初始条件和边界条件, 如何注入呢? 我今后会用它引入 DeepONet 和 FNO 这种 operator learning.
pinn.py
import os
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from scipy.stats import qmc
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# --- 1. Problem Setup ---
# 1D Heat Equation: u_t = ALPHA * u_xx
# Domain: x in [-1, 1], t in [0, 1]
# IC: u(x, 0) = -sin(pi * x)
# BC: u(-1, t) = 0, u(1, t) = 0
ALPHA = 1.0
MODEL_PATH = "pinn_heat.pth"
GIF_PATH = "heat_sine.gif"
# Function to compute exact solution for verification
def exact_solution(x, t):
return -np.exp(-ALPHA * np.pi**2 * t) * np.sin(np.pi * x)
# --- 2. Data Generation ---
def generate_data(N_IC, N_BC, N_PDE):
# a) Initial Condition Data
x_ic = np.random.uniform(-1, 1, (N_IC, 1))
t_ic = np.zeros((N_IC, 1))
u_ic = -np.sin(np.pi * x_ic)
# b) Boundary Condition Data
x_bc_left = -np.ones((N_BC // 2, 1))
t_bc_left = np.random.uniform(0, 1, (N_BC // 2, 1))
u_bc_left = np.zeros((N_BC // 2, 1))
x_bc_right = np.ones((N_BC // 2, 1))
t_bc_right = np.random.uniform(0, 1, (N_BC // 2, 1))
u_bc_right = np.zeros((N_BC // 2, 1))
# c) PDE collocation points (Latin Hypercube Sampling)
l_bounds = [-1.0, 0.0]
u_bounds = [1.0, 1.0]
sampler = qmc.LatinHypercube(d=2)
sample = sampler.random(n=N_PDE)
points_pde = qmc.scale(sample, l_bounds, u_bounds)
x_pde = points_pde[:, 0:1]
t_pde = points_pde[:, 1:2]
def to_tensor(arr, requires_grad=False):
return torch.tensor(
arr, dtype=torch.float32, device=device, requires_grad=requires_grad
)
X_ic = to_tensor(x_ic)
T_ic = to_tensor(t_ic)
U_ic = to_tensor(u_ic)
X_bc = to_tensor(np.vstack([x_bc_left, x_bc_right]))
T_bc = to_tensor(np.vstack([t_bc_left, t_bc_right]))
U_bc = to_tensor(np.vstack([u_bc_left, u_bc_right]))
X_pde = to_tensor(x_pde, requires_grad=True)
T_pde = to_tensor(t_pde, requires_grad=True)
return X_ic, T_ic, U_ic, X_bc, T_bc, U_bc, X_pde, T_pde
# --- 3. Neural Network Design (PINN) ---
class PINN(nn.Module):
def __init__(self, layers):
super(PINN, self).__init__()
self.linears = nn.ModuleList()
for i in range(len(layers) - 1):
layer = nn.Linear(layers[i], layers[i + 1])
nn.init.xavier_normal_(layer.weight)
nn.init.zeros_(layer.bias)
self.linears.append(layer)
self.activation = nn.Tanh()
def forward(self, x, t):
inputs = torch.cat([x, t], dim=1)
for i in range(len(self.linears) - 1):
inputs = self.activation(self.linears[i](inputs))
output = self.linears[-1](inputs)
return output
# --- 4. Training Function ---
def train(
pinn,
X_ic,
T_ic,
U_ic,
X_bc,
T_bc,
U_bc,
X_pde,
T_pde,
epochs=5000,
lr=1e-3,
w_pde=1.0,
w_ic=10.0,
w_bc=10.0,
):
optimizer = optim.Adam(pinn.parameters(), lr=lr)
losses = []
pinn.train()
for epoch in range(epochs):
optimizer.zero_grad()
# Initial condition loss
u_ic_pred = pinn(X_ic, T_ic)
loss_ic = torch.mean((u_ic_pred - U_ic) ** 2)
# Boundary condition loss
u_bc_pred = pinn(X_bc, T_bc)
loss_bc = torch.mean((u_bc_pred - U_bc) ** 2)
# PDE residual loss
u_pde = pinn(X_pde, T_pde)
u_t = torch.autograd.grad(
u_pde, T_pde, torch.ones_like(u_pde), create_graph=True
)[0]
u_x = torch.autograd.grad(
u_pde, X_pde, torch.ones_like(u_pde), create_graph=True
)[0]
u_xx = torch.autograd.grad(
u_x, X_pde, torch.ones_like(u_x), create_graph=True
)[0]
residual_pde = u_t - ALPHA * u_xx
loss_pde = torch.mean(residual_pde ** 2)
total_loss = w_pde * loss_pde + w_ic * loss_ic + w_bc * loss_bc
losses.append(total_loss.item())
total_loss.backward()
optimizer.step()
if epoch % 100 == 0:
print(
f"Epoch {epoch:5d}: Loss = {total_loss.item():.4e} "
f"(IC: {loss_ic.item():.2e}, BC: {loss_bc.item():.2e}, PDE: {loss_pde.item():.2e})"
)
return losses
# --- 5. Save / Load ---
def save_checkpoint(model, loss_history, path=MODEL_PATH):
torch.save(
{
"model_state_dict": model.state_dict(),
"loss_history": loss_history,
},
path,
)
print(f"Model checkpoint saved to: {path}")
def load_checkpoint(model, path=MODEL_PATH):
checkpoint = torch.load(path, map_location=device)
model.load_state_dict(checkpoint["model_state_dict"])
loss_history = checkpoint.get("loss_history", [])
print(f"Loaded model checkpoint from: {path}")
return loss_history
# --- 6. Evaluation ---
def predict_on_grid(model, nx=100, nt=100):
x_test = np.linspace(-1, 1, nx)
t_test = np.linspace(0, 1, nt)
X_test, T_test = np.meshgrid(x_test, t_test)
X_test_flat = torch.tensor(
X_test.flatten()[:, None], dtype=torch.float32, device=device
)
T_test_flat = torch.tensor(
T_test.flatten()[:, None], dtype=torch.float32, device=device
)
model.eval()
with torch.no_grad():
U_pred_flat = model(X_test_flat, T_test_flat)
U_pred = U_pred_flat.cpu().numpy().reshape(nt, nx)
U_exact = exact_solution(X_test, T_test)
U_err = np.abs(U_exact - U_pred)
return x_test, t_test, X_test, T_test, U_exact, U_pred, U_err
# --- 7. Static Plots ---
def plot_heatmaps(X_test, T_test, U_exact, U_pred, U_err):
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
vmin = min(U_exact.min(), U_pred.min())
vmax = max(U_exact.max(), U_pred.max())
# Exact solution
im0 = axes[0].pcolormesh(T_test, X_test, U_exact, cmap="seismic", shading="auto", vmin=vmin, vmax=vmax)
axes[0].set_title("Exact Solution")
axes[0].set_xlabel("Time t")
axes[0].set_ylabel("Position x")
fig.colorbar(im0, ax=axes[0])
# PINN prediction
im1 = axes[1].pcolormesh(T_test, X_test, U_pred, cmap="seismic", shading="auto", vmin=vmin, vmax=vmax)
axes[1].set_title("PINN Prediction")
axes[1].set_xlabel("Time t")
axes[1].set_ylabel("Position x")
fig.colorbar(im1, ax=axes[1])
# Absolute error
im2 = axes[2].pcolormesh(T_test, X_test, U_err, cmap="viridis", shading="auto")
axes[2].set_title("Absolute Error")
axes[2].set_xlabel("Time t")
axes[2].set_ylabel("Position x")
fig.colorbar(im2, ax=axes[2])
plt.tight_layout()
plt.show()
def plot_loss_history(loss_history):
plt.figure(figsize=(8, 5))
plt.plot(loss_history, linewidth=1.5)
plt.xlabel("Epoch")
plt.ylabel("Total Loss")
plt.yscale("log")
plt.title("Loss History")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# --- 8. Animated GIF ---
def save_animation_gif(x_test, t_test, U_exact, U_pred, U_err, gif_path=GIF_PATH):
"""
这里做的是“时间切片动画”:
每一帧固定一个 t,画出沿着 x 的三条/三类分布:
1) exact temperature
2) predicted temperature
3) absolute error
你要求温度纵坐标和 x 范围一致,所以温度子图设置 y in [-1, 1]。
你原话里写 [-1,-1] 是错误区间,这里按 [-1,1] 实现。
如果你更想看清 error,可以把 ax_err.set_ylim(0, U_err.max()*1.1)。
"""
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
ax_temp, ax_err = axes
line_exact, = ax_temp.plot([], [], lw=2, label="Exact", color="tab:blue")
line_pred, = ax_temp.plot([], [], lw=2, label="Prediction", color="tab:orange")
line_err, = ax_err.plot([], [], lw=2, label="Abs Error", color="tab:red")
# Temperature subplot (exact vs prediction on same axes)
ax_temp.set_title("Temperature Evolution (Exact vs PINN)")
ax_temp.set_xlabel("Position x")
ax_temp.set_ylabel("Temperature u(x,t)")
ax_temp.set_xlim(-1, 1)
ax_temp.set_ylim(-1, 1)
ax_temp.grid(True, alpha=0.3)
ax_temp.legend(loc="upper right")
# Error subplot
ax_err.set_title("Absolute Error Evolution")
ax_err.set_xlabel("Position x")
ax_err.set_ylabel("|u_exact - u_pred|")
ax_err.set_xlim(-1, 1)
# Automatically adjust y-limits based on error range
ax_err.set_ylim(0, U_err.max() * 1.1)
ax_err.grid(True, alpha=0.3)
ax_err.legend(loc="upper right")
time_text = fig.suptitle("", fontsize=14)
def init():
line_exact.set_data([], [])
line_pred.set_data([], [])
line_err.set_data([], [])
time_text.set_text("")
return line_exact, line_pred, line_err, time_text
def update(frame):
y_exact = U_exact[frame, :]
y_pred = U_pred[frame, :]
y_err = U_err[frame, :]
line_exact.set_data(x_test, y_exact)
line_pred.set_data(x_test, y_pred)
line_err.set_data(x_test, y_err)
time_text.set_text(f"Time evolution at t = {t_test[frame]:.3f}")
return line_exact, line_pred, line_err, time_text
anim = FuncAnimation(
fig,
update,
frames=len(t_test),
init_func=init,
blit=True,
interval=80,
repeat=True,
)
anim.save(gif_path, writer=PillowWriter(fps=12))
plt.close(fig)
print(f"Animated GIF saved to: {gif_path}")
# --- Main Execution ---
if __name__ == "__main__":
# Settings
N_IC_points = 50
N_BC_points = 4
N_PDE_points = 5000
pinn_layers = [2, 20, 20, 20, 20, 1]
# Reproducibility
torch.manual_seed(42)
np.random.seed(42)
# 1. Generate Data
print("Generating data...")
X_ic, T_ic, U_ic, X_bc, T_bc, U_bc, X_pde, T_pde = generate_data(
N_IC_points, N_BC_points, N_PDE_points
)
# 2. Design Network
print("Designing network...")
pinn_model = PINN(pinn_layers).to(device)
# 3. Train or Load
if os.path.exists(MODEL_PATH):
print(f"Found existing model file: {MODEL_PATH}")
loss_history = load_checkpoint(pinn_model, MODEL_PATH)
else:
print("No existing model file found. Starting training...")
loss_history = train(
pinn_model,
X_ic,
T_ic,
U_ic,
X_bc,
T_bc,
U_bc,
X_pde,
T_pde,
epochs=500,
lr=1e-3,
w_ic=100.0,
w_bc=100.0,
)
save_checkpoint(pinn_model, loss_history, MODEL_PATH)
# 4. Evaluation
print("Evaluating...")
x_test, t_test, X_test, T_test, U_exact, U_pred, U_err = predict_on_grid(
pinn_model, nx=100, nt=100
)
# 5. Static plots: (a)
plot_heatmaps(X_test, T_test, U_exact, U_pred, U_err)
# 6. Loss history: (b)
plot_loss_history(loss_history)
# 7. Animated GIF: (c)
save_animation_gif(x_test, t_test, U_exact, U_pred, U_err, GIF_PATH)