In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Load MNIST (images plus integer class labels) through the Keras dataset helper.
mnist = tf.keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# quick look at the first training example and its label
print(train_labels[0])
plt.imshow(train_images[0], cmap="Greys_r")

# first difference: images are no longer flattened to 784 values, they stay 28x28x1.
# the trailing 1 is the (single) color channel -- the conv ops need that axis!
# pixel values are scaled by 1/255 and stored as float32, labels as int32.
train_inputs = train_images.reshape([-1, 28, 28, 1]).astype(np.float32) / 255
train_targets = train_labels.astype(np.int32)

# shuffle, cut into batches of 128, and repeat endlessly
# (the training loop decides when to stop)
train_data = (
    tf.data.Dataset.from_tensor_slices((train_inputs, train_targets))
    .shuffle(buffer_size=60000)
    .batch(128)
    .repeat()
)

# same preprocessing for the test set, kept as plain arrays
test_images = test_images.reshape([-1, 28, 28, 1]).astype(np.float32) / 255
test_labels = test_labels.astype(np.int32)

In [None]:
train_steps = 1000  # might need more steps

# convolutional kernels/biases
# kernel layout: filter_height x filter_width x in_channels x out_channels
# kernels start as small symmetric random values, biases start at zero
W_conv1 = tf.Variable(tf.random.uniform([5, 5, 1, 16], -0.1, 0.1))
b_conv1 = tf.Variable(tf.zeros([16]))
W_conv2 = tf.Variable(tf.random.uniform([5, 5, 16, 32], -0.1, 0.1))
b_conv2 = tf.Variable(tf.zeros([32]))

# fully connected layer at the end: 7*7*32 flattened conv features -> 10 class scores
# the init range has to be spelled out here as well: without minval/maxval,
# tf.random.uniform draws from [0, 1), i.e. all-positive weights that are an order
# of magnitude larger than the conv kernels above.
W_out = tf.Variable(tf.random.uniform([7*7*32, 10], -0.1, 0.1))
b_out = tf.Variable(tf.zeros([10]))

# everything that gets gradients and optimizer updates in the training loop
variables = [W_conv1, b_conv1, W_conv2, b_conv2, W_out, b_out]


# 2 conv layers, each followed by 2x2 max pool
# you should look up the parameters in the API!
def model(inputs):
    """Forward pass of the small CNN: two conv + max-pool stages, then a linear layer.

    Uses the module-level variables W_conv1/b_conv1, W_conv2/b_conv2 and W_out/b_out.

    Args:
        inputs: image batch laid out as batch x height x width x 1. The flatten
            step assumes the feature map is 7x7 after two stride-2 poolings,
            which is what the 28x28 MNIST images produce.

    Returns:
        Unnormalized class scores (logits) of shape batch x 10; no softmax is applied.
    """
    # stage 1: 5x5 convolution (stride 1, "SAME" keeps the spatial size), ReLU, 2x2 pool
    features = tf.nn.conv2d(inputs, W_conv1, strides=1, padding="SAME") + b_conv1
    features = tf.nn.relu(features)
    features = tf.nn.max_pool2d(features, ksize=2, strides=2, padding="SAME")

    # stage 2: same recipe with the second set of kernels
    features = tf.nn.conv2d(features, W_conv2, strides=1, padding="SAME") + b_conv2
    features = tf.nn.relu(features)
    features = tf.nn.max_pool2d(features, ksize=2, strides=2, padding="SAME")

    # "flatten": one feature vector per example for the fully connected layer
    flat = tf.reshape(features, [-1, 7*7*32])

    return tf.matmul(flat, W_out) + b_out


# Adam makes things much smoother; constructed without arguments, so it runs
# with the library's default hyperparameters
optimizer = tf.optimizers.Adam()
# from_logits = True!! #neverforget
# model() returns raw scores without a softmax, so the loss must apply it itself.
# "Sparse" because the labels are integer class ids, not one-hot vectors.
loss_fn = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

