LIFE LOG(ここにはあなたのブログ名)

作業日誌 う靴

気ままにコンピュータ関連の備忘録などを書きます...。

ZYNQボード(ZYJZGW)でi2c通信を使ってDCモータ制御

はじめに

DCモータ制御のため、i2c機能の付いたDRV8830(テキサスインスツルメンツ製)を使います。 秋月電子でモジュールを買いました。 akizukidenshi.com

モジュールとi2c通信するためのCコード

drv8830.c

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
#include <linux/i2c-dev.h>
#include <sys/ioctl.h>

// I2Cアドレス
// DRV8830のI2Cアドレス(A0, A1端子ともに何も接続しないアドレス設定)
#define DRV8830_ADDR 0x64 

// レジスタアドレス
#define REG_MODE 0x00

// モーター制御用の定数
#define FORWARD 0x01
#define BACKWARD 0x02
#define STOP 0x00

// I2Cファイルディスクリプタ
int i2c_fd;

// モーターの速度設定
#define SPEED 0x3d  // 4.90V

// DRV8830初期化
void drv8830_init(const char *i2c_dev) {
    i2c_fd = open(i2c_dev, O_RDWR);
    if (i2c_fd < 0) {
        perror("Failed to open the i2c bus");
        exit(1);
    }

    if (ioctl(i2c_fd, I2C_SLAVE, DRV8830_ADDR) < 0) {
        perror("Failed to connect to the DRV8830");
        close(i2c_fd);
        exit(1);
    }
}

// レジスタにデータを書き込む関数
void drv8830_set_mode(uint8_t reg, uint8_t vset, uint8_t data) {
    uint8_t vdata = (vset << 2) | data;  // ビットシフトとビットORでデータを準備
    uint8_t buf[2];
    buf[0] = reg;       // レジスタアドレス
    buf[1] = vdata;     // データ

    ssize_t res = write(i2c_fd, buf, 2);
    if (res != 2) {
        fprintf(stderr, "Failed to write to the i2c bus (expected 2 bytes, wrote %zd bytes)\n", res);
        perror("Error");
    } else {
        printf("Successfully wrote to I2C bus: reg=0x%02x, data=0x%02x\n", reg, vdata);
    }
}

// DRV8830のFAULTをリセットする関数
void fault_init() {
    uint8_t buf[2];
    buf[0] = 0x00;     // レジスタアドレス
    buf[1] = 0x80;     // データ(FAULTをリセット)

    ssize_t res = write(i2c_fd, buf, 2);
    if (res != 2) {
        fprintf(stderr, "Failed to write to the i2c bus (reset_init) (expected 2 bytes, wrote %zd bytes)\n", res);
        perror("Error");
    } else {
        printf("FAULT reset successfully.\n");
    }
}

// モーターを前進させる
void motor_forward() {
    printf("Setting motor forward...\n");
    drv8830_set_mode(REG_MODE, SPEED, FORWARD);
}

// モーターを後退させる
void motor_backward() {
    printf("Setting motor backward...\n");
    drv8830_set_mode(REG_MODE, SPEED, BACKWARD);
}

// モーターを停止させる
void motor_stop() {
    printf("Stopping motor...\n");
    drv8830_set_mode(REG_MODE, 0x00, STOP);
}

// 終了処理
void drv8830_close() {
    close(i2c_fd);
}


int main() {
    drv8830_init("/dev/i2c-1");

    fault_init();

    // モーターを前進させる
    printf("Setting motor forward...\n");
    drv8830_set_mode(REG_MODE, SPEED, FORWARD);
    sleep(10); // 10秒間動作
    
    printf("Stopping motor...\n");
    drv8830_set_mode(REG_MODE, 0x00, STOP);
    sleep(3);

    // モーターを後退させる
    printf("Setting motor backward...\n");
    drv8830_set_mode(REG_MODE, SPEED, BACKWARD);
    sleep(10); // 10秒間動作
    
    printf("Stopping motor...\n");
    drv8830_set_mode(REG_MODE, 0x00, STOP);
    sleep(3);

    drv8830_close();
    return 0;
}

回路作成時の注意事項

秋月電子のマニュアルに書いてありますが、VCCとGNDの間に0.1uF(最小で)以上の セラミックコンデンサを必ずはさみましょう。 これをはなまなかったことが原因で、3時間以上はまりました。。 モータ始動時に電圧降下が起きて、モジュールのロジック電源も降下して、 強制的にモジュールの電源がOFFられてしまうのに注意しましょう。 必ず、セラミックコンデンサをはさみましょう。 あと今回、ZYNQボートが3.3Vロジックで、モータードライバモジュールは5Vとかで動かしたいので、 レベルシフトのモジュールを使っています。

動作確認

コンパイルして、、

gcc drv8830.c -o drv8830

実行する。

sudo ./drv8830

無事に、モータが正転・停止・逆転・停止しました。 以上で完了です。

参考URL

Arduino Nano とモータードライバでDCモーターを正転・逆転してみた(1):息子と一緒に Makers:So-netブログ

データシート https://www.tij.co.jp/jp/lit/ds/symlink/drv8830.pdf

ZYNQボード(ZYJZGW)でINA219モジュールを使ってi2c通信で電流・電圧値を取得する

