aboutsummaryrefslogtreecommitdiffstats
path: root/python/ann.py
blob: 05ae647d401f158f47da4e2f1763b7da36f7315d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from numpy import *
from math import sqrt
from copy import deepcopy
from time import time

class ANN:
	
	"""ANN with one hidden layer, one output and full connections in between consecutive layers.
	Initial weights are chosen from a normal distribution.
	Activation function is tanh."""
	
	INIT_SIGMA = 0.02
	REL_STOP_MARGIN = 0.01
	MAX_ITERATIONS = 1000000
	ACTIVATION = tanh
	D_ACTIVATION = lambda x: 1 - tanh(x)**2 # Derivative of tanh
	VEC_ACTIVATION = vectorize(ACTIVATION)
	VEC_D_ACTIVATION = vectorize(D_ACTIVATION)
	STEP_SIZE = 0.1
	
	def __init__(self, input_size, hidden_size):
		
		#self.input_size = input_size
		#self.hidden_size = hidden_size
		self.hidden_weights = random.normal(0, ANN.INIT_SIGMA, (hidden_size, input_size))
		self.output_weights = random.normal(0, ANN.INIT_SIGMA, hidden_size)
	
	def get_weights(self):
		return self.hidden_weights, self.output_weights
	
	def predict(self, input_vector):
		
		# Predicts the output for this input vector
		# input_vector will be normalized
		
		input_vector = input_vector/linalg.norm(input_vector)
		return ANN.ACTIVATION(dot(self.output_weights, ANN.VEC_ACTIVATION(dot(self.hidden_weights, input_vector))))
	
	@staticmethod
	def frob_norm(a, b):
		
		# Calculates the total Frobenius norm of both matrices A and B
		return sqrt(linalg.norm(a)**2 + linalg.norm(b)**2)
	
	def train(self, examples):
		
		#print("Training")
		start = time()
		
		# examples is a list of (input, output)-tuples
		# input will be normalized
		# We stop when the weights have converged within some relative margin
		
		for example in examples:
			example[0] = example[0]/linalg.norm(example[0])
		
		iteration = 0
		while True:
			
			
			# Store old weights to check for convergence later
			prev_hidden_weights = deepcopy(self.hidden_weights)
			prev_output_weights = deepcopy(self.output_weights)
			
			for k in range(len(examples)):
				
				input_vector, output = examples[k]
				
				# Calculate outputs
				hidden_input = dot(self.hidden_weights, input_vector)
				hidden_output = ANN.VEC_ACTIVATION(hidden_input)
				final_input = dot(self.output_weights, hidden_output)
				predicted_output = ANN.ACTIVATION(final_input)
				
				#print("Output:", output)
				#print("Predicted output:", predicted_output)
				
				# Used in calculations
				prediction_error = output - predicted_output
				output_derivative = ANN.D_ACTIVATION(final_input)
				
				# Adjust output weights and calculate requested hidden change
				requested_hidden_change = prediction_error*output_derivative*self.output_weights
				self.output_weights = self.output_weights + ANN.STEP_SIZE*prediction_error*hidden_output
				
				#print("After adjusting output weights:", ANN.ACTIVATION(dot(self.output_weights, hidden_output)))
				
				# Backpropagate requested hidden change to adjust hidden weights
				self.hidden_weights = self.hidden_weights + ANN.STEP_SIZE*outer(requested_hidden_change*(ANN.VEC_D_ACTIVATION(hidden_input)), input_vector)
				
				#print("After adjusting hidden weights:", ANN.ACTIVATION(dot(self.output_weights, ANN.VEC_ACTIVATION(dot(self.hidden_weights, input_vector)))))
				
				# Check stop criteria
				iteration += 1
				if iteration >= ANN.MAX_ITERATIONS:
					break
			
			# Check stop criteria
			if iteration >= ANN.MAX_ITERATIONS:
				break
			diff = ANN.frob_norm(self.hidden_weights - prev_hidden_weights, self.output_weights - prev_output_weights)
			base = ANN.frob_norm(self.hidden_weights, self.output_weights)
			#if base > 0 and diff/base < ANN.REL_STOP_MARGIN:
			#	break
		
		print(time() - start)
		print("Stopped training after %s iterations."%iteration)

# TESTING

def print_difference(ann1, ann2):
	
	# Prints the differences in weights in between two ANN's with identical topology
	
	hidden_weights1, output_weights1 = ann1.get_weights()
	hidden_weights2, output_weights2 = ann2.get_weights()
	hidden_diff = hidden_weights1 - hidden_weights2
	output_diff = output_weights1 - output_weights2
	
	print(hidden_diff)
	print(output_diff)
	print("Frobenius norms:")
	print("Hidden weights difference:", linalg.norm(hidden_diff))
	print("Output weights difference:", linalg.norm(output_diff))
	print("Both:", ANN.frob_norm(hidden_diff, output_diff))

def RMSE(ann, examples):
	
	total = 0
	for input_vector, output in examples:
		total += (output - ann.predict(input_vector))**2
	return sqrt(total/len(examples))

def generate_examples(amount, input_size, evaluate):
	# evaluate is a function mapping an input vector onto a numerical value
	examples = []
	inputs = random.normal(0, 100, (amount, input_size))
	for i in range(amount):
		input_vector = inputs[i]
		examples.append([input_vector, evaluate(input_vector)])
	return examples

def test():
	
	# Test the ANN by having it model another ANN with identical topology but unknown weights
	
	input_size = 5
	hidden_size = 3
	real = ANN(input_size, hidden_size)
	model = ANN(input_size, hidden_size)
	
	# Generate training data
	training_data = generate_examples(10000, input_size, real.predict)
	validation_data = generate_examples(10000, input_size, real.predict)
	
	# Print initial difference, train, then print new difference
	print("Initial difference:")
	print_difference(real, model)
	print("Initial RMSE (on training data):", RMSE(model, training_data))
	print("Initial RMSE (on validation data):", RMSE(model, validation_data))
	model.train(training_data)
	print("After training:")
	print_difference(real, model)
	print("After training RMSE (on training data):", RMSE(model, training_data))
	print("After training RMSE (on validation data):", RMSE(model, validation_data))

if __name__ == "__main__":
	test()