Adversarial perturbations pose a serious threat to deployed machine learning systems. Recent works have shown the existence of image-agnostic (universal) perturbations that can fool a classifier on most natural images. Existing methods craft these perturbations by solving an optimization problem with a fooling objective and an imperceptibility constraint, which makes them very hard to defend against.
Current approaches for crafting adversaries against a given classifier generate only one perturbation at a time, i.e. a single instance from the manifold of adversarial perturbations. To build robust models, it is essential to explore that manifold more broadly. This is particularly useful for adversarial training, where the cost of generating adversaries can be high (depending on the attack): with this approach we can sample adversarial noise directly from a learned distribution of adversarial perturbations.
The authors demonstrate that perturbations crafted by the model achieve high fooling rates, exhibit wide variety, and generalize well across models.
The architecture of the proposed model is inspired by that of GANs and is trained using fooling and diversity objectives (summarized below). The trained generator network attempts to capture the distribution of adversarial perturbations for a given classifier and readily generates a wide variety of such perturbations.
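For reference, here is a compact statement of the two objectives as implemented further below (my notation, paraphrasing the NAG formulation): $q'_c$ is the softmax confidence the perturbed image receives on the class $c$ predicted for the clean image, $f$ is the target classifier, $v_i$ the generated perturbations, and $\sigma$ a random permutation of the batch of size $B$:

$$L_{fool} = -\log\bigl(1 - q'_c\bigr), \qquad L_{div} = \frac{1}{B}\sum_{i=1}^{B}\cos\bigl(f(x_i + v_i),\ f(x_i + v_{\sigma(i)})\bigr), \qquad L = L_{fool} + L_{div}$$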
from glob import glob
train_ok = True
val_ok = True
print("Training Data Verification")
cls_count = len(glob("ILSVRC/train/*"))
print("Total Number of Classes: {} in train directory".format(cls_count))
count = 0
for cls_ in glob("ILSVRC/train/*"):
    imgs = glob(cls_ + "/*")
    img_count = len(imgs)
    count += img_count
    if img_count != 10:
        print(cls_.split("/")[-1], img_count)
        train_ok = False
print("Total {} number of files in {} classes. i.e 10 Images/Class".format(count, cls_count))
print("Validation Data Verification")
val_files = glob("ILSVRC/valid/*")
val_count = len(val_files)
if val_count == 50000:
    print("Validation Data has correct number of files i.e {}".format(val_count))
else:
    print("Validation Data has some issue. Has following number of files: {}. Kindly Check!!".format(val_count))
    val_ok = False
if train_ok and val_ok:
    print("Dataset is Setup Correctly")
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data import DataLoader,Dataset
import torchvision
import torchvision.models as tvm
from torchvision import transforms
from torchvision.datasets.folder import DatasetFolder,ImageFolder
import numpy as np
from glob import glob
from PIL import Image
import pandas as pd
import os,time,gc
from pathlib import Path
from tqdm import tqdm_notebook as tqdm
import datetime,random,string
ngpu=torch.cuda.device_count()
device = torch.device("cuda" if (torch.cuda.is_available() and ngpu > 0) else "cpu")
print("Using Pytorch Version : {} and Torchvision Version : {}. Using Device {}".format(torch.__version__,torchvision.__version__,device))
dataset_path=r'ILSVRC/'
train_dataset_path=dataset_path+'train'
test_dataset_path=dataset_path+'valid'
print("Dataset root Folder:{}. Train Data Path: {}. Validation Data Path {}".format(dataset_path,train_dataset_path,test_dataset_path))
# Preparation of Labels
label_dict={}
label_idx={}
with open('ILSVRC/LOC_synset_mapping.txt') as file:
    lines = file.readlines()
    for idx, line in enumerate(lines):
        label, actual = line.strip('\n').split(' ', maxsplit=1)
        label_dict[label] = actual
        label_idx[label] = idx
# transforms
size = 224
# Caffe/VGG-style ImageNet means on a 0-255 scale (as in the original NAG preprocessing)
vgg_mean = [103.939, 116.779, 123.68]
preprocess = transforms.Compose([transforms.Resize((size, size)),
                                 transforms.ToTensor(),                    # scales pixels to [0, 1]
                                 transforms.Lambda(lambda t: t * 255.0),   # back to the 0-255 scale the means expect
                                 transforms.Normalize(vgg_mean, (1.0, 1.0, 1.0))])
class CustomDataset(Dataset):
    def __init__(self, subset, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.subset = subset
        if self.subset == 'train':
            data_dir = os.path.join(self.root_dir, self.subset)
            self.images_fn = glob(f'{data_dir}/*/*')
            self.labels = [Path(fn).parent.name for fn in self.images_fn]
        elif subset == 'valid':
            df = pd.read_csv('ILSVRC/LOC_val_solution.csv')
            df['label'] = df['PredictionString'].str.split(' ', n=1, expand=True)[0]
            df = df.drop(columns=['PredictionString'])
            self.images_fn = 'ILSVRC/valid/' + df['ImageId'].values + '.JPEG'
            self.labels = df['label']
        else:
            raise ValueError(f"subset must be 'train' or 'valid', got {subset}")
        print(f"Number of instances in {self.subset} subset of Dataset: {len(self.images_fn)}")

    def __getitem__(self, idx):
        fn = self.images_fn[idx]
        label = self.labels[idx]
        image = Image.open(fn)
        if image.mode != 'RGB':  # handle grayscale/CMYK/RGBA images in ImageNet
            image = image.convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label_idx[label]

    def __len__(self):
        return len(self.images_fn)
data_train=ImageFolder(root='ILSVRC/train',transform=preprocess)
class2idx=data_train.class_to_idx
data_valid=CustomDataset(subset='valid',root_dir=dataset_path,transform=preprocess)
train_num = len(data_train)
val_num = len(data_valid)
def fooling_objective(qc_):
    '''Helper function to compute -log(1 - qc'), where qc' is the softmax
    confidence that the adversarial batch assigns to the class predicted
    for the corresponding clean batch (qc' ---> qc_).
    Parameters:
        qc_ : confidences of the adversarial batch on the clean top-1 classes, shape (bs, 1)
    Returns:
        -log(1 - qc') , qc'   (both reduced to batch means)
    '''
    # Mean confidence on the clean prediction across the batch
    qc_ = qc_.mean()
    return -1 * torch.log(1 - qc_), qc_
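A quick, hypothetical sanity check of the objective on made-up confidence values; the loss grows as the mean confidence on the clean class approaches 1:
# Hypothetical sanity check for fooling_objective (values are made up)
dummy_qc = torch.tensor([[0.9], [0.6], [0.3]])  # adversarial confidences on the clean top-1 classes
loss, mean_qc = fooling_objective(dummy_qc)
print(loss.item(), mean_qc.item())  # -log(1 - 0.6) ~= 0.916, 0.6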
def diversity_objective(prob_vec_no_shuffle, prob_vec_shuffled):
    '''Helper function to calculate the mean cosine similarity between the
    predictions for a batch perturbed with the generated noise and the same
    batch perturbed with the noise shuffled along the batch dimension.
    Parameters:
        prob_vec_no_shuffle : predictions for images + noise
        prob_vec_shuffled   : predictions for images + shuffled noise
    Returns:
        Mean cosine similarity; minimizing it pushes the two sets of
        predictions apart, which encourages diverse perturbations.
    '''
    return torch.cosine_similarity(prob_vec_no_shuffle, prob_vec_shuffled).mean()
## TODO : Not required, as we always take the last layer.
def intermediate_activation_objective(layer_name=None):
    '''Extract the activations of an intermediate layer for:
    1. a batch of images (batch size 32) corrupted by the perturbations (batch size 32)
    2. the same batch of images corrupted by the same perturbations but in a different (random) order
    (in this case the intermediate layer is set to 'res4f' of the ResNet-50 architecture)
    '''
    if arch == 'resnet50':
        layer_name = 'res4f'
    pass
from torch import nn
ngf=128
nz= latent_dim=10
e_lim = 10
nc=3 # Number of Channels
# Fixed architecture: weights will be updated by backprop.
class AdversaryGenerator(nn.Module):
    def __init__(self, e_lim):
        super(AdversaryGenerator, self).__init__()
        self.e_lim = e_lim
        self.main = nn.Sequential(
            nn.ConvTranspose2d(in_channels=nz, out_channels=1024, kernel_size=4, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(1024),
            nn.ReLU(True),
            # state size: 1024 x 4 x 4
            nn.ConvTranspose2d(1024, 512, 4, 2, 1, bias=False),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            # state size: 512 x 8 x 8
            nn.ConvTranspose2d(512, 256, 4, 2, 1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            # state size: 256 x 16 x 16
            nn.ConvTranspose2d(256, 128, 4, 2, 2, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            # state size: 128 x 30 x 30
            nn.ConvTranspose2d(128, 64, 4, 2, 2, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            # state size: 64 x 58 x 58
            nn.ConvTranspose2d(64, 3, 4, 4, 4, bias=False),
            nn.BatchNorm2d(3),
            nn.Tanh()
            # output size: 3 x 224 x 224, values in (-1, 1)
        )

    def forward(self, x):
        return self.e_lim * self.main(x)  # scaling by ε: output in (-e_lim, e_lim)
# Move the Generator to GPU if available
adversarygen = AdversaryGenerator(e_lim).to(device)
debug = False  # set True to print a layer-by-layer summary (requires torchsummary)
if debug:
    try:
        from torchsummary import summary
        summary(adversarygen, (nz, 1, 1))
    except ImportError:
        raise ImportError('torchsummary is not installed. Install it with: pip install torchsummary')
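A minimal shape check (my own addition) to confirm the transposed-convolution arithmetic: a latent seed of shape (nz, 1, 1) should come out as a 3 x 224 x 224 perturbation bounded by e_lim:
# Sanity check: output shape and perturbation range of the generator
with torch.no_grad():
    z = 2 * torch.rand(2, nz, 1, 1, device=device) - 1  # two latent seeds from U[-1, 1]
    sample_noise = adversarygen(z)
print(sample_noise.shape)                           # expected: torch.Size([2, 3, 224, 224])
print(sample_noise.abs().max().item(), "<=", e_lim)  # perturbation magnitude bounded by e_lim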
from torchvision.models import googlenet, vgg16 , vgg19, resnet152, resnet50
model_dict ={
'googlenet': googlenet,
'vgg16': vgg16 ,
'vgg19':vgg19,
'resnet152':resnet152, # TODO Generate Perturbations
'resnet50':resnet50 # TODO Generate Perturbations
}
# Get all Pretrained Weights:
for arch in model_dict.keys():
    if arch != 'vgg-f':
        model = model_dict[arch](pretrained=True)
# epsillon=10
# batch_size=32
# latent_dim = 10
img_h,img_w,img_c=(224,224,3)
latent_dim=10
arch='resnet50'
archs=model_dict.keys() # ['vgg-f','vgg16','vgg19','googlenet','resnet50','resnet152']
def get_bs(arch):
    if torch.cuda.is_available():
        # GPU_BENCHMARK = 8192.0
        # GPU_MAX_MEM = torch.cuda.get_device_properties(device).total_memory / (1024*1024)
        # BS_DIV = GPU_BENCHMARK / GPU_MAX_MEM
        # print(f"Current GPU MAX Size : {GPU_MAX_MEM}. {BS_DIV}")
        if arch not in ['resnet50', 'resnet152']:  # ['vgg16', 'vgg19', 'vgg-f', 'googlenet']
            bs = int(64)
        elif arch in ['resnet50', 'resnet152']:
            bs = int(32)
        else:
            raise ValueError(f'Architecture type not supported. Please choose one from the following {archs}')
    else:
        bs = 8  # OOM Error
    return bs
get_bs(arch)
model=model_dict[arch](pretrained=True)
model
def save_checkpoint(model, to_save, filename='checkpoint.pth'):
    """Save checkpoint if a new best is achieved"""
    if to_save:
        print("=> Saving a new best")
        torch.save(model.state_dict(), filename)  # save checkpoint
    else:
        print("=> Validation Accuracy did not improve")
def save_perturbations(noise, arch, epoch, wandb_flag=False):
    rand_str = ''.join(random.choice(string.ascii_letters) for i in range(6))
    os.makedirs(f"{arch}-{rand_str}", exist_ok=True)
    perturbations = noise.permute(0, 2, 3, 1).cpu().detach().numpy() * 255
    np.save(f'{arch}-{rand_str}/Perturbations_{arch}_{epoch}.npy', perturbations)
    for perturb_idx, perturbation in enumerate(perturbations):
        im = Image.fromarray(perturbation.astype(np.uint8))
        if wandb_flag:
            wandb.log({"noise": [wandb.Image(im, caption=f"Noise_{arch}_{epoch}_{perturb_idx}")]})
        im.save(f'{arch}-{rand_str}/Perturbations_{arch}_{epoch}_{perturb_idx}.png')
# TODO
def visualize_perturbations():
    # Matplotlib subplots? (4x4) or (3x3)
    # From memory or disk - epoch number?
    pass
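One possible way to fill in this TODO; a sketch of my own that assumes matplotlib is available and that the input is the e_lim-scaled noise batch produced by the generator (remapped to [0, 1] only for display):
import matplotlib.pyplot as plt

def visualize_perturbation_grid(noise, rows=3, cols=3):
    """Show the first rows*cols perturbations of a generated batch in a grid."""
    perturbations = noise.detach().cpu().permute(0, 2, 3, 1).numpy()
    fig, axes = plt.subplots(rows, cols, figsize=(3 * cols, 3 * rows))
    for ax, perturbation in zip(axes.flat, perturbations):
        # Remap from [-e_lim, e_lim] to [0, 1] purely for visualization
        ax.imshow((perturbation - perturbation.min()) / (perturbation.max() - perturbation.min() + 1e-8))
        ax.axis('off')
    plt.tight_layout()
    plt.show()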
def get_preds(predictions, return_idx=False, k=1):
    # Top-k class indices per sample: shape (bs, k)
    idxs = torch.argsort(predictions, descending=True)[:, :k]
    # Gather the corresponding scores per sample: shape (bs, k)
    topk_vals = predictions.gather(1, idxs)
    if return_idx:
        return topk_vals, idxs
    return topk_vals
# val_iterations = val_num/bs
def compute_fooling_rate(prob_adv, prob_real):
    '''Helper function to calculate mismatches in the top index vector
    for clean and adversarial batch
    Parameters:
        prob_adv : Index vector for adversarial batch
        prob_real : Index vector for clean batch
    Returns:
        Number of mismatches and its percentage
    '''
    nfool = 0
    size = prob_real.shape[0]
    for i in range(size):
        if prob_real[i] != prob_adv[i]:
            nfool = nfool + 1
    return nfool, 100 * float(nfool) / size
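As an aside, the same count can be computed without the Python loop; a vectorized equivalent (my addition, not used by the rest of the notebook):
# Vectorized equivalent of compute_fooling_rate; assumes both index tensors have shape (bs, 1)
def compute_fooling_rate_vec(prob_adv, prob_real):
    nfool = (prob_adv != prob_real).sum().item()
    return nfool, 100.0 * nfool / prob_real.shape[0]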
def validate_generator_old(noise, val_dl, val_iterations=10):
    # Older validation loop kept for reference; superseded by validate_generator below.
    # Relies on the globals D_model, train_log and batch_size.
    total_fool = 0
    print("############### VALIDATION PHASE STARTED ################")
    train_log.writelines("############### VALIDATION PHASE STARTED ################")
    for val_idx in range(val_iterations):
        for batch_idx, data in enumerate(val_dl):
            images = data[0].to(device)
            # labels = data[1].to(device)
            prob_vec_clean = F.softmax(D_model(images), dim=1)  # variable q
            prob_vec_no_shuffle = D_model(images + noise)
            nfool, _ = compute_fooling_rate(prob_vec_no_shuffle, prob_vec_clean)
            total_fool += nfool
    fool_rate = 100 * float(total_fool) / (val_iterations * batch_size)
    print(f"Fooling rate: {fool_rate}. Total Items Fooled :{total_fool}")
    train_log.writelines(f"Fooling rate: {fool_rate}. Total Items Fooled :{total_fool}")
def validate_generator(noise, D_model, val_dl):
    total_fool = 0
    for batch_idx, data in tqdm(enumerate(val_dl), total=val_num // val_dl.batch_size):
        val_images = data[0].to(device)
        val_labels = data[1].to(device)
        # Softmax over the class dimension, then take the top-1 class for clean and adversarial batches
        prob_vec_clean, clean_idx = get_preds(F.softmax(D_model(val_images), dim=1), return_idx=True)  # variable q
        prob_vec_no_shuffle, adv_idx = get_preds(F.softmax(D_model(val_images + noise), dim=1), return_idx=True)
        nfool, _ = compute_fooling_rate(adv_idx, clean_idx)
        total_fool += nfool
    fool_rate = 100 * float(total_fool) / val_num
    return fool_rate, total_fool
## Quick sanity check with random top-1 index vectors
adv = torch.randint(0, 1000, (32, 1))
real = torch.randint(0, 1000, (32, 1))
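Exercising the helper on these random index vectors (my addition; with 1000 classes, two random vectors should disagree on nearly all 32 positions):
nfool, fool_pct = compute_fooling_rate(adv, real)
print(f"{nfool}/32 mismatches ({fool_pct:.1f}%)")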
# Setup Wandb
import wandb
wandb.login()
wandb.init(project="NAG_Pytorch")
def fit(nb_epochs, D_model, dls, optimizer, adversarygen=adversarygen):
    # Set the discriminator (target classifier) in eval mode; its weights are fixed.
    train_dl, val_dl = dls
    D_model = D_model.to(device)
    D_model.eval()
    timestamp = datetime.datetime.now().strftime("%d%b%Y_%H_%M")
    train_log = open(f'train_log_{arch}_{timestamp}.txt', 'w')
    for epoch in tqdm(range(nb_epochs), total=nb_epochs):
        running_loss = 0
        rand_str = ''.join(random.choice(string.ascii_letters) for i in range(6))
        train_log.writelines(f"############### TRAIN PHASE STARTED : {epoch}################")
        for batch_idx, data in tqdm(enumerate(train_dl), total=train_num // train_dl.batch_size):
            # Move data and labels to device (GPU)
            images = data[0].to(device)
            labels = data[1].to(device)
            # Sample the latent seed from the uniform distribution U[-1, 1]
            latent_seed = 2 * torch.rand(bs, nz, 1, 1, device=device, requires_grad=True) - 1  # (r1 - r2) * torch.rand(a, b) + r2
            noise = adversarygen(latent_seed)
            optimizer.zero_grad()
            # XB = images ; preds_XB = f(images)
            prob_vec_clean = F.softmax(D_model(images), dim=1)  # variable q
            clean_preds, clean_idx = get_preds(prob_vec_clean, return_idx=True, k=1)
            # XA = images + noise ; preds_XA = f(images + noise)
            prob_vec_no_shuffle = D_model(images + noise)
            qc_ = F.softmax(prob_vec_no_shuffle, dim=1).gather(1, clean_idx)  # variable q'c
            # 1. fooling objective: encourages G to generate perturbations that decrease the confidence of benign predictions
            fool_obj, mean_qc_ = fooling_objective(qc_)
            # Perturbations are shuffled across the batch dimension to improve diversity
            # XS = images + noise[torch.randperm(bs)]
            prob_vec_shuffled = D_model(images + noise[torch.randperm(bs)])
            # 2. diversity objective: encourages G to explore the space of perturbations and generate a diverse set
            diversity_obj = diversity_objective(prob_vec_no_shuffle, prob_vec_shuffled)
            # Compute the total loss
            total_loss = diversity_obj + fool_obj
            # Backpropagate to compute gradients and update the generator weights
            total_loss.backward()
            optimizer.step()
            # wandb logging of perturbation images every iteration is expensive; left commented out
            # perturbations = noise.permute(0, 2, 3, 1).cpu().detach().numpy() * 255
            # for perturb_idx, perturbation in enumerate(perturbations):
            #     im = Image.fromarray(perturbation.astype(np.uint8))
            #     wandb.log({"noise": [wandb.Image(im, caption=f"Noise_{arch}_{epoch}_{perturb_idx}")]})
            wandb.log({"fool_obj": fool_obj.item(),
                       "diversity_obj": diversity_obj.item(),
                       "total_loss": total_loss.item(),
                       })
            running_loss += total_loss.item()
            if batch_idx != 0 and batch_idx % 100 == 0:
                train_log.writelines(f"############### VALIDATION PHASE STARTED : {epoch}, Step : {int(batch_idx / 100)} ################")
                fool_rate, total_fool = validate_generator(noise, D_model, val_dl)
                print(f"Fooling rate: {fool_rate}. Total Items Fooled :{total_fool}")
                train_log.writelines(f"Fooling rate: {fool_rate}. Total Items Fooled :{total_fool}")
                print(f"Diversity Loss :{diversity_obj.item()} \n Fooling Loss: {fool_obj.item()} \n")
        print(f"Total Loss after Epoch No: {epoch + 1} - {running_loss / (train_num // train_dl.batch_size)}")
        train_log.writelines(f"Loss after Epoch No: {epoch + 1} is {running_loss / (train_num // train_dl.batch_size)}")
        # to_save can be any expression/condition that returns a bool
        save_checkpoint(adversarygen, to_save=True, filename=f'GeneratorW_{arch}_{epoch}_{rand_str}.pth')
        if epoch % 1 == 0:
            # save_perturbations(noise, arch, epoch)
            save_perturbations(noise, arch, epoch, wandb_flag=True)
    train_log.close()
total_epochs = 20
lr = 1e-3
# Setting up Dataloaders
import time,gc
arch='resnet50'
start= time.time()
print(f"Training Generator for Arch {arch}")
model= model_dict[arch](pretrained=True)
bs = get_bs(arch)
print(bs)
train_dl=DataLoader(data_train,batch_size=bs,shuffle=True,num_workers=4,pin_memory=True,drop_last=True)
val_dl=DataLoader(data_valid,batch_size=bs,shuffle=True,num_workers=4,pin_memory=True,drop_last=True)
dls = [train_dl,val_dl]
optimizer = optim.Adam(adversarygen.parameters(), lr=lr)
print(f"Elsasped Time {time.time()-start} Seconds")
fit(nb_epochs=total_epochs,D_model=model,dls=dls,optimizer=optimizer)
import sys
!{sys.executable} PrepareCaffenetModel.py
import torch
from vgg import VGG_F
model = VGG_F()
model.load_state_dict(torch.load('VGG_FACE.caffemodel.pt'))
model_dict['vgg-f'] = model
model(torch.rand((1, 3, 224, 224)))  # forward a dummy batch to check the ported weights
# Uncomment the below line after setting up kaggle api key
# !kaggle datasets download -d gokkulnath/nag-pytorch-pretrained
Steps to evaluate the perturbations generated by the Generator network (TODO). For every architecture in `model_dict`, run 10 evaluation iterations in the black-box setting, and 100 iterations in the white-box setting (i.e. when the evaluated architecture is the fixed one the generator was trained against). In each iteration (a rough code sketch follows after the list):
1. Load the weights of the Generator.
2. Generate a perturbation from a random latent vector of dimension (latent_dim, 1).
3. Add the noise to a sample image and check whether the prediction changes.
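A sketch of how that evaluation loop could look; this is my own interpretation of the TODO above, with a placeholder checkpoint path, and it only covers the torchvision entries of model_dict:
# Hypothetical evaluation loop; checkpoint path and iteration counts follow the TODO above
generator_ckpt = 'path/to/GeneratorW_checkpoint.pth'  # placeholder: a checkpoint saved by fit()
adversarygen.load_state_dict(torch.load(generator_ckpt))
adversarygen.eval()
trained_arch = 'resnet50'  # the classifier the generator was trained against (white-box case)
for modelarch in ['googlenet', 'vgg16', 'vgg19', 'resnet50', 'resnet152']:
    target_model = model_dict[modelarch](pretrained=True).to(device).eval()
    num_iteration = 100 if modelarch == trained_arch else 10  # white-box vs. black-box settings
    rates = []
    for i in range(num_iteration):
        with torch.no_grad():
            z = 2 * torch.rand(1, latent_dim, 1, 1, device=device) - 1  # random latent seed in U[-1, 1]
            noise = adversarygen(z)  # one universal perturbation
        fool_rate, _ = validate_generator(noise, target_model, val_dl)  # fooling rate over the validation set
        rates.append(fool_rate)
    print(f"{modelarch}: mean fooling rate over {num_iteration} perturbations = {sum(rates) / len(rates):.2f}%")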
[![Interpolating Latent Dimension for NAG](https://img.youtube.com/vi/2lojORAu8vA/0.jpg)](https://www.youtube.com/watch?v=2lojORAu8vA&feature=youtu.be)