Produced by Fourier

AWS IoT Core + Cloud WatchでRapsberry pi pico w から取得したデータを爆速でグラフ化する

Uhara Uhara カレンダーアイコン 2024.12.13

はじめに

マイコンで取得したセンサーのデータなどをサーバにアップしてグラフ化したいという要求はいろいろなところであると思います。

Web開発メインで業務をされている旧世代の人間(私)はこの要求に対して

  • サーバを用意して
  • データベースを用意して
  • データを保存するAPIを用意して
  • マイコンからAPIたたいて
  • グラフ表示のフロントを用意して

という流れがまぁ一般的かな?と考えるのですが、AWS IoT Coreを使用した場合はどういう流れになって何ができて何うれしいのかを実際に手を動かしながら学んでいきたいと思います。

この記事で説明すること

  • AWS IoT Coreで”モノ”を作成する
  • Raspberry Pi Pico WにMQTTライブラリを入れてデータをpublishする
  • publishされたデータをCloudWatchでグラフ表示する

この記事で説明しないこと

モノの作成

AWS IoTの管理メニューの”モノ”から “モノを作成” → ひとつのモノを作成

適当に名前をつけていきます。タイプやグループ等はたくさんのモノを管理する時に考えることにします。

証明書を設定していきます

ポリシーはJSONタブから以下の設定を行いました。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": "arn:aws:iot:ap-northeast-1:アカウントID:client/BlogClient"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": "arn:aws:iot:ap-northeast-1:アカウントID:topic/pico/pub"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Subscribe",
      "Resource": "arn:aws:iot:ap-northeast-1:アカウントID:topicfilter/pico/sub"
    },
    {
      "Effect": "Allow",
      "Action": "iot:Receive",
      "Resource": "arn:aws:iot:ap-northeast-1:アカウントID:topic/pico/sub"
    }
  ]
}

ポイントとしてはClientIDやtopic名は任意でOKですがマイコンで動かすプログラム側と合わせること、Actionとして必要なものをいれること。※Publishしたいだけの場合ConnectとPublishを定義しておけばOK。

上記で作ったポリシーをチェックしてモノを作成

証明書をダウンロードしておきましょう

これでモノの作成は一旦OKです。

Raspberry Pi Pico WにMQTTライブラリを入れて接続

次にマイコン側を用意します。今回は話をシンプルにするためにスイッチ押したらPublishが行われる形とします。Raspberry Pi PicoはGPIOに内部のプルアップが使えるためこの接続のみで通常時1、プッシュ時0が可能となります。

今回のコードはこちら

import network
from machine import Pin
import ujson
import tls
import time
#import ntptime
import ubinascii
from umqtt.simple import MQTTClient

#タクトスイッチを接続、プルアップにしておくことで通常はH出力、スイッチを押したらグランドに接続されL出力になる
sw = Pin(5,Pin.IN,Pin.PULL_UP)
#タクトスイッチを押した際にLEDが光るようにLEDの定義
led = Pin('LED',Pin.OUT)
led.value(0)

#LAN設定
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect("XXX","XXX")
while wlan.status() != network.STAT_GOT_IP:
    print('waiting...')
    time.sleep(1)
status = wlan.ifconfig()
print(f'IP address {status[0]}')

#エンドポイントは AWS IoTコンソールの"接続"メニューの"ドメイン設定"でわかる
aws_endpoint = b'a1o40xxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com'

#ポリシー設定で登録したものと名前を合わせる
client_id = "BlogClient"
topic_pub = "pico/pub"
topic_sub = "pico/sub"

#認証ファイル  事前にmpremoteコマンドで転送しておきます
CERT_FILE = "/certs/certificate.der"
KEY_FILE = "/certs/private.der"
CA_FILE = "/certs/routeca.der"

#接続用function
def mqtt_connect(client=client_id, endpoint=aws_endpoint):
    context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
    
    print("Read Cert file...")
    context.load_cert_chain(
        open(CERT_FILE, "rb").read(), open(KEY_FILE, "rb").read()
    )
    context.load_verify_locations(open(CA_FILE, "rb").read())

    print("Setting up MQTT Client...")
    mqtt = MQTTClient(
        client, endpoint, port=8883, keepalive=3600, ssl=context
    )
    print("Connect to AWS IoT Core...")
    mqtt.connect()
    return mqtt