はじめに

INA219のモジュールをAmazonで購入しました。

今回計測する外部接続回路は、LED1つを常時点灯させたものです。

ZYNQとi2c通信するためのCコード

ina219.h

#ifndef INA219_H
#define INA219_H

#include <stdint.h>

// INA219のI2Cアドレスとレジスタ
#define INA219_ADDR 0x40
#define INA219_REG_CONFIG 0x00
#define INA219_REG_SHUNT_VOLTAGE 0x01
#define INA219_REG_BUS_VOLTAGE 0x02
#define INA219_REG_POWER 0x03
#define INA219_REG_CURRENT 0x04
#define INA219_REG_CALIBRATION 0x05

// 設定レジスタのビットマスク
#define INA219_CONFIG_RESET 0x8000
#define INA219_CONFIG_BVOLTAGERANGE_32V 0x2000
#define INA219_CONFIG_GAIN_1_40MV 0x1800
#define INA219_CONFIG_BADCRES_12BIT 0x0700
#define INA219_CONFIG_SADCRES_12BIT_1S_532US 0x0018
#define INA219_CONFIG_MODE_SANDBVOLT_CONTINUOUS 0x0007

// スケーリングファクター
#define CURRENT_LSB 0.025  // 1 LSB = 0.01 mA

void ina219_init(int i2c_fd);
void ina219_setCalibration(uint16_t cal_value);
uint16_t ina219_getCurrent_raw(void);
uint16_t ina219_getPower_raw(void);
float ina219_getBusVoltage_V(void);
float ina219_getShuntVoltage_mV(void);
float ina219_getCurrent_mA(void);
float ina219_getPower_mW(void);

#endif // INA219_H

ina219.c

#include "ina219.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <sys/ioctl.h>
#include <linux/i2c-dev.h>

static int i2c_fd;

static void write_register(uint8_t reg, uint16_t value) {
    uint8_t buffer[3];
    buffer[0] = reg;
    buffer[1] = value >> 8;
    buffer[2] = value & 0xFF;

    if (write(i2c_fd, buffer, 3) != 3) {
        perror("Failed to write to the i2c bus");
        exit(1);
    }
}

static uint16_t read_register(uint8_t reg) {
    uint8_t buffer[2];
    if (write(i2c_fd, &reg, 1) != 1) {
        perror("Failed to write register address to the i2c bus");
        exit(1);
    }
    if (read(i2c_fd, buffer, 2) != 2) {
        perror("Failed to read from the i2c bus");
        exit(1);
    }
    return (buffer[0] << 8) | buffer[1];
}

void ina219_init(int fd) {
    i2c_fd = fd;

    // Reset the device
    write_register(INA219_REG_CONFIG, INA219_CONFIG_RESET);
    usleep(50000); // Wait for the reset to complete

    // Set calibration value
    uint16_t cal_value = 0x409F; // Example calibration value
    ina219_setCalibration(cal_value);
}

void ina219_setCalibration(uint16_t cal_value) {
    write_register(INA219_REG_CALIBRATION, cal_value);
}

uint16_t ina219_getCurrent_raw(void) {
    // Set calibration register to ensure accurate reading
    uint16_t cal_value = 0x409F; // Example calibration value
    ina219_setCalibration(cal_value);

    return read_register(INA219_REG_CURRENT);
}

uint16_t ina219_getPower_raw(void) {
    // Set calibration register to ensure accurate reading
    uint16_t cal_value = 0x409F; // Example calibration value
    ina219_setCalibration(cal_value);

    return read_register(INA219_REG_POWER);
}

float ina219_getBusVoltage_V(void) {
    uint16_t value = read_register(INA219_REG_BUS_VOLTAGE);
    return (value >> 3) * 0.004; // Convert to volts (12-bit value to 0.004V/LSB)
}

float ina219_getShuntVoltage_mV(void) {
    uint16_t value = read_register(INA219_REG_SHUNT_VOLTAGE);
    return value * 0.01; // Convert to millivolts
}

float ina219_getCurrent_mA(void) {
    uint16_t raw_current = ina219_getCurrent_raw();
    return raw_current * CURRENT_LSB; // Convert to milliamperes
}

float ina219_getPower_mW(void) {
    uint16_t raw_power = ina219_getPower_raw();
    return raw_power * (20*CURRENT_LSB); // Convert to milliwatts
}

int main() {
    const char *i2c_device = "/dev/i2c-1"; // I2Cデバイスファイル

    if ((i2c_fd = open(i2c_device, O_RDWR)) < 0) {
        perror("Failed to open the i2c bus");
        return 1;
    }

    if (ioctl(i2c_fd, I2C_SLAVE, INA219_ADDR) < 0) {
        perror("Failed to acquire bus access and/or talk to slave");
        return 1;
    }

    // Initialize INA219
    ina219_init(i2c_fd);

    while (1) {
        // Get measurements
        float bus_voltage = ina219_getBusVoltage_V();
        float shunt_voltage = ina219_getShuntVoltage_mV();
        float current = ina219_getCurrent_mA();
        float power = ina219_getPower_mW();

        // Print measurements
        printf("-------------------------------\n");
        printf("Bus Voltage: %.2f V\n", bus_voltage);
        printf("Shunt Voltage: %.2f mV\n", shunt_voltage);
        printf("Current: %.2f mA\n", current);
        printf("Power: %.2f mW\n", power);
        printf("-------------------------------\n");

        // Sleep for 500ms
        usleep(500000);
    }

    close(i2c_fd);
    return 0;
}

