
Transformations in Streaming Ingestion and Replication

Transformations are part of a streaming ingestion and replication task. Transformations represent the operations that you want to perform when ingesting streaming data.
Each transformation performs a specific function. For example, a Filter transformation filters data from the ingested data based on a specified condition.
When you create a streaming ingestion and replication task, adding a transformation is optional. Each transformation type has a unique set of options that you can configure.
You can use the following transformations in streaming ingestion and replication tasks: Combiner, Filter, Format Converter, Java, Jolt, Python, and Splitter.
You can add multiple transformations to a streaming ingestion and replication task. In such a case, the order of transformations is important because the source data undergoes each transformation in the given order. The output of one transformation becomes the input to the next one in the task flow.
In a streaming ingestion and replication task, you can add only one Combiner transformation and one Format Converter transformation. The Format Converter transformation must be the last transformation in the task flow. If the task also includes a Combiner transformation, the Combiner transformation must come directly before the Format Converter transformation.
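The ordering rules above can be sketched as a small check. This is an illustrative sketch only, not part of the product: the function names validate_flow and run_flow are hypothetical, and the sketch assumes that "preceded by" means the Combiner transformation sits directly before the Format Converter transformation.

```python
def validate_flow(transformations):
    """Return rule violations for an ordered list of transformation names."""
    errors = []
    # Only one Combiner and one Format Converter are allowed per task.
    for name in ("Combiner", "Format Converter"):
        if transformations.count(name) > 1:
            errors.append(f"only one {name} transformation is allowed")
    if "Format Converter" in transformations:
        # The Format Converter must close the task flow.
        if transformations[-1] != "Format Converter":
            errors.append("Format Converter must be the last transformation")
        # Assumption: the Combiner must sit directly before the Format Converter.
        if ("Combiner" in transformations
                and transformations.index("Combiner") != len(transformations) - 2):
            errors.append("Combiner must directly precede Format Converter")
    elif "Combiner" in transformations and transformations[-1] != "Combiner":
        errors.append("Combiner must be the last transformation")
    return errors


def run_flow(event, steps):
    """Apply each step in order; the output of one step feeds the next."""
    for step in steps:
        event = step(event)
    return event
```

For example, validate_flow(["Filter", "Combiner", "Format Converter"]) returns an empty list, while a flow that places a Filter transformation after the Combiner transformation returns one violation.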

Data formats

Each transformation type processes a specific format of incoming streaming data.
Streaming ingestion and replication transformations can process streaming data in the following formats: binary, JSON, and XML.
If a task doesn't include a transformation, it consumes the incoming data in its original format.

Combiner transformation

A Combiner transformation combines multiple events from a streaming source into a single event based on the specified conditions.
A Combiner transformation processes binary data and JSON data. For JSON message formats, the Combiner transformation combines the incoming data into an array of data and returns JSON array objects as output. For binary message formats, it combines the incoming data based on the specified conditions.
In a streaming ingestion and replication task, you can add only one Combiner transformation. If the task includes both a Combiner transformation and a Format Converter transformation, the Format Converter transformation must be the last transformation in the task flow, preceded by the Combiner transformation. If the task doesn't include a Format Converter transformation, the Combiner transformation must be the last transformation in the task flow.
You can use one of the following conditions for a Combiner transformation:
For example, consider the following events:
Record created
Record Published
If you use a comma (,) as the delimiter, the Combiner transformation returns the following combined event:
Record created,Record Published
Note: When you process binary data with a Combiner transformation, you cannot use a regular expression as a delimiter.
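The combining behavior described in this section can be approximated in a few lines of Python. This is a minimal sketch under stated assumptions, not the product's implementation: the helper names combine_binary and combine_json are hypothetical, and the sketch ignores the conditions that decide when a batch of events is complete.

```python
import json


def combine_binary(events, delimiter=b","):
    """Join binary events into one event, separated by a literal delimiter."""
    return delimiter.join(events)


def combine_json(events):
    """Parse each JSON event and return them together as one JSON array."""
    return json.dumps([json.loads(event) for event in events])


combined = combine_binary([b"Record created", b"Record Published"])
as_array = combine_json(['{"id": 1}', '{"id": 2}'])
```

The binary helper takes a literal byte delimiter only, in line with the note that a regular expression cannot be used as a delimiter for binary data.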

Filter transformation

The Filter transformation filters data out of the incoming streaming events based on a specified filter condition.
You can filter data based on one or more conditions. For example, to work with data within a date range, you can create conditions to remove data based on the specified dates.
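As an illustration of a date-range condition, the following sketch keeps only the JSON events whose date falls inside a range. It is a hedged example: the field name created, the ISO date format, and the helper in_date_range are assumptions made for the illustration, and the actual filter condition syntax of the task is not shown here.

```python
import json
from datetime import date


def in_date_range(event, start, end, field="created"):
    """Return True if the event's date field lies within [start, end]."""
    record = json.loads(event)
    return start <= date.fromisoformat(record[field]) <= end


events = [
    '{"id": 1, "created": "2024-01-15"}',
    '{"id": 2, "created": "2024-03-02"}',
]
# Keep events from January 2024 and drop everything else.
kept = [e for e in events if in_date_range(e, date(2024, 1, 1), date(2024, 1, 31))]
```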

Format Converter transformation

The Format Converter transformation converts incoming XML and JSON messages to Parquet format, based on the specified conditions, before streaming them into the data lake.
You can add only one Format Converter transformation to a streaming ingestion and replication task. The Format Converter transformation must be the last transformation in the task flow.
You can specify the date, time, and timestamp format of the incoming data. If you do not specify a format, the values are interpreted as milliseconds since the epoch (midnight, January 1, 1970, GMT).
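To make the default interpretation concrete, the following sketch converts a value in milliseconds since the epoch to a timestamp. The helper name from_epoch_millis is hypothetical, and the sketch only illustrates the arithmetic, not how the transformation itself is implemented.

```python
from datetime import datetime, timedelta, timezone

# Midnight, January 1, 1970, GMT.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_epoch_millis(value):
    """Interpret a number as milliseconds elapsed since the epoch."""
    return EPOCH + timedelta(milliseconds=int(value))


print(from_epoch_millis(1700000000000).isoformat())
# prints 2023-11-14T22:13:20+00:00
```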

Java transformation

A Java transformation runs Java code to process incoming messages and send the processed data to another transformation or a target.
You can use the Java transformation to define simple or moderately complex transformation functionality. A Java transformation can process binary, JSON, and XML data.
Because you can import the Java code as snippets, you don't need to write an entire Java program. You can import sample Java code, and then create and compile the Java transformation.
When you import a non-standard Java package, you must set a classpath for each JAR file or the class file directory associated with the Java package. You don't need to set a classpath for built-in Java packages. For example, java.io is a built-in Java package. If you import java.io, you don't need to set the classpath for it.
The Java transformation uses the inputData variable and the outputData variable to store the incoming data and outgoing data.
The following table shows the mapping between the incoming data formats and the Java data types:

Incoming data    Java data type
JSON             String
XML              String
Binary           byte[]

Sample Java script for JSON

ClassPath: /<Secure Agent Location>/apps/Streaming_Ingestion_Agent/ext/json-simple-1.1.1.jar

########/* Import Code */########
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

########/* Main code */########
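JSONParser parser = new JSONParser();
try {
    // Parse the incoming JSON string, add a field, and serialize it again.
    JSONObject object = (JSONObject) parser.parse(inputData);
    object.put("age", 23);
    outputData = object.toJSONString();
} catch (ParseException e) {
    // Keep the original exception as the cause so that the failure can be diagnosed.
    throw new RuntimeException(e);
}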
########/* inputData and outputData */########
inputData: {"name":"test"}
outputData: {"name":"test","age":23}

Sample Java script for binary

ClassPath: /<Secure Agent Location>/apps/Streaming_Ingestion_Agent/ext/binary-2.3.0.jar

########/* Import Code */########
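import java.nio.charset.StandardCharsets;

########/* Main code */########
// Decode and encode with an explicit charset instead of the platform default.
String temp = new String(inputData, StandardCharsets.UTF_8);
outputData = (temp + "-text").getBytes(StandardCharsets.UTF_8);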
########/* inputData and outputData */########
inputData: Sample
outputData: Sample-text

Sample Java script for XML

ClassPath: /<Secure Agent Location>/apps/Streaming_Ingestion_Agent/ext/dom-0.9.4.jar


########/* Import Code */########
import java.io.*;
import javax.xml.parsers.*;
import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.*;


########/* Main code */########
try {
    // Parse the incoming XML string into a DOM document.
    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    DocumentBuilder builder = factory.newDocumentBuilder();
    ByteArrayInputStream input = new ByteArrayInputStream(inputData.getBytes("UTF-8"));
    Document doc = builder.parse(input);

    // Append a <job> element to the first <employee> element.
    Node employee = doc.getElementsByTagName("employee").item(0);
    Element job = doc.createElement("job");
    job.appendChild(doc.createTextNode("Commercial"));
    employee.appendChild(job);

    // Serialize the document back to a string without the XML declaration.
    DOMSource domSource = new DOMSource(doc);
    StringWriter writer = new StringWriter();
    StreamResult result = new StreamResult(writer);
    TransformerFactory tf = TransformerFactory.newInstance();
    Transformer transformer = tf.newTransformer();
    transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
    transformer.transform(domSource, result);
    outputData = writer.toString();
} catch (Exception e) {
    // Surface the failure instead of silently dropping the message.
    throw new RuntimeException(e);
}

########/* inputData and outputData */########
inputData: <entreprise><employee id="1"><name>Alex</name><age>25</age><address>San Francisco</address></employee></entreprise>
outputData: <entreprise><employee id="1"><name>Alex</name><age>25</age><address>San Francisco</address><job>Commercial</job></employee></entreprise>

Jolt transformation

Use the Jolt transformation to convert complex JSON data to simple JSON data.
The Jolt transformation provides a set of operations that perform the JSON-to-JSON data conversion. You can chain multiple Jolt specifications sequentially in an array of simple specifications to form an overall JSON-to-JSON transformation. Based on the specifications, the Jolt transformation converts the complex input structure to a simple JSON structure.
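The idea of a chain of specifications can be illustrated with a small Python emulation. This is a rough sketch, not the Jolt library: it mimics only literal key matching in a shift-style operation, where each key in the specification names an input field and each value names the dotted output path. The helper names shift and run_chain and the sample data are hypothetical.

```python
def shift(data, spec, out=None):
    """Copy values from data to the output paths named in spec (literal keys only)."""
    out = {} if out is None else out
    if not isinstance(data, dict):
        return out
    for key, target in spec.items():
        if key not in data:
            continue
        if isinstance(target, dict):
            # Descend into the nested input and keep writing to the same output.
            shift(data[key], target, out)
        else:
            # A dotted target such as "customer.name" creates nested output objects.
            *parents, leaf = target.split(".")
            node = out
            for part in parents:
                node = node.setdefault(part, {})
            node[leaf] = data[key]
    return out


OPERATIONS = {"shift": shift}


def run_chain(data, chain):
    """Apply each specification in order; each output feeds the next step."""
    for step in chain:
        data = OPERATIONS[step["operation"]](data, step["spec"])
    return data


complex_input = {"order": {"id": 7, "customer": {"name": "Ana", "city": "Lima"}}}
chain = [
    {"operation": "shift",
     "spec": {"order": {"id": "orderId", "customer": {"name": "customer.name"}}}},
    {"operation": "shift",
     "spec": {"orderId": "id", "customer": {"name": "customerName"}}},
]
simple_output = run_chain(complex_input, chain)
```

With this chain, the nested order is reduced to a flat object that holds only the id and customerName fields.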

Python transformation

A Python transformation runs a Python script to transform incoming data from a streaming source.
A Python transformation processes binary, JSON, and XML data. The Python transformation uses two variables, inputData and outputData, to store the incoming data and outgoing data.
The inputData variable stores incoming data of the XML and JSON message formats as string. It stores incoming data of binary message format as numpy.ndarray. Binary data in the inputData variable is encoded as ASCII characters. You must decode the data accordingly. Ensure that the Python transformation script handles non-ASCII characters present in the inputData variable.
The outputData variable stores outgoing data of the XML and JSON message formats as string. It stores outgoing data of binary message format as bytearray.
Before you use a Python transformation, create a directory, Python home, in which to install Python. After you install Python in the Python home directory, make sure that you install the third-party libraries NumPy and Jep (Java Embedded Python) in the same directory. For more information about the Python installation steps, see the Knowledge Base article 000175168.
In one Secure Agent, you can't use two different versions of Python to run the same Python transformation.
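One way to handle non-ASCII characters in binary input is to rebuild the raw bytes and decode them with an explicit encoding. The following is a minimal sketch under stated assumptions: the payload is assumed to be UTF-8 text, the helper name decode_binary is hypothetical, and a plain list of byte values stands in for the numpy.ndarray that the transformation provides in inputData.

```python
def decode_binary(input_data, encoding="utf-8"):
    """Rebuild the raw bytes from a sequence of integers and decode them."""
    # Masking with 0xFF maps signed byte values (-128..127) to 0..255.
    raw = bytes(int(value) & 0xFF for value in input_data)
    # Undecodable bytes become U+FFFD instead of raising an exception.
    return raw.decode(encoding, errors="replace")


# Stand-in for inputData: the UTF-8 bytes of a string with a non-ASCII character.
sample_input = list("café".encode("utf-8"))
text = decode_binary(sample_input)
sample_output = bytearray(text + " - edited", "utf-8")
```

Converting each value with chr(), as a script written only for ASCII might do, would turn the two bytes of "é" into two separate characters, so decoding the byte sequence as a whole is the safer choice when non-ASCII input is possible.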

Sample Python scripts for JSON

import json
temp=json.loads(inputData)
temp["name"]="Mr "+temp["name"]
outputData=json.dumps(temp)
###################################################################
inputData: { "name":"John", "age":30, "city":"New York"}
outputData: { "name":"Mr John", "age":30, "city":"New York"}

Sample Python scripts for binary

temp = ''.join(str(chr(c)) for c in inputData)
temp += " - this is edited again text"
outputData = bytearray(temp, 'utf-8')
###################################################################
inputData: Sample text
outputData: Sample text - this is edited again text

Sample Python scripts for XML

import xml.etree.ElementTree as ET
myroot = ET.fromstring(inputData)
for x in myroot:
    if x.tag=="body":
        x.tag="Msg"
xmlstr = ET.tostring(myroot)
outputData=xmlstr.decode('utf-8')
###################################################################
inputData: <note><to>You</to><from>Me</from><heading>Message</heading><body>Happy Coding</body></note>
outputData: <note><to>You</to><from>Me</from><heading>Message</heading><Msg>Happy Coding</Msg></note>

Splitter transformation

A Splitter transformation splits multiline messages or message arrays into separate messages, based on the conditions that you specify, before ingesting them into targets.
The Splitter transformation splits binary, JSON, and XML messages based on the condition that you specify and passes the separated messages into new files before ingesting them into targets. Use the Splitter transformation to split complex messages into logical components. For example, if a message contains an error code and error message separated by a comma, you can use the comma to separate the code and message into different files.
Binary messages
In binary message format, the Splitter transformation divides the messages based on line boundaries or byte sequence. The maximum number of lines determines the line boundaries. Each output split file contains no more than the configured number of lines or bytes. The default value for the line boundaries is 1. The default for the byte sequence is ','.
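The two binary split modes can be sketched as follows. This is an illustration only: the helper names split_by_lines and split_by_sequence are hypothetical, and whether the product keeps line terminators or drops the byte sequence from the output is an assumption of the sketch.

```python
def split_by_lines(message, max_lines=1):
    """Split a binary message into chunks of at most max_lines lines."""
    lines = message.splitlines(keepends=True)
    return [b"".join(lines[i:i + max_lines]) for i in range(0, len(lines), max_lines)]


def split_by_sequence(message, sequence=b","):
    """Split a binary message at every occurrence of a byte sequence."""
    return message.split(sequence)


by_lines = split_by_lines(b"a\nb\nc\n", max_lines=2)
by_sequence = split_by_sequence(b"E42,Disk full")
```

The defaults in the sketch mirror the defaults described above: one line per split, and a comma as the byte sequence.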
JSON messages
In JSON message format, the Splitter transformation divides a JSON file into separate files based on the array element specified by a JSONPath expression. Each generated file contains one element of the specified array. The generated file is transferred to the downstream target or transformation in the task. If the specified JSONPath is not found or does not evaluate to an array element, the original file is routed to failure and no files are generated. The default JSONPath expression is '$'.
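The JSON split can be approximated with the standard json module. This sketch makes several assumptions: it supports only simple dotted paths such as $ or $.orders rather than full JSONPath, the helper name split_json is hypothetical, and routing the original file to failure is modeled by raising ValueError.

```python
import json


def split_json(message, path="$"):
    """Return one JSON string per element of the array found at path."""
    node = json.loads(message)
    # "$" alone selects the root; "$.a.b" walks object keys a, then b.
    for key in path.split(".")[1:]:
        if not isinstance(node, dict) or key not in node:
            raise ValueError(f"path {path} not found")
        node = node[key]
    if not isinstance(node, list):
        raise ValueError(f"path {path} does not evaluate to an array")
    return [json.dumps(item) for item in node]


parts = split_json('{"orders": [{"id": 1}, {"id": 2}]}', "$.orders")
```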
XML messages
In XML message format, the Splitter transformation splits an XML message into multiple files based on the input depth level. Each of these files contains a child or descendant of the original file.
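A depth-based XML split can be sketched with the standard xml.etree.ElementTree module. The interpretation of depth is an assumption of this sketch: the root element is treated as depth 0, so a depth of 1 yields the children of the root and a depth of 2 yields the grandchildren. The helper name split_xml is hypothetical.

```python
import xml.etree.ElementTree as ET


def split_xml(message, depth=1):
    """Return the elements found depth levels below the root as XML strings."""
    level = [ET.fromstring(message)]
    for _ in range(depth):
        # Replace the current level with all direct children of its elements.
        level = [child for node in level for child in node]
    return [ET.tostring(node, encoding="unicode") for node in level]


children = split_xml("<note><to>You</to><from>Me</from></note>", depth=1)
```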