"""
Minimalistic implementation of the Self Organizing Maps (SOM).
"""
from math import sqrt

from numpy import (array, unravel_index, nditer, linalg, random, subtract,
                   power, exp, pi, zeros, arange, outer, meshgrid, dot)
from collections import defaultdict
from warnings import warn


def fast_norm(x):
    """Returns norm-2 of a 1-D numpy array.

    * faster than linalg.norm in case of 1-D arrays (numpy 1.9.2rc1).
    """
    return sqrt(dot(x, x.T))
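
# A quick sanity check (a sketch, not part of the original module): for a 1-D
# array, fast_norm should agree with the usual Euclidean norm, e.g. the 3-4-5
# triangle:
#
#     >>> fast_norm(array([3.0, 4.0]))
#     5.0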


class MiniSom(object):
    def __init__(self, x, y, input_len, sigma=1.0, learning_rate=0.5,
                 decay_function=None, random_seed=None):
        """
        Initializes a Self Organizing Map.

        x, y - dimensions of the SOM
        input_len - number of the elements of the vectors in input
        sigma - spread of the neighborhood function (Gaussian); needs to be
                adequate to the dimensions of the map
                (at iteration t we have sigma(t) = sigma / (1 + t/T),
                where T is num_iteration/2)
        learning_rate - initial learning rate
                (at iteration t we have
                learning_rate(t) = learning_rate / (1 + t/T),
                where T is num_iteration/2)
        decay_function - function that reduces learning_rate and sigma
                at each iteration
                (default: lambda x, t, max_iter: x / (1 + t/max_iter))
        random_seed - random seed to use
        """
        if sigma >= x/2.0 or sigma >= y/2.0:
            warn('Warning: sigma is too high for the dimension of the map.')
        # RandomState(None) seeds itself from the OS, so this single call
        # also covers the unseeded case
        self.random_generator = random.RandomState(random_seed)
        if decay_function:
            self._decay_function = decay_function
        else:
            self._decay_function = lambda x, t, max_iter: x/(1+t/max_iter)
        self.learning_rate = learning_rate
        self.sigma = sigma
        # random initialization of the weights in [-1, 1]
        self.weights = self.random_generator.rand(x, y, input_len)*2-1
        for i in range(x):
            for j in range(y):
                # normalize each weight vector to unit length
                self.weights[i, j] = self.weights[i, j] / fast_norm(self.weights[i, j])
        self.activation_map = zeros((x, y))
        self.neigx = arange(x)
        self.neigy = arange(y)  # used to evaluate the neighborhood function
        self.neighborhood = self.gaussian

    def _activate(self, x):
        """ Updates matrix activation_map: the element i,j is the response of the neuron i,j to x """
        s = subtract(x, self.weights)  # x - w
        it = nditer(self.activation_map, flags=['multi_index'])
        while not it.finished:
            self.activation_map[it.multi_index] = fast_norm(s[it.multi_index])  # || x - w ||
            it.iternext()

    def activate(self, x):
        """ Returns the activation map to x """
        self._activate(x)
        return self.activation_map

    def gaussian(self, c, sigma):
        """ Returns a Gaussian centered in c """
        d = 2*pi*sigma*sigma
        ax = exp(-power(self.neigx-c[0], 2)/d)
        ay = exp(-power(self.neigy-c[1], 2)/d)
        return outer(ax, ay)  # the outer product gives a matrix

    def diff_gaussian(self, c, sigma):
        """ Mexican hat centered in c (unused) """
        xx, yy = meshgrid(self.neigx, self.neigy)
        p = power(xx-c[0], 2) + power(yy-c[1], 2)
        d = 2*pi*sigma*sigma
        return exp(-p/d)*(1-2/d*p)

    def winner(self, x):
        """ Computes the coordinates of the winning neuron for the sample x """
        self._activate(x)
        return unravel_index(self.activation_map.argmin(), self.activation_map.shape)

    def update(self, x, win, t):
        """
        Updates the weights of the neurons.

        x - current pattern to learn
        win - position of the winning neuron for x (array or tuple)
        t - iteration index
        """
        eta = self._decay_function(self.learning_rate, t, self.T)
        # sigma and learning rate decrease with the same rule
        sig = self._decay_function(self.sigma, t, self.T)
        # precomputing the neighborhood scaled by eta improves performance
        g = self.neighborhood(win, sig)*eta
        it = nditer(g, flags=['multi_index'])
        while not it.finished:
            # eta * neighborhood_function * (x - w)
            self.weights[it.multi_index] += g[it.multi_index]*(x-self.weights[it.multi_index])
            # normalization
            self.weights[it.multi_index] = self.weights[it.multi_index] / fast_norm(self.weights[it.multi_index])
            it.iternext()

    def quantization(self, data):
        """ Assigns a code book (weights vector of the winning neuron) to each sample in data. """
        q = zeros(data.shape)
        for i, x in enumerate(data):
            q[i] = self.weights[self.winner(x)]
        return q

    def random_weights_init(self, data):
        """ Initializes the weights of the SOM picking random samples from data """
        it = nditer(self.activation_map, flags=['multi_index'])
        while not it.finished:
            self.weights[it.multi_index] = data[self.random_generator.randint(len(data))]
            self.weights[it.multi_index] = self.weights[it.multi_index]/fast_norm(self.weights[it.multi_index])
            it.iternext()

    def train_random(self, data, num_iteration):
        """ Trains the SOM picking samples at random from data """
        self._init_T(num_iteration)
        for iteration in range(num_iteration):
            rand_i = self.random_generator.randint(len(data))  # pick a random sample
            self.update(data[rand_i], self.winner(data[rand_i]), iteration)

    def train_batch(self, data, num_iteration):
        """ Trains using all the vectors in data sequentially """
        self._init_T(len(data)*num_iteration)
        iteration = 0
        while iteration < num_iteration:
            idx = iteration % len(data)  # cycle through all the samples
            self.update(data[idx], self.winner(data[idx]), iteration)
            iteration += 1

    def _init_T(self, num_iteration):
        """ Initializes the parameter T needed to adjust the learning rate """
        # keeps the learning rate nearly constant for the last half of the iterations
        self.T = num_iteration/2

    def distance_map(self):
        """ Returns the distance map of the weights.
        Each cell is the normalised sum of the distances between a neuron and its neighbours.
        """
        um = zeros((self.weights.shape[0], self.weights.shape[1]))
        it = nditer(um, flags=['multi_index'])
        while not it.finished:
            for ii in range(it.multi_index[0]-1, it.multi_index[0]+2):
                for jj in range(it.multi_index[1]-1, it.multi_index[1]+2):
                    if 0 <= ii < self.weights.shape[0] and 0 <= jj < self.weights.shape[1]:
                        um[it.multi_index] += fast_norm(self.weights[ii, jj, :]-self.weights[it.multi_index])
            it.iternext()
        um = um/um.max()
        return um
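
    # A possible way to visualize the distance map as a U-matrix (a sketch,
    # assuming matplotlib is installed and `som` is a trained MiniSom):
    #
    #     from matplotlib.pyplot import pcolor, colorbar, show
    #     pcolor(som.distance_map().T)  # inter-neuron distances as a heat map
    #     colorbar()
    #     show()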

    def activation_response(self, data):
        """
        Returns a matrix where the element i,j is the number of times
        that the neuron i,j has been the winner.
        """
        a = zeros((self.weights.shape[0], self.weights.shape[1]))
        for x in data:
            a[self.winner(x)] += 1
        return a

    def quantization_error(self, data):
        """
        Returns the quantization error computed as the average distance between
        each input sample and its best matching unit.
        """
        error = 0
        for x in data:
            error += fast_norm(x-self.weights[self.winner(x)])
        return error/len(data)

    def win_map(self, data):
        """
        Returns a dictionary wm where wm[(i,j)] is a list with all the patterns
        that have been mapped in the position i,j.
        """
        winmap = defaultdict(list)
        for x in data:
            winmap[self.winner(x)].append(x)
        return winmap


### unit tests
from numpy.testing import (assert_almost_equal, assert_array_almost_equal,
                           assert_array_equal)


class TestMinisom:
    def setup_method(self, method):
        self.som = MiniSom(5, 5, 1)
        for i in range(5):
            for j in range(5):
                # checking weights normalization
                assert_almost_equal(1.0, linalg.norm(self.som.weights[i, j]))
        self.som.weights = zeros((5, 5))  # fake weights
        self.som.weights[2, 3] = 5.0
        self.som.weights[1, 1] = 2.0

    def test_decay_function(self):
        assert self.som._decay_function(1., 2., 3.) == 1./(1.+2./3.)

    def test_fast_norm(self):
        assert fast_norm(array([1, 3])) == sqrt(1+9)

    def test_gaussian(self):
        bell = self.som.gaussian((2, 2), 1)
        assert bell.max() == 1.0
        assert bell.argmax() == 12  # unravel(12) = (2,2)

    def test_win_map(self):
        winners = self.som.win_map([5.0, 2.0])
        assert winners[(2, 3)][0] == 5.0
        assert winners[(1, 1)][0] == 2.0

    def test_activation_response(self):
        response = self.som.activation_response([5.0, 2.0])
        assert response[2, 3] == 1
        assert response[1, 1] == 1

    def test_activate(self):
        assert self.som.activate(5.0).argmin() == 13  # unravel(13) = (2,3)

    def test_quantization_error(self):
        assert self.som.quantization_error([5, 2]) == 0.0
        # 4 is at distance 1 from the codebook value 5, and 1 is at
        # distance 1 from 0 (or 2), so the average error is 1.0
        assert self.som.quantization_error([4, 1]) == 1.0

    def test_quantization(self):
        q = self.som.quantization(array([4, 2]))
        assert q[0] == 5.0
        assert q[1] == 2.0

    def test_random_seed(self):
        som1 = MiniSom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        som2 = MiniSom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        assert_array_almost_equal(som1.weights, som2.weights)  # same initialization
        data = random.rand(100, 2)
        som1 = MiniSom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        som1.train_random(data, 10)
        som2 = MiniSom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        som2.train_random(data, 10)
        assert_array_almost_equal(som1.weights, som2.weights)  # same state after training

    def test_train_batch(self):
        som = MiniSom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        data = array([[4, 2], [3, 1]])
        q1 = som.quantization_error(data)
        som.train_batch(data, 10)
        assert q1 > som.quantization_error(data)

    def test_train_random(self):
        som = MiniSom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
        data = array([[4, 2], [3, 1]])
        q1 = som.quantization_error(data)
        som.train_random(data, 10)
        assert q1 > som.quantization_error(data)

    def test_random_weights_init(self):
        som = MiniSom(2, 2, 2, random_seed=1)
        som.random_weights_init(array([[1.0, .0]]))
        for w in som.weights:
            assert_array_equal(w[0], array([1.0, .0]))
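

# A minimal end-to-end usage sketch (not part of the original module): train
# a small map on random 2-D data and report the quantization error before and
# after training. The map size and iteration count here are arbitrary choices.
if __name__ == '__main__':
    demo_data = random.rand(100, 2)  # 100 random 2-D samples in [0, 1)
    demo_som = MiniSom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
    demo_som.random_weights_init(demo_data)  # seed the codebook with samples
    print('quantization error before training:',
          demo_som.quantization_error(demo_data))
    demo_som.train_random(demo_data, 500)
    print('quantization error after training:',
          demo_som.quantization_error(demo_data))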