Maximize Parallelism for Mappings and Profiles
If you have the partitioning option, you can enable the Data Integration Service to maximize parallelism when it runs mappings, runs column profiles, or performs data domain discovery. When you maximize parallelism, the Data Integration Service dynamically divides the underlying data into partitions and processes all of the partitions concurrently.
Note: When you run a profile job, the Data Integration Service converts the profile job into one or more mappings, and then can run those mappings in multiple partitions.
If mappings process large data sets or contain transformations that perform complicated calculations, the mappings can take a long time to process and can cause low data throughput. When you enable partitioning for these mappings, the Data Integration Service uses additional threads to process the mapping. Increasing the number of processing threads increases the load on the node where the mapping runs. If the node contains sufficient CPU bandwidth, concurrently processing rows of data in a mapping can optimize mapping performance.
By default, the Maximum Parallelism property is set to 1 for the Data Integration Service. When the Data Integration Service runs a mapping, it separates the mapping into pipeline stages and uses one thread to process each stage. These threads are allocated to reading, transforming, and writing tasks, and they run in parallel.
When you increase the maximum parallelism value, you enable partitioning. The Data Integration Service uses multiple threads to process each pipeline stage.
The Data Integration Service can create partitions for mappings that have physical data as input and output. The Data Integration Service can use multiple partitions to complete the following actions during a mapping run:
- Read from flat file, IBM DB2 for LUW, or Oracle sources.
- Run transformations.
- Write to flat file, IBM DB2 for LUW, or Oracle targets.
One Thread for Each Pipeline Stage
When maximum parallelism is set to 1, partitioning is disabled. The Data Integration Service separates a mapping into pipeline stages and uses one thread to process each stage: a reader thread for the reader stage, a transformation thread for each transformation stage, and a writer thread for the writer stage.
Each mapping contains one or more pipelines. A pipeline consists of a Read transformation and all the transformations that receive data from that Read transformation. The Data Integration Service separates a mapping pipeline into pipeline stages and then performs the extract, transformation, and load for each pipeline stage in parallel.
Partition points mark the boundaries in a pipeline and divide the pipeline into stages. For every mapping pipeline, the Data Integration Service adds a partition point after the Read transformation and before the Write transformation to create multiple pipeline stages.
Each pipeline stage runs in one of the following threads:
- Reader thread that controls how the Data Integration Service extracts data from the source.
- Transformation thread that controls how the Data Integration Service processes data in the pipeline.
- Writer thread that controls how the Data Integration Service loads data to the target.
The following figure shows a mapping separated into a reader pipeline stage, a transformation pipeline stage, and a writer pipeline stage:
Because the pipeline contains three stages, the Data Integration Service can process three sets of rows concurrently and optimize mapping performance. For example, while the reader thread processes the third row set, the transformation thread processes the second row set, and the writer thread processes the first row set.
The following table shows how multiple threads can concurrently process three sets of rows:
| Reader Thread | Transformation Thread | Writer Thread |
|---|---|---|
| Row Set 1 | - | - |
| Row Set 2 | Row Set 1 | - |
| Row Set 3 | Row Set 2 | Row Set 1 |
| Row Set 4 | Row Set 3 | Row Set 2 |
| Row Set n | Row Set (n-1) | Row Set (n-2) |
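The staggered schedule in the table is a classic producer-consumer pipeline. The following Python sketch is an analogy, not the service's implementation: it connects one reader thread, one transformation thread, and one writer thread with bounded queues. The function names, the sentinel, and the multiply-by-10 "transformation" are hypothetical.

```python
import queue
import threading

# Sentinel that tells the next stage no more row sets are coming.
DONE = object()

def reader(row_sets, out_q):
    # Reader stage: extract row sets from the "source" one at a time.
    for rs in row_sets:
        out_q.put(rs)
    out_q.put(DONE)

def transformer(in_q, out_q):
    # Transformation stage: process each row set as soon as the reader hands it over.
    while (rs := in_q.get()) is not DONE:
        out_q.put([row * 10 for row in rs])
    out_q.put(DONE)

def writer(in_q, target):
    # Writer stage: load transformed row sets into the "target" list.
    while (rs := in_q.get()) is not DONE:
        target.append(rs)

def run_pipeline(row_sets):
    # Bounded queues act as the buffers between pipeline stages.
    q1, q2 = queue.Queue(maxsize=1), queue.Queue(maxsize=1)
    target = []
    threads = [
        threading.Thread(target=reader, args=(row_sets, q1)),
        threading.Thread(target=transformer, args=(q1, q2)),
        threading.Thread(target=writer, args=(q2, target)),
    ]
    for t in threads:
        t.start()  # all three stages run concurrently
    for t in threads:
        t.join()
    return target

if __name__ == "__main__":
    print(run_pipeline([[1, 2], [3, 4], [5, 6], [7, 8]]))
```

Because each queue holds at most one row set, the reader can run only a little ahead of the transformation thread, and the transformation thread only a little ahead of the writer, which roughly mirrors the one-row-set offset per stage shown in the table.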
If the mapping pipeline contains transformations that perform complicated calculations, processing the transformation pipeline stage can take a long time. To optimize performance, the Data Integration Service adds partition points before some transformations to create an additional transformation pipeline stage.
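The stage boundaries described above can be sketched as a function that cuts an ordered list of transformation names at partition points. This is a minimal illustration under simplifying assumptions: the pipeline is a straight line, the first name is the Read transformation and the last is the Write transformation, and `extra_points` (a hypothetical parameter) names the transformations that get an additional partition point in front of them. In practice the Data Integration Service decides where to add those points.

```python
def split_into_stages(transformations, extra_points=()):
    """Split an ordered, linear pipeline into stages at partition points.

    transformations: ordered names; the first is the Read transformation,
    the last is the Write transformation (assumed layout).
    extra_points: hypothetical set of transformation names that get an
    additional partition point in front of them.
    """
    read, *middle, write = transformations
    stages = [[read]]  # partition point after the Read transformation
    current = []
    for name in middle:
        if name in extra_points and current:
            stages.append(current)  # additional partition point before this transformation
            current = []
        current.append(name)
    if current:
        stages.append(current)
    stages.append([write])  # partition point before the Write transformation
    return stages

if __name__ == "__main__":
    pipeline = ["Read", "Filter", "Expression", "Aggregator", "Write"]
    print(split_into_stages(pipeline))
    print(split_into_stages(pipeline, extra_points={"Aggregator"}))
```

With no extra points, the sketch yields the three default stages (reader, transformation, writer); naming the Aggregator as an extra point splits the transformation stage in two, giving four stages.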
Multiple Threads for Each Pipeline Stage
When maximum parallelism is set to a value greater than 1, partitioning is enabled. The Data Integration Service separates a mapping into pipeline stages and uses multiple threads to process each stage.
When you maximize parallelism, the Data Integration Service dynamically performs the following tasks at run time:
- Divides the data into partitions.
The Data Integration Service dynamically divides the underlying data into partitions and runs the partitions concurrently. The Data Integration Service determines the optimal number of threads for each pipeline stage. The number of threads used for a single pipeline stage cannot exceed the maximum parallelism value. The Data Integration Service can use a different number of threads for each pipeline stage.
- Redistributes data across partition points.
The Data Integration Service dynamically determines the best way to redistribute data across a partition point based on the transformation requirements.
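This section does not say which redistribution method the service chooses. One common technique for transformations that group rows, such as a join or an aggregate, is hash partitioning on the grouping key: every row with the same key value reaches the same thread, so each thread can join or aggregate its rows independently. The following is a minimal sketch, assuming hash partitioning; the function name and the dictionary row layout are hypothetical.

```python
import zlib
from collections import defaultdict

def redistribute_by_key(rows, key, num_threads):
    """Hash-partition rows so that all rows with the same key value
    land in the same downstream partition (thread)."""
    partitions = defaultdict(list)
    for row in rows:
        # crc32 is stable across runs, unlike Python's salted str hash.
        h = zlib.crc32(str(row[key]).encode("utf-8"))
        partitions[h % num_threads].append(row)
    # Return one list per thread, including empty ones.
    return [partitions[i] for i in range(num_threads)]

if __name__ == "__main__":
    rows = [{"cust": c, "amt": i} for i, c in enumerate("ABCABCAAB")]
    for i, part in enumerate(redistribute_by_key(rows, "cust", 3)):
        print(i, [r["cust"] for r in part])
```

The trade-off of hashing is that partitions can be uneven when a few key values dominate, but it guarantees that no group is split across threads.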
The following image shows an example mapping that distributes data across multiple partitions for each pipeline stage:
In the preceding image, maximum parallelism for the Data Integration Service is three. Maximum parallelism for the mapping is Auto. The Data Integration Service separates the mapping into four pipeline stages and uses a total of 12 threads to run the mapping. The Data Integration Service performs the following tasks at each of the pipeline stages:
- At the reader pipeline stage, the Data Integration Service queries the Oracle database system and discovers that source A and source B each have two database partitions. The Data Integration Service uses one reader thread for each database partition, for a total of four reader threads.
- At the first transformation pipeline stage, the Data Integration Service redistributes the data across two threads to group rows for the join condition.
- At the second transformation pipeline stage, the Data Integration Service determines that three threads are optimal for the Aggregator transformation. The service redistributes the data across three threads to group rows for the aggregate expression.
- At the writer pipeline stage, the Data Integration Service does not need to redistribute the rows across the target partition point. All rows in a single partition stay in that partition after crossing the target partition point, so the writer stage uses three threads.
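The 12-thread total in this example can be checked with simple arithmetic. In the sketch below, the reader threads are listed separately for each source because each source table is read by its own Read transformation; that per-source breakdown, and treating it as the unit that the maximum parallelism value caps, are assumptions made for illustration. The writer count follows from rows keeping the three partitions created at the Aggregator stage.

```python
MAX_PARALLELISM = 3  # service value in the example; the mapping value is Auto

# Threads per pipeline stage in the example mapping.
stages = {
    "reader (source A)": 2,            # one thread per Oracle database partition
    "reader (source B)": 2,            # one thread per Oracle database partition
    "transformation 1 (join)": 2,      # rows regrouped for the join condition
    "transformation 2 (aggregate)": 3, # rows regrouped for the aggregate expression
    "writer": 3,                       # no redistribution: keeps the Aggregator's 3 partitions
}

# No single entry may use more threads than the maximum parallelism value.
assert all(n <= MAX_PARALLELISM for n in stages.values())

total = sum(stages.values())
print(total)  # 12
```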
Maximum Parallelism Guidelines
Maximum parallelism determines the maximum number of parallel threads that can process a single pipeline stage. Configure the Maximum Parallelism property for the Data Integration Service based on the available hardware resources. When you increase the maximum parallelism value, you might decrease the amount of processing time.
Consider the following guidelines when you configure maximum parallelism:
- Increase the value based on the number of available CPUs.
Increase the maximum parallelism value based on the number of CPUs available on the nodes where mappings run. When you increase the maximum parallelism value, the Data Integration Service uses more threads to run the mapping and uses more CPUs. A simple mapping runs faster in two partitions, but typically requires about twice as much CPU as it does in a single partition.
- Consider the total number of processing threads.
Consider the total number of processing threads when setting the maximum parallelism value. If a complex mapping results in multiple additional partition points, the Data Integration Service might use more processing threads than the CPU can handle.
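Maximum parallelism limits the number of threads for each pipeline stage, not the total for the mapping. Because every additional partition point adds a pipeline stage that can use up to the maximum parallelism value in threads, the total number of processing threads can be several times the maximum parallelism value.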
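A rough way to apply this guideline is to compare a worst-case thread count with the CPUs on the node. The sketch below assumes, as a rule of thumb rather than a documented formula, that every pipeline stage could use the full maximum parallelism value and that each thread needs about one CPU; the function names are hypothetical. The service chooses the actual thread counts at run time and can use fewer.

```python
import os

def thread_upper_bound(max_parallelism, num_stages):
    """Worst case: every pipeline stage uses the full maximum parallelism.

    num_stages counts a stage once for each pipeline that runs its own
    threads in it (for example, one reader stage per source). Assumption.
    """
    return max_parallelism * num_stages

def fits_on_node(max_parallelism, num_stages, cpus=None):
    """Rule-of-thumb check: is the worst-case thread count within the node's CPUs?"""
    if cpus is None:
        cpus = os.cpu_count() or 1  # os.cpu_count() can return None
    return thread_upper_bound(max_parallelism, num_stages) <= cpus

if __name__ == "__main__":
    print(thread_upper_bound(3, 4), fits_on_node(3, 4))
```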
- Consider the other jobs that the Data Integration Service must run.
If you configure maximum parallelism such that each mapping uses a large number of threads, fewer threads are available for the Data Integration Service to run additional jobs.
- Optionally change the value for a mapping.
By default, the maximum parallelism for each mapping is set to Auto. Each mapping uses the maximum parallelism value defined for the Data Integration Service.
In the Developer tool, developers can change the maximum parallelism value in the mapping run-time properties to define a maximum value for a particular mapping. When maximum parallelism is set to different integer values for the Data Integration Service and the mapping, the Data Integration Service uses the minimum value of the two.
Note: You cannot use the Developer tool to change the maximum parallelism value for profiles. When the Data Integration Service converts a profile job into one or more mappings, the mappings always use Auto for the mapping maximum parallelism value.
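The rule for combining the service and mapping values can be expressed as a small function. This is a sketch of the documented rule only; the function name and the use of the string "Auto" to represent the default are illustrative assumptions.

```python
from typing import Union

AUTO = "Auto"

def effective_max_parallelism(service_value: int,
                              mapping_value: Union[int, str] = AUTO) -> int:
    """Resolve the maximum parallelism that a mapping runs with.

    Auto: the mapping inherits the Data Integration Service value.
    Integer: the smaller of the service and mapping values applies.
    """
    if mapping_value == AUTO:
        return service_value
    return min(service_value, int(mapping_value))

if __name__ == "__main__":
    print(effective_max_parallelism(4))      # mapping left at Auto
    print(effective_max_parallelism(4, 2))   # mapping lowers the limit
    print(effective_max_parallelism(2, 8))   # mapping cannot raise the limit
```

Because mappings generated from profile jobs always use Auto, they always take the service value. A mapping can lower the limit but never raise it above the service value.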
Enabling Partitioning for Mappings and Profiles
To enable partitioning for mappings, column profiles, and data domain discovery, set maximum parallelism for the Data Integration Service to a value greater than 1.
1. In the Administrator tool, click the Manage tab > Services and Nodes view.
2. In the Domain Navigator, select the Data Integration Service.
3. In the contents panel, click the Properties view.
4. In the Execution Options section, click Edit.
5. Enter a value greater than 1 for the Maximum Parallelism property.
6. Click OK.
7. Recycle the Data Integration Service to apply the changes.
Optimize Cache and Target Directories for Partitioning
For optimal performance during cache partitioning for Aggregator, Joiner, Rank, and Sorter transformations, configure multiple cache directories for the Data Integration Service. For optimal performance when multiple threads write to a file target, configure multiple target directories for the Data Integration Service.
When multiple threads write to a single directory, the mapping might encounter a bottleneck due to input/output (I/O) contention. I/O contention can occur when threads write data to the file system at the same time.
When you configure multiple directories, the Data Integration Service determines the output directory for each thread in a round-robin fashion. For example, you configure a flat file data object to use directoryA and directoryB as target directories. If the Data Integration Service uses four threads to write to the file target, the first and third writer threads write target files to directoryA. The second and fourth writer threads write target files to directoryB.
If the Data Integration Service does not use cache partitioning for transformations or does not use multiple threads to write to the target, the service writes the files to the first listed directory.
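The round-robin assignment can be sketched as follows. The sketch assumes the directories arrive in the semicolon-separated form used for the execution properties, and the function name is hypothetical; it models only which directory each writer thread uses, not how the service writes the files.

```python
from typing import List

def assign_directories(directories: str, num_threads: int) -> List[str]:
    """Assign an output directory to each writer thread, round-robin.

    directories: semicolon-separated list, for example "directoryA;directoryB".
    Returns the directory for thread 1, thread 2, ... in order.
    """
    dirs = [d.strip() for d in directories.split(";") if d.strip()]
    return [dirs[i % len(dirs)] for i in range(num_threads)]

if __name__ == "__main__":
    print(assign_directories("directoryA;directoryB", 4))
    print(assign_directories("directoryA;directoryB", 1))
```

With four threads, the first and third threads get directoryA and the second and fourth get directoryB; with a single thread, only the first listed directory is used.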
In the Administrator tool, you configure multiple cache and target directories by entering multiple directories separated by semicolons for the Data Integration Service execution properties. Configure the directories in the following execution properties:
- Cache Directory
Defines the cache directories for Aggregator, Joiner, and Rank transformations. By default, the transformations use the CacheDir system parameter to access the cache directory value defined for the Data Integration Service.
- Temporary Directories
Defines the cache directories for Sorter transformations. By default, the Sorter transformation uses the TempDir system parameter to access the temporary directory value defined for the Data Integration Service.
- Target Directory
Defines the target directories for flat file targets. By default, flat file targets use the TargetDir system parameter to access the target directory value defined for the Data Integration Service.
Instead of using the default system parameters, developers can configure multiple directories specific to the transformation or flat file data object in the Developer tool.
Note: A Lookup transformation can only use a single cache directory.