from numpy import tanh, vectorize, dot, outer, random, linalg
from math import sqrt
from copy import deepcopy
from time import time
class ANN:
    """ANN with one hidden layer, a single output and full connections between consecutive layers.
    Initial weights are drawn from a normal distribution.
    The activation function is tanh."""
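    # The network computes y = tanh(w_out . tanh(W_hid x)) for an input
    # vector x, which predict() and train() normalize to unit length.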
    INIT_SIGMA = 0.02           # std. dev. of the initial weight distribution
    REL_STOP_MARGIN = 0.01      # relative weight-change margin for stopping
    MAX_ITERATIONS = 1_000_000  # hard cap on the number of per-example updates
    ACTIVATION = tanh
    D_ACTIVATION = lambda x: 1 - tanh(x)**2  # derivative of tanh
    # tanh already works elementwise in numpy, so vectorize() is redundant
    # here, but harmless
    VEC_ACTIVATION = vectorize(ACTIVATION)
    VEC_D_ACTIVATION = vectorize(D_ACTIVATION)
    STEP_SIZE = 0.1             # learning rate
    def __init__(self, input_size, hidden_size):
        # Hidden layer weights are (hidden_size x input_size); the output
        # layer is a single unit with one weight per hidden unit.
        self.hidden_weights = random.normal(0, ANN.INIT_SIGMA, (hidden_size, input_size))
        self.output_weights = random.normal(0, ANN.INIT_SIGMA, hidden_size)
    def get_weights(self):
        return self.hidden_weights, self.output_weights
    def predict(self, input_vector):
        # Predicts the output for this input vector.
        # The input vector is normalized to unit length first.
        input_vector = input_vector/linalg.norm(input_vector)
        return ANN.ACTIVATION(dot(self.output_weights, ANN.VEC_ACTIVATION(dot(self.hidden_weights, input_vector))))
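    # Example usage (illustrative; the value depends on the random initial weights):
    #   ann = ANN(input_size=4, hidden_size=2)
    #   y = ann.predict(random.normal(0, 1, 4))  # scalar in (-1, 1)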
    @staticmethod
    def frob_norm(a, b):
        # Frobenius norm of a and b taken together, i.e. the norm of all
        # entries of a and b stacked into a single vector
        return sqrt(linalg.norm(a)**2 + linalg.norm(b)**2)
    def train(self, examples):
        # examples is a list of [input, output] pairs; each input vector is
        # normalized (in place) to unit length.
        # Training stops once the weights have converged within a relative
        # margin, or after MAX_ITERATIONS per-example updates.
        start = time()
        for example in examples:
            example[0] = example[0]/linalg.norm(example[0])
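        # The updates below perform per-example gradient descent, assuming the
        # squared-error loss E = (t - y)^2 / 2 for target t and prediction
        # y = tanh(w_out . h) with h = tanh(W_hid x):
        #   w_out += STEP_SIZE * (t - y) * tanh'(net_out) * h
        #   W_hid += STEP_SIZE * outer(d_h * tanh'(net_hid), x)
        # where d_h = (t - y) * tanh'(net_out) * w_out is the error signal
        # propagated back to the hidden layer.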
        iteration = 0
        while True:
            # Store the old weights to check for convergence later
            prev_hidden_weights = deepcopy(self.hidden_weights)
            prev_output_weights = deepcopy(self.output_weights)
            for input_vector, output in examples:
                # Forward pass
                hidden_input = dot(self.hidden_weights, input_vector)
                hidden_output = ANN.VEC_ACTIVATION(hidden_input)
                final_input = dot(self.output_weights, hidden_output)
                predicted_output = ANN.ACTIVATION(final_input)
                # Error and output-unit derivative, used by both updates
                prediction_error = output - predicted_output
                output_derivative = ANN.D_ACTIVATION(final_input)
                # Error propagated back to the hidden layer; computed before
                # the output weights are updated
                requested_hidden_change = prediction_error*output_derivative*self.output_weights
                # Adjust the output weights (the tanh derivative of the
                # output unit is part of the gradient)
                self.output_weights = self.output_weights + ANN.STEP_SIZE*prediction_error*output_derivative*hidden_output
                # Backpropagate the requested hidden change to the hidden weights
                self.hidden_weights = self.hidden_weights + ANN.STEP_SIZE*outer(requested_hidden_change*ANN.VEC_D_ACTIVATION(hidden_input), input_vector)
                # iteration counts per-example updates, not epochs
                iteration += 1
                if iteration >= ANN.MAX_ITERATIONS:
                    break
            if iteration >= ANN.MAX_ITERATIONS:
                break
            # Stop when the relative weight change over this epoch is small
            diff = ANN.frob_norm(self.hidden_weights - prev_hidden_weights, self.output_weights - prev_output_weights)
            base = ANN.frob_norm(self.hidden_weights, self.output_weights)
            if base > 0 and diff/base < ANN.REL_STOP_MARGIN:
                break
        print("Training took %.2f seconds." % (time() - start))
        print("Stopped training after %d iterations." % iteration)
# TESTING
def print_difference(ann1, ann2):
    # Prints the differences in weights between two ANNs with identical topology
    hidden_weights1, output_weights1 = ann1.get_weights()
    hidden_weights2, output_weights2 = ann2.get_weights()
    hidden_diff = hidden_weights1 - hidden_weights2
    output_diff = output_weights1 - output_weights2
    print(hidden_diff)
    print(output_diff)
    print("Frobenius norms:")
    print("Hidden weights difference:", linalg.norm(hidden_diff))
    print("Output weights difference:", linalg.norm(output_diff))
    print("Both:", ANN.frob_norm(hidden_diff, output_diff))
def RMSE(ann, examples):
    # Root-mean-square error of the ANN's predictions over the examples
    total = 0
    for input_vector, output in examples:
        total += (output - ann.predict(input_vector))**2
    return sqrt(total/len(examples))
def generate_examples(amount, input_size, evaluate):
    # evaluate is a function mapping an input vector onto a numerical value.
    # Examples are [input, output] lists rather than tuples so that train()
    # can normalize the inputs in place. The input scale (sigma = 100) is
    # immaterial, since predict() and train() normalize inputs to unit length.
    examples = []
    inputs = random.normal(0, 100, (amount, input_size))
    for i in range(amount):
        input_vector = inputs[i]
        examples.append([input_vector, evaluate(input_vector)])
    return examples
def test():
    # Test the ANN by having it model another ANN with identical topology but
    # unknown weights. Weight differences are only indicative: tanh networks
    # with permuted hidden units (or with the signs of a hidden unit's in- and
    # out-going weights flipped) compute the same function with different weights.
    input_size = 5
    hidden_size = 3
    real = ANN(input_size, hidden_size)
    model = ANN(input_size, hidden_size)
    # Generate training and validation data
    training_data = generate_examples(10000, input_size, real.predict)
    validation_data = generate_examples(10000, input_size, real.predict)
    # Print initial difference, train, then print new difference
    print("Initial difference:")
    print_difference(real, model)
    print("Initial RMSE (on training data):", RMSE(model, training_data))
    print("Initial RMSE (on validation data):", RMSE(model, validation_data))
    model.train(training_data)
    print("After training:")
    print_difference(real, model)
    print("After training RMSE (on training data):", RMSE(model, training_data))
    print("After training RMSE (on validation data):", RMSE(model, validation_data))
if __name__ == "__main__":
    test()
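# For reproducible runs, one could seed numpy's RNG before constructing the
# networks, e.g. with random.seed(0) at the top of test() (illustrative; any
# fixed seed works).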