AMQP アプリケーション間や組織間でメッセージを渡す手段として広く使用されています。Vantiq は AMQP 1.0 データストリームの読み取りを直接サポートしています。

重要な注意事項 – デフォルトでは、RabbitMQは独自のプロトコルを使用します。 AMQP 0.9.1これはAMQP仕様の初期バージョンとして提案されたものですが、AMQP 1.0の最終バージョンとは互換性がありません。Vantiqが提供するAMQPサポートはAMQP 1.0向けです。  この独自仕様のバリアントはサポートされていません。ほとんどのキューイング製品では、AMQP 1.0 のサポートを明示的に有効化する必要があります。有効化の手順については、各製品のドキュメントをご覧ください。

統合の本質は次のとおりです。

  • 管理者はAMQP(Advanced Message Queuing Protocol)エンドポイントを特定することでAMQPソースを定義します。これは、以下に説明するupdateStreamリクエスト、または ヴァンティックIDE.
  • AMQP ソースが定義されると、サーバーは AMQP ソースからの受信メッセージを受け入れるための別のスレッドを構築します。
  • メッセージがソース エンドポイントに到着すると、サブスクライブされたルールの実行をトリガーするイベントが生成されます。 イベントは、イベントの ID で一時的なサブスクリプションを持つクライアントに配信される場合もあります。
  • ソース処理ルールでは、永続的な状態を Vantiq 自動化モデルに保存することが推奨されます。 これにより、ルール自体をステートレスにすることができ、クラスター全体でルールを実行したり、複数の Vantiq サーバー間で作業を分割したりするなど、さまざまな負荷分散アプローチのサポートが容易になります。

AMQP ソース表現

An AMQPソース 特定の AMQP データ ストリームとの統合を定義し、次のプロパティが含まれます。

  •  ユーザーがストリームに付けた名前
  • type 文字列でなければなりません AMQP これは AMQP ソースであることを示します。
  • 設定 追加の AMQP 構成パラメータを含む JSON オブジェクト:
    • サーバーURI ストリームをホストするサーバーのエンドポイント。 オプションで URI のリスト。
    • トピック ストリームがサブスクライブされているトピックのリスト。 個別のトピックまたは重複するトピックをサブスクライブする複数のストリームを単一のサーバー URI 上で定義できます。 Vantiq によるサブスクリプションには制限はありません。
    • ユーザ名 AMQP サーバーにアクセスするための資格情報
    • password AMQP サーバーにアクセスするための資格情報
    • パスワードの種類 パスワードがプレーンテキストパスワードか、実際のパスワードを含むシークレットへの参照かを指定します。

上記の設定プロパティは最もよく使用されますが、config JSONオブジェクトは Vert.x AMQP クライアント オプション 構成。したがって、 AmqpClientOptions ここで明示的にリストされていない場合でも、configオブジェクトに追加できます。例として、 SSLセットアップ のセクションから無料でダウンロードできます。

AMQPソースを作成する

次の例は、REST APIを使用してAMQPソースを作成する方法を示しています。AMQPソースは、 ヴァンティックIDE 使用して、 追加 ボタンを押して選択する ソース….

POST https://dev.vantiq.com/api/v1/resources/sources
{ 
    "name": "myAmqpSource",
    "type": "AMQP",
    "config": {
        "serverURIs"     : [ "amqp://localhost:5672" ],
        "topics"         : [ "com.accessg2.stream.amqp.example" ],
        "username"       : "guest",
        "password"       : "guest"
    }
}

あるいは、「MySecret」という名前のシークレット パスワードを使用するには、次のようにパスワード プロパティを参照に変更し、passwordType として「secret」を指定します。

POST https://dev.vantiq.com/api/v1/resources/sources
{ 
    "name": "myAmqpSource",
    "type": "AMQP",
    "config": {
        "serverURIs"     : [ "amqp://localhost:5672" ],
        "topics"         : [ "com.accessg2.stream.amqp.example" ],
        "username"       : "guest",
        "password"       : "/system.secrets/MySecret",
        "passwordType"   : "secret"
    }
}

ソースがメッセージの公開のみを目的としている場合は、 topics プロパティ。

AMQP ソースを削除する

AMQPソースの例 マイAmqpソース 次のリクエストを発行することで、REST API を使用して削除できます。

DELETE https://dev.vantiq.com/api/v1/resources/sources/myAmqpSource

AMQP 構成オプション

安全なサーバーのエンドポイントを指定するには、 amqps URI スキームとして。

"config": {
    "serverURIs"     : [ "amqps://localhost:5671" ],
    "topics"         : [ "com.accessg2.stream.amqp.example" ],
    "username"       : "guest",
    "password"       : "guest"
}

