KAFKA リファレンス ガイド
Apache Kafkaは、データ処理パイプラインを構築するための人気のオープンソースプロジェクトです。Vantiqは、Kafkaトピックのメッセージを消費および生成できるソースを介してKafkaと直接統合できます。Kafkaの詳細については、こちらをご覧ください。 Apacheドキュメント.
Kafka ソースを起動し、トピックに到着したメッセージを読み取るための基本的なプロセスは次のとおりです。
- 管理者は、bootstrap.serversエンドポイントと、メッセージが消費されるトピックを特定することでKafkaソースを定義します。これは、 ヴァンティックIDE 使用して、 追加 ボタンを押して選択する ソース….
- Kafka ソースが定義されると、サーバーは指定されたトピックにメッセージが到着するとリッスンを開始します。
- メッセージがソース エンドポイントに到着すると、サブスクライブされたルールの実行をトリガーするイベントが生成されます。 イベントは、イベントの ID で一時的なサブスクリプションを持つクライアントに配信される場合もあります。
- ソース処理ルールでは、永続的な状態を Vantiq 自動化モデルに保存することが推奨されます。 これにより、ルール自体をステートレスにすることができ、クラスター全体でルールを実行したり、複数の Vantiq サーバー間で作業を分割したりするなど、さまざまな負荷分散アプローチのサポートが容易になります。
Kafka ソース表現
A source リソースは、一連の Kafka トピックとの統合を定義し、次のプロパティを含みます。
- 名 ユーザーがストリームに付けた名前
- type 文字列でなければなりません カフカ これは Kafka ソースであることを示します。
- 設定 追加の Kafka 構成パラメータを含む JSON オブジェクト:
- ブートストラップ.サーバー コンシューマーを開始するために使用される Kafka クラスター内のブローカーのエンドポイント。
- 消費者トピック ストリームがメッセージを消費するトピックのリスト。
その Kafka ブローカー設定ドキュメント Kafkaソース設定にも適用できる多くの追加設定値を記述します。追加の設定プロパティは、設定オブジェクトで「ブートストラップ.サーバー(以下の例を参照)。コンシューマーにのみ適用される設定値には、「消費者。(例:「consumer.key.deserializer」)ですが、メッセージの生成にのみ適用される設定には「プロデューサー。” (例:「producer.key.serializer」)。
注意: 「max.in.flight.requests.per.connection」などの数値設定プロパティの値は、数値ではなく引用符で囲んだ文字列として表現する必要があります。ソース設定で値が数値として表現されている場合、以下のエラーが発生する可能性があります。 org.apache.kafka.common.config.ConfigException 型の不一致が原因です。このエラーが表示された場合は、ソース設定に数値が含まれていないか再度確認し、値を引用符で囲んでみてください。
Kafka ソースを作成する
次の例は、REST APIを使用してKafkaソースを作成する方法を示しています。Kafkaソースは、 ヴァンティックIDE 使用して、 追加 ボタンを押して選択する ソース….
POST https://dev.vantiq.com/api/v1/resources/sources
{
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9092",
"consumer.topics": ["topic1", "topic2", "topicX"]
}
}
ソースがメッセージの公開のみを目的としている場合は、 consumer.topics プロパティ。
Kafka ソースを削除する
Kafkaソースの例 マイカフカソース 次のリクエストを発行することで、REST API を使用して削除できます。
DELETE https://dev.vantiq.com/api/v1/resources/sources/myKafkaSource
Kafka トピックに関するメッセージを作成する
VAILではKafkaトピックでメッセージが生成され、 出版 コマンド。 の 出版 Kafkaソースのリクエストには、送信する値(およびオプションでキー)、Kafkaソース、そしてパブリッシュの送信先トピックという少なくとも3つのパラメータが必要です。例えば、
PUBLISH { value: "somevalue", key: "somekey"} TO SOURCE myKafkaSource USING { topic: "topicX" }
上記の例では、公開オブジェクトのキーはオプションであることに注意してください。Kafkaソースからメッセージがコンシュームされるたびにルールをトリガーするには、次のルールトリガーを使用します。
RULE myKafkaRule
WHEN EVENT OCCURS ON "/sources/myKafkaSource" AS msg
log.info("Received message: {}", [msg])
重要な注意点として、Kafkaトピックから生成および消費されるキーと値は、デフォルトで文字列としてシリアル化およびデシリアル化されます。そのため、メッセージがVantiqシステム外部のKafkaトピックに書き込まれ、値がJSON形式であったとしても、メッセージがVantiqルールによって消費される際には、値は文字列としてエンコードされます。
メッセージの消費
Kafkaソースは、1つ以上のトピックからメッセージを消費するように構成され、 consumers.topics プロパティ。消費者グループは、 group.id プロパティ、デフォルトは <namespace>:<source_name>Kafka がトピックのパーティションをグループ コンシューマーに割り当てると、メッセージは最後にコミットされたオフセットから読み取られ、配信されます。
開発段階では、最後にコミットされたオフセットではなく、最後のオフセットからメッセージの読み取りを開始すると便利です。これにより、ソースがアクティブでない間にパブリッシュされたメッセージはスキップされます。これは、コンシューマー設定プロパティを指定することで実現できます。 seekToEnd そしてそれを true指定されていない場合、この構成プロパティはデフォルトで false.
たとえば、
{
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9092",
"consumer.topics": ["topic1", "topic2"],
"consumer.seekToEnd": "true"
}
}
設定に注意してください seekToEnd Kafka はパーティション オフセットを最後に移動する前に一部のメッセージを配信する可能性があるため、見逃されたすべてのメッセージをスキップすることは保証されません。
QoSの
パブリッシュされたメッセージのデフォルトの QoS は次のとおりです。 AT_LEAST_ONCEこのデフォルトは、Kafkaの標準設定を使用して変更できます。例えば、ソース設定に以下を追加します。
"producer.acks": "0"
パブリッシャーのQoSを次のように変更します AT_MOST_ONCE.
KafkaコンシューマーのQoSは AT_LEAST_ONCE: Kafkaブローカーから受信したメッセージがVantiqサーバーによって確認される After 適切な処理が必要です。例えば、ソースが Delivery Mode of At Least Onceソース コンシューマーによって受信されたイベントは、Kafka ブローカーに確認される前に保持されます。
Kafka コンシューマー QoS AT_LEAST_ONCE KafkaブローカーとKafkaコンシューマー間のQoSを定義します。ソース Delivery Mode この設定は、KafkaコンシューマとVantiqリソース間のQoSを定義します。例えば、Kafkaソースからイベントを受信するルールでは、KafkaブローカーからKafkaコンシューマへのQoS(AT_LEAST_ONCE) と Kafka コンシューマーから Vantiq ルールへの QoS (Delivery Mode).
シリアル化と contentType
Kafka ソースは、シリアライザーとデシリアライザーを指定して、常にメッセージのワイヤ形式を定義します。シリアライザーはメッセージをパブリッシュするためのワイヤ形式を定義し、デシリアライザーはメッセージをコンシュームするための形式を定義します。
デフォルトでは、Kakfaソースは StringSerializer の三脚と StringDeserializer キーとメッセージ値の両方について。Kafkaソースで使用されるデフォルトの設定は、
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
"key.serializer": "org.apache.kafka.common.serialization.StringSerializer"
"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
このデフォルト設定を上書きして別のメッセージワイヤ形式を指定するには、 contentType 財産 または異なる値を定義する value.serializer および value.deserializer.
例えば、JSONメッセージを公開および受信するには、次のように定義します。
{
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9092",
"consumer.topics": ["topic1", "topic2"],
"value.deserializer": "io.vertx.kafka.client.serialization.JsonObjectDeserializer",
"value.serializer": "io.vertx.kafka.client.serialization.JsonObjectSerializer"
}
}
上記の例ではキー設定が提供されていないため、デフォルトで StringSerializer の三脚と StringDeserializer.
コンテンツタイプ
構成プロパティを設定することで、メッセージワイヤ形式を指定できます。 contentType 次のいずれかの値に設定します。
text/plain
application/json
application/xml
プロパティ contentType ソース設定、ソースコンシューマ設定、ソースプロデューサ設定で指定できます。例えば、
{
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9092",
"consumer.topics": ["topic1", "topic2"],
"contentType": "application/json"
}
}
プロデューサーとコンシューマーの両方のワイヤフォーマットをJSONに設定し、これは以下と同等です。
{
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9092",
"consumer.topics": ["topic1", "topic2"],
"consumer.contentType": "application/json",
"producer.contentType": "application/json"
}
}
と、
{
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9092",
"consumer.topics": ["topic1", "topic2"],
"contentType": "application/json",
"consumer.contentType": "text/plain"
}
}
送信メッセージのフォーマットをJSON、受信メッセージのフォーマットをStringに設定します。これは以下のコードと同じです。
{
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9092",
"consumer.topics": ["topic1", "topic2"],
"consumer.contentType": "application/json",
"producer.contentType": "text/plain"
}
}
任意 value.serializer or value.deserializer 設定は contentType 設定。
consumer.contentType の三脚と producer.contentType 設定を上書きする contentType (上記の例を参照)。
どちらでもない場合 value.serializer/value.deserializer また contentType プロパティが定義されている場合、Kafka設定はデフォルトを使用します StringSerializer の三脚と StringDeserializer 設定は、 text/plain.
その contentType プロパティはメッセージワイヤフォーマットにのみ適用されます。キーワイヤフォーマットをデフォルトの文字列シリアライザから変更する必要がある場合は、明示的な key.serializer/key.deserializer 設定。
公開されたメッセージ(プロデューサー)の場合、メッセージ値の型は、 contentType 設定。JSONの場合、メッセージ値の型はVAILオブジェクトでなければなりません。テキストの場合は文字列でなければなりません。XMLの場合はXML構造を表すオブジェクト(例えば、 parseXml 解析手順)。
同様に、受信したメッセージ値(コンシューマー)は、その値によって暗示される型を持ちます。 contentType 設定: JSON オブジェクト表現 (VAIL オブジェクト)、文字列、または XML 構造を表すオブジェクト。
Azure Event Hubs の構成例
Kafka Azure ソース構成を作成するには、Azure 共有アクセス ポリシーにある Azure 接続文字列の内容を使用します。
以下の例ではAzure名前空間を想定しています vqhubns、イベントハブ vqs_eventhub そして政策 EventUser 次の接続文字列を使用します。
Endpoint=sb://vqhubns.servicebus.windows.net/;SharedAccessKeyName=EventUser;
SharedAccessKey=QzaPHKfvqH28m2n9OycDo2Y7o7Nrwkx8z3pXziNz2C0=
構成の詳細については、Azure のドキュメントを参照してください。
プロデューサー
上記の Azure イベント ハブにメッセージを公開するための Kafka ソース構成は次のとおりです。
{
"bootstrap.servers": "sb://vqhubns.servicebus.windows.net:9093",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://vqhubns.servicebus.windows.net/;SharedAccessKeyName=EventUser;SharedAccessKey=QzaPHKfvqH28m2n9OycDo2Y7o7Nrwkx8z3pXziNz2C0=\";",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"request.timeout.ms": "25000",
"metadata.max.idle.ms": "180000",
"connections.max.idle.ms": "180000",
"metadata.max.age.ms": "180000"
}
生産者と消費者
上記の Azure Event Hub のプロデューサーとコンシューマーの両方の役割を果たす Kafka ソース構成は次のとおりです。
{
"bootstrap.servers": "sb://vqhubns.servicebus.windows.net:9093",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://vqhubns.servicebus.windows.net/;SharedAccessKeyName=EventUser;SharedAccessKey=QzaPHKfvqH28m2n9OycDo2Y7o7Nrwkx8z3pXziNz2C0=\";",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"consumer.group.id": "$Default",
"producer.request.timeout.ms": "25000",
"producer.metadata.max.idle.ms": "180000",
"connections.max.idle.ms": "180000",
"metadata.max.age.ms": "180000",
"consumer.topics": [
"vqs_eventhub"
]
}
設定は以下に従う必要があることに注意してください Microsoftの推奨事項。 例えば connections.max.idle.ms 上記の構成では、Azure のアイドル状態の受信 TCP 接続のデフォルトのタイムアウトが考慮されています。このプロパティを設定すると、クライアントと Azure Event Hubs 間の通信がアイドル状態のままになった場合に、クライアントがクリーンな接続終了を開始できます。指定しない場合、Azure はまず接続を切断するため、クライアント接続がファントム状態になり、リクエストの発行時にタイムアウトが発生する可能性があります。クライアントは最終的に接続が終了したことを検出するまで、タイムアウトが発生します。
SSLセットアップ
Kafkaソースは、SSL経由で接続するように設定できます。 ssl.* 構成プロパティ.
適切な構成を提供することで、一方向 SSL と双方向 SSL の両方がサポートされます。
例えば、 kafka_client_truststore.jks サーバー証明書署名CAを含みます。
{
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9093",
"consumer.topics": ["topic1", "topic2"],
"security.protocol": "SSL",
"ssl.truststore.location": "kafka_client_truststore.jks",
"ssl.truststore.password": "store_password"
}
}
また、クライアント証明書認証を要求するようにサーバーが適切に設定されていると仮定します(ssl.client.auth=required)とクライアント証明書を含むクライアントキーストア、
{
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9093",
"consumer.topics": ["topic1", "topic2"],
"security.protocol": "SSL",
"ssl.truststore.location": "kafka_client_truststore.jks",
"ssl.truststore.password": "store_password",
"ssl.keystore.location": "kafka_client_keystore.jks",
"ssl.keystore.password": "keystore_password",
"ssl.key.password": "key_password"
}
}
のサポートに頼って SSL証明書と秘密鍵のPEM形式、暗号化キーと証明書の PEM 形式を使用して Kafka ソースを構成することができます。
たとえば、サーバ証明書署名CAがPEM形式であると仮定すると、上記の一方向構成は次のように表現できます。
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9093",
"consumer.topics": ["topic1", "topic2"],
"security.protocol": "SSL",
"ssl.truststore.certificates": "-----BEGIN CERTIFICATE-----\nMIIDQj21........3xIRUUSGcUdTw==\n-----END CERTIFICATE-----",
"ssl.truststore.type": "PEM"
}
証明書のbase64表現は、 \n.
証明書の値はシークレットとして保存することもでき、 構成で参照される,
# With the base64 representation of the certificate stored as a Secret named Root_CA
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9093",
"consumer.topics": ["topic1", "topic2"],
"security.protocol": "SSL",
"ssl.truststore.certificates": "-----BEGIN CERTIFICATE-----\n@secrets(Root_CA)\n-----END CERTIFICATE-----",
"ssl.truststore.type": "PEM"
}
そして、
# With the certificate PEM value stored as a Secret named PEM_Root_CA
"name": "myKafkaSource",
"type": "KAFKA",
"config": {
"bootstrap.servers": "localhost:9093",
"consumer.topics": ["topic1", "topic2"],
"security.protocol": "SSL",
"ssl.truststore.certificates": "@secrets(PEM_Root_CA)",
"ssl.truststore.type": "PEM"
}
PEM値でシークレットを作成する場合は、PEMヘッダーとフッターをエスケープするようにしてください。 \n 文字。
たとえば、
Name: PEM_Root_CA
Description: Root Certificate
Secret: -----BEGIN CERTIFICATE-----\\nMIIDQj21........3xIRUUSGcUdTw==\\n-----END CERTIFICATE-----