PFRLを試してみる - slime volleyball

はじめに

前回PFRLatari SpaceInvadorの学習を行ったが、
計算時間が足りず、うまく学習できなかった。
今回はもう少し簡単な、Slime Volleyball1に対して学習を行う。

slime volleyball

slime volleyballは2人のプレイヤーがボールを相手のコートに
落とそうとする単純なバレーボールである。

f:id:nakamrnk:20200809143725g:plain

上図の赤はプレイヤーの位置、緑はボールの位置、青は対戦相手の位置である。
状態としてプレイヤー、ボール、対戦相手の位置・速度を受け取り(12次元)、
対戦相手に勝てるようなプレイヤーの動かし方を学習するという問題である。  

どちらかが5点とればゲーム終了であり、
自分がポイントをとると1点、相手がポイントをとると-1点の報酬が得られる。
ゆえに1エピソード当たりの収益は-5 ~ 5の間の整数となる。

検証

学習アルゴリズムは前回に引き続きRainbow2を用いる。
PFRLのexampleを参考にした。
Google Colaboratoryで検証

学習コード

ライブラリ読み込み

!pip install pfrl
!pip install slimevolleygym
import slimevolleygym
import argparse
import os

import torch
import torch.nn as nn
import numpy as np
from PIL import Image


import gym


import pfrl
from pfrl.q_functions import DiscreteActionValueHead
from pfrl import agents
from pfrl import experiments
from pfrl import explorers
from pfrl import nn as pnn
from pfrl import utils
from pfrl.q_functions import DuelingDQN
from pfrl import replay_buffers

from pfrl.wrappers import atari_wrappers
from pfrl.initializers import init_chainer_default
from pfrl.q_functions import DistributionalDuelingDQN

環境設定

SEED = 0
train_seed = SEED
test_seed = 2 ** 31 - 1 - SEED

class MultiBinaryAsDiscreteAction(gym.ActionWrapper):
    """Transforms MultiBinary action space to Discrete.
    If the action space of a given env is `gym.spaces.MultiBinary(n)`, then
    the action space of the wrapped env will be `gym.spaces.Discrete(2**n)`,
    which covers all the combinations of the original action space.
    Args:
        env (gym.Env): Gym env whose action space is `gym.spaces.MultiBinary`.
    """

    def __init__(self, env):
        super().__init__(env)
        assert isinstance(env.action_space, gym.spaces.MultiBinary)
        self.orig_action_space = env.action_space
        self.action_space = gym.spaces.Discrete(2 ** env.action_space.n)

    def action(self, action):
        return [(action >> i) % 2 for i in range(self.orig_action_space.n)]



def make_env(test):
  # Use different random seeds for train and test envs
  env_seed = test_seed if test else train_seed
  env = gym.make("SlimeVolley-v0")  
  env.seed(int(env_seed))
  if isinstance(env.action_space, gym.spaces.MultiBinary):
      env = MultiBinaryAsDiscreteAction(env)
  # if args.render:
  #     env = pfrl.wrappers.Render(env)
  return env 

# 初期SEED設定
utils.set_random_seed(SEED)

# 環境設定
env = make_env(test=False)
eval_env = make_env(test=True)
obs = env.observation_space
obs_size = env.observation_space.low.size
n_actions = env.action_space.n
print(obs_size, n_actions)

slime volleyballの行動は3成分のmulti binaryなので、
1成分のdiscrete action(23個)として扱うためにwrapperクラスを挟んでいる。

Agent 構築

