import numpy as np

class VAE:
    def __init__(self, input_dim, hidden_dim, latent_dim, learning_rate=0.01):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        self.learning_rate = learning_rate

        # Encoder weights and biases
        self.W1 = np.random.randn(input_dim, hidden_dim) * 0.01
        self.b1 = np.zeros(hidden_dim)

        self.W_mu = np.random.randn(hidden_dim, latent_dim) * 0.01
        self.b_mu = np.zeros(latent_dim)

        self.W_logvar = np.random.randn(hidden_dim, latent_dim) * 0.01
        self.b_logvar = np.zeros(latent_dim)

        # Decoder weights and biases
        self.W2 = np.random.randn(latent_dim, hidden_dim) * 0.01
        self.b2 = np.zeros(hidden_dim)

        self.W_out = np.random.randn(hidden_dim, input_dim) * 0.01
        self.b_out = np.zeros(input_dim)

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def sigmoid_derivative(self, x):
        s = self.sigmoid(x)
        return s * (1 - s)

    def binary_cross_entropy(self, recon_x, x):
        eps = 1e-8
        return -np.sum(x * np.log(recon_x + eps) + (1 - x) * np.log(1 - recon_x + eps))

    def encode(self, x):
        h = self.sigmoid(np.dot(x, self.W1) + self.b1)
        mu = np.dot(h, self.W_mu) + self.b_mu
        logvar = np.dot(h, self.W_logvar) + self.b_logvar
        return mu, logvar, h

    def reparameterize(self, mu, logvar):
        # Reparameterization trick: z = mu + sigma * eps with eps ~ N(0, I),
        # so the sample is a differentiable function of mu and logvar.
        std = np.exp(0.5 * logvar)
        eps = np.random.randn(*mu.shape)
        return mu + eps * std

    def decode(self, z):
        h_dec = self.sigmoid(np.dot(z, self.W2) + self.b2)
        x_recon = self.sigmoid(np.dot(h_dec, self.W_out) + self.b_out)
        return x_recon, h_dec

    def compute_loss(self, x, x_recon, mu, logvar):
        # Negative ELBO: reconstruction term (BCE) plus the closed-form KL divergence
        # between N(mu, sigma^2) and the standard normal prior N(0, I).
        bce = self.binary_cross_entropy(x_recon, x)
        kl = -0.5 * np.sum(1 + logvar - mu**2 - np.exp(logvar))
        return bce + kl

    def train(self, data, epochs=100, batch_size=10):
        data = np.array(data)
        n_samples = data.shape[0]

        for epoch in range(epochs):
            indices = np.random.permutation(n_samples)
            total_loss = 0

            for i in range(0, n_samples, batch_size):
                batch = data[indices[i:i + batch_size]]
                grads = self._compute_gradients(batch)
                self._update_parameters(grads)
                total_loss += grads['loss']

            avg_loss = total_loss / n_samples
            print(f"Epoch {epoch+1}/{epochs} - Loss: {avg_loss:.4f}")

    def _compute_gradients(self, x):
        m = x.shape[0]

        # Forward pass (reparameterization done inline so eps can be reused in backprop)
        mu, logvar, h_enc = self.encode(x)
        std = np.exp(0.5 * logvar)
        eps = np.random.randn(*mu.shape)
        z = mu + eps * std
        x_recon, h_dec = self.decode(z)

        # Loss
        loss = self.compute_loss(x, x_recon, mu, logvar)

        # Backpropagation (plain SGD, gradients averaged over the batch)
        # Output layer: sigmoid + BCE gives delta = x_recon - x
        delta_out = x_recon - x  # (m, input_dim)
        dW_out = np.dot(h_dec.T, delta_out) / m
        db_out = np.mean(delta_out, axis=0)

        # Decoder hidden layer
        delta_dec = np.dot(delta_out, self.W_out.T) * self.sigmoid_derivative(np.dot(z, self.W2) + self.b2)
        dW2 = np.dot(z.T, delta_dec) / m
        db2 = np.mean(delta_dec, axis=0)

        # Gradient flowing back into the latent sample z
        dz = np.dot(delta_dec, self.W2.T)

        # Gradients w.r.t. mu and logvar: reconstruction path through z = mu + eps * std,
        # plus the KL term's own gradients (dKL/dmu = mu, dKL/dlogvar = 0.5 * (exp(logvar) - 1))
        dmu = dz + mu
        dlogvar = dz * eps * 0.5 * std + 0.5 * (np.exp(logvar) - 1)

        # Encoder hidden layer
        dh = (np.dot(dmu, self.W_mu.T) + np.dot(dlogvar, self.W_logvar.T)) * self.sigmoid_derivative(np.dot(x, self.W1) + self.b1)

        dW_mu = np.dot(h_enc.T, dmu) / m
        db_mu = np.mean(dmu, axis=0)

        dW_logvar = np.dot(h_enc.T, dlogvar) / m
        db_logvar = np.mean(dlogvar, axis=0)

        dW1 = np.dot(x.T, dh) / m
        db1 = np.mean(dh, axis=0)

        return {
            'dW1': dW1, 'db1': db1,
            'dW_mu': dW_mu, 'db_mu': db_mu,
            'dW_logvar': dW_logvar, 'db_logvar': db_logvar,
            'dW2': dW2, 'db2': db2,
            'dW_out': dW_out, 'db_out': db_out,
            'loss': loss
        }

    def _update_parameters(self, grads):
        self.W1 -= self.learning_rate * grads['dW1']
        self.b1 -= self.learning_rate * grads['db1']

        self.W_mu -= self.learning_rate * grads['dW_mu']
        self.b_mu -= self.learning_rate * grads['db_mu']

        self.W_logvar -= self.learning_rate * grads['dW_logvar']
        self.b_logvar -= self.learning_rate * grads['db_logvar']

        self.W2 -= self.learning_rate * grads['dW2']
        self.b2 -= self.learning_rate * grads['db2']

        self.W_out -= self.learning_rate * grads['dW_out']
        self.b_out -= self.learning_rate * grads['db_out']

    def reconstruct(self, x):
        mu, logvar, _ = self.encode(x)
        z = self.reparameterize(mu, logvar)
        x_recon, _ = self.decode(z)
        return x_recon

    def sample(self, n_samples=1):
        z = np.random.randn(n_samples, self.latent_dim)
        x_recon, _ = self.decode(z)
        return x_recon


# Create synthetic binary data (patterns)
np.random.seed(42)
data = np.random.binomial(n=1, p=0.5, size=(100, 6))

# Initialize VAE: input=6, hidden=4, latent=2
vae = VAE(input_dim=6, hidden_dim=4, latent_dim=2, learning_rate=0.1)

# Train
vae.train(data, epochs=50, batch_size=10)

# Reconstruct
x_test = data[0]
x_recon = vae.reconstruct(x_test.reshape(1, -1))
print("\nOriginal:     ", x_test)
print("Reconstructed:", np.round(x_recon[0], 2))

# Generate new samples
samples = vae.sample(n_samples=3)
print("\nGenerated samples:")
print(np.round(samples, 2))
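
# A further usage sketch (an addition, assuming the VAE class defined above):
# the encoder means give deterministic 2-D embeddings of the inputs, which is
# a simple way to inspect what the latent space has learned.
latent_mu, _, _ = vae.encode(data[:5])
print("\nLatent means of first 5 samples:")
print(np.round(latent_mu, 2))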