#publish用function
def mqtt_publish(client, topic=topic_pub, message=''):
    #print("Publishing message...")
    #client.publish(topic, message)
    #print(message)

    try:
        client.publish(topic, message)
        print(f"Message published: {message}")
    except Exception as e:
        print(f"Failed to publish message: {e}")
           
   
#---------------------------------------------------------

#接続
try:
    mqtt = mqtt_connect()
except Exception as e:
    print(e)
    print("Unable to connect to MQTT.")

#メインループ
sw_value = 1
while True:
    if sw.value() == 1:
        led.value(0)
        time.sleep(0.1)
        sw_value = 1
    else:
        led.value(1)
        time.sleep(0.1)
        if sw_value == 1:
        #スイッチを押されたらpublishする。押しっぱなしの場合も初めの1回のみ
            print("pushed")
            sw_value = 0

       #送信情報をまとめる。            
            mesg = ujson.dumps({
                "state":{
                    "reported": {
                        "device": {
                            "client": client_id,
                            "uptime": time.ticks_ms()
                        },
                        "sw": {
                            "push": 1
                        }
                    }
                }
            })
            try:
                mqtt_publish(client=mqtt, message=mesg)
            except:
                print("Unable to publish message.")

始めは AWSのブログ を見ながら始めたのですがエラーでうまく動かず…この mqttのライブラリ が下位互換性の無い更新が入っていたようで引数が呼び出しと合ってませんでした。

このあたり の記事や各種先人さまたちの記事を参考に以下の様に認証鍵をder変換してパラメータにセットすることでうまくいきました。

$ openssl x509 -in f7c4a3****a01-certificate.pem.crt -inform PEM -out certificate.der -outform DER
$ openssl rsa -in f7c4a3****a01-private.pem.key -inform PEM -out private.der -outform DER
$ openssl x509 -in AmazonRootCA1.pem -out routeca.der -inform PEM -outform DER 

接続テスト

接続テストはこちらの”テスト”メニューの”MQTTテストクライアント”で行います。

トピックに上記プログラムで設定しているpublishのトピック名を記載して”サブスクライブ”します。

うまく接続されていれば、スイッチをポチっとするたびに画面下のほうにデータが追加されていきます。

CloudWatchでグラフ化する

まずCloudWatchへのデータ投入を許可するためのルール/ポリシーを作成します。

  • IAM で以下の様にポリシーを作成します。
{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "cloudwatch:PutMetricData",
        "Resource": [
            "*"
        ]
    }
}
  • ロールを作成します
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "iot.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
  • ロールに先ほど作成したポリシーを割り当てます。

IoT Coreでルールを作成します

  • 「メッセージのルーティング」 から「ルール」の作成画面に進みます

  • トピックに投げ込まれてきた任意のデータを選択します。
  • ルールアクションではアクションに「CloudWatch metric」を、IAMロールに先ほど作成したロールを選択します。

CloudWatchで確認します

  • デバイスからデータを送ってみると

「すべてのメトリクス」 - 「IoT」- 「ルールメトリクス 」 に先ほどのルールが作成され、データがポイントされていました。万歳!

まとめ

  • いろいろハマりポイントがあって爆速ではなかったですが、次からは爆速でいけると思います。
  • ルールアクションでDynamoDBやAmazonSNSなどいろいろサービスが選択できますので、データベースにデータを保存したい、メール通知したい、などの要望にも迅速に対応できそうです。
  • 表示のさせ方はもちろん好みもあると思いますが、そこに目をつぶると特に認証周りの実装が不要になることや、Web開発側の知識が不要でグラフ表示までできるので、自前でAPIを用意するより断然魅力的に思えました。
  • 【宣伝】本格的に学びたい方はこちらファクトリー・サイエンティストの講座をおすすめしておきます!
Factory Scientist | ファクトリー・サイエンティスト thumbnail

Factory Scientist | ファクトリー・サイエンティスト

IoTデバイスによるエンジニアリング、センシング、データ解析、データ視覚化、データ活用の知識を身に付けて、データを軸に経営判断を素早くおこなうアシストをおこなう人材の育成

https://www.factoryscientist.com/

Uhara

Uhara slash forward icon Engineer

SEです。AWSやGCPなどサーバやクラウド周りはお任せください。趣味はギターとカフェ巡り。

関連記事