Atom's tech blog

Raspberry Pi からCPU温度、メモリ使用量をAWS IoT Analyticsへデータセットしてみた!

f:id:iAtom:20201203224601j:plain


本記事ではコアデバイスRaspberry Pi)から送信したCPU温度、メモリ使用量をAWS IoT Core経由でIoT Analyticsへデータセットしてみます。

AWS IoT CoreやGreengrass設定は、以前使用した設定をそのまま使用するため、Raspberry PiAWS IoT Greengrassを動かしてみた!(Part1〜4)をご参照してください。

AWS IoT Analyticsとは

AWS IoT Analytics は、IoT デバイスからのデータ分析に必要なステップを自動化します。 AWS IoT Analytics 時系列データ・ストアにデータを格納して分析する前に、IoT データをフィルタリング、変換、および強化します。デバイスから必要なデータのみを収集して、数学的変換を適用してデータを処理し、処理されたデータを保存する前にデバイスの種類や場所などのデバイス固有のメタデータでデータを強化するサービスを設定できます。その後、組み込みの SQL クエリ エンジンを使用してクエリを実行してデータを分析したり、より複雑な分析や機械学習の推測を実行したりできます

AWS構成図

f:id:iAtom:20201203145649j:plain

IoT Analytics登録

IoT Analytcisコンソール

すべてのサービスから「IoT Analytics」を入力し検索し、IoT Analytcisコンソールへアクセスします。


f:id:iAtom:20201203151553j:plain

チャネル作成

チャネルとは、IoT Coreからのメッセージを受けるために必要な設定です。

左メニュー画面でチャネルを選択し、「チャネル作成」ボタンをクリックします。 一つもチャネル登録されていない場合は、下記イメージ図が表示されます。


f:id:iAtom:20201203151835j:plain

チャネルID、データ保持時間の設定

チャネルIDは「channelID0001」としています。あと英大文字入れていますが、小文字として扱われるようです。 ストレージタイプはS3サービスを使用しないので「サービスマネージド型ストア」を選択(内容については調べた方がいいかもです)します。


f:id:iAtom:20201203152711j:plain


rawデータを保存する期間は、とりあえずお試しでトライしているだけなので、最短の期間1年とし、「次へ」のボタンをクリックします。


f:id:iAtom:20201203152841j:plain


IoT Coreからのメッセージのトピックフィルタを入力します。Raspberry Piから送信するトピックを「topic/data」にしますので、入力しておきます。その後、「メッセージ表示」ボタンをクリックします。


f:id:iAtom:20201203153250j:plain


画面に「トピック接続はアクティブです」と表示されます。恐らくRaspberry Piとは接続できた状態なのでアクティブと表示されるのかもしれません。(間違っていたらすみません。)

まだRaspberry Piからメッセージを送信していないので、「このトピックでまだメッセージを受信していませんが.......」と表示されます。気にせずそのまま進みましょう。


f:id:iAtom:20201203153605j:plain


次にIAMロール名を作成します。画面の説明にも書かれていますが、「このチャネルにメッセージを配置するアクセス権を付与するロールを作成する」必要があります。「新規作成」を選択してください。


f:id:iAtom:20201203153816j:plain


新しいロールの作成をします。 名前に「analyticsRole」とします。名前はなんでもいいです。

ロールの作成」ボタンをクリックします。


f:id:iAtom:20201203153956j:plain


画面が戻り、IAMロール名の箇所に「analyticsRole」が表示されます。

表示されない場合は、名前が入る位置をクリックするとプルダウン選択が表示されるかもしれません。

最後に「チャネルの作成」ボタンをクリックします。


f:id:iAtom:20201203154250j:plain


作成が完了すると登録したチャネル名で一覧に表示されます。


f:id:iAtom:20201203154808j:plain


先ほどIAMロール名を作成したので、IAMサービスで「analyticsRole」が登録されていることを確認します。

すべてのサービスから「IAM」をクリックします。


f:id:iAtom:20201203155029j:plain


