
Reinforcement learning (DQN) and simulator creation for end-effector position control of a 3-link robot arm with keras-rl2 + OpenAI gym

1. Introduction

This article uses the keras-rl2 and OpenAI gym Python libraries to do reinforcement learning (DQN) and attempts to build an end-effector position control simulator for a 3-link robot arm.
(Development environment for this article: Ubuntu 20.04.6 LTS, Python 3.8)

2. Installation

First, create a virtual environment.

python3 -m venv .venv

Activate the virtual environment.

source .venv/bin/activate

Clone keras-rl2 and move into the repository.

git clone https://github.com/wau/keras-rl2.git
cd keras-rl2

Next, install the additional dependencies used in this article.
For tensorflow, versions that are too new do not work, so install a pinned version. gym appears to be no longer maintained, but pin it at version 0.26.2. (Packages that later produced "package not found" errors are also installed again here.)

pip install keras-rl2==1.0.5 tensorflow==2.3.0 gym==0.26.2
pip install protobuf==3.20
pip install pygame scipy
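
If you want to double-check the pinned versions from the activated venv, a short Python snippet is enough:

import tensorflow as tf
import gym

print(tf.__version__)   # expected: 2.3.0
print(gym.__version__)  # expected: 0.26.2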

3. Create the gym env

Create the following three scripts, paying attention to the file names and directory layout. (The paths are not written out explicitly below, but judging from the import paths and the register() entry point they are gym_robot_arm/envs/__init__.py, gym_robot_arm/envs/robot_arm_env.py, and gym_robot_arm/__init__.py; the assumed path is noted as a comment above each script.)

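# gym_robot_arm/envs/__init__.py (assumed path, inferred from the package layout)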
from gym_robot_arm.envs.robot_arm_env import RobotArmEnvV0
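
# gym_robot_arm/envs/robot_arm_env.py (assumed path, inferred from the import above)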
import gym
import math 
import random
import pygame
import numpy as np
from gym import utils
from gym import error, spaces
from gym.utils import seeding
from scipy.spatial.distance import euclidean
import os
import sys

class RobotArmEnvV0(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self):
        self.set_window_size([650,650])
        self.set_link_properties([121,84,97])
        self.set_increment_rate(0.01)
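        # 650x650 drawing window, link lengths [121, 84, 97], joint increment 0.01 rad per action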
        self.target_pos = self.generate_random_pos()
        self.action = {0: "HOLD",
                       1: "INC_J1",
                       2: "DEC_J1",
                       3: "INC_J2",
                       4: "DEC_J2",
                       5: "INC_J3",
                       6: "DEC_J3",
                       7: "INC_J1_J2",
                       8: "INC_J1_J3",
                       9: "INC_J2_J3",
                       10: "DEC_J1_J2",
                       11: "DEC_J1_J3",
                       12: "DEC_J2_J3",
                       13: "INC_J1_J2_J3",
                       14: "DEC_J1_J2_J3"
                       }
        
        self.observation_space = spaces.Box(np.finfo(np.float32).min, np.finfo(np.float32).max, shape=(5,), dtype=np.float32)
        self.action_space = spaces.Discrete(len(self.action))
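        # Observation (5 values): target x, target y and the three joint angles.
        # Action: 15 discrete choices (HOLD, or increment/decrement combinations of the joints) defined above.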

        self.current_error = -math.inf
        self.seed()
        self.viewer = False
        self.i = 0
        self.save_img = False

    def set_link_properties(self, links):
        self.links = links
        self.n_links = len(self.links)
        self.min_theta = math.radians(0)
        self.max_theta = math.radians(90)
        self.theta = self.generate_random_angle()
        self.max_length = sum(self.links)

    def set_increment_rate(self, rate):
        self.rate = rate

    def set_window_size(self, window_size):
        self.window_size = window_size
        self.centre_window = [window_size[0]//2, window_size[1]//2]

    def rotate_z(self, theta):
        rz = np.array([[np.cos(theta), - np.sin(theta), 0, 0],
                       [np.sin(theta), np.cos(theta), 0, 0],
                       [0, 0, 1, 0],
                       [0, 0, 0, 1]])
        return rz

    def translate(self, dx, dy, dz):
        t = np.array([[1, 0, 0, dx],
                      [0, 1, 0, dy],
                      [0, 0, 1, dz],
                      [0, 0, 0, 1]])
        return t

    def forward_kinematics(self, theta):
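        # Chain a rotation about z by theta[i] and a translation of links[i] along x
        # for each link; P[i] is the 4x4 homogeneous pose of the end of link i,
        # and P[-1] is the arm tip.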
        P = []
        P.append(np.eye(4))
        for i in range(0, self.n_links):
            R = self.rotate_z(theta[i])
            T = self.translate(self.links[i], 0, 0)
            P.append(P[-1].dot(R).dot(T))
        return P

    def inverse_theta(self, theta):
        new_theta = theta.copy()
        for i in range(theta.shape[0]):
            new_theta[i] = -1*theta[i]
        return new_theta

    def draw_arm(self, theta):
        LINK_COLOR = (255, 255, 255)
        JOINT_COLOR = (0, 0, 0)
        TIP_COLOR = (0, 0, 255)
        theta = self.inverse_theta(theta)
        P = self.forward_kinematics(theta)
        origin = np.eye(4)
        origin_to_base = self.translate(self.centre_window[0],self.centre_window[1],0)
        base = origin.dot(origin_to_base)
        F_prev = base.copy()
        for i in range(1, len(P)):
            F_next = base.dot(P[i])
            pygame.draw.line(self.screen, LINK_COLOR, (int(F_prev[0,3]), int(F_prev[1,3])), (int(F_next[0,3]), int(F_next[1,3])), 5)
            pygame.draw.circle(self.screen, JOINT_COLOR, (int(F_prev[0,3]), int(F_prev[1,3])), 10)
            F_prev = F_next.copy()
        pygame.draw.circle(self.screen, TIP_COLOR, (int(F_next[0,3]), int(F_next[1,3])), 8)

    def draw_target(self):
        TARGET_COLOR = (255, 0, 0)
        origin = np.eye(4)
        origin_to_base = self.translate(self.centre_window[0], self.centre_window[1], 0)
        base = origin.dot(origin_to_base)
        base_to_target = self.translate(self.target_pos[0], -self.target_pos[1], 0)
        target = base.dot(base_to_target)
        pygame.draw.circle(self.screen, TARGET_COLOR, (int(target[0,3]),int(target[1,3])), 12)
    
    def generate_random_angle(self):
        theta = np.zeros(self.n_links)
        theta[0] = random.uniform(self.min_theta, self.max_theta)
        theta[1] = random.uniform(self.min_theta, self.max_theta)
        theta[2] = random.uniform(self.min_theta, self.max_theta)
        return theta

    def generate_random_pos(self):
        theta = self.generate_random_angle()
        P = self.forward_kinematics(theta)
        pos = np.array([P[-1][0,3], P[-1][1,3]])
        return pos

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    @staticmethod
    def normalize_angle(angle):
        return math.atan2(math.sin(angle), math.cos(angle))

    def step(self, action):
        if self.action[action] == "INC_J1":
            self.theta[0] += self.rate
        elif self.action[action] == "DEC_J1":
            self.theta[0] -= self.rate
        elif self.action[action] == "INC_J2":
            self.theta[1] += self.rate 
        elif self.action[action] == "DEC_J2":
            self.theta[1] -= self.rate
        elif self.action[action] == "INC_J3":
            self.theta[2] += self.rate
        elif self.action[action] == "DEC_J3":
            self.theta[2] -= self.rate
        
        elif self.action[action] == "INC_J1_J2":
            self.theta[0] += self.rate
            self.theta[1] += self.rate
        
        elif self.action[action] == "INC_J1_J3":
            self.theta[0] += self.rate
            self.theta[2] += self.rate

        elif self.action[action] == "INC_J2_J3":
            self.theta[1] += self.rate
            self.theta[2] += self.rate

        elif self.action[action] == "DEC_J1_J2":
            self.theta[0] -= self.rate
            self.theta[1] -= self.rate

        elif self.action[action] == "DEC_J1_J3":
            self.theta[0] -= self.rate
            self.theta[2] -= self.rate

        elif self.action[action] == "DEC_J2_J3":
            self.theta[1] -= self.rate
            self.theta[2] -= self.rate

        elif self.action[action] == "INC_J1_J2_J3":
            self.theta[0] += self.rate
            self.theta[1] += self.rate
            self.theta[2] += self.rate

        elif self.action[action] == "DEC_J1_J2_J3":
            self.theta[0] -= self.rate
            self.theta[1] -= self.rate
            self.theta[2] -= self.rate
        
        self.theta[0] = np.clip(self.theta[0], self.min_theta, self.max_theta)
        self.theta[1] = np.clip(self.theta[1], self.min_theta, self.max_theta)
        self.theta[2] = np.clip(self.theta[2], self.min_theta, self.max_theta)

        self.theta[0] = self.normalize_angle(self.theta[0])
        self.theta[1] = self.normalize_angle(self.theta[1])
        self.theta[2] = self.normalize_angle(self.theta[2])

        # Calc reward
        P = self.forward_kinematics(self.theta)
        #print('P = ', P)
        tip_pos = [P[-1][0,3], P[-1][1,3]]
        distance_error = euclidean(self.target_pos, tip_pos)  # distance between the tip and the target
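        # Reward: -1 if the tip did not get closer than on the previous step,
        # +1 while the tip is within epsilon of the target, otherwise 0.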

        reward = 0
        if distance_error >= self.current_error:
            reward = -1

        epsilon = 5
        if (- epsilon < distance_error and distance_error < epsilon):
            reward = 1
        

        self.current_error = distance_error
        self.current_score += reward
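        # End the episode once the cumulative score reaches +5 or -5.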

        if self.current_score == -5 or self.current_score == 5:
            done = True
        else:
            done = False
        

        observation = np.hstack((self.target_pos, self.theta))
        info = {
            'distance_error': distance_error,
            'target_position': self.target_pos,
            'current_position': tip_pos,
            'current_score': self.current_score
        }
        return observation, reward, done, info

    def reset(self):
        self.target_pos = self.generate_random_pos()
        self.current_score = 0
        observation = np.hstack((self.target_pos, self.theta))
        return observation

    def render(self, mode='human'):
        SCREEN_COLOR = (52, 152, 52)
        if self.viewer == False:
            if not os.path.isdir('./output') and self.save_img:
                os.makedirs('./output')
            pygame.init()
            pygame.display.set_caption("RobotArm-Env")
            self.screen = pygame.display.set_mode(self.window_size, )
            self.clock = pygame.time.Clock()
            self.viewer = True  # flip the flag so that later frames are drawn in the same window

        self.screen.fill(SCREEN_COLOR)
        self.draw_target()
        self.draw_arm(self.theta)
        self.clock.tick(60)
        pygame.display.flip()  # update the display

        if self.save_img:
            self.i += 1
            pygame.image.save(self.screen, f'./output/{str(self.i).zfill(5)}.png')

        # Event handling
        for event in pygame.event.get():   # fetch pending events
            if event.type == pygame.QUIT:  # the close button was pressed
                pygame.quit()              # uninitialize all pygame modules
                sys.exit()                 # and exit

    def close(self):
        if self.viewer != None:
            pygame.quit()

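# gym_robot_arm/__init__.py (assumed path, inferred from the package layout): registers the env id used by gym.make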
from gym.envs.registration import register

register(
    id='robot-arm-v0',
    entry_point='gym_robot_arm.envs:RobotArmEnvV0',
)
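
To quickly confirm that the registration works, the following can be run from the directory that contains the gym_robot_arm package (an optional check, not required for the rest of the article):

import gym
import gym_robot_arm  # importing the package executes the register() call above

env = gym.make('robot-arm-v0')
print(env.observation_space)  # Box with 5 values: target x, y and the three joint angles
print(env.action_space)       # Discrete(15)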

4. Create the keras-rl2 training script

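# examples/dqn_myenv.py (assumed file name, taken from the command run in section 5)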
import numpy as np
import gym
from gym_robot_arm.envs.robot_arm_env import RobotArmEnvV0

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Flatten
from tensorflow.keras.optimizers import Adam

from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory


ENV_NAME = 'robot-arm-v0'

# Get the environment and extract the number of actions.
env = gym.make(ENV_NAME)
np.random.seed(123)
env.seed(123)
nb_actions = env.action_space.n

# Next, we build a very simple model.
model = Sequential()
model.add(Flatten(input_shape=(1,) + env.observation_space.shape))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(nb_actions))
model.add(Activation('linear'))
print(model.summary())

# Finally, we configure and compile our agent. You can use every built-in tensorflow.keras optimizer and
# even the metrics!
memory = SequentialMemory(limit=50000, window_length=1)
policy = BoltzmannQPolicy()
dqn = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=10,
               target_model_update=1e-2, policy=policy)
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])

# Okay, now it's time to learn something! We visualize the training here for show, but this
# slows down training quite a lot. You can always safely abort the training prematurely using
# Ctrl + C.
dqn.fit(env, nb_steps=50000, visualize=True, verbose=2)

# After training is done, we save the final weights.
dqn.save_weights(f'./models/dqn_{ENV_NAME}_weights.h5f', overwrite=True)

# Finally, evaluate our algorithm for 5 episodes.
dqn.test(env, nb_episodes=5, visualize=True)

5. Trying it out

Now, try running it.

python3 examples/dqn_myenv.py

Doing so produced an error.

Following the error message, make a small edit to the source: rewrite line 181 of ~/rl/core.py.

- if not np.isreal(value):
+ if not np.array([np.isreal(value)]).all():

The same error also came up at line 355 of ~/rl/core.py, so apply the same change there. (Presumably this is because the info dict returned by our env contains NumPy-array values such as target_position, so np.isreal(value) returns an array rather than a single boolean.)

- if not np.isreal(value):
+ if not np.array([np.isreal(value)]).all():

With that sorted out, run it again.

6. Generating the weights file

Incidentally, once the configured 50,000 training steps are complete, the learned weights are written out to a weights file (.h5f). This file can be loaded again later.
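
Note that the training script saves to ./models/; if that directory does not exist, the save at the very end of training may fail, so it is safest to create it beforehand:

import os
os.makedirs('./models', exist_ok=True)  # make sure the output directory for dqn.save_weights() exists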

7. Testing with the saved weights

Run the test as follows:

python3 examples/moving_test.py

The script (examples/moving_test.py) is shown below.

import numpy as np
import gym
from gym_robot_arm.envs.robot_arm_env import RobotArmEnvV0

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Flatten
from tensorflow.keras.optimizers import Adam

from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

ENV_NAME = 'robot-arm-v0'

# Get the environment and extract the number of actions.
env = gym.make(ENV_NAME)
np.random.seed(123)
env.seed(123)
nb_actions = env.action_space.n

# Next, we build a very simple model.
model = Sequential()
model.add(Flatten(input_shape=(1,) + env.observation_space.shape))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(nb_actions))
model.add(Activation('linear'))
print(model.summary())

# Finally, we configure and compile our agent. You can use every built-in tensorflow.keras optimizer and
# even the metrics!
memory = SequentialMemory(limit=50000, window_length=1)
policy = BoltzmannQPolicy()
dqn = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=10,
               target_model_update=1e-2, policy=policy)
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])


# Load the weights that were saved after training.
dqn.load_weights(f'./models/dqn_{ENV_NAME}_weights.h5f')

# Finally, evaluate our algorithm for 500 episodes.
dqn.test(env, nb_episodes=500, visualize=True)   # run 500 test episodes
