You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

6.0 KiB

使用Policy Gradient玩乒乓球游戏

安装 gym

想要玩乒乓球游戏首先得有乒乓球游戏。OpenAI 的 gym 为我们提供了模拟游戏的环境。使得我们能够很方便地得到游戏的环境状态,并作出动作。想要安装 gym 非常简单,只要在命令行中输入pip install gym即可。

安装 atari_py

由于乒乓球游戏是雅达利游戏机上的游戏,所以需要安装 atari_py 来实现雅达利环境的模拟。安装 atari_py 也很方便,只需在命令行中输入pip install --no-index -f https://github.com/Kojoley/atari-py/releases atari_py 即可。

开启游戏

当安装好所需要的库之后,我们可以使用如下代码开始游戏:

# 开启乒乓球游戏环境
import gym

env = gym.make('Pong-v0')

# 一直渲染游戏画面
while True:
    env.render()
    # 随机做动作,并得到做完动作之后的环境(observation),反馈(reward),是否结束(done)
    observation, reward, done, _ = env.step(env.action_space.sample())

游戏画面预处理

由于env.step返回出来的 observation 是一张RGB的三通道图而且我们的挡板怎么移动只跟挡板和球有关系所以我们可以尝试将三通道图转换成一张二值化的图其中挡板和球是 1 ,背景是 0 。


# 游戏画面预处理
def prepro(I):
    I = I[35:195]  #不要上面的记分牌
    I = I[::2, ::2, 0]  #scale 0.5所以I是高为80宽为80的单通道图
    I[I == 144] = 0  # 背景赋值为0
    I[I == 109] = 0  # 背景赋值为0
    I[I != 0] = 1  # 目标为1
    return I.astype(np.float).ravel() #将二维图压成一维的数组

# cur_x为预处理后的游戏画面
cur_x = prepro(observation)

游戏的画面是逐帧组成的,如果我们将当前帧和上一帧的图像相减就能得到能够表示两帧之间的变化的帧差图,将这样的帧差图作为神经网络的输入的话会是个不错的选择。

# x为帧差图
x = cur_x - prev_x
# 将当前帧更新为上一帧
prev_x = cur_x

搭建神经网络

神经网络可以根据自己的喜好来搭建,在这里我使用最简单的只有两层全连接层的网络模型来进行预测,由于我们挡板的动作只有上和下,所以最后的激活函数为 sigmoid 函数。

# 神经网络中神经元的参数
model = {}
# 随机初始化第一层的神经元参数总共200个神经元
model['W1'] = np.random.randn(H, D) / np.sqrt(D)
# 随机初始化第二层的神经元参数总共200个神经元
model['W2'] = np.random.randn(H) / np.sqrt(H)

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

# 神经网络的前向传播x为输入的帧差图
def policy_forward(x):
    h = np.dot(model['W1'], x)
    # relu
    h[h < 0] = 0
    logp = np.dot(model['W2'], h)
    # sigmoid激活
    p = sigmoid(logp)
    # p为下一步要往下挪的概率h为隐藏层中神经元的参数
    return p, h


# 算每层的参数偏导eph为一个游戏序列的隐藏层中神经元的参数epdlogp为一个游戏序列中反馈期望的偏导。
def policy_backward(eph, epdlogp):
    dW2 = np.dot(eph.T, epdlogp).ravel()
    dh = np.outer(epdlogp, model['W2'])
    dh[eph <= 0] = 0
    dW1 = np.dot(dh.T, epx)
    return {'W1': dW1, 'W2': dW2}

训练神经网络

while True:
    env.render()

    # 游戏画面预处理
    cur_x = prepro(observation)
    # 得到帧差图
    x = cur_x - prev_x if prev_x is not None else np.zeros(D)
    # 将上一帧更新为当前帧
    prev_x = cur_x

    #前向传播
    aprob, h = policy_forward(x)
    #从动作概率分布中采样action=2表示往上挪action=3表示往下挪
    action = 2 if np.random.uniform() < aprob else 3

    # 环境
    xs.append(x)  
    # 隐藏层状态
    hs.append(h) 
    # 将2和3改成1和0因为sigmoid函数的导数为f(x)*(1-f(x))
    y = 1 if action == 2 else 0
    dlogps.append(y - aprob)

    # 把采样到的动作传回环境
    observation, reward, done, info = env.step(action)
    # 如果得一分则reward为1丢一份则reward为-1
    reward_sum += reward

    # 记录反馈
    drs.append(reward)

    # 当有一方得到21分后游戏结束
    if done:
        episode_number += 1

        epx = np.vstack(xs)
        eph = np.vstack(hs)
        epdlogp = np.vstack(dlogps)
        epr = np.vstack(drs)
        discounted_epr = discount_rewards(epr)
        # 将反馈进行zscore归一化有利于训练
        discounted_epr -= np.mean(discounted_epr)
        discounted_epr /= np.std(discounted_epr)

        #算期望
        epdlogp *= discounted_epr
        #算梯度
        grad = policy_backward(eph, epdlogp)
        for k in model:
            grad_buffer[k] += grad[k]

        # 每batch_size次游戏更新一次参数
        if episode_number % batch_size == 0:
            #rmsprop梯度上升
            for k, v in model.items():
                g = grad_buffer[k]
                rmsprop_cache[k] = decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g ** 2
                model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
                grad_buffer[k] = np.zeros_like(v)

        # 每100把之后保存模型
        if episode_number % 100 == 0:
            pickle.dump(model, open('save.p', 'wb'))
        reward_sum = 0
        # 重置游戏
        observation = env.reset()
        prev_x = None

加载模型玩游戏

经过漫长的训练过程后,我们可以将训练好的模型加载进来开始玩游戏了。

import numpy as np
import pickle
import gym

model = pickle.load(open('save.p', 'rb'))

env = gym.make("Pong-v0")
observation = env.reset()

while True:
    env.render()
    cur_x = prepro(observation)
    x = cur_x - prev_x if prev_x is not None else np.zeros(80*80)
    prev_x = cur_x
    aprob, h = policy_forward(x)
    #从动作概率分布中采样
    action = 2 if np.random.uniform() < aprob else 3
    observation, reward, done, info = env.step(action)

    if done:
        observation = env.reset()
        prev_x = None