動作確認

ひとまずコンパイル

gcc ina219.c -o ina219

実行する。

sudo ./ina219

成功すると、電流・電圧値などがターミナル上に流れる。 i2cにより無事、計測できている。

補足

ZYNQボード(EBAZ4205)でPetaLinuxを動作+LinuxユーザアプリケーションでLチカ

ZYNQボード(EBAZ4205)にPetaLinuxを導入

1500円ZYNQ基板でPetaLinuxを動作させる(後編:PetaLinux ビルドと EBAZ4205 での動作) #FPGA - Qiita

を参考にして、ZYNQボード(EBAZ4205)においてPetaLinuxを導入する。

追加の注意事項1

PetaLinuxのプロジェクト作成に使うハードウェア情報(ebaz4205_wrapper.xsa)をvivadoにおいて、 「File」->「Export」->...... の順でクリックしてあらかじめ作成しておく。

petalinux-create --type project --template zynq --name ebaz4205_test1
petalinux-config --get-hw-description /media/sf_share/ebaz4205_wrapper.xsa

上記のコマンドで、PetaLinuxのプロジェクト(ebaz4205_test1)を作成する。 そして、PetaLinuxのconfig設定で先ほど作成したハードウェア情報(ebaz4205_wrapper.xsa)と紐付ける。

petalinux-build

ビルドする。

ビルドが完了後、imagesフォルダが生成される。"image.ub"等々の各種ファイルが生成されている。 別途"BOOT.BIN"ファイルを作成するコマンドは、

petalinux-package --boot --fsbl ./images/linux/zynq_fsbl.elf --fpga ./images/linux/system.bit --u-boot --force

であるが、vivadoの設定で「File」->「Export」->「Next」->「Export Hardware」->「Include bitstream」にすれば、 "system.bit"ファイルが生成されるので、"--fpga"で指定すればよい。

Linuxユーザアプリケーションを作成する

ZYNQボード(EBAZ4205)のPLに接続したLED(H18)を、PSからの指示で点滅させるアプリケーションを作成する。

ZYBO (Zynq) 初心者ガイド (11) LinuxユーザアプリケーションでLチカ #FPGA - Qiita

を参考にして、Linuxユーザアプリケーションを作成する。

petalinux-create -t apps --template c --name myapp --enable

次に、/home/vboxuser/ebaz4205_test1/project-spec/meta-user/recipes-apps/myapp/files/myapp.c を目的のCプログラムに書き換える。

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <errno.h>

int main(int argc, char **argv)
{
    printf("Hello World!\n");

    int fd;

    /* 1. GPIO1023を使用可能にする */
    fd = open("/sys/class/gpio/export", O_WRONLY);
    write(fd, "1023", 3);
    close(fd);

    /* 2. GPIO1023を出力設定する */
    fd = open("/sys/class/gpio/gpio1023/direction", O_WRONLY);
    write(fd, "out", 3);
    close(fd);

    /* 3. GPIO1023に1(High)を出力する */
    fd = open("/sys/class/gpio/gpio1023/value", O_WRONLY);
    write(fd, "1", 1);

    while(1) {
        write(fd, "0", 1);
        usleep(1*1000*1000);
        write(fd, "1", 1);
        usleep(1*1000*1000);
    }

    /* 4. 使い終わったので閉じる */
    close(fd);

    return 0;
}

上記のCプログラムに書き換え終わったら、

petalinux-build -x package

を実行し、"image.ub"ファイルを新しいものに更新する。 そして、SDカードの古い"image.ub"ファイルのみを上記ファイルと差し替える。

ZYNQボード(EBAZ4205)に作成したSDカードを差し込み、 別途追加作成したモード切り替えスイッチで、「SDカード実行モード」に切り替える。 加えて、シリアル通信(UART)でTeraTermなどからボーレート115200でモニタリングしておく。 ZYNQボード(EBAZ4205)の電源ON。

ls /sys/class/gpio

でGPIOの番号を確認できる。"gpiochip1023"だったので、

echo 1023 > /sys/class/gpio/export

を実行した後に、

myapp

を実行すると...

LED(H18)がチカチカ点滅する!!! (AXI GPIOを用いたPSからPL GPIOへの制御命令に成功!!)

追加の注意事項2

vivadoのDiagramにて、AXI GPIOを用いるための回路を追加する。 Adress Editorで割り当てられているアドレスを確認する。(0x41200000)

ZYNQボード(EBAZ4205)をVitis 2024.1でHello Worldする

Vitis 2024.1の起動

vivadoのTcl Consoleにて、下記コマンドでVitisをクラシック(過去のヴァージョンのインターフェース)で開く。 最新の2024.1だと、ソフトの見た目が分かりにくかったので。

vitis -classic

Vitisシリアルターミナルを開く

Connect to serial port「+」ボタンをクリックし、Vitisシリアルターミナルの設定をする。 今回は、ZYNQボード(EBAZ4205)のシリアル通信用のピンからFTDIを嚙ましてCOM7でモニタリングしている。

