ストリーミング取り込みとレプリケーションタスクの更新
ストリーミング取り込みとレプリケーションタスクを更新するには、UpdateEntityリソースを使用します。Amazon Kinesis、Amazon S3 V2、Microsoft Azure Event Hub、Microsoft Azure Data Lake Storage Gen2、フラットファイル、JDBC V2、JMS、Kafka、またはMQTTのコネクタを使用するストリーミング取り込みとレプリケーションタスクを更新できます。
POST要求
POST要求を使用してストリーミング取り込みとレプリケーションタスクを更新します。
ストリーミング取り込みとレプリケーションタスクを更新するには、次のURLを使用します。
<server URI>/sisvc/restapi/v1/UpdateEntity/Documents('<document ID>')
要求に以下のフィールドを含めることができます。
フィールド | タイプ | 必須 | 説明 |
---|
name | String | ○ | タスクの名前。 |
description | String | - | タスクの説明。 |
runtimeId | String | ○ | ランタイム環境のID。 |
currentVersion | String | ○ | 最新のデータフローオブジェクトバージョン。 |
nodes | Array | ○ | タスクのソース接続とターゲット接続の詳細。 |
nodes配列のフィールド
配列内のフィールドは、接続の名前、タイプ、および接続IDを指定します。これには、編集可能なキーと値のペアであるソース接続とターゲット接続の構成が含まれます。nodes配列には以下のフィールドを含めることができます。
フィールド | タイプ | 必須 | 説明 |
---|
name | String | ○ | 接続の名前。 |
type | String | ○ | 接続タイプ、ソースまたはターゲット。 |
connectionId | String | ○ | 接続のID。 |
transformationType | String | - | 該当なし。 |
config | Array | ○ | ソース接続とターゲット接続の構成。 |
MQTTをソースとするタスクの接続設定
タスクソースのソース接続がMQTTの場合、ソース接続の構成配列に次のフィールドおよびキーと値のペアを含めることができます。
キー | タイプ | 必須 | 説明 |
---|
ClientID | String | - | MQTTソースとMQTTブローカ間の接続を識別する一意の識別子。クライアントIDは、MQTTソースがメッセージの処理中にメッセージを格納するために使用するファイルベースの永続ストアです。 255文字までの文字列を入力できます。 |
MaxQueueSize | Integer | - | プロセッサがメモリに保存できるメッセージの最大数。 1から2147483647までの数値を入力できます。 |
トピック | String | ○ | MQTTトピックの名前。 |
POST要求の例
MQTTソースとフラットファイルターゲットを使用するストリーミング取り込みとレプリケーションタスクを更新するには、次の例のような要求を送信します。
POST <serverUrl>/sisvc/restapi/v1/UpdateEntity/Documents('<document ID>')
Content-Type: application/json
Accept: application/json
IDS-SESSION-ID: <SessionId>
{
"name": "mqtt to flatfile",
"description": "mqtt to flatfile",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "mqtt to flatfile_source",
"type": "source",
"connectionId": "012MGS0B00000000001O",
"transformationType": "",
"config": [
{
"key": "ClientID",
"value": "test"
},
{
"key": "MaxQueueSize",
"value": 1024
},
{
"key": "Topic",
"value": "test"
}
]
},
{
"name": "mqtt to flatfile_target",
"type": "target",
"connectionId": "012MGS0B00000000002N",
"transformationType": "",
"config": [
{
"key": "interimDirectory",
"value": "/home/agent/test"
},
{
"key": "rolloverSize",
"value": 1024
},
{
"key": "rolloverEvents",
"value": 100
},
{
"key": "rolloverTime",
"value": 300000
},
{
"key": "File Name",
"value": "test"
}
]
}
],
"edges": [
{
"from": "mqtt to flatfile_source",
"to": "mqtt to flatfile_target"
}
]
}
JMSをソースとするタスクの接続設定
タスクソースのソース接続がJMSの場合、ソース接続の構成配列に次のフィールドおよびキーと値のペアを含めることができます。
キー | タイプ | 必須 | 説明 |
---|
destinationType | String | ○ | ソースサービスがJMSメッセージを送信する接続先のタイプ。次のいずれかの値を入力します。 - - [キュー]。JMSプロバイダは、キューに登録されている単一のコンシューマにメッセージを配信します。
- - [トピック]。JMSプロバイダは、トピックにサブスクライブしているすべてのアクティブなコンシューマにメッセージを配信します。
|
clientId | String | ○ | JMS接続の一意のID。255文字までの文字列を入力できます。 |
sharedSubscription | String | ○ | 複数のコンシューマが単一のサブスクリプションにアクセスできるようにします。トピックの接続先タイプに適用されます。次のいずれかの値を入力します。 |
durableSubscription | String | ○ | Trueに設定した場合、JMSソースサービスは、非アクティブなサブスクライバがメッセージを保持し、サブスクライバが再接続したときにそれらのメッセージを配信できるようにします。トピックの接続先タイプに適用されます。次のいずれかの値を入力します。 |
subscriptionName | String | ○ | サブスクリプションの名前。トピックサブスクリプションタイプが共有、継続、またはその両方である場合に、トピックの接続先タイプに適用されます。 |
JMS接続先 | String | ○ | JMSプロバイダがメッセージを配信するキューまたはトピックの名前。 |
POST要求の例
JMSソースとフラットファイルターゲットを使用するストリーミング取り込みとレプリケーションタスクを更新するには、次の例のような要求を送信します。
POST <serverUrl>/sisvc/restapi/v1/UpdateEntity/Documents('<document ID>')
Content-Type: application/json
Accept: application/json
IDS-SESSION-ID: <SessionId>
{
"name": "crud",
"description": "JMS to FileToFile",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "crud_source",
"type": "source",
"connectionId": "012MGS0B000000000003",
"transformationType": "",
"config": [
{
"key": "destinationType",
"value": "QUEUE"
},
{
"key": "clientId",
"value": ""
},
{
"key": "JMS Destination",
"value": "test"
}
]
},
{
"name": "crud_target",
"type": "target",
"connectionId": "012MGS0B00000000000H",
"transformationType": "",
"config": [
{
"key": "interimDirectory",
"value": "/home/agent/test"
},
{
"key": "rolloverSize",
"value": 1024
},
{
"key": "rolloverEvents",
"value": 100
},
{
"key": "rolloverTime",
"value": 300000
},
{
"key": "File Name",
"value": "test"
}
]
}
],
"edges": [
{
"from": "crud_source",
"to": "crud_target"
}
]
}
Microsoft Azure Data Lake Storage Gen2(ADLS Gen2)をターゲットとするタスクの接続設定
タスクターゲットのターゲット接続がADLS Gen2の場合、ターゲット接続の構成配列に次のフィールドおよびキーと値のペアを含めることができます。
キー | タイプ | 必須 | 説明 |
---|
writeStrategy | String | ○ | ADLS Gen2ストレージに同じ名前のファイルが存在する場合に実行するアクション。 次のいずれかの値を入力します。 - - 付加。既存のファイルにデータを追加します。
- - 上書き。既存のファイルを新しいファイルに置き換えます。
- - 失敗。要求を失敗させます。
- - ロールオーバー。現在のファイルを閉じ、設定されたロールオーバー値に基づいて新しいファイルを作成します。
|
rolloverSize * | Integer | - | ロールオーバーをトリガする際のターゲットファイルサイズ(KB)。ロールオーバー書き込みストラテジに適用されます。 1から2147483647までの数値を入力できます。 |
rolloverEvents * | Integer | - | ロールオーバーの前に蓄積するイベントまたはメッセージの数。ロールオーバー書き込みストラテジに適用されます。 1から2147483647までの数値を入力できます。 |
rolloverTime * | Integer | - | ロールオーバーをトリガするまでの時間(ミリ秒単位)。ロールオーバー書き込みストラテジに適用されます。 1から2147483647までの数値を入力できます。 |
filesystemNameOverride | String | - | 接続で提供されたデフォルトのファイルシステム名をオーバーライドします。このファイルシステム名は、実行時にファイルに書き込むために使用されます。 1280文字までの文字列を入力できます。 |
directoryOverride | String | - | デフォルトのディレクトリパスをオーバーライドします。データを書き込む先のADLS Gen2ディレクトリパス。空白の場合は、デフォルトのディレクトリパスが使用されます。 1280文字までの文字列を入力できます。 |
compressionFormat | String | - | ストリーミング取り込みタスクがターゲットファイルにデータを書き込む前に使用する圧縮形式。 次のいずれかの値を入力します。 |
ファイル名/式 | String | ○ | ADLS Gen2ファイル名または正規表現。 249文字までの文字列を入力できます。 |
* これらのフィールドの少なくとも1つに値を入力してください。 |
POST要求の例
フラットファイルソースとADLS Gen2ターゲットを使用するストリーミング取り込みとレプリケーションタスクを更新するには、次の例のような要求を送信します。
POST <serverUrl>/sisvc/restapi/v1/UpdateEntity/Documents('<document ID>')
Content-Type: application/json
Accept: application/json
IDS-SESSION-ID: <SessionId>
{
"name": "flatfile to adls",
"description": "flatfile to adls",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "flatfile to adls_source",
"type": "source",
"connectionId": "012MGS0B00000000002N",
"transformationType": "",
"config": [
{
"key": "File",
"value": "logfile"
},
{
"key": "initialPosition",
"value": "Current Time"
},
{
"key": "rolloverPattern",
"value": "test"
},
{
"key": "tailingMode",
"value": "Single file"
}
]
},
{
"name": "flatfile to adls_target",
"type": "target",
"connectionId": "012MGS0B00000000003D",
"transformationType": "",
"config": [
{
"key": "writeStrategy",
"value": "Rollover"
},
{
"key": "filesystemNameOverride",
"value": "test"
},
{
"key": "File Name/Expression",
"value": "test"
},
{
"key": "compressionFormat",
"value": "NONE"
},
{
"key": "directoryOverride",
"value": "/test"
},
{
"key": "interimDirectory",
"value": "/home/agent/test"
},
{
"key": "rolloverSize",
"value": 1024
},
{
"key": "rolloverEvents",
"value": 100
},
{
"key": "rolloverTime",
"value": 300000
}
]
}
]
}
Amazon S3をターゲットとするタスクの接続設定
タスクターゲットのターゲット接続がAmazon S3の場合、ターゲット接続の構成配列に次のフィールドおよびキーと値のペアを含めることができます。
キー | タイプ | 必須 | 説明 |
---|
partitionTime | String | - | ストリーミング取り込みタスクがAmazon S3バケットにパーティションを作成する際に従う時間間隔。 次のいずれかの値を入力します。 - - なし
- - 5分
- - 10分
- - 15分
- - 20分
- - 30分
- - 1時間
- - 1日
|
minUploadPartSize | Integer | - | 複数の独立したパートセットとしてサイズの大きなファイルをアップロードする場合の最小パートサイズ(MB単位)。このプロパティを使用して、ファイルのロードをAmazon S3に合わせます。 50から5120までの数値を入力できます。 |
multipartUploadThreshold | Integer | - | オブジェクトを複数のパートで並行してアップロードする場合のマルチパートしきい値。 50から5120までの数値を入力できます。 |
オブジェクト名/式 | String | ○ | Amazon S3ターゲットファイル名、またはAmazon S3ファイル名パターンの正規表現。 |
POST要求の例
フラットファイルソースとAmazon S3ターゲットを使用するストリーミング取り込みとレプリケーションタスクを更新するには、次の例のような要求を送信します。
POST <serverUrl>/sisvc/restapi/v1/UpdateEntity/Documents('<document ID>')
Content-Type: application/json
Accept: application/json
IDS-SESSION-ID: <SessionId>
{
"name": "flatfile to amazon S3",
"description": "flatfile to amazon S3",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "flatfile to amazon S3_source",
"type": "source",
"connectionId": "012MGS0B00000000002N",
"transformationType": "",
"config": [
{
"key": "File",
"value": "logfile"
},
{
"key": "initialPosition",
"value": "Current Time"
},
{
"key": "rolloverPattern",
"value": "test"
},
{
"key": "tailingMode",
"value": "Single file"
}
]
},
{
"name": "flatfile to amazon S3_target",
"type": "target",
"connectionId": "012MGS0B0000000000I7",
"transformationType": "",
"config": [
{
"key": "partitionTime",
"value": "None"
},
{
"key": "minUploadPartSize",
"value": 5120
},
{
"key": "multipartUploadThreshold",
"value": 5120
},
{
"key": "Object Name/Expression",
"value": "test"
}
]
}
],
"edges": [
{
"from": "flatfile to amazon S3_source",
"to": "flatfile to amazon S3_target"
}
]
}
Azure Event Hubsをターゲットとするタスクの接続設定
タスクターゲットのターゲット接続がAzure Event Hubsの場合、ターゲット接続の構成配列に次のフィールドおよびキーと値のペアを含めることができます。
キー | タイプ | 必須 | 説明 |
---|
sasPolicyName | String | - | Event Hub名前空間共有アクセスポリシーの名前。 255文字までの文字列を入力できます。 |
sasPolicyPrimaryKey | String | - | Event Hub名前空間共有アクセスポリシーのプライマリキー。 255文字までの文字列を入力できます。 |
Event Hub | String | ○ | Azure Event Hubsの名前。 255文字までの文字列を入力できます。名前には、小文字、大文字、数字、および特殊文字(「-」および「_」)を含めることができます。 |
POST要求の例
フラットファイルソースとAzure Event Hubsターゲットを使用するストリーミング取り込みとレプリケーションタスクを更新するには、次の例のような要求を送信します。
POST <serverUrl>/sisvc/restapi/v1/UpdateEntity/Documents('<document ID>')
Content-Type: application/json
Accept: application/json
IDS-SESSION-ID: <SessionId>
{
"name": "flatfile to azure event hub",
"description": "flatfile to azure event hub",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "flatfile to azure event hub_source",
"type": "source",
"connectionId": "012MGS0B00000000002N",
"transformationType": "",
"config": [
{
"key": "File",
"value": "logfile"
},
{
"key": "initialPosition",
"value": "Current Time"
},
{
"key": "rolloverPattern",
"value": "test"
},
{
"key": "tailingMode",
"value": "Single file"
}
]
},
{
"name": "flatfile to azure event hub_target",
"type": "target",
"connectionId": "012MGS0B00000000001S",
"transformationType": "",
"config": [
{
"key": "sasPolicyName",
"value": "test"
},
{
"key": "sasPolicyPrimaryKey",
"value": "test"
},
{
"key": "Event Hub",
"value": "test"
}
]
}
],
"edges": [
{
"from": "flatfile to azure event hub_source",
"to": "flatfile to azure event hub_target"
}
]
}
JDBC V2をターゲットとするタスクの接続設定
タスクターゲットのターゲット接続がJDBC V2の場合、ターゲット接続の構成配列に次のフィールドおよびキーと値のペアを含めることができます。
キー | タイプ | 必須 | 説明 |
---|
Table Name | String | ○ | JSON形式でデータを挿入するテーブルの名前。 988文字までの文字列を入力します。 |
POST要求の例
フラットファイルソースとJDBC V2ターゲットを使用するストリーミング取り込みとレプリケーションタスクを更新するには、次の例のような要求を送信します。
POST <serverUrl>/sisvc/restapi/v1/UpdateEntity/Documents('<document ID>')
Content-Type: application/json
Accept: application/json
IDS-SESSION-ID: <SessionId>
{
"name": "FileFile to jdbc",
"description": "FileToFile to jdbc_target",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "flatfile to jdbc_source",
"type": "source",
"connectionId": "012MGS0B00000000002N",
"transformationType": "",
"config": [
{
"key": "initialPosition",
"value": "Current Time"
},
{
"key": "tailingMode",
"value": "Single file"
},
{
"key": "rolloverPattern",
"value": "test"
},
{
"key": "File",
"value": "logfile"
}
]
},
{
"name": "flatfile to jdbc_target",
"type": "target",
"connectionId": "012MGS0B0000000000KF",
"transformationType": "",
"config": [
{
"key": "Table Name",
"value": "table"
}
]
}
],
"edges": [
{
"from": "flatfile to jdbc_source",
"to": "flatfile to jdbc_target"
}
]
}
Amazon Kinesis Streamsをソースおよびターゲットとするタスクの接続設定
タスクのソース接続とターゲット接続がAmazon Kinesis Streamsの場合、ソース接続とターゲット接続の構成配列に次のフィールドおよびキーと値のペアを含めることができます。
キー | タイプ | 必須 | 説明 |
---|
appendGUID | Boolean | | Amazon DynamoDBテーブル名にサフィックスとしてGUIDを追加するかどうかを指定します。 次のいずれかの値を入力します。 |
dynamoDB | String | | Kinesisソースデータのチェックポイントの詳細を保存するAmazon DynamoDBテーブル名。 128文字までの文字列を入力できます。 |
ストリーム | String | ○ | データを読み取る元のKinesis Streamの名前。 128文字までの文字列を入力します。 ソースノードに表示されます。 |
ストリーム名/式 | String | ○ | データを書き込む先のKinesis Stream名または正規表現。 128文字までの文字列を入力します。 ターゲットノードに表示されます。 |
POST要求の例
Amazon Kinesis StreamsソースとAmazon Kinesis Streamsターゲットを使用するストリーミング取り込みとレプリケーションタスクを更新するには、次の例のような要求を送信します。
POST <serverUrl>/sisvc/restapi/v1/UpdateEntity/Documents('<document ID>')
Content-Type: application/json
Accept: application/json
IDS-SESSION-ID: <SessionId>
{
"name": "kinesis to kinesis",
"description": "kinesis to kinesis",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "kinesis to kinesis_source",
"type": "source",
"connectionId": "012MGS0B00000000000F",
"transformationType": "",
"config": [
{
"key": "appendGUID",
"value": true
},
{
"key": "dynamoDB",
"value": "table"
},
{
"key": "Stream",
"value": "test"
}
]
},
{
"name": "kinesis to kinesis_target",
"type": "target",
"connectionId": "012MGS0B00000000000F",
"transformationType": "",
"config": [
{
"key": "Stream Name/Expression",
"value": "trgt"
}
]
}
],
"edges": [
{
"from": "kinesis to kinesis_source",
"to": "kinesis to kinesis_target"
}
]
}
フラットファイルをソースおよびターゲットとするタスクの接続設定
タスクのソース接続とターゲット接続がフラットファイルの場合、ソース接続とターゲット接続の構成配列に次のフィールドおよびキーと値のペアを含めることができます。
キー | タイプ | 必須 | 説明 |
---|
File | String | ○ | ソースファイルの絶対パスおよび名前。複数ファイルモードのベースディレクトリを入力します。 |
initialPosition | String | ○ | テールするファイルからデータを読み取る際の開始位置。次のいずれかの値を入力します。 - - ファイルの先頭。ファイルの先頭から読み取ります。すでにロールオーバーされているデータは取り込みません。
- - 現在の時刻。ファイルの最近更新された部分から読み取ります。ロールオーバーされたデータまたは書き込まれたファイル内のデータは取り込みません。
|
rolloverPattern | String | - | ロールオーバーするファイルのファイル名パターン。 テールするファイルがロールオーバーした場合、Secure Agentはファイル名パターンを使用して、ロールオーバーしたファイルを識別します。ファイルのロールオーバー中にSecure Agentが停止した場合、再起動時に、中断されたポイントからファイルを取得します。 アスタリスク(*)と疑問符(?)をワイルドカード文字として使用し、ファイルが同じディレクトリにロールオーバーされることを示すことができます。例: ${filename}.log.*と入力します。ここで、アスタリスク(*)は、ファイル名に追加される連続するバージョン番号を表します。 |
tailingMode | String | ○ | ロギングパターンに基づいて、1つまたは複数のファイルをテールします。次のいずれかの値を入力します。 - - 単一ファイル。1つのファイルをテールします。
- - 複数ファイル。ベースディレクトリに示されているすべてのファイルをテールします。正規表現を入力して、テールするファイルを示すことができます。
|
ファイル名 | String | ○ | ターゲットファイルの名前。 |
interimDirectory | String | ○ | Secure Agent上のステージングディレクトリへのパス。 |
rolloverSize | Integer | ○ | タスクがファイルをステージングディレクトリからターゲットに移動する際のファイルサイズ(KB)。 1から2147483647までの数値を入力できます。 |
rolloverEvents | Integer | ○ | ファイルのロールオーバーの前に蓄積するイベントまたはメッセージの数。 1から2147483647までの数値を入力できます。 |
rolloverTime | Integer | - | ターゲットファイルがロールオーバーするまでの時間(ミリ秒単位)。 1から2147483647までの数値を入力できます。 |
edges | 配列 | - | データフロー実行のシーケンス。 |
POST要求の例
フラットファイルソースとフラットファイルターゲットを使用するストリーミング取り込みとレプリケーションタスクを更新するには、次の例のような要求を送信します。
POST <serverUrl>/sisvc/restapi/v1/UpdateEntity/Documents('<document ID>')
Content-Type: application/json
Accept: application/json
IDS-SESSION-ID: <SessionId>
{
"name": "FileToFile",
"description": "FileToFile_V2",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "FileToFile_source",
"type": "source",
"connectionId": "0100000B000000000002",
"transformationType": "",
"config": [
{
"key": "File",
"value": "siagent.log"
},
{
"key": "initialPosition",
"value": "Current Time"
},
{
"key": "rolloverPattern",
"value": ""
},
{
"key": "tailingMode",
"value": "Single file"
}
]
},
{
"name": "FileToFile_target",
"type": "target",
"connectionId": "0100000B000000000002",
"transformationType": "",
"config": [
{
"key": "File Name",
"value": "testing.log"
},
{
"key": "interimDirectory",
"value": "/home/agent/infa/test_file_target"
},
{
"key": "rolloverSize",
"value": 100
},
{
"key": "rolloverEvents",
"value": 100
},
{
"key": "rolloverTime",
"value": 100
}
]
}
],
"edges": [
{
"from": "FileToFile_source",
"to": "FileToFile_target"
}
],
"runtimeOptions": {
"maxLogSize": {
"value": 10,
"unit": "MB"
},
"logLevel": "INFO"
}
}
Kafkaをソースおよびターゲットとするタスクの接続設定
タスクのソース接続とターゲット接続がKafkaの場合、ソース接続とターゲット接続の構成配列に次のフィールドおよびキーと値のペアを含めることができます。
キー | タイプ | 必須 | 説明 |
---|
トピック | String | ○ | イベントの読み取り元となるKafkaソーストピック名、またはJavaでサポートされているKafkaソーストピック名パターンの正規表現。 249文字までの文字列を入力します。 |
consumerProperties | String | - | 必要に応じて、コンシューマ設定のプロパティのカンマ区切りリストを指定します。キーと値のペアとして値を指定します。例: key1=value1, key2=value2 4000文字までの文字列を入力できます。 |
producerProperties | String | - | プロデューサの設定プロパティ。 カンマ区切りリストを指定し、値をキーと値のペアとして指定します。 4000文字までの文字列を入力できます。 |
mdFetchTimeout | Integer | - | それ以降にメタデータが取得されなくなる時間。 1~2147483647の範囲で値を入力してください。 |
batchSize | Integer | - | ストリーミング取り込みタスクがそれ以降のターゲットにデータを書き込むイベントのバッチサイズ。 1~2147483647の範囲で値を入力してください。 |
トピック名/式 | String | ○ | Kafkaトピック名またはJavaがサポートするKafkaトピック名パターンの正規表現。 249文字までの文字列を入力できます。 |
POST要求の例
KafkaソースとKafkaターゲットを使用するストリーミング取り込みとレプリケーションタスクを更新するには、次の例のような要求を送信します。
POST <serverUrl>/sisvc/restapi/v1/UpdateEntity/Documents('<document ID>')
Content-Type: application/json
Accept: application/json
IDS-SESSION-ID: <SessionId>
{
"name": "kafka to kafka",
"description": "kafka to kafka",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "kafka to kafka_source",
"type": "source",
"connectionId": "012MGS0B000000000002",
"transformationType": "",
"config": [
{
"key": "consumerProperties",
"value": "key=value"
},
{
"key": "Topic",
"value": "test"
}
]
},
{
"name": "kafka to kafka_target",
"type": "target",
"connectionId": "012MGS0B000000000002",
"transformationType": "",
"config": [
{
"key": "producerProperties",
"value": "key=value"
},
{
"key": "mdFetchTimeout",
"value": 5000
},
{
"key": "batchSize",
"value": 1048576
},
{
"key": "Topic Name/Expression",
"value": "test"
}
]
}
],
"edges": [
{
"from": "kafka to kafka_source",
"to": "kafka to kafka_target"
}
]
}
POST応答
REST APIはアクションの実行に成功すると、200または201成功応答が返されます。REST APIでエラーが発生すると、適切なエラーコードが返されます。
要求が成功した場合、応答は次のフィールドを返します。
フィールド | タイプ | 説明 |
---|
name | String | タスクの名前。 |
description | String | ある場合は、タスクの説明。 |
runtimeId | String | ランタイム環境のID。 |
currentVersion | String | 最新のデータフローオブジェクトバージョン。 |
nodes | Array | タスクのソース接続とターゲット接続の詳細。 |
nodes配列のフィールド
応答には、nodes配列の以下のフィールドを含めることができます。
フィールド | タイプ | 説明 |
---|
name | String | 接続の名前。 |
type | String | 接続タイプ。 |
connectionId | String | 接続のID。 |
TRANSFORMATIONTYPE | String | トランスフォーメーションのタイプ。 |
config | String | キーと値のペアであるソース接続とターゲット接続の構成。配列内のキーは、ソース接続とターゲット接続のタイプによって異なります。 |
要求が失敗すると、応答にエラーの理由が含まれます。
MQTTをソースとする場合の構成配列内の構成情報
要求が成功した場合、応答は次のフィールドを返します。
キー | タイプ | 説明 |
---|
ClientID | String | MQTTソースとMQTTブローカ間の接続を識別する一意の識別子。クライアントIDは、MQTTソースがメッセージの処理中にメッセージを格納するために使用するファイルベースの永続ストアです。 |
MaxQueueSize | Integer | プロセッサがメモリに保存できるメッセージの最大数。 |
Topic | String | MQTTトピックの名前。 |
要求が失敗すると、応答にエラーの理由が含まれます。
POST応答の例
要求が成功した場合、Successノードで次の例のような応答を受信することがあります。
{
"Success": {
"name": "mqtt to flatfile",
"description": "mqtt to flatfile",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "mqtt to flatfile_source",
"type": "source",
"connectionId": "012MGS0B00000000001O",
"transformationType": "",
"config": [
{
"key": "ClientID",
"value": "test"
},
{
"key": "MaxQueueSize",
"value": 1024
},
{
"key": "Topic",
"value": "test"
}
]
},
{
"name": "mqtt to flatfile_target",
"type": "target",
"connectionId": "012MGS0B00000000002N",
"transformationType": "",
"config": [
{
"key": "interimDirectory",
"value": "/home/agent/test"
},
{
"key": "rolloverSize",
"value": 1024
},
{
"key": "rolloverEvents",
"value": 100
},
{
"key": "rolloverTime",
"value": 300000
},
{
"key": "File Name",
"value": "test"
}
]
}
],
"edges": [
{
"from": "mqtt to flatfile_source",
"to": "mqtt to flatfile_target"
}
]
}
}
JMSをソースとする場合の構成配列内の構成情報
応答は、要求で入力したフィールドのみを返します。
要求が成功した場合、応答は次のフィールドを返します。
キー | タイプ | 説明 |
---|
destinationType | String | ソースサービスがJMSメッセージを送信する接続先のタイプ。 |
clientId | String | JMS接続の一意のID。 |
sharedSubscription | String | 複数のコンシューマが単一のサブスクリプションにアクセスできるようにします。トピックの接続先タイプに適用されます。 |
durableSubscription | String | JMSソースサービスは、非アクティブなサブスクライバがメッセージを保持し、サブスクライバが再接続したときにそれらのメッセージを配信できるようにします。トピックの接続先タイプに適用されます。 |
subscriptionName | String | サブスクリプションの名前。トピックサブスクリプションタイプが共有、継続、またはその両方である場合に、トピックの接続先タイプに適用されます。 |
JMS Destination | String | JMSプロバイダがメッセージを配信するキューまたはトピックの名前。 |
要求が失敗すると、応答にエラーの理由が含まれます。
POST応答の例
要求が成功した場合、Successノードで次の例のような応答を受信することがあります。
{
"Success": {
"name": "crud",
"description": "JMS to FileToFile",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "crud_source",
"type": "source",
"connectionId": "012MGS0B000000000003",
"transformationType": "",
"config": [
{
"key": "destinationType",
"value": "QUEUE"
},
{
"key": "clientId",
"value": ""
},
{
"key": "JMS Destination",
"value": "test"
}
]
},
{
"name": "crud_target",
"type": "target",
"connectionId": "012MGS0B00000000000H",
"transformationType": "",
"config": [
{
"key": "interimDirectory",
"value": "/home/agent/test"
},
{
"key": "rolloverSize",
"value": 1024
},
{
"key": "rolloverEvents",
"value": 100
},
{
"key": "rolloverTime",
"value": 300000
},
{
"key": "File Name",
"value": "test"
}
]
}
],
"edges": [
{
"from": "crud_source",
"to": "crud_target"
}
]
}
}
ADLS Gen2をターゲットとする場合の構成配列内の構成情報
応答は、要求で入力したフィールドのみを返します。
要求が成功した場合、応答は次のフィールドを返します。
キー | タイプ | 説明 |
---|
writeStrategy | String | ADLS Gen2ストレージにファイルが存在する場合に実行するアクション。 |
rolloverSize * | Integer | ロールオーバーをトリガする際のターゲットファイルサイズ(KB)。ロールオーバー書き込みストラテジに適用されます。 |
rolloverEvents * | Integer | ロールオーバーの前に蓄積するイベントまたはメッセージの数。ロールオーバー書き込みストラテジに適用されます。 |
rolloverTime * | Integer | ロールオーバーをトリガするまでの時間(ミリ秒単位)。ロールオーバー書き込みストラテジに適用されます。 |
filesystemNameOverride | String | 接続で提供されたデフォルトのファイルシステム名をオーバーライドします。このファイルシステム名は、実行時にファイルに書き込むために使用されます。 |
directoryOverride | String | デフォルトのディレクトリパスをオーバーライドします。データの書き込み先のADLS Gen2ディレクトリパス。空白の場合は、デフォルトのディレクトリパスが使用されます。 |
compressionFormat | String | ストリーミング取り込みタスクがターゲットファイルにデータを書き込む前に使用する圧縮形式。 |
File Name/Expression | String | ADLS Gen2ファイル名または正規表現。 |
* これらのフィールドの少なくとも1つに値を入力してください。 |
要求が失敗すると、応答にエラーの理由が含まれます。
POST応答の例
要求が成功した場合、Successノードで次の例のような応答を受信することがあります。
{
"Success": {
"name": "flatfile to adls",
"description": "flatfile to adls",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "flatfile to adls_source",
"type": "source",
"connectionId": "012MGS0B00000000002N",
"transformationType": "",
"config": [
{
"key": "File",
"value": "logfile"
},
{
"key": "initialPosition",
"value": "Current Time"
},
{
"key": "rolloverPattern",
"value": "test"
},
{
"key": "tailingMode",
"value": "Single file"
}
]
},
{
"name": "flatfile to adls_target",
"type": "target",
"connectionId": "012MGS0B00000000003D",
"transformationType": "",
"config": [
{
"key": "writeStrategy",
"value": "Rollover"
},
{
"key": "filesystemNameOverride",
"value": "test"
},
{
"key": "File Name/Expression",
"value": "test"
},
{
"key": "compressionFormat",
"value": "NONE"
},
{
"key": "directoryOverride",
"value": "/test"
},
{
"key": "interimDirectory",
"value": "/home/agent/test"
},
{
"key": "rolloverSize",
"value": 1024
},
{
"key": "rolloverEvents",
"value": 100
},
{
"key": "rolloverTime",
"value": 300000
}
]
}
]
}
}
Amazon S3をターゲットとする場合の構成配列内の構成情報
応答は、要求で入力したフィールドのみを返します。
要求が成功した場合、応答は次のフィールドを返します。
キー | タイプ | 説明 |
---|
partitionTime | String | ストリーミング取り込みタスクがAmazon S3バケットにパーティションを作成する際に従う時間間隔。 |
minUploadPartSize | Integer | 複数の独立したパートセットとしてサイズの大きなファイルをアップロードする場合の最小パートサイズ(MB単位)。このプロパティを使用して、ファイルのロードをAmazon S3に合わせます。 |
multipartUploadThreshold | Integer | オブジェクトを複数のパートで並行してアップロードする場合のマルチパートしきい値。 |
Object Name/Expression | String | Amazon S3ターゲットファイル名、またはAmazon S3ファイル名パターンの正規表現。 |
要求が失敗すると、応答にエラーの理由が含まれます。
POST応答の例
要求が成功した場合、Successノードで次の例のような応答を受信することがあります。
{
"Success": {
"name": "flatfile to amazon S3",
"description": "flatfile to amazon S3",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "flatfile to amazon S3_source",
"type": "source",
"connectionId": "012MGS0B00000000002N",
"transformationType": "",
"config": [
{
"key": "File",
"value": "logfile"
},
{
"key": "initialPosition",
"value": "Current Time"
},
{
"key": "rolloverPattern",
"value": "test"
},
{
"key": "tailingMode",
"value": "Single file"
}
]
},
{
"name": "flatfile to amazon S3_target",
"type": "target",
"connectionId": "012MGS0B0000000000I7",
"transformationType": "",
"config": [
{
"key": "partitionTime",
"value": "None"
},
{
"key": "minUploadPartSize",
"value": 5120
},
{
"key": "multipartUploadThreshold",
"value": 5120
},
{
"key": "Object Name/Expression",
"value": "test"
}
]
}
],
"edges": [
{
"from": "flatfile to amazon S3_source",
"to": "flatfile to amazon S3_target"
}
]
}
}
Azure Event Hubをターゲットとする場合の構成配列内の構成情報
応答は、要求で入力したフィールドのみを返します。
要求が成功した場合、応答は次のフィールドを返します。
キー | タイプ | 説明 |
---|
sasPolicyName | String | Event Hub名前空間共有アクセスポリシーの名前。 |
sasPolicyPrimaryKey | String | Event Hub名前空間共有アクセスポリシーのプライマリキー。 |
Event Hub | String | Azure Event Hubsの名前。 |
要求が失敗すると、応答にエラーの理由が含まれます。
POST応答の例
要求が成功した場合、Successノードで次の例のような応答を受信することがあります。
{
"Success": {
"name": "flatfile to azure event hub",
"description": "flatfile to azure event hub",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "flatfile to azure event hub_source",
"type": "source",
"connectionId": "012MGS0B00000000002N",
"transformationType": "",
"config": [
{
"key": "File",
"value": "logfile"
},
{
"key": "initialPosition",
"value": "Current Time"
},
{
"key": "rolloverPattern",
"value": "test"
},
{
"key": "tailingMode",
"value": "Single file"
}
]
},
{
"name": "flatfile to azure event hub_target",
"type": "target",
"connectionId": "012MGS0B00000000001S",
"transformationType": "",
"config": [
{
"key": "sasPolicyName",
"value": "test"
},
{
"key": "sasPolicyPrimaryKey",
"value": "test"
},
{
"key": "Event Hub",
"value": "test"
}
]
}
],
"edges": [
{
"from": "flatfile to azure event hub_source",
"to": "flatfile to azure event hub_target"
}
]
}
}
ターゲットとしてのJDBCをターゲットとする場合の構成配列内の構成情報
応答は、要求で入力したフィールドのみを返します。
リクエスト要求が成功した場合、応答は次のフィールドを返します。
キー | タイプ | 説明 |
---|
Table Name | String | JSON形式でデータを挿入するテーブルの名前。 |
要求が失敗すると、応答にエラーの理由が含まれます。
POST応答の例
要求が成功した場合、Successノードで次の例のような応答を受信することがあります。
{
"Success": {
"name": "FileFile to jdbc",
"description": "FileToFile to jdbc_target",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "flatfile to jdbc_source",
"type": "source",
"connectionId": "012MGS0B00000000002N",
"transformationType": "",
"config": [
{
"key": "initialPosition",
"value": "Current Time"
},
{
"key": "tailingMode",
"value": "Single file"
},
{
"key": "rolloverPattern",
"value": "test"
},
{
"key": "File",
"value": "logfile"
}
]
},
{
"name": "flatfile to jdbc_target",
"type": "target",
"connectionId": "012MGS0B0000000000KF",
"transformationType": "",
"config": [
{
"key": "Table Name",
"value": "table"
}
]
}
],
"edges": [
{
"from": "flatfile to jdbc_source",
"to": "flatfile to jdbc_target"
}
]
}
}
Amazon Kinesis Streamsをソースおよびターゲットとする場合の構成配列内の構成情報
応答は、要求で入力したフィールドのみを返します。
要求が成功した場合、応答は次のフィールドを返します。
キー | タイプ | 説明 |
---|
appendGUID | Boolean | Amazon DynamoDBテーブル名にサフィックスとしてGUIDを追加するかどうかを指定します。 |
DynamoDB | String | Kinesisソースデータのチェックポイントの詳細を保存するAmazon DynamoDBテーブル名。 |
Stream | String | データの読み取り元のKinesis Streamの名前。 Amazon Kinesis Streamsをソースとして使用する場合に適用されます。 |
Stream Name/Expression | String | Kinesisストリーム名またはKinesisストリーム名パターンの正規表現。 Amazon Kinesis Streamsをターゲットとして使用する場合に適用されます。 |
要求が失敗すると、応答にエラーの理由が含まれます。
POST応答の例
要求が成功した場合、Successノードで次の例のような応答を受信することがあります。
{
"Success": {
"name": "kinesis to kinesis",
"description": "kinesis to kinesis",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "kinesis to kinesis_source",
"type": "source",
"connectionId": "012MGS0B00000000000F",
"transformationType": "",
"config": [
{
"key": "appendGUID",
"value": true
},
{
"key": "dynamoDB",
"value": "table"
},
{
"key": "Stream",
"value": "test"
}
]
},
{
"name": "kinesis to kinesis_target",
"type": "target",
"connectionId": "012MGS0B00000000000F",
"transformationType": "",
"config": [
{
"key": "Stream Name/Expression",
"value": "trgt"
}
]
}
],
"edges": [
{
"from": "kinesis to kinesis_source",
"to": "kinesis to kinesis_target"
}
]
}
}
フラットファイルをソースおよびターゲットとする場合の構成配列内の構成情報
応答は、要求で入力したフィールドのみを返します。
要求が成功した場合、応答は次のフィールドを返します。
キー | タイプ | 必須 | 説明 |
---|
File | String | ○ | 読み取るソースファイルの絶対パスと名前。 |
initialPosition | String | ○ | テールするファイルからデータを読み取る際の開始位置。 |
rolloverPattern | String | - | ロールオーバーするファイルのファイル名パターン。 |
tailingMode | String | ○ | ロギングパターンに基づいて、1つまたは複数のファイルをテールします。 |
File Name | String | ○ | ターゲットファイルの名前。 |
interimDirectory | String | ○ | Secure Agent上のステージングディレクトリへのパス。 |
rolloverSize | Integer | ○ | タスクがファイルをステージングディレクトリからターゲットに移動する際のファイルサイズ(KB)。 |
rolloverEvents | Integer | ○ | ファイルのロールオーバーの前に蓄積するイベントまたはメッセージの数。 |
rolloverTime | Integer | - | ターゲットファイルがロールオーバーするまでの時間(ミリ秒単位)。 |
要求が失敗すると、応答にエラーの理由が含まれます。
POST応答の例
要求が成功した場合、次の例のような応答を受信します。
{
"Success": {
"name": "FileToFile",
"description": "FileToFile_V2",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "FileToFile_source",
"type": "source",
"connectionId": "0100000B000000000002",
"transformationType": "",
"config": [
{
"key": "File",
"value": "siagent.log"
},
{
"key": "initialPosition",
"value": "Current Time"
},
{
"key": "rolloverPattern",
"value": ""
},
{
"key": "tailingMode",
"value": "Single file"
}
]
},
{
"name": "FileToFile_target",
"type": "target",
"connectionId": "0100000B000000000002",
"transformationType": "",
"config": [
{
"key": "File Name",
"value": "testing.log"
},
{
"key": "interimDirectory",
"value": "/home/agent/infa/test_file_target"
},
{
"key": "rolloverSize",
"value": 100
},
{
"key": "rolloverEvents",
"value": 100
},
{
"key": "rolloverTime",
"value": 100
}
]
}
],
"edges": [
{
"from": "FileToFile_source",
"to": "FileToFile_target"
}
],
"runtimeOptions": {
"maxLogSize": {
"value": 10,
"unit": "MB"
},
"logLevel": "INFO"
}
}
}
Kafkaをソースおよびターゲットとする場合の構成配列内の構成情報
応答は、要求で入力したフィールドのみを返します。
要求が成功した場合、応答は次のフィールドを返します。
キー | タイプ | 説明 |
---|
Topic | String | イベントの読み取り元となるKafkaソーストピック名、またはJavaでサポートされているKafkaソーストピック名パターンの正規表現。 |
consumerProperties | String | オプションのコンシューマ設定プロパティのカンマ区切りリスト。 |
producerProperties | String | プロデューサの設定プロパティ。 |
mdFetchTimeout | Integer | それ以降にメタデータが取得されなくなる時間。 |
batchSize | Integer | ストリーミング取り込みタスクがそれ以降なターゲットにデータを書き込むイベントのバッチサイズ。 |
Topic Name/Expression | String | Kafkaトピック名またはJavaがサポートするKafkaトピック名パターンの正規表現。 |
要求が失敗すると、応答にエラーの理由が含まれます。
POST応答の例
要求が成功した場合、Successノードで次の例のような応答を受信することがあります。
{
"Success": {
"name": "kafka to kafka",
"description": "kafka to kafka",
"runtimeId": "01000025000000000003",
"locationId": "5sJ0JDyJyWLlrosS5qJjsQ",
"currentVersion": "2",
"messageFormat": "binary",
"nodes": [
{
"name": "kafka to kafka_source",
"type": "source",
"connectionId": "012MGS0B000000000002",
"transformationType": "",
"config": [
{
"key": "consumerProperties",
"value": "key=value"
},
{
"key": "Topic",
"value": "test"
}
]
},
{
"name": "kafka to kafka_target",
"type": "target",
"connectionId": "012MGS0B000000000002",
"transformationType": "",
"config": [
{
"key": "producerProperties",
"value": "key=value"
},
{
"key": "mdFetchTimeout",
"value": 5000
},
{
"key": "batchSize",
"value": 1048576
},
{
"key": "Topic Name/Expression",
"value": "test"
}
]
}
],
"edges": [
{
"from": "kafka to kafka_source",
"to": "kafka to kafka_target"
}
]
}
}