X = np.random.randn(100, 5)
y = np.dot(X, np.array([1, 2, 3, 4, 5])) + np.random.randn(100) * 0.1
model = SGD(lr=0.01, epochs=1000, batch_size=32, tol=1e-3)
w, b = model.fit(X, y)
y_pred = np.dot(X, w) + b
Output
Epoch 0: Loss 64.66196845798673
Epoch 100: Loss 0.03999940087439455
Epoch 200: Loss 0.008260358272771882
Epoch 300: Loss 0.00823731979566282
Epoch 400: Loss 0.008243022613956992
Epoch 500: Loss 0.008239370268212335
Epoch 600: Loss 0.008236363304624746
Epoch 700: Loss 0.00823205131002819
Epoch 800: Loss 0.00823566681302786
Epoch 900: Loss 0.008237441485197143
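Because y was generated from the known coefficients [1, 2, 3, 4, 5] plus a small noise term, a quick sanity check is to compare the learned parameters with those values (assuming the fitted w comes back as a flat array of five weights and b as a scalar or single-element value):

print(np.round(np.ravel(w), 2))   # expected to land close to [1. 2. 3. 4. 5.]
print(np.round(np.ravel(b), 2))   # expected to land close to 0 (no true bias was added)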
This loop of taking the current parameter values, computing how the loss changes with respect to each of them, and adjusting the parameters to reduce the loss function is called backpropagation.
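Before the full implementation, here is a minimal, self-contained sketch of a single pass of that loop for one toy parameter (the loss (w - 3)^2 and all names are purely illustrative): the forward pass computes the loss, the tape yields its gradient, and the parameter is nudged against that gradient.

import tensorflow as tf

w = tf.Variable(0.0)                    # single trainable parameter
lr = 0.1                                # learning rate
for step in range(50):
    with tf.GradientTape() as tape:
        loss = tf.square(w - 3.0)       # forward pass: toy loss, minimum at w = 3
    grad = tape.gradient(loss, w)       # backward pass: dLoss/dw
    w.assign_sub(lr * grad)             # update: w <- w - lr * dLoss/dw
print(w.numpy())                        # ends up very close to 3.0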
TensorFlow implementation of this algorithm
import tensorflow as tf
import numpy as np

class SGD:
    def __init__(self, lr=0.001, epochs=2000, batch_size=32, tol=1e-3):
        self.learning_rate = lr
        self.epochs = epochs
        self.batch_size = batch_size
        self.tolerance = tol
        self.weights = None
        self.bias = None

    def predict(self, X):
        # Linear model: X @ weights + bias
        return tf.matmul(X, self.weights) + self.bias

    def mean_squared_error(self, y_true, y_pred):
        return tf.reduce_mean(tf.square(y_true - y_pred))

    def gradient(self, X_batch, y_batch):
        # Record the forward pass so TensorFlow can differentiate the loss
        # with respect to the weights and bias.
        with tf.GradientTape() as tape:
            y_pred = self.predict(X_batch)
            loss = self.mean_squared_error(y_batch, y_pred)
        gradient_weights, gradient_bias = tape.gradient(loss, [self.weights, self.bias])
        return gradient_weights, gradient_bias

    def fit(self, X, y):
        n_samples, n_features = X.shape
        # Random initialization of the trainable parameters.
        self.weights = tf.Variable(tf.random.normal((n_features, 1)))
        self.bias = tf.Variable(tf.random.normal(()))

        for epoch in range(self.epochs):
            # Reshuffle the data at the start of every epoch.
            indices = tf.random.shuffle(tf.range(n_samples))
            X_shuffled = tf.gather(X, indices)
            y_shuffled = tf.gather(y, indices)

            for i in range(0, n_samples, self.batch_size):
                X_batch = X_shuffled[i:i + self.batch_size]
                y_batch = y_shuffled[i:i + self.batch_size]
                gradient_weights, gradient_bias = self.gradient(X_batch, y_batch)

                # Gradient clipping keeps a single noisy mini-batch from
                # producing an oversized update.
                gradient_weights = tf.clip_by_value(gradient_weights, -1, 1)
                gradient_bias = tf.clip_by_value(gradient_bias, -1, 1)

                # SGD update: parameter <- parameter - learning_rate * gradient
                self.weights.assign_sub(self.learning_rate * gradient_weights)
                self.bias.assign_sub(self.learning_rate * gradient_bias)

            if epoch % 100 == 0:
                y_pred = self.predict(X)
                loss = self.mean_squared_error(y, y_pred)
                print(f"Epoch {epoch}: Loss {loss}")

            # Stop early once the last mini-batch gradient is small enough.
            if tf.norm(gradient_weights) < self.tolerance:
                print("Convergence reached.")
                break

        return self.weights.numpy(), self.bias.numpy()
X = np.random.randn(100, 5).astype(np.float32)
y = np.dot(X, np.array([1, 2, 3, 4, 5], dtype=np.float32)) + np.random.randn(100).astype(np.float32) * 0.1
model = SGD(lr=0.005, epochs=1000, batch_size=12, tol=1e-3)
w, b = model.fit(X, y)
y_pred = np.dot(X, w) + b
Output
Epoch 0: Loss 52.73115158081055
Epoch 100: Loss 44.69907760620117
Epoch 200: Loss 44.693603515625
Epoch 300: Loss 44.69377136230469
Epoch 400: Loss 44.67509460449219
Epoch 500: Loss 44.67082595825195
Epoch 600: Loss 44.674285888671875
Epoch 700: Loss 44.666194915771484
Epoch 800: Loss 44.66718292236328
Epoch 900: Loss 44.65559005737305
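Unlike the NumPy run, the loss here plateaus around 44.7 instead of falling toward zero. This is consistent with a shape mismatch inside mean_squared_error: y has shape (100,) while predict returns a (100, 1) column, so y_true - y_pred broadcasts to a (100, 100) matrix, the updates push every prediction toward the mean of y, and the loss settles near the variance of y. One way to avoid this, assuming the class above stays otherwise unchanged, is to pass the targets as a column vector so the shapes line up:

y = y.reshape(-1, 1)                  # (100, 1), matching predict()'s output shape
model = SGD(lr=0.005, epochs=1000, batch_size=12, tol=1e-3)
w, b = model.fit(X, y)
y_pred = np.dot(X, w) + b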