Raspberry Pi のAWS IoT通信データをデータベース(DynamoDB)へ挿入してみた!
本記事ではコアデバイス(Raspberry Pi)から送信したメッセージをAWS IoT Core経由でデータベース(DynamoDB)に挿入してみます。
- Amazon DynaoDBとは
- AWS構成図
- DynamoDBサービスにテーブル追加
- AWS IoTルール作成
- コアデバイス(Raspberry Pi)のPythonスクリプト編集と実行
- Pythonスクリプト実行とデータベース確認
Raspberry Piのインストールや証明書、ルートCA、キー情報は、「 Raspberry Pi でAWS IoT Greengrassを動かしてみた!(Part1)〜(Part3)」をご確認ください。
Amazon DynaoDBとは
Amazon DynamoDB は、どのような規模でも信頼性が高いパフォーマンスを維持できる、非リレーショナルデータベースです。 完全マネージド型、マルチリージョン、マルチマスターのデータベースで、レイテンシーを 10 ミリ秒未満に維持でき、 組み込みのセキュリティ、バックアップと復元、インメモリキャッシュを利用できます。
AWS構成図
DynamoDBサービスにテーブル追加
AWS IoTからのメッセージをDynamoDBに挿入するには、AWS DynamoDBに保存するためのテーブルを作成する必要があります。
DynamoDBの起動
AWSマネージメントコンソールから「DynamoDB」を起動します。
テーブル追加
「テーブル追加」ボタンをクリックします。
テーブル名は「iot_dynamodb_test」、パーテーションキーは「time」で文字列にします。
「作成」ボタンをクリックしてテーブルを作成します。
完了後に「iot_dynamodb_test」テーブルが生成されていることを確認します。
AWS IoTルール作成
AWS IoTコンソールの左メニュー 「ACT」 の「ルール」を選択し、「ルールの作成」ボタンをクリックします。
ルールの名前を「testRule」を入力し、ルールのステートメントを「SELECT * FROM 'testRule/time'」に変更します。
次にアクションを設定します。「アクションの追加」ボタンをクリックします。
今回はDynamoDBを使用してデータ保存のテストをするので、 「データベーステーブル(DynamoDBV2)の複数列にメッセージを分割する」を選択します。 その後「アクションの設定」ボタンをクリックします。
アクション設定でDynamoDBサービスでテーブル追加した「iot_dynamodb_test」を選択します。 その後、「ロールの作成」ボタンをクリックします。
新しいロールの作成で「AtomTestRole」(なんでもよい)を入力し、「ロールの作成」ボタンをクリックします。
「アクションの追加」ボタンをクリックします。
ルール作成の画面に戻るため、画面下の「ルールの作成」ボタンをクリックします。
ルール作成が完了後の画面は以下です。
これでAWS IoT ルール作成が完了しました。
コアデバイス(Raspberry Pi)のPythonスクリプト編集と実行
次はエッジ側のコアデバイス(Raspberry Pi)にPythonスクリプト作成と、証明書、ルートCA、キー情報をコピーします。
Raspberry PiへSSH接続
MACターミナルからRaspberry PiへSSH接続をします。 自分の環境に合わせてIPアドレスを変更してください。
# ssh pi@192.168.10.10
環境構築
AWSIoTPythonSDKをインストール「pip3 install AWSIoTPythonSDK」します。
$ pip3 install AWSIoTPythonSDK Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple Collecting AWSIoTPythonSDK Using cached https://www.piwheels.org/simple/awsiotpythonsdk/AWSIoTPythonSDK-1.4.9-py3-none-any.whl Installing collected packages: AWSIoTPythonSDK Successfully installed AWSIoTPythonSDK-1.4.9
Pythonスクリプトファイル編集
次のソースはコピーして証明書、ルートCA、endpointパスなどを変更してください。 今回証明書、ルートCA、キー情報ファイルは、Raspberry Piの Piユーザがアクセスできる場所に配置し、「chgrp」,「chown」を変更しました。
尚、ソースコードはこちらからコピーして変更しました。情報提供いただき感謝です。
送信するメッセージ
- 年月日時分秒
- device番号:ここでは10001
- TTL
下記ソースのhost(endpoint)は、AWS IoT Coreコンソールから確認できます。
フォルダ/ファイル構成
|--- mqtt_dynamo_test.py | |---cert |--- root.ca.pem # ルートファイル |--- xxxxxxxxxxx.cert.pem" #証明書ファイル |--- xxxxxxxxxxx.private.key" #privateキーファイル
ソースコード
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient import time import json import datetime import logging # Setup Param host = "xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" ## -- >ここをAWS IoTのendpointを設定する rootCAPath = "./cert/root.ca.pem" ## --> ここにルートCAファイルパスを指定する certificatePath = "./cert/xxxxxxxxxx.cert.pem" ## --> ここに証明書ファイルパスを指定する privateKeyPath = "./cert/xxxxxxxxxx.private.key" ## --> ここにキー情報ファイルパスを指定する useWebsocket = False clientId = "testRule" topic = "testRule/time" deviceNo = "10001" # Configure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) streamHandler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) # Init AWSIoTMQTTClient myAWSIoTMQTTClient = None if useWebsocket: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True) myAWSIoTMQTTClient.configureEndpoint(host, 443) myAWSIoTMQTTClient.configureCredentials(rootCAPath) else: myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId) myAWSIoTMQTTClient.configureEndpoint(host, 8883) myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) # AWSIoTMQTTClient connection configuration myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20) myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) myAWSIoTMQTTClient.configureDrainingFrequency(2) myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # Connect and subscribe to AWS IoT myAWSIoTMQTTClient.connect() time.sleep(2) # Initial try: while True: nowtime = datetime.datetime.now() nowsettime = int(time.mktime(nowtime.timetuple())) # UNIX Time #send Command message = {} message['time'] = nowtime.strftime('%Y/%m/%d %H:%M:%S') message['deviceNo'] = deviceNo message['ttl'] = nowsettime + 2592000 messageJson = json.dumps(message) print (messageJson) myAWSIoTMQTTClient.publish(topic, messageJson, 1) time.sleep(10) except: import traceback traceback.print_exc() print ("Terminated")
Pythonスクリプト実行とデータベース確認
それでは、Pythonスクリプトを実行しましょう。 その前に、「AWS DynamoDB」でデータベースに格納されることを見たいので、まずはそちらを開いておきましょう。
本ページで最初に登録したテーブル「iot_dynamodb_test」です。
Pythonスクリプト実行
ファイル名は「mqtt_dynamo_test.py」にしているので、そのファイルを実行します。
$ python3 mqtt_dynamo_test.py
実行した後にターミナルに表示するログです。
2020-12-01 11:41:33,238 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Initializing MQTT layer... 2020-12-01 11:41:33,240 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Registering internal event callbacks to MQTT layer... 2020-12-01 11:41:33,240 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - MqttCore initialized 2020-12-01 11:41:33,240 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Client id: testRule 2020-12-01 11:41:33,241 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Protocol version: MQTTv3.1.1 2020-12-01 11:41:33,241 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Authentication type: TLSv1.2 certificate based Mutual Auth. 2020-12-01 11:41:33,241 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring endpoint... 2020-12-01 11:41:33,241 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring certificates... 2020-12-01 11:41:33,242 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring reconnect back off timing... 2020-12-01 11:41:33,242 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Base quiet time: 1.000000 sec 2020-12-01 11:41:33,243 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Max quiet time: 32.000000 sec 2020-12-01 11:41:33,243 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Stable connection time: 20.000000 sec 2020-12-01 11:41:33,243 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queueing: max queue size: -1 2020-12-01 11:41:33,244 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queue draining interval: 0.500000 sec 2020-12-01 11:41:33,244 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring connect/disconnect time out: 10.000000 sec 2020-12-01 11:41:33,244 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring MQTT operation time out: 5.000000 sec 2020-12-01 11:41:33,245 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync connect... 2020-12-01 11:41:33,245 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing async connect... 2020-12-01 11:41:33,246 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Keep-alive: 600.000000 sec 2020-12-01 11:41:33,247 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Event consuming thread started 2020-12-01 11:41:33,247 - AWSIoTPythonSDK.core.protocol.mqtt_core - DEBUG - Passing in general notification callbacks to internal client... 2020-12-01 11:41:33,248 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in fixed event callbacks: CONNACK, DISCONNECT, MESSAGE 2020-12-01 11:41:33,383 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Starting network I/O thread... 2020-12-01 11:41:33,470 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [connack] event 2020-12-01 11:41:33,471 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [connack] event 2020-12-01 11:41:33,472 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - No need for recovery 2020-12-01 11:41:33,472 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... {"time": "2020/12/01 11:41:35", "deviceNo": "10001", "ttl": 1609382495} 2020-12-01 11:41:35,476 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... 2020-12-01 11:41:35,477 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback... 2020-12-01 11:41:35,577 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event 2020-12-01 11:41:35,578 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event 2020-12-01 11:41:35,579 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... 2020-12-01 11:41:35,580 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation... {"time": "2020/12/01 11:41:45", "deviceNo": "10001", "ttl": 1609382505} 2020-12-01 11:41:45,587 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... 2020-12-01 11:41:45,588 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback... 2020-12-01 11:41:45,624 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event 2020-12-01 11:41:45,625 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event 2020-12-01 11:41:45,625 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... 2020-12-01 11:41:45,626 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation... 2020-12-01 11:41:53,472 - AWSIoTPythonSDK.core.protocol.connection.cores - DEBUG - stableConnection: Resetting the backoff time to: 1 sec. {"time": "2020/12/01 11:41:55", "deviceNo": "10001", "ttl": 1609382515} 2020-12-01 11:41:55,638 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... 2020-12-01 11:41:55,639 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback... 2020-12-01 11:41:55,802 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event 2020-12-01 11:41:55,803 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event 2020-12-01 11:41:55,804 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... 2020-12-01 11:41:55,805 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation... {"time": "2020/12/01 11:42:05", "deviceNo": "10001", "ttl": 1609382525} 2020-12-01 11:42:05,816 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... 2020-12-01 11:42:05,818 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback... 2020-12-01 11:42:06,277 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event 2020-12-01 11:42:06,277 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event 2020-12-01 11:42:06,279 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... 2020-12-01 11:42:06,279 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation... {"time": "2020/12/01 11:42:16", "deviceNo": "10001", "ttl": 1609382536} 2020-12-01 11:42:16,291 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... 2020-12-01 11:42:16,292 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback... 2020-12-01 11:42:16,325 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event 2020-12-01 11:42:16,326 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event 2020-12-01 11:42:16,327 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... 2020-12-01 11:42:16,327 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation... {"time": "2020/12/01 11:42:26", "deviceNo": "10001", "ttl": 1609382546} 2020-12-01 11:42:26,338 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... 2020-12-01 11:42:26,340 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback... 2020-12-01 11:42:26,374 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event 2020-12-01 11:42:26,375 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event 2020-12-01 11:42:26,376 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... 2020-12-01 11:42:26,377 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
DynamoDBのテーブル確認
Raspberry Piから送信したメッセージがデータベースを確認します。 DynamoDBテーブルの「iot_dynamodb_test」を選択し、上部タブの「項目」と「開始」ボタンをクリックすると、データベースに保存された情報が確認できました。
テーブルに保存されたデータをクリック(青色の日付)すると、送信したメッセージ情報が確認できます。
これで Raspberry Pi(Greengrass)-> AWS IoT Core -> DynamoDB のルートが確認できました。