In [None]:
# this basically hasn't changed compared to the non-convolutional version.
# train_data repeats forever, so the loop has to be left explicitly.
# note: steps 0..train_steps (inclusive) are executed, i.e. train_steps + 1 updates,
# which is also why the final multiple of 100 still gets logged.
for step, (image_batch, label_batch) in enumerate(train_data):
    if step > train_steps:
        break

    # record the forward pass so the loss can be differentiated afterwards
    with tf.GradientTape() as tape:
        logits = model(image_batch)
        loss = loss_fn(label_batch, logits)

    grads = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(grads, variables))

    # every 100 steps: report loss and accuracy on the current training batch
    # (logits were computed before this step's update was applied)
    if step % 100 == 0:
        predictions = tf.argmax(logits, axis=1, output_type=tf.int32)
        is_correct = tf.cast(tf.equal(predictions, label_batch), tf.float32)
        accuracy = tf.reduce_mean(is_correct)
        print("Step {} Loss: {} Accuracy: {}".format(step, loss, accuracy))
    

In [None]:
# the whole test set goes through the model in a single forward pass.
# that should be ok on colab, but might be too much for your local machine!
test_logits = model(test_images)
test_predictions = tf.argmax(test_logits, axis=1, output_type=tf.int32)

# fraction of test images whose predicted class equals the label
test_hits = tf.cast(tf.equal(test_predictions, test_labels), tf.float32)
accuracy = tf.reduce_mean(test_hits)
print(accuracy)

In [None]:
# here's an example of what a custom conv implementation might look like
# it's in numpy because TF doesn't allow for assigning values to tensor indices.
# a TF solution would be possible in other ways, but be more difficult to understand

# this is the full version with batched inputs, multiple input and output channels
# further below you can find simpler versions as well

def custom_conv2d(inputs, filters):
    """Naive 2D convolution (stride 1, no padding) over a batch, written in numpy.

    Intended to compute the same thing as
    tf.nn.conv2d(inputs, filters, 1, padding="VALID").

    Args:
        inputs: array of shape batch x height x width x in_channels.
        filters: array of shape filter_height x filter_width x in_channels x out_channels.

    Returns:
        float64 array of shape
        batch x (height - filter_height + 1) x (width - filter_width + 1) x out_channels.

    Raises:
        ValueError: if either argument is not 4-dimensional, or if the in_channels
            axes of inputs and filters differ.
    """
    if len(inputs.shape) != 4 or len(filters.shape) != 4:
        raise ValueError(
            "custom_conv2d expects 4D inputs and 4D filters, got shapes {} and {}".format(
                tuple(inputs.shape), tuple(filters.shape)))
    # without this check a size-1 channel axis on either side would silently
    # broadcast against the other one and produce wrong numbers instead of an error
    if inputs.shape[3] != filters.shape[2]:
        raise ValueError(
            "in_channels mismatch: inputs have {}, filters expect {}".format(
                inputs.shape[3], filters.shape[2]))

    filter_height = filters.shape[0]
    filter_width = filters.shape[1]
    # output loses size since we do not use padding. (reduced by filter_size - 1)
    output = np.zeros((inputs.shape[0],
                       inputs.shape[1] - filter_height + 1,
                       inputs.shape[2] - filter_width + 1,
                       filters.shape[3]))

    # loop for "sliding the filter"
    for row_ind in range(output.shape[1]):
        for col_ind in range(output.shape[2]):
            # grab the respective part of the input
            input_slice = inputs[:, row_ind:(row_ind + filter_height), col_ind:(col_ind + filter_width), :]
            # b x h x w x i   *   h x w x i x o    ->    b x o
            # element-wise product, but add axis to input so it's replicated over the output channel axis
            local_product = input_slice[..., np.newaxis] * filters
            # sum over height, width, input channels
            local_reduce = np.sum(local_product, axis=(1, 2, 3))
            # alternative to the two lines above:
            # local_reduce = tf.einsum("bhwi,hwio -> bo", input_slice, filters)
            output[:, row_ind, col_ind, :] = local_reduce

    return output

In [None]:
# compare our implementation to the TF op on random data
# padding="VALID" means "no padding", which is what custom_conv2d does
dummy_images = np.random.normal(size=(16, 32, 32, 3))  # batch x height x width x in_channels
filters = np.random.normal(size=(5, 5, 3, 8))  # f_h x f_w x in_channels x out_channels

own_try = custom_conv2d(inputs=dummy_images, filters=filters)
tf_try = tf.nn.conv2d(input=dummy_images, filters=filters, strides=1, padding="VALID")

