"""The SPSA attack
"""
# pylint: disable=missing-docstring
import warnings
import numpy as np
from six.moves import xrange
import tensorflow as tf
import tensorflow_addons as tfa  # used below by _apply_transformation
from cleverhans.attacks.attack import Attack
from cleverhans.compat import reduce_mean, reduce_sum, reduce_max
from cleverhans.model import Model
from cleverhans import utils_tf
tf_dtype = tf.as_dtype('float32')
class SPSA(Attack):
"""
This implements the SPSA adversary, as in https://arxiv.org/abs/1802.05666
(Uesato et al. 2018). SPSA is a gradient-free optimization method, which
is useful when the model is non-differentiable, or more generally, the
gradients do not point in useful directions.
:param model: cleverhans.model.Model
:param sess: optional tf.Session
:param dtypestr: dtype of the data
:param kwargs: passed through to super constructor
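
A minimal usage sketch (not from the original docstring; ``my_model`` is an
assumed ``cleverhans.model.Model`` subclass and ``sess`` an existing
``tf.Session``)::

    x = tf.placeholder(tf.float32, shape=(1, 28, 28, 1))  # SPSA needs batch size 1
    y = tf.placeholder(tf.int64, shape=(1,))              # index of the true label
    attack = SPSA(my_model, sess=sess)
    adv_x = attack.generate(x, y=y, eps=0.3, nb_iter=100,
                            clip_min=0., clip_max=1.)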
"""
DEFAULT_SPSA_SAMPLES = 128
DEFAULT_SPSA_ITERS = 1
DEFAULT_DELTA = 0.01
DEFAULT_LEARNING_RATE = 0.01
def __init__(self, model, sess=None, dtypestr='float32', **kwargs):
super(SPSA, self).__init__(model, sess, dtypestr, **kwargs)
self.feedable_kwargs = ('eps', 'clip_min', 'clip_max', 'y', 'y_target')
self.structural_kwargs = [
'nb_iter',
'spsa_samples',
'spsa_iters',
'early_stop_loss_threshold',
'is_debug',
'is_targeted',
]
assert isinstance(self.model, Model)
def generate(self,
x,
y=None,
y_target=None,
eps=None,
clip_min=None,
clip_max=None,
nb_iter=None,
is_targeted=None,
early_stop_loss_threshold=None,
learning_rate=DEFAULT_LEARNING_RATE,
delta=DEFAULT_DELTA,
spsa_samples=DEFAULT_SPSA_SAMPLES,
batch_size=None,
spsa_iters=DEFAULT_SPSA_ITERS,
is_debug=False,
epsilon=None,
num_steps=None):
"""
Generate symbolic graph for adversarial examples.
:param x: The model's symbolic inputs. Must be a batch of size 1.
:param y: A Tensor or None. The index of the correct label.
:param y_target: A Tensor or None. The index of the target label in a
targeted attack.
:param eps: The size of the maximum perturbation, measured in the
L-infinity norm.
:param clip_min: If specified, the minimum input value
:param clip_max: If specified, the maximum input value
:param nb_iter: The number of optimization steps.
:param early_stop_loss_threshold: A float or None. If specified, the
attack will end as soon as the loss
is below `early_stop_loss_threshold`.
:param learning_rate: Learning rate of ADAM optimizer.
:param delta: Perturbation size used for SPSA approximation.
:param spsa_samples: Number of inputs to evaluate at a single time.
The true batch size (the number of evaluated
inputs for each update) is `spsa_samples *
spsa_iters`
:param batch_size: Deprecated param that is an alias for spsa_samples
:param spsa_iters: Number of model evaluations before performing an
update, where each evaluation is on `spsa_samples`
different inputs.
:param is_debug: If True, print the adversarial loss after each update.
:param epsilon: Deprecated alias for `eps`
:param num_steps: Deprecated alias for `nb_iter`.
:param is_targeted: Deprecated argument. Ignored.
"""
if epsilon is not None:
if eps is not None:
raise ValueError("Should not specify both eps and its deprecated "
"alias, epsilon")
warnings.warn("`epsilon` is deprecated. Switch to `eps`. `epsilon` may "
"be removed on or after 2019-04-15.")
eps = epsilon
del epsilon
if num_steps is not None:
if nb_iter is not None:
raise ValueError("Should not specify both nb_iter and its deprecated "
"alias, num_steps")
warnings.warn("`num_steps` is deprecated. Switch to `nb_iter`. "
"`num_steps` may be removed on or after 2019-04-15.")
nb_iter = num_steps
del num_steps
assert nb_iter is not None
if (y is not None) + (y_target is not None) != 1:
raise ValueError("Must specify exactly one of y (untargeted attack, "
"cause the input not to be classified as this true "
"label) and y_target (targeted attack, cause the "
"input to be classified as this target label).")
if is_targeted is not None:
warnings.warn("`is_targeted` is deprecated. Simply do not specify it."
" It may become an error to specify it on or after "
"2019-04-15.")
assert is_targeted == (y_target is not None)
is_targeted = y_target is not None
if x.get_shape().as_list()[0] is None:
check_batch = utils_tf.assert_equal(tf.shape(x)[0], 1)
with tf.control_dependencies([check_batch]):
x = tf.identity(x)
elif x.get_shape().as_list()[0] != 1:
raise ValueError("For SPSA, input tensor x must have batch_size of 1.")
if batch_size is not None:
warnings.warn(
'The "batch_size" argument to SPSA is deprecated, and will '
'be removed on 2019-03-17. '
'Please use spsa_samples instead.')
spsa_samples = batch_size
optimizer = SPSAAdam(
lr=learning_rate,
delta=delta,
num_samples=spsa_samples,
num_iters=spsa_iters)
def loss_fn(x, label):
"""
Margin logit loss, with correct sign for targeted vs untargeted loss.
"""
logits = self.model.get_logits(x)
loss_multiplier = 1 if is_targeted else -1
return loss_multiplier * margin_logit_loss(
logits, label,
nb_classes=self.model.nb_classes or logits.get_shape()[-1])
y_attack = y_target if is_targeted else y
adv_x = projected_optimization(
loss_fn,
x,
y_attack,
eps,
num_steps=nb_iter,
optimizer=optimizer,
early_stop_loss_threshold=early_stop_loss_threshold,
is_debug=is_debug,
clip_min=clip_min,
clip_max=clip_max
)
return adv_x
def generate_np(self, x_val, **kwargs):
"""
Generate adversarial examples for `x_val` and return them as a NumPy array.
Accepts the same keyword arguments as `generate` (plus the deprecated
`epsilon` and `num_steps` aliases) and runs the attack one example at a
time, since SPSA requires a batch size of 1.
"""
if "epsilon" in kwargs:
warnings.warn("Using deprecated argument: see `generate`")
assert "eps" not in kwargs
kwargs["eps"] = kwargs["epsilon"]
del kwargs["epsilon"]
assert "eps" in kwargs
if "num_steps" in kwargs:
warnings.warn("Using deprecated argument: see `generate`")
assert "nb_iter" not in kwargs
kwargs["nb_iter"] = kwargs["num_steps"]
del kwargs["num_steps"]
if 'y' in kwargs and kwargs['y'] is not None:
assert kwargs['y'].dtype in [np.int32, np.int64]
if 'y_target' in kwargs and kwargs['y_target'] is not None:
assert kwargs['y_target'].dtype in [np.int32, np.int64]
# Call self.generate() sequentially for each image in the batch
x_adv = []
batch_size = x_val.shape[0]
y = kwargs.pop('y', [None] * batch_size)
assert len(x_val) == len(y), '# of images and labels should match'
for x_single, y_single in zip(x_val, y):
x = np.expand_dims(x_single, axis=0)
adv_img = super(SPSA, self).generate_np(x, y=y_single, **kwargs)
x_adv.append(adv_img)
return np.concatenate(x_adv, axis=0)
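# (Hedged usage sketch, not part of the original module: `attack` is an SPSA
#  instance built with a live tf.Session, `x_batch` a float32 numpy array of
#  shape [N, H, W, C] and `y_batch` an int32/int64 label array of length N.)
#
#   adv_batch = attack.generate_np(x_batch, y=y_batch, eps=0.3, nb_iter=100,
#                                  clip_min=0., clip_max=1.)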
def _project_perturbation(perturbation, epsilon, input_image, clip_min=None,
clip_max=None):
"""Project `perturbation` onto L-infinity ball of radius `epsilon`.
Also project into hypercube such that the resulting adversarial example
is between clip_min and clip_max, if applicable.
"""
if clip_min is None or clip_max is None:
raise NotImplementedError("_project_perturbation currently has clipping "
"hard-coded in.")
# Ensure inputs are in the correct range
with tf.control_dependencies([
utils_tf.assert_less_equal(input_image,
tf.cast(clip_max, input_image.dtype)),
utils_tf.assert_greater_equal(input_image,
tf.cast(clip_min, input_image.dtype))
]):
clipped_perturbation = utils_tf.clip_by_value(
perturbation, -epsilon, epsilon)
new_image = utils_tf.clip_by_value(
input_image + clipped_perturbation, clip_min, clip_max)
return new_image - input_image
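# Worked example of the projection above (illustrative only): with
# epsilon=0.1, clip_min=0., clip_max=1., an input pixel of 0.95 and a raw
# perturbation of 0.3, the perturbation is first clipped to 0.1, the
# perturbed pixel 1.05 is then clipped to 1.0, and the returned perturbation
# for that pixel is 1.0 - 0.95 = 0.05.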
class TensorOptimizer(object):
"""Optimizer for Tensors rather than tf.Variables.
TensorOptimizers implement optimizers where the values being optimized
are ordinary Tensors, rather than Variables. TF Variables can have strange
behaviors when being assigned multiple times within a single sess.run()
call, particularly in Distributed TF, so this avoids thinking about those
issues. These are helper classes for the `projected_optimization`
method. Apart from not using Variables, they follow an interface very
similar to tf.Optimizer.
"""
def _compute_gradients(self, loss_fn, x, unused_optim_state):
"""Compute a new value of `x` to minimize `loss_fn`.
Args:
loss_fn: a callable that takes `x`, a batch of images, and returns
a batch of loss values. `x` will be optimized to minimize
`loss_fn(x)`.
x: A list of Tensors, the values to be updated. This is analogous
to the `var_list` argument in standard TF Optimizer.
unused_optim_state: A (possibly nested) dict, containing any state
info needed for the optimizer.
Returns:
A list of Tensors, the same length as `x`, containing the gradient of
the loss with respect to each entry of `x`.
"""
# Assumes `x` is a list,
# and contains a tensor representing a batch of images
assert len(x) == 1 and isinstance(x, list), \
'x should be a list and contain only one image tensor'
x = x[0]
loss = reduce_mean(loss_fn(x), axis=0)
return tf.gradients(loss, x)
def _apply_gradients(self, grads, x, optim_state):
"""
Given a gradient, make one optimization step.
:param grads: list of tensors, same length as `x`, containing the corresponding gradients
:param x: list of tensors to update
:param optim_state: dict
Returns:
new_x: list of tensors, updated version of `x`
new_optim_state: dict, updated version of `optim_state`
"""
raise NotImplementedError(
"_apply_gradients should be defined in each subclass")
def minimize(self, loss_fn, x, optim_state):
"""
Analogous to tf.Optimizer.minimize
:param loss_fn: tf Tensor, representing the loss to minimize
:param x: list of Tensor, analogous to tf.Optimizer's var_list
:param optim_state: A possibly nested dict, containing any optimizer state.
Returns:
new_x: list of Tensor, updated version of `x`
new_optim_state: dict, updated version of `optim_state`
"""
grads = self._compute_gradients(loss_fn, x, optim_state)
return self._apply_gradients(grads, x, optim_state)
def init_state(self, x):
"""Returns the initial state of the optimizer.
Args:
x: A list of Tensors, which will be optimized.
Returns:
A dictionary, representing the initial state of the optimizer.
"""
raise NotImplementedError(
"init_state should be defined in each subclass")
class TensorGradientDescent(TensorOptimizer):
"""Vanilla Gradient Descent TensorOptimizer."""
def __init__(self, lr):
self._lr = lr
def init_state(self, x):
return {}
def _apply_gradients(self, grads, x, optim_state):
new_x = [None] * len(x)
for i in xrange(len(x)):
new_x[i] = x[i] - self._lr * grads[i]
return new_x, optim_state
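# (Toy, hedged sketch of the TensorOptimizer interface, not in the original
#  source: one gradient-descent step on a scalar quadratic.)
#
#   opt = TensorGradientDescent(lr=0.1)
#   x0 = [tf.constant([[3.0]])]              # a "batch" containing one value
#   state = opt.init_state(x0)
#   new_x, state = opt.minimize(lambda t: tf.square(t), x0, state)
#   # new_x[0] evaluates to [[3.0 - 0.1 * 2 * 3.0]] = [[2.4]]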
class TensorAdam(TensorOptimizer):
"""The Adam optimizer defined in https://arxiv.org/abs/1412.6980."""
def __init__(self, lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-9):
self._lr = lr
self._beta1 = beta1
self._beta2 = beta2
self._epsilon = epsilon
def init_state(self, x):
"""
Initialize t, m, and u
"""
optim_state = {}
optim_state["t"] = 0.
optim_state["m"] = [tf.zeros_like(v) for v in x]
optim_state["u"] = [tf.zeros_like(v) for v in x]
return optim_state
def _apply_gradients(self, grads, x, optim_state):
"""Refer to parent class documentation."""
new_x = [None] * len(x)
new_optim_state = {
"t": optim_state["t"] + 1.,
"m": [None] * len(x),
"u": [None] * len(x)
}
t = new_optim_state["t"]
for i in xrange(len(x)):
g = grads[i]
m_old = optim_state["m"][i]
u_old = optim_state["u"][i]
new_optim_state["m"][i] = (
self._beta1 * m_old + (1. - self._beta1) * g)
new_optim_state["u"][i] = (
self._beta2 * u_old + (1. - self._beta2) * g * g)
m_hat = new_optim_state["m"][i] / (1. - tf.pow(self._beta1, t))
u_hat = new_optim_state["u"][i] / (1. - tf.pow(self._beta2, t))
new_x[i] = (
x[i] - self._lr * m_hat / (tf.sqrt(u_hat) + self._epsilon))
return new_x, new_optim_state
class SPSAAdam(TensorAdam):
"""Optimizer for gradient-free attacks in https://arxiv.org/abs/1802.05666.
Gradient estimates are computed using Simultaneous Perturbation Stochastic
Approximation (SPSA), combined with the ADAM update rule.
"""
def __init__(self,
lr=0.01,
delta=0.01,
num_samples=128,
num_iters=1,
compare_to_analytic_grad=False):
super(SPSAAdam, self).__init__(lr=lr)
assert num_samples % 2 == 0, "number of samples must be even"
self._delta = delta
self._num_samples = num_samples // 2 # Since we mirror +/- delta later
self._num_iters = num_iters
self._compare_to_analytic_grad = compare_to_analytic_grad
def _get_delta(self, x, delta):
x_shape = x.get_shape().as_list()
delta_x = delta * tf.sign(
tf.random_uniform(
[self._num_samples] + x_shape[1:],
minval=-1.,
maxval=1.,
dtype=tf_dtype))
return delta_x
def _compute_gradients(self, loss_fn, x, unused_optim_state):
"""Compute gradient estimates using SPSA."""
# Assumes `x` is a list, containing a [1, H, W, C] image
# If static batch dimension is None, tf.reshape to batch size 1
# so that static shape can be inferred
assert len(x) == 1
static_x_shape = x[0].get_shape().as_list()
if static_x_shape[0] is None:
x[0] = tf.reshape(x[0], [1] + static_x_shape[1:])
assert x[0].get_shape().as_list()[0] == 1
x = x[0]
x_shape = x.get_shape().as_list()
def body(i, grad_array):
delta = self._delta
delta_x = self._get_delta(x, delta)
delta_x = tf.concat([delta_x, -delta_x], axis=0)
loss_vals = tf.reshape(
loss_fn(x + delta_x),
[2 * self._num_samples] + [1] * (len(x_shape) - 1))
avg_grad = reduce_mean(loss_vals * delta_x, axis=0) / delta
avg_grad = tf.expand_dims(avg_grad, axis=0)
new_grad_array = grad_array.write(i, avg_grad)
return i + 1, new_grad_array
def cond(i, _):
return i < self._num_iters
_, all_grads = tf.while_loop(
cond,
body,
loop_vars=[
0, tf.TensorArray(size=self._num_iters, dtype=tf_dtype)
],
back_prop=False,
parallel_iterations=1)
avg_grad = reduce_sum(all_grads.stack(), axis=0)
return [avg_grad]
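# A note on the estimator above (added for clarity, not in the original
# source): each while-loop iteration draws `num_samples` random sign vectors
# s in {-1, +1}^d, evaluates the loss at the mirrored points x + delta*s and
# x - delta*s, and averages loss * perturbation / delta over all
# 2*num_samples evaluations. Each mirrored pair therefore contributes
# (loss(x + delta*s) - loss(x - delta*s)) * s, which is proportional to a
# two-sided finite-difference estimate of the gradient; Adam's per-coordinate
# normalization makes the overall scale of the estimate largely unimportant.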
def margin_logit_loss(model_logits, label, nb_classes=10, num_classes=None):
"""Computes difference between logit for `label` and next highest logit.
The loss is high when `label` is unlikely (targeted by default).
This follows the same interface as `loss_fn` for TensorOptimizer and
projected_optimization, i.e. it returns a batch of loss values.
"""
if num_classes is not None:
warnings.warn("`num_classes` is deprecated. Switch to `nb_classes`."
" `num_classes` may be removed on or after 2019-04-23.")
nb_classes = num_classes
del num_classes
if 'int' in str(label.dtype):
logit_mask = tf.one_hot(label, depth=nb_classes, axis=-1)
else:
logit_mask = label
if 'int' in str(logit_mask.dtype):
logit_mask = tf.to_float(logit_mask)
try:
label_logits = reduce_sum(logit_mask * model_logits, axis=-1)
except TypeError:
raise TypeError("Could not take row-wise dot product between "
"logit mask, of dtype " + str(logit_mask.dtype)
+ " and model_logits, of dtype "
+ str(model_logits.dtype))
logits_with_target_label_neg_inf = model_logits - logit_mask * 99999
highest_nonlabel_logits = reduce_max(
logits_with_target_label_neg_inf, axis=-1)
loss = highest_nonlabel_logits - label_logits
return loss
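# Worked example (illustrative only): with model_logits = [2., 5., 1.],
# label = 1 and nb_classes=3, label_logits = 5. and the highest non-label
# logit is 2., so the loss is 2. - 5. = -3. A large negative value means the
# label is already the clear argmax; the untargeted loss_fn in SPSA.generate
# multiplies this by -1 before minimizing it.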
def _apply_black_border(x, border_size):
orig_height = x.get_shape().as_list()[1]
orig_width = x.get_shape().as_list()[2]
x = tf.image.resize_images(x, (orig_height - 2*border_size,
orig_width - 2*border_size))
return tf.pad(x, [[0, 0],
[border_size, border_size],
[border_size, border_size],
[0, 0]], 'CONSTANT')
def _apply_transformation(inputs):
x, trans = inputs[0], inputs[1]
dx, dy, angle = trans[0], trans[1], trans[2]
height = x.get_shape().as_list()[1]
width = x.get_shape().as_list()[2]
# Pad the image to prevent two-step rotation / translation from truncating
# corners
max_dist_from_center = np.sqrt(height**2+width**2) / 2
min_edge_from_center = float(np.min([height, width])) / 2
padding = np.ceil(max_dist_from_center -
min_edge_from_center).astype(np.int32)
x = tf.pad(x, [[0, 0],
[padding, padding],
[padding, padding],
[0, 0]],
'CONSTANT')
# Apply rotation
angle *= np.pi / 180
x = tfa.image.rotate(x, angle, interpolation='BILINEAR')
# Apply translation
dx_in_px = -dx * height
dy_in_px = -dy * width
translation = tf.convert_to_tensor([dx_in_px, dy_in_px])
x = tfa.image.translate(x, translation, interpolation='BILINEAR')
return tf.image.resize_image_with_crop_or_pad(x, height, width)
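# Worked example of the padding arithmetic above (illustrative only): for a
# 28x28 image, max_dist_from_center = sqrt(28**2 + 28**2) / 2 ~= 19.8 and
# min_edge_from_center = 14, so padding = ceil(19.8 - 14) = 6 pixels per side,
# enough that no rotation about the center can push image content outside the
# padded frame.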
def spm(x, model, y=None, n_samples=None, dx_min=-0.1,
dx_max=0.1, n_dxs=5, dy_min=-0.1, dy_max=0.1, n_dys=5,
angle_min=-30, angle_max=30, n_angles=31, black_border_size=0):
"""
TensorFlow implementation of the Spatial Transformation Method.
:return: a tensor for the adversarial example
"""
if y is None:
preds = model.get_probs(x)
# Using model predictions as ground truth to avoid label leaking
preds_max = reduce_max(preds, 1, keepdims=True)
y = tf.to_float(tf.equal(preds, preds_max))
y = tf.stop_gradient(y)
del preds
y = y / reduce_sum(y, 1, keepdims=True)
# Define the range of transformations
dxs = np.linspace(dx_min, dx_max, n_dxs)
dys = np.linspace(dy_min, dy_max, n_dys)
angles = np.linspace(angle_min, angle_max, n_angles)
if n_samples is None:
import itertools
transforms = list(itertools.product(*[dxs, dys, angles]))
else:
sampled_dxs = np.random.choice(dxs, n_samples)
sampled_dys = np.random.choice(dys, n_samples)
sampled_angles = np.random.choice(angles, n_samples)
transforms = zip(sampled_dxs, sampled_dys, sampled_angles)
transformed_ims = parallel_apply_transformations(
x, transforms, black_border_size)
def _compute_xent(x):
preds = model.get_logits(x)
return tf.nn.softmax_cross_entropy_with_logits_v2(
labels=y, logits=preds)
all_xents = tf.map_fn(
_compute_xent,
transformed_ims,
parallel_iterations=1) # Must be 1 to avoid keras race conditions
# Return the adv_x with worst accuracy
# all_xents is n_total_samples x batch_size (SB)
all_xents = tf.stack(all_xents) # SB
# We want the worst case sample, with the largest xent_loss
worst_sample_idx = tf.argmax(all_xents, axis=0) # B
batch_size = tf.shape(x)[0]
keys = tf.stack([
tf.range(batch_size, dtype=tf.int32),
tf.cast(worst_sample_idx, tf.int32)
], axis=1)
transformed_ims_bshwc = tf.einsum('sbhwc->bshwc', transformed_ims)
after_lookup = tf.gather_nd(transformed_ims_bshwc, keys) # BHWC
return after_lookup
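# (Hedged usage sketch, not part of the original module: `model` is a
#  cleverhans Model and `x` a [batch, H, W, C] image tensor. With
#  n_samples=None the full grid of n_dxs * n_dys * n_angles transformations
#  is evaluated and the worst-case copy, i.e. the one with the highest
#  cross-entropy, is returned for each example.)
#
#   adv_x = spm(x, model, dx_min=-0.1, dx_max=0.1, n_dxs=5,
#               dy_min=-0.1, dy_max=0.1, n_dys=5,
#               angle_min=-30, angle_max=30, n_angles=31)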
def parallel_apply_transformations(x, transforms, black_border_size=0):
"""
Apply image transformations in parallel.
:param x: Tensor, a batch of images to transform
:param transforms: an iterable of (dx, dy, angle) triples (as built in `spm`);
one transformed copy of `x` is produced per triple
:param black_border_size: int, size of black border to apply
Returns:
Transformed images
"""
transforms = tf.convert_to_tensor(transforms, dtype=tf.float32)
x = _apply_black_border(x, black_border_size)
num_transforms = transforms.get_shape().as_list()[0]
im_shape = x.get_shape().as_list()[1:]
# Pass a copy of x and a transformation to each iteration of the map_fn
# callable
tiled_x = tf.reshape(
tf.tile(x, [num_transforms, 1, 1, 1]),
[num_transforms, -1] + im_shape)
elems = [tiled_x, transforms]
transformed_ims = tf.map_fn(
_apply_transformation,
elems,
dtype=tf.float32,
parallel_iterations=1, # Must be 1 to avoid keras race conditions
)
return transformed_ims
def projected_optimization(loss_fn,
input_image,
label,
epsilon,
num_steps,
clip_min=None,
clip_max=None,
optimizer=TensorAdam(),
project_perturbation=_project_perturbation,
early_stop_loss_threshold=None,
is_debug=False):
"""Generic projected optimization, generalized to work with approximate
gradients. Used for e.g. the SPSA attack.
Args:
:param loss_fn: A callable which takes `input_image` and `label` as
arguments, and returns a batch of loss values. Same
interface as TensorOptimizer.
:param input_image: Tensor, a batch of images
:param label: Tensor, a batch of labels
:param epsilon: float, the L-infinity norm of the maximum allowable
perturbation
:param num_steps: int, the number of steps of gradient descent
:param clip_min: float, minimum pixel value
:param clip_max: float, maximum pixel value
:param optimizer: A `TensorOptimizer` object
:param project_perturbation: A function, which will be used to enforce
some constraint. It should have the same
signature as `_project_perturbation`.
:param early_stop_loss_threshold: A float or None. If specified, the attack will end if the loss is below
`early_stop_loss_threshold`.
Enabling this option can have several different effects:
- Setting the threshold to 0. guarantees that if a successful attack is found, it is returned.
This increases the attack success rate, because without early stopping the optimizer can accidentally
bounce back to a point where the attack fails.
- Early stopping can make the attack run faster because it may run for fewer steps.
- Early stopping can make the attack run slower because the loss must be calculated at each step.
The loss is not calculated as part of the normal SPSA optimization procedure.
For most reasonable choices of hyperparameters, early stopping makes the attack much faster because
it decreases the number of steps dramatically.
:param is_debug: A bool. If True, print debug info for attack progress.
Returns:
adversarial version of `input_image`, with L-infinity difference less than
epsilon, which tries to minimize loss_fn.
Note that this function is not intended as an Attack by itself. Rather, it
is designed as a helper function which you can use to write your own attack
methods. The method uses a tf.while_loop to optimize a loss function in
a single sess.run() call.
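
Example (a hedged sketch, not from the original docstring; `get_logits` and
the batch-size-1 tensors `x` and `y` are assumed to exist)::

    def loss_fn(image, label):
        # untargeted: drive the logit of the true label below the others
        return -margin_logit_loss(get_logits(image), label, nb_classes=10)

    adv_x = projected_optimization(loss_fn, x, y, epsilon=0.3, num_steps=40,
                                   optimizer=SPSAAdam(),
                                   clip_min=0., clip_max=1.)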
"""
assert num_steps is not None
if is_debug:
with tf.device("/cpu:0"):
input_image = tf.Print(
input_image, [],
"Starting PGD attack with epsilon: %s" % epsilon)
init_perturbation = tf.random_uniform(
tf.shape(input_image),
minval=tf.cast(-epsilon, input_image.dtype),
maxval=tf.cast(epsilon, input_image.dtype),
dtype=input_image.dtype)
init_perturbation = project_perturbation(init_perturbation, epsilon,
input_image, clip_min=clip_min,
clip_max=clip_max)
init_optim_state = optimizer.init_state([init_perturbation])
nest = tf.nest
def loop_body(i, perturbation, flat_optim_state):
"""Update perturbation to input image."""
optim_state = nest.pack_sequence_as(
structure=init_optim_state, flat_sequence=flat_optim_state)
def wrapped_loss_fn(x):
return loss_fn(input_image + x, label)
new_perturbation_list, new_optim_state = optimizer.minimize(
wrapped_loss_fn, [perturbation], optim_state)
projected_perturbation = project_perturbation(new_perturbation_list[0],
epsilon, input_image,
clip_min=clip_min,
clip_max=clip_max)
# Be careful with this bool. A value of 0. is a valid threshold but evaluates to False, so we must explicitly
# check whether the value is None.
early_stop = early_stop_loss_threshold is not None
compute_loss = is_debug or early_stop
# Don't waste time building the loss graph if we're not going to use it
if compute_loss:
# NOTE: this step is not actually redundant with the optimizer step.
# SPSA calculates the loss at randomly perturbed points but doesn't calculate the loss at the current point.
loss = reduce_mean(wrapped_loss_fn(projected_perturbation), axis=0)
if is_debug:
with tf.device("/cpu:0"):
loss = tf.Print(loss, [loss], "Total batch loss")
if early_stop:
i = tf.cond(tf.less(loss, early_stop_loss_threshold),
lambda: float(num_steps), lambda: i)
return i + 1, projected_perturbation, nest.flatten(new_optim_state)
def cond(i, *_):
return tf.less(i, num_steps)
flat_init_optim_state = nest.flatten(init_optim_state)
_, final_perturbation, _ = tf.while_loop(
cond,
loop_body,
loop_vars=(tf.constant(0.), init_perturbation, flat_init_optim_state),
parallel_iterations=1,
back_prop=False,
maximum_iterations=num_steps)
if project_perturbation is _project_perturbation:
# TODO: this assert looks totally wrong.
# Not bothering to fix it now because it's only an assert.
# 1) Multiplying by 1.1 gives a huge margin of error. This should probably
# take the difference and allow a tolerance of 1e-6 or something like
# that.
# 2) I think it should probably check the *absolute value* of
# final_perturbation
perturbation_max = epsilon * 1.1
check_diff = utils_tf.assert_less_equal(
final_perturbation,
tf.cast(perturbation_max, final_perturbation.dtype),
message="final_perturbation must change no pixel by more than "
"%s" % perturbation_max)
else:
# TODO: let caller pass in a check_diff function as well as
# project_perturbation
check_diff = tf.no_op()
if clip_min is None or clip_max is None:
raise NotImplementedError("This function only supports clipping for now")
check_range = [utils_tf.assert_less_equal(input_image,
tf.cast(clip_max,
input_image.dtype)),
utils_tf.assert_greater_equal(input_image,
tf.cast(clip_min,
input_image.dtype))]
with tf.control_dependencies([check_diff] + check_range):
adversarial_image = input_image + final_perturbation
return tf.stop_gradient(adversarial_image)