156 lines
4.8 KiB
Python
156 lines
4.8 KiB
Python
|
# AI for Doom
|
||
|
|
||
|
|
||
|
|
||
|
# Importing the libraries
|
||
|
import numpy as np
|
||
|
import torch
|
||
|
import torch.nn as nn
|
||
|
import torch.nn.functional as F
|
||
|
import torch.optim as optim
|
||
|
from torch.autograd import Variable
|
||
|
|
||
|
# Importing the packages for OpenAI and Doom
|
||
|
import gym
|
||
|
from gym import wrappers
|
||
|
import vizdoomgym
|
||
|
|
||
|
# Importing the other Python files
|
||
|
import experience_replay, image_preprocessing
|
||
|
|
||
|
|
||
|
|
||
|
# Part 1 - Building the AI
|
||
|
|
||
|
# Making the brain
|
||
|
|
||
|
class CNN(nn.Module):
    """Convolutional network mapping an image batch to one score per action.

    Three convolution layers (each followed by 3x3 max pooling with stride 2
    and a ReLU) feed two fully connected layers. The last layer has
    ``number_actions`` outputs and applies no activation.

    Args:
        number_actions: Number of outputs of the final linear layer.
        image_dim: ``(channels, height, width)`` of a single input image.
            Defaults to ``(1, 80, 80)``, the size this script was written for.
    """

    def __init__(self, number_actions, image_dim = (1, 80, 80)):
        super(CNN, self).__init__()
        self.convolution1 = nn.Conv2d(in_channels = image_dim[0], out_channels = 32, kernel_size = 5)
        self.convolution2 = nn.Conv2d(in_channels = 32, out_channels = 32, kernel_size = 3)
        self.convolution3 = nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = 2)
        # The flattened feature size depends on the image size, so it is
        # measured with a dummy pass instead of being computed by hand.
        self.fc1 = nn.Linear(in_features = self.count_neurons(image_dim), out_features = 40)
        self.fc2 = nn.Linear(in_features = 40, out_features = number_actions)

    def _extract_features(self, x):
        """Run the shared convolution -> max-pool -> ReLU stack on ``x``."""
        for convolution in (self.convolution1, self.convolution2, self.convolution3):
            x = F.relu(F.max_pool2d(convolution(x), 3, 2))
        return x

    def count_neurons(self, image_dim):
        """Return the number of features left after the convolutional stack.

        Args:
            image_dim: ``(channels, height, width)`` of a single input image.

        Returns:
            The flattened feature count for one image, as an ``int``.
        """
        # Only the output shape matters here, so no autograd graph is needed.
        with torch.no_grad():
            probe = torch.rand(1, *image_dim)
            return self._extract_features(probe).view(1, -1).size(1)

    def forward(self, x):
        """Compute the per-action outputs for a batch of images.

        Args:
            x: Tensor whose first dimension is the batch and whose remaining
                dimensions match the ``image_dim`` given at construction.

        Returns:
            Tensor with one row per batch element and ``number_actions``
            columns.
        """
        x = self._extract_features(x)
        x = x.view(x.size(0), -1)  # flatten everything except the batch axis
        x = F.relu(self.fc1(x))
        return self.fc2(x)
|
||
|
|
||
|
# Making the body
|
||
|
|
||
|
class SoftmaxBody(nn.Module):
    """Sample one action per row from a softmax over the given scores.

    Args:
        T: Factor the scores are multiplied by before the softmax. Larger
            values make the resulting distribution more peaked.
    """

    def __init__(self, T):
        super(SoftmaxBody, self).__init__()
        self.T = T

    def forward(self, outputs):
        """Draw an action index for each row of ``outputs``.

        Args:
            outputs: Tensor of action scores, actions along the last axis.

        Returns:
            Tensor of sampled action indices, one sample per distribution.
        """
        # An explicit ``dim`` avoids the deprecated implicit-dimension path;
        # the last axis is what the implicit rule picked for 1-D/2-D inputs.
        probs = F.softmax(outputs * self.T, dim=-1)
        actions = probs.multinomial(num_samples=1)
        return actions
|
||
|
|
||
|
# Making the AI
|
||
|
|
||
|
class AI:
    """Combine a brain (scores) and a body (action sampling) into one callable.

    Args:
        brain: Callable taking a float32 tensor of inputs and returning
            action scores.
        body: Callable taking the brain's output and returning a tensor of
            chosen actions.
    """

    def __init__(self, brain, body):
        self.brain = brain
        self.body = body

    def __call__(self, inputs):
        """Choose actions for ``inputs`` and return them as a NumPy array.

        Args:
            inputs: Array-like convertible to a float32 NumPy array.

        Returns:
            NumPy array holding whatever the body returned.
        """
        observations = torch.from_numpy(np.array(inputs, dtype = np.float32))
        # Action selection is pure inference; skip building an autograd graph.
        with torch.no_grad():
            actions = self.body(self.brain(observations))
        return actions.detach().numpy()