IAMコンソールの左メニューで「ロール」を選択すると、右画面で「analyticsRole」が登録されていることが確認できます。


f:id:iAtom:20201203155217j:plain

データストアの作成

データストアとは、そのままデータを保持するために登録する設定です。 左メニュー画面でデータストアを選択し、「データストアの作成」ボタンをクリックします。 何もチャネル登録されていない場合は、下記イメージ図が表示されます。


f:id:iAtom:20201203155703j:plain

データストアID およびデータ保護期間の設定

IDはこちらもなんでもいいです。ここでは「dataStore0001」としています。 ストレージタイプは「サービスマネージド型ストア」、保持期間は「1年」とします。 最後に「データのストアの作成」ボタンをクリックします。


f:id:iAtom:20201203160042j:plain


作成が完了すると下記の画面に遷移します。


f:id:iAtom:20201203160144j:plain

パイプラインの作成

パイプラインとは、チャネルとデータストアを関連付けするために作成します。またLambda関数で処理することもできるようです。 Lambda関数はもう少し勉強してからトライしてみます。

左メニュー画面でパイプラインを選択し、「パイプラインの作成」ボタンをクリックします。 何もチャネル登録されていない場合は、下記イメージ図が表示されます。


f:id:iAtom:20201203160615j:plain

パイプラインIDおよびソースの設定

パイプラインIDもなんでもいいです。ここでは「pipeLine0001」としています。 パイプラインソースはチャネルIDを選択します。本投稿で作成した「channelid0001」を指定し、「次へ」のボタンをクリックします。


f:id:iAtom:20201203161835j:plain

メッセージの属性の設定

そのまま「次へ」ボタンをクリックします。 「チャネルでサンプルメッセージが見つかりませんでした」と表示されていますが、まだメッセージデータがないので、その表示が出ていると思われるので、気にせずそのまま次へ。
f:id:iAtom:20201203162259j:plain

メッセージの強化、変換、フィルタ


そのまま「次へ」ボタンをクリックします。

ここでLambda関数など登録するようですが、気にせずそのまま次へいきます。


f:id:iAtom:20201203162709j:plain

処理されたメッセージをデータストアに保存

先ほどデータストアで作成したIDを選択し、データストアとパイプラインと関連付けをします。

最後に「パイプラインを作成する」ボタンをクリックします。


f:id:iAtom:20201203162909j:plain


パイプライン一覧に作成したパイプライン名称が表示されていることを確認できます。


f:id:iAtom:20201203163050j:plain

データセットの作成

データセットとは、データストアに保存しているデータをSQLを使って取り出して、データをセットします。

左メニュー画面で「データセット」を選択し、「データセットの作成」ボタンをクリックします。 何もチャネル登録されていない場合は、下記イメージ図が表示されます。


f:id:iAtom:20201203163610j:plain

データタイプの選択

SQLの作成」を選択します。


f:id:iAtom:20201203163730j:plain

IDおよびソースの設定

データセットIDを入力します。ここでは「dataSet0001」としています。なんでもいいです。

データストアソースの選択は、クリックするとプルダウンで選択できますので「datastore0001」を選択し、「次へ」ボタンをクリックします。


f:id:iAtom:20201203164006j:plain

SQLクエリの作成

すでに「SELECT * FROM datastore0001」と入力されているので、そのままで「次へ」のボタンをクリックします。


f:id:iAtom:20201203164245j:plain

データ選択フィルタの設定

特に設定はなく「次へ」のボタンをクリックします。


f:id:iAtom:20201203164420j:plain

クエリスケジュールの設定

定期的にデータ更新するスケジュールで、早く動作確認したいので頻度は一番早い時間「1分毎」にしておきます。


f:id:iAtom:20201203164707j:plain

分析の結果の設定

変更なく「90日」のままで「次へ」ボタンをクリックします。


f:id:iAtom:20201203165011j:plain

分析結果を配信するルールの設定

新たな機能らしくIoT EVENT通知ができるようです。今回は未使用でいきたいので「データセットの作成」ボタンをクリックします。


f:id:iAtom:20201203164944j:plain


