Atom's tech blog

Raspberry Pi のAWS IoT通信データをデータベース(DynamoDB)へ挿入してみた!

f:id:iAtom:20201202142508j:plain

本記事ではコアデバイスRaspberry Pi)から送信したメッセージをAWS IoT Core経由でデータベース(DynamoDB)に挿入してみます。

Raspberry Piのインストールや証明書、ルートCA、キー情報は、「 Raspberry PiAWS IoT Greengrassを動かしてみた!(Part1)〜(Part3)」をご確認ください。

Amazon DynaoDBとは

Amazon DynamoDB は、どのような規模でも信頼性が高いパフォーマンスを維持できる、非リレーショナルデータベースです。 完全マネージド型、マルチリージョン、マルチマスターのデータベースで、レイテンシーを 10 ミリ秒未満に維持でき、 組み込みのセキュリティ、バックアップと復元、インメモリキャッシュを利用できます。


aws.amazon.com

AWS構成図

f:id:iAtom:20201203135905j:plain

DynamoDBサービスにテーブル追加

AWS IoTからのメッセージをDynamoDBに挿入するには、AWS DynamoDBに保存するためのテーブルを作成する必要があります。

DynamoDBの起動

AWSマネージメントコンソールから「DynamoDB」を起動します。


f:id:iAtom:20201130194536j:plain

テーブル追加

テーブル追加」ボタンをクリックします。


f:id:iAtom:20201130195510j:plain


テーブル名は「iot_dynamodb_test」、パーテーションキーは「time」で文字列にします。


f:id:iAtom:20201130200105j:plain
作成」ボタンをクリックしてテーブルを作成します。


f:id:iAtom:20201130200347j:plain

完了後に「iot_dynamodb_test」テーブルが生成されていることを確認します。


f:id:iAtom:20201130200846j:plain

AWS IoTルール作成

AWS IoTコンソールの左メニュー 「ACT」 の「ルール」を選択し、「ルールの作成」ボタンをクリックします。


f:id:iAtom:20201130201525j:plain

ルールの名前を「testRule」を入力し、ルールのステートメントを「SELECT * FROM 'testRule/time'」に変更します。


f:id:iAtom:20201130201722j:plain f:id:iAtom:20201130202341j:plain

次にアクションを設定します。「アクションの追加」ボタンをクリックします。


f:id:iAtom:20201130202251j:plain

今回はDynamoDBを使用してデータ保存のテストをするので、 「データベーステーブル(DynamoDBV2)の複数列にメッセージを分割する」を選択します。 その後「アクションの設定」ボタンをクリックします。


f:id:iAtom:20201130202859j:plain f:id:iAtom:20201130202935j:plain f:id:iAtom:20201130203007j:plain

アクション設定でDynamoDBサービスでテーブル追加した「iot_dynamodb_test」を選択します。 その後、「ロールの作成」ボタンをクリックします。


f:id:iAtom:20201130203513j:plain


新しいロールの作成で「AtomTestRole」(なんでもよい)を入力し、「ロールの作成」ボタンをクリックします。


f:id:iAtom:20201130203839j:plain

アクションの追加」ボタンをクリックします。


f:id:iAtom:20201130204800j:plain

ルール作成の画面に戻るため、画面下の「ルールの作成」ボタンをクリックします。


f:id:iAtom:20201130205102j:plain

ルール作成が完了後の画面は以下です。


f:id:iAtom:20201130205619j:plain

これでAWS IoT ルール作成が完了しました。

コアデバイスRaspberry Pi)のPythonスクリプト編集と実行

次はエッジ側のコアデバイスRaspberry Pi)にPythonスクリプト作成と、証明書、ルートCA、キー情報をコピーします。

Raspberry PiSSH接続

MACターミナルからRaspberry PiSSH接続をします。 自分の環境に合わせてIPアドレスを変更してください。


    # ssh pi@192.168.10.10

環境構築

AWSIoTPythonSDKをインストール「pip3 install AWSIoTPythonSDK」します。


    $ pip3 install AWSIoTPythonSDK
    Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
    Collecting AWSIoTPythonSDK
    Using cached https://www.piwheels.org/simple/awsiotpythonsdk/AWSIoTPythonSDK-1.4.9-py3-none-any.whl
    Installing collected packages: AWSIoTPythonSDK 
    Successfully installed AWSIoTPythonSDK-1.4.9

