-
Notifications
You must be signed in to change notification settings - Fork 0
/
gaussian_diffusion.py
190 lines (161 loc) · 6.84 KB
/
gaussian_diffusion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import numpy as np
import tensorflow as tf
class GaussianDiffusion:
"""Gaussian diffusion utility.
Args:
beta_start: Start value of the scheduled variance
beta_end: End value of the scheduled variance
timesteps: Number of time steps in the forward process
"""
def __init__(
self,
beta_start=1e-4,
beta_end=0.02,
timesteps=1000,
clip_min=-1.0,
clip_max=1.0,
):
self.beta_start = beta_start
self.beta_end = beta_end
self.timesteps = timesteps
self.clip_min = clip_min
self.clip_max = clip_max
# Define the linear variance schedule
self.betas = betas = np.linspace(
beta_start,
beta_end,
timesteps,
dtype=np.float64, # Using float64 for better precision
)
self.num_timesteps = int(timesteps)
alphas = 1.0 - betas
alphas_cumprod = np.cumprod(alphas, axis=0)
alphas_cumprod_prev = np.append(1.0, alphas_cumprod[:-1])
self.betas = tf.constant(betas, dtype=tf.float32)
self.alphas_cumprod = tf.constant(alphas_cumprod, dtype=tf.float32)
self.alphas_cumprod_prev = tf.constant(alphas_cumprod_prev, dtype=tf.float32)
# Calculations for diffusion q(x_t | x_{t-1}) and others
self.sqrt_alphas_cumprod = tf.constant(
np.sqrt(alphas_cumprod), dtype=tf.float32
)
self.sqrt_one_minus_alphas_cumprod = tf.constant(
np.sqrt(1.0 - alphas_cumprod), dtype=tf.float32
)
self.log_one_minus_alphas_cumprod = tf.constant(
np.log(1.0 - alphas_cumprod), dtype=tf.float32
)
self.sqrt_recip_alphas_cumprod = tf.constant(
np.sqrt(1.0 / alphas_cumprod), dtype=tf.float32
)
self.sqrt_recipm1_alphas_cumprod = tf.constant(
np.sqrt(1.0 / alphas_cumprod - 1), dtype=tf.float32
)
# Calculations for posterior q(x_{t-1} | x_t, x_0)
posterior_variance = (
betas * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)
)
self.posterior_variance = tf.constant(posterior_variance, dtype=tf.float32)
# Log calculation clipped because the posterior variance is 0 at the beginning
# of the diffusion chain
self.posterior_log_variance_clipped = tf.constant(
np.log(np.maximum(posterior_variance, 1e-20)), dtype=tf.float32
)
self.posterior_mean_coef1 = tf.constant(
betas * np.sqrt(alphas_cumprod_prev) / (1.0 - alphas_cumprod),
dtype=tf.float32,
)
self.posterior_mean_coef2 = tf.constant(
(1.0 - alphas_cumprod_prev) * np.sqrt(alphas) / (1.0 - alphas_cumprod),
dtype=tf.float32,
)
def _extract(self, a, t, x_shape):
"""Extract some coefficients at specified timesteps,
then reshape to [batch_size, 1, 1, 1, 1, ...] for broadcasting purposes.
Args:
a: Tensor to extract from
t: Timestep for which the coefficients are to be extracted
x_shape: Shape of the current batched samples
"""
batch_size = x_shape[0]
out = tf.gather(a, t)
return tf.reshape(out, [batch_size, 1, 1])
def q_mean_variance(self, x_start, t):
"""Extracts the mean, and the variance at current timestep.
Args:
x_start: Initial sample (before the first diffusion step)
t: Current timestep
"""
x_start_shape = tf.shape(x_start)
mean = self._extract(self.sqrt_alphas_cumprod, t, x_start_shape) * x_start
variance = self._extract(1.0 - self.alphas_cumprod, t, x_start_shape)
log_variance = self._extract(
self.log_one_minus_alphas_cumprod, t, x_start_shape
)
return mean, variance, log_variance
def q_sample(self, x_start, t, noise):
"""Diffuse the data.
Args:
x_start: Initial sample (before the first diffusion step)
t: Current timestep
noise: Gaussian noise to be added at the current timestep
Returns:
Diffused samples at timestep `t`
"""
x_start_shape = tf.shape(x_start)
return (
self._extract(self.sqrt_alphas_cumprod, t, tf.shape(x_start)) * x_start
+ self._extract(self.sqrt_one_minus_alphas_cumprod, t, x_start_shape)
* noise
)
def predict_start_from_noise(self, x_t, t, noise):
x_t_shape = tf.shape(x_t)
return (
self._extract(self.sqrt_recip_alphas_cumprod, t, x_t_shape) * x_t
- self._extract(self.sqrt_recipm1_alphas_cumprod, t, x_t_shape) * noise
)
def q_posterior(self, x_start, x_t, t):
"""Compute the mean and variance of the diffusion
posterior q(x_{t-1} | x_t, x_0).
Args:
x_start: Stating point(sample) for the posterior computation
x_t: Sample at timestep `t`
t: Current timestep
Returns:
Posterior mean and variance at current timestep
"""
x_t_shape = tf.shape(x_t)
posterior_mean = (
self._extract(self.posterior_mean_coef1, t, x_t_shape) * x_start
+ self._extract(self.posterior_mean_coef2, t, x_t_shape) * x_t
)
posterior_variance = self._extract(self.posterior_variance, t, x_t_shape)
posterior_log_variance_clipped = self._extract(
self.posterior_log_variance_clipped, t, x_t_shape
)
return posterior_mean, posterior_variance, posterior_log_variance_clipped
def p_mean_variance(self, pred_noise, x, t, clip_denoised=True):
x_recon = self.predict_start_from_noise(x, t=t, noise=pred_noise)
if clip_denoised:
x_recon = tf.clip_by_value(x_recon, self.clip_min, self.clip_max)
model_mean, posterior_variance, posterior_log_variance = self.q_posterior(
x_start=x_recon, x_t=x, t=t
)
return model_mean, posterior_variance, posterior_log_variance
def p_sample(self, pred_noise, x, t, clip_denoised=True):
"""Sample from the diffusion model.
Args:
pred_noise: Noise predicted by the diffusion model
x: Samples at a given timestep for which the noise was predicted
t: Current timestep
clip_denoised (bool): Whether to clip the predicted noise
within the specified range or not.
"""
model_mean, _, model_log_variance = self.p_mean_variance(
pred_noise, x=x, t=t, clip_denoised=clip_denoised
)
noise = tf.random.normal(shape=x.shape, dtype=x.dtype)
# No noise when t == 0
nonzero_mask = tf.reshape(
1 - tf.cast(tf.equal(t, 0), tf.float32), [tf.shape(x)[0], 1, 1]
)
return model_mean + nonzero_mask * tf.exp(0.5 * model_log_variance) * noise