PFRLを試してみる - self play
はじめに
前回 、 PFRLを用いてslime volleyballを学習した。
今回は同じ slime volleyballl環境に対して,
複数のagent を用いたself playを試してみる。
self play
対戦型ゲームにおける強化学習は対戦相手となるエージェントに依存する。
前回の学習では、slime volleyballが予め用意してくれているdefaultエージェントに
勝てるように学習を行ったが、 問題によっては初期に対戦相手となる
エージェントが存在しない場合がある。
そのような場合はself playが有効である。 self playは過去の自分自身に
打ち勝てるように学習を行う手法である。
- 初期にランダムにエージェントを初期化
- 片方のエージェント(A)のみ学習し、もう片方のエージェント(B)のパラメータを固定する
- AがBに安定して勝てるようになるまでAを学習する
- AのパラメータをBにコピーして2に戻る。
過去の自分を超えるプロセスを何回か繰り返すことで、
外部の情報(初期対戦相手)なしに、 エージェントを学習することができる。
slime volleyballについてのself playは以下のページで検証されている。
https://github.com/hardmaru/slimevolleygym/blob/master/TRAINING.md
検証
検証は前回同様Google Colaboratory上で行った。
ライブラリ
!pip install pfrl
!pip install slimevolleygym
import slimevolleygym
import argparse
import os
import torch
import torch.nn as nn
import numpy as np
from PIL import Image
import gym
import pfrl
from pfrl.q_functions import DiscreteActionValueHead
from pfrl import agents
from pfrl import experiments
from pfrl import explorers
from pfrl import nn as pnn
from pfrl import utils
from pfrl.q_functions import DuelingDQN
from pfrl import replay_buffers
from pfrl.wrappers import atari_wrappers
from pfrl.initializers import init_chainer_default
from pfrl.q_functions import DistributionalDuelingDQN
環境構築
コード
SEED = 0
train_seed = SEED
test_seed = 2 ** 31 - 1 - SEED
class SelfPlayMultiBinaryAsDiscreteAction(gym.ActionWrapper):
"""Transforms MultiBinary action space to Discrete.
If the action space of a given env is `gym.spaces.MultiBinary(n)`, then
the action space of the wrapped env will be `gym.spaces.Discrete(2**n)`,
which covers all the combinations of the original action space.
Args:
env (gym.Env): Gym env whose action space is `gym.spaces.MultiBinary`.
"""
def __init__(self, env):
super().__init__(env)
assert isinstance(env.action_space, gym.spaces.MultiBinary)
self.orig_action_space = env.action_space
self.action_space = gym.spaces.Discrete(2 ** env.action_space.n)
def action(self, action):
return [(action >> i) % 2 for i in range(self.orig_action_space.n)]
def step(self, action, otherAction=None):
if otherAction is not None:
otherAction = self.action(otherAction)
return self.env.step(self.action(action), otherAction=otherAction)
def make_env(test):
# Use different random seeds for train and test envs
env_seed = test_seed if test else train_seed
env = gym.make("SlimeVolley-v0")
env.seed(int(env_seed))
if isinstance(env.action_space, gym.spaces.MultiBinary):
env = SelfPlayMultiBinaryAsDiscreteAction(env)
# if args.render:
# env = pfrl.wrappers.Render(env)
return env
# 初期SEED設定
utils.set_random_seed(SEED)
# 環境設定
env = make_env(test=False)
eval_env = make_env(test=True)
obs = env.observation_space
obs_size = env.observation_space.low.size
n_actions = env.action_space.n
print(obs_size, n_actions)
前回との違いはgymのwrapperをmulti agent用に修正した部分である。
slime volleyball環境のstep関数は複数入力を与えることでmulti agent
に対応するため、それに合わせてコードを修正した。
エージェント
コード
class DistributionalDuelingHead(nn.Module):
"""Head module for defining a distributional dueling network.
This module expects a (batch_size, in_size)-shaped `torch.Tensor` as input
and returns `pfrl.action_value.DistributionalDiscreteActionValue`.
Args:
in_size (int): Input size.
n_actions (int): Number of actions.
n_atoms (int): Number of atoms.
v_min (float): Minimum value represented by atoms.
v_max (float): Maximum value represented by atoms.
"""
def __init__(self, in_size, n_actions, n_atoms, v_min, v_max):
super().__init__()
assert in_size % 2 == 0
self.n_actions = n_actions
self.n_atoms = n_atoms
self.register_buffer(
"z_values", torch.linspace(v_min, v_max, n_atoms, dtype=torch.float)
)
self.a_stream = nn.Linear(in_size // 2, n_actions * n_atoms)
self.v_stream = nn.Linear(in_size // 2, n_atoms)
def forward(self, h):
h_a, h_v = torch.chunk(h, 2, dim=1)
a_logits = self.a_stream(h_a).reshape((-1, self.n_actions, self.n_atoms))
a_logits = a_logits - a_logits.mean(dim=1, keepdim=True)
v_logits = self.v_stream(h_v).reshape((-1, 1, self.n_atoms))
probs = nn.functional.softmax(a_logits + v_logits, dim=2)
return pfrl.action_value.DistributionalDiscreteActionValue(probs, self.z_values)
def phi(x):
return np.asarray(x, dtype=np.float32)
def get_rainbow_agent(gamma, gpu, update_interval=1,replay_start_size=2000, target_update_interval=2000,
n_atoms=51, v_max=1, v_min=-1, hidden_size=512,
noisy_net_sigma=0.1,
lr0=1e-3, eps=1.5e-4, betasteps=2e6, num_step_return=3,
minibatch_size=32):
# categorical Q-function
q_func = nn.Sequential(
nn.Linear(obs_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
DistributionalDuelingHead(hidden_size, n_actions, n_atoms, v_min, v_max),
)
pnn.to_factorized_noisy(q_func, sigma_scale=noisy_net_sigma)
# 探索アルゴリズム
explorer = explorers.Greedy()
# 最適化
opt = torch.optim.Adam(q_func.parameters(), lr=lr0, eps=eps)
# replay
rbuf = replay_buffers.PrioritizedReplayBuffer(
10 ** 6,
alpha=0.5,
beta0=0.4,
betasteps=betasteps,
num_steps=num_step_return,
normalize_by_max="memory"
)
agent = agents.CategoricalDoubleDQN(
q_func,
opt,
rbuf,
gpu=gpu,
gamma=gamma,
explorer=explorer,
minibatch_size=minibatch_size,
replay_start_size=replay_start_size,
target_update_interval=target_update_interval,
update_interval=update_interval,
batch_accumulator="mean",
phi=phi,
max_grad_norm=10,
)
return agent
エージェントに関しては前回と同様にRainbow Agentを用いている。
評価コード
from contextlib import ExitStack
def evaluate(agent1, agent2=None, n_episodes=30, num_obs=1, multi_agent=False):
contexts = [agent1]
if multi_agent:
assert agent2 is not None
contexts.append(agent2)
env = eval_env
scores = []
terminate = False
timestep = 0
obses = []
actions = []
rewards = []
dones = []
with ExitStack() as stack:
for agent in contexts:
stack.enter_context(agent.eval_mode())
reset = True
while not terminate:
if reset:
obs = env.reset()
obs2 = obs
done = False
test_r = 0
episode_len = 0
info = {}
a1 = agent1.act(obs)
if multi_agent:
a2 = agent2.act(obs2)
if len(scores) < num_obs:
obses.append(obs)
actions.append(a1)
if multi_agent:
obs, r, done, info = env.step(a1, a2)
obs2 = info['otherObs']
else:
obs, r, done, info = env.step(a1)
rewards.append(r)
dones.append(done)
test_r += r
episode_len += 1
timestep += 1
reset = done or info.get("needs_reset", False)
agent1.observe(obs, r, done, reset)
if multi_agent:
agent2.observe(obs2, -r, done, reset)
if reset:
# As mixing float and numpy float causes errors in statistics
# functions, here every score is cast to float.
scores.append(float(test_r))
terminate = len(scores) >= n_episodes
return obses, actions, rewards, dones, scores
可視化コード
import cv2
from PIL import Image
def get_converter(img_size, x0=-2, x1=2, y0=0, y1=2):
def converter(x, y):
px = int((np.clip(x, x0, x1) - x0)/(x1 - x0) * (img_size - 1))
py = img_size - int((np.clip(y, y0, y1) - y0)/(y1 - y0) * (img_size - 1))
return px, py
return converter
def visualize(obses, rewards, skip=1, img_size=64):
arrs = []
converter = get_converter(img_size)
total_reward = 0
pb1 = converter(0, 0)
pb2 = converter(0, 0.2)
p_a = 0
p_o = 0
for s, (o, reward) in enumerate(zip(obses[:-1:skip], rewards)):
arr = np.zeros([img_size, img_size, 3], dtype=np.uint8) + 255
pa = converter(-o[0], o[1])
pb = converter(-o[4], o[5])
po = converter(o[8], o[9])
cv2.circle(arr, pa, 3, (255, 0, 0), -1)
cv2.circle(arr, pb, 3, (0, 255, 0), -1)
cv2.circle(arr, po, 3, (0, 0, 255), -1)
cv2.line(arr, pb1, pb2, (128, 64, 64), 2)
total_reward += reward
if reward > 0:
p_a += 1
elif reward < 0:
p_o += 1
txt = "{}:{}:{}".format(p_a, p_o, total_reward)
cv2.putText(arr, txt, (3, 15), cv2.FONT_HERSHEY_PLAIN, 1.0, (0, 0, 0))
arrs.append(arr)
return arrs
学習
パラメータ
steps = 10 ** 6
gamma = 0.98
update_interval = 1
betasteps = steps / update_interval * 2
gpu = 0 if torch.cuda.is_available() else -1
print(gpu)
学習
import time
import logging
import pandas as pd
from pfrl.experiments.evaluator import save_agent
agent = get_rainbow_agent(gamma, gpu, update_interval=update_interval,
betasteps=betasteps)
agent2 = get_rainbow_agent(gamma, gpu, update_interval=update_interval,
betasteps=betasteps)
outdir = "selfplay_results"
os.makedirs(outdir, exist_ok=True)
champion_period = 20000
next_champion = champion_period
num_match = 20
update_threshold = 0.5
eval_period = 50000
num_eval = 20
next_eval = eval_period
save_period = 200000
logger = logging.getLogger(__name__)
match_scores = []
eval_scores = []
logs = []
episode_r = 0
episode_idx = 0
episode_len = 0
t = 0
num_champion = 0
obs = env.reset()
obs2 = obs
t0 = time.time()
with agent2.eval_mode():
while t < steps:
action = agent.act(obs)
action2 = agent2.act(obs2)
obs, r, done, info = env.step(action, action2)
obs2 = info['otherObs']
t += 1
episode_r += r
episode_len += 1
reset = info.get("needs_reset", False)
agent.observe(obs, r, done, reset)
agent2.observe(obs2, -r, done, reset)
if done or reset or t == steps:
if t == steps:
break
if t >= next_champion:
_, actions, _, _, scores = evaluate(agent, agent2=agent2, n_episodes=num_match, num_obs=0, multi_agent=True)
mean_score = np.mean(scores)
if mean_score > update_threshold:
agent2.model.load_state_dict(agent.model.state_dict())
save_agent(agent, t, outdir, logger, suffix="_oldagent")
num_champion += 1
print("Match {} :{}, {:.4f} {:.1f} s".format(t, num_champion, mean_score, time.time() - t0))
next_champion += champion_period
match_scores.extend([(t, num_champion, s) for s in scores])
if t >= next_eval:
_, actions, _, _, scores = evaluate(agent, agent2=agent2, n_episodes=num_eval, num_obs=0, multi_agent=False)
eval_scores.extend([(t, s) for s in scores])
stats = {}
stats["steps"] = t
stats["episodes"] = episode_idx
stats["time"] = time.time() - t0
stats["mean"] = np.mean(scores)
stats["median"] = np.median(scores)
stats["stdev"] = np.std(scores)
for k, v in agent.get_statistics():
stats[k] = v
print("Eval {} : {} {:.1f} s".format(t, np.mean(scores), time.time() - t0))
logs.append(stats)
pd.DataFrame(logs).to_csv(os.path.join(outdir, "scores.csv"))
next_eval += eval_period
# Start a new episode
episode_r = 0
episode_idx += 1
episode_len = 0
obs = env.reset()
obs2 = obs
if t % save_period == 0:
save_agent(agent, t, outdir, logger, suffix="_checkpoint")
以下の条件でself play学習を行った。
- 20,000 framesごとに新エージェントと旧エージェントを20回対戦させる
- 新エージェントの旧エージェントに対するスコアが0.5以上の場合は旧エージェントのパラメータを更新する
学習時間の節約のため新エージェントと旧エージェントとの評価用対戦は20回としているが、
性能差を正確に見たい場合は、もう少し多くしたほうが良いかもしれない。
また、対戦周期の20,000 framesも検討の余地があると思う。
結果
学習経過
学習エージェントAの対戦エージェントB(1つ前の世代)に対する平均スコアを
プロットすると上図のようになる。
800,000 framesで計14世代のエージェントが生まれた。
序盤の性能の低い段階では、世代の切り替わりが激しい。
(第三世代はやや長いが...)
10世代以降になると1世代にかかるframe数も伸びており、
ある程度以上成長すると過去の自分に打ち勝つことが難しくなるのが分かる。
対戦相手がslime volleyball defaultエージェントの場合のスコア
学習序盤ではdefaultエージェントに対するスコアはほとんど伸びていない。
350,000 framesほどで-2.5くらいとなり、その後しばらく停滞するが、
最終的には-0.4(700,000 frames)となり、やや負け越すぐらいの性能となる。
self playのみでもdefaultエージェントに近い性能までは学習できることが分かった。
self play対戦成績
世代交代はエージェントがひとつ前の世代に対して勝つと
行われるが、 それ以前の世代にも勝てるかは判定していない。
そのため、前の世代のみに強くてそれ以前の世代には弱いような
汎用性のないエージェントが生じる可能性もある。
ここでは、いくつかの世代間で対戦成績を比較し、そのようなことが
起こっていないかを確認する。
対戦結果
今回は1, 3, 8, 10, 12, 14世代に対して総当りの対戦を行った。
1つの組み合わせあたり、30戦行い平均スコアを求めた。
A\B | 1 | 3 | 8 | 10 | 12 | 14 |
---|---|---|---|---|---|---|
1 | nan | -0.266667 | -3.86667 | -4.36667 | -4.63333 | -4.93333 |
3 | 0.266667 | nan | -3.96667 | -4.53333 | -4.73333 | -4.83333 |
8 | 3.86667 | 3.96667 | nan | -3.93333 | -4.2 | -4.3 |
10 | 4.36667 | 4.53333 | 3.93333 | nan | -2.46667 | -2.2 |
12 | 4.63333 | 4.73333 | 4.2 | 2.46667 | nan | -0.533333 |
14 | 4.93333 | 4.83333 | 4.3 | 2.2 | 0.533333 | nan |
エージェントA(縦軸)とエージェントB(横軸)の対戦結果は上表のようになる。
1, 3世代は8世代以降に大きく負け越しており、 defaultエージェントに対する学習曲線で
見られた通り、 1, 3 世代と8世代以降には大きな性能差があることが分かる。
それ以降の世代でも今回比較した範囲では後の世代に行くほど性能が向上している。
(12世代と14世代の差は僅かではあるが...)
3世代 vs 8世代 (スコア : -3.97)
左側の赤が3世代、右側の青が8世代のエージェントである。
実際は30ゲーム試行して平均スコアを計算しているが、最初の1ゲームのみ表示。
3世代と8世代を比較すると,3世代はほとんどボールに触れていないが、
8世代になると自分の近くに来たボールには反応できている。
8世代 vs 10世代 (スコア : -3.93)
赤 : 8世代、 青 : 10世代。
8世代と10世代の試合ではだいぶラリーが続くようになっている。
8世代はボールに反応して動けてはいるが、 ボールコントロールが
あまく、相手コートにうまく返せていない。 一方で10世代は2, 3タッチで
ボールを相手コートに返せているので、成長が見て取れる。
10世代 vs 14世代 (スコア : -2.2)
赤 : 10世代、 青 : 14世代。
10世代と14世代の対戦ではどちらも中々ボールを落とさない。
14世代のほうが動きが洗練されているように見えるが、 10世代も粘るので
結果的に5点先取までにタイムアップ(3,000 framesでタイムアップ)となり、
スコアが2くらいになるものと思われる。
まとめ
今回は slime volleyballをself playによって学習した。
self playのハイパーパラメータには調整の余地があると思ったが、
self playのみでもslime volleyballのdefault エージェントくらいのモデルは
学習できることが分かった。 学習を早めるために、学習初期はself playを
行い、 終盤はdefault エージェントを使うようにすればいいのではないかと
思った。