ビルドする

プロジェクト名を右クリック ->「Build Project」をクリック。 すると、ビルドが走って、無事完了すると「Build Finished」とConsole表示される。

プログラムを実行する

左ペイン(Explorer)の中の「Binaries」->「hello_world.elf」を選択した状態で、 実行ボタン(緑色の再生ボタン)をクリックする。 実行が成功すると、「Hello World」の表示がVitisシリアルターミナル上でなされる。

EBAZ4205というFPGAボードでJTAGプログラマ(互換品)を使う

EBAZ4205というFPGAボードの概要

下記のリンクを参考にしました。

qiita.com

JTAGプログラマ(互換品)の使い方

Ubuntu 20.04での使い方を書きます。 vivadoでJTAGプログラマ(互換品)のドライバを通すために、

sudo ./Vivado/2021.1/data/xicom/cable_drivers/lin64/install_script/install_drivers/install_drivers

をターミナルで実行する。「Sccessful」の表示が出ればOK。 ここで、念の為、vivadoを閉じてPCを再起動した。 再びvivadoを開き、左ペインの中の「Open Hardware Manager」->「Open Target」->「Auto Connect」をクリックする。 すると、vivadoでJTAGプログラマが自動認識された。そのときの画面がこちら。

Pythonを使ったWordpressへの自動投稿

1. インストール

pip install python-wordpress-xmlrpc

2. コード

指定するURLは、https://xxxxx.com/xmlrpc.php とする。
(/xmlrpc.php を通常のURLの後ろにつける)

from wordpress_xmlrpc import Client, WordPressPost
from wordpress_xmlrpc.methods.posts import GetPosts, NewPost
from wordpress_xmlrpc.methods.users import GetUserInfo


# 投稿するサイトのURLと、アカウントとパスワードを入力
wp = Client('https://xxxxx.com/xmlrpc.php', 'username', 'password')

post = WordPressPost()

# 投稿する記事のタイトルを設定
post.title = 'PythonでのWordPress投稿テスト'

#「下書き」として投げる(「公開」したい場合は、'publish')
post.post_status = 'draft'

# 記事の本文を設定
post.content = 'PythonからWordPressへ自動投稿しました。'
post.terms_names = {
    'post_tag': ['Python', 'WordPress'],  # タグの設定
    'category': ['WordPress', 'Python']   # カテゴリの設定
}
wp.call(NewPost(post))

3. 動作結果

4. 参考サイト

python-wordpress-xmlrpc.readthedocs.io

Laravel + Guzzel でフォームから画像をアップロードしてPythonの画像処理APIを叩く

1. はじめに

下記の記事の応用として、フォームから画像をアップロードできるようにし、 Pythonの画像処理APIを叩く記事です。

uuktuv.hateblo.jp

2. コントローラーの作成

下記コマンドでコントローラーを新規作成します。

 php artisan make:controller UploadController

コントローラーの中身は下記のコードにします。

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use GuzzleHttp\Client;

class UploadController extends Controller
{
    public function index(){
        return view('upload');
    }

    public function upload(Request $request)
    {

        // 画像を保存
        $file_name = $request->file('image')->store('public');
        $file_name = explode('/', $file_name, )[1];
        var_dump($file_name);

        // APIを叩く準備
        $client = new Client([
            'headers' => [ 'Content-Type' => 'application/json' ]
        ]);

        // base64で画像を読み込んでエンコード
        $input_data = base64_encode(file_get_contents('storage/' . $file_name));
        
        $resp = $client->post('http://localhost:5000/image',
            ['body' => json_encode(
                [
                    [
                        'id' => 1,
                        'Image' => $input_data
                    ]
                ]
            )]
        );

        $resp_json = json_decode($resp->getBody(),true);
        #var_dump($resp_json);
        $id = $resp_json[0]['id'];
        $result = $resp_json[0]['result'];

        return view('ai-image', [ 'id' => $id, 'result' => $result ]);

    }
}

3. ルーティングの設定

ルーティングの設定をします。

<?php

use Illuminate\Support\Facades\Route;
use App\Http\Controllers\UploadController;

/*
|--------------------------------------------------------------------------
| Web Routes
|--------------------------------------------------------------------------
|
| Here is where you can register web routes for your application. These
| routes are loaded by the RouteServiceProvider within a group which
| contains the "web" middleware group. Now create something great!
|
*/

Route::get('/', function () {
    return view('welcome');
});

Route::get('/upload', [UploadController::class, 'index']);
Route::post('/upload', [UploadController::class, 'upload']);

4. viewファイルの設定

<!DOCTYPE html>
<html>
    <body>
    <form method="POST" action="/upload" enctype="multipart/form-data">
    @csrf
    <input type="file" name="image">
    <button>アップロード</button>
    </form>
    </body>
</html>
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>AI Image Test</title>
</head>
<body>
  <h1>id: {{ $id }}</h1>
  <img src="data:image/png;base64,{{ $result }}" />

</body>
</html>

upload.blade.php は、このような感じの表示になります。

5. 動作結果

Pythonのflaskサーバーを起動します。

python3 app.py

Laravelのサーバーを起動します。

