You need to retrieve the latest price for every stock in your change data capture (CDC) system.
To retrieve the most recent price for each stock, create a mapping and add an Aggregator transformation and Joiner transformation.
•Configure the Aggregator transformation to group all the stocks by stock ID and calculate the latest trading time for each stock ID.
•Configure the Joiner transformation to perform a self join with the stock ID and trading time, using the source data and Aggregator transformation output.
The Target transformation writes the stock ID and price to the target file.
Source data
The following table shows the source data:
STOCK_ID
PRICE
VOLUME
TRADETIME
100
1.12
125
2023.01.20.1245
105
3.45
56
2023.01.18.0821
102
2.29
25
2023.01.19.1118
101
4.56
87
2023.01.17.0901
102
2.35
38
2023.01.20.1649
100
0.99
94
2023.01.20.1355
104
1.88
67
2023.01.20.1730
103
2.11
41
2023.01.17.1211
105
3.40
90
2023.01.18.1525
104
3.40
90
2023.01.20.1634
103
3.40
90
2023.01.20.1526
105
3.40
90
2023.01.19.1745
Mapping configuration
The following image shows the mapping that you configure:
Configure the transformations in the following ways:
Aggregator transformation
The following table describes the properties to configure in the Aggregator transformation:
Properties
Configuration
Incoming Fields
Configure a field rule to prefix all incoming fields with "agg_" so that the aggregated fields have unique names.
Group By
Group by the field agg_stock_id.
Aggregate
Create one aggregate field with the following properties to return the latest trading time for each stock ID:
- Field Type: Output Field
- Name: agg_maxtime
- Type: string
- Precision: 10
Configure the following expression: max(agg_tradetime)
Joiner transformation
In the Joiner transformation, configure a normal join with the following join conditions:
- agg_stock_id = STOCK_ID
- agg_maxtime = TRADETIME
Target transformation
The following table describes the properties to configure in the Target transformation:
Properties
Configuration
Incoming Fields
Configure the following field rules:
- Include all fields.
- Exclude Fields by Text or Pattern with the prefix "agg_".
The list of included fields shows all the source fields: PRICE, STOCK_ID, TRADETIME, VOLUME
Target Fields
Add the following target fields: STOCK_ID and PRICE
Field Mapping
Perform an automap with Exact Field Name. This maps the incoming fields STOCK_ID and PRICE to the target fields STOCK_ID and PRICE.
Target Data
The following table shows the data that the mapping writes to the target file:
STOCK_ID
PRICE
100
0.99
101
4.56
102
2.35
103
3.40
104
3.40
105
3.40
Since the fields are grouped by STOCK_ID in the Aggregator transformation, each stock ID is listed in one row with its most recent price. For example, the most recently traded price for stock 105 is $3.40.