Kafka targets and Kafka-enabled Azure Event Hubs targets
The following list identifies considerations for using Kafka targets:
•Database Ingestion and Replication supports Apache Kafka, Confluent Kafka, Amazon Managed Streaming for Apache Kafka (MSK), and Kafka-enabled Azure Event Hubs as targets for incremental load jobs. All of these Kafka target types use the Kafka connection type.
To indicate the Kafka target type, you must specify Kafka producer properties in the task definition or Kafka connection properties. To specify these properties for a task, enter a comma-separated list of key=value pairs in the Producer Configuration Properties field on the Target page of the task wizard. To specify the producer properties for all tasks that use a Kafka connection, enter the list of properties in the Additional Connection Properties field in the connection properties. You can override the connection-level properties for specific tasks by also defining producer properties at the task level. For more information about producer properties, see the Apache Kafka, Confluent Kafka, Amazon MSK, or Azure Event Hubs for Kafka documentation.
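For example, the following comma-separated entry uses standard Apache Kafka producer property names; the values are illustrative tuning choices, not recommendations from this guide:

```properties
linger.ms=100,batch.size=65536,compression.type=gzip
```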
• If you select AVRO as the output format for a Kafka target, Database Ingestion and Replication generates a schema definition file for each table with a name in the following format:
schemaname_tablename.txt
If a source schema change is expected to alter the target in an incremental load job, Database Ingestion and Replication regenerates the Avro schema definition file with a unique name that includes a timestamp:
schemaname_tablename_YYYYMMDDhhmmss.txt
This unique naming pattern preserves older schema definition files for audit purposes.
•If you have a Confluent Kafka target that uses Confluent Schema Registry to store schemas, you must configure the following settings on the Target page of the task wizard:
- In the Output Format field, select AVRO.
- In the Avro Serialization Format field, select None.
•You can specify Kafka producer properties in either the Producer Configuration Properties field on the Target page of the task wizard or in the Additional Connection Properties field in the Kafka connection properties. Enter property=value pairs that meet your business needs and that are supported by your Kafka vendor.
For example, if you use Confluent Kafka, you can use the following entry in either the Producer Configuration Properties field or Additional Connection Properties field to specify the Schema Registry URL and enable basic authentication:
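A typical entry uses the standard Confluent Schema Registry client properties. The URL and credentials shown here are placeholders:

```properties
schema.registry.url=https://schemaregistry.example.com:8081,basic.auth.credentials.source=USER_INFO,basic.auth.user.info=myUser:myPassword
```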
If you use Amazon MSK, you can use the following Additional Connection Properties entry to enable IAM role authentication for access to Amazon MSK targets:
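An entry based on the standard aws-msk-iam-auth client settings, shown here as an illustration, might look like this:

```properties
security.protocol=SASL_SSL,sasl.mechanism=AWS_MSK_IAM,sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;,sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
```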
Ensure that you enable IAM role authentication on the Amazon EC2 instance where the Secure Agent is installed.
For more information about Kafka properties, see the documentation of your Kafka vendor.
•Database ingestion and replication incremental load jobs can replicate change data to Kafka targets that support SASL_SSL secured access, including Confluent Kafka, Amazon MSK, and Azure Event Hubs targets. In Administrator, you must configure a Kafka connection that includes the appropriate properties in the Additional Connection Properties field. For example, for Azure Event Hubs, you could use the following Additional Connection Properties entry to enable SASL_SSL:
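An entry based on the standard Event Hubs Kafka endpoint settings might look like the following. The connection string is a placeholder that you replace with the value from your Event Hubs namespace:

```properties
security.protocol=SASL_SSL,sasl.mechanism=PLAIN,sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<event_hubs_connection_string>";
```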
•Beginning with the July 2025 release, newly deployed jobs send checkpoint information in the Kafka header of each message. When a job is restarted, checkpoint information is retrieved from the Kafka header. This behavior allows newly deployed jobs that have a Kafka target to run on another Secure Agent in the Secure Agent group, which provides high availability. Prior to the July 2025 release, checkpoint information was persisted only in the checkpoint file on the Secure Agent.
If you use Kafka access control lists (ACLs) or other controls to restrict access to the Kafka target by group name, be aware that Database Ingestion and Replication uses the default consumer group name infaGroup to support high availability for the Kafka target. If you use another existing consumer group name, you must specify it in the Kafka group.id configuration property so that database ingestion and replication jobs can read topics. If you neither authorize the default infaGroup name nor specify another existing, authorized group name, database ingestion and replication tasks fail with the following error:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: <group_name>
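For example, to direct the jobs to an existing consumer group, you could add the following entry to the Producer Configuration Properties or Additional Connection Properties field. The group name is a placeholder:

```properties
group.id=myExistingGroup
```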
Generating custom message keys for Kafka targets
For all Kafka target types that use the Avro format, you can configure rules to generate a custom message key that consists of one or more columns for each source table. Database ingestion and replication incremental load jobs that have a Kafka target can then include the generated message key in the headers of the messages that they send to the target messaging system. The target messaging system can use the message key to write messages with a specific key value to the same partition in a multi-partitioned topic.
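The key-based routing can be sketched as follows. This is an illustration, not the product's implementation: it assumes the composite key is the concatenation of the configured column values, each followed by a delimiter (as described in the configuration-file section below), and it uses CRC32 as a stand-in hash so that the example is self-contained, whereas Kafka's default partitioner actually uses murmur2:

```python
import zlib

def build_message_key(row, key_columns, delimiter=";"):
    # Append each key column's character representation followed by the
    # delimiter; a null value contributes the delimiter only.
    key = ""
    for col in key_columns:
        value = row.get(col)
        key += ("" if value is None else str(value)) + delimiter
    return key

def partition_for_key(key, num_partitions):
    # Records with the same key always map to the same partition.
    # (Kafka's default partitioner hashes the key bytes with murmur2;
    # CRC32 stands in here only to keep the sketch deterministic.)
    return zlib.crc32(key.encode("utf-8")) % num_partitions

row = {"order_id": 1001, "region": "EMEA", "note": None}
key = build_message_key(row, ["order_id", "region", "note"])
print(key)  # 1001;EMEA;;  (null column appears as the delimiter only)
print(partition_for_key(key, 6))  # same key -> same partition on every run
```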
To implement this feature, you must manually create a configuration file that contains rules that identify the key columns for each source table. Then specify the file in a custom configuration property in the task wizard.
Configuration file creation
Create the rule-configuration file in a text editor and save it to a location on the Secure Agent system. The file contains a rule for each source table. Each rule defines the custom key column or columns to use for writing data to topic partitions.
Note: If you change or add a rule or change one of the other parameters after the database ingestion and replication task has been deployed, you must redeploy the task for the rule change to take effect.
Rule syntax:
Use the following syntax to define rules in the configuration file:
To include comments in the file, begin each comment line with the number sign (#). For example:
#This text is for informational purposes only.
Parameters:
•rule. Defines a rule for generating a composite message key for a source table. In each rule, first identify the schema and table name for a source table. If you change the schema or define table renaming rules for the target, use the name of the schema or renamed table on the target. Then specify the names of one or more table columns that comprise the message key. Ensure that the columns are defined in the table. Otherwise, the database ingestion job fails. For SQL Server sources, also include the name of the database in the format: database.schema.tablename.
You can define multiple rules in the same rule-configuration file.
When generating the message key, Database Ingestion and Replication uses the character representation of each column value followed by the delimiter character. Each column value and delimiter are appended to the composite key value in the order in which the columns appear in the rule definition. The composite key is then used as the Kafka message key for a record. The position of any column that has an empty or null value in the message key is represented by the delimiter character only.
•delimiter. Optional. Specifies a single character that will be used as the delimiter after each key column value in generated message keys. You can specify this parameter only once in the rule-configuration file.
Default is the semicolon (;).
•tableNotFound. Optional. Set this parameter to ABORT to cause database ingestion and replication jobs to stop processing any data for a source table and then fail when the table does not have a rule definition in the rule-configuration file. Each source table must have a rule definition for the generation of the composite message key to succeed. You can specify this parameter only once in the configuration file.
If you do not specify this parameter and the table is not found in the rule-configuration file, the default rule in the target messaging system parameters determines the key to use for a record.
•trace. Optional. Enables or disables tracing for the generation of message keys based on rule definitions. Valid values are:
- true. Enables tracing for message key generation based on rule definitions.
- false. Disables tracing for message key generation based on rule definitions.
You can specify this parameter only once in the rule-configuration file.
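Putting the parameters together, a rule-configuration file might look like the following sketch. The exact rule syntax is product-specific, so treat the rule lines as hypothetical, and note that the schema, table, and column names are placeholders:

```properties
#Key columns for each source table (illustrative)
rule=sales.orders:order_id,region
rule=sales.customers:customer_id
delimiter=;
tableNotFound=ABORT
trace=false
```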
Database ingestion and replication task configuration
When you create a database ingestion and replication incremental load task that has a Kafka target, you must set the following options to enable the generation of custom message keys:
•On the Target page of the task wizard, make sure that the Use Table Name as Topic Name check box is cleared. Then enter the topic name in the Topic Name field.
•In the Output Format field, select Avro. You can select any Avro format in the Avro Format field.
•Under Custom Properties, specify the captureColumnValuesFile property with a path value that points to the rule-configuration file you created on the Secure Agent system.
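For example, the property could be set as follows; the path is a placeholder for the actual location of the file on your Secure Agent system:

```properties
captureColumnValuesFile=/opt/infaagent/config/message_key_rules.txt
```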