keras-rl2 + OpenAI gym で3リンクロボットアームの手先位置制御の強化学習(DQN学習)・シミュレータの作成
1. はじめに
keras-rl2 と OpenAI gymのPythonライブラリを使って強化学習(DQN学習)を行い、3リンクロボットアームの手先位置制御シミュレータの作成を試みる記事です。
(本記事の開発環境:Ubuntu 20.04.6 LTS, Python3.8)
2. インストール
まずはじめに、仮想環境を作る。
python3 -m venv .venv
仮想環境をactivateする。
source .venv/bin/activate
keras-rl2 をインストールする。
git clone https://github.com/wau/keras-rl2.git cd keras-rl
本記事で追加の依存関係をインストールする。
tensorflowについては、新しすぎるバージョンだと動かないため、バージョン指定してインストール。
gymはメンテナンスが終了している模様だが、バージョン 0.26.2で指定。
(パッケージがない等のエラーが出たものについても、再度インストールしている)
pip install keras-rl2==1.0.5 tensorflow==2.3.0 gym==0.26.2 pip install protobuf==3.20 pip install pygame scipy
3. gym のenvを作成する
次の3つのスクリプトをファイル名、ディレクトリ構成に気をつけて作成する。
from gym_robot_arm.envs.robot_arm_env import RobotArmEnvV0
import gym import math import random import pygame import numpy as np from gym import utils from gym import error, spaces from gym.utils import seeding from scipy.spatial.distance import euclidean import os import sys class RobotArmEnvV0(gym.Env): metadata = {'render.modes': ['human']} def __init__(self): self.set_window_size([650,650]) self.set_link_properties([121,84,97]) self.set_increment_rate(0.01) self.target_pos = self.generate_random_pos() self.action = {0: "HOLD", 1: "INC_J1", 2: "DEC_J1", 3: "INC_J2", 4: "DEC_J2", 5: "INC_J3", 6: "DEC_J3", 7: "INC_J1_J2", 8: "INC_J1_J3", 9: "INC_J2_J3", 10: "DEC_J1_J2", 11: "DEC_J1_J3", 12: "DEC_J2_J3", 13: "INC_J1_J2_J3", 14: "DEC_J1_J2_J3" } self.observation_space = spaces.Box(np.finfo(np.float32).min, np.finfo(np.float32).max, shape=(5,), dtype=np.float32) self.action_space = spaces.Discrete(len(self.action)) self.current_error = -math.inf self.seed() self.viewer = False self.i = 0 self.save_img = False def set_link_properties(self, links): self.links = links self.n_links = len(self.links) self.min_theta = math.radians(0) self.max_theta = math.radians(90) self.theta = self.generate_random_angle() self.max_length = sum(self.links) def set_increment_rate(self, rate): self.rate = rate def set_window_size(self, window_size): self.window_size = window_size self.centre_window = [window_size[0]//2, window_size[1]//2] def rotate_z(self, theta): rz = np.array([[np.cos(theta), - np.sin(theta), 0, 0], [np.sin(theta), np.cos(theta), 0, 0], [0, 0, 1, 0], [0, 0, 0, 1]]) return rz def translate(self, dx, dy, dz): t = np.array([[1, 0, 0, dx], [0, 1, 0, dy], [0, 0, 1, dz], [0, 0, 0, 1]]) return t def forward_kinematics(self, theta): P = [] P.append(np.eye(4)) for i in range(0, self.n_links): R = self.rotate_z(theta[i]) T = self.translate(self.links[i], 0, 0) P.append(P[-1].dot(R).dot(T)) return P def inverse_theta(self, theta): new_theta = theta.copy() for i in range(theta.shape[0]): new_theta[i] = -1*theta[i] return new_theta def draw_arm(self, theta): LINK_COLOR = (255, 255, 255) JOINT_COLOR = (0, 0, 0) TIP_COLOR = (0, 0, 255) theta = self.inverse_theta(theta) P = self.forward_kinematics(theta) origin = np.eye(4) origin_to_base = self.translate(self.centre_window[0],self.centre_window[1],0) base = origin.dot(origin_to_base) F_prev = base.copy() for i in range(1, len(P)): F_next = base.dot(P[i]) pygame.draw.line(self.screen, LINK_COLOR, (int(F_prev[0,3]), int(F_prev[1,3])), (int(F_next[0,3]), int(F_next[1,3])), 5) pygame.draw.circle(self.screen, JOINT_COLOR, (int(F_prev[0,3]), int(F_prev[1,3])), 10) F_prev = F_next.copy() pygame.draw.circle(self.screen, TIP_COLOR, (int(F_next[0,3]), int(F_next[1,3])), 8) def draw_target(self): TARGET_COLOR = (255, 0, 0) origin = np.eye(4) origin_to_base = self.translate(self.centre_window[0], self.centre_window[1], 0) base = origin.dot(origin_to_base) base_to_target = self.translate(self.target_pos[0], -self.target_pos[1], 0) target = base.dot(base_to_target) pygame.draw.circle(self.screen, TARGET_COLOR, (int(target[0,3]),int(target[1,3])), 12) def generate_random_angle(self): theta = np.zeros(self.n_links) theta[0] = random.uniform(self.min_theta, self.max_theta) theta[1] = random.uniform(self.min_theta, self.max_theta) theta[2] = random.uniform(self.min_theta, self.max_theta) return theta def generate_random_pos(self): theta = self.generate_random_angle() P = self.forward_kinematics(theta) pos = np.array([P[-1][0,3], P[-1][1,3]]) return pos def seed(self, seed=None): self.np_random, seed = seeding.np_random(seed) return [seed] @staticmethod def normalize_angle(angle): return math.atan2(math.sin(angle), math.cos(angle)) def step(self, action): if self.action[action] == "INC_J1": self.theta[0] += self.rate elif self.action[action] == "DEC_J1": self.theta[0] -= self.rate elif self.action[action] == "INC_J2": self.theta[1] += self.rate elif self.action[action] == "DEC_J2": self.theta[1] -= self.rate elif self.action[action] == "INC_J3": self.theta[2] += self.rate elif self.action[action] == "DEC_J3": self.theta[2] -= self.rate elif self.action[action] == "INC_J1_J2": self.theta[0] += self.rate self.theta[1] += self.rate elif self.action[action] == "INC_J1_J3": self.theta[0] += self.rate self.theta[2] += self.rate elif self.action[action] == "INC_J2_J3": self.theta[1] += self.rate self.theta[2] += self.rate elif self.action[action] == "DEC_J1_J2": self.theta[0] -= self.rate self.theta[1] -= self.rate elif self.action[action] == "DEC_J1_J3": self.theta[0] -= self.rate self.theta[2] -= self.rate elif self.action[action] == "DEC_J2_J3": self.theta[1] -= self.rate self.theta[2] -= self.rate elif self.action[action] == "INC_J1_J2_J3": self.theta[0] += self.rate self.theta[1] += self.rate self.theta[2] += self.rate elif self.action[action] == "DEC_J1_J2_J3": self.theta[0] -= self.rate self.theta[1] -= self.rate self.theta[2] -= self.rate self.theta[0] = np.clip(self.theta[0], self.min_theta, self.max_theta) self.theta[1] = np.clip(self.theta[1], self.min_theta, self.max_theta) self.theta[2] = np.clip(self.theta[2], self.min_theta, self.max_theta) self.theta[0] = self.normalize_angle(self.theta[0]) self.theta[1] = self.normalize_angle(self.theta[1]) self.theta[2] = self.normalize_angle(self.theta[2]) # Calc reward P = self.forward_kinematics(self.theta) #print('P = ', P) tip_pos = [P[-1][0,3], P[-1][1,3]] distance_error = euclidean(self.target_pos, tip_pos) #2点間の距離を算出 reward = 0 if distance_error >= self.current_error: reward = -1 epsilon = 5 if (- epsilon < distance_error and distance_error < epsilon): reward = 1 self.current_error = distance_error self.current_score += reward if self.current_score == -5 or self.current_score == 5: done = True else: done = False observation = np.hstack((self.target_pos, self.theta)) info = { 'distance_error': distance_error, 'target_position': self.target_pos, 'current_position': tip_pos, 'current_score': self.current_score } return observation, reward, done, info def reset(self): self.target_pos = self.generate_random_pos() self.current_score = 0 observation = np.hstack((self.target_pos, self.theta)) return observation def render(self, mode='human'): SCREEN_COLOR = (52, 152, 52) if self.viewer == False: if not os.path.isdir('./output') and self.save_img: os.makedirs('./output') pygame.init() pygame.display.set_caption("RobotArm-Env") self.screen = pygame.display.set_mode(self.window_size, ) self.clock = pygame.time.Clock() self.viewer = True # 同一ウィンドウ内に描画するため, 状態を変化させる self.screen.fill(SCREEN_COLOR) self.draw_target() self.draw_arm(self.theta) self.clock.tick(60) pygame.display.flip() # 画面の更新 if self.save_img: self.i += 1 pygame.image.save(self.screen, f'./output/{str(self.i).zfill(5)}.png') # イベント処理 for event in pygame.event.get(): # イベントを取得 if event.type == 'QUIT': # 閉じるボタンが押されたら pygame.quit() # 全てのpygameモジュールの初期化を解除 sys.exit() # 終了 def close(self): if self.viewer != None: pygame.quit()
from gym.envs.registration import register register( id='robot-arm-v0', entry_point='gym_robot_arm.envs:RobotArmEnvV0', )
4. keras-rl2の実行スクリプトを作成する
import numpy as np import gym from gym_robot_arm.envs.robot_arm_env import RobotArmEnvV0 from tensorflow.keras.models import Sequential from tensorflow.keras.layers import Dense, Activation, Flatten from tensorflow.keras.optimizers import Adam from rl.agents.dqn import DQNAgent from rl.policy import BoltzmannQPolicy from rl.memory import SequentialMemory ENV_NAME = 'robot-arm-v0' # Get the environment and extract the number of actions. env = gym.make(ENV_NAME) np.random.seed(123) env.seed(123) nb_actions = env.action_space.n # Next, we build a very simple model. model = Sequential() model.add(Flatten(input_shape=(1,) + env.observation_space.shape)) model.add(Dense(32)) model.add(Activation('relu')) model.add(Dense(32)) model.add(Activation('relu')) model.add(Dense(32)) model.add(Activation('relu')) model.add(Dense(nb_actions)) model.add(Activation('linear')) print(model.summary()) # Finally, we configure and compile our agent. You can use every built-in tensorflow.keras optimizer and # even the metrics! memory = SequentialMemory(limit=50000, window_length=1) policy = BoltzmannQPolicy() dqn = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=10, target_model_update=1e-2, policy=policy) dqn.compile(Adam(learning_rate=1e-3), metrics=['mae']) # Okay, now it's time to learn something! We visualize the training here for show, but this # slows down training quite a lot. You can always safely abort the training prematurely using # Ctrl + C. dqn.fit(env, nb_steps=50000, visualize=True, verbose=2) # After training is done, we save the final weights. dqn.save_weights(f'./models/dqn_{ENV_NAME}_weights.h5f', overwrite=True) # Finally, evaluate our algorithm for 5 episodes. dqn.test(env, nb_episodes=5, visualize=True)
5. 動作確認
ここで、実行してみる。
python3 examples/dqn_myenv.py
すると、下記画像のようなエラーが発生した。
エラーメッセージに従い、少しだけソースを編集する。 ~/rl/core.py の181行目を書き換えた。
- if not np.isreal(value): + if not np.array([np.isreal(value)]).all():
あと、~/rl/core.py の355行目も同様のエラーが出たため、書き換える。
- if not np.isreal(value): + if not np.array([np.isreal(value)]).all():
気を取り直して再び実行。
6. 重みファイルの生成
ちなみに、設定した50000ステップの学習が完了すると、 重みファイル(.h5f)に学習結果の重みデータが生成される。 これは後から呼び出し可能です。
7. 重みファイルを呼び出しての動作確認
動作確認スクリプトは下記。
python3 examples/moving_test.py
import numpy as np import gym from gym_robot_arm.envs.robot_arm_env import RobotArmEnvV0 from tensorflow.keras.models import Sequential from tensorflow.keras.layers import Dense, Activation, Flatten from tensorflow.keras.optimizers import Adam from rl.agents.dqn import DQNAgent from rl.policy import BoltzmannQPolicy from rl.memory import SequentialMemory ENV_NAME = 'robot-arm-v0' # Get the environment and extract the number of actions. env = gym.make(ENV_NAME) np.random.seed(123) env.seed(123) nb_actions = env.action_space.n # Next, we build a very simple model. model = Sequential() model.add(Flatten(input_shape=(1,) + env.observation_space.shape)) model.add(Dense(32)) model.add(Activation('relu')) model.add(Dense(32)) model.add(Activation('relu')) model.add(Dense(32)) model.add(Activation('relu')) model.add(Dense(nb_actions)) model.add(Activation('linear')) print(model.summary()) # Finally, we configure and compile our agent. You can use every built-in tensorflow.keras optimizer and # even the metrics! memory = SequentialMemory(limit=50000, window_length=1) policy = BoltzmannQPolicy() dqn = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=10, target_model_update=1e-2, policy=policy) dqn.compile(Adam(learning_rate=1e-3), metrics=['mae']) # After training is done, we save the final weights. dqn.load_weights(f'./models/dqn_{ENV_NAME}_weights.h5f') # Finally, evaluate our algorithm for 500 episodes. dqn.test(env, nb_episodes=500, visualize=True) #500回の動作テスト