php artisan serve --host=localhost --port=8000

http://localhost:8000/upload にアクセスし、 画像をアップロードした結果が以下です。 グレースケール化された画像がちゃんとPythonの画像処理APIを介してブラウザに表示されました!

Laravel + Guzzel でPythonの画像処理APIを叩く【PHPとPythonとの連携】

1. はじめに

LaravelでGuzzelを使って、Pythonによる画像処理APIを叩く記事です。

はじめに、Laravelのプロジェクトを作ります。

composer create-project laravel/laravel sample

必要に応じて、下記コマンドを実行します。

composer install

次に、Guzzelをcomposerコマンドを使ってインストールします。

composer require guzzlehttp/guzzle

2. 画像処理APIPython + flask で準備する

下記の記事を参考に、グレースケール画像を返すAPIを準備します。

uuktuv.hateblo.jp

コードを再掲します。

from flask import Flask, jsonify, request
import cv2
import numpy as np
import base64
import json

app = Flask(__name__)

@app.route("/image", methods=["POST"])
def image_gray_convert():
    response = []

    params = json.loads(request.data.decode('utf-8'))
    
    for recieve_json in params:
        # Imageをデコード
        img_stream = base64.b64decode(recieve_json['Image'])
        # 配列に変換
        img_array = np.asarray(bytearray(img_stream), dtype=np.uint8)

        # opencv でグレースケール化(引数 = 0)
        img_gray = cv2.imdecode(img_array, 0)

        # 変換結果を保存
        cv2.imwrite('./output/result.png', img_gray)

        # 保存したファイルに対してエンコード
        with open('./output/result.png', "rb") as f:
            img_base64 = base64.b64encode(f.read()).decode('utf-8')

        response.append({'id': recieve_json['id'], 'result' : img_base64})

    return jsonify(response)

if __name__ == '__main__':
    app.run(host='localhost', port=5000, debug=True)

これで、http://localhost:5000/image にてグレースケール画像を返すAPIが作成できました。

3. Laravel側の準備

コントローラーを下記コマンドで作成します。

php artisan make:controller AiController

入力画像ファイルの置き場所として、storageのシンボリックリンクを張ります。

php artisan storage:link

入力画像ファイルを ./storage/app/public/test.jpg に置きます。

作成したコントローラーを下記に書き換えます。

<?php

namespace App\Http\Controllers;

use GuzzleHttp\Client;

class AiController extends Controller
{
    public function index(){
        $client = new Client([
            'headers' => [ 'Content-Type' => 'application/json' ]
        ]);

        $file_name = 'storage/test.png';

        $input_data = base64_encode(file_get_contents($file_name));
        
        $resp = $client->post('http://localhost:5000/image',
            ['body' => json_encode(
                [
                    [
                        'id' => 1,
                        'Image' => $input_data
                    ]
                ]
            )]
        );

        $resp_json = json_decode($resp->getBody(),true);
        #var_dump($resp_json);
        $id = $resp_json[0]['id'];
        $result = $resp_json[0]['result'];

        return view('guzzel-test', [ 'id' => $id, 'result' => $result ]);
    }
}

次に、ルーティングの設定です。

<?php

use Illuminate\Support\Facades\Route;
use App\Http\Controllers\AiController;
/*
|--------------------------------------------------------------------------
| Web Routes
|--------------------------------------------------------------------------
|
| Here is where you can register web routes for your application. These
| routes are loaded by the RouteServiceProvider within a group which
| contains the "web" middleware group. Now create something great!
|
*/

Route::get('/', function () {
    return view('welcome');
});

Route::get('/ai', [AiController::class, 'index']);

次に、viewのコードを下記にします。

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>AI Image Test</title>
</head>
<body>
  <h1>id: {{ $id }}</h1>
  <img src="data:storage/image.png;base64,{{ $result }}" />

</body>
</html>

4. 動作結果

Pythonの画像処理APIのサーバーを起動します。

python3 app.py

Laravelにおいてサーバーを起動します。

php artisan serve --host=localhost --port=8000

http://127.0.0.1:8000/ai にアクセスした結果が以下です。

無事にグレースケール化された画像がブラウザ上に表示されました。

flaskで画像をグレースケール化するAPIを立てる方法

1. はじめに

flaskを使って、POSTリクエストとして投げた画像データを、グレースケール化した画像データとして返すAPIを作る記事です。

2. インストール

仮想環境の作成とactivate

python3 -m venv .venv
source .venv/bin/activate

必要なライブラリのインストール。

pip install flask opencv-python requests

3. 画像をグレースケール化するAPIのコード

output のフォルダをあらかじめ作っておく。

from flask import Flask, jsonify, request
import cv2
import numpy as np
import base64
import json

app = Flask(__name__)

@app.route("/image", methods=["POST"])
def image_gray_convert():
    response = []

    params = json.loads(request.data.decode('utf-8'))

    for recieve_json in params:
        # Imageをデコード
        img_stream = base64.b64decode(recieve_json['Image'])
        # 配列に変換
        img_array = np.asarray(bytearray(img_stream), dtype=np.uint8)

        # opencvでグレースケール化
        img_gray = cv2.imdecode(img_array, 0)

        # 変換結果を保存
        cv2.imwrite('./output/result.png', img_gray)

        # 保存したファイルに対してエンコード
        with open('./output/result.png', "rb") as f:
            img_base64 = base64.b64encode(f.read()).decode('utf-8')

        response.append({'id': recieve_json['id'], 'result' : img_base64})

    return jsonify(response)