必要なソースを購読するには PLAIN SASL認証方法(例:Azureソース)、認証方法をリストします。 enabledSaslMechanisms 構成属性。

"config": {
    "serverURIs"     : [ "amqps://mysample.servicebus.windows.net:5671" ],
    "topics"         : [ "com.accessg2.stream.amqp.example" ],
    "username": "RootManageSharedAccessKey",
    "password": "1Y0MsS2TaiVZ6I+SHyQicKySI/TfSUJlICW/9AZ0lTk=",
    "enabledSaslMechanisms": [
        "PLAIN"
    ]
}

受信したAMQPメッセージにはcontentType値が含まれている場合があります。メッセージに指定されたcontentType値の使用を強制するには、属性設定を設定します。 useMessageContentType true に設定します。設定されている場合、メッセージに contentType が指定されていないか、メッセージの contentType 値が Vantiq システムでサポートされていない場合は、ソース構成の設定が使用されます。

SSLセットアップ

AMQPソースは、追加のSSL通信を指定することにより、片方向または双方向のSSL通信に設定できます。 Vert.x AMQP クライアント オプション 設定プロパティはJSON表現で表現されます。以下に使用例をいくつか示します。AMQP、MQTT、リモートソースのSSL設定プロパティは使用方法が同一であることにご注意ください。追加の例については、各ソースのSSL設定ドキュメントをご覧ください。

一方向SSL

このセクションの例では、AMQPブローカーがCAによって署名された証明書を使用してSSL経由で通信するように設定されていることを前提としています。CA証明書は、次の信頼ストアで入手できます。 sourceTrustStore.jksAMQP ソースは SSL 経由でブローカーにアクセスするように構成されており、ソースを構成するユーザーは信頼ストアにアクセスできます。

Vantiq サーバー (エッジ インストールなど) で読み取り可能なファイル システム上でトラスト ストアにアクセスできる場合、トラスト ストアは次のように指定できます。

{
    "serverURIs"    : [ "amqps://localhost:5671" ],
    "topics"        : [ "com.accessg2.stream.amqp.example" ],
    "username"      : "guest",
    "password"      : "guest", 
    "trustStoreOptions": {
        "path": "/path/to/sourceTrustStore.jks",
        "password": "my_store_password"
    }
}

リモート ソースが定義されているサーバーがトラスト ストアにアクセスできない場合は、トラスト ストアの内容を Base64 でエンコードされた値として指定できます。

# Copy/paste the following output to the value property below
$ cat /path/to/sourceTrustStore.jks | base64

{
    "serverURIs"    : [ "amqps://localhost:5671" ],
    "topics"        : [ "com.accessg2.stream.amqp.example" ],
    "username"      : "guest",
    "password"      : "guest", 
    "trustStoreOptions": {
        "value": "/u3+7QAAAAIAAAABAAAAAgAGY2F........SzpeAUc7WXDK1HOg==",
        "password": "my_store_password"
    }
}

値はシークレットとして保存することもできます。 構成で参照される,

# Copy/paste the following output to a Secret named SourceTrustStore
$ cat /path/to/sourceTrustStore.jks | base64
# Also define a Secret named SourceTrustStorePassword containing the trust store password

{
    "serverURIs"    : [ "amqps://localhost:5671" ],
    "topics"        : [ "com.accessg2.stream.amqp.example" ],
    "username"      : "guest",
    "password"      : "guest", 
    "trustStoreOptions": {
        "value": "@secrets(SourceTrustStore)",
        "password": "@secrets(SourceTrustStorePassword)"
    }
}

XNUMX つの CA 証明書が信頼され、PEM 形式のファイルからアクセスできると仮定すると、次の名前のファイルが ca-cert-1 の三脚と ca-cert-2,

{
    "serverURIs"    : [ "amqps://localhost:5671" ],
    "topics"        : [ "com.accessg2.stream.amqp.example" ],
    "username"      : "guest",
    "password"      : "guest", 
    "pemTrustOptions": {
        "certPaths": ["/path/to/ca-cert-1", "/path/to/ca-cert-2"]
    }
}

あるいは秘密を使って、

# Copy/paste the following output to a Secret named CertAuthority1
$ cat /path/to/ca-cert-1 | base64
# Copy/paste the following output to a Secret named CertAuthority2
$ cat /path/to/ca-cert-2 | base64

{
    "serverURIs"    : [ "amqps://localhost:5671" ],
    "topics"        : [ "com.accessg2.stream.amqp.example" ],
    "username"      : "guest",
    "password"      : "guest", 
    "pemTrustOptions": {
        "certValues": ["@secrets(CertAuthority1)", "@secrets(CertAuthority2)"]
    }
}

SSLクライアント認証