Pythonスクリプトファイル編集

次のソースはコピーして証明書、ルートCA、endpointパスなどを変更してください。 今回証明書、ルートCA、キー情報ファイルは、Raspberry Piの Piユーザがアクセスできる場所に配置し、「chgrp」,「chown」を変更しました。

尚、ソースコードこちらからコピーして変更しました。情報提供いただき感謝です。

送信するメッセージ

  • 年月日時分秒
  • device番号:ここでは10001
  • TTL

下記ソースのhost(endpoint)は、AWS IoT Coreコンソールから確認できます。


f:id:iAtom:20201204093432j:plain

フォルダ/ファイル構成

         |--- mqtt_dynamo_test.py
         |
         |---cert
                   |--- root.ca.pem                      #  ルートファイル
                   |--- xxxxxxxxxxx.cert.pem"      #証明書ファイル
                   |--- xxxxxxxxxxx.private.key"   #privateキーファイル
ソースコード
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import time
import json
import datetime
import logging

# Setup Param
host = "xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"   ##  -- >ここをAWS IoTのendpointを設定する

rootCAPath = "./cert/root.ca.pem"           ## --> ここにルートCAファイルパスを指定する
certificatePath = "./cert/xxxxxxxxxx.cert.pem"      ##  --> ここに証明書ファイルパスを指定する
privateKeyPath = "./cert/xxxxxxxxxx.private.key"   ## --> ここにキー情報ファイルパスを指定する

useWebsocket = False
clientId = "testRule"
topic = "testRule/time"
deviceNo = "10001"

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, 443)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, 8883)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)      
myAWSIoTMQTTClient.configureDrainingFrequency(2)           
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)    
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)         

# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
time.sleep(2)
# Initial

try:
    while True:
        nowtime = datetime.datetime.now()
        nowsettime = int(time.mktime(nowtime.timetuple()))  # UNIX Time
        #send Command
        message = {}
        message['time'] = nowtime.strftime('%Y/%m/%d %H:%M:%S')
        message['deviceNo'] = deviceNo
        message['ttl']  = nowsettime + 2592000
        messageJson = json.dumps(message)
        print (messageJson)
        myAWSIoTMQTTClient.publish(topic, messageJson, 1)
        time.sleep(10)
except:
    import traceback
    traceback.print_exc()

print ("Terminated")

Pythonスクリプト実行とデータベース確認

それでは、Pythonスクリプトを実行しましょう。 その前に、「AWS DynamoDB」でデータベースに格納されることを見たいので、まずはそちらを開いておきましょう。

本ページで最初に登録したテーブル「iot_dynamodb_test」です。


f:id:iAtom:20201201130634j:plain

Pythonスクリプト実行

ファイル名は「mqtt_dynamo_test.py」にしているので、そのファイルを実行します。

    $ python3 mqtt_dynamo_test.py