if __name__ == '__main__':
    app.run(host='localhost', port=5000, debug=True)

下記でサーバー起動を実行した状態にしておく。

python3 app.py

4. 画像のPOSTリクエストを投げるコード

input フォルダをあらかじめ作っておく。
その中に以下の画像データ(0.jpg)を入れておいた。

import requests
import json
import base64

with open('./input/0.jpg', "rb") as f:
    img_base64 = base64.b64encode(f.read()).decode('utf-8')

url = "http://localhost:5000/image"

#JSON形式のデータ
json_data = [
    {
        "id": 0,
        "Image": img_base64
    }
]

#POSTリクエストを投げる
response = requests.post(url, data=json.dumps(json_data))

response_json = response.json()[0]['result']
print(response_json)

# Imageをデコード
img_stream = base64.b64decode(response_json)
# 配列に変換
img_array = np.asarray(bytearray(img_stream), dtype=np.uint8)

img_gray = cv2.imdecode(img_array, 0)

# 変換結果を保存
cv2.imwrite('./output_recieve/result.png', img_gray)

コードを実行してみる。

python3 post.py

実行がうまく行くと、output フォルダ内にグレースケール化された result.pngが生成される。 加えて、base64形式のグレースケール化された画像データがjsonで返ってくる。
output_recieve フォルダ内にjsonをパースして得た画像データが保存される。

keras-rl2 + OpenAI gym で3リンクロボットアームの手先位置制御の強化学習(DQN学習)・シミュレータの作成

1. はじめに

 keras-rl2 と OpenAI gymのPythonライブラリを使って強化学習DQN学習)を行い、3リンクロボットアームの手先位置制御シミュレータの作成を試みる記事です。
(本記事の開発環境:Ubuntu 20.04.6 LTS, Python3.8)

2. インストール

まずはじめに、仮想環境を作る。

python3 -m venv .venv

仮想環境をactivateする。

source .venv/bin/activate

keras-rl2 をインストールする。

git clone https://github.com/wau/keras-rl2.git
cd keras-rl

本記事で追加の依存関係をインストールする。
tensorflowについては、新しすぎるバージョンだと動かないため、バージョン指定してインストール。 gymはメンテナンスが終了している模様だが、バージョン 0.26.2で指定。 (パッケージがない等のエラーが出たものについても、再度インストールしている)

pip install keras-rl2==1.0.5 tensorflow==2.3.0 gym==0.26.2
pip install protobuf==3.20
pip install pygame scipy

3. gym のenvを作成する

次の3つのスクリプトをファイル名、ディレクトリ構成に気をつけて作成する。

from gym_robot_arm.envs.robot_arm_env import RobotArmEnvV0
import gym
import math 
import random
import pygame
import numpy as np
from gym import utils
from gym import error, spaces
from gym.utils import seeding
from scipy.spatial.distance import euclidean
import os
import sys

