瓦瑟斯坦距离,也称为“推土机距离”。假设 P = (0.2, 0.1, 0.0, 0.0, 0.3, 0.4) 和 Q = (0.0, 0.5, 0.3, 0.0, 0.2, 0.0, 0.0)。如果您将分布 P 视为土堆,将分布 Q 视为洞,则瓦瑟斯坦距离就是将 P 中的所有土转移到 Q 中的洞所需的最小工作量。
在每次转移中,所做的功的量是流量(移动的泥土量)乘以距离。瓦瑟斯坦距离是所做的功的总量。换言之,两个分布之间的瓦瑟斯坦距离是将一个分布转换为另一个分布所需的工作量。
瓦瑟斯坦距离有很多不同的变体。有适用于离散分布的版本,也有适用于数学上连续分布的版本。有的版本中每个数据点都是一个值,例如 0.3,有的版本中每个数据点都是多值,例如 (0.4, 0.8)。本文介绍的 瓦瑟斯坦距离版本适用于两个离散的一维概率分布,是 1-瓦瑟斯坦版本。
def my_wasserstein(p, q):
dirt = np.copy(p)
holes = np.copy(q)
tot_work = 0.0
while True:
from_idx = first_nonzero(dirt)
to_idx = first_nonzero(holes)
if from_idx == -1 or to_idx == -1:
break
work = move_dirt(dirt, from_idx, holes, to_idx)
tot_work += work
return tot_work
大部分算法工作由辅助函数 first_nonzero() 和 move_dirt() 完成。简而言之,该函数先找到第一个可用的泥土,然后找到第一个未填满的洞,然后将尽可能多的泥土移到洞中,并返回已完成的工作量。每次转移时完成的工作量都会累积起来。
def first_nonzero(vec):
dim = len(vec)
for i in range(dim):
if vec[i] > 0.0:
return i
return -1
def move_dirt(dirt, di, holes, hi):
if dirt[di] <= holes[hi]:
flow = dirt[di]
dirt[di] = 0.0
holes[hi] -= flow
elif dirt[di] > holes[hi]:
flow = holes[hi]
dirt[di] -= flow
holes[hi] = 0.0
dist = np.abs(di - hi)
return flow * dist
import numpy as np
def first_nonzero(vec):
dim = len(vec)
for i in range(dim):
if vec[i] > 0.0:
return i
return -1
def move_dirt(dirt, di, holes, hi):
if dirt[di] <= holes[hi]:
flow = dirt[di]
dirt[di] = 0.0
holes[hi] -= flow
elif dirt[di] > holes[hi]:
flow = holes[hi]
dirt[di] -= flow
holes[hi] = 0.0
dist = np.abs(di - hi)
return flow * dist
def my_wasserstein(p, q):
dirt = np.copy(p)
holes = np.copy(q)
tot_work = 0.0
while True:
from_idx = first_nonzero(dirt)
to_idx = first_nonzero(holes)
if from_idx == -1 or to_idx == -1:
break
work = move_dirt(dirt, from_idx, holes, to_idx)
tot_work += work
return tot_work
def kullback_leibler(p, q):
n = len(p)
sum = 0.0
for i in range(n):
sum += p[i] * np.log(p[i] / q[i])
return sum
def symmetric_kullback(p, q):
a = kullback_leibler(p, q)
b = kullback_leibler(q, p)
return a + b
def main():
print("\nBegin Wasserstein distance demo ")
P = np.array([0.6, 0.1, 0.1, 0.1, 0.1])
Q1 = np.array([0.1, 0.1, 0.6, 0.1, 0.1])
Q2 = np.array([0.1, 0.1, 0.1, 0.1, 0.6])
kl_p_q1 = symmetric_kullback(P, Q1)
kl_p_q2 = symmetric_kullback(P, Q2)
wass_p_q1 = my_wasserstein(P, Q1)
wass_p_q2 = my_wasserstein(P, Q2)
print("\nKullback-Leibler distances: ")
print("P to Q1 : %0.4f " % kl_p_q1)
print("P to Q2 : %0.4f " % kl_p_q2)
print("\nWasserstein distances: ")
print("P to Q1 : %0.4f " % wass_p_q1)
print("P to Q2 : %0.4f " % wass_p_q2)
print("\nEnd demo ")
if __name__ == "__main__":
main()