Raspberry Pi からCPU温度、メモリ使用量をAWS IoT Analyticsへデータセットしてみた!
本記事ではコアデバイス(Raspberry Pi)から送信したCPU温度、メモリ使用量をAWS IoT Core経由でIoT Analyticsへデータセットしてみます。
AWS IoT CoreやGreengrass設定は、以前使用した設定をそのまま使用するため、Raspberry Pi でAWS IoT Greengrassを動かしてみた!(Part1〜4)をご参照してください。
AWS IoT Analyticsとは
AWS IoT Analytics は、IoT デバイスからのデータ分析に必要なステップを自動化します。 AWS IoT Analytics 時系列データ・ストアにデータを格納して分析する前に、IoT データをフィルタリング、変換、および強化します。デバイスから必要なデータのみを収集して、数学的変換を適用してデータを処理し、処理されたデータを保存する前にデバイスの種類や場所などのデバイス固有のメタデータでデータを強化するサービスを設定できます。その後、組み込みの SQL クエリ エンジンを使用してクエリを実行してデータを分析したり、より複雑な分析や機械学習の推測を実行したりできます
AWS構成図
IoT Analytics登録
IoT Analytcisコンソール
すべてのサービスから「IoT Analytics」を入力し検索し、IoT Analytcisコンソールへアクセスします。
チャネル作成
チャネルとは、IoT Coreからのメッセージを受けるために必要な設定です。
左メニュー画面でチャネルを選択し、「チャネル作成」ボタンをクリックします。 一つもチャネル登録されていない場合は、下記イメージ図が表示されます。
チャネルID、データ保持時間の設定
チャネルIDは「channelID0001」としています。あと英大文字入れていますが、小文字として扱われるようです。 ストレージタイプはS3サービスを使用しないので「サービスマネージド型ストア」を選択(内容については調べた方がいいかもです)します。
rawデータを保存する期間は、とりあえずお試しでトライしているだけなので、最短の期間1年とし、「次へ」のボタンをクリックします。
IoT Coreからのメッセージのトピックフィルタを入力します。Raspberry Piから送信するトピックを「topic/data」にしますので、入力しておきます。その後、「メッセージ表示」ボタンをクリックします。
画面に「トピック接続はアクティブです」と表示されます。恐らくRaspberry Piとは接続できた状態なのでアクティブと表示されるのかもしれません。(間違っていたらすみません。)
まだRaspberry Piからメッセージを送信していないので、「このトピックでまだメッセージを受信していませんが.......」と表示されます。気にせずそのまま進みましょう。
次にIAMロール名を作成します。画面の説明にも書かれていますが、「このチャネルにメッセージを配置するアクセス権を付与するロールを作成する」必要があります。「新規作成」を選択してください。
新しいロールの作成をします。
名前に「analyticsRole」とします。名前はなんでもいいです。
「 ロールの作成」ボタンをクリックします。
画面が戻り、IAMロール名の箇所に「analyticsRole」が表示されます。
表示されない場合は、名前が入る位置をクリックするとプルダウン選択が表示されるかもしれません。
最後に「チャネルの作成」ボタンをクリックします。
作成が完了すると登録したチャネル名で一覧に表示されます。
先ほどIAMロール名を作成したので、IAMサービスで「analyticsRole」が登録されていることを確認します。
すべてのサービスから「IAM」をクリックします。
IAMコンソールの左メニューで「ロール」を選択すると、右画面で「analyticsRole」が登録されていることが確認できます。
データストアの作成
データストアとは、そのままデータを保持するために登録する設定です。 左メニュー画面でデータストアを選択し、「データストアの作成」ボタンをクリックします。 何もチャネル登録されていない場合は、下記イメージ図が表示されます。
データストアID およびデータ保護期間の設定
IDはこちらもなんでもいいです。ここでは「dataStore0001」としています。 ストレージタイプは「サービスマネージド型ストア」、保持期間は「1年」とします。 最後に「データのストアの作成」ボタンをクリックします。
作成が完了すると下記の画面に遷移します。
パイプラインの作成
パイプラインとは、チャネルとデータストアを関連付けするために作成します。またLambda関数で処理することもできるようです。 Lambda関数はもう少し勉強してからトライしてみます。
左メニュー画面でパイプラインを選択し、「パイプラインの作成」ボタンをクリックします。 何もチャネル登録されていない場合は、下記イメージ図が表示されます。
パイプラインIDおよびソースの設定
パイプラインIDもなんでもいいです。ここでは「pipeLine0001」としています。 パイプラインソースはチャネルIDを選択します。本投稿で作成した「channelid0001」を指定し、「次へ」のボタンをクリックします。
メッセージの属性の設定
そのまま「次へ」ボタンをクリックします。
「チャネルでサンプルメッセージが見つかりませんでした」と表示されていますが、まだメッセージデータがないので、その表示が出ていると思われるので、気にせずそのまま次へ。
メッセージの強化、変換、フィルタ
そのまま「次へ」ボタンをクリックします。
ここでLambda関数など登録するようですが、気にせずそのまま次へいきます。
処理されたメッセージをデータストアに保存
先ほどデータストアで作成したIDを選択し、データストアとパイプラインと関連付けをします。
最後に「パイプラインを作成する」ボタンをクリックします。
パイプライン一覧に作成したパイプライン名称が表示されていることを確認できます。
データセットの作成
データセットとは、データストアに保存しているデータをSQLを使って取り出して、データをセットします。
左メニュー画面で「データセット」を選択し、「データセットの作成」ボタンをクリックします。 何もチャネル登録されていない場合は、下記イメージ図が表示されます。
データタイプの選択
「SQLの作成」を選択します。
IDおよびソースの設定
データセットIDを入力します。ここでは「dataSet0001」としています。なんでもいいです。
データストアソースの選択は、クリックするとプルダウンで選択できますので「datastore0001」を選択し、「次へ」ボタンをクリックします。
SQLクエリの作成
すでに「SELECT * FROM datastore0001」と入力されているので、そのままで「次へ」のボタンをクリックします。
データ選択フィルタの設定
特に設定はなく「次へ」のボタンをクリックします。
クエリスケジュールの設定
定期的にデータ更新するスケジュールで、早く動作確認したいので頻度は一番早い時間「1分毎」にしておきます。
分析の結果の設定
変更なく「90日」のままで「次へ」ボタンをクリックします。
分析結果を配信するルールの設定
新たな機能らしくIoT EVENT通知ができるようです。今回は未使用でいきたいので「データセットの作成」ボタンをクリックします。
データセットが一覧に表示されました。
これで全部の設定が完了しましたので、次はRaspberry PiからCPU温度、メモリ使用量をPythonスクリプトで送信しデータセットできるか確認します。
Raspberry Pi設定と実行
証明書、キー情報、ルートCAアドレスファイル一式は以前のものを使用しますので割愛します。
フォルダ/ファイル構成
|--- mqtt_analytics_test.py | |---cert |--- root.ca.pem # ルートファイル |--- xxxxxxxxxxx.cert.pem" #証明書ファイル |--- xxxxxxxxxxx.private.key" #privateキーファイル
ソースコード
今回はGreengrassのLambda関数デプロイは使用せず、直接ソースコードをRaspberry Piへ転送して使用しています。
AWS SNSでEmail通知の検証したソースコードに、Raspberry PiのCPU温度、メモリ使用料を追加したソースコードです。
topicには「チャネル作成」のIoT Coreトピックフィルタで設定した「topic/data」を設定しています。
下記ソースのhost(endpoint)は、AWS IoT Coreコンソールから確認できます。
それでは、ソースコードです。
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient import time import json import datetime import logging import subprocess from subprocess import PIPE # Setup Param host = "xxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" ## --> 自分の環境に合わせてください。 rootCAPath = "./cert/root.ca.pem" ## --> 自分の環境に合わせてください。 certificatePath = "./cert/xxxxxxxxxxx.cert.pem" ## --> 自分の環境に合わせてください。 privateKeyPath = "./cert/xxxxxxxxxxx.private.key" ## --> 自分の環境に合わせてください。 useWebsocket = False clientId = "mqttAnaTest" topic = "topic/data" deviceNo = "10001" # Configure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) streamHandler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) # Init AWSIoTMQTTClient myAWSIoTMQTTClient = None if useWebsocket: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True) myAWSIoTMQTTClient.configureEndpoint(host, 443) myAWSIoTMQTTClient.configureCredentials(rootCAPath) else: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId) myAWSIoTMQTTClient.configureEndpoint(host, 8883) myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) # AWSIoTMQTTClient connection configuration myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20) myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec # Connect and subscribe to AWS IoT myAWSIoTMQTTClient.connect() time.sleep(2) # Initial try: while True: # Raspberry Pi cpu温度 proc = subprocess.run("cat /sys/class/thermal/thermal_zone0/temp", shell=True, stdout=PIPE, stderr=PIPE, text=True) temp = float(int(proc.stdout)/1000) # Raspberry Pi メモリ使用量 proc = subprocess.run("vcgencmd get_mem arm", shell=True, stdout=PIPE, stderr=PIPE, text=True) cmdmemory = proc.stdout memory=cmdmemory.replace('arm=','') memory=memory.replace('M','') nowtime = datetime.datetime.now() nowsettime = int(time.mktime(nowtime.timetuple())) # UNIX Time #send Command message = {} message['time'] = nowtime.strftime('%Y/%m/%d %H:%M:%S') message['CPU'] = temp message['memory'] = memory messageJson = json.dumps(message) print (messageJson) myAWSIoTMQTTClient.publish(topic, messageJson, 1) time.sleep(30) except: import traceback traceback.print_exc() print ("Terminated")
Raspberry PiへSSH接続
自分はMACから接続するので、以前と同じIPアドレスで接続します。
$ ssh pi@192.168.10.10
Pythonファイル実行
Pythonソースコードがファイルがあるディレクトリまで移動してください。 証明書やキー情報、ルートCAのファイル場所にもご注意ください。
$python3 mqtt_analytics_test.py
実行ログ表示
2020-12-03 11:44:46,828 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Initializing MQTT layer... 2020-12-03 11:44:46,830 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Registering internal event callbacks to MQTT layer... 2020-12-03 11:44:46,830 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - MqttCore initialized 2020-12-03 11:44:46,830 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Client id: mqttAnaTest 2020-12-03 11:44:46,831 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Protocol version: MQTTv3.1.1 2020-12-03 11:44:46,831 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Authentication type: TLSv1.2 certificate based Mutual Auth. 2020-12-03 11:44:46,831 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring endpoint... 2020-12-03 11:44:46,831 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring certificates... 2020-12-03 11:44:46,832 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring reconnect back off timing... 2020-12-03 11:44:46,833 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Base quiet time: 1.000000 sec 2020-12-03 11:44:46,833 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Max quiet time: 32.000000 sec 2020-12-03 11:44:46,833 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Stable connection time: 20.000000 sec 2020-12-03 11:44:46,833 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queueing: max queue size: -1 2020-12-03 11:44:46,834 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queue draining interval: 0.500000 sec 2020-12-03 11:44:46,835 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring connect/disconnect time out: 10.000000 sec 2020-12-03 11:44:46,835 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring MQTT operation time out: 5.000000 sec 2020-12-03 11:44:46,835 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync connect... 2020-12-03 11:44:46,835 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing async connect... 2020-12-03 11:44:46,836 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Keep-alive: 600.000000 sec 2020-12-03 11:44:46,837 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Event consuming thread started 2020-12-03 11:44:46,838 - AWSIoTPythonSDK.core.protocol.mqtt_core - DEBUG - Passing in general notification callbacks to internal client... 2020-12-03 11:44:46,838 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in fixed event callbacks: CONNACK, DISCONNECT, MESSAGE 2020-12-03 11:44:46,972 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Starting network I/O thread... 2020-12-03 11:44:47,051 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [connack] event 2020-12-03 11:44:47,051 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [connack] event 2020-12-03 11:44:47,052 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - No need for recovery 2020-12-03 11:44:47,053 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... {"time": "2020/12/03 11:44:49", "CPU": 35.938, "memory": "948\n"} 2020-12-03 11:44:49,091 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... 2020-12-03 11:44:49,093 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback... 2020-12-03 11:44:49,129 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event 2020-12-03 11:44:49,130 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event 2020-12-03 11:44:49,131 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... 2020-12-03 11:44:49,131 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation... 2020-12-03 11:45:07,054 - AWSIoTPythonSDK.core.protocol.connection.cores - DEBUG - stableConnection: Resetting the backoff time to: 1 sec. {"time": "2020/12/03 11:45:19", "CPU": 35.4, "memory": "948\n"} 2020-12-03 11:45:19,198 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... 2020-12-03 11:45:19,200 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback... 2020-12-03 11:45:19,232 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event 2020-12-03 11:45:19,233 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event 2020-12-03 11:45:19,234 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... 2020-12-03 11:45:19,235 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation... {"time": "2020/12/03 11:45:49", "CPU": 35.4, "memory": "948\n"} 2020-12-03 11:45:49,301 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... 2020-12-03 11:45:49,303 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback... 2020-12-03 11:45:49,337 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event 2020-12-03 11:45:49,338 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event 2020-12-03 11:45:49,340 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... 2020-12-03 11:45:49,340 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation... {"time": "2020/12/03 11:46:19", "CPU": 35.4, "memory": "948\n"}
IoT Analytics受信データセット確認
データセットのデータID「dataset0001」をクリックし、アクションで「今すぐ実行」を選択します。
実行後、結果のプレビューにCPU温度とメモリ使用量が表示されています。ただ、一緒にRaspberry Piから送信しているtimeの時間の並びが順番通りになっていない....。
左メニューのコンテンツでもプレビューは確認できます。
以上でSQLのデータセットは完了です。
Lambda関数を今後トライしようと思います。
参考
今回はAWS IoT Analyticsから設定した情報がAWS IoT Coreに自動展開されていました。
コメント
勉強不足ですみません。データセットで並びが順番通りになっていないのは、時系列データベースではないためと思われます。 今度は時系列データベース「Amazon Timestream」をトライしてみます。
あと、AWS QuickSightも実施したいが、無料60日間限定なので時間が空いた時にでも。