Kafka sources in mappings

To read messages from a Kafka topic, configure a Kafka object as the Source transformation in a mapping.
Specify the name and description of the Kafka source. Configure the source and advanced properties for the source object.
The following table describes the source properties that you can configure for a Kafka source:
Connection
Name of the active Kafka source connection.
Source Type
Type of the Kafka source object.
Select Single Object. You cannot read data from multiple objects.
To define a parameter, select Parameter as the source type, and then specify the parameter in the Parameter property.
Note: When you define a parameter, you must import the Kafka source in Binary or JSON format.
Object
Name of the Kafka source object based on the source type selected.
Parameter
Select a parameter for the source object, or click New Parameter to define a new parameter for the source object.
The Parameter property appears only if you select Parameter as the source type.
Format
The file format to read data from a Kafka topic.
You can select from the following file format types:
  • None. Reads data from the Kafka topics in binary format.
  • Avro. Reads data from the Kafka topics in Avro format that use the binary encoding type.
  • JSON. Reads data from the Kafka topics in JSON format.
  • Discover Structure². Determines the underlying patterns in a sample file and auto-generates a model for files with the same data and structure.
To configure the formatting options for the file, click Formatting Options. For more information about format options, see Formatting options for Kafka topics.
Note: You can read Snappy-compressed records from the broker when you run the mapping in advanced mode.
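For example, if you select JSON, each message on the topic is expected to contain a JSON record. The following payload is purely illustrative:
  {"order_id": 1001, "item": "monitor", "quantity": 2}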
Preview Data
Preview the data of the Kafka source object.
When you preview data, Data Integration displays the first 10 rows.
By default, Data Integration displays the fields in native order. To display the fields in alphabetical order, enable the Display source fields in alphabetical order option.
Intelligent Structure Model²
Applicable to the Discover Structure format type. Select the intelligent structure model that determines the underlying patterns and structure of the input and uses them to transform and parse the data and generate output groups.
Select one of the following options to associate a model with the transformation:
  • Select. Select an existing model.
  • New. Create a new model. Select Design New to create the model, or select Auto-generate from sample file to have Intelligent Structure Discovery generate a model based on a sample input that you select.
For more information, see Components.
¹ Doesn't apply to mappings in advanced mode.
² Applies only to mappings in advanced mode.
The following table describes the advanced properties that you can configure for a Kafka source:
Start position offset
The offset position from which Kafka Connector starts reading messages from a Kafka topic.
You can select one of the following options:
  • Custom. Read messages from a specific time.
  • Earliest. Read all the messages available on the Kafka topic from the beginning.
  • Latest¹. Read messages received by the Kafka topic after the mapping has been deployed.
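For example, you might select Earliest to reprocess the full history of a topic, or select Custom with a timestamp to replay only the messages received after a known point in time.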
Connector Mode
Specifies the mode to read data from the Kafka source.
You can select one of the following modes:
  • Batch. The Secure Agent reads the messages available in a Kafka topic based on the position offset that you specify. After the Secure Agent reads the data, the mapping terminates. When you configure a mapping for recovery, the Secure Agent reads data from the last checkpoint and stops reading at the offset recorded when the mapping recovery started.
  • Realtime¹. The Secure Agent reads the messages available in a Kafka topic in real time. The Secure Agent continues to read messages from the Kafka topic and does not terminate the mapping. You must manually terminate the mapping task.
Custom Start Position Timestamp
The time in GMT from when the Secure Agent starts reading Kafka messages from a Kafka topic.
Specify the time in the following format:
yyyy-mm-dd hh:mm:ss[.fff]
The milliseconds are optional.
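For example, the value 2024-03-15 08:30:00 starts reading messages received on or after that time in GMT, and 2024-03-15 08:30:00.500 narrows the start position to the millisecond. The dates shown are illustrative.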
This property applies only when you set the Start position offset property to Custom.
You can parameterize this property using an in-out or input parameter in a mapping in advanced mode.
Specify the parameter in the following format:
  • In-out parameter: $$<parameter_name>
  • Input parameter: $<parameter_name>$
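For example, if you define an in-out parameter named StartTime (an illustrative name), specify $$StartTime in this property and assign the parameter a value such as 2024-03-15 08:30:00.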
Topic Pattern
A regular expression pattern for the topic name that you want to read from.
Specify the pattern using regular expression syntax.
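For example, the pattern orders\..* matches topic names such as orders.us and orders.eu, and orders[0-9]+ matches orders1 and orders42. The topic names are illustrative.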
Polling Interval
The time interval, in milliseconds, that the agent waits before retrieving records from the Kafka broker.
You can enter a positive integer value. The default is 1000 milliseconds.
The polling interval applies only to the mapping in which you set the value.
To set the polling interval for all mappings within the organization, you need to set the pollIntervalInMilliseconds property as a JVM option under the DTM type in the Secure Agent properties.
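For example, to apply a five-second polling interval across the organization, you might add a JVM option such as -DpollIntervalInMilliseconds=5000 under the DTM type in the Secure Agent properties. The -D prefix follows the usual JVM system property convention; confirm the exact syntax for your Secure Agent version.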
Consumer Configuration Properties
The configuration properties for the Kafka consumer.
These properties override the parameters you specified in the Additional Connection Properties or Additional Security Properties fields in the Kafka connection.
For example, you can configure the group.id consumer property at source to enable incremental load when you read from Kafka topics in a mapping in advanced mode.
For more information about incremental load and how to configure the group.id consumer property, see Configure incremental load to read from Kafka topics.
For more information about Kafka consumer configuration properties, see the Kafka documentation.
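For example, to override the consumer group for incremental load and cap the number of records returned per poll, you might enter standard Kafka consumer properties such as group.id=cdi_incremental_load and max.poll.records=500. The property names come from the standard Kafka consumer configuration; the values and the exact entry format for this field are illustrative.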
¹ Doesn't apply to mappings in advanced mode.
You can set the tracing level in the advanced properties to determine the amount of detail that logs contain for a mapping. The tracing level is not applicable to mappings in advanced mode.
The following table describes the tracing levels that you can configure:
Terse
The Secure Agent logs initialization information, error messages, and notification of rejected data.
Normal
The Secure Agent logs initialization and status information, errors encountered, and skipped rows due to transformation row errors. Summarizes mapping results, but not at the level of individual rows.
Verbose Initialization
In addition to normal tracing, the Secure Agent logs additional initialization details, names of index and data files used, and detailed transformation statistics.
Verbose Data
In addition to verbose initialization tracing, the Secure Agent logs each row that passes into the mapping. Also notes where the Secure Agent truncates string data to fit the precision of a column and provides detailed transformation statistics.
When you configure the tracing level to verbose data, the Secure Agent writes row data for all rows in a block when it processes a transformation.

Configure a mapping to read data from Kafka in real-time

You can configure a mapping to read data from a Kafka topic in real-time.
To read data from a Kafka topic in real-time, perform the following tasks:
  1. In the Kafka source advanced properties, configure the Connector Mode as Realtime.
  2. Enable message recovery in the Advanced Session Properties section on the Runtime Options tab of the mapping task. For more information about how to configure message recovery, see Configure message recovery strategy for a Kafka mapping task.
When you configure a mapping to read data from a Kafka topic in real-time, consider the following rule and guideline:
In the first mapping run, the mapping reads the messages in the Kafka topic based on the value that you specify for the Start Position Offset source advanced property. In subsequent mapping runs, the Recovery Strategy advanced session property overrides the Start Position Offset property, and the mapping reads messages from the last checkpoint.

Configure Informatica fixed partitions

When you read data from a Kafka source topic in real-time, you can configure Informatica fixed partitioning to optimize the mapping performance at run time.
When you configure Informatica fixed partitions on a Kafka source topic in a mapping, Kafka Connector treats each Informatica partition as an individual consumer within a topic and assigns a specific number of topic partitions to each Informatica partition.
Consider the number of messages to be processed in the mapping to determine an appropriate number of Informatica partitions for the mapping. You can specify up to 64 Informatica partitions.
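For example, if the source topic has 32 topic partitions, configuring 8 Informatica partitions assigns 4 topic partitions to each consumer. The numbers are illustrative.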
Consider the following rules and guidelines when you configure Informatica partitions on a Kafka source topic in a mapping: