Saturday, 10 June 2017

Prepare data in MNIST TensorFlow LeNet tutorial format

Let's say we have a custom dataset of 146 character classes, and we want to train a LeNet to recognize these characters. We put the image files in separate train and test directories, each of which contains one subdirectory per class, named 0, 1, 2, ..., 145. Image paths therefore have the form ['train' or 'test']/[0-145]/any_name.jpg. First, create read_data.py as follows so that we can read the images and labels and save them to npy files (run it once for each of the two directories, adjusting base_dir and the output file names accordingly):
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt

def read_data(path):
    """Load every ``.jpg`` below *path* as a normalised 32x32x1 grayscale image.

    The directory tree is walked recursively. The label of an image is the
    integer name of the directory that directly contains it (an image in
    ``<path>/17/`` gets label 17). Directories whose name is not an integer
    are skipped with a warning, as are images OpenCV cannot decode.

    Args:
        path: Root directory of one dataset split.

    Returns:
        A tuple ``(im_array, label_array)``. ``im_array`` stacks the float32
        images, scaled to [0, 1] and inverted so that white in the file
        becomes 0 and black becomes 1. ``label_array`` holds the matching
        ``np.uint8`` labels in the same order.
    """
    im_list = []
    label_list = []
    for (dirpath, dirnames, filenames) in os.walk(path):
        print('Processing ' + dirpath + ' ...')
        jpg_names = [name for name in filenames if name.endswith('.jpg')]
        if not jpg_names:
            continue

        # normpath drops a trailing separator, so the directory name is still
        # found when the caller passes a path such as '.../test/'.
        dir_name = os.path.basename(os.path.normpath(dirpath))
        try:
            label = int(dir_name)
        except ValueError:
            print(' WARN: skipping ' + dirpath + ' (directory name is not an integer label)')
            continue

        for filename in jpg_names:
            fullfile = os.path.join(dirpath, filename)
            im = cv2.imread(fullfile)
            if im is None:
                print(' WARN: ' + filename + ' in ' + dirpath + ' is bad!')
                continue
            # cv2.imread returns channels in BGR order, so BGR2GRAY (not
            # RGB2GRAY) applies the luminance weights to the right channels.
            im = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
            im = cv2.resize(im, (32, 32), interpolation=cv2.INTER_NEAREST)
            im = 255 - im  # in the jpeg white is 255 and black is 0; invert so white is 0 and black is 255
            im = np.divide(im.astype(np.float32), 255)
            im = np.expand_dims(im, 2)  # add a channel dimension (32x32 -> 32x32x1)
            im_list.append(im)
            label_list.append(np.uint8(label))  # 0 to 145, so uint8 is fine (0-255)
    im_array = np.array(im_list)
    label_array = np.array(label_list)
    return (im_array, label_array)
if __name__ == "__main__":
    # Only the test split is converted by this run. To convert the training
    # split, use '/home/twang/data/hcr/single_char/train/' as base_dir and
    # save the two arrays as 'trainData' and 'trainLabel' instead.
    base_dir = '/home/twang/data/hcr/single_char/test/'
    images, labels = read_data(base_dir)

    # The arrays are written next to the images they were built from.
    for stem, array in (('testData', images), ('testLabel', labels)):
        np.save(base_dir + stem, array)
The training and evaluation scripts are then given by (as per the TensorFlow tutorial and this project):
import tensorflow as tf
from tensorflow.contrib.layers import flatten
import numpy as np
from sklearn.utils import shuffle

# Number of passes over the training set made by the training loop.
EPOCHS = 10
# Number of samples per batch, used both for training and in evaluate().
BATCH_SIZE = 128


def LeNet(x, num_classes=146):
    """Build a LeNet-style network on *x* and return its class logits.

    The architecture is two 5x5 convolution + ReLU + 2x2 max-pool stages
    followed by three fully connected layers. All weights are drawn from a
    truncated normal distribution (mean 0, stddev 0.1); biases start at zero.

    Args:
        x: Batch of input images. The layer sizes below assume 32x32x1
            inputs, i.e. a tensor of shape (batch, 32, 32, 1).
        num_classes: Width of the final fully connected layer, i.e. the
            number of classes to predict. Defaults to 146.

    Returns:
        The unnormalised logits tensor of shape (batch, num_classes).
    """
    # Parameters of the weight initialiser.
    mu = 0
    sigma = 0.1

    # Layer 1: Convolutional. Input = 32x32x1. Output = 28x28x6.
    conv1_W = tf.Variable(tf.truncated_normal(shape=(5, 5, 1, 6), mean=mu, stddev=sigma))
    conv1_b = tf.Variable(tf.zeros(6))
    conv1 = tf.nn.conv2d(x, conv1_W, strides=[1, 1, 1, 1], padding='VALID') + conv1_b

    # Activation.
    conv1 = tf.nn.relu(conv1)

    # Pooling. Input = 28x28x6. Output = 14x14x6.
    conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

    # Layer 2: Convolutional. Input = 14x14x6. Output = 10x10x16.
    conv2_W = tf.Variable(tf.truncated_normal(shape=(5, 5, 6, 16), mean=mu, stddev=sigma))
    conv2_b = tf.Variable(tf.zeros(16))
    conv2 = tf.nn.conv2d(conv1, conv2_W, strides=[1, 1, 1, 1], padding='VALID') + conv2_b

    # Activation.
    conv2 = tf.nn.relu(conv2)

    # Pooling. Input = 10x10x16. Output = 5x5x16.
    conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

    # Flatten. Input = 5x5x16. Output = 400.
    fc0 = flatten(conv2)

    # Layer 3: Fully Connected. Input = 400. Output = 200.
    fc1_W = tf.Variable(tf.truncated_normal(shape=(400, 200), mean=mu, stddev=sigma))
    fc1_b = tf.Variable(tf.zeros(200))
    fc1 = tf.matmul(fc0, fc1_W) + fc1_b

    # Activation.
    fc1 = tf.nn.relu(fc1)

    # Layer 4: Fully Connected. Input = 200. Output = 200.
    fc2_W = tf.Variable(tf.truncated_normal(shape=(200, 200), mean=mu, stddev=sigma))
    fc2_b = tf.Variable(tf.zeros(200))
    fc2 = tf.matmul(fc1, fc2_W) + fc2_b

    # Activation.
    fc2 = tf.nn.relu(fc2)

    # Layer 5: Fully Connected. Input = 200. Output = num_classes.
    fc3_W = tf.Variable(tf.truncated_normal(shape=(200, num_classes), mean=mu, stddev=sigma))
    fc3_b = tf.Variable(tf.zeros(num_classes))
    logits = tf.matmul(fc2, fc3_W) + fc3_b

    return logits

