KafkaターゲットとKafka対応Azure Event Hubsターゲット
次のリストは、Kafkaターゲットを使用する際の考慮事項を示しています。
- •データベース取り込みとレプリケーションは、増分ロードジョブのターゲットとして、Apache Kafka、Confluent Kafka、Amazon Managed Streaming for Apache Kafka(MSK)、およびKafka対応Azure Event Hubsをサポートします。これらすべてのKafkaターゲットタイプは、Kafka接続タイプを使用します。
Kafkaターゲットタイプを指定するには、タスク定義またはKafka接続プロパティでKafkaプロデューサプロパティを指定する必要があります。タスクのこれらのプロパティを指定するには、タスクウィザードの[ターゲット]ページの[プロデューサ設定プロパティ]フィールドに、key:valueペアのカンマ区切りのリストを入力します。Kafka接続を使用するすべてのタスクのプロデューサプロパティを指定するには、プロパティのリストを接続プロパティの[追加接続プロパティ]フィールドに入力します。タスクレベルでプロデューサプロパティを定義することにより、特定のタスクの接続レベルのプロパティをオーバーライドできます。プロデューサプロパティの詳細については、Apache Kafka、Confluent Kafka、Amazon MSK、またはKafka用Azure Event Hubsのドキュメントを参照してください。
- • [AVRO]をKafkaターゲットの出力形式として選択した場合、データベース取り込みとレプリケーションは次の形式の名前で、各テーブルのスキーマ定義ファイルを生成します。
schemaname_tablename.txt
ソーススキーマの変更により、増分ロードジョブのターゲットが変更されることが予想される場合は、データベース取り込みとレプリケーションがタイムスタンプを含む一意の名前でAvroスキーマ定義ファイルを再生成します。
schemaname_tablename_YYYYMMDDhhmmss.txt
この一意の命名パターンにより、古いスキーマ定義ファイルが監査目的で保持されます。
- •Confluent Schema Registryを使用してスキーマを格納するConfluent Kafkaターゲットがある場合は、タスクウィザードの[ターゲット]ページで次の設定を行う必要があります。
- - [出力形式]フィールドで、[AVRO]を選択します。
- - [Avroシリアル化形式]フィールドで、[なし]を選択します。
- •Kafkaプロデューサプロパティをタスクウィザードの[ターゲット]ページの[プロデューサ設定プロパティ]フィールド、またはKafka接続プロパティの[追加接続プロパティ]フィールドいずれかで指定できます。ビジネスニーズに合うように、Kafkaベンダーによってサポートされているproperty=valueのペアを入力します。
例えば、Confluent Kafkaを使用する場合は、[プロデューサ設定プロパティ]フィールドまたは[追加接続プロパティ]フィールドで次のエントリを使用してスキーマレジストリのURLを指定し、基本認証を有効にすることができます。
schema.registry.url=http://schema-registry:8081,
key.serializer=org.apache.kafka.common.serialization.StringSerializer,
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer,
basic.auth.credentials.source=USER_INFO,
basic.auth.user.info=myname:mypassword
Amazon MSKを使用する場合は、次の[追加接続プロパティ]エントリを使用して、Amazon MSKターゲットにアクセスするためのIAMロール認証を有効にすることができます。
security.protocol=SASL_SSL,sasl.mechanism=AWS_MSK_IAM,sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;,sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
必ずSecure AgentがインストールされているAmazon EC2インスタンスでIAMロール認証を有効にしてください。
Kafkaプロパティの詳細については、Kafkaベンダーのドキュメントを参照してください。
- •データベース取り込みとレプリケーションの増分ロードジョブで、Confluent Kafka、Amazon MSK、およびAzure Event Hubsターゲットを含め、SASL_SSLで保護されたアクセスをサポートするKafkaターゲットに変更データをレプリケートできます。Administratorで、[追加接続プロパティ]フィールドの適切なプロパティをはじめとするKafka接続を設定する必要があります。例えば、Azure Event Hubsの場合、次の[追加接続プロパティ]エントリを使用して、SASL_SSLを有効にすることができます。
bootstrap.servers=NAMESPACENAME.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
- •2025年7月リリース以降、新しくデプロイされたジョブは、各メッセージのKafkaヘッダーにチェックポイント情報を送信します。ジョブが再開されると、チェックポイント情報はKafkaヘッダーから取得されます。この動作により、Kafkaターゲットを持つ新しくデプロイされたジョブをSecure Agentグループ内の別のSecure Agentで実行できるようになるため、高可用性が実現されます。2025年7月リリースより前は、チェックポイント情報はSecure Agentのチェックポイントファイルにのみ保持されていました。
Kafkaアクセス制御リスト(ACL)またはその他の制御を使用して、グループ名でKafkaターゲットへのアクセスを制限する場合、データベース取り込みおよびレプリケーションはデフォルトのコンシューマグループ名infaGroupを使用して、Kafkaターゲットの高可用性をサポートすることに注意してください。別の既存のコンシューマグループ名を使用する場合は、データベース取り込みおよびレプリケーションジョブがトピックを読み取れるようにするために、Kafka group.id設定プロパティで指定する必要があります。デフォルトのinfaGroup名を使用しない場合、または別の既存のグループ名を指定しない場合、データベース取り込みおよびレプリケーションタスクは次のエラーで失敗します。
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: <group_name>
Kafkaターゲットのカスタムメッセージキーの生成
Avro形式を使用するすべてのKafkaターゲットタイプに対し、ソーステーブルごとに1つ以上のカラムで構成されるカスタムメッセージキーを生成するルールを設定できます。ルールの設定後、Kafkaターゲットを持つデータベース取り込みとレプリケーションの増分ロードジョブは、ターゲットメッセージングシステムに送信するメッセージのヘッダーに、生成されたメッセージキーをを含めることができます。ターゲットメッセージングシステムはメッセージキーを使用して、特定のキー値を持つメッセージをマルチパーティショントピック内の同じパーティションに書き込むことができます。
この機能を実装するには、各ソーステーブルのキーカラムを識別するルールを含んだ構成ファイルを手動で作成する必要があります。次に、タスクウィザードのカスタム設定プロパティにファイルを指定します。
構成ファイルの作成
テキストエディタでルール構成ファイルを作成し、Secure Agentシステム上の場所に保存します。このファイルには、各ソーステーブルのルールが含まれています。各ルールは、トピックパーティションへのデータの書き込みに使用するカスタムキーカラムを定義します。
注: データベース取り込みとレプリケーションタスクがデプロイされた後にルールを変更または追加するか、他のパラメータのいずれかを変更した場合、ルールの変更を有効にするには、タスクを再デプロイする必要があります。
ルールの構文:
次の構文を使用して構成ファイルにルールを定義します。
rule=(schema.tablename,column1,column2,column3,… )
additional rules...
[tableNotFound=ABORT]
[trace={true|false}]
[delimiter=character]
ファイルにコメントを含めるには、各コメント行を番号(#)記号で始めます。例:
#This text is for informational purposes only.
パラメータ:
- •rule。ソーステーブルの複合メッセージキーを生成するためのルールを定義します。各ルールでは、最初にソーステーブルのスキーマとテーブル名を特定します。スキーマを変更するか、ターゲットのテーブルの名前変更ルールを定義する場合は、ターゲットのスキーマまたは名前が変更されたテーブルの名前を使用します。次に、メッセージキーを構成する1つ以上のテーブルカラムの名前を指定します。テーブルにカラムが定義されていることを確認します。カラムが定義されていないと、データベース取り込みジョブが失敗します。SQL Serverソースの場合は、データベースの名前もdatabaseの形式で含めます。schema。tablename。
同じルール構成ファイルに複数のルールを定義できます。
メッセージキーの生成時、データ取り込みおよびレプリケーションは、各カラム値の文字表現とそれに続く区切り文字を使用します。各カラムの値と区切り文字は、ルール定義にカラムが表示される順序で複合キー値に追加されます。その後、複合キーはレコードのKafkaメッセージキーとして使用されます。メッセージキーに空の値またはnull値があるカラムの位置は、区切り文字のみで表されます。
- •delimiter。オプション。生成されたメッセージキーの各キーカラムの値の後に区切り文字として使用される単一の文字を指定します。このパラメータは、ルール構成ファイルに1回だけ指定できます。
デフォルトはセミコロン(;)です。
- •tableNotFound。オプション。このパラメータをABORTに設定すると、データベース取り込みとレプリケーションジョブはソーステーブルのデータの処理を停止し、ルール構成ファイルにテーブルのルール定義がないと失敗します。各ソーステーブルには、複合メッセージキーを適切に生成させるためのルール定義が必要です。このパラメータは構成ファイルに1回だけ指定できます。
このパラメータを指定せず、ルール構成ファイルにテーブルが見つからない場合は、ターゲットメッセージングシステムパラメータのデフォルトのルールによって、レコードに使用するキーが決定されます。
- •trace。オプション。ルール定義に基づくメッセージキー生成のトレースを有効または無効にします。有効な値は以下のとおりです。
- - true。ルール定義に基づくメッセージキー生成のトレースを有効にします。
- - false。ルール定義に基づくメッセージキー生成のトレースを無効にします。
このパラメータは、ルール構成ファイルに1回だけ指定できます。
デフォルトはfalseです。
サンプルルール:
rule=(testdb.ABC.DEPT,DEPTNO,DNAME)
tableNotFound=ABORT
trace=true
delimiter=;
このルールに基づいて生成されたキー出力の例:
1234;HR;
データベース取り込みおよびレプリケーションタスク設定
Kafkaターゲットを持つデータベース取り込みとレプリケーションの増分ロードタスクを作成する場合、次のオプションを設定して、カスタムメッセージキーの生成を有効にする必要があります。
- •タスクウィザードの[ターゲット]ページで、[テーブル名をトピック名として使用]チェックボックスがオフになっていることを確認します。次に、トピック名を[トピック名]フィールドに入力します。
- •[出力形式]フィールドで、[Avro]を選択します。[Avro形式]フィールドで任意のAvro形式を選択できます。
- •[カスタムプロパティ]でcaptureColumnValuesFileプロパティに、Secure Agentシステム上で作成したルール構成ファイルを指すパス値を指定します。