PFRLを試してみる - slime volleyball
はじめに
前回はPFRLでatari SpaceInvadorの学習を行ったが、
計算時間が足りず、うまく学習できなかった。
今回はもう少し簡単な、Slime Volleyball1に対して学習を行う。
slime volleyball
slime volleyballは2人のプレイヤーがボールを相手のコートに
落とそうとする単純なバレーボールである。
上図の赤はプレイヤーの位置、緑はボールの位置、青は対戦相手の位置である。
状態としてプレイヤー、ボール、対戦相手の位置・速度を受け取り(12次元)、
対戦相手に勝てるようなプレイヤーの動かし方を学習するという問題である。
どちらかが5点とればゲーム終了であり、
自分がポイントをとると1点、相手がポイントをとると-1点の報酬が得られる。
ゆえに1エピソード当たりの収益は-5 ~ 5の間の整数となる。
検証
学習アルゴリズムは前回に引き続きRainbow2を用いる。
PFRLのexampleを参考にした。
Google Colaboratoryで検証
学習コード
ライブラリ読み込み
!pip install pfrl
!pip install slimevolleygym
import slimevolleygym
import argparse
import os
import torch
import torch.nn as nn
import numpy as np
from PIL import Image
import gym
import pfrl
from pfrl.q_functions import DiscreteActionValueHead
from pfrl import agents
from pfrl import experiments
from pfrl import explorers
from pfrl import nn as pnn
from pfrl import utils
from pfrl.q_functions import DuelingDQN
from pfrl import replay_buffers
from pfrl.wrappers import atari_wrappers
from pfrl.initializers import init_chainer_default
from pfrl.q_functions import DistributionalDuelingDQN
環境設定
SEED = 0
train_seed = SEED
test_seed = 2 ** 31 - 1 - SEED
class MultiBinaryAsDiscreteAction(gym.ActionWrapper):
"""Transforms MultiBinary action space to Discrete.
If the action space of a given env is `gym.spaces.MultiBinary(n)`, then
the action space of the wrapped env will be `gym.spaces.Discrete(2**n)`,
which covers all the combinations of the original action space.
Args:
env (gym.Env): Gym env whose action space is `gym.spaces.MultiBinary`.
"""
def __init__(self, env):
super().__init__(env)
assert isinstance(env.action_space, gym.spaces.MultiBinary)
self.orig_action_space = env.action_space
self.action_space = gym.spaces.Discrete(2 ** env.action_space.n)
def action(self, action):
return [(action >> i) % 2 for i in range(self.orig_action_space.n)]
def make_env(test):
# Use different random seeds for train and test envs
env_seed = test_seed if test else train_seed
env = gym.make("SlimeVolley-v0")
env.seed(int(env_seed))
if isinstance(env.action_space, gym.spaces.MultiBinary):
env = MultiBinaryAsDiscreteAction(env)
# if args.render:
# env = pfrl.wrappers.Render(env)
return env
# 初期SEED設定
utils.set_random_seed(SEED)
# 環境設定
env = make_env(test=False)
eval_env = make_env(test=True)
obs = env.observation_space
obs_size = env.observation_space.low.size
n_actions = env.action_space.n
print(obs_size, n_actions)
slime volleyballの行動は3成分のmulti binaryなので、
1成分のdiscrete action(23個)として扱うためにwrapperクラスを挟んでいる。
Agent 構築
class DistributionalDuelingHead(nn.Module):
"""Head module for defining a distributional dueling network.
This module expects a (batch_size, in_size)-shaped `torch.Tensor` as input
and returns `pfrl.action_value.DistributionalDiscreteActionValue`.
Args:
in_size (int): Input size.
n_actions (int): Number of actions.
n_atoms (int): Number of atoms.
v_min (float): Minimum value represented by atoms.
v_max (float): Maximum value represented by atoms.
"""
def __init__(self, in_size, n_actions, n_atoms, v_min, v_max):
super().__init__()
assert in_size % 2 == 0
self.n_actions = n_actions
self.n_atoms = n_atoms
self.register_buffer(
"z_values", torch.linspace(v_min, v_max, n_atoms, dtype=torch.float)
)
self.a_stream = nn.Linear(in_size // 2, n_actions * n_atoms)
self.v_stream = nn.Linear(in_size // 2, n_atoms)
def forward(self, h):
h_a, h_v = torch.chunk(h, 2, dim=1)
a_logits = self.a_stream(h_a).reshape((-1, self.n_actions, self.n_atoms))
a_logits = a_logits - a_logits.mean(dim=1, keepdim=True)
v_logits = self.v_stream(h_v).reshape((-1, 1, self.n_atoms))
probs = nn.functional.softmax(a_logits + v_logits, dim=2)
return pfrl.action_value.DistributionalDiscreteActionValue(probs, self.z_values)
def phi(x):
return np.asarray(x, dtype=np.float32)
def get_rainbow_agent(gamma, gpu, update_interval=1,replay_start_size=2000, target_update_interval=2000,
n_atoms=51, v_max=1, v_min=-1, hidden_size=512,
noisy_net_sigma=0.1,
lr0=1e-3, eps=1.5e-4, betasteps=2e6, num_step_return=3,
minibatch_size=32):
# categorical Q-function
q_func = nn.Sequential(
nn.Linear(obs_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
DistributionalDuelingHead(hidden_size, n_actions, n_atoms, v_min, v_max),
)
pnn.to_factorized_noisy(q_func, sigma_scale=noisy_net_sigma)
print(q_func)
# 探索アルゴリズム
explorer = explorers.Greedy()
# 最適化
opt = torch.optim.Adam(q_func.parameters(), lr=lr0, eps=eps)
# replay
rbuf = replay_buffers.PrioritizedReplayBuffer(
10 ** 6,
alpha=0.5,
beta0=0.4,
betasteps=betasteps,
num_steps=num_step_return,
normalize_by_max="memory"
)
agent = agents.CategoricalDoubleDQN(
q_func,
opt,
rbuf,
gpu=gpu,
gamma=gamma,
explorer=explorer,
minibatch_size=minibatch_size,
replay_start_size=replay_start_size,
target_update_interval=target_update_interval,
update_interval=update_interval,
batch_accumulator="mean",
phi=phi,
max_grad_norm=10,
)
return agent
画像を入力としていたので、 その部分を今回の問題に合わせて書き換えている。
学習設定
import time
class Hook:
def __init__(self, period=10000):
self.t0 = time.time()
self.period = period
def __call__(self, env, agent, t):
if t % self.period == 0:
t1 = time.time()
dt = t1 - self.t0
print("{} : elps {:.3f}".format(t, dt))
self.t0 = t1
steps = 10 ** 6
gamma = 0.98
update_interval = 1
betasteps = steps / update_interval * 2
gpu = 0 if torch.cuda.is_available() else -1
学習
rainbow_agent = get_rainbow_agent(gamma, gpu, update_interval=update_interval,
betasteps=betasteps)
out_dir = "results_rainbow"
os.makedirs(out_dir, exist_ok=True)
checkpoint_frequency = 2 * 10 ** 5
eval_n_runs = 10
eval_interval = 5 * 10 ** 4
experiments.train_agent_with_evaluation(
agent=rainbow_agent,
env=env,
steps=steps,
eval_n_steps=None,
checkpoint_freq=checkpoint_frequency,
eval_n_episodes=eval_n_runs,
eval_interval=eval_interval,
outdir=out_dir,
save_best_so_far_agent=False,
eval_env=eval_env,
step_hooks=(Hook(), )
)
#rbuf = replay_buffers.ReplayBuffer(10 ** 6, num_step_return)
結果
学習はcolab上で3時間程度(800,000 frameで途中終了)で終了した。
学習曲線
import pandas as pd import matplotlib.pyplot as plt df = pd.read_csv(os.path.join(out_dir, "scores.txt"), sep="\t") cols = ["mean", "median", "average_loss"] fig, axes = plt.subplots(figsize=(20, 6), ncols=3) for c, col in enumerate(cols): df.set_index("steps").plot(y=col, ax=fig.axes[c])
序盤はスコア(mean:平均値, median:中間値)がほぼ-5 (1ポイントもとれずに全敗)だが、
250,000 frameくらいから性能が急激に上昇し、
400,000 frameほどで収益のmedianスコアが0(対戦相手と勝率が5割程度)となっている。
その後はほぼ横ばい。 lossはまだ減少傾向にあるので時間をかければ性能向上する
可能性はある。
挙動確認
評価
from contextlib import ExitStack
def convert_action(action):
return [(action >> i) % 2 for i in range(3)]
def evaluate(agent1, agent2=None, n_episodes=30, num_obs=1, multi_agent=False):
contexts = [agent1]
if multi_agent:
multi_env = gym.make("SlimeVolley-v0")
env = multi_env
assert agent2 is not None
contexts.append(agent2)
else:
env = eval_env
scores = []
terminate = False
timestep = 0
obses = []
actions = []
rewards = []
dones = []
# with agent1.eval_mode(), agent2.eval_mode():
with ExitStack() as stack:
for agent in contexts:
stack.enter_context(agent.eval_mode())
# with [agent.eval_mode() for agent in contexts]:
reset = True
while not terminate:
if reset:
obs = env.reset()
obs2 = obs
done = False
test_r = 0
episode_len = 0
info = {}
a1 = agent1.act(obs)
if multi_agent:
a2 = agent2.act(obs2)
if len(scores) < num_obs:
obses.append(obs)
actions.append(a1)
if multi_agent:
obs, r, done, info = env.step(convert_action(a1), convert_action(a2))
obs2 = info['otherObs']
else:
obs, r, done, info = env.step(a1)
rewards.append(r)
dones.append(done)
test_r += r
episode_len += 1
timestep += 1
reset = done or info.get("needs_reset", False)
agent1.observe(obs, r, done, reset)
if multi_agent:
agent2.observe(obs2, -r, done, reset)
if reset:
# As mixing float and numpy float causes errors in statistics
# functions, here every score is cast to float.
scores.append(float(test_r))
terminate = len(scores) >= n_episodes
return obses, actions, rewards, dones, scores
可視化
import cv2
from PIL import Image
def get_converter(img_size, x0=-2, x1=2, y0=0, y1=2):
def converter(x, y):
px = int((np.clip(x, x0, x1) - x0)/(x1 - x0) * (img_size - 1))
py = img_size - int((np.clip(y, y0, y1) - y0)/(y1 - y0) * (img_size - 1))
return px, py
return converter
def visualize(obses, rewards, skip=1, img_size=64):
arrs = []
converter = get_converter(img_size)
total_reward = 0
pb1 = converter(0, 0)
pb2 = converter(0, 0.2)
p_a = 0
p_o = 0
for s, (o, reward) in enumerate(zip(obses[:-1:skip], rewards)):
arr = np.zeros([img_size, img_size, 3], dtype=np.uint8) + 255
pa = converter(-o[0], o[1])
pb = converter(-o[4], o[5])
po = converter(o[8], o[9])
cv2.circle(arr, pa, 3, (255, 0, 0), -1)
cv2.circle(arr, pb, 3, (0, 255, 0), -1)
cv2.circle(arr, po, 3, (0, 0, 255), -1)
cv2.line(arr, pb1, pb2, (128, 64, 64), 2)
total_reward += reward
if reward > 0:
p_a += 1
elif reward < 0:
p_o += 1
txt = "{}:{}:{}".format(p_a, p_o, total_reward)
cv2.putText(arr, txt, (3, 15), cv2.FONT_HERSHEY_PLAIN, 1.0, (0, 0, 0))
arrs.append(arr)
return arrs
rainbow_agent.load(os.path.join(out_dir, "800000_checkpoint")) results = evaluate(rainbow_agent, multi_agent=False) obses = results[0] rewards = results[2][:len(obses)] print(len(obses), np.mean(results[-1])) arrs = visualize(obses, rewards) imgs = [Image.fromarray(arr) for arr in arrs] imgs[0].save("rainbow.gif", save_all=True, loop=0, delay=33, append_images=imgs[1:])
800,000 Iterまで学習したAgentの挙動は上図のようになる。
赤のプレイヤーは正しくボールに追随できており、青の対戦相手と
互角にラリーできている。
まとめ
今回はPFRLを使って、 slime volleyballの学習を行った。
3時間程度である程度の性能のAgentが学習できた。
slime volleyballは対戦型ゲームなので次はself-playを行ってみたい。