AMQP ソース構成では、信頼ストアの指定に加えて、クライアント証明書も指定できます。これは、ブローカーが相互認証用にセットアップされており、クライアントに証明書による認証を要求する場合に必要です。

以下の例では、キー ストアが次の名前であることを前提としています。 sourceKeyStore.jks CA によって署名されたクライアント証明書が含まれています。

Vantiq サーバーが読み取り可能なファイル システム上でキー ストアにアクセスできる場合、

{
    "serverURIs"    : [ "amqps://localhost:5671" ],
    "topics"        : [ "com.accessg2.stream.amqp.example" ],
    "trustStoreOptions": {
        "path": "/path/to/sourceTrustStore.jks",
        "password": "my_store_password"
    },
    "keyStoreOptions": {
        "path": "/path/to/sourceKeyStore.jks",
        "password": "my_keystore_password"
    }
}

トラスト ストアの例と同様に、パス構文を使用して指定されたキーストアは、base64 でエンコードされた値として指定できます。

たとえば、Secret 定義を使用すると、

# Copy/paste the following output to a Secret named SourceTrustStore
$ cat /path/to/sourceTrustStore.jks | base64
# Copy/paste the following output to a Secret named SourceKeyStore
$ cat /path/to/sourceKeyStore.jks | base64
# Also define the store passwords as Secrets (SourceTrustStorePassword and SourceKeyStorePassword)

{
    "serverURIs"    : [ "amqps://localhost:5671" ],
    "topics"        : [ "com.accessg2.stream.amqp.example" ],
    "trustStoreOptions": {
        "value": "@secrets(SourceTrustStore)",
        "password": "@secrets(SourceTrustStorePassword)"
    },
    "keyStoreOptions": {
        "value": "@secrets(SourceKeyStore)",
        "password": "@secrets(SourceKeyStorePassword)"
    }
}

Job Status ページの下部にある Vert.x AMQP クライアント オプション 構成オプションの完全なリストについては、ドキュメントを参照してください。 注: この文書では、次のことを参照しています。 Buffer Base64 でエンコードされた値を指定できることを意味します (trustStoreOptions など)。 どれでも add メソッドは配列 (pemTrustOptions など) に変換され、任意の set メソッドは単一のプロパティ設定 (パスや値など) に変換されます。

Azure 構成の例

AMQP Azure ソース構成を作成するには、Azure 共有アクセス ポリシーにある Azure 接続文字列の内容を使用します。

トピック ソース構成名 – 両方 topics ソース構成と topic メッセージをパブリッシュするには、Azure が想定する構文を使用してください。詳細については、Azure のドキュメントを参照してください。

Azure キュー

Azure名前空間を想定 vqsample、キュー名 vqs_queue そして政策 RootManageSharedAccessKey 次のプライマリ接続文字列を使用します。

Endpoint=sb://vqsample.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;
         SharedAccessKey=1Y0MsS2TaiVZ6I+SHyQicKySI/TfSUJlICW/9AZ0lTk=

AMQP ソース構成は次のとおりです。

{
    "contentType": "application/json",
    "serverURIs": [
        "amqps://vqsample.servicebus.windows.net:5671"
    ],
    "topics": [
        "vqs_queue"
    ],
    "username": "RootManageSharedAccessKey",
    "password": "1Y0MsS2TaiVZ6I+SHyQicKySI/TfSUJlICW/9AZ0lTk=",
    "enabledSaslMechanisms": [
        "PLAIN"
    ]
}

SASL メカニズムは PLAIN として定義し、セキュア ポート番号 (5671) を指定する必要があることに注意してください。

AMQPソース(つまりAzureキュー)にメッセージを公開するには、キュー名(例: vqs_queue) をトピック名として使用します。

Azureトピック

上記と同じ接続文字列を仮定すると、 vqs_topic サブスクリプション名 MySubAMQP ソース構成は次のとおりです。

{
    "contentType": "application/json",
    "serverURIs": [
        "amqps://vqsample.servicebus.windows.net:5671"
    ],
    "topics": [
        "vqs_topic/Subscriptions/MySub"
    ],
    "username": "RootManageSharedAccessKey",
    "password": "1Y0MsS2TaiVZ6I+SHyQicKySI/TfSUJlICW/9AZ0lTk=",
    "enabledSaslMechanisms": [
        "PLAIN"
    ]
}

なお、 topics プロパティ値の構文はAzureキューとAzureトピックで異なります。Azureトピックをリッスンする場合、 topics 構成プロパティの形式は次の通りです <topic_name>/Subscription/<subscription_name>

AzureトピックのAMQPソースにメッセージをパブリッシュするには、トピック名のみを使用します(例: vqs_topic).

Azureイベントハブ

Azure名前空間を想定 vqhubns、イベントハブ vqs_eventhub そして政策 EventUser 次の接続文字列を使用します。

