File: train_keras.py

package info (click to toggle)
python-einx 0.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 1,112 kB
  • sloc: python: 11,619; makefile: 13
file content (112 lines) | stat: -rw-r--r-- 3,139 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import ssl

# Workaround for a problem with downloading the CIFAR10 dataset.
# SECURITY NOTE: this replaces the factory for the default HTTPS context with
# the one that builds an unverified context, so certificate checks are
# skipped for code in this process that relies on that default.
ssl._create_default_https_context = ssl._create_unverified_context

import torch
import keras
import einx
import os
import torchvision
import time
import torchvision.transforms as transforms
import einx.nn.keras as einn
import numpy as np
import tensorflow as tf

# Per-image preprocessing: convert to a tensor, then normalize each of the
# three channels with mean 0.5 and standard deviation 0.5.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]
)

batch_size = 256

# CIFAR10 is kept in a "cifar10" directory located next to this script.
cifar10_path = os.path.join(os.path.dirname(__file__), "cifar10")


def _make_split(train):
    """Build the CIFAR10 dataset and its DataLoader for one split.

    Args:
        train: True for the training split (shuffled), False for the test
            split (not shuffled).

    Returns:
        A ``(dataset, loader)`` tuple.
    """
    dataset = torchvision.datasets.CIFAR10(
        root=cifar10_path, train=train, download=True, transform=transform
    )
    loader = torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, shuffle=train, num_workers=2
    )
    return dataset, loader


trainset, trainloader = _make_split(True)
testset, testloader = _make_split(False)


# Option 1: Functional
# A dummy batch_size has to be given, since dynamic shapes are not allowed.
inputs = keras.Input(shape=(3, 32, 32), batch_size=1)

x = inputs
for channels in (1024, 512, 256):
    # Hidden block: linear projection -> norm -> GELU -> dropout.
    x = einn.Linear("b [...->c]", c=channels)(x)
    x = einn.Norm("[b] c", decay_rate=0.99)(x)
    x = keras.layers.Activation(keras.activations.gelu)(x)
    x = einn.Dropout("[...]", drop_rate=0.2)(x)

# Output layer: one linear projection to 10 channels.
x = einn.Linear("b [...->c]", c=10)(x)
model = keras.Model(inputs=inputs, outputs=x)

# Option 2: Sequential
# blocks = []
# for c in [1024, 512, 256]:
#     blocks.append(einn.Linear("b [...->c]", c=c))
#     blocks.append(einn.Norm("[b] c", decay_rate=0.99))
#     blocks.append(keras.layers.Activation(keras.activations.gelu))
#     blocks.append(einn.Dropout("[...]", drop_rate=0.2))
# blocks.append(einn.Linear("b [...->c]", c=10))
# model = keras.Sequential(blocks)


# Adam optimizer with a fixed learning rate of 1e-3.
optimizer = keras.optimizers.Adam(learning_rate=0.001)

# from_logits=True: the loss is given the model's raw outputs, not
# probabilities.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)


@tf.function
def train_step(inputs, labels):
    """Perform one optimization step of the global ``model`` on one batch.

    The model is called in training mode under a gradient tape, the value of
    ``loss_fn`` is differentiated with respect to the model's trainable
    weights, and ``optimizer`` applies the resulting gradients. Nothing is
    returned.

    Args:
        inputs: Batch passed to ``model``.
        labels: Targets passed to ``loss_fn`` as its first argument.
    """
    with tf.GradientTape() as grad_tape:
        predictions = model(inputs, training=True)
        loss = loss_fn(labels, predictions)

    # Looked up after the forward pass, once the model has been called.
    weights = model.trainable_weights
    gradients = grad_tape.gradient(loss, weights)
    optimizer.apply(gradients, weights)


@tf.function
def test_step(inputs, labels):
    """Check the global ``model``'s predictions for one batch against labels.

    The model is called in inference mode (``training=False``) and the index
    of the largest output along axis 1 is taken as the prediction.

    Args:
        inputs: Batch passed to ``model``.
        labels: Expected class indices, compared with ``==`` against the
            predicted indices.

    Returns:
        The elementwise result of ``predicted == labels``.
    """
    logits = model(inputs, training=False)
    return tf.math.argmax(logits, axis=1) == labels


def main():
    """Train the model for 100 epochs and report test accuracy after each.

    Every epoch runs ``train_step`` on each batch from ``trainloader``, then
    runs ``test_step`` on each batch from ``testloader`` and prints the
    fraction of correctly classified test samples together with the
    wall-clock time of the whole epoch.
    """
    print("Starting training")
    for epoch in range(100):
        t0 = time.time()

        # Train
        for images, labels in trainloader:
            # The loaders yield torch tensors; convert them to numpy arrays
            # before handing them to the tf.function-compiled step.
            train_step(np.array(images), np.array(labels))

        # Test
        correct = 0
        total = 0
        for images, labels in testloader:
            accurate = test_step(np.array(images), np.array(labels))
            total += accurate.shape[0]
            # Accumulate as a plain Python int rather than a tensor.
            correct += int(tf.math.count_nonzero(accurate))

        print(
            f"Test accuracy after {epoch + 1:5d} epochs {float(correct) / total} "
            f"({time.time() - t0:.2f}sec)"
        )


# The data loaders use worker processes (num_workers=2). With the "spawn" or
# "forkserver" start methods the workers re-import this module, so the
# training loop must only run when the file is executed as the main script.
if __name__ == "__main__":
    main()