データセットが一覧に表示されました。


f:id:iAtom:20201203165318j:plain


これで全部の設定が完了しましたので、次はRaspberry PiからCPU温度、メモリ使用量をPythonスクリプトで送信しデータセットできるか確認します。

Raspberry Pi設定と実行

証明書、キー情報、ルートCAアドレスファイル一式は以前のものを使用しますので割愛します。

フォルダ/ファイル構成

         |--- mqtt_analytics_test.py 
         |
         |---cert
                   |--- root.ca.pem                      #  ルートファイル
                   |--- xxxxxxxxxxx.cert.pem"      #証明書ファイル
                   |--- xxxxxxxxxxx.private.key"   #privateキーファイル

ソースコード

今回はGreengrassのLambda関数デプロイは使用せず、直接ソースコードRaspberry Piへ転送して使用しています。

AWS SNSでEmail通知の検証したソースコードに、Raspberry PiのCPU温度、メモリ使用料を追加したソースコードです。

topicには「チャネル作成」のIoT Coreトピックフィルタで設定した「topic/data」を設定しています。

下記ソースのhost(endpoint)は、AWS IoT Coreコンソールから確認できます。


f:id:iAtom:20201204093432j:plain


それでは、ソースコードです。

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import time
import json
import datetime
import logging
import subprocess
from subprocess import PIPE

# Setup Param
host = "xxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"   ## --> 自分の環境に合わせてください。

rootCAPath = "./cert/root.ca.pem"    ## --> 自分の環境に合わせてください。
certificatePath = "./cert/xxxxxxxxxxx.cert.pem"   ## --> 自分の環境に合わせてください。
privateKeyPath = "./cert/xxxxxxxxxxx.private.key"   ## --> 自分の環境に合わせてください。

useWebsocket = False
clientId = "mqttAnaTest"
topic = "topic/data"
deviceNo = "10001"

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, 443)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, 8883)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)      # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2)            # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)    # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)         # 5 sec

# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
time.sleep(2)
# Initial

try:
    while True:
        # Raspberry Pi cpu温度
        proc = subprocess.run("cat /sys/class/thermal/thermal_zone0/temp", shell=True, stdout=PIPE, stderr=PIPE, text=True)
        temp = float(int(proc.stdout)/1000)

        # Raspberry Pi メモリ使用量
        proc = subprocess.run("vcgencmd get_mem arm", shell=True, stdout=PIPE, stderr=PIPE, text=True)
        cmdmemory = proc.stdout
        memory=cmdmemory.replace('arm=','')
        memory=memory.replace('M','')

        nowtime = datetime.datetime.now()
        nowsettime = int(time.mktime(nowtime.timetuple()))  # UNIX Time
        #send Command
        message = {}
        message['time'] = nowtime.strftime('%Y/%m/%d %H:%M:%S')
        message['CPU']  = temp
        message['memory'] = memory
        messageJson = json.dumps(message)
        print (messageJson)
        myAWSIoTMQTTClient.publish(topic, messageJson, 1)
        time.sleep(30)
except:
    import traceback
    traceback.print_exc()

print ("Terminated")

Raspberry PiSSH接続

自分はMACから接続するので、以前と同じIPアドレスで接続します。


    $ ssh pi@192.168.10.10

Pythonファイル実行

Pythonソースコードがファイルがあるディレクトリまで移動してください。 証明書やキー情報、ルートCAのファイル場所にもご注意ください。


    $python3 mqtt_analytics_test.py 