Endpoint=sb://vqhubns.servicebus.windows.net/;SharedAccessKeyName=EventUser;
         SharedAccessKey=gCunEMpPqh8uZXqaSqoa87U4YEi1rHZfTp5bBWn3hLA=;EntityPath=vqs_eventhub

AMQP ソース構成は次のとおりです。

{
    "contentType": "application/json",
    "serverURIs": [
        "amqps://vqhubns.servicebus.windows.net:5671"
    ],
    "topics": [
        "vqs_eventhub/ConsumerGroups/$default/Partitions/0"
    ],
    "username": "EventUser",
    "password": "gCunEMpPqh8uZXqaSqoa87U4YEi1rHZfTp5bBWn3hLA=",
    "enabledSaslMechanisms": [
        "PLAIN"
    ]
}

AMQP ソース (つまりイベント ハブ) にメッセージを公開するには、トピック名として次のものを使用できます。

  • vqs_eventhub
  • または、パーティション 0 の場合: vqs_eventhub/Partitions/0
  • または、パブリッシャーエンドポイントへ: vqs_eventhub/Partitions/device1

パーティションオフセット

デフォルトでは、AMQPソースはEvent Hubsパーティションの先頭から読み取りを開始します。パーティションの末尾から読み取りを開始し、新しいイベントのみを受信するには、 seekToEnd 構成プロパティを true.

{
    "contentType": "application/json",
    "serverURIs": [
        "amqps://vqhubns.servicebus.windows.net:5671"
    ],
    "topics": [
        "vqs_eventhub/ConsumerGroups/$default/Partitions/0"
    ],
    "username": "EventUser",
    "password": "gCunEMpPqh8uZXqaSqoa87U4YEi1rHZfTp5bBWn3hLA=",
    "enabledSaslMechanisms": [
        "PLAIN"
    ],
    "seekToEnd": true
}

初期オフセットを細かく制御するには、 AmqpReceiverOptions selector 財産。 の AmqpReceiverOptions 設定は options 構成プロパティ、

...
    "enabledSaslMechanisms": [
        "PLAIN"
    ],
    "options": {
        "<topic_partition_name>": {
            <AmqpReceiverOptions>
        }
    }

たとえば、

{
    "contentType": "application/json",
    "serverURIs": [
        "amqps://vqhubns.servicebus.windows.net:5671"
    ],
    "topics": [
        "vqs_eventhub/ConsumerGroups/$default/Partitions/0"
    ],
    "username": "EventUser",
    "password": "gCunEMpPqh8uZXqaSqoa87U4YEi1rHZfTp5bBWn3hLA=",
    "enabledSaslMechanisms": [
        "PLAIN"
    ],
    "options": {
        "vqs_eventhub/ConsumerGroups/$default/Partitions/0": {
            "selector": "amqp.annotation.x-opt-offset >= '386547033'"
        }
    }
}

上記の設定により、パーティション オフセット 386547033 (含む) からイベントの読み取りが開始されます。

2つの特別な価値 -1 の三脚と @latest パーティションの先頭から読み取りを開始するか、末尾から読み取りを開始するかを指定できます。 amqp.annotation.x-opt-offset > '-1' の三脚と amqp.annotation.x-opt-offset > '@latest'指定することに注意してください seekToEnd 〜へ true は、 selector プロパティへ amqp.annotation.x-opt-offset > '@latest'.

Azure Event Hubsエンドポイントから受信したイベントには、イベントを通じてアクセスできるメッセージ注釈が含まれています。 properties プロパティ。次のメッセージ注釈が利用可能です。

プロパティ Data Type 詳細説明
x-opt-offset string イベントハブのパーティションストリームからのイベントのオフセット。オフセット識別子は、イベントハブストリームのパーティション内で一意です。
x-opt-sequence-number long イベント ハブのパーティション ストリーム内のイベントの論理シーケンス番号
x-opt-enqueued-time date イベントがキューに登録されたUTC時間

いずれかの財産 x-opt-offsetx-opt-sequence-number or x-opt-enqueued-time 指定するために使用される可能性がある selector クエリ。

たとえば、

{
    ...
    "options": {
        "vqs_eventhub/ConsumerGroups/$default/Partitions/0": {
            "selector": "amqp.annotation.x-opt-sequence-number >= 8362"
        }
    }
}

Event Hubs エンドポイントは Kafka エンドポイントも公開することに注意してください。上記のオプションではオフセット管理をきめ細かく制御できますが、自動チェックポイントオフセット管理を利用する Kafka ソースの使用も検討できます。通常、Kafka ソースは、非アクティブ化されてから再アクティブ化されると、最後にチェックポイントが設定されたオフセットからイベントを自動的に受信します。