class DistributionalDuelingHead(nn.Module):
    """Head module for defining a distributional dueling network.
    This module expects a (batch_size, in_size)-shaped `torch.Tensor` as input
    and returns `pfrl.action_value.DistributionalDiscreteActionValue`.
    Args:
        in_size (int): Input size.
        n_actions (int): Number of actions.
        n_atoms (int): Number of atoms.
        v_min (float): Minimum value represented by atoms.
        v_max (float): Maximum value represented by atoms.
    """

    def __init__(self, in_size, n_actions, n_atoms, v_min, v_max):
        super().__init__()
        assert in_size % 2 == 0
        self.n_actions = n_actions
        self.n_atoms = n_atoms
        self.register_buffer(
            "z_values", torch.linspace(v_min, v_max, n_atoms, dtype=torch.float)
        )
        self.a_stream = nn.Linear(in_size // 2, n_actions * n_atoms)
        self.v_stream = nn.Linear(in_size // 2, n_atoms)

    def forward(self, h):
        h_a, h_v = torch.chunk(h, 2, dim=1)
        a_logits = self.a_stream(h_a).reshape((-1, self.n_actions, self.n_atoms))
        a_logits = a_logits - a_logits.mean(dim=1, keepdim=True)
        v_logits = self.v_stream(h_v).reshape((-1, 1, self.n_atoms))
        probs = nn.functional.softmax(a_logits + v_logits, dim=2)
        return pfrl.action_value.DistributionalDiscreteActionValue(probs, self.z_values)

def phi(x):
  return np.asarray(x, dtype=np.float32)



def get_rainbow_agent(gamma, gpu, update_interval=1,replay_start_size=2000, target_update_interval=2000,
                      n_atoms=51, v_max=1, v_min=-1, hidden_size=512,
                      noisy_net_sigma=0.1,
                      lr0=1e-3, eps=1.5e-4, betasteps=2e6, num_step_return=3,
                      minibatch_size=32):
  
  # categorical Q-function
  q_func = nn.Sequential(
          nn.Linear(obs_size, hidden_size),
          nn.ReLU(),
          nn.Linear(hidden_size, hidden_size),
          nn.ReLU(),
          DistributionalDuelingHead(hidden_size, n_actions, n_atoms, v_min, v_max),
      )


  pnn.to_factorized_noisy(q_func, sigma_scale=noisy_net_sigma)
  print(q_func)
  # 探索アルゴリズム
  explorer = explorers.Greedy()

  # 最適化
  opt = torch.optim.Adam(q_func.parameters(), lr=lr0, eps=eps)

  # replay
  rbuf = replay_buffers.PrioritizedReplayBuffer(
            10 ** 6,
            alpha=0.5,
            beta0=0.4,
            betasteps=betasteps,
            num_steps=num_step_return,
            normalize_by_max="memory"
        )

  agent = agents.CategoricalDoubleDQN(
          q_func,
          opt,
          rbuf,
          gpu=gpu,
          gamma=gamma,
          explorer=explorer,
          minibatch_size=minibatch_size,
          replay_start_size=replay_start_size,
          target_update_interval=target_update_interval,
          update_interval=update_interval,
          batch_accumulator="mean",
          phi=phi,
          max_grad_norm=10,
      )
  return agent

前回用いたPFRLで直接実装されているRainbow用のネットワークはatari用であり
画像を入力としていたので、 その部分を今回の問題に合わせて書き換えている。

学習設定

import time
class Hook:
  def __init__(self, period=10000):
    self.t0 = time.time()
    self.period = period 

  def __call__(self, env, agent, t):
    if t % self.period == 0:
      t1 = time.time()
      dt = t1 - self.t0 
      print("{} : elps {:.3f}".format(t, dt))
      self.t0 = t1

steps = 10 ** 6
gamma = 0.98

update_interval = 1 
betasteps = steps / update_interval  * 2
gpu = 0 if torch.cuda.is_available() else -1

学習


rainbow_agent = get_rainbow_agent(gamma, gpu, update_interval=update_interval,
                                  betasteps=betasteps) 

out_dir = "results_rainbow"
os.makedirs(out_dir, exist_ok=True)
checkpoint_frequency = 2 * 10 ** 5
eval_n_runs = 10
eval_interval = 5 * 10 ** 4

experiments.train_agent_with_evaluation(
            agent=rainbow_agent,
            env=env,
            steps=steps,
            eval_n_steps=None,
            checkpoint_freq=checkpoint_frequency,
            eval_n_episodes=eval_n_runs,
            eval_interval=eval_interval,
            outdir=out_dir,
            save_best_so_far_agent=False,
            eval_env=eval_env,
            step_hooks=(Hook(), )
        )

#rbuf = replay_buffers.ReplayBuffer(10 ** 6, num_step_return)

結果

学習はcolab上で3時間程度(800,000 frameで途中終了)で終了した。

学習曲線
import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv(os.path.join(out_dir, "scores.txt"), sep="\t")
cols = ["mean", "median", "average_loss"]
fig, axes = plt.subplots(figsize=(20, 6), ncols=3)
for c, col in enumerate(cols):
  df.set_index("steps").plot(y=col, ax=fig.axes[c])

f:id:nakamrnk:20200809152943p:plain

序盤はスコア(mean:平均値, median:中間値)がほぼ-5 (1ポイントもとれずに全敗)だが、
250,000 frameくらいから性能が急激に上昇し、
400,000 frameほどで収益のmedianスコアが0(対戦相手と勝率が5割程度)となっている。
その後はほぼ横ばい。 lossはまだ減少傾向にあるので時間をかければ性能向上する
可能性はある。

挙動確認

評価

from contextlib import ExitStack

def convert_action(action):
  return [(action >> i) % 2 for i in range(3)]

def evaluate(agent1, agent2=None, n_episodes=30,  num_obs=1, multi_agent=False):
  contexts = [agent1]

  if multi_agent:
    multi_env = gym.make("SlimeVolley-v0")  
    env = multi_env

    assert agent2 is not None
    contexts.append(agent2)
  else:
    env = eval_env

  scores = []
  terminate = False
  timestep = 0
  obses = []
  actions = []
  rewards = []
  dones = []

#  with agent1.eval_mode(), agent2.eval_mode():
  with ExitStack() as stack:
    for agent in contexts:
      stack.enter_context(agent.eval_mode())
  # with [agent.eval_mode() for agent in contexts]:
    reset = True
    while not terminate:
        if reset:
            obs = env.reset()
            obs2 = obs
            done = False
            test_r = 0
            episode_len = 0
            info = {}
  
        a1 = agent1.act(obs)
        if multi_agent:
          a2 = agent2.act(obs2)

        if len(scores) < num_obs:
          obses.append(obs)
        actions.append(a1)

        if multi_agent:
          obs, r, done, info = env.step(convert_action(a1), convert_action(a2))
          obs2 = info['otherObs']
        else:
          obs, r, done, info = env.step(a1)

        rewards.append(r)
        dones.append(done)

        test_r += r
        episode_len += 1
        timestep += 1
        reset = done or info.get("needs_reset", False)
        agent1.observe(obs, r, done, reset)
        if multi_agent:
          agent2.observe(obs2, -r, done, reset)     
        if reset:
            # As mixing float and numpy float causes errors in statistics
            # functions, here every score is cast to float.
            scores.append(float(test_r))
        terminate = len(scores) >= n_episodes

  return obses, actions, rewards, dones, scores


可視化

import cv2
from PIL import Image

def get_converter(img_size, x0=-2, x1=2, y0=0, y1=2):
  def converter(x, y):
    px = int((np.clip(x, x0, x1) - x0)/(x1 - x0) * (img_size - 1))
    py = img_size - int((np.clip(y, y0, y1) - y0)/(y1 - y0) * (img_size - 1))
    return px, py

  return converter


def visualize(obses, rewards, skip=1, img_size=64):
  arrs = []
  converter = get_converter(img_size)
  total_reward = 0

  pb1 = converter(0, 0)
  pb2 = converter(0, 0.2)

  p_a = 0
  p_o = 0
  for s, (o, reward) in enumerate(zip(obses[:-1:skip], rewards)):
    arr = np.zeros([img_size, img_size, 3], dtype=np.uint8) + 255
    pa = converter(-o[0], o[1])
    pb = converter(-o[4], o[5])
    po = converter(o[8], o[9])

    cv2.circle(arr, pa, 3, (255, 0, 0), -1)
    cv2.circle(arr, pb, 3, (0, 255, 0), -1)
    cv2.circle(arr, po, 3, (0, 0, 255), -1)
    cv2.line(arr, pb1, pb2, (128, 64, 64), 2)
    total_reward += reward
    if reward > 0:
      p_a += 1
    elif reward < 0:
      p_o += 1

    txt = "{}:{}:{}".format(p_a, p_o, total_reward)
    cv2.putText(arr, txt, (3, 15), cv2.FONT_HERSHEY_PLAIN, 1.0, (0, 0, 0))

    arrs.append(arr)


  return arrs

rainbow_agent.load(os.path.join(out_dir, "800000_checkpoint"))
results = evaluate(rainbow_agent, multi_agent=False)
obses = results[0]
rewards = results[2][:len(obses)]
print(len(obses), np.mean(results[-1]))
arrs = visualize(obses, rewards)
imgs = [Image.fromarray(arr) for arr in arrs]
imgs[0].save("rainbow.gif", save_all=True, loop=0, delay=33, append_images=imgs[1:])

f:id:nakamrnk:20200809153456g:plain

800,000 Iterまで学習したAgentの挙動は上図のようになる。
赤のプレイヤーは正しくボールに追随できており、青の対戦相手と
互角にラリーできている。

まとめ

今回はPFRLを使って、 slime volleyballの学習を行った。
3時間程度である程度の性能のAgentが学習できた。
slime volleyballは対戦型ゲームなので次はself-playを行ってみたい。

参考文献