class RobotArmEnvV0(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self):
        self.set_window_size([650,650])
        self.set_link_properties([121,84,97])
        self.set_increment_rate(0.01)
        self.target_pos = self.generate_random_pos()
        self.action = {0: "HOLD",
                       1: "INC_J1",
                       2: "DEC_J1",
                       3: "INC_J2",
                       4: "DEC_J2",
                       5: "INC_J3",
                       6: "DEC_J3",
                       7: "INC_J1_J2",
                       8: "INC_J1_J3",
                       9: "INC_J2_J3",
                       10: "DEC_J1_J2",
                       11: "DEC_J1_J3",
                       12: "DEC_J2_J3",
                       13: "INC_J1_J2_J3",
                       14: "DEC_J1_J2_J3"
                       
                    }
        
        self.observation_space = spaces.Box(np.finfo(np.float32).min, np.finfo(np.float32).max, shape=(5,), dtype=np.float32)
        self.action_space = spaces.Discrete(len(self.action))

        self.current_error = -math.inf
        self.seed()
        self.viewer = False
        self.i = 0
        self.save_img = False

    def set_link_properties(self, links):
        self.links = links
        self.n_links = len(self.links)
        self.min_theta = math.radians(0)
        self.max_theta = math.radians(90)
        self.theta = self.generate_random_angle()
        self.max_length = sum(self.links)

    def set_increment_rate(self, rate):
        self.rate = rate

    def set_window_size(self, window_size):
        self.window_size = window_size
        self.centre_window = [window_size[0]//2, window_size[1]//2]

    def rotate_z(self, theta):
        rz = np.array([[np.cos(theta), - np.sin(theta), 0, 0],
                       [np.sin(theta), np.cos(theta), 0, 0],
                       [0, 0, 1, 0],
                       [0, 0, 0, 1]])
        return rz

    def translate(self, dx, dy, dz):
        t = np.array([[1, 0, 0, dx],
                      [0, 1, 0, dy],
                      [0, 0, 1, dz],
                      [0, 0, 0, 1]])
        return t

    def forward_kinematics(self, theta):
        P = []
        P.append(np.eye(4))
        for i in range(0, self.n_links):
            R = self.rotate_z(theta[i])
            T = self.translate(self.links[i], 0, 0)
            P.append(P[-1].dot(R).dot(T))
        return P

    def inverse_theta(self, theta):
        new_theta = theta.copy()
        for i in range(theta.shape[0]):
            new_theta[i] = -1*theta[i]
        return new_theta

    def draw_arm(self, theta):
        LINK_COLOR = (255, 255, 255)
        JOINT_COLOR = (0, 0, 0)
        TIP_COLOR = (0, 0, 255)
        theta = self.inverse_theta(theta)
        P = self.forward_kinematics(theta)
        origin = np.eye(4)
        origin_to_base = self.translate(self.centre_window[0],self.centre_window[1],0)
        base = origin.dot(origin_to_base)
        F_prev = base.copy()
        for i in range(1, len(P)):
            F_next = base.dot(P[i])
            pygame.draw.line(self.screen, LINK_COLOR, (int(F_prev[0,3]), int(F_prev[1,3])), (int(F_next[0,3]), int(F_next[1,3])), 5)
            pygame.draw.circle(self.screen, JOINT_COLOR, (int(F_prev[0,3]), int(F_prev[1,3])), 10)
            F_prev = F_next.copy()
        pygame.draw.circle(self.screen, TIP_COLOR, (int(F_next[0,3]), int(F_next[1,3])), 8)

    def draw_target(self):
        TARGET_COLOR = (255, 0, 0)
        origin = np.eye(4)
        origin_to_base = self.translate(self.centre_window[0], self.centre_window[1], 0)
        base = origin.dot(origin_to_base)
        base_to_target = self.translate(self.target_pos[0], -self.target_pos[1], 0)
        target = base.dot(base_to_target)
        pygame.draw.circle(self.screen, TARGET_COLOR, (int(target[0,3]),int(target[1,3])), 12)
    
    def generate_random_angle(self):
        theta = np.zeros(self.n_links)
        theta[0] = random.uniform(self.min_theta, self.max_theta)
        theta[1] = random.uniform(self.min_theta, self.max_theta)
        theta[2] = random.uniform(self.min_theta, self.max_theta)
        return theta

    def generate_random_pos(self):
        theta = self.generate_random_angle()
        P = self.forward_kinematics(theta)
        pos = np.array([P[-1][0,3], P[-1][1,3]])
        return pos

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    @staticmethod
    def normalize_angle(angle):
        return math.atan2(math.sin(angle), math.cos(angle))

    def step(self, action):
        if self.action[action] == "INC_J1":
            self.theta[0] += self.rate
        elif self.action[action] == "DEC_J1":
            self.theta[0] -= self.rate
        elif self.action[action] == "INC_J2":
            self.theta[1] += self.rate 
        elif self.action[action] == "DEC_J2":
            self.theta[1] -= self.rate
        elif self.action[action] == "INC_J3":
            self.theta[2] += self.rate
        elif self.action[action] == "DEC_J3":
            self.theta[2] -= self.rate
        
        elif self.action[action] == "INC_J1_J2":
            self.theta[0] += self.rate
            self.theta[1] += self.rate
        
        elif self.action[action] == "INC_J1_J3":
            self.theta[0] += self.rate
            self.theta[2] += self.rate

        elif self.action[action] == "INC_J2_J3":
            self.theta[1] += self.rate
            self.theta[2] += self.rate

        elif self.action[action] == "DEC_J1_J2":
            self.theta[0] -= self.rate
            self.theta[1] -= self.rate

        elif self.action[action] == "DEC_J1_J3":
            self.theta[0] -= self.rate
            self.theta[2] -= self.rate

        elif self.action[action] == "DEC_J2_J3":
            self.theta[1] -= self.rate
            self.theta[2] -= self.rate

        elif self.action[action] == "INC_J1_J2_J3":
            self.theta[0] += self.rate
            self.theta[1] += self.rate
            self.theta[2] += self.rate

        elif self.action[action] == "DEC_J1_J2_J3":
            self.theta[0] -= self.rate
            self.theta[1] -= self.rate
            self.theta[2] -= self.rate
        
        self.theta[0] = np.clip(self.theta[0], self.min_theta, self.max_theta)
        self.theta[1] = np.clip(self.theta[1], self.min_theta, self.max_theta)
        self.theta[2] = np.clip(self.theta[2], self.min_theta, self.max_theta)

        self.theta[0] = self.normalize_angle(self.theta[0])
        self.theta[1] = self.normalize_angle(self.theta[1])
        self.theta[2] = self.normalize_angle(self.theta[2])

        # Calc reward
        P = self.forward_kinematics(self.theta)
        #print('P = ', P)
        tip_pos = [P[-1][0,3], P[-1][1,3]]
        distance_error = euclidean(self.target_pos, tip_pos) #2点間の距離を算出

        reward = 0
        if distance_error >= self.current_error:
            reward = -1

        epsilon = 5
        if (- epsilon < distance_error and distance_error < epsilon):
            reward = 1
        

        self.current_error = distance_error
        self.current_score += reward

        if self.current_score == -5 or self.current_score == 5:
            done = True
        else:
            done = False
        

        observation = np.hstack((self.target_pos, self.theta))
        info = {
            'distance_error': distance_error,
            'target_position': self.target_pos,
            'current_position': tip_pos,
            'current_score': self.current_score
        }
        return observation, reward, done, info

    def reset(self):
        self.target_pos = self.generate_random_pos()
        self.current_score = 0
        observation = np.hstack((self.target_pos, self.theta))
        return observation

    def render(self, mode='human'):
        SCREEN_COLOR = (52, 152, 52)
        if self.viewer == False:
            if not os.path.isdir('./output') and self.save_img:
                os.makedirs('./output')
            pygame.init()
            pygame.display.set_caption("RobotArm-Env")
            self.screen = pygame.display.set_mode(self.window_size, )
            self.clock = pygame.time.Clock()
            self.viewer = True # 同一ウィンドウ内に描画するため, 状態を変化させる

        self.screen.fill(SCREEN_COLOR)
        self.draw_target()
        self.draw_arm(self.theta)
        self.clock.tick(60)
        pygame.display.flip() # 画面の更新

        if self.save_img:
            self.i += 1
            pygame.image.save(self.screen, f'./output/{str(self.i).zfill(5)}.png')

        # イベント処理
        for event in pygame.event.get():  # イベントを取得
            if event.type == 'QUIT':      # 閉じるボタンが押されたら
                pygame.quit()             # 全てのpygameモジュールの初期化を解除
                sys.exit()                    # 終了

    def close(self):
        if self.viewer != None:
            pygame.quit()

from gym.envs.registration import register

register(
    id='robot-arm-v0',
    entry_point='gym_robot_arm.envs:RobotArmEnvV0',
)

4. keras-rl2の実行スクリプトを作成する

import numpy as np
import gym
from gym_robot_arm.envs.robot_arm_env import RobotArmEnvV0

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Flatten
from tensorflow.keras.optimizers import Adam

from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory


ENV_NAME = 'robot-arm-v0'

# Get the environment and extract the number of actions.
env = gym.make(ENV_NAME)
np.random.seed(123)
env.seed(123)
nb_actions = env.action_space.n

# Next, we build a very simple model.
model = Sequential()
model.add(Flatten(input_shape=(1,) + env.observation_space.shape))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(nb_actions))
model.add(Activation('linear'))
print(model.summary())

