ストリーミング取り込みおよびレプリケーション > ストリーミング取り込みとレプリケーション > ストリーミング取り込みとレプリケーションのトランスフォーメーション
  

ストリーミング取り込みとレプリケーションのトランスフォーメーション

トランスフォーメーションはストリーミング取り込みとレプリケーションジョブの一部です。トランスフォーメーションは、ストリーミングデータを取り込むときに実行する操作を表します。
各トランスフォーメーションは特定の関数を実行します。たとえば、フィルタトランスフォーメーションは、指定された条件に基づいて、取り込んだデータからデータをフィルタします。
ストリーミング取り込みとレプリケーションタスクを作成する場合、トランスフォーメーションの追加はオプションです。各トランスフォーメーションタイプには、設定可能な固有のオプションのセットがあります。
ストリーミング取り込みとレプリケーションタスクでは次のトランスフォーメーションを使用できます。
複数のトランスフォーメーションをストリーミング取り込みとレプリケーションタスクに追加できます。このような場合、ソースデータは指定された順序で各トランスフォーメーションを通過するため、トランスフォーメーションの順序は重要です。1つのトランスフォーメーションの出力は、タスクフローの次のトランスフォーメーションへの入力になります。
ストリーミング取り込みとレプリケーションタスクでは、Combinerトランスフォーメーションと形式変換トランスフォーメーションをそれぞれ1つのみ追加できます。形式変換トランスフォーメーションは、タスクフローの最後のトランスフォーメーションである必要があります。タスクにCombinerトランスフォーメーションと形式変換トランスフォーメーションの両方が含まれている場合、形式変換トランスフォーメーションは、タスクフローの最後のトランスフォーメーションである必要があり、その前がCombinerトランスフォーメーションである必要があります。

データ形式

各トランスフォーメーションタイプは、特定の形式の受信ストリーミングデータを処理します。
ストリーミング取り込みとレプリケーショントランスフォーメーションでは、ストリーミングデータを次の形式で処理できます。
タスクにトランスフォーメーションが含まれていない場合、受信データは元の形式で使用されます。

Combinerトランスフォーメーション

Combinerトランスフォーメーションは、指定された条件に基づいて、ストリーミングソースからの複数のイベントを単一のイベントに結合します。
Combinerトランスフォーメーションは、バイナリデータとJSONデータを処理します。JSONメッセージ形式の場合、Combinerトランスフォーメーションは、受信データをデータの配列に結合し、JSON配列オブジェクトを出力として返します。バイナリメッセージ形式の場合、指定された条件に基づいて受信データを結合します。
ストリーミング取り込みとレプリケーションタスクでは、Combinerトランスフォーメーションを1つだけ追加できます。タスクにCombinerトランスフォーメーションと形式変換トランスフォーメーションの両方が含まれている場合、形式変換トランスフォーメーションは、タスクフローの最後のトランスフォーメーションである必要があり、その前がCombinerトランスフォーメーションである必要があります。タスクに形式変換トランスフォーメーションが含まれていない場合、Combinerトランスフォーメーションがタスクフローの最後のトランスフォーメーションである必要があります。
Combinerトランスフォーメーションには、次のいずれかの条件を使用できます。
たとえば、次のようなイベントがあるとします。
区切り文字としてカンマ(,)を使用すると、Combinerトランスフォーメーションは次の結合イベントを返します。
Record created,Record Published
注: Combinerトランスフォーメーションを使用してバイナリデータを処理する場合、正規表現を区切り文字として使用することはできません。

フィルタトランスフォーメーション

フィルタトランスフォーメーションは、指定されたフィルタ条件に基づいて受信ストリーミングイベントのデータをフィルタします。
データは1つ以上の条件に基づいてフィルタできます。たとえば、ある日付範囲内のデータを使用する場合は、指定された日付に基づいてデータを削除する条件を作成できます。

形式変換トランスフォーメーション

形式変換トランスフォーメーションは、XMLおよびJSON受信メッセージのデータ形式を、指定された条件に基づいてParquet形式に変換してから、データレイクにストリーミングします。
ストリーミング取り込みタスクに追加できる形式変換トランスフォーメーションは1つだけです。形式変換トランスフォーメーションは、タスクフローの最後のトランスフォーメーションである必要があります。
受信データの日付、時刻、およびタイムスタンプ形式を指定できます。形式が指定されていない場合は、エポック(1970年1月1日午前0時GMT)からのミリ秒単位と見なされます。

