🎯要点
神经求解器求解对偶方程,并学习两个空间之间的单调变换,最小化它们之间的瓦瑟斯坦距离。
🍪语言内容分比
🍇Python瓦瑟斯坦距离
在数学中,瓦瑟斯坦距离是在给定度量空间 M 上的概率分布之间定义的距离函数。其定义是:
令 (M,d)为度量空间,该空间是波兰空间。对于 p∈[1,+∞],有限 p 矩的M上两个概率测度 μ和 ν之间的瓦瑟斯坦 p-距离为
Wp(μ,ν)=γ∈Γ(μ,ν)inf(E(x,y)∼γd(x,y)p)1/p 其中 Γ(μ,ν)是 μ 和 ν 的所有耦合的集合; W∞(μ,ν) 被定义为 limp→+∞Wp(μ,ν) 并对应于最高范数。耦合 γ 是 M×M 上的联合概率测度,其边际分别为第一个和第二个因子的 μ和 ν。也就是说,对于所有可测量的 A⊂M ,耦合满足
∫A∫Mγ(x,y)dydx=μ(A),∫A∫Mγ(x,y)dxdy=ν(A). 瓦瑟斯坦距离示例
瓦瑟斯坦距离,也称为“推土机距离”。假设 P = (0.2, 0.1, 0.0, 0.0, 0.3, 0.4) 和 Q = (0.0, 0.5, 0.3, 0.0, 0.2, 0.0, 0.0)。如果您将分布 P 视为土堆,将分布 Q 视为洞,则瓦瑟斯坦距离就是将 P 中的所有土转移到 Q 中的洞所需的最小工作量。
在每次转移中,所做的功的量是流量(移动的泥土量)乘以距离。瓦瑟斯坦距离是所做的功的总量。换言之,两个分布之间的瓦瑟斯坦距离是将一个分布转换为另一个分布所需的工作量。
瓦瑟斯坦距离有很多不同的变体。有适用于离散分布的版本,也有适用于数学上连续分布的版本。有的版本中每个数据点都是一个值,例如 0.3,有的版本中每个数据点都是多值,例如 (0.4, 0.8)。本文介绍的 瓦瑟斯坦距离版本适用于两个离散的一维概率分布,是 1-瓦瑟斯坦版本。
该演示将 my_wasserstein() 函数定义为:
def my_wasserstein(p, q):
dirt = np.copy(p)
holes = np.copy(q)
tot_work = 0.0
while True:
from_idx = first_nonzero(dirt)
to_idx = first_nonzero(holes)
if from_idx == -1 or to_idx == -1:
break
work = move_dirt(dirt, from_idx, holes, to_idx)
tot_work += work
return tot_work
大部分算法工作由辅助函数 first_nonzero() 和 move_dirt() 完成。简而言之,该函数先找到第一个可用的泥土,然后找到第一个未填满的洞,然后将尽可能多的泥土移到洞中,并返回已完成的工作量。每次转移时完成的工作量都会累积起来。
def first_nonzero(vec):
dim = len(vec)
for i in range(dim):
if vec[i] > 0.0:
return i
return -1
def move_dirt(dirt, di, holes, hi):
if dirt[di] <= holes[hi]:
flow = dirt[di]
dirt[di] = 0.0
holes[hi] -= flow
elif dirt[di] > holes[hi]:
flow = holes[hi]
dirt[di] -= flow
holes[hi] = 0.0
dist = np.abs(di - hi)
return flow * dist
完整代码
import numpy as np
def first_nonzero(vec):
dim = len(vec)
for i in range(dim):
if vec[i] > 0.0:
return i
return -1
def move_dirt(dirt, di, holes, hi):
if dirt[di] <= holes[hi]:
flow = dirt[di]
dirt[di] = 0.0
holes[hi] -= flow
elif dirt[di] > holes[hi]:
flow = holes[hi]
dirt[di] -= flow
holes[hi] = 0.0
dist = np.abs(di - hi)
return flow * dist
def my_wasserstein(p, q):
dirt = np.copy(p)
holes = np.copy(q)
tot_work = 0.0
while True:
from_idx = first_nonzero(dirt)
to_idx = first_nonzero(holes)
if from_idx == -1 or to_idx == -1:
break
work = move_dirt(dirt, from_idx, holes, to_idx)
tot_work += work
return tot_work
def kullback_leibler(p, q):
n = len(p)
sum = 0.0
for i in range(n):
sum += p[i] * np.log(p[i] / q[i])
return sum
def symmetric_kullback(p, q):
a = kullback_leibler(p, q)
b = kullback_leibler(q, p)
return a + b
def main():
print("\nBegin Wasserstein distance demo ")
P = np.array([0.6, 0.1, 0.1, 0.1, 0.1])
Q1 = np.array([0.1, 0.1, 0.6, 0.1, 0.1])
Q2 = np.array([0.1, 0.1, 0.1, 0.1, 0.6])
kl_p_q1 = symmetric_kullback(P, Q1)
kl_p_q2 = symmetric_kullback(P, Q2)
wass_p_q1 = my_wasserstein(P, Q1)
wass_p_q2 = my_wasserstein(P, Q2)
print("\nKullback-Leibler distances: ")
print("P to Q1 : %0.4f " % kl_p_q1)
print("P to Q2 : %0.4f " % kl_p_q2)
print("\nWasserstein distances: ")
print("P to Q1 : %0.4f " % wass_p_q1)
print("P to Q2 : %0.4f " % wass_p_q2)
print("\nEnd demo ")
if __name__ == "__main__":
main()