|
||
|
|
||
|
|
||
|
|
||
|
# Part 2 - Training the AI with Deep Convolutional Q-Learning
|
||
|
|
||
|
# Getting the Doom environment
|
||
|
# Build the environment: create the Gym environment, wrap it in the project's
# preprocessing wrapper (configured for 80x80 grayscale), then in Gym's Monitor
# writing to the "videos" directory.
base_env = gym.make("VizdoomCorridor-v0")
doom_env = image_preprocessing.PreprocessImage(
    base_env,
    width=80,
    height=80,
    grayscale=True,
)
doom_env = wrappers.Monitor(doom_env, "videos", force=True)
number_actions = doom_env.action_space.n

# Assemble the agent: the CNN scores each action, the softmax body samples one.
cnn = CNN(number_actions)
softmax_body = SoftmaxBody(T=1.0)
ai = AI(brain=cnn, body=softmax_body)

# Experience replay: 10-step progress over the environment, stored in a
# replay memory with capacity 10000.
n_steps = experience_replay.NStepProgress(env=doom_env, ai=ai, n_step=10)
memory = experience_replay.ReplayMemory(n_steps=n_steps, capacity=10000)
|
||
|
|
||
|
# Implementing Eligibility Trace
|
||
|
def eligibility_trace(batch, gamma = 0.99, model = None):
    """Build training inputs and n-step targets for a batch of step series.

    For each series, the network is evaluated on the first and last states.
    The cumulative reward starts at 0 if the last step is done, otherwise at
    the maximum network output for the last state. It is then discounted
    backwards through every step except the last. The target is the network
    output for the first state with the entry of the first action replaced
    by that cumulative reward.

    Args:
        batch: Iterable of indexable, sliceable series whose items expose
            ``state``, ``action``, ``reward`` and ``done``.
        gamma: Discount factor applied per step. Defaults to 0.99.
        model: Network used to evaluate states. Defaults to the module-level
            ``cnn``.

    Returns:
        Tuple ``(inputs, targets)``: a float32 tensor of the first state of
        each series, and the stacked target tensors.
    """
    network = cnn if model is None else model
    states = []
    targets = []
    # Targets are constants for the loss, so no autograd graph is needed here.
    with torch.no_grad():
        for series in batch:
            first, last = series[0], series[-1]
            endpoints = torch.from_numpy(np.array([first.state, last.state], dtype = np.float32))
            q_values = network(endpoints)
            cumul_reward = 0.0 if last.done else q_values[1].max().item()
            for step in reversed(series[:-1]):
                cumul_reward = step.reward + gamma * cumul_reward
            # Clone so the network's own output tensor is not modified in place.
            target = q_values[0].clone()
            target[first.action] = cumul_reward
            states.append(first.state)
            targets.append(target)
    return torch.from_numpy(np.array(states, dtype = np.float32)), torch.stack(targets)
|
||
|
|
||
|
# Making the moving average on 100 steps
|
||
|
class MA:
    """Moving average over the most recent ``size`` rewards.

    Args:
        size: Maximum number of rewards kept in the window.
    """

    def __init__(self, size):
        self.list_of_rewards = []
        self.size = size

    def add(self, rewards):
        """Add one reward, or a list of rewards, and trim the window.

        Args:
            rewards: A single reward, or a ``list`` of rewards added in order.
        """
        if isinstance(rewards, list):
            self.list_of_rewards.extend(rewards)
        else:
            self.list_of_rewards.append(rewards)
        # Drop the oldest entries in one slice deletion rather than popping
        # the head repeatedly, which is quadratic when many are added at once.
        overflow = len(self.list_of_rewards) - self.size
        if overflow > 0:
            del self.list_of_rewards[:overflow]

    def average(self):
        """Return the mean of the stored rewards, or NaN if there are none."""
        if not self.list_of_rewards:
            # np.mean([]) also yields NaN but emits a RuntimeWarning.
            return float("nan")
        return np.mean(self.list_of_rewards)
|
||
|
ma = MA(100)

# Training the AI
loss = nn.MSELoss()
optimizer = optim.Adam(cnn.parameters(), lr = 0.001)
nb_epochs = 100
try:
    for epoch in range(1, nb_epochs + 1):
        # Collect new experience, then fit the network on sampled batches.
        memory.run_steps(200)
        for batch in memory.sample_batch(128):
            inputs, targets = eligibility_trace(batch)
            predictions = cnn(inputs)
            loss_error = loss(predictions, targets)
            optimizer.zero_grad()
            loss_error.backward()
            optimizer.step()
        # Track the moving average of the rewards reported by n_steps.
        rewards_steps = n_steps.rewards_steps()
        ma.add(rewards_steps)
        avg_reward = ma.average()
        print("Epoch: %s, Average Reward: %s" % (str(epoch), str(avg_reward)))
        if avg_reward >= 1500:
            print("Congratulations, your AI wins")
            break
finally:
    # Closing the Doom environment, even if training raised or was interrupted
    doom_env.close()
|