def evaluate(X_data, y_data):
    """Return the model's accuracy on a dataset, computed batch by batch.

    Uses the module-level ``x`` and ``y`` placeholders, ``accuracy_operation``
    and ``BATCH_SIZE``, and runs in the current default TensorFlow session,
    so it must be called inside a ``with tf.Session() as sess:`` block.

    Args:
        X_data: Sequence of input images.
        y_data: Sequence of labels, aligned with ``X_data``.

    Returns:
        The fraction of correctly classified examples, or 0.0 when the
        dataset is empty.
    """
    num_examples = len(X_data)
    if num_examples == 0:
        # Nothing to evaluate; avoid dividing by zero below.
        return 0.0

    total_accuracy = 0
    sess = tf.get_default_session()
    for offset in range(0, num_examples, BATCH_SIZE):
        batch_x, batch_y = X_data[offset:offset+BATCH_SIZE], y_data[offset:offset+BATCH_SIZE]
        accuracy = sess.run(accuracy_operation, feed_dict={x: batch_x, y: batch_y})
        # The last batch may be shorter, so weight each batch by its size.
        total_accuracy += (accuracy * len(batch_x))
    return total_accuracy / num_examples

# Load the arrays written by read_data.py. They take the place of the MNIST
# loader used in the tutorial; no separate validation split is loaded.
X_train = np.load('/home/twang/data/hcr/single_char/train/trainData.npy')
y_train = np.load('/home/twang/data/hcr/single_char/train/trainLabel.npy')
X_test = np.load('/home/twang/data/hcr/single_char/test/testData.npy')
y_test = np.load('/home/twang/data/hcr/single_char/test/testLabel.npy')

# Explicit checks instead of assert, so they still run under `python -O`.
if len(X_train) != len(y_train):
    raise ValueError("Training images and labels differ in length: {} vs {}".format(
        len(X_train), len(y_train)))
if len(X_test) != len(y_test):
    raise ValueError("Test images and labels differ in length: {} vs {}".format(
        len(X_test), len(y_test)))

print()
print("Image Shape: {}".format(X_train[0].shape))
print()
print("Training Set:   {} samples".format(len(X_train)))
print("Test Set:       {} samples".format(len(X_test)))

# Shuffle training data
X_train, y_train = shuffle(X_train, y_train)

# Create placeholders for training data.
# x is a placeholder for a batch of input images, y for a batch of integer labels.
x = tf.placeholder(tf.float32, (None, 32, 32, 1))
# Note the trailing comma: (None) is just None (unknown rank), whereas
# (None,) declares a 1-D tensor with a variable batch size.
y = tf.placeholder(tf.int32, (None,))
one_hot_y = tf.one_hot(y, 146)

# Training pipeline: mean softmax cross-entropy minimised with Adam.
rate = 0.001

logits = LeNet(x)
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=one_hot_y)
loss_operation = tf.reduce_mean(cross_entropy)
optimizer = tf.train.AdamOptimizer(learning_rate = rate)
training_operation = optimizer.minimize(loss_operation)

# Evaluation: a prediction is correct when the highest logit is at the label's index.
correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))
accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
saver = tf.train.Saver()

# Train for EPOCHS passes, reporting the test accuracy after each pass,
# then write the trained model to a checkpoint.
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    num_examples = len(X_train)

    print("Training...")
    print()
    for epoch in range(1, EPOCHS + 1):
        # Reshuffle so every epoch sees the batches in a different order.
        X_train, y_train = shuffle(X_train, y_train)
        for start in range(0, num_examples, BATCH_SIZE):
            stop = start + BATCH_SIZE
            feed = {x: X_train[start:stop], y: y_train[start:stop]}
            sess.run(training_operation, feed_dict=feed)

        test_accuracy = evaluate(X_test, y_test)
        print("EPOCH {} ...".format(epoch))
        print("Test Accuracy = {:.3f}".format(test_accuracy))
        print()

    saver.save(sess, 'lenet')
    print("Model saved")

# Reload the most recent checkpoint from the current directory in a fresh
# session and evaluate it once more on the test set.
with tf.Session() as sess:
    checkpoint_path = tf.train.latest_checkpoint('.')
    saver.restore(sess, checkpoint_path)

    test_accuracy = evaluate(X_test, y_test)
    print("Test Accuracy = {:.3f}".format(test_accuracy))
Note that we have changed the dimensions of the last few layers (in the original LeNet, there are 10 classes, i.e., digits 0 to 9) to reflect the fact that we are working with a dataset of 146 classes.