
Tuesday, August 20, 2024

Convolutional Network Usage

 

This post contains an example of convolutional network usage.


The network architecture is based on LeNet5, which consists of two major parts: a convolution part and a classifier.

The convolution part contains two blocks, each made of a Conv2d layer, a Tanh activation, and a MaxPool2d layer.

The classifier part is a fully connected network with two hidden Linear layers and an output Linear layer.
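
As a quick sanity check, here is a minimal standalone sketch (not part of the program below) that traces the feature-map sizes through the convolution part for a 32x32 gray-scale input; the Tanh activations are omitted since they do not change shapes.

import torch

x = torch.randn(1, 1, 32, 32)
x = torch.nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5)(x)   # -> [1, 6, 28, 28]
x = torch.nn.MaxPool2d(kernel_size=2)(x)                               # -> [1, 6, 14, 14]
x = torch.nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)(x)  # -> [1, 16, 10, 10]
x = torch.nn.MaxPool2d(kernel_size=2)(x)                               # -> [1, 16, 5, 5]
print(x.shape)  # the classifier therefore receives 16 * 5 * 5 = 400 features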


The dataset is the MNIST database.


We use the SGD optimizer with momentum, which maintains a moving average of the last several gradients to converge more smoothly toward the minimum.
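
As a rough illustration only, the scalar sketch below shows the classic momentum update rule that torch.optim.SGD implements; the gradient values are made up.

learning_rate = 0.1
momentum = 0.9
velocity = 0.0
parameter = 1.0

for gradient in [0.5, 0.4, 0.3]:  # hypothetical gradient values
    # velocity is an exponentially decaying accumulation of past gradients
    velocity = momentum * velocity + gradient
    # the parameter moves along the accumulated direction, not only the current gradient
    parameter = parameter - learning_rate * velocity
    print(parameter)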

We use a learning rate scheduler that updates the learning rate according to the accuracy.
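
As a small standalone sketch (with made-up accuracy values and a placeholder model), this is how ReduceLROnPlateau in 'max' mode reduces the learning rate once the monitored accuracy stops improving:

import torch

model = torch.nn.Linear(10, 2)  # placeholder model, for illustration only
optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=1)

for accuracy in [0.90, 0.91, 0.91, 0.91]:  # hypothetical per-epoch accuracies
    # in 'max' mode a larger metric counts as an improvement
    scheduler.step(accuracy)
    print(optimizer.param_groups[0]['lr'])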


import time

import matplotlib.pyplot as plt
import torch
import torchvision
from torch.utils.data import DataLoader
from torchvision import datasets


class LeNet5(torch.nn.Module):

    def __init__(self, number_of_classes, gray_scale=True):
        super(LeNet5, self).__init__()

        if gray_scale:
            in_channels = 1
        else:
            in_channels = 3

        convolution_kernel_size = 5
        pooling_kernel_size = 2
        convolution_channels_layer1 = 6
        convolution_channels_layer2 = 16

        self.convolution = torch.nn.Sequential(

            # we could also set the stride and padding
            # reduce according to kernel size: 32X32 -> 28X28
            torch.nn.Conv2d(in_channels=in_channels, out_channels=convolution_channels_layer1,
                            kernel_size=convolution_kernel_size),

            # we could also use ReLU
            torch.nn.Tanh(),

            # reduce according to stride, which is by default the kernel size: 28X28 -> 14X14
            torch.nn.MaxPool2d(kernel_size=pooling_kernel_size),

            # reduce according to kernel size: 14X14 -> 10X10
            torch.nn.Conv2d(in_channels=convolution_channels_layer1, out_channels=convolution_channels_layer2,
                            kernel_size=convolution_kernel_size),

            torch.nn.Tanh(),

            # reduce according to stride, which is by default the kernel size: 10X10 -> 5X5
            torch.nn.MaxPool2d(kernel_size=pooling_kernel_size),
        )

        classifier_layer1_width = 120
        classifier_layer2_width = 84

        # we could also use dropout
        self.classifier = torch.nn.Sequential(
            torch.nn.Flatten(),
            torch.nn.Linear(in_features=convolution_channels_layer2 * 5 * 5,
                            out_features=classifier_layer1_width),
            torch.nn.Tanh(),
            torch.nn.Linear(in_features=classifier_layer1_width, out_features=classifier_layer2_width),
            torch.nn.Tanh(),
            torch.nn.Linear(in_features=classifier_layer2_width, out_features=number_of_classes),
        )

    def forward(self, x):
        z1 = self.convolution(x)
        logits = self.classifier(z1)
        return logits


class Trainer:

    def __init__(self):
        batch_size = 100
        learning_rate = 0.1
        learning_momentum = 0.9
        learning_rate_scheduler_factor = 0.1

        self.loss_train_per_batch = []

        # set limit_size to None to train on the full dataset
        limit_size = 10000
        if limit_size is None:
            sampler = None
            shuffle = True
        else:
            sampler = torch.arange(limit_size)
            shuffle = False

        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        print('using device:', self.device)

        # we could also use RandomCrop, RandomRotation, ...
        resize_transform = torchvision.transforms.Compose(
            [
                torchvision.transforms.Resize((32, 32)),
                torchvision.transforms.ToTensor(),
                torchvision.transforms.Normalize((0.5,), (0.5,)),
            ]
        )

        dataset_train = datasets.MNIST(root='local_cache_folder',
                                       train=True,
                                       transform=resize_transform,
                                       download=True)

        dataset_test = datasets.MNIST(root='local_cache_folder',
                                      train=False,
                                      transform=resize_transform)

        print('train samples', dataset_train.data.shape[0])
        print('test samples', dataset_test.data.shape[0])

        self.loader_train = DataLoader(dataset=dataset_train,
                                       batch_size=batch_size,
                                       shuffle=shuffle,
                                       sampler=sampler,
                                       )

        self.loader_test = DataLoader(dataset=dataset_test,
                                      batch_size=batch_size,
                                      shuffle=False,
                                      sampler=sampler,
                                      )

        for images, labels in self.loader_train:
            print('single batch dimensions:', images.shape)
            print('single batch label dimensions:', labels.shape)
            self.number_of_features = images.shape[2] * images.shape[3]
            print('number of features', self.number_of_features)
            break

        self.model = LeNet5(number_of_classes=10)
        self.model = self.model.to(device=self.device)

        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=learning_rate, momentum=learning_momentum)

        # mode='max' since the scheduler is stepped with the train accuracy
        self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer=self.optimizer,
                                                                    factor=learning_rate_scheduler_factor,
                                                                    mode='max')

        self.loss_function = torch.nn.functional.cross_entropy

    def run_batches(self, data_loader, batch_callback=None):
        total_loss = 0
        total_samples = 0
        correct_predictions = 0
        batches_in_epoch_count = 0
        for batch_x, batch_labels in data_loader:
            batches_in_epoch_count += 1
            # move the batch to the selected device
            batch_x = batch_x.to(device=self.device)
            batch_labels = batch_labels.to(device=self.device)
            batch_samples = batch_x.shape[0]
            total_samples += batch_samples
            logits = self.model(batch_x)
            batch_loss = self.loss_function(logits, batch_labels)
            batch_predictions = torch.argmax(logits, dim=1)
            batch_correct = batch_predictions == batch_labels
            correct_predictions += batch_correct.sum()
            if batch_callback is not None:
                batch_callback(logits, batch_loss, batch_samples)
            # detach so the accumulated loss does not keep the computation graph alive
            total_loss += batch_loss.detach()

        average_loss = total_loss / total_samples
        accuracy = float(correct_predictions) / total_samples

        return average_loss.cpu(), accuracy, batches_in_epoch_count

    def batch_callback_train(self, _, batch_loss, batch_samples):
        self.loss_train_per_batch.append(batch_loss.item() / batch_samples)

        self.optimizer.zero_grad()
        batch_loss.backward()
        self.optimizer.step()

    def train(self, number_of_epochs):
        accuracy_train_per_epoch = []
        accuracy_test_per_epoch = []
        loss_train_per_epoch = []
        loss_test_per_epoch = []
        self.loss_train_per_batch = []
        self.model.train()

        for epoch_index in range(number_of_epochs):
            start_time = time.time()
            self.run_batches(data_loader=self.loader_train, batch_callback=self.batch_callback_train)

            with torch.no_grad():
                epoch_loss_train, epoch_accuracy_train, batches_in_epoch_count = self.run_batches(
                    data_loader=self.loader_train)
                epoch_loss_test, epoch_accuracy_test, _ = self.run_batches(data_loader=self.loader_test)

            loss_train_per_epoch.append(epoch_loss_train)
            loss_test_per_epoch.append(epoch_loss_test)
            accuracy_train_per_epoch.append(epoch_accuracy_train)
            accuracy_test_per_epoch.append(epoch_accuracy_test)

            # the scheduler runs in 'max' mode, so we step it with the train accuracy
            self.scheduler.step(epoch_accuracy_train)

            passed_seconds = time.time() - start_time
            print(f'epoch {epoch_index}, '
                  f'process seconds {passed_seconds}, '
                  f'loss train {epoch_loss_train}, '
                  f'loss test {epoch_loss_test}, '
                  f'accuracy train {epoch_accuracy_train}, '
                  f'accuracy test {epoch_accuracy_test}')

        self.model.eval()

        # repeat each per-epoch value once per batch so the epoch curves align with the per-batch curve
        loss_train_per_epoch = self.spread_points(loss_train_per_epoch, batches_in_epoch_count)
        loss_test_per_epoch = self.spread_points(loss_test_per_epoch, batches_in_epoch_count)
        plt.clf()
        plt.plot(self.loss_train_per_batch, color='b', label='train batch')
        plt.plot(loss_train_per_epoch, color='g', label='train epoch')
        plt.plot(loss_test_per_epoch, color='r', label='test')
        plt.legend()
        plt.ylim(0, 0.01)
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.savefig("loss.pdf")

        plt.clf()
        plt.plot(accuracy_train_per_epoch, color='b', label='train')
        plt.plot(accuracy_test_per_epoch, color='r', label='test')
        plt.legend()
        plt.ylabel('Accuracy')
        plt.xlabel('Epoch')
        plt.savefig("accuracy.pdf")
        self.loss_train_per_batch = []

    @staticmethod
    def spread_points(points, spread_factor):
        result = []
        for point in points:
            for _ in range(spread_factor):
                result.append(point)
        return result


def main():
    random_seed = 42
    number_of_epochs = 10

    torch.manual_seed(random_seed)
    trainer = Trainer()
    trainer.train(number_of_epochs)


if __name__ == '__main__':
    main()
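
For completeness, a minimal inference sketch is shown below; it assumes a Trainer instance named trainer whose train method has already run, and classifies the first image of the test loader.

# a minimal inference sketch, assuming 'trainer' was created and trained as above
images, labels = next(iter(trainer.loader_test))
images = images.to(device=trainer.device)

with torch.no_grad():
    logits = trainer.model(images)
    predictions = torch.argmax(logits, dim=1)

print('predicted:', predictions[0].item(), 'actual:', labels[0].item())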



The results for accuracy and loss are below.

The loss chart includes both the loss per batch and the global loss upon epoch completion.


We can see overfitting taking place. We could address this issue by:

1. Using a larger dataset (we used only 10K samples of the dataset)

2. Adding Dropout to the classifier part (a sketch of options 2-4 follows this list)

3. Using augmented input such as RandomCrop or RandomRotation

4. Using BatchNorm
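
The following is a rough, illustrative sketch (not code from the program above) of how options 2-4 could look: an augmented transform, a convolution block with BatchNorm, and a classifier with Dropout. The dropout probabilities and augmentation parameters are hypothetical choices.

import torch
import torchvision

# option 3: augmented input transform (parameters are illustrative)
augmented_transform = torchvision.transforms.Compose([
    torchvision.transforms.Resize((32, 32)),
    torchvision.transforms.RandomRotation(degrees=10),
    torchvision.transforms.RandomCrop(size=(32, 32), padding=2),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize((0.5,), (0.5,)),
])

# option 4: a convolution block with BatchNorm after the Conv2d layer
convolution_block = torch.nn.Sequential(
    torch.nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5),
    torch.nn.BatchNorm2d(num_features=6),
    torch.nn.Tanh(),
    torch.nn.MaxPool2d(kernel_size=2),
)

# option 2: a classifier with Dropout between the fully connected layers
classifier = torch.nn.Sequential(
    torch.nn.Flatten(),
    torch.nn.Linear(in_features=16 * 5 * 5, out_features=120),
    torch.nn.Tanh(),
    torch.nn.Dropout(p=0.5),
    torch.nn.Linear(in_features=120, out_features=84),
    torch.nn.Tanh(),
    torch.nn.Dropout(p=0.5),
    torch.nn.Linear(in_features=84, out_features=10),
)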








