ストリーミング取り込みとレプリケーション ターゲットサポートされているソースから、ストリーミング取り込みとレプリケーション がサポートするオンプレミスおよびクラウドのターゲットにストリーミングデータを取り込むことができます。
ストリーミング取り込みとレプリケーション タスクでは次のターゲットを使用できます。
• Amazon Kinesis Data Firehose• Amazon Kinesis Streams• Amazon S3• Databricks • フラットファイル• Google BigQuery V2 • Google Cloud Storage• Google PubSub• JDBC V2• Kafka- Apache Kafka- Confluent Kafka- Amazon Managed Streaming(Amazon MSK)• Microsoft Azure Data Lake Store Gen2• Microsoft Azure Event Hubsこれらのターゲットタイプに使用するコネクタを判断するには、「コネクタと接続」 > 「ストリーミング取り込みおよびレプリケーションコネクタ」を参照してください。
Amazon Kinesis Data Firehoseターゲット Kinesisターゲットを使用してソースからデータを受信し、そのデータをAmazon Kinesis Data Firehoseターゲットに書き込みます。Kinesisターゲットを作成するには、Amazon Kinesis接続タイプを使用します。
Kinesis Firehoseは、Amazon KinesisがAWSエコシステム内で提供するリアルタイムのデータストリーム処理サービスです。Kinesis Firehoseを使用して、データをバッチ処理、暗号化、および圧縮します。Kinesis Firehoseは、システムのニーズに合わせて自動的にスケーリングできます。
Kinesis Firehoseのアクセスをターゲットとして設定するには、次のタスクを実行します。
• IAMユーザーがAWS Kinesis DataFirehoseサービスを使用するために必要なIAM権限を持つAWSアカウントを作成します。• Firehose配信ストリームを定義します。ソースをDirect PUTまたはその他のソースとして設定します。• ユーザーが書き込んでいるターゲットに基づいて、IAMユーザーの資格情報に必要な権限を付与します。Amazon Kinesis Streamsターゲット Kinesisターゲットを使用して、ソースサービスからデータを受信し、そのデータをAmazon Kinesis Streamに書き込みます。Kinesisターゲットを作成するには、Amazon Kinesis接続タイプを使用します。
Kinesis Streamsは、Amazon KinesisがAWSエコシステム内で提供するリアルタイムのデータストリーム処理サービスです。Kinesis Streamsは、ストリーミングデータを処理および分析するためのカスタムアプリケーションを構築するために使用できるカスタマイズ可能なオプションです。Kinesis Streamsはデータのインフローデマンドに対応するために自動的にスケーリングできないため、システムのニーズを満たすために十分な容量を手動でプロビジョニングする必要があります。
Kinesisストリームターゲットを使用する前に、次のタスクを実行します。
1 Amazon Kinesisコンソールで、Amazon Kinesisストリームを作成して設定します。2 Amazon Web Services(AWS)Identity and Access Management(IAM)サービスで、ユーザーを作成します。3 ユーザー作成プロセス中に生成されたアクセスキーとシークレットアクセスキーをダウンロードします。4 Kinesisストリームへの書き込み権限を持つグループにユーザーを関連付けます。Amazon S3ターゲット Amazon S3 V2コネクタを使用して、ストリーミングデータとレプリケーションデータをAmazon S3ターゲットに書き込みます。
Amazon Simple Storage Service(Amazon S3)は、ストリーミングソースからデータをコピーして、任意のターゲットに同時にデータを移動できるストレージサービスです。Amazon S3を使用して、構成されたソース接続のリストからAmazon S3ターゲットにデータを転送できます。AWS Management ConsoleのWebインタフェースを使用してこれらのタスクを実行できます。
ストリーミング取り込みとレプリケーション タスクのターゲットとしてAmazon S3オブジェクトを使用できます。Amazon S3ターゲットとターゲットオブジェクトの詳細プロパティを設定できます。
データのパーティション化 ストリーミング取り込みとレプリケーション タスクは、Amazon S3 V2ターゲットにパーティションを作成し、データをパーティションに書き込むことができます。パーティション化を使用するには、タスクがパーティションを作成する際に従うパーティション化間隔を選択する必要があります。選択した時間間隔に基づいて、ストリーミング取り込みとレプリケーション ジョブは受信メッセージをAmazon S3 V2バケットの/<オブジェクト名>/<年>/<月>/<日>/<時間>/<分>パーティションに保存します。ストリーミング取り込みとレプリケーション ジョブは、Amazon S3バケットにタイムスタンプ階層フォルダを追加します。
タスクを設定するときに、次の方法でパーティション化を有効にすることができます。 - [オブジェクト名/式]フィールドでオブジェクト名に${Timestamp} 式を追加し、時間間隔を選択します。例えば、[オブジェクト名/式] フィールドで、/streaming/${Timestamp} と入力し、5分のパーティション化間隔を選択してから、2022年6月8日の15:20にストリーミング取り込みとレプリケーション タスクを実行したとします。ストリーミング取り込みとレプリケーション ジョブは、受信メッセージをAmazon S3 V2バケットの/streaming/2022/06/08/15/20パーティションに保存します。このジョブは、次の時間間隔の際にストリーミングされるデータを、時間フォルダ(2022/06/08/15/25)に保存します。
注: [オブジェクト名/式]フィールドで、オブジェクト名に${Timestamp} 式を追加し、パーティション化間隔を選択しなかった場合、ストリーミング取り込みとレプリケーション ジョブはオブジェクトをAmazon S3バケットの0/フォルダに保存します。
- [オブジェクト名/式] フィールドで正規表現を使用し、パーティション化間隔を選択します。正規表現に${Timestamp} 表現を追加する必要はありません。例えば、2022年6月18日の12:10にストリーミング取り込みとレプリケーション タスクを、正規表現$"SourceTable":"((.*?))"$と、5分のパーティション化間隔で実行したとします。受信データは{"SourceTable":"xyz"}です。ストリーミング取り込みとレプリケーション ジョブは、xyzオブジェクトをAmazon S3バケットの2022/06/18/12/10フォルダ階層に保存します。このジョブは、次の時間間隔の際にストリーミングされるデータを、時間フォルダ(2022/06/08/15/25)に保存します。
Databricksターゲット ストリーミング取り込みとレプリケーション タスクを使用して、Databricksターゲットにデータを書き込みます。Databricksターゲットを作成するには、Databricks接続タイプを使用します。Databricksターゲットには、Databricksクラスタバージョン6.3以降が必要です。
Databricksは、ACIDトランザクションを提供し、既存のデータレイクの上で機能するオープンソースのストレージレイヤです。Databricksでは、独自のDeltaソフトウェアを使用して保存データを管理するので、データに高速にアクセスできます。
次のストレージタイプに作成したDelta Lakeテーブルにアクセスできます。
• Azure Data Lake Storage(ADLS)Gen2• Amazon Web Services(AWS)S3Databricksターゲットは、Databricks上の1つ以上のDelta Lakeテーブルにデータを書き込みます。Databricksターゲットはストリーミング取り込みとレプリケーション タスクで次のユースケースに使用できます。
• すべてのストリーミングソースからDatabricksテーブルにバルクデータを取り込む• すべてのストリーミングソースからの変更データキャプチャ(CDC)をマージし、Databricksテーブルに書き込むDatabricks接続は、JDBC URLを使用してDatabricksクラスタに接続します。ターゲットを設定するときは、クラスタへの接続に使用するJDBC URLと資格情報を指定します。また、ターゲットがAmazon S3またはAzure Data Lake Storage Gen2のステージングの場所に接続するために使用する接続情報を定義します。
データを書き込むDelta Lakeのテーブルを指定します。ターゲットは、一致する名前に基づいて、レコードフィールドからテーブルカラムにデータを書き込みます。
フラットファイルターゲット ストリーミング取り込みとレプリケーション タスクを使用して、さまざまなソースからフラットファイルターゲットにデータを書き込みます。このタスクは、さまざまなソースからデータフローを実行するSecure Agentのファイルシステムにリアルタイムストリーミングデータを書き込みます。
ストリーミング取り込みとレプリケーション タスクは、データを指定されたファイル名でステージングディレクトリに書き込みます。タスクがファイルに新しいコンテンツを追加すると、ターゲット内でそのコンテンツの前に改行文字(\n)が付けられます。
フラットファイルターゲットはファイルのロールオーバーアクションを実行するため、ロールオーバープロパティを設定できます。ファイルのロールオーバープロセスでは、現在のファイルを閉じ、ファイルサイズ、イベント数、または時間に基づいて新しいファイルを作成します。ロールオーバーを設定するには、ターゲットサービスの[ロールオーバーサイズ] 、[ロールオーバーイベント数] 、または[ロールオーバー時間] プロパティを指定します。ロールオーバープロセスでは、ファイルをステージングディレクトリからターゲットに移動し、ファイルの名前を変更します。名前が変更されたファイルのファイル名形式は、元のファイル名にタイムスタンプとカウンタの情報(yyyy_mm_dd-hh_mm_ss_counter)が追加されたものになります。例えば、ロールオーバー時に、ファイルstreaming.txtの名前はstreaming-2021_08_16-17_17_30_4.txtに変更されます。
ロールオーバープロパティは組み合わせて実装できます。例えば、ロールオーバーイベント数を1000、ロールオーバーサイズを1 GB、ロールオーバー時間を1時間に設定した場合、イベントが1000件累積されておらず、1時間が経過していなくても、ファイルのサイズが1 GBに達すると、タスクはファイルをロールオーバーします。
Google BigQuery V2ターゲット ストリーミング取り込みとレプリケーション タスクを使用して、Google BigQuery V2データベースターゲットにデータを書き込みます。Google BigQuery V2ターゲットを作成するには、Google BigQuery V2接続タイプを使用します。
Google BigQuery V2ターゲットを使用して、BigQueryテーブルにデータを書き込むことができます。ターゲットはJSON形式のデータをコンシュームします。フィールドがデータベースのテーブルカラムと一致しない場合、ターゲットはレコードを無視します。Google BigQuery V2ターゲットは、単純なJSON形式のデータ、または単純なJSON形式の配列のデータを受け入れます。
Google BigQuery V2コネクタを使用する前に、次の前提条件のタスクを完了する必要があります。
• Google BigQueryにアクセスするためのGoogleサービスアカウントを持っていることを確認してください。• サービスアカウントにclient_email、project_id、private_key、およびregionIDの値があることを確認してください。Google BigQuery V2接続を作成するときに、対応する[サービスアカウントID] 、[プロジェクトID] 、[サービスアカウントキー] 、および[リージョンID] 接続プロパティに値を入力します。• private_key_idプロパティとclient_idプロパティを[オプションのプロパティを指定] フィールドで指定する必要があります。これらのパラメータを指定すると、Google BigQuery V2接続テストは失敗します。ただし、失敗したテスト接続を無視して、ストリーミング取り込みとレプリケーション ジョブを実行できます。次の形式を使用します。"private_key_id":"<private key ID>" and "client_id":"<client ID>"
• Google BigQuery接続のタイムアウト間隔を設定する場合は、接続プロパティの[オプションのプロパティを指定] フィールドでタイムアウト間隔プロパティを指定します。次の形式を使用します。"timeout": "<timeout_interval_in_seconds>"
• ストリーミング取り込みとレプリケーション タスクをデプロイする前に、データを書き込むためのテーブルが存在している必要があります。• ターゲットテーブルを含むGoogle BigQueryデータセットへの読み取りおよび書き込みアクセス権が必要です。Google Cloud Storage V2ターゲット ストリーミング取り込みとレプリケーション タスクを使用して、Google Cloud Storageターゲットにデータを書き込みます。Google Cloud Storageターゲットを作成するには、Google Cloud Storage V2接続タイプを使用します。
Google Cloud Storageを使用して、マルチメディアをストリーミングしたり、カスタムデータ分析パイプラインを保存したり、直接ダウンロードして大きなデータオブジェクトをユーザーに配信したりできます。データをバックアップするために、Google Cloud Storageにデータを書き込むことができます。データベースに障害が発生した場合は、Google Cloud Storageからデータを読み取り、データベースにリストアできます。
Google Cloud Storageは、データの可用性、待ち時間、価格などの要因に基づいて、さまざまなストレージクラスを提供します。Google Cloud Storageには次のコンポーネントがあります。
• プロジェクト 。Google Cloud Storageでは、すべてのリソースがプロジェクト内に格納されます。プロジェクトは、請求の詳細とユーザーの詳細を格納する最上位のコンテナです。プロジェクトは複数作成できます。プロジェクトには、一意のプロジェクト名、プロジェクトID、およびプロジェクト番号があります。• バケット 。各バケットは、データを格納するコンテナのように機能します。バケットを使用して、データを整理し、データにアクセスできます。複数のバケットを作成できますが、バケットをネストすることはできません。バケット内に複数のフォルダを作成でき、フォルダをネストすることもできます。アクセス制御リストを定義して、オブジェクトとバケットを管理できます。アクセス制御リストは、権限と範囲のエントリで構成されています。権限は、読み取りまたは書き込み操作を実行するためのアクセスを定義します。範囲は、操作を実行できるユーザーまたはグループを定義します。• オブジェクト 。オブジェクトは、Google Cloud Storageにアップロードするデータを構成します。バケット内にオブジェクトを作成できます。オブジェクトは、オブジェクトデータとオブジェクトメタデータコンポーネントで構成されます。オブジェクトデータは、Google Cloud Storageに保存するファイルです。オブジェクトメタデータは、オブジェクトの特性を説明する名前と値のペアのコレクションです。Google Cloud Storage V2コネクタを使用する前に、次の前提条件のタスクを完了する必要があります。
1 Google Cloud StorageにアクセスするためのGoogleサービスアカウントを持っていることを確認してください。2 サービスアカウントのclient_email、project_id、private_keyの値があることを確認してください。AdministratorでGoogle Cloud Storage接続を作成するときに、これらの詳細を入力する必要があります。3 サービスアカウントでGoogle Cloud Storage JSON APIが有効になっていることを確認してください。Google Cloud Storage V2コネクタは、Google APIを使用してGoogle Cloud Storageと統合します。4 ターゲットファイルを含むGoogle Cloud Storageバケットへの書き込みアクセスがあることを確認してください。5 組織でCloudera CDHまたはHortonworks HDPパッケージを使用するためのライセンスが有効になっていることを確認してください。ストリーミング取り込みとレプリケーション タスクをデプロイすると、Secure AgentはGoogle Cloud Storage APIを使用して、指定された操作を実行し、Google Cloud Storageファイルにデータを書き込みます。Google Cloud Storageターゲットにデータを書き込むことができます。Google Cloud Storageターゲットで更新、更新/挿入、または削除操作を実行することはできません。
Google PubSubターゲット ストリーミング取り込みとレプリケーション タスクを使用して、Google PubSubトピックにデータを書き込みます。Google PubSubターゲットを作成するには、Google PubSub接続タイプを使用します。
Google PubSubは、イベントを処理するサービスからイベントを生成するサービスを分離する非同期メッセージングサービスです。Google PubSubは、メッセージング指向ミドルウェアとして、またはストリーミング分析パイプラインのイベントの取り込みと配信用に使用できます。Google PubSubは、高可用性と大規模で一貫したパフォーマンスを備えた、耐久性のあるメッセージストレージとリアルタイムのメッセージ配信を提供します。Google PubSubサーバーは、世界中の利用可能なすべてのGoogle Cloudリージョンで実行できます。
Google PubSubコネクタを使用する前に、次の前提条件を満たしていることを確認する必要があります。
• 組織にGoogle PubSubコネクタライセンスがある。• Google PubSubにアクセスするためのGoogleサービスアカウントのJSONキーがある。• Googleサービスアカウントのclient_email、project_id、private_keyの値がある。AdministratorでGoogle PubSub接続を作成するときに、これらの詳細が必要です。ストリーミング取り込みとレプリケーション タスクでは、Google PubSubターゲットを使用して、Google PubSubトピックにメッセージをパブリッシュできます。
JDBC V2ターゲット ストリーミング取り込みとレプリケーション タスクを使用して、データベースターゲットにデータを書き込みます。JDBC V2ターゲットを作成するには、JDBC V2接続タイプを使用します。
JDBC V2ターゲットを使用して、データベーステーブルにデータを書き込むことができます。ターゲットはJSON形式のデータをコンシュームします。ターゲットは、データベースのテーブルカラムにマッピングされていないフィールドを無視します。
JDBC V2をターゲットとして構成する前に、以下の前提条件を考慮してください。
注: JDBC V2ターゲットは、単純なJSONまたは単純なJSON形式の配列のデータのみを受け入れます。
Kafkaターゲット ストリーミング取り込みとレプリケーション タスクを使用して、Kafkaターゲットにデータを書き込みます。Kafkaターゲットを作成するには、Kafka接続タイプを使用します。
KafkaはPublish-Subscribeメッセージシステムです。オープンソースの分散ストリーミングプラットフォームです。このプラットフォームにより、データを生成するシステムは、Kafkaトピックでデータをリアルタイムで永続化できます。任意のトピックを、リアルタイムでデータを必要とする任意の数のシステムで読み取ることができます。Kafkaは、さまざまなダウンストリームコンシューマアプリケーションが使用できるストリーミングデータの中間ステージング領域として使用できます。
Kafkaは、1つ以上のKafkaブローカで構成されるクラスターとして実行されます。Kafkaブローカは、データをメッセージの形式でストリーミングし、メッセージをトピックにパブリッシュし、メッセージをトピックからサブスクライブしてから、メッセージをKafkaターゲットに書き込みます。
Kafkaターゲットを作成するときは、Kafkaプロデューサを作成してKafkaメッセージを書き込みます。ストリーミングKafkaメッセージを書き込むストリーミング取り込みとレプリケーション ジョブで各Kafkaターゲットを使用できます。Kafkaターゲットを設定するときは、メッセージをパブリッシュするトピックと、Kafkaブローカが実行されるIPアドレスとポートを指定します。Kafkaトピックがターゲットに存在しない場合、トピックを手動で作成する代わりに、存在しないトピックがパブリッシュされたときにトピックを自動作成するようにブローカを設定することもできます。
同じKafka接続を使用して、Apache Kafka用のAmazon Managed Streaming(Amazon MSK)またはConfluent Kafka接続を作成できます。これで、ストリーミング取り込みとレプリケーション タスクでAmazon MSKまたはConfluent Kafkaターゲットを使用して、Apache KafkaまたはConfluent Kafkaターゲットにメッセージを書き込むことができます。
Microsoft Azure Data Lake Storage Gen2ターゲット ストリーミング取り込みとレプリケーション タスクを使用して、Microsoft Azure Data Lake Storage en2ターゲットにデータを書き込みます。Microsoft Azure Data Lake Storage Gen2ターゲットを作成するには、Microsoft Azure Data Lake Storage Gen2接続タイプを使用します。
Microsoft Azure Data Lake Storage Gen2は、ビッグデータ分析のための次世代のデータレイクソリューションです。データをディレクトリおよびサブディレクトリの形で保存し、データのアクセスと操作を効率化することができます。任意のサイズ、構造、形式のデータを保存できます。大量のデータを処理して、より迅速なビジネス成果を達成できます。データサイエンティストやデータアナリストは、データレイクのデータを使用して特定のパターンがないか調べたうえで、分析したデータをデータウェアハウスに移動できます。Microsoft Azure Blobストレージ上で使用可能なビッグデータ分析を実行できます。
ストリーミング取り込みとレプリケーション タスクは、指定された条件に基づいてMicrosoft Azure Data Lake Storage Gen2にデータを書き込みます。
Microsoft Azure Data Lake Storage Gen2の詳細については、Microsoft Azure Data Lake Storage Gen 2のドキュメントを参照してください。
Microsoft Azure Event Hubsターゲット ストリーミング取り込みとレプリケーション タスクを使用して、Azure Event Hubsターゲットにデータを書き込みます。Azure Event Hubsターゲットを作成するには、Azure Event Hubs接続タイプを使用します。
Azure Event Hubsは、イベントを受信して処理する、拡張性の高いデータストリーミングプラットフォームおよびイベント取り込みサービスです。Azure Event Hubsは、短い待機時間と高い信頼性で大量のイベントを取り込んで処理できます。これは、接続されたさまざまなデバイスやシステムからのメッセージストリームを処理できるマネージドサービスです。
イベントハブにデータを送信するエンティティはすべて、イベントパブリッシャです。イベントパブリッシャは、HTTPSまたはKafka 1.0以降を使用してイベントをパブリッシュできます。イベントパブリッシャは、共有アクセス署名(SAS)トークンを使用して、イベントハブに対して自分自身を識別し、一意のIDを持つか、共通のSASトークンを使用できます。
Event Hubsの詳細については、Microsoft Azure Event Hubsのドキュメントを参照してください。