Javaトランスフォーメーション

Javaトランスフォーメーションは、Javaコードを実行して受信メッセージを処理し、処理したデータを別のトランスフォーメーションまたはターゲットに送信します。
Javaトランスフォーメーションを使用して、単純なトランスフォーメーション機能またはやや複雑なトランスフォーメーション機能を定義できます。Javaトランスフォーメーションは、バイナリ、JSON、およびXMLデータを処理できます。
Javaコードをスニペットとしてインポートできるため、Javaプログラム全体を作成する必要はありません。サンプルJavaコードをインポートし、Javaトランスフォーメーションを作成してコンパイルできます。
非標準のJavaパッケージをインポートする場合、各JARファイルのクラスパス、またはJavaパッケージに関連付けられたクラスファイルディレクトリのクラスパスを設定する必要があります。組み込みのJavaパッケージの場合、クラスパスを設定する必要はありません。たとえばjava.ioはビルトインJavaパッケージです。java.ioをインポートした場合、java.io用にクラスパスを設定する必要はありません。
JavaトランスフォーメーションはinputData変数とoutputData変数を使用して、受信データと送信データを格納します。
次の表は、データ型間のマッピングを示しています。
受信データ
Javaデータ型
JSON
String
XML
String
バイナリ
Byte[]

JSON用のサンプルJavaスクリプト

ClassPath: /<Secue Agent Location>/apps/Streaming_Ingestion_Agent/ext/json-simple-1.1.1.jar

########/* Import Code */########
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

########/* Main code */########
JSONParser parser = new JSONParser();
try {
JSONObject object = (JSONObject) parser.parse(inputData);
object.put("age", 23);
outputData = object.toJSONString();
} catch (ParseException e) {
throw new RuntimeException();
}
########/* inputData and outputData */########
inputData: {"name":"test"}
outputData: {"name":"test","age":23}

バイナリ用のサンプルJavaスクリプト

ClassPath:/<Secure Agent Location>/apps/Streaming_Ingestion_Agent/ext/binary-2.3.0.jar

########/* Import Code */########
import java.io.*;

########/* Main code */########
String temp = new String(inputData);
outputData = (temp+"-text").getBytes();
########/* inputData and outputData */########
inputData: Sample
outputData: Sample-text

XML用のサンプルJavaスクリプト

/<Secure Agent Location>/apps/Streaming_Ingestion_Agent/ext/dom-0.9.4.jar


########/* Import Code */########
import java.io.*;
import javax.xml.parsers.*;
import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.*;


########/* Main code */########
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = null;
builder = factory.newDocumentBuilder();
StringBuilder xmlStringBuilder = new StringBuilder();
xmlStringBuilder.append(inputData);
ByteArrayInputStream input = new ByteArrayInputStream(xmlStringBuilder.toString().getBytes("UTF-8"));
Document doc = builder.parse(input);
Node entreprise = doc.getFirstChild();
Node employee = doc.getElementsByTagName("employee").item(0);
Element job = doc.createElement("job");
job.appendChild(doc.createTextNode("Commercial"));
employee.appendChild(job);
DOMSource domSource = new DOMSource(doc);
StringWriter writer = new StringWriter();
StreamResult result = new StreamResult(writer);
TransformerFactory tf = TransformerFactory.newInstance();
Transformer transformer = tf.newTransformer();
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
transformer.transform(domSource, result);
outputData = writer.toString();
} catch (Exception e) {}

########/* inputData and outputData */########
inputData: < entreprise > < employee id = "1" > < name > Alex < /name><age>25</age > < address > San Francisco < /address></employee > < /entreprise>
outputData: < entreprise > < employee id = "1" > < name > Alex < /name><age>25</age > < address > San Francisco < /address><job>Commercial</job > < /employee></entreprise >

Joltトランスフォーメーション

Joltトランスフォーメーションを使用して、複雑なJSONデータを単純なJSONデータに変換します。
Joltトランスフォーメーションは、JSONからJSONへのデータ変換を実行する一連の操作を提供します。複数のJolt仕様を順番で(チェーン)単純な仕様の配列に追加して、全体的なJSONからJSONへのトランスフォーメーションを形成できます。仕様に基づいて、Joltトランスフォーメーションは複雑な入力構造を単純なJSON構造に変換します。

Pythonトランスフォーメーション

