# AI for Doom
# Importing the libraries
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
# Importing the packages for OpenAI and Doom
import gym
from gym import wrappers
import vizdoomgym
# Importing the other Python files
import experience_replay, image_preprocessing
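# These two helper modules (presumably experience_replay.py and
# image_preprocessing.py from the course materials) must sit in the same
# folder as this file.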
# Part 1 - Building the AI
# Making the brain
class CNN(nn.Module):
    def __init__(self, number_actions):
        super(CNN, self).__init__()
        self.convolution1 = nn.Conv2d(in_channels = 1, out_channels = 32, kernel_size = 5)
        self.convolution2 = nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = 3)
        self.convolution3 = nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = 2)
        self.fc1 = nn.Linear(in_features = self.count_neurons((1, 80, 80)), out_features = 40)
        self.fc2 = nn.Linear(in_features = 40, out_features = number_actions)
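    # count_neurons runs a dummy 1 x 80 x 80 image through the convolutional
    # layers to measure the size of the flattened feature vector, so that fc1
    # can be sized without working out the convolution arithmetic by hand.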
    def count_neurons(self, image_dim):
        x = Variable(torch.rand(1, *image_dim))
        x = F.relu(F.max_pool2d(self.convolution1(x), 3, 2))
        x = F.relu(F.max_pool2d(self.convolution2(x), 3, 2))
        x = F.relu(F.max_pool2d(self.convolution3(x), 3, 2))
        return x.data.view(1, -1).size(1)
    def forward(self, x):
        x = F.relu(F.max_pool2d(self.convolution1(x), 3, 2))
        x = F.relu(F.max_pool2d(self.convolution2(x), 3, 2))
        x = F.relu(F.max_pool2d(self.convolution3(x), 3, 2))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
# Making the body
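# SoftmaxBody turns the Q-values into a probability distribution and samples
# an action from it. Note that T multiplies the Q-values here, so a higher T
# sharpens the distribution (less exploration); the usual temperature
# convention divides by T instead.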
class SoftmaxBody(nn.Module):
    def __init__(self, T):
        super(SoftmaxBody, self).__init__()
        self.T = T
    def forward(self, outputs):
        probs = F.softmax(outputs * self.T, dim = 1)
        actions = probs.multinomial(num_samples=1)
        return actions
# Making the AI
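# The AI wires the brain to the body: numpy images in, Q-values from the CNN,
# a sampled action index out.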
class AI:
    def __init__(self, brain, body):
        self.brain = brain
        self.body = body
    def __call__(self, inputs):
        input = Variable(torch.from_numpy(np.array(inputs, dtype = np.float32)))
        output = self.brain(input)
        actions = self.body(output)
        return actions.data.numpy()
# Part 2 - Training the AI with Deep Convolutional Q-Learning
# Getting the Doom environment
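# PreprocessImage rescales each frame to a 1 x 80 x 80 grayscale tensor,
# matching the input shape the CNN above was built for.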
doom_env = image_preprocessing.PreprocessImage(gym.make("VizdoomCorridor-v0"), width=80, height=80, grayscale=True)
doom_env = wrappers.Monitor(doom_env, "videos", force = True)
number_actions = doom_env.action_space.n
# Building an AI
cnn = CNN(number_actions)
softmax_body = SoftmaxBody(T = 1.0)
ai = AI(brain = cnn, body = softmax_body)
# Setting up Experience Replay
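# NStepProgress (see experience_replay.py) makes the AI play the game and
# yields series of 10 consecutive transitions; ReplayMemory stores up to
# 10000 of these series for sampling.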
n_steps = experience_replay.NStepProgress(env = doom_env, ai = ai, n_step = 10)
memory = experience_replay.ReplayMemory(n_steps = n_steps, capacity = 10000)
# Implementing Eligibility Trace
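# Despite the name, this computes an n-step Q-learning target for the first
# state of each series: rewards are accumulated backwards from a bootstrap
# estimate at the last state of the series (0 if the episode ended there),
# i.e. R = r_0 + gamma*r_1 + ... + gamma^(n-2)*r_(n-2) + gamma^(n-1)*max_a Q(s_(n-1), a).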
def eligibility_trace(batch):
    gamma = 0.99
    inputs = []
    targets = []
    for series in batch:
        input = Variable(torch.from_numpy(np.array([series[0].state, series[-1].state], dtype = np.float32)))
        output = cnn(input)
        cumul_reward = 0.0 if series[-1].done else output[1].data.max()
        for step in reversed(series[:-1]):
            cumul_reward = step.reward + gamma * cumul_reward
        state = series[0].state
        target = output[0].data
        target[series[0].action] = cumul_reward
        inputs.append(state)
        targets.append(target)
    return torch.from_numpy(np.array(inputs, dtype = np.float32)), torch.stack(targets)
# Making the moving average on 100 steps
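# MA keeps a sliding window of the last 100 series rewards so that training
# progress can be tracked with a smoothed score.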
class MA:
    def __init__(self, size):
        self.list_of_rewards = []
        self.size = size
    def add(self, rewards):
        if isinstance(rewards, list):
            self.list_of_rewards += rewards
        else:
            self.list_of_rewards.append(rewards)
        while len(self.list_of_rewards) > self.size:
            del self.list_of_rewards[0]
    def average(self):
        return np.mean(self.list_of_rewards)
ma = MA(100)
# Training the AI
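# Each epoch runs 200 n-step transitions in the environment, then trains on
# batches of 128 series: the CNN's predicted Q-values are regressed (MSE)
# towards the n-step targets computed by eligibility_trace.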
loss = nn.MSELoss()
optimizer = optim.Adam(cnn.parameters(), lr = 0.001)
nb_epochs = 100
for epoch in range(1, nb_epochs + 1):
    memory.run_steps(200)
    for batch in memory.sample_batch(128):
        inputs, targets = eligibility_trace(batch)
        inputs, targets = Variable(inputs), Variable(targets)
        predictions = cnn(inputs)
        loss_error = loss(predictions, targets)
        optimizer.zero_grad()
        loss_error.backward()
        optimizer.step()
    rewards_steps = n_steps.rewards_steps()
    ma.add(rewards_steps)
    avg_reward = ma.average()
    print("Epoch: %s, Average Reward: %s" % (str(epoch), str(avg_reward)))
    if avg_reward >= 1500:
        print("Congratulations, your AI wins")
        break
# Closing the Doom environment
doom_env.close()