実行ログ表示


    2020-12-03 11:44:46,828 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Initializing MQTT layer...
    2020-12-03 11:44:46,830 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Registering internal event callbacks to MQTT layer...
    2020-12-03 11:44:46,830 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - MqttCore initialized
    2020-12-03 11:44:46,830 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Client id: mqttAnaTest
    2020-12-03 11:44:46,831 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Protocol version: MQTTv3.1.1
    2020-12-03 11:44:46,831 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Authentication type: TLSv1.2 certificate based Mutual Auth.
    2020-12-03 11:44:46,831 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring endpoint...
    2020-12-03 11:44:46,831 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring certificates...
    2020-12-03 11:44:46,832 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring reconnect back off timing...
    2020-12-03 11:44:46,833 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Base quiet time: 1.000000 sec
    2020-12-03 11:44:46,833 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Max quiet time: 32.000000 sec
    2020-12-03 11:44:46,833 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Stable connection time: 20.000000 sec
    2020-12-03 11:44:46,833 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queueing: max queue size: -1
    2020-12-03 11:44:46,834 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queue draining interval: 0.500000 sec
    2020-12-03 11:44:46,835 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring connect/disconnect time out: 10.000000 sec
    2020-12-03 11:44:46,835 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring MQTT operation time out: 5.000000 sec
    2020-12-03 11:44:46,835 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync connect...
    2020-12-03 11:44:46,835 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing async connect...
    2020-12-03 11:44:46,836 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Keep-alive: 600.000000 sec
    2020-12-03 11:44:46,837 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Event consuming thread started
    2020-12-03 11:44:46,838 - AWSIoTPythonSDK.core.protocol.mqtt_core - DEBUG - Passing in general notification callbacks to internal client...
    2020-12-03 11:44:46,838 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in fixed event callbacks: CONNACK, DISCONNECT, MESSAGE
    2020-12-03 11:44:46,972 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Starting network I/O thread...
    2020-12-03 11:44:47,051 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [connack] event
    2020-12-03 11:44:47,051 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [connack] event
    2020-12-03 11:44:47,052 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - No need for recovery
    2020-12-03 11:44:47,053 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
    {"time": "2020/12/03 11:44:49", "CPU": 35.938, "memory": "948\n"}
    2020-12-03 11:44:49,091 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
    2020-12-03 11:44:49,093 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback...
    2020-12-03 11:44:49,129 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event
    2020-12-03 11:44:49,130 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event
    2020-12-03 11:44:49,131 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
    2020-12-03 11:44:49,131 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
    2020-12-03 11:45:07,054 - AWSIoTPythonSDK.core.protocol.connection.cores - DEBUG - stableConnection: Resetting the backoff time to: 1 sec.
    {"time": "2020/12/03 11:45:19", "CPU": 35.4, "memory": "948\n"}
    2020-12-03 11:45:19,198 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
    2020-12-03 11:45:19,200 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback...
    2020-12-03 11:45:19,232 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event
    2020-12-03 11:45:19,233 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event
    2020-12-03 11:45:19,234 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
    2020-12-03 11:45:19,235 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
    {"time": "2020/12/03 11:45:49", "CPU": 35.4, "memory": "948\n"}
    2020-12-03 11:45:49,301 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
    2020-12-03 11:45:49,303 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback...
    2020-12-03 11:45:49,337 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event
    2020-12-03 11:45:49,338 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event
    2020-12-03 11:45:49,340 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
    2020-12-03 11:45:49,340 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
    {"time": "2020/12/03 11:46:19", "CPU": 35.4, "memory": "948\n"}

IoT Analytics受信データセット確認

データセットデータID「dataset0001」をクリックし、アクションで「今すぐ実行」を選択します。

実行後、結果のプレビューにCPU温度とメモリ使用量が表示されています。ただ、一緒にRaspberry Piから送信しているtimeの時間の並びが順番通りになっていない....。


f:id:iAtom:20201216191318j:plain


左メニューのコンテンツでもプレビューは確認できます。


f:id:iAtom:20201216191529j:plain


以上でSQLのデータセットは完了です。 Lambda関数を今後トライしようと思います。

参考

今回はAWS IoT Analyticsから設定した情報がAWS IoT Coreに自動展開されていました。


f:id:iAtom:20201204101456j:plain

コメント

勉強不足ですみません。データセットで並びが順番通りになっていないのは、時系列データベースではないためと思われます。 今度は時系列データベース「Amazon Timestream」をトライしてみます。

あと、AWS QuickSightも実施したいが、無料60日間限定なので時間が空いた時にでも。