# shapes are the same? both should be 16 x 28 x 28 x 8 (32 - 5 + 1 = 28)
tf_try.shape, own_try.shape

In [None]:
# largest absolute element-wise difference between the TF result and ours;
# it should be very small (only due to numerics)
np.max(np.abs(tf_try.numpy() - own_try))

In [None]:
# some simpler versions...


# most basic: inputs are 2d, filters as well. so single input/output channel
def custom_conv2d_1(inputs, filters):
    """Simplest case: one 2D image, one 2D filter (single input/output channel).

    Args:
        inputs: array of shape height x width.
        filters: array of shape f_h x f_w.

    Returns:
        float64 array of shape (height - f_h + 1) x (width - f_w + 1);
        no padding, stride 1.
    """
    f_h, f_w = filters.shape[0], filters.shape[1]
    out_h = inputs.shape[0] - f_h + 1
    out_w = inputs.shape[1] - f_w + 1
    output = np.zeros((out_h, out_w))

    # visit every output position; each one is the dot product of the filter
    # with the input window whose top-left corner sits at (top, left)
    for top, left in np.ndindex(out_h, out_w):
        window = inputs[top:top + f_h, left:left + f_w]
        # h x w   *   h x w -> 1
        output[top, left] = (window * filters).sum()
    return output


# now we have inputs with multiple channels
def custom_conv2d_2(inputs, filters):
    """One image with several input channels, one filter spanning all of them.

    Args:
        inputs: array of shape height x width x i.
        filters: array of shape f_h x f_w x i.

    Returns:
        float64 array of shape (height - f_h + 1) x (width - f_w + 1).
        There is no channel axis in the result: each output value sums over
        the spatial window AND all input channels.
    """
    f_h, f_w = filters.shape[0], filters.shape[1]
    out_h = inputs.shape[0] - f_h + 1
    out_w = inputs.shape[1] - f_w + 1
    output = np.zeros((out_h, out_w))

    for top, left in np.ndindex(out_h, out_w):
        window = inputs[top:top + f_h, left:left + f_w, :]
        # h x w x i   *   h x w x i -> 1
        # still a single number per position, despite the multiple input channels
        output[top, left] = (window * filters).sum()
    return output


# and finally, multiple output channels as well.
# the full version above only adds a batch axis in the beginning
def custom_conv2d_3(inputs, filters):
    """One multi-channel image, several filters (i.e. multiple output channels).

    Args:
        inputs: array of shape height x width x i.
        filters: array of shape f_h x f_w x i x o.

    Returns:
        float64 array of shape (height - f_h + 1) x (width - f_w + 1) x o,
        one output channel per filter.
    """
    f_h, f_w = filters.shape[0], filters.shape[1]
    out_h = inputs.shape[0] - f_h + 1
    out_w = inputs.shape[1] - f_w + 1
    output = np.zeros((out_h, out_w, filters.shape[3]))

    for top, left in np.ndindex(out_h, out_w):
        window = inputs[top:top + f_h, left:left + f_w, :]
        # h x w x i   *   h x w x i x o -> o
        # the extra trailing axis replicates the window over the output channels;
        # summing over height, width and input channels leaves one value per filter
        weighted = window[..., np.newaxis] * filters
        output[top, left] = weighted.sum(axis=(0, 1, 2))
    return output

In [None]:
# sanity check. we can trust multiplication and adding to work well,
# so only the output shape is inspected
dummy_images = np.random.normal(size=(32, 32))  # height x width
filters = np.random.normal(size=(5, 5))  # f_h x f_w

# each spatial axis should shrink by 5-1, i.e. 32 -> 28
result_shape = custom_conv2d_1(dummy_images, filters).shape
result_shape

In [None]:
# now add input channels
dummy_images = np.random.normal(size=(32, 32, 3))  # height x width x i
filters = np.random.normal(size=(5, 5, 3))  # f_h x f_w x i

# multiple input channels still collapse into a single output map (no channel axis)
result_shape = custom_conv2d_2(dummy_images, filters).shape
result_shape

In [None]:
# and add output channels
dummy_images = np.random.normal(size=(32, 32, 3))  # height x width x i
filters = np.random.normal(size=(5, 5, 3, 8))  # f_h x f_w x i x o

# now the output has a channel axis as well (one channel per filter)
result_shape = custom_conv2d_3(dummy_images, filters).shape
result_shape