# Finally, we configure and compile our agent. You can use every built-in tensorflow.keras optimizer and
# even the metrics!
memory = SequentialMemory(limit=50000, window_length=1)
policy = BoltzmannQPolicy()
dqn = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=10,
               target_model_update=1e-2, policy=policy)
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])

# Okay, now it's time to learn something! We visualize the training here for show, but this
# slows down training quite a lot. You can always safely abort the training prematurely using
# Ctrl + C.
dqn.fit(env, nb_steps=50000, visualize=True, verbose=2)

# After training is done, we save the final weights.
dqn.save_weights(f'./models/dqn_{ENV_NAME}_weights.h5f', overwrite=True)

# Finally, evaluate our algorithm for 5 episodes.
dqn.test(env, nb_episodes=5, visualize=True)

5. 動作確認

ここで、実行してみる。

python3 examples/dqn_myenv.py

すると、下記画像のようなエラーが発生した。

エラーメッセージに従い、少しだけソースを編集する。 ~/rl/core.py の181行目を書き換えた。

- if not np.isreal(value):
+ if not np.array([np.isreal(value)]).all():

あと、~/rl/core.py の355行目も同様のエラーが出たため、書き換える。

- if not np.isreal(value):
+ if not np.array([np.isreal(value)]).all():

気を取り直して再び実行。

6. 重みファイルの生成

ちなみに、設定した50000ステップの学習が完了すると、 重みファイル(.h5f)に学習結果の重みデータが生成される。 これは後から呼び出し可能です。

7. 重みファイルを呼び出しての動作確認

動作確認スクリプトは下記。

python3 examples/moving_test.py
import numpy as np
import gym
from gym_robot_arm.envs.robot_arm_env import RobotArmEnvV0

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Flatten
from tensorflow.keras.optimizers import Adam

from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

ENV_NAME = 'robot-arm-v0'

# Get the environment and extract the number of actions.
env = gym.make(ENV_NAME)
np.random.seed(123)
env.seed(123)
nb_actions = env.action_space.n

# Next, we build a very simple model.
model = Sequential()
model.add(Flatten(input_shape=(1,) + env.observation_space.shape))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(32))
model.add(Activation('relu'))
model.add(Dense(nb_actions))
model.add(Activation('linear'))
print(model.summary())

# Finally, we configure and compile our agent. You can use every built-in tensorflow.keras optimizer and
# even the metrics!
memory = SequentialMemory(limit=50000, window_length=1)
policy = BoltzmannQPolicy()
dqn = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=10,
               target_model_update=1e-2, policy=policy)
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])


# After training is done, we save the final weights.
dqn.load_weights(f'./models/dqn_{ENV_NAME}_weights.h5f')

# Finally, evaluate our algorithm for 500 episodes.
dqn.test(env, nb_episodes=500, visualize=True)   #500回の動作テスト

8. 参考サイト

github.com

github.com

qiita.com

www.youtube.com