XAIについての検証 - SHAP

はじめに

前回に引き続きfashion-mnistデータに対するXAIの検証を行う。
今回はSHAPアルゴリズムについて検証する。

SHAP

SHAP1はXAIアルゴリズムの一つである。

  • 各特徴量が加減算的に予測に寄与するとする
  • ある特徴を使う場合と使わない場合の差から寄与度(SHAP値)を求める
  • Gradient を利用してCNNにも適用できる

実装は以下で公開されている。
https://github.com/slundberg/shap

検証

前回学習したモデルをそのまま利用。
pytorchによるSHAP Deep Explainerは以下にチュートリアルがあったのでそれを参考にした。
pytorch mnistチュートリアル

コード (jupyter notebook)

前回と共通 (DataLoader, モデル定義)

import os

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns

import torch
from torch.utils.data import DataLoader
import torchvision


data_root = "data"
os.makedirs(data_root, exist_ok=True)

IMG_SIZE= 28
batch_size = 104



train_transform =  torchvision.transforms.Compose([
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.RandomRotation(5),
        torchvision.transforms.RandomResizedCrop(IMG_SIZE, scale=(0.9, 1.0)),
        torchvision.transforms.ToTensor(),
    ])
test_transform = torchvision.transforms.ToTensor()


train_dataset = torchvision.datasets.FashionMNIST(data_root, train=True, transform=train_transform, target_transform=None, download=True)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

valid_dataset = torchvision.datasets.FashionMNIST(data_root, train=False, transform=test_transform, target_transform=None, download=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

print("train : ", len(train_dataset))
print("valid : ", len(valid_dataset))

import torch.nn as nn
import torch.nn.functional as F

class ConvMod(nn.Sequential):
  def __init__(self, in_channel, out_channel, kernel_size, padding=1, stride=2, norm_fn=None, act_func=None):
    seqs = []
    seqs.append(nn.Conv2d(in_channel, out_channel, kernel_size, padding=padding, stride=stride))
    if norm_fn is not None:
      seqs.append(norm_fn(out_channel))
    if act_func is not None:
      seqs.append(act_func())
    super().__init__(*seqs)


class Model(nn.Module):
  MOD = ConvMod
  def __init__(self, num_classes=10, c0=16, num_conv=3, stride=2, norm_fn=None, act_func=nn.ReLU, last_act=None):
    super().__init__()
    seqs = []
    cin = 1
    cout = c0
    for n in range(num_conv):
      seqs.append(self.MOD(cin, cout, 3, padding=1, stride=stride, norm_fn=norm_fn, act_func=act_func))
      cin, cout = cout, cout * 2
    self.feature = nn.Sequential(*seqs)
    
    self.pool = nn.AdaptiveAvgPool2d(1)
    
    self.pred = nn.Linear(c0*4, num_classes)
    
    if last_act is not None:
      self.last_act = last_act()
    else:
      self.last_act = None

  def forward(self, x):
    
    x = self.feature(x)
    
    gap = self.pool(x).squeeze(3).squeeze(2)
    
    o = self.pred(gap)
    
    if self.last_act is not None:
      o = self.last_act(o)

    return o

モデル読み込み

from functools import partial
MODEL_PATH = "model.pth"
c0 = 32
device = "cpu"
valid_df = pd.read_csv("valid.csv", index_col=0)


model = Model(num_classes=len(train_dataset.classes), c0=c0, norm_fn=nn.BatchNorm2d, last_act=partial(nn.Softmax, dim=1)).to(device)
model.load_state_dict(torch.load(MODEL_PATH))
model.eval()
print((valid_df["true"]==valid_df["pred"]).mean())
valid_df.head()

解析・可視化

import shap

def plot_figures(shap_numpy, image_numpy, probs, num_vis_rank=3):
  num_classes = len(shap_numpy)
  nrows = 1 + num_vis_rank
  ncols = len(image_numpy)
  figsize = (ncols * 3, nrows*2.5)
  
  fig, axes  = plt.subplots(figsize=figsize, ncols=ncols, nrows=nrows)
  max_val = np.percentile(np.abs(np.concatenate(shap_numpy)), 99.9)
    
  for j in range(ncols):
    axes[0, j].imshow(image_numpy[j][:, :, 0], cmap="gray")
    axes[0, j].set_axis_off()

    p = probs[j]
    inds = np.argsort(p)[::-1]
    pro = p[inds]
    
    for n in range(num_vis_rank):
      class_id = inds[n]
      class_name = valid_dataset.classes[class_id]
      prob_str = "{:.1f}%".format(pro[n]*100)
      axes[1 + n, j].imshow(shap_numpy[class_id][j][:, :, 0], cmap="bwr", vmin=-max_val, vmax=max_val)
      axes[1 + n, j].set_axis_off()
      axes[1 + n, j].set_title(class_name + " : " +  prob_str, fontsize=18)
  fig.tight_layout()
  return fig
  
  
def analyze(v_indices, bg_indices):
  background = torch.cat([valid_dataset[i][0][None] for i in bg_indices])
  test_images = torch.cat([valid_dataset[i][0][None] for i in v_indices])
  with torch.no_grad():
    outputs = model(background)
    expected_value = outputs.mean(0).cpu().numpy()
    probs = model(test_images).cpu().numpy()
  e = shap.DeepExplainer(model, background)
  shap_values = e.shap_values(test_images)
  sh_arr = np.array([shap_value.sum(axis=(1, 2, 3)) for shap_value in shap_values])

  shap_numpy = [np.swapaxes(np.swapaxes(s, 1, -1), 1, 2) for s in shap_values]
  test_numpy = np.swapaxes(np.swapaxes(test_images.numpy(), 1, -1), 1, 2)
  return plot_figures(shap_numpy, test_numpy, probs)  
  
fig = analyze(valid_df[valid_df["true"] == 8].sample(10).index, valid_df.sample(200).index)
fig.tight_layout()
fig.savefig("tmp.jpg")

前回モデルのクラスごとの予測性能
クラス precision recall F-measure
T-shirt/top 0.884774 0.86 0.872211
Trouser 0.993946 0.985 0.989453
Pullover 0.86724 0.908 0.887152
Dress 0.90619 0.937 0.921337
Coat 0.878 0.878 0.878
Sandal 0.985787 0.971 0.978338
Shirt 0.804233 0.76 0.781491
Sneaker 0.95122 0.975 0.962963
Bag 0.984048 0.987 0.985522
Ankle boot 0.968938 0.967 0.967968

Bagクラス

f:id:nakamrnk:20201013113510j:plain

最上段は元の画像、 以降は予測結果上位クラス(1, 2, 3位)に対する
SHAP値の分布である。赤い部分は予測に対して正の寄与をしている領域
青い部分は予測に対して負の寄与をしている領域である。

Bagクラスは多くの画像が正しく判定できているため、上から2行目の第1予測
クラスへの反応が大きい。 バッグの左右端の領域や持ち手部分がBag予測に
寄与していることが分かり、妥当な結果と言える。
第2予測以降はほとんど反応していない。

Trouserクラス

f:id:nakamrnk:20201013114412j:plain

Trouserクラスも比較的高精度で予測出来ているクラスである。
Trouserクラスの場合は股下の背景部分に赤い領域が多く、
そこに注目して判定を行っている。この構造は他のクラスにないため
判定基準としては妥当と思われる。

一方で股下構造の見えない 右から3番目の画像は誤判定している
(Dress クラス 85 %, Trouser 13%) 。このように典型的な特徴から
外れた画像に対しては誤判定が起こりやすい。

靴クラス比較

Sandalクラス

f:id:nakamrnk:20201013115142j:plain

Sneakerクラス

f:id:nakamrnk:20201013115228j:plain

Ankle bootクラス

f:id:nakamrnk:20201013115307j:plain

靴クラスを比較すると

  • Sandalクラスは隙間部分や紐部分に反応している
  • Sneakerクラスは特定箇所への反応が弱い
    • 全体の構造を見て判定しているためSHAPでは特徴がでない?
  • Ankle bootクラスはつま先に強く反応しているものが多い

Ankle bootはくるぶしを覆う靴なのでくるぶし当たりに反応するほうが
人間の感覚からは自然であろう。 しかし、実際はつま先付近の構造に
強く反応しているため、つま先付近の構造に対して何か他クラスとの
違いを発見したものと思われる。

その特徴が適切なものならば良いのだが、Sneakerクラス画像の右から二番目は
つま先付近に反応してAnkle bootクラスと誤判定しており、 Ankle bootクラスの
特徴としては十分ではないと思われる (ラベルミスの可能性もあるが)。
今回のモデルで偶然このような学習が進んだのか、 現状の学習手法に問題が
あるかは今後の検証課題である。

トップスクラス比較

T-shirt/top クラス

f:id:nakamrnk:20201013122027j:plain

Pullover クラス

f:id:nakamrnk:20201013122039j:plain

Dress クラス

f:id:nakamrnk:20201013122051j:plain

Coat クラス

f:id:nakamrnk:20201013122116j:plain

Shirt クラス

f:id:nakamrnk:20201013122131j:plain

各クラスに対するSHAPについて

  • T-shirt/top は肩から脇付近への反応がやや強い
  • Pullover は長袖部分への反応が強いように見える
    • そこまではっきりはしていない
  • Dressクラスは肩から胸付近と腰付近に反応
  • Coatクラスは首元への反応がやや強い
  • Shirtクラスは首元と体中心付近に反応

現状トップスについてはSHAPによって説得力のある説明は
できないと思う。Dressクラスの特徴的なボディラインや
T-shirt/topクラスの肩まわりなどある程度人間の感覚に近い傾向
も見て取れるが、noisyであまり綺麗に特徴を捉えているとは言えない。
正の寄与と負の寄与が混在しているような領域も多く、
一見してどちらが優勢なのか分かりづらいのもマイナス点である。

まとめ

前回に引き続きfashion-mnistデータに対してXAIの検証を行った。
SHAPはTrouserの股下などわりと細かい特徴も捉えられているが、
正負の寄与が混じった領域の解釈などに難があると感じた。
今後はGrad-CAMなどの滑らかなsaliency map系のXAIと比較していきたい。

参考文献

XAIについての検証 - Anchors

はじめに

画像系Deep LearingにおけるXAI (Explainable AI)のひとつAnchorsを用いて
Fahion-mnistデータに対して学習を行ったモデルの解析を行った。

XAI

Deep Learning モデルはその性能の高さから様々な分野で利用されているが、
処理の多くがNeural Networkにより抽出された特徴量に依存しているため、
人間には理解することが難しい(ブラックボックス化している)1

このAIのブラックボックス化を緩和するためXAIと呼ばれる技術が注目されている。
XAI関連の技術は様々(NNの説明可能モデルへの置き換え、ブラックボックスの中身検査等2)
であるが今回はモデルの出力結果を説明する技術の一つAnchorsを検証する。

学習データ・モデル

学習データはFashion-Mnistを用いた。
これは28x28のファッション画像データに対する10クラス分類である。

学習データは60,000(各クラス6,000)、評価データ10,000(各クラス1000)である。

ソースコード (jupyter notebook)

モジュール

import os

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns

データ読み込み

import torch
from torch.utils.data import DataLoader
import torchvision


data_root = "data"
os.makedirs(data_root, exist_ok=True)

IMG_SIZE= 28
batch_size = 32



train_transform =  torchvision.transforms.Compose([
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.RandomRotation(5),
        torchvision.transforms.RandomResizedCrop(IMG_SIZE, scale=(0.9, 1.0)),
        torchvision.transforms.ToTensor(),
    ])
test_transform = torchvision.transforms.ToTensor()


train_dataset = torchvision.datasets.FashionMNIST(data_root, train=True, transform=train_transform, target_transform=None, download=True)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

valid_dataset = torchvision.datasets.FashionMNIST(data_root, train=False, transform=test_transform, target_transform=None, download=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

print("train : ", len(train_dataset))
print("valid : ", len(valid_dataset))

モデル定義

import torch.nn as nn
import torch.nn.functional as F

class ConvMod(nn.Sequential):
  def __init__(self, in_channel, out_channel, kernel_size, padding=1, stride=2, norm_fn=None, act_func=None):
    seqs = []
    seqs.append(nn.Conv2d(in_channel, out_channel, kernel_size, padding=padding, stride=stride))
    if norm_fn is not None:
      seqs.append(norm_fn(out_channel))
    if act_func is not None:
      seqs.append(act_func())
    super().__init__(*seqs)


class Model(nn.Module):
  
  MOD = ConvMod
  
  
  def __init__(self, num_classes=10, c0=16, num_conv=3, stride=2, norm_fn=None, act_func=nn.ReLU):
    super().__init__()
    seqs = []
    cin = 1
    cout = c0
    for n in range(num_conv):
      seqs.append(self.MOD(cin, cout, 3, padding=1, stride=stride, norm_fn=norm_fn, act_func=act_func))
      cin, cout = cout, cout * 2
    self.feature = nn.Sequential(*seqs)
    
    self.pool = nn.AdaptiveAvgPool2d(1)
    
    self.pred = nn.Linear(c0*4, num_classes)
    
    
    
  def forward(self, x):
    
    x = self.feature(x)
    
    gap = self.pool(x).squeeze(3).squeeze(2)
    
    o = self.pred(gap)
    return o
    
 

学習

from torch.optim import Adam
from tqdm import tqdm as tqdm


def evaluation(model, valid_loader, criteria, device):
  model.eval()
  prg = tqdm(valid_loader)
  prg.set_description("valid")
  results = [[], []]
  losses = []
  for batch in prg:
    imgs, labels = [b.to(device) for b in batch]
    with torch.no_grad():
      preds = model(imgs)
      loss = criteria(preds, labels)
      losses.append(loss.item())

    results[0].append(labels.cpu().numpy())    
    results[1].append(preds.argmax(dim=1).cpu().numpy())

  valid_loss = np.mean(losses)
  result_df = pd.DataFrame(np.array([np.concatenate(col) for col in results]).T, columns=["true", "pred"])
  return valid_loss, result_df


lr0 = 1e-3
num_epochs = 30
c0 = 32 # num of first layer channels 
device = "cpu"
MODEL_PATH = "model.pth"

def accum_func(epoch):
  if epoch < num_epochs * 0.85:
    return 1
  else:
    return 10
  

model = Model(num_classes=len(train_dataset.classes), c0=c0, norm_fn=nn.BatchNorm2d).to(device)

optimizer = Adam(model.parameters(), lr=lr0)
criterion = nn.CrossEntropyLoss()


optimizer.zero_grad()

logs = []
for epoch in range(1, num_epochs + 1):
  model.train()

  prg = tqdm(train_loader)
  prg.set_description("train Epoch {}".format(epoch))

  num_accum = accum_func(epoch)
  loss_factor = 1.0 / num_accum
  it = 0
  count = 0
  hit = 0
  losses = []
  for batch in prg:
    imgs, labels = [b.to(device) for b in batch]
    preds = model(imgs)
    loss = criterion(preds, labels) * loss_factor
    loss.backward()
    # calculate train accuracy
    it += 1
    count += len(labels)
    hit += (preds.argmax(dim=1) == labels).sum().item()
    # gradient accumulation
    if it % num_accum == 0:
      optimizer.step()
      optimizer.zero_grad()
    
    losses.append(loss.item() / loss_factor)
    
  train_acc = hit / count
  train_loss = np.mean(losses)
  
  valid_loss, valid_df = evaluation(model, valid_loader, criterion, device)
  valid_acc = (valid_df["true"]==valid_df["pred"]).mean()
  print("Epoch {} , Loss train / {:.3f} valid / {:.3f}, Accuracy  train /{:.3f} valid {:.3f}".format(epoch, train_loss, valid_loss, train_acc, valid_acc))
  logs.append((epoch, train_loss, valid_loss, train_acc, valid_acc))
    
logs_df = pd.DataFrame(logs, columns=["Epoch", "train_loss", "valid_loss", "train_acc", "valid_acc"]).set_index("Epoch")
        
fig, _ = plt.subplots(figsize=(15, 4), ncols=2)
logs_df[["train_loss", "valid_loss"]].plot(ax=fig.axes[0], title="loss")
logs_df[["train_acc", "valid_acc"]].plot(ax=fig.axes[1], title="accuracy")
    
    
    
torch.save(model.state_dict(), MODEL_PATH)
valid_df.groupby(["true", "pred"]).size().unstack(fill_value=0)

学習結果

学習曲線

f:id:nakamrnk:20201012103658j:plain

confusion matrix
true\pred T-shirt/top Trouser Pullover Dress Coat Sandal Shirt Sneaker Bag Ankle boot
T-shirt/top 860 0 20 19 5 2 89 0 5 0
Trouser 0 985 1 10 0 0 2 0 2 0
Pullover 10 1 908 10 36 0 35 0 0 0
Dress 8 3 9 937 22 0 20 0 1 0
Coat 1 0 52 32 878 0 36 0 1 0
Sandal 0 0 0 0 0 971 0 21 0 8
Shirt 92 2 56 25 57 1 760 0 7 0
Sneaker 0 0 0 0 0 4 0 975 0 21
Bag 1 0 1 1 2 1 3 2 987 2
Ankle boot 0 0 0 0 0 6 0 27 0 967

全体で92%精度の精度だった。
誤判定は

  • T-shirt/top - Shirtクラス間
  • Pullover - Shirtクラス間

などが多い。

クラスごとの評価結果
precision recall F-measure
T-shirt/top 0.884774 0.86 0.872211
Trouser 0.993946 0.985 0.989453
Pullover 0.86724 0.908 0.887152
Dress 0.90619 0.937 0.921337
Coat 0.878 0.878 0.878
Sandal 0.985787 0.971 0.978338
Shirt 0.804233 0.76 0.781491
Sneaker 0.95122 0.975 0.962963
Bag 0.984048 0.987 0.985522
Ankle boot 0.968938 0.967 0.967968

クラスの内訳は

  • トップス - T-shirt/top , Pullover, Dress , Coat, Shirt
  • ボトムス- Trouser
  • 靴 - Sandal, Sneaker, Ankle boot
  • その他 - Bag

であるので、画像全体の形状が独立しているTrouser, Bagクラスは正解率が高い。
一方で全体の構造が似ているトップス関連(私が見ても違いがよく分からないものも多い)
はやや性能が低く、Shirtクラスの正解率が特に低いため原因を説明したい。

Anchors

AnchorsはXAI技術の一つであり、データ点周辺の予測結果がほとんど変わらない
"Anchor"と呼ばれる領域を用いてモデルを説明しようとするアルゴリズムである。
AnchorsLIMEに近い手法である(Authorが同じ)。

LIMEはあるデータ点近くのモデルの挙動を線形近似することでモデルを説明する
アルゴリズムであるが、 Anchorsはデータ点周辺で予測結果があまり変わらない領域と
それに寄与している特徴量を求めることで、 あるデータの予測に欠かせない
特徴量セットを求めるというアルゴリズムである。

superpixels

LIMEAnchorsは画像を教師なしで領域分割するsuperpixelsと呼ばれる技術により
画像を分割し、分割した部分を塗りつぶしてモデルに入力することによる結果の変化から
その領域の重要度を推定している。

このsuperpixelsを行うためのアルゴリズムは複数あり、このアルゴリズム選択がLIME
説明結果に影響を与える3

今回は以下の3つの手法を比較した。

  • quick-shift4
  • slic5
  • 固定グリッド6

Anchors検証

alibi7ライブラリの実装を利用して検証した。

superpixels 準備

from alibi.explainers import AnchorImage
from skimage.segmentation import slic, quickshift, watershed

def slic_segmentation(image):
  return slic(image, n_segments=30)


def superpixel(image, size=(4, 7)):
    segments = np.zeros([image.shape[0], image.shape[1]])
    row_idx, col_idx = np.where(segments == 0)
    for i, j in zip(row_idx, col_idx):
        segments[i, j] = int((image.shape[1]/size[1]) * (i//size[0]) + j//size[1])
    return segments
  
  
def predict_fn(x):
  img = torch.from_numpy(x.astype(np.float32)/255.0).reshape([-1, 1, IMG_SIZE, IMG_SIZE]).to(device)
  with torch.no_grad():
    logits = model(img)
    probs = nn.Softmax(dim=1)(logits).cpu().numpy()
  return probs


v_index = 1
tensor, label = valid_dataset[v_index]
arr = (tensor.cpu().numpy()[0] * 255).astype(np.uint8)
image_shape =arr.shape


explainer_qs = AnchorImage(predict_fn, image_shape, segmentation_fn="quickshift")
explainer_slic = AnchorImage(predict_fn, image_shape, segmentation_fn=slic_segmentation)
explainer_grid = AnchorImage(predict_fn, image_shape, segmentation_fn=superpixel)


expls = {
  "qs":explainer_qs,
  "slic":explainer_slic,
  "grid":explainer_grid
}

解析・可視化

def analyze(v_indices):
  num_images = len(v_indices)
  num_expls = len(expls)
  figsize = (min(num_images * 4, 25), 3 * (1 + num_expls))
  fig, axes = plt.subplots(figsize=figsize, nrows= 1 + num_expls, ncols=num_images)


  for i, v_index in enumerate(v_indices):
    tensor, label = valid_dataset[v_index]
    arr = (tensor.cpu().numpy()[0] * 255).astype(np.uint8)
    class_label = valid_dataset.classes[label]

    axes[0, i].imshow(arr, cmap="gray");
    axes[0, i].set_title(class_label)


    probs = predict_fn(arr)[0]
    pred_index = probs.argmax()
    prob  = probs[pred_index]
    prob_label = "{} : {:.2f}%".format(valid_dataset.classes[pred_index], prob*100)  
    for k, (key, expl) in enumerate(expls.items()):
      explanation = expl.explain(arr.reshape([IMG_SIZE, IMG_SIZE, 1]), threshold=.95, p_sample=.8, seed=0)
      print(i, key, explanation.precision, explanation.coverage)
      axes[1 + k, i].imshow(explanation.anchor[:,:,0], cmap="gray");
      axes[1 + k, i].set_title(prob_label)
  return fig

各クラス、ランダムに10枚サンプリングした評価データに対してAnchorsの計算を行う。

Bagクラス

f:id:nakamrnk:20201012153925j:plain

上図がBagクラスに対するAnchors検証結果である。
最上段が元画像、2,3, 4行目がそれぞれ、"quick-shift", "slic", "固定グリッド"による
super-pixelアルゴリズムを用いたAnchorsの出力結果である。

Bagクラスの場合全体の構造が他のクラスと異なっているため、Anchorsのような
ローカルな置き換えに反応している画像は少ない。
ただ、持ち手部分は他のクラスにはない特徴であるため、反応している画像も存在する。

Trouserクラス

f:id:nakamrnk:20201012154508j:plain

ThrouserクラスもBagクラスと同様比較的予測精度の高いクラスである。
ボトムスはこのクラスだけなので足の部分が重要であることは予測される。
quick-shift以外のアルゴリズムは足の先端付近の細い部分が重要であるとしているので、
妥当な結果に見える。quick shiftはどこにも反応していない画像がほとんどである。

また、どのアルゴリズムも左から5番目の誤判定画像に関しては画像全体に反応している。 このような誤判定画像に対する結果はTruouserのクラス特徴とは乖離すると思われる。

靴クラス比較
Sandal

f:id:nakamrnk:20201012155550j:plain

Sneaker

f:id:nakamrnk:20201012155608j:plain

Ankle boot

f:id:nakamrnk:20201012155625j:plain

靴関連の3クラスに対するAnchorsの結果を比較すると

  • Sandalクラスは紐や細い部分に反応している
  • Sneakerクラスは靴の正面部分や履き口に反応
  • Ankle bootは正面部分に反応
    • Sneakerとの違いは角度?

のようにある程度合理的に見える挙動をしている。 (quick-shiftアルゴリズム以外)

トップス比較

Dressクラス

トップスの中では比較的精度の高いdressクラスに対するAnchorsの結果は下図のようになっている。

f:id:nakamrnk:20201012160958j:plain

固定グリッドアルゴリズムは比較的結果が安定しており、胸や胴まわりに反応しているものが多い。

Pulloverクラス

f:id:nakamrnk:20201012161326j:plain

pulloverクラスは袖の先端や胴、襟元付近に反応しているものが見られる。

T-shirt/top クラス

f:id:nakamrnk:20201012161727j:plain

T-shirt/topクラスは反応している画像自体が少ないが、
反応しているものは首周りや肩、脇あたりに反応している。

Shirtクラス

f:id:nakamrnk:20201012162058j:plain

Shirtクラスは他クラスと比べて性能が低いクラスである。

全体の構造的には半袖はT-shirt, 長袖はPulloverとの判別が難しそうに見えるが
首まわりの構造が異なるのでそれらがはっきりしている画像は正しく判別できている
ように感じる(右から1, 2, 4番目の画像等)。

そもそも素人目にはShirtに見えない画像(左から4, 5番目等)も多いため、
データ自体の難易度が高いため、性能が低いものと思われる。

まとめ

Fashion Mnistデータに対してAnchorsアルゴリズムによるXAIの検証を行った。
比較的妥当に見えるAnchorが出力されているものも多かったが、
superpixelsアルゴリズムへの依存やグローバルな構造を見づらいという
欠点も感じられたので、他のXAI手法と併用したほうがよいと思った。

参考文献

OpenCVjsとtensorflow.jsによるモデル検証アプリ

はじめに

OpenCVjsは画像処理ライブラリであるOpenCVjavascript版。
tensorflow.jsはtensorflowのjavascript版である。
これらを組み合わせて、webブラウザ上で簡単な画像修正と
モデル推論を行うプログラムをgithubに公開した。

https://github.com/NeverendingNotification/opecvjs_tensorflowjs_viewer

概要

近年Deep Learningは様々な分野で利用されるようになっている。
しかし、 pythonによるDeep Learning関連の環境構築は素人には難しい部分もある。
そこで、webブラウザさえあれば動作するDeep Learning環境として
tensorflow.jsは有望である。これにもともとはCの画像処理ライブラリであった
opencvjavascript版であるOpenCVjsを組み合わせることで、
簡単な画像処理を行いながらDeep Learningに気軽に触れられる
プログラムを作ってみた。

アプリの使い方

このライブラリのローカル環境での使い方を以下に述べる。

機能

このアプリの機能は主に2つあり、

  1. OpenCVjsによる簡単な画像処理
    • 現時点では色変換、回転・スケール変換のみ
  2. 1で修正した画像と元画像に対して学習済みモデルによる予測結果比較
画像編集

上記のgithubリポジトリをクローンして、index.htmlを開くと以下のような画面となる。

f:id:nakamrnk:20200918225705j:plain

上側の領域が元画像領域、中央が編集パラメータ領域、 下側の領域が編集後画像領域である。 

レナ画像(http://www.ess.ic.kanagawa-it.ac.jp/app_images_j.htmlよりダウンロード)を例に
アプリの動きを説明する。

  1. 中央のファイルを選択ボタンからローカルのファイル選択
  2. 中央の編集パラメータを適当に変更する
  3. 編集ボタンを押す

この結果アプリの画面は以下のようになる。
f:id:nakamrnk:20200918230324j:plain

このようにOpenCVjsを利用するとウェブブラウザのみで簡単な画像編集ができる。

モデル推論

モデル推論に関してはローカルで行うのにひと手間必要である。
これはセキュリティ上webブラウザがローカルファイルへのアクセスを許可していない (ことが多い)ため、学習済みのモデルにアクセスできないためである。

これを回避するためにいくつか手法が考えられると思うが、chromeを利用しているならば
chromeアプリであるWeb Server for Chromeを利用するのが楽だと思う。

アプリインストール後左上のアプリ項目からWeb Serverを選択すると以下のような画面が
表示されるため、 CHOOSE FOLDERから先ほどのアプリのルートフォルダを指定し、 画面中央のリンク(ここではhttp://127.0.0.1:8887/)からアプリに移動できる。

f:id:nakamrnk:20200918231338j:plain

ここで先ほどの画像編集と同様に画像を読み込み中央にある予測ボタンを押すと
学習済みモデルの予測結果が表示される。
f:id:nakamrnk:20200918232000j:plain

今回の学習済みモデルは手元のデータで適当に学習した、
人間、動物、食べ物の3クラス分類モデルである。

このモデルでLenna画像を予測した結果、
人間8.5%, 食べ物91%と明らかに間違った結果となっている。
これはこの学習済みモデルの学習データのバイアスが原因があると思われる。

今回学習に利用したデータは正面の人間画像が多く、振り向き顔であるLennaのような
姿勢の画像はなかった。また、Lenna画像はやや色合いが強く、これも学習データと
異なるように感じた。

そこで画像編集機能で彩度を-20した結果で予測を行う(下側のパネル)と
予測結果が人間 44 %まで上昇した。

さらに、明度、回転、拡大変換を追加して、学習データの条件に近づけると
人間 59 %、食べ物 40 %となり、かろうじて間違っていない結果を得ることができた。

f:id:nakamrnk:20200918232957j:plain

このように手軽に画像処理を行いながら、Deep Learningモデルの結果を観測することで、
モデルの持つバイアスを理解しやすくなる。

このアプリで軽くテストしてみた限り、今回のモデルは

  • 色相をピンクよりにすると予測確率が上がり、それ以外は下がる
  • 彩度は下げると、明度は上げると、予測確率が上がる
  • 回転は単体で行うと予測確率が下がるが、拡大といい感じに組み合わせると予測確率が上がる

などのモデルの癖を観測することができた。

まとめ

javascriptを使って学習済みモデルの性質をチェックするアプリを開発した。
ブラウザから手軽に扱えるので、モデルの性質把握がしやすい。
現状画像処理機能の種類やアプリデザインがイマイチなので
暇があったら修正したい。

参考文献

PFRLを試してみる - self play

はじめに

前回PFRLを用いてslime volleyballを学習した。
今回は同じ slime volleyballl環境に対して,
複数のagent を用いたself playを試してみる。

self play

対戦型ゲームにおける強化学習は対戦相手となるエージェントに依存する。
前回の学習では、slime volleyballが予め用意してくれているdefaultエージェントに
勝てるように学習を行ったが、 問題によっては初期に対戦相手となる
エージェントが存在しない場合がある。

そのような場合はself playが有効である。 self playは過去の自分自身に
打ち勝てるように学習を行う手法である。

  1. 初期にランダムにエージェントを初期化
  2. 片方のエージェント(A)のみ学習し、もう片方のエージェント(B)のパラメータを固定する
  3. AがBに安定して勝てるようになるまでAを学習する
  4. AのパラメータをBにコピーして2に戻る。

過去の自分を超えるプロセスを何回か繰り返すことで、
外部の情報(初期対戦相手)なしに、 エージェントを学習することができる。

slime volleyballについてのself playは以下のページで検証されている。
https://github.com/hardmaru/slimevolleygym/blob/master/TRAINING.md

検証

検証は前回同様Google Colaboratory上で行った。

ライブラリ

!pip install pfrl
!pip install slimevolleygym
import slimevolleygym
import argparse
import os

import torch
import torch.nn as nn
import numpy as np
from PIL import Image


import gym


import pfrl
from pfrl.q_functions import DiscreteActionValueHead
from pfrl import agents
from pfrl import experiments
from pfrl import explorers
from pfrl import nn as pnn
from pfrl import utils
from pfrl.q_functions import DuelingDQN
from pfrl import replay_buffers

from pfrl.wrappers import atari_wrappers
from pfrl.initializers import init_chainer_default
from pfrl.q_functions import DistributionalDuelingDQN

環境構築

コード

SEED = 0
train_seed = SEED
test_seed = 2 ** 31 - 1 - SEED

class SelfPlayMultiBinaryAsDiscreteAction(gym.ActionWrapper):
    """Transforms MultiBinary action space to Discrete.
    If the action space of a given env is `gym.spaces.MultiBinary(n)`, then
    the action space of the wrapped env will be `gym.spaces.Discrete(2**n)`,
    which covers all the combinations of the original action space.
    Args:
        env (gym.Env): Gym env whose action space is `gym.spaces.MultiBinary`.
    """

    def __init__(self, env):
        super().__init__(env)
        assert isinstance(env.action_space, gym.spaces.MultiBinary)
        self.orig_action_space = env.action_space
        self.action_space = gym.spaces.Discrete(2 ** env.action_space.n)

    def action(self, action):
        return [(action >> i) % 2 for i in range(self.orig_action_space.n)]


    def step(self, action, otherAction=None):
      if otherAction is not None:
        otherAction = self.action(otherAction)
      return self.env.step(self.action(action), otherAction=otherAction)

def make_env(test):
  # Use different random seeds for train and test envs
  env_seed = test_seed if test else train_seed
  env = gym.make("SlimeVolley-v0")  
  env.seed(int(env_seed))
  if isinstance(env.action_space, gym.spaces.MultiBinary):
      env = SelfPlayMultiBinaryAsDiscreteAction(env)
  # if args.render:
  #     env = pfrl.wrappers.Render(env)
  return env 

# 初期SEED設定
utils.set_random_seed(SEED)

# 環境設定
env = make_env(test=False)
eval_env = make_env(test=True)
obs = env.observation_space
obs_size = env.observation_space.low.size
n_actions = env.action_space.n
print(obs_size, n_actions)

前回との違いはgymのwrapperをmulti agent用に修正した部分である。
slime volleyball環境のstep関数は複数入力を与えることでmulti agent
に対応するため、それに合わせてコードを修正した。

エージェント

コード

class DistributionalDuelingHead(nn.Module):
    """Head module for defining a distributional dueling network.
    This module expects a (batch_size, in_size)-shaped `torch.Tensor` as input
    and returns `pfrl.action_value.DistributionalDiscreteActionValue`.
    Args:
        in_size (int): Input size.
        n_actions (int): Number of actions.
        n_atoms (int): Number of atoms.
        v_min (float): Minimum value represented by atoms.
        v_max (float): Maximum value represented by atoms.
    """

    def __init__(self, in_size, n_actions, n_atoms, v_min, v_max):
        super().__init__()
        assert in_size % 2 == 0
        self.n_actions = n_actions
        self.n_atoms = n_atoms
        self.register_buffer(
            "z_values", torch.linspace(v_min, v_max, n_atoms, dtype=torch.float)
        )
        self.a_stream = nn.Linear(in_size // 2, n_actions * n_atoms)
        self.v_stream = nn.Linear(in_size // 2, n_atoms)

    def forward(self, h):
        h_a, h_v = torch.chunk(h, 2, dim=1)
        a_logits = self.a_stream(h_a).reshape((-1, self.n_actions, self.n_atoms))
        a_logits = a_logits - a_logits.mean(dim=1, keepdim=True)
        v_logits = self.v_stream(h_v).reshape((-1, 1, self.n_atoms))
        probs = nn.functional.softmax(a_logits + v_logits, dim=2)
        return pfrl.action_value.DistributionalDiscreteActionValue(probs, self.z_values)

def phi(x):
  return np.asarray(x, dtype=np.float32)



def get_rainbow_agent(gamma, gpu, update_interval=1,replay_start_size=2000, target_update_interval=2000,
                      n_atoms=51, v_max=1, v_min=-1, hidden_size=512,
                      noisy_net_sigma=0.1,
                      lr0=1e-3, eps=1.5e-4, betasteps=2e6, num_step_return=3,
                      minibatch_size=32):
  
  # categorical Q-function
  q_func = nn.Sequential(
          nn.Linear(obs_size, hidden_size),
          nn.ReLU(),
          nn.Linear(hidden_size, hidden_size),
          nn.ReLU(),
          DistributionalDuelingHead(hidden_size, n_actions, n_atoms, v_min, v_max),
      )


  pnn.to_factorized_noisy(q_func, sigma_scale=noisy_net_sigma)
  # 探索アルゴリズム
  explorer = explorers.Greedy()

  # 最適化
  opt = torch.optim.Adam(q_func.parameters(), lr=lr0, eps=eps)

  # replay
  rbuf = replay_buffers.PrioritizedReplayBuffer(
            10 ** 6,
            alpha=0.5,
            beta0=0.4,
            betasteps=betasteps,
            num_steps=num_step_return,
            normalize_by_max="memory"
        )

  agent = agents.CategoricalDoubleDQN(
          q_func,
          opt,
          rbuf,
          gpu=gpu,
          gamma=gamma,
          explorer=explorer,
          minibatch_size=minibatch_size,
          replay_start_size=replay_start_size,
          target_update_interval=target_update_interval,
          update_interval=update_interval,
          batch_accumulator="mean",
          phi=phi,
          max_grad_norm=10,
      )
  return agent

エージェントに関しては前回と同様にRainbow Agentを用いている。

評価コード

from contextlib import ExitStack

def evaluate(agent1, agent2=None, n_episodes=30,  num_obs=1, multi_agent=False):
  contexts = [agent1]

  if multi_agent:
    assert agent2 is not None
    contexts.append(agent2)
  env = eval_env

  scores = []
  terminate = False
  timestep = 0
  obses = []
  actions = []
  rewards = []
  dones = []

  with ExitStack() as stack:
    for agent in contexts:
      stack.enter_context(agent.eval_mode())
    reset = True
    while not terminate:
        if reset:
            obs = env.reset()
            obs2 = obs
            done = False
            test_r = 0
            episode_len = 0
            info = {}
  
        a1 = agent1.act(obs)
        if multi_agent:
          a2 = agent2.act(obs2)

        if len(scores) < num_obs:
          obses.append(obs)
        actions.append(a1)

        if multi_agent:
          obs, r, done, info = env.step(a1, a2)
          obs2 = info['otherObs']
        else:
          obs, r, done, info = env.step(a1)

        rewards.append(r)
        dones.append(done)

        test_r += r
        episode_len += 1
        timestep += 1
        reset = done or info.get("needs_reset", False)
        agent1.observe(obs, r, done, reset)
        if multi_agent:
          agent2.observe(obs2, -r, done, reset)     
        if reset:
            # As mixing float and numpy float causes errors in statistics
            # functions, here every score is cast to float.
            scores.append(float(test_r))
        terminate = len(scores) >= n_episodes

  return obses, actions, rewards, dones, scores
    

可視化コード

import cv2
from PIL import Image

def get_converter(img_size, x0=-2, x1=2, y0=0, y1=2):
  def converter(x, y):
    px = int((np.clip(x, x0, x1) - x0)/(x1 - x0) * (img_size - 1))
    py = img_size - int((np.clip(y, y0, y1) - y0)/(y1 - y0) * (img_size - 1))
    return px, py

  return converter


def visualize(obses, rewards, skip=1, img_size=64):
  arrs = []
  converter = get_converter(img_size)
  total_reward = 0

  pb1 = converter(0, 0)
  pb2 = converter(0, 0.2)

  p_a = 0
  p_o = 0
  for s, (o, reward) in enumerate(zip(obses[:-1:skip], rewards)):
    arr = np.zeros([img_size, img_size, 3], dtype=np.uint8) + 255
    pa = converter(-o[0], o[1])
    pb = converter(-o[4], o[5])
    po = converter(o[8], o[9])

    cv2.circle(arr, pa, 3, (255, 0, 0), -1)
    cv2.circle(arr, pb, 3, (0, 255, 0), -1)
    cv2.circle(arr, po, 3, (0, 0, 255), -1)
    cv2.line(arr, pb1, pb2, (128, 64, 64), 2)
    total_reward += reward
    if reward > 0:
      p_a += 1
    elif reward < 0:
      p_o += 1

    txt = "{}:{}:{}".format(p_a, p_o, total_reward)
    cv2.putText(arr, txt, (3, 15), cv2.FONT_HERSHEY_PLAIN, 1.0, (0, 0, 0))

    arrs.append(arr)

  return arrs

学習

パラメータ

steps = 10 ** 6
gamma = 0.98

update_interval = 1 
betasteps = steps / update_interval  * 2
gpu = 0 if torch.cuda.is_available() else -1
print(gpu)

学習

import time
import logging
import pandas as pd
from pfrl.experiments.evaluator import save_agent

agent  = get_rainbow_agent(gamma, gpu, update_interval=update_interval,
                                  betasteps=betasteps) 
agent2 = get_rainbow_agent(gamma, gpu, update_interval=update_interval,
                                  betasteps=betasteps) 


outdir = "selfplay_results"
os.makedirs(outdir, exist_ok=True)


champion_period = 20000
next_champion = champion_period
num_match = 20
update_threshold = 0.5

eval_period = 50000
num_eval = 20
next_eval = eval_period

save_period = 200000


logger = logging.getLogger(__name__)

match_scores = []
eval_scores = []
logs = []
episode_r = 0
episode_idx = 0
episode_len = 0
t = 0
num_champion = 0
obs = env.reset()
obs2 = obs

t0 = time.time()
with agent2.eval_mode():
  while t < steps:
    action = agent.act(obs)
    action2 = agent2.act(obs2)
    obs, r, done, info = env.step(action, action2)
    obs2 = info['otherObs']
    t += 1
    episode_r += r
    episode_len += 1
    reset = info.get("needs_reset", False)
    agent.observe(obs, r, done, reset)
    agent2.observe(obs2, -r, done, reset)
    if done or reset or t == steps:
      if t == steps:
          break

      if t  >= next_champion:
        _, actions, _, _, scores = evaluate(agent, agent2=agent2, n_episodes=num_match,  num_obs=0, multi_agent=True)
        mean_score = np.mean(scores)
        if mean_score > update_threshold:
          agent2.model.load_state_dict(agent.model.state_dict())
          save_agent(agent, t, outdir, logger, suffix="_oldagent")
          num_champion += 1
        print("Match {} :{},  {:.4f} {:.1f} s".format(t, num_champion, mean_score, time.time() - t0))
        next_champion += champion_period
        match_scores.extend([(t, num_champion, s) for s in scores])

      if t  >= next_eval:
        _, actions, _, _, scores = evaluate(agent, agent2=agent2, n_episodes=num_eval,  num_obs=0, multi_agent=False)
        eval_scores.extend([(t, s) for s in scores])
        stats = {}
        stats["steps"] = t
        stats["episodes"] = episode_idx
        stats["time"] = time.time() - t0
        stats["mean"] = np.mean(scores)
        stats["median"] = np.median(scores)
        stats["stdev"] = np.std(scores)
        for k, v in agent.get_statistics():
          stats[k] = v
        print("Eval {} : {} {:.1f} s".format(t, np.mean(scores), time.time() - t0))
        logs.append(stats)
        pd.DataFrame(logs).to_csv(os.path.join(outdir, "scores.csv"))
        next_eval += eval_period

      # Start a new episode
      episode_r = 0
      episode_idx += 1
      episode_len = 0
      obs = env.reset()
      obs2 = obs


    if t % save_period == 0:
      save_agent(agent, t, outdir, logger, suffix="_checkpoint")

以下の条件でself play学習を行った。

  • 20,000 framesごとに新エージェントと旧エージェントを20回対戦させる
  • 新エージェントの旧エージェントに対するスコアが0.5以上の場合は旧エージェントのパラメータを更新する

学習時間の節約のため新エージェントと旧エージェントとの評価用対戦は20回としているが、
性能差を正確に見たい場合は、もう少し多くしたほうが良いかもしれない。
また、対戦周期の20,000 framesも検討の余地があると思う。

結果

学習経過

f:id:nakamrnk:20200810140855p:plain

学習エージェントAの対戦エージェントB(1つ前の世代)に対する平均スコアを
プロットすると上図のようになる。
800,000 framesで計14世代のエージェントが生まれた。
序盤の性能の低い段階では、世代の切り替わりが激しい。
(第三世代はやや長いが...)
10世代以降になると1世代にかかるframe数も伸びており、
ある程度以上成長すると過去の自分に打ち勝つことが難しくなるのが分かる。

対戦相手がslime volleyball defaultエージェントの場合のスコア

f:id:nakamrnk:20200810141340p:plain

学習序盤ではdefaultエージェントに対するスコアはほとんど伸びていない。
350,000 framesほどで-2.5くらいとなり、その後しばらく停滞するが、
最終的には-0.4(700,000 frames)となり、やや負け越すぐらいの性能となる。
self playのみでもdefaultエージェントに近い性能までは学習できることが分かった。

self play対戦成績

世代交代はエージェントがひとつ前の世代に対して勝つと
行われるが、 それ以前の世代にも勝てるかは判定していない。
そのため、前の世代のみに強くてそれ以前の世代には弱いような
汎用性のないエージェントが生じる可能性もある。

ここでは、いくつかの世代間で対戦成績を比較し、そのようなことが
起こっていないかを確認する。

対戦結果

今回は1, 3, 8, 10, 12, 14世代に対して総当りの対戦を行った。
1つの組み合わせあたり、30戦行い平均スコアを求めた。

A\B 1 3 8 10 12 14
1 nan -0.266667 -3.86667 -4.36667 -4.63333 -4.93333
3 0.266667 nan -3.96667 -4.53333 -4.73333 -4.83333
8 3.86667 3.96667 nan -3.93333 -4.2 -4.3
10 4.36667 4.53333 3.93333 nan -2.46667 -2.2
12 4.63333 4.73333 4.2 2.46667 nan -0.533333
14 4.93333 4.83333 4.3 2.2 0.533333 nan

エージェントA(縦軸)とエージェントB(横軸)の対戦結果は上表のようになる。
1, 3世代は8世代以降に大きく負け越しており、 defaultエージェントに対する学習曲線で
見られた通り、 1, 3 世代と8世代以降には大きな性能差があることが分かる。

それ以降の世代でも今回比較した範囲では後の世代に行くほど性能が向上している。 
(12世代と14世代の差は僅かではあるが...)

3世代 vs 8世代 (スコア : -3.97)

f:id:nakamrnk:20200810161026g:plain

左側の赤が3世代、右側の青が8世代のエージェントである。
実際は30ゲーム試行して平均スコアを計算しているが、最初の1ゲームのみ表示。

3世代と8世代を比較すると,3世代はほとんどボールに触れていないが、
8世代になると自分の近くに来たボールには反応できている。

8世代 vs 10世代 (スコア : -3.93)

f:id:nakamrnk:20200810161317g:plain

赤 : 8世代、 青 : 10世代。
8世代と10世代の試合ではだいぶラリーが続くようになっている。
8世代はボールに反応して動けてはいるが、 ボールコントロール
あまく、相手コートにうまく返せていない。 一方で10世代は2, 3タッチで
ボールを相手コートに返せているので、成長が見て取れる。

10世代 vs 14世代 (スコア : -2.2)

f:id:nakamrnk:20200810162024g:plain

赤 : 10世代、 青 : 14世代。

10世代と14世代の対戦ではどちらも中々ボールを落とさない。
14世代のほうが動きが洗練されているように見えるが、 10世代も粘るので
結果的に5点先取までにタイムアップ(3,000 framesでタイムアップ)となり、
スコアが2くらいになるものと思われる。

まとめ

今回は slime volleyballをself playによって学習した。
self playのハイパーパラメータには調整の余地があると思ったが、
self playのみでもslime volleyballのdefault エージェントくらいのモデルは
学習できることが分かった。 学習を早めるために、学習初期はself playを
行い、 終盤はdefault エージェントを使うようにすればいいのではないかと
思った。

参考文献

PFRLを試してみる - slime volleyball

はじめに

前回PFRLatari SpaceInvadorの学習を行ったが、
計算時間が足りず、うまく学習できなかった。
今回はもう少し簡単な、Slime Volleyball1に対して学習を行う。

slime volleyball

slime volleyballは2人のプレイヤーがボールを相手のコートに
落とそうとする単純なバレーボールである。

f:id:nakamrnk:20200809143725g:plain

上図の赤はプレイヤーの位置、緑はボールの位置、青は対戦相手の位置である。
状態としてプレイヤー、ボール、対戦相手の位置・速度を受け取り(12次元)、
対戦相手に勝てるようなプレイヤーの動かし方を学習するという問題である。  

どちらかが5点とればゲーム終了であり、
自分がポイントをとると1点、相手がポイントをとると-1点の報酬が得られる。
ゆえに1エピソード当たりの収益は-5 ~ 5の間の整数となる。

検証

学習アルゴリズムは前回に引き続きRainbow2を用いる。
PFRLのexampleを参考にした。
Google Colaboratoryで検証

学習コード

ライブラリ読み込み

!pip install pfrl
!pip install slimevolleygym
import slimevolleygym
import argparse
import os

import torch
import torch.nn as nn
import numpy as np
from PIL import Image


import gym


import pfrl
from pfrl.q_functions import DiscreteActionValueHead
from pfrl import agents
from pfrl import experiments
from pfrl import explorers
from pfrl import nn as pnn
from pfrl import utils
from pfrl.q_functions import DuelingDQN
from pfrl import replay_buffers

from pfrl.wrappers import atari_wrappers
from pfrl.initializers import init_chainer_default
from pfrl.q_functions import DistributionalDuelingDQN

環境設定

SEED = 0
train_seed = SEED
test_seed = 2 ** 31 - 1 - SEED

class MultiBinaryAsDiscreteAction(gym.ActionWrapper):
    """Transforms MultiBinary action space to Discrete.
    If the action space of a given env is `gym.spaces.MultiBinary(n)`, then
    the action space of the wrapped env will be `gym.spaces.Discrete(2**n)`,
    which covers all the combinations of the original action space.
    Args:
        env (gym.Env): Gym env whose action space is `gym.spaces.MultiBinary`.
    """

    def __init__(self, env):
        super().__init__(env)
        assert isinstance(env.action_space, gym.spaces.MultiBinary)
        self.orig_action_space = env.action_space
        self.action_space = gym.spaces.Discrete(2 ** env.action_space.n)

    def action(self, action):
        return [(action >> i) % 2 for i in range(self.orig_action_space.n)]



def make_env(test):
  # Use different random seeds for train and test envs
  env_seed = test_seed if test else train_seed
  env = gym.make("SlimeVolley-v0")  
  env.seed(int(env_seed))
  if isinstance(env.action_space, gym.spaces.MultiBinary):
      env = MultiBinaryAsDiscreteAction(env)
  # if args.render:
  #     env = pfrl.wrappers.Render(env)
  return env 

# 初期SEED設定
utils.set_random_seed(SEED)

# 環境設定
env = make_env(test=False)
eval_env = make_env(test=True)
obs = env.observation_space
obs_size = env.observation_space.low.size
n_actions = env.action_space.n
print(obs_size, n_actions)

slime volleyballの行動は3成分のmulti binaryなので、
1成分のdiscrete action(23個)として扱うためにwrapperクラスを挟んでいる。

Agent 構築

class DistributionalDuelingHead(nn.Module):
    """Head module for defining a distributional dueling network.
    This module expects a (batch_size, in_size)-shaped `torch.Tensor` as input
    and returns `pfrl.action_value.DistributionalDiscreteActionValue`.
    Args:
        in_size (int): Input size.
        n_actions (int): Number of actions.
        n_atoms (int): Number of atoms.
        v_min (float): Minimum value represented by atoms.
        v_max (float): Maximum value represented by atoms.
    """

    def __init__(self, in_size, n_actions, n_atoms, v_min, v_max):
        super().__init__()
        assert in_size % 2 == 0
        self.n_actions = n_actions
        self.n_atoms = n_atoms
        self.register_buffer(
            "z_values", torch.linspace(v_min, v_max, n_atoms, dtype=torch.float)
        )
        self.a_stream = nn.Linear(in_size // 2, n_actions * n_atoms)
        self.v_stream = nn.Linear(in_size // 2, n_atoms)

    def forward(self, h):
        h_a, h_v = torch.chunk(h, 2, dim=1)
        a_logits = self.a_stream(h_a).reshape((-1, self.n_actions, self.n_atoms))
        a_logits = a_logits - a_logits.mean(dim=1, keepdim=True)
        v_logits = self.v_stream(h_v).reshape((-1, 1, self.n_atoms))
        probs = nn.functional.softmax(a_logits + v_logits, dim=2)
        return pfrl.action_value.DistributionalDiscreteActionValue(probs, self.z_values)

def phi(x):
  return np.asarray(x, dtype=np.float32)



def get_rainbow_agent(gamma, gpu, update_interval=1,replay_start_size=2000, target_update_interval=2000,
                      n_atoms=51, v_max=1, v_min=-1, hidden_size=512,
                      noisy_net_sigma=0.1,
                      lr0=1e-3, eps=1.5e-4, betasteps=2e6, num_step_return=3,
                      minibatch_size=32):
  
  # categorical Q-function
  q_func = nn.Sequential(
          nn.Linear(obs_size, hidden_size),
          nn.ReLU(),
          nn.Linear(hidden_size, hidden_size),
          nn.ReLU(),
          DistributionalDuelingHead(hidden_size, n_actions, n_atoms, v_min, v_max),
      )


  pnn.to_factorized_noisy(q_func, sigma_scale=noisy_net_sigma)
  print(q_func)
  # 探索アルゴリズム
  explorer = explorers.Greedy()

  # 最適化
  opt = torch.optim.Adam(q_func.parameters(), lr=lr0, eps=eps)

  # replay
  rbuf = replay_buffers.PrioritizedReplayBuffer(
            10 ** 6,
            alpha=0.5,
            beta0=0.4,
            betasteps=betasteps,
            num_steps=num_step_return,
            normalize_by_max="memory"
        )

  agent = agents.CategoricalDoubleDQN(
          q_func,
          opt,
          rbuf,
          gpu=gpu,
          gamma=gamma,
          explorer=explorer,
          minibatch_size=minibatch_size,
          replay_start_size=replay_start_size,
          target_update_interval=target_update_interval,
          update_interval=update_interval,
          batch_accumulator="mean",
          phi=phi,
          max_grad_norm=10,
      )
  return agent

前回用いたPFRLで直接実装されているRainbow用のネットワークはatari用であり
画像を入力としていたので、 その部分を今回の問題に合わせて書き換えている。

学習設定

import time
class Hook:
  def __init__(self, period=10000):
    self.t0 = time.time()
    self.period = period 

  def __call__(self, env, agent, t):
    if t % self.period == 0:
      t1 = time.time()
      dt = t1 - self.t0 
      print("{} : elps {:.3f}".format(t, dt))
      self.t0 = t1

steps = 10 ** 6
gamma = 0.98

update_interval = 1 
betasteps = steps / update_interval  * 2
gpu = 0 if torch.cuda.is_available() else -1

学習


rainbow_agent = get_rainbow_agent(gamma, gpu, update_interval=update_interval,
                                  betasteps=betasteps) 

out_dir = "results_rainbow"
os.makedirs(out_dir, exist_ok=True)
checkpoint_frequency = 2 * 10 ** 5
eval_n_runs = 10
eval_interval = 5 * 10 ** 4

experiments.train_agent_with_evaluation(
            agent=rainbow_agent,
            env=env,
            steps=steps,
            eval_n_steps=None,
            checkpoint_freq=checkpoint_frequency,
            eval_n_episodes=eval_n_runs,
            eval_interval=eval_interval,
            outdir=out_dir,
            save_best_so_far_agent=False,
            eval_env=eval_env,
            step_hooks=(Hook(), )
        )

#rbuf = replay_buffers.ReplayBuffer(10 ** 6, num_step_return)

結果

学習はcolab上で3時間程度(800,000 frameで途中終了)で終了した。

学習曲線
import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv(os.path.join(out_dir, "scores.txt"), sep="\t")
cols = ["mean", "median", "average_loss"]
fig, axes = plt.subplots(figsize=(20, 6), ncols=3)
for c, col in enumerate(cols):
  df.set_index("steps").plot(y=col, ax=fig.axes[c])

f:id:nakamrnk:20200809152943p:plain

序盤はスコア(mean:平均値, median:中間値)がほぼ-5 (1ポイントもとれずに全敗)だが、
250,000 frameくらいから性能が急激に上昇し、
400,000 frameほどで収益のmedianスコアが0(対戦相手と勝率が5割程度)となっている。
その後はほぼ横ばい。 lossはまだ減少傾向にあるので時間をかければ性能向上する
可能性はある。

挙動確認

評価

from contextlib import ExitStack

def convert_action(action):
  return [(action >> i) % 2 for i in range(3)]

def evaluate(agent1, agent2=None, n_episodes=30,  num_obs=1, multi_agent=False):
  contexts = [agent1]

  if multi_agent:
    multi_env = gym.make("SlimeVolley-v0")  
    env = multi_env

    assert agent2 is not None
    contexts.append(agent2)
  else:
    env = eval_env

  scores = []
  terminate = False
  timestep = 0
  obses = []
  actions = []
  rewards = []
  dones = []

#  with agent1.eval_mode(), agent2.eval_mode():
  with ExitStack() as stack:
    for agent in contexts:
      stack.enter_context(agent.eval_mode())
  # with [agent.eval_mode() for agent in contexts]:
    reset = True
    while not terminate:
        if reset:
            obs = env.reset()
            obs2 = obs
            done = False
            test_r = 0
            episode_len = 0
            info = {}
  
        a1 = agent1.act(obs)
        if multi_agent:
          a2 = agent2.act(obs2)

        if len(scores) < num_obs:
          obses.append(obs)
        actions.append(a1)

        if multi_agent:
          obs, r, done, info = env.step(convert_action(a1), convert_action(a2))
          obs2 = info['otherObs']
        else:
          obs, r, done, info = env.step(a1)

        rewards.append(r)
        dones.append(done)

        test_r += r
        episode_len += 1
        timestep += 1
        reset = done or info.get("needs_reset", False)
        agent1.observe(obs, r, done, reset)
        if multi_agent:
          agent2.observe(obs2, -r, done, reset)     
        if reset:
            # As mixing float and numpy float causes errors in statistics
            # functions, here every score is cast to float.
            scores.append(float(test_r))
        terminate = len(scores) >= n_episodes

  return obses, actions, rewards, dones, scores


可視化

import cv2
from PIL import Image

def get_converter(img_size, x0=-2, x1=2, y0=0, y1=2):
  def converter(x, y):
    px = int((np.clip(x, x0, x1) - x0)/(x1 - x0) * (img_size - 1))
    py = img_size - int((np.clip(y, y0, y1) - y0)/(y1 - y0) * (img_size - 1))
    return px, py

  return converter


def visualize(obses, rewards, skip=1, img_size=64):
  arrs = []
  converter = get_converter(img_size)
  total_reward = 0

  pb1 = converter(0, 0)
  pb2 = converter(0, 0.2)

  p_a = 0
  p_o = 0
  for s, (o, reward) in enumerate(zip(obses[:-1:skip], rewards)):
    arr = np.zeros([img_size, img_size, 3], dtype=np.uint8) + 255
    pa = converter(-o[0], o[1])
    pb = converter(-o[4], o[5])
    po = converter(o[8], o[9])

    cv2.circle(arr, pa, 3, (255, 0, 0), -1)
    cv2.circle(arr, pb, 3, (0, 255, 0), -1)
    cv2.circle(arr, po, 3, (0, 0, 255), -1)
    cv2.line(arr, pb1, pb2, (128, 64, 64), 2)
    total_reward += reward
    if reward > 0:
      p_a += 1
    elif reward < 0:
      p_o += 1

    txt = "{}:{}:{}".format(p_a, p_o, total_reward)
    cv2.putText(arr, txt, (3, 15), cv2.FONT_HERSHEY_PLAIN, 1.0, (0, 0, 0))

    arrs.append(arr)


  return arrs

rainbow_agent.load(os.path.join(out_dir, "800000_checkpoint"))
results = evaluate(rainbow_agent, multi_agent=False)
obses = results[0]
rewards = results[2][:len(obses)]
print(len(obses), np.mean(results[-1]))
arrs = visualize(obses, rewards)
imgs = [Image.fromarray(arr) for arr in arrs]
imgs[0].save("rainbow.gif", save_all=True, loop=0, delay=33, append_images=imgs[1:])

f:id:nakamrnk:20200809153456g:plain

800,000 Iterまで学習したAgentの挙動は上図のようになる。
赤のプレイヤーは正しくボールに追随できており、青の対戦相手と
互角にラリーできている。

まとめ

今回はPFRLを使って、 slime volleyballの学習を行った。
3時間程度である程度の性能のAgentが学習できた。
slime volleyballは対戦型ゲームなので次はself-playを行ってみたい。

参考文献

PFRLを試してみる - atari

はじめに

[前回] までPFRLの簡単な使い方を学び、 openai-gymの
pendulum問題の検証を行った。
今回はatari環境においての検証を行う。

検証

PFRLのexampleを参考にした。
https://github.com/pfnet/pfrl/blob/master/examples/atari/reproduction/rainbow/train_rainbow.py

検証環境はGoogle Colaboratory。

atari 環境

ALEによるatari環境は強化学習AIのベンチマークによく利用される。
openai-gymによるwrapperも用意されているため簡単に利用できる。
(Google Colaboratoryでは最初からインストールされている)

今回はatariの中でSpaceInvader環境を選んだ。

環境構築
def make_env(env_name, test):
  # Use different random seeds for train and test envs
  env_seed = test_seed if test else train_seed
  env = atari_wrappers.wrap_deepmind(
      atari_wrappers.make_atari(env_name, max_frames=MAX_FRAMES),
      episode_life=not test,
      clip_rewards=not test,
  )
  env.seed(int(env_seed))
  if test:
      # Randomize actions like epsilon-greedy in evaluation as well
      env = pfrl.wrappers.RandomizeAction(env, EVAL_EPSILON)
  if MONITOR:
      env = pfrl.wrappers.Monitor(
          env, args.outdir, mode="evaluation" if test else "training"
      )
  return env 

# 初期SEED設定
utils.set_random_seed(SEED)

# 環境設定
env_name = "SpaceInvadersNoFrameskip-v4"
env = make_env(env_name, test=False)
eval_env = make_env(env_name, test=True)

atari_wrappers.wrap_deepminddeepmindatariの検証で行った設定
(報酬のclipping等)を適用するためのwrapperである。

Agent (Rainbow)

今回はRainbow1によって学習を行う。
RainbowはDQN2に様々な改良を施したアルゴリズムである。

DQNは主に以下の要素からなるQ-Learningアルゴリズムである。

  • 行動価値観数をQ_a(s)の形式でNeural Netにより近似する
  • TD誤差計算のtargetを工夫することで学習を安定化させる
  • Experience Replayにより過去の遷移を利用する

Rainbowはこれに以下のような改良を加えている。

  • targetの安定化にDouble DQN
  • Expericne Replayの改良 (prioritized Replay)
  • ネットワークアーキテクチャの工夫(Dueling Network)
  • Multi step Learningによる学習の高速化
  • Q関数出力を確率分布化 (Distributional RL)
  • 探索の改良 (Noisy Net)
Q関数
obs = env.observation_space
n_actions = env.action_space.n
n_atoms = 51
v_max = 10
v_min = -10
noisy_net_sigma = 0.5

q_func = DistributionalDuelingDQN(n_actions, n_atoms, v_min, v_max,)

pnn.to_factorized_noisy(q_func, sigma_scale=noisy_net_sigma)

print(obs, n_actions)
q_func

Distributional DQNを用いるので、ネットワークの出力は行動ごとに
-10 ~ 10の範囲で51分割したヒストグラムとなる。

また、to_factorized_noisyでネットワーク内のLinearモジュールを
NoiseLinearモジュールに変換している。

探索アルゴリズム ・最適化・Replay
explorer = explorers.Greedy()

# 最適化
lr0 = 6.25e-5
eps = 1.5e-4
opt = torch.optim.Adam(q_func.parameters(), eps=eps)

# Experience Replay
steps = 2 * 10 ** 6
update_interval = 4
num_step_return = 3
betasteps = steps / update_interval

rbuf = replay_buffers.PrioritizedReplayBuffer(
            10 ** 5,
            alpha=0.5,
            beta0=0.4,
            betasteps=betasteps,
            num_steps=num_step_return,
        )

DQNの探索アルゴリズムはepsilon-greedyが用いられることが多いが、
Rainobowの場合Noisy Netが探索の役割を担うので通常のGreedyを用いる。

また、multi step Learningのための設定はReplayBufferに対して行う。
(今回はnum_steps=3としているので3ステップ先からtargetの計算を行う)

Agent構築

gamma = 0.9
replay_start_size = 5 * 10**4
target_update_interval = 3 * 10 ** 4
clip_delta = True
gpu = 0 if torch.cuda.is_available() else -1
print("GPU : ", gpu)

def phi(x):
  return np.asarray(x, dtype=np.float32) / 255

agent = agents.CategoricalDoubleDQN(
        q_func,
        opt,
        rbuf,
        gpu=gpu,
        gamma=gamma,
        explorer=explorer,
        replay_start_size=replay_start_size,
        target_update_interval=target_update_interval,
        clip_delta=clip_delta,
        update_interval=update_interval,
        batch_accumulator="sum",
        phi=phi,
    )

学習
import time
class Hook:
  def __init__(self, period=2000):
    self.t0 = time.time()
    self.period = period 

  def __call__(self, env, agent, t):
    if t % self.period == 0:
      t1 = time.time()
      dt = t1 - self.t0 
      print("{} : elps {:.3f}".format(t, dt))
      self.t0 = t1



checkpoint_frequency = 2 * 10 ** 5
eval_n_runs = 10
eval_interval = 5 * 10 ** 4

experiments.train_agent_with_evaluation(
            agent=agent,
            env=env,
            steps=steps,
            eval_n_steps=None,
            checkpoint_freq=checkpoint_frequency,
            eval_n_episodes=eval_n_runs,
            eval_interval=eval_interval,
            outdir=out_dir,
            save_best_so_far_agent=False,
            eval_env=eval_env,
            step_hooks=(Hook(), )
        )

学習中の経過時間を見るためのHookクラスを作成。

結果

import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv(os.path.join(out_dir, "scores.txt"), sep="\t")
cols = ["mean", "median", "average_loss"]
fig, axes = plt.subplots(figsize=(20, 6), ncols=3)
for c, col in enumerate(cols):
  df.set_index("steps").plot(y=col, ax=fig.axes[c])

収益の平均、中間値、 損失関数をプロットすると下図のようになる。

f:id:nakamrnk:20200806112802p:plain

振動しながらも収益は徐々に上昇傾向にあるように見える。
今回は2 x 106 frameしか学習していないが、
rainbowの論文によるとSpaceInvadorの学習が収束するには
108 frame程度かかるらしいので、Colabで検証するのは難しいと思われる。
(今回は学習に6時間程度かかった)

f:id:nakamrnk:20200806114134g:plain

2e6フレーム学習後のAgentの挙動は上図のようになっている。
いくつかのターゲットは破壊できているが、回避行動を学習できて
いないためすぐにゲームオーバーになっている。

まとめ

今回はPFRLのRainbowを使って、atariのSpaceInvadorを学習した。
現状学習が足りておらず、colabo環境で学習を行うには工夫が必要だと思う。

参考文献

PFRLを試してみる - アルゴリズム比較

はじめに

前回PFRLのSAC1でPendulum問題について強化学習を行った。
今回は別のアルゴリズムでも学習を行い比較を行う。

前回の訂正

前回 policyネットワークの出力をtorch.distributions.Normalにすると
そのままではSACが動かず修正が必要だとしていたが、
公式のSACのexampleに正しい出力形式が記載されていた。

dists.Normalを直接返すのではなく、dists.Independentを介して、
次元調整をしなければいけないらしい。

from torch import distributions as dists
class PolicyFunc(torch.nn.Module):
    def __init__(self, obs_size, n_actions, log_std_max=3, log_std_min=-15):
        super().__init__()
        self.l1 = torch.nn.Linear(obs_size, 50)
        self.l2 = torch.nn.Linear(50, 50)
        self.mean = torch.nn.Linear(50, n_actions)
        self.log_std = torch.nn.Linear(50, n_actions)
        self.log_std_max = log_std_max
        self.log_std_min = log_std_min
    
    def forward(self, x):
        h = x
        h = torch.nn.functional.relu(self.l1(h))
        h = torch.nn.functional.relu(self.l2(h))
        mean = self.mean(h)
        log_std = self.log_std(h)
        std = torch.clamp(log_std, min=self.log_std_min,
                          max=self.log_std_max).exp()
        dist = dists.Normal(mean, std)
        ####  修正箇所
        base_distribution = dists.Independent(dist, 1) 
        ####
        return base_distribution

これは通常のNormalのまま返すと制御値が多次元になった場合に、
エントロピー計算に使用するlog_probの次元も多次元になってしまうため
dists.Independentを通すことで、1次元のlog_probを返すためのものと思われる。
(ちゃんとexampleを見てから検証すればよかった...)

また、前回探索アルゴリズムにAdditiveGaussianを使っていると記述したが、
SACは学習時には分布からサンプリングするので探索アルゴリズムは使用していない。

アルゴリズム比較

今回はSAC, PPO2, TRPO3の比較を行う。
強化学習アルゴリズムの比較論文は Deep Reinforcement Learning that Mattersが有名であり、
DDPG, PPO, TRPOなどの比較をしている。
結論としては、対象問題、実装などによってどのアルゴリズムが良いかは変わるというものである。

PFRLのように複数のアルゴリズムが同一形式で実装されたライブラリがあれば、
このような異なるアルゴリズム間の比較が行いやすく、とてもありがたい。

アルゴリズム 価値観数 on/off policy
SAC Q(s, a) off
PPO V(s) on
TRPO V(s) on

強化学習において、環境との相互作用が少なくても学習ができることを
データ効率が良いという (high data efficiency)。
今回の3つのアルゴリズムの場合、SACのみoff-policyであるため、
Experience Replayによって過去の経験が使いまわせるのでデータ効率が良いとされる。

PPOはデータ効率があまりよくないため、複数の環境を並列実行して、
学習を行うことが多いが、今回は比較のため一つの環境のみを用いて学習する。
on-policy系はRNNを自然に使えるというメリットもあるが、今回は利用しない。

コード

前回からPFRLのexampleを参考に書きなおした。
主にAgentの構成部分を修正した。

参考

ライブラリ読み込み

!pip install pfrl

import os
import random

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm as tqdm

import pfrl
import torch
import torch.nn
from torch import distributions
import gym

環境構築

env = gym.make('Pendulum-v0')
obs_size = env.observation_space.low.size
n_actions = env.action_space.shape[0]

print('observation space:', env.observation_space)
print('action space:', env.action_space)

環境は前回と同様pendulum問題

学習・評価


max_episode_len = 200


def evaluate(agent, num_eval):
  trajects = []
  prg = tqdm(range(num_eval))
  with agent.eval_mode():
    for i in prg:
        obs = env.reset()
        R = 0
        t = 0
        while True:
            # Uncomment to watch the behavior in a GUI window
            # env.render()
            action = agent.act(obs)
            obs, r, done, _ = env.step(action)
            R += r
            t += 1
            reset = t == max_episode_len
            trajects.append(list(obs) + [action[0], r, done, i])
            agent.observe(obs, r, done, reset)
            if done or reset:
                break
  result_df = pd.DataFrame(trajects, columns=["cos", "sin", "dot", "action", "reward", "done", "episode"]) 
  result_df["theta"] = np.arctan2(result_df["sin"], result_df["cos"]) * 180.0/np.pi
  mean_return = result_df.groupby(["episode"])["reward"].sum().mean()
  result_df["mean_return"] = mean_return
  result_df["action"] = np.clip(result_df["action"], -2, 2)
  return mean_return, result_df

import time
def train(agent, run, stat_period=50, show_period=10, eval_period=50, 
          num_eval=100, n_episodes=300):
  results = []
  Rs = []
  t0 = time.time()
  for i in range(1, n_episodes + 1):
      obs = env.reset()
      R = 0  # return (sum of rewards)
      t = 0  # time step
      while True:
          action = agent.act(obs)
          obs, reward, done, _ = env.step(action)
          R += reward
          t += 1
          reset = t == max_episode_len
          agent.observe(obs, reward, done, reset)
          if done or reset:
              break
      Rs.append(R)
      elp_time = time.time() - t0
      if i % show_period== 0:
          print("run {}, episode {} : R : {:.4f}, time {:.3f} s ".format(run,
                                                                        i, np.mean(Rs[-50:]), elp_time))
      if i % stat_period == 0:
          print('statistics:', agent.get_statistics())
      if i % eval_period == 0:
        mean, res_df = evaluate(agent, num_eval)
        print("evaluate :", mean)
        results.append([run, i, np.mean(Rs[-50:]), mean, elp_time])
  return results, res_df


SAC Agent構築

import torch.nn as nn
from pfrl.nn.lmbda import Lambda

def phi(x):
  return x.astype(np.float32, copy=False)


def squashed_diagonal_gaussian_head(x):
    mean, log_scale = torch.chunk(x, 2, dim=1)
    log_scale = torch.clamp(log_scale, -20.0, 2.0)
    var = torch.exp(log_scale * 2)
    base_distribution = distributions.Independent(
        distributions.Normal(loc=mean, scale=torch.sqrt(var)), 1
    )
    return base_distribution


def make_q_func_with_optimizer(obs_size, action_size, hidden_dim, lr):
    q_func = nn.Sequential(
        pfrl.nn.ConcatObsAndAction(),
        nn.Linear(obs_size + action_size, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, 1),
    )
    q_func_optimizer = torch.optim.Adam(q_func.parameters(), lr=lr)
    return q_func, q_func_optimizer



def make_sac_agent(obs_size, action_size, gamma, lr_poly=1e-3, lr_q=1e-3, gpu=-1, replay_start_size=500, hidden_dim=64):
    policy = nn.Sequential(
        nn.Linear(obs_size, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, action_size * 2),
        Lambda(squashed_diagonal_gaussian_head),
    )
    poly_optimizer = torch.optim.Adam(policy.parameters(), eps=lr_poly)

    q_func1, q1_optimizer = make_q_func_with_optimizer(obs_size, action_size, hidden_dim, lr_q)
    q_func2, q2_optimizer = make_q_func_with_optimizer(obs_size, action_size, hidden_dim, lr_q)

    replay_buffer = pfrl.replay_buffers.ReplayBuffer(capacity=10 ** 6)
    agent = pfrl.agents.SoftActorCritic( 
        policy, q_func1, q_func2,
        poly_optimizer, q1_optimizer, q2_optimizer,
        replay_buffer,
        gamma,
        replay_start_size=replay_start_size,
        update_interval=1,
        phi=phi,
        gpu=gpu,
    )
    return agent



TRPO Agent構築

def make_trpo_agent(obs_size, action_size, gamma, gpu=-1, lr=1e-3, hidden_dim=64, trpo_update_interval=2000):
  policy = torch.nn.Sequential(
        nn.Linear(obs_size, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, action_size),
        pfrl.policies.GaussianHeadWithStateIndependentCovariance(
            action_size=n_actions,
            var_type="diagonal",
            var_func=lambda x: torch.exp(2 * x),  # Parameterize log std
            var_param_init=0,  # log std = 0 => std = 1
        ),
    )

  vf = torch.nn.Sequential(
        nn.Linear(obs_size, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, 1),
    )
  vf_opt = torch.optim.Adam(vf.parameters(), lr=lr)

  obs_normalizer = pfrl.nn.EmpiricalNormalization(
        obs_size, clip_threshold=5
    )  

  agent = pfrl.agents.TRPO(
        policy=policy,
        vf=vf,
        vf_optimizer=vf_opt,
        obs_normalizer=obs_normalizer,
        gpu=gpu,
        update_interval=trpo_update_interval,
        max_kl=0.01,
        conjugate_gradient_max_iter=20,
        conjugate_gradient_damping=1e-1,
        gamma=gamma,
        lambd=0.97,
        vf_epochs=5,
        entropy_coef=0,
        phi=phi,
    )
  return agent

PPO Agent構築

def make_ppo_agent(obs_size, action_size, gamma, gpu=-1, lr=1e-3, hidden_dim=64, update_interval=2000, batch_size=64):
  policy = torch.nn.Sequential(
        nn.Linear(obs_size, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, action_size),
        pfrl.policies.GaussianHeadWithStateIndependentCovariance(
            action_size=action_size,
            var_type="diagonal",
            var_func=lambda x: torch.exp(2 * x),  # Parameterize log std
            var_param_init=0,  # log std = 0 => std = 1
        ),
    )

  vf = torch.nn.Sequential(
        nn.Linear(obs_size, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, 1),
    )


  model = pfrl.nn.Branched(policy, vf)    
  opt = torch.optim.Adam(model.parameters(), lr=lr)

  obs_normalizer = pfrl.nn.EmpiricalNormalization(
        obs_size, clip_threshold=5
    )

  agent = pfrl.agents.PPO(
        model,
        opt,
        obs_normalizer=obs_normalizer,
        gpu=gpu,
        update_interval=update_interval,
        minibatch_size=batch_size,
        clip_eps_vf=None,
        entropy_coef=0,
        standardize_advantages=True,
        gamma=gamma,
        lambd=0.97, phi=phi
    )
  return agent

各Agentの隠れ状態数は同じになるようにして、
活性化関数はReLUに統一した。

実行

%%time
gamma = 0.99
gpu = -1
num_runs = 3
hidden_dim = 64

def set_seed(seed):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True



agents = {
    "PPO":[1500, make_ppo_agent],
    "TRPO":[1500, make_trpo_agent],                               
    "SAC":[500, make_sac_agent],
}


log_dfs = []
res_dfs = []
for agent_name, (n_episodes, agent_func) in agents.items():
  print("Algorithm : ", agent_name)
  results = []
  for run in range(num_runs):
    env.seed(run)
    set_seed(run)
    agent = agent_func(obs_size, n_actions, gamma, gpu=gpu, hidden_dim=hidden_dim)
    rows, res_df = train(agent, run, n_episodes=n_episodes)
    results.extend(rows)
    res_df["run"] = run
    res_df["algo"] = agent_name
    res_dfs.append(res_df)
  log_df = pd.DataFrame(results, columns=["run", "episode", "train", "eval", "time"])
  log_df["algo"] = agent_name
  log_dfs.append(log_df)
logs_df = pd.concat(log_dfs)
print('Finished.')

SACは500エピソード、 TRPO, PPOは1500エピソード学習している。
学習時間自体は同程度(Google Colab上で1run 15分程度)。
update_intervalについては重要そうな気がするが、
今回は適当に10エピソード分(2000 ステップ)で更新を行うようにした。

結果

3アルゴリズムの3run分の評価時の学習曲線を描画すると以下のようになる。

g = sns.relplot(x="episode", y="eval", data=logs_df, kind="line", hue="algo", height=7, aspect=1.5)
g.axes[0, 0].set_title("Learning Curve (3 run)", fontsize=20)
g.axes[0, 0].set_xlabel("Episode", fontsize=16)

f:id:nakamrnk:20200804094228p:plain

SAC(緑線)に関しては200 エピソード程度でどのrunも収束している。
PPO(青線)はrun間のバラつきが大きく最も速く学習が進んだもので
600エピソード程度で収束しているが、ほとんど学習が進んでいないrunも存在する。
TRPO(橙線)は1500エピソード程度ではまだ、収束していないように見えるため学習不足
と思われる。

algos = ["SAC", "PPO","TRPO"]
print(logs_df.groupby(["algo", "run", ])["eval"].max().unstack().loc[algos].to_markdown())

fig, axes = plt.subplots(figsize=(15, 12), ncols=3, nrows=3)
for i, alg in enumerate(algos):
  for j, run in enumerate(range(3)):
    targ = (all_res_dfs["algo"]==alg) & (all_res_dfs["run"]==run)
    all_res_dfs["action"][targ].plot(kind="hist", bins=30, ax=axes[i, j], title="{} : run {}".format(alg, run), range=[-2, 2])
最大評価値
run 0 1 2
SAC -138.307 -135.09 -139.089
PPO -243.734 -178.336 -915.163
TRPO -318.759 -654.588 -148.187

アルゴリズムの評価最大値を比較してもSACはrunごとのバラつきが小さい。
PPO, TRPOももっとも良いrunで比較するとSACと大差ないがrunごとの
ばらつきが大きい。

行動値分布

f:id:nakamrnk:20200804095629p:plain

今回の問題の行動値はPendulumにかけるトルクであり、 範囲は-2 ~ 2である。

評価run中の行動値の分布を見ると、SACの3 runは
制御値ヒストグラムにおいて-2, 2にピークがあり、
ほとんどトルクの最大値、最小値のみで制御を行っていることが分かる。
PPOの中で比較的ましなrun 1では、右側だけにピークが現れており、
部分的には最適な制御を学習できているものと思われる。

まとめ

今回はPFRLのSAC, PPO, TRPOの3アルゴリズムを Pendulum問題に対して比較した。
この問題に関してはデータ効率の観点からはSACが最もよさそうに思える。
PPOを使う場合は複数環境による並列学習を行うほうが良い。
(PFRLにも並列学習の機能はある)

パラメータ調整, RNNの利用などである程度性能が変わる可能性はあるが、
この問題に関してはSACで十分と思われるので、今後は他の問題に取り組んでみたい。 

参考文献