# snn_fpga_simulation.py
# import numpy
import numpy as np
# import modules from pytorch
import torch
from torchvision import transforms
# import modules from bindsnet
from bindsnet.datasets import MNIST, DataLoader
from bindsnet.encoding import PoissonEncoder, BernoulliEncoder, RankOrderEncoder
from bindsnet.evaluation import all_activity, proportion_weighting, assign_labels
from bindsnet.network.monitors import Monitor
from bindsnet.network import load
# miscellaneous imports
import os
import argparse
# create an argument parser to interpret command line arguments
parser = argparse.ArgumentParser()
# --encoding specifies the type of encoding (Poisson, Bernoulli or RankOrder)
parser.add_argument("--encoding", type=str, default="Poisson")
parser.add_argument("--weight_size", type=int, default=16)
parser.add_argument("--neuron_type", type=str, default="IF")
parser.add_argument("--batch_size", type=int, default="IF")
# parse the arguments
args = parser.parse_args()
# declare global variables
# n_neurons specifies the number of neurons per layer
n_neurons = 100
# batch_size specifies the number of training samples to collect weight changes from before updating the weights
batch_size = args.batch_size
# n_train specifies the number of training samples
n_train = 60000
# n_test specifies the number of testing samples
n_test = 10000
# update_steps specifies the number of batches to process before reporting an update
update_steps = 10
# time specifies the simulation time of the SNN
time = 100
# dt specifies the timestep size for the simulation time
dt = 1
# intensity specifies the maximum intensity of the input data
intensity = 128
# gpu setting
gpu = torch.cuda.is_available()
# update_interval specifies the number of samples processed before updating accuracy estimations
update_interval = update_steps * batch_size
# setup CUDA
device = torch.device("cuda" if torch.cuda.is_available() and gpu else "cpu")
# determine number of worker threads to load data
n_workers = 0
# if n_workers == -1:
# n_workers = gpu * 4 * torch.cuda.device_count()
# report the selected encoding scheme
print("Encoding Scheme:", args.encoding)
# assign a value to the encoder based on the input argument
encoder = None
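# (The three BindsNET encoders behave as follows: Poisson emits spikes at a rate
# proportional to pixel intensity over the simulation window; Bernoulli draws an
# independent spike each timestep with probability proportional to intensity;
# RankOrder fires each input at most once, with brighter pixels spiking earlier.)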
if args.encoding == "Poisson":
    encoder = PoissonEncoder(time=time, dt=dt)
if args.encoding == "Bernoulli":
    encoder = BernoulliEncoder(time=time, dt=dt)
if args.encoding == "RankOrder":
    encoder = RankOrderEncoder(time=time, dt=dt)
neuron_type = ""
if args.neuron_type == "IF":
    neuron_type = "if"
else:
    neuron_type = "diehlAndCook"
# build network based on the input argument
networkFile = f"./networks/{neuron_type}_Poisson_{batch_size}_{args.weight_size}bit_snn.pt"
weightFileDirectory = f"./networks/{neuron_type}_Poisson_{batch_size}_{args.weight_size}bit_weights"
network = None
assignments = None
proportions = None
if gpu:
    network = load(networkFile)
    assignments = torch.load(f'./networks/{neuron_type}_Poisson_{batch_size}_{args.weight_size}bit_snn_assignments.pt')
    proportions = torch.load(f'./networks/{neuron_type}_Poisson_{batch_size}_{args.weight_size}bit_snn_proportions.pt')
else:
    network = load(networkFile, map_location=torch.device('cpu'))
    assignments = torch.load(f'./networks/{neuron_type}_Poisson_{batch_size}_{args.weight_size}bit_snn_assignments.pt', map_location=torch.device('cpu'))
    proportions = torch.load(f'./networks/{neuron_type}_Poisson_{batch_size}_{args.weight_size}bit_snn_proportions.pt', map_location=torch.device('cpu'))
proportions = proportions.view(1,n_neurons)
if gpu:
    assignments = assignments.cuda()
    proportions = proportions.cuda()
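# assignments records, for each excitatory neuron, the class it responded to most
# strongly during training, and proportions records how strongly each neuron is
# associated with the classes; both were presumably produced at training time
# (e.g. via bindsnet.evaluation.assign_labels) and are reused here to decode
# the output spike trains into class predictions.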
# update weights based on the FPGA values
# extract connections
excitatoryConnectionWeights = network.connections["X","Y"].w
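# The weight files are assumed to contain one hexadecimal value per line, with one
# file per excitatory neuron (0.txt, 1.txt, ...), presumably exported by the FPGA
# tool flow; the loop below overwrites the trained input-to-excitatory weights with
# these fixed-point values so the simulation matches the weights used on the FPGA.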
# load the adjusted weight values from the weight files
# for each hidden layer (excitatory) neuron
for neuronIdx in range(excitatoryConnectionWeights.shape[1]):
    # open the weight file for this neuron
    neuronFile = open(f"{weightFileDirectory}/{neuronIdx}.txt", "r")
    # for each input neuron
    for inputIdx in range(excitatoryConnectionWeights.shape[0]):
        # read the hexadecimal weight value from the file and convert it to an integer
        hexWeightValue = neuronFile.readline()
        weightValue = int(hexWeightValue, 16)
        excitatoryConnectionWeights[inputIdx][neuronIdx] = weightValue
    neuronFile.close()
# run the network using the GPU/CUDA
if gpu:
    network.to("cuda")
# load the MNIST test dataset
# use the encoder to convert the input into spikes
test_dataset = MNIST(
    encoder,
    None,
    root=os.path.join(".", "data", "MNIST"),
    download=True,
    train=False,
    transform=transforms.Compose(
        [transforms.ToTensor(), transforms.Lambda(lambda x: x * intensity)]
    ),
)
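# ToTensor scales pixel values into [0, 1]; multiplying by intensity (128) rescales
# them so that, with the Poisson encoder, each pixel is interpreted as a firing rate
# of up to roughly 128 Hz over the simulation window.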
# create a dataloader to iterate over and batch the test data
test_dataloader = DataLoader(
    test_dataset, batch_size=256, shuffle=False, num_workers=n_workers, pin_memory=True
)
# declare variables needed for estimating the network accuracy
n_classes = 10
# create a monitor to record the spiking activity of the output layer (Y)
output_spikes_monitor = Monitor(network.layers["Y"], state_vars=["s"], time=int(time / dt))
# add the monitor to the network
network.add_monitor(output_spikes_monitor, name="Y")
# create a tensor to store the spiking activity for all neurons for the duration of the update_interval
spike_record = torch.zeros((update_interval, int(time / dt), n_neurons), device=device)
# create a dictionary to store all assignment and proportional assignment accuracy values for the test data
accuracy = {"all": 0, "proportion": 0}
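# Two decoding schemes are evaluated: "all activity" predicts the class whose assigned
# neurons fired the most spikes in total, while "proportion weighting" weights each
# neuron's spike count by how strongly that neuron is associated with each class.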
# run the network for each test sample
print("\nBegin testing\n")
# put the network into test mode
network.train(mode=False)
# iterate over each batch
for step, batch in enumerate(test_dataloader):
    # get the next batch of encoded input samples
    inputs = {"X": batch["encoded_image"]}
    if gpu:
        inputs = {k: v.cuda() for k, v in inputs.items()}
    # run the network on the input
    network.run(inputs=inputs, time=time, input_time_dim=1)
    # get the spikes produced by the current batch
    spike_record = output_spikes_monitor.get("s").permute((1, 0, 2))
    # convert the array of labels into a tensor
    label_tensor = torch.tensor(batch["label"], device=device)
    # get network predictions based on the spiking activity, previous assignments and number of classes
    all_activity_pred = all_activity(spikes=spike_record, assignments=assignments, n_labels=n_classes)
    # get network predictions based on the spiking activity, previous assignments, proportional assignments and number of classes
    proportion_pred = proportion_weighting(spikes=spike_record, assignments=assignments, proportions=proportions, n_labels=n_classes)
    # compute the network accuracy based on the prediction results and add the results to the accuracy dictionary
    accuracy["all"] += float(torch.sum(label_tensor.long() == all_activity_pred).item())
    # compute the network accuracy based on the proportional prediction results and add the results to the accuracy dictionary
    accuracy["proportion"] += float(torch.sum(label_tensor.long() == proportion_pred).item())
    print(f"all activity: {all_activity_pred}")
    print(f"proportion activity: {proportion_pred}")
    # if it is time to print out an accuracy estimate
    if step % update_steps == 0 and step > 0:
        # print out the running assignment and proportional assignment accuracy
        print("\nAll activity accuracy: %.2f" % (accuracy["all"] / ((step + 1) * 256)))
        print("Proportion weighting accuracy: %.2f" % (accuracy["proportion"] / ((step + 1) * 256)))
        # print out how many test samples have been processed so far
        print("Progress:", (step + 1) * 256, "/", n_test)
    # reset the network before running it again
    network.reset_state_variables()
# print out the final assignment and proportional assignment accuracies
print("\nAll activity accuracy: %.2f" % (accuracy["all"] / n_test))
print("Proportion weighting accuracy: %.2f \n" % (accuracy["proportion"] / n_test))
print("Testing complete.\n")