Pythonトランスフォーメーションは、Pythonスクリプトを実行して、ストリーミングソースからの受信データを変換します。
Pythonトランスフォーメーションは、バイナリデータ、JSONとXMLデータを処理します。Pythonトランスフォーメーションは、受信データと送信データを保存するためにinputDataoutputDataの2つの変数を使用します。
inputData変数は、XMLおよびJSONメッセージ形式の受信データを文字列として保存します。バイナリメッセージ形式の受信データはnumpy.ndarrayとして保存します。inputData変数のバイナリデータはASCII文字としてエンコードされます。それに応じてデータをデコードする必要があります。PythonトランスフォーメーションスクリプトがinputData変数に存在する非ASCII文字を処理することを確認してください。
outputData変数は、XMLおよびJSONメッセージ形式の送信データを文字列として保存します。バイナリメッセージ形式の送信データはbytearrayとして保存します。
Pythonトランスフォーメーションを使用する前に、Pythonをインストールするためのディレクトリ(Pythonホーム)を作成します。PythonをPythonホームディレクトリにインストールした後、サードパーティのライブラリであるNumPyとJep(Java Embedded Python)をPythonホームと同じディレクトリにインストールしてください。Pythonインストール手順の詳細については、ナレッジベース記事「000175168」を参照してください。
1つのSecure Agentでは、2つの異なるバージョンのPythonを使用して同じPythonトランスフォーメーションを実行することはできません。

JSON用のサンプルPythonスクリプト

import json
temp=json.loads(inputData)
temp["name"]="Mr "+temp["name"]
outputData=json.dumps(temp)
###################################################################
inputData: { "name":"John", "age":30, "city":"New York"}
outputData: { "name":"Mr John", "age":30, "city":"New York"}

バイナリ用のサンプルPythonスクリプト

temp = ''.join(str(chr(c)) for c in inputData)
temp += " - this is edited again text"
outputData = bytearray(temp, 'utf-8')
###################################################################
inputData: Sample text
outputData: Sample text - this is edited again text

XML用のサンプルPythonスクリプト

import xml.etree.ElementTree as ET
myroot = ET.fromstring(inputData)
for x in myroot:
if x.tag=="body":
x.tag="Msg"
xmlstr = ET.tostring(myroot)
outputData=xmlstr.decode('utf-8')
###################################################################
inputData: <note><to>You</to><from>Me</from><heading>Message</heading><body>Happy Coding</body></note>
outputData: <note><to>You</to><from>Me</from><heading>Message</heading><Msg>Happy Coding</Msg></note>

Splitterトランスフォーメーション

Splitterトランスフォーメーションは、複数行のメッセージまたはメッセージ配列を、ターゲットに取り込む前に指定した条件に基づいて個別のメッセージに分割します。
Splitterトランスフォーメーションは、指定した条件に基づいてバイナリ、JSON、およびXMLメッセージを分割し、分割されたメッセージを新しいファイルに渡してターゲットに取り込みます。Splitterトランスフォーメーションを使用して、複雑なメッセージを論理コンポーネントに分割します。たとえば、メッセージにコンマで区切られたエラーコードとエラーメッセージが含まれている場合、コンマを使用してコードとメッセージを異なるファイルに分けることができます。
バイナリメッセージ
バイナリメッセージ形式では、Splitterトランスフォーメーションは、行の境界またはバイトシーケンスに基づいてメッセージを分割します。最大行数によって行の境界が決まります。各出力分割ファイルには、設定された行数またはバイト数しか含まれていません。行の境界のデフォルト値は1です。バイトシーケンスのデフォルトは「,」です。
JSONメッセージ
JSONメッセージ形式では、Splitterトランスフォーメーションは、JSONPath式で指定された配列要素に基づいてJSONファイルを個別のファイルに分割します。生成された各ファイルは、指定された配列の要素で構成されます。生成されたファイルは、タスクのダウンストリームターゲットまたはトランスフォーメーションに転送されます。指定されたJSONPathが見つからないか、配列要素に評価されない場合、元のファイルはfailureにルーティングされ、ファイルは生成されません。デフォルトのJSONPath式は'$'です。
XMLメッセージ
XMLメッセージ形式では、Splitterトランスフォーメーションは、入力深度のレベルに基づいてXMLメッセージを多くのファイルに分割します。これらの各ファイルには、元のファイルの子または子孫が含まれています。