1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
import ssl
# Replace the ssl module's default HTTPS context factory with the unverified
# one so that the CIFAR10 download below does not fail.
# NOTE(review): this relies on private ssl attributes and presumably turns off
# certificate verification for every default-context HTTPS request made by
# this process, not just the dataset download -- confirm this is acceptable.
ssl._create_default_https_context = (
    ssl._create_unverified_context
)  # Fixed problem with downloading CIFAR10 dataset
import torch
import keras
import einx
import os
import torchvision
import time
import torchvision.transforms as transforms
import einx.nn.keras as einn
import numpy as np
import tensorflow as tf
# Per-image preprocessing: convert to a tensor, then normalize each of the
# three channels with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

batch_size = 256

# The dataset lives in (and is downloaded to) a "cifar10" folder that sits
# next to this script.
cifar10_path = os.path.join(os.path.dirname(__file__), "cifar10")

# Arguments shared by both splits / both loaders.
_dataset_kwargs = {"root": cifar10_path, "download": True, "transform": transform}
_loader_kwargs = {"batch_size": batch_size, "num_workers": 2}

# Training split, shuffled by the loader.
trainset = torchvision.datasets.CIFAR10(train=True, **_dataset_kwargs)
trainloader = torch.utils.data.DataLoader(trainset, shuffle=True, **_loader_kwargs)

# Test split, iterated in its stored order.
testset = torchvision.datasets.CIFAR10(train=False, **_dataset_kwargs)
testloader = torch.utils.data.DataLoader(testset, shuffle=False, **_loader_kwargs)
# Option 1: Functional
# keras.Input is given an explicit batch_size because dynamic shapes are not
# allowed here; the value 1 is only a dummy placeholder.
inputs = keras.Input(shape=(3, 32, 32), batch_size=1)
hidden = inputs

# Three hidden stages of decreasing width. Each stage is
# Linear -> Norm -> GELU -> Dropout, with the axes each einx layer acts on
# given by its bracketed pattern.
for width in (1024, 512, 256):
    hidden = einn.Linear("b [...->c]", c=width)(hidden)
    hidden = einn.Norm("[b] c", decay_rate=0.99)(hidden)
    hidden = keras.layers.Activation(keras.activations.gelu)(hidden)
    hidden = einn.Dropout("[...]", drop_rate=0.2)(hidden)

# Output head: 10 values per sample, with no activation applied afterwards.
logits = einn.Linear("b [...->c]", c=10)(hidden)
model = keras.Model(inputs=inputs, outputs=logits)
# Option 2: Sequential
# blocks = []
# for c in [1024, 512, 256]:
#     blocks.append(einn.Linear("b [...->c]", c=c))
#     blocks.append(einn.Norm("[b] c", decay_rate=0.99))
#     blocks.append(keras.layers.Activation(keras.activations.gelu))
#     blocks.append(einn.Dropout("[...]", drop_rate=0.2))
# blocks.append(einn.Linear("b [...->c]", c=10))
# model = keras.Sequential(blocks)
# Loss used by train_step. from_logits=True because the model's last layer is
# a plain Linear with no activation after it.
loss_fn = keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
)

# Adam optimizer with a fixed learning rate of 1e-3.
optimizer = keras.optimizers.Adam(
    learning_rate=1e-3,
)
@tf.function
def train_step(inputs, labels):
    """Run one optimization step on a single batch.

    Performs a forward pass of the module-level ``model`` in training mode,
    evaluates the module-level ``loss_fn`` on the result and lets the
    module-level ``optimizer`` update the model's trainable weights from the
    resulting gradients.

    Args:
        inputs: Batch of model inputs.
        labels: Targets for ``inputs``; passed to ``loss_fn`` as its first
            argument.

    Returns:
        None. The model weights are updated in place.
    """
    # Forward pass and loss are recorded so they can be differentiated.
    with tf.GradientTape() as recorder:
        batch_loss = loss_fn(labels, model(inputs, training=True))

    weights = model.trainable_weights
    optimizer.apply(recorder.gradient(batch_loss, weights), weights)
@tf.function
def test_step(inputs, labels):
    """Compare the model's predicted class with the label for each sample.

    Runs the module-level ``model`` in inference mode and takes the argmax
    over axis 1 of its output as the predicted class.

    Args:
        inputs: Batch of model inputs.
        labels: Expected class index for each sample in ``inputs``.

    Returns:
        The element-wise result of ``predicted == labels``.
    """
    class_ids = tf.math.argmax(model(inputs, training=False), axis=1)
    return class_ids == labels
# Entry-point guard: the data loaders are created with num_workers=2, i.e.
# they load batches in worker processes. Where multiprocessing starts workers
# with "spawn" (Windows, macOS), every worker re-imports this script; without
# the guard each worker would re-enter the loop below and fail while trying to
# start workers of its own. When run directly as a script the behavior is
# unchanged.
if __name__ == "__main__":
    print("Starting training")
    for epoch in range(100):
        # Wall-clock start of this epoch (covers both training and testing).
        t0 = time.time()

        # Train: one optimization step per batch of the training loader.
        for data in trainloader:
            inputs, labels = data
            # Convert the loader's batches to NumPy arrays before handing
            # them to the tf.function-compiled step.
            inputs = np.array(inputs)
            labels = np.array(labels)
            train_step(inputs, labels)

        # Test: count correct predictions over the whole test loader.
        correct = 0
        total = 0
        for data in testloader:
            images, labels = data
            images = np.array(images)
            labels = np.array(labels)
            accurate = test_step(images, labels)
            # One entry per sample in the batch; non-zero entries are hits.
            total += accurate.shape[0]
            correct += tf.math.count_nonzero(accurate)

        print(
            f"Test accuracy after {epoch + 1:5d} epochs {float(correct) / total} "
            f"({time.time() - t0:.2f}sec)"
        )
|