実行した後にターミナルに表示するログです。

    2020-12-01 11:41:33,238 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Initializing MQTT layer...
    2020-12-01 11:41:33,240 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Registering internal event callbacks to MQTT layer...
    2020-12-01 11:41:33,240 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - MqttCore initialized 
    2020-12-01 11:41:33,240 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Client id: testRule
    2020-12-01 11:41:33,241 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Protocol version: MQTTv3.1.1
    2020-12-01 11:41:33,241 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Authentication type: TLSv1.2 certificate based Mutual Auth.
    2020-12-01 11:41:33,241 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring endpoint...
    2020-12-01 11:41:33,241 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring certificates...
    2020-12-01 11:41:33,242 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring reconnect back off timing...
    2020-12-01 11:41:33,242 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Base quiet time: 1.000000 sec
    2020-12-01 11:41:33,243 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Max quiet time: 32.000000 sec
    2020-12-01 11:41:33,243 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Stable connection time: 20.000000 sec
    2020-12-01 11:41:33,243 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queueing: max queue size: -1
    2020-12-01 11:41:33,244 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queue draining interval: 0.500000 sec
    2020-12-01 11:41:33,244 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring connect/disconnect time out: 10.000000 sec
    2020-12-01 11:41:33,244 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring MQTT operation time out: 5.000000 sec
    2020-12-01 11:41:33,245 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync connect...
    2020-12-01 11:41:33,245 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing async connect...
    2020-12-01 11:41:33,246 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Keep-alive: 600.000000 sec
    2020-12-01 11:41:33,247 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Event consuming thread started
    2020-12-01 11:41:33,247 - AWSIoTPythonSDK.core.protocol.mqtt_core - DEBUG - Passing in general notification callbacks to internal client...
    2020-12-01 11:41:33,248 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in fixed event callbacks: CONNACK, DISCONNECT, MESSAGE
    2020-12-01 11:41:33,383 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Starting network I/O thread...
    2020-12-01 11:41:33,470 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [connack] event
    2020-12-01 11:41:33,471 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [connack] event
    2020-12-01 11:41:33,472 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - No need for recovery
    2020-12-01 11:41:33,472 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
    {"time": "2020/12/01 11:41:35", "deviceNo": "10001", "ttl": 1609382495}
    2020-12-01 11:41:35,476 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
    2020-12-01 11:41:35,477 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback...
    2020-12-01 11:41:35,577 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event
    2020-12-01 11:41:35,578 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event
    2020-12-01 11:41:35,579 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
    2020-12-01 11:41:35,580 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
    {"time": "2020/12/01 11:41:45", "deviceNo": "10001", "ttl": 1609382505}
    2020-12-01 11:41:45,587 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
    2020-12-01 11:41:45,588 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback...
    2020-12-01 11:41:45,624 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event
    2020-12-01 11:41:45,625 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event
    2020-12-01 11:41:45,625 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
    2020-12-01 11:41:45,626 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
    2020-12-01 11:41:53,472 - AWSIoTPythonSDK.core.protocol.connection.cores - DEBUG - stableConnection: Resetting the backoff time to: 1 sec.
    {"time": "2020/12/01 11:41:55", "deviceNo": "10001", "ttl": 1609382515}
    2020-12-01 11:41:55,638 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
    2020-12-01 11:41:55,639 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback...
    2020-12-01 11:41:55,802 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event
    2020-12-01 11:41:55,803 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event
    2020-12-01 11:41:55,804 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
    2020-12-01 11:41:55,805 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
    {"time": "2020/12/01 11:42:05", "deviceNo": "10001", "ttl": 1609382525}
    2020-12-01 11:42:05,816 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
    2020-12-01 11:42:05,818 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback...
    2020-12-01 11:42:06,277 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event
    2020-12-01 11:42:06,277 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event
    2020-12-01 11:42:06,279 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
    2020-12-01 11:42:06,279 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
    {"time": "2020/12/01 11:42:16", "deviceNo": "10001", "ttl": 1609382536}
    2020-12-01 11:42:16,291 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
    2020-12-01 11:42:16,292 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback...
    2020-12-01 11:42:16,325 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event
    2020-12-01 11:42:16,326 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event
    2020-12-01 11:42:16,327 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
2020-12-01 11:42:16,327 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
    {"time": "2020/12/01 11:42:26", "deviceNo": "10001", "ttl": 1609382546}
    2020-12-01 11:42:26,338 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
    2020-12-01 11:42:26,340 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback...
    2020-12-01 11:42:26,374 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event
    2020-12-01 11:42:26,375 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event
    2020-12-01 11:42:26,376 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
    2020-12-01 11:42:26,377 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...

DynamoDBのテーブル確認

Raspberry Piから送信したメッセージがデータベースを確認します。 DynamoDBテーブルの「iot_dynamodb_test」を選択し、上部タブの「項目」と「開始」ボタンをクリックすると、データベースに保存された情報が確認できました。


f:id:iAtom:20201201131620j:plain

テーブルに保存されたデータをクリック(青色の日付)すると、送信したメッセージ情報が確認できます。


f:id:iAtom:20201201131840j:plain

これで Raspberry Pi(Greengrass)-> AWS IoT Core -> DynamoDB のルートが確認できました。