Snowflake Data Cloud Connector > 部 II: Snowflake Data Cloud Connectorとのデータ統合 > Snowflake Data Cloudのターゲット > ターゲットオブジェクトと操作
  

ターゲットオブジェクトと操作

ターゲットトランスフォーメーションでは、単一のオブジェクトまたはパラメータをターゲットタイプとして使用できます。
ターゲットタイプを選択する際、Snowflakeターゲットでデータを挿入、更新、更新/挿入、または削除する操作を選択できます。データ依存操作を使用して、挿入、更新、削除、または拒否操作の行にフラグを立てる式を定義することもできます。

ターゲット操作の制限

ターゲット操作を設定する際は、いくつかのルールが適用されます。

パラメータ化されたターゲット

ソース、ルックアップ、またはターゲットオブジェクトおよび接続をパラメータ化して、トランスフォーメーションで[実行時にパラメータのオーバーライドを許可する]オプションを有効にした場合は、db.schema.tablenameなどの完全修飾名を使用してオブジェクト名をオーバーライドすることはできません。Snowflake Data Cloud接続の[JDBC URLの追加パラメータ]フィールドにあるdb=<dbname>&schema<schemaname>値を渡す必要があります。

更新カラム

Snowflakeターゲットからデータを更新または削除するための一時キーカラムを指定する必要があります。Snowflakeターゲットにプライマリキーカラムが含まれていない場合は、[追加]をクリックして一時キーを追加します。複数のカラムを選択できます。
ソーステーブルのレコードに重複するプライマリキーが含まれている場合は、マッピングで次のいずれかのタスクを実行して、Snowflakeのレコードを更新または削除します。
詳細モードでマッピングを設定する場合は、さらに次のガイドラインを考慮する必要があります。

データ依存操作

詳細モードのマッピングからSnowflakeの行を更新するようにデータドリブン式を設定した場合、IIF条件で3番目の引数を指定する必要があります。IIF条件で3番目の引数を指定しない場合、エージェントでは一致しないすべての行に対する挿入として操作を扱います。
例えば、次の式IIF(Update_column=2,DD_UPDATE,DD_DELETE)DD_DELETEなど3番目の引数を含めずに条件IIF(Update_column=2,DD_UPDATE)を指定すると、update_columnの値が2と等しくない他の行に対してエージェントはデフォルトで挿入を実行します。
例では、式IIF(COL_INTEGER = 2, DD_UPDATE)からCOL_INTEGER=2は1に解決され、COL_INTEGER! =2 resolves to 0となります。ここで、1は更新操作の内部ID、0は挿入操作の内部IDです。指定されていない場合の3番目の引数の値は、デフォルトで0になります。
詳細モードのマッピングには、次の制限付きでデータドリブン操作を使用できます。

一般的な制限

詳細モードでは、500カラムを超えるデータの書き込みを行うマッピングは、次のエラーで失敗します: HTTP POST request failed due to IO error

パススルーのパーティション化

Snowflakeターゲットにデータを書き込むときのマッピングまたは実行時のパフォーマンスを最適化するようにパーティション化を設定できます。
パーティションタイプによって、エージェントがパーティションポイントのパーティション間でデータを分散する方法を制御します。パーティションタイプをパススルーパーティションとして定義できます。パーティション化が設定されていると、エージェントはパーティションとして定義したスレッドの数に基いてターゲットデータの行を分散します。

ターゲットの指定

既存のターゲットを使用するか、新しいターゲットを作成してSnowflakeにデータを書き込むことができます。新しいターゲットを作成することを選択した場合は、タスクの実行時にエージェントによりターゲットが作成されます。
ターゲットオブジェクトに指定したスキーマとデータベースのパスが大文字であることを確認します。Snowflakeのデータベース名とスキーマを、「<database name>/<schema name>」(ターゲットオブジェクトは大文字)という形式で指定します。パスを指定しない場合、Secure Agentは、Snowflake Data Cloud接続プロパティから[JDBC URLの追加パラメータ]フィールドに指定したスキーマおよびデータベース名を考慮します。
TableTypeをtableに指定します。TableTypeプロパティはオプションです。
次の図は、実行時に作成するように設定されたターゲットオブジェクトの例を示しています。 実行時に新しいターゲットオブジェクトを作成するには、オブジェクト名と、Snowflakeでテーブルを作成するパスを指定します。

Snowflakeターゲットの制限

ターゲットを設定する際は、次のルールが適用されます。

更新操作のオーバーライド

更新オーバーライドを指定して、Secure Agentが更新操作に対して生成する更新クエリをオーバーライドできます。
更新オーバーライドを設定すると、Secure Agentは指定したクエリを使用し、ファイル内のデータをステージングし、Snowflakeのローダーコピーコマンドを使用してデータを一時テーブルにロードします。一時テーブルのデータは次にSnowflakeターゲットテーブルにロードされます。更新クエリに指定する構文はSnowflakeによりサポートされる必要があります。
更新クエリを次の形式で指定します。
UPDATE <Target table name> SET <Target table name>.<Column1> = :TU.<Column1>, <Target table name>.<Column2> = :TU.<Column2>, … <Target table name>.<ColumnN> = :TU.<ColumnN> FROM :TU WHERE <Target table name>.<Update Column1> = :TU.<Update Column1> AND <Target table name>.<Update Column2> = :TU.<Update Column2> AND … <Target table name>.<Update ColumnN> = :TU.<Update ColumnN>
ここで、:TU.はターゲットポートの受信データソースを表します。
Secure Agentはマッピングの実行中に、:TU.を一時テーブル名に置き換え、更新クエリを検証しません。
Snowflakeに書き込むようにマッピングの更新オーバーライドを設定する場合は、次のルールを考慮してください。

.csvファイルサイズの最適化

Snowflakeへの書き込みのマッピングを作成するときに、ローカルステージングの.csvファイルのサイズをバイト単位で指定できます。ローカルステージングファイルサイズのプロパティcsvFileSizeを、詳細Snowflakeターゲットプロパティの[追加の書き込みランタイムパラメータ]フィールドで指定します。デフォルトのファイルサイズは50MBです。
目的のファイルサイズが50MBの場合は、csvFileSizeの値を50*1024*1024などのようにバイト単位で計算し、次にcsvFileSizeを52428800と指定します。最適なパフォーマンスを得るには、CSVファイルの数とサイズが合っている必要があります。

バッチサイズとローカルステージングファイルの数の設定

Snowflakeにデータの書き込みを行うマッピングで最適なパフォーマンスを実現するには、マッピングが処理するデータ量に基づいて、バッチサイズおよびローカルステージングファイルの数にデフォルト値が考慮されるようにします。値を設定する必要がある場合以外は、デフォルト値を使用してマッピングを処理することをお勧めします。
どちらのしきい値に先に到達したかによって、マッピングでのこれらの2つの設定の優先度が決定されます。指定したバッチ行サイズに到達する前にマッピングがローカルステージングファイルの数の基準を満たした場合は、そのしきい値に先に到達するため、ローカルステージングファイルの数が優先されます。
例として、指定したバッチサイズに対するさまざまな設定に発行される挿入コマンドの数と、Snowflakeへの書き込み操作でのローカルステージングファイルの数を示した次の表を参照してください。
ソース行の数
行サイズ(バイト)
デフォルトのファイルサイズ
バッチ行のサイズ
ローカルステージングファイルの数
挿入の数
4
52019206バイト
52428800バイト
4
1
2
4
52019206 バイト
52428800 バイト
4
設定されていません。64というデフォルト値が考慮されます。
1
4
52019206 バイト
52428800 バイト
1
設定されていません。64というデフォルト値が考慮されます。
4
4
52019206 バイト
52428800 バイト
3
1
2
4
52019206 バイト
52428800 バイト
1
1
4
シナリオ1: バッチ行サイズが4で、ローカルステージングファイルの数が1である場合。
バッチ行サイズが4で、ローカルステージングファイルが1つである場合は、データの処理時にSnowflakeで2つの挿入ステートメントが実行されます。
行が2つあり、ファイルサイズがデフォルトの50 MBを超えると、次の受信行に新しいファイルが作成されます。ローカルステージングファイルの数が1に設定されている場合、ステージには最大1つのファイルが保持されます。したがって、2つのステージが作成され、それぞれに2行の1つのファイルが保持されます。各ファイルサイズは104038412バイト、つまり1行あたり52019206バイトです。各ステージごとに挿入コマンドが発行され、2つの挿入コマンドが生成されます。
シナリオ2: バッチ行サイズが4で、ローカルステージングファイルの数の値が設定されていない場合。
ローカルステージングファイルの64というデフォルト値を使用して、マッピングでは、ローカルステージングファイルのしきい値に到達しない場合のバッチサイズが考慮されます。
シナリオ3: バッチ行サイズが1で、ローカルステージングファイルの数の値が設定されていない場合。
バッチ行サイズが1で、特定のローカルステージングファイル値が設定されていない場合、ステージングファイルのしきい値に到達しないため、マッピングは4つの行に対して4つの挿入ステートメントを発行します。
シナリオ4: バッチ行サイズが3で、ローカルステージングファイルの数の値が1である場合。
バッチ行サイズ3と1つのローカルステージングファイルを使用して、マッピングは、第1段階で2行、第2段階で2行を処理した後に、2つの挿入コマンドを発行します。
シナリオ5: バッチの行サイズが1で、ローカルステージングファイルの数の値が1である場合。
バッチ行サイズが1でローカルステージングファイルが1つである場合、行サイズが小さく、ステージングファイルのしきい値に到達しないため、行ごとに4つの挿入コマンドが個別に発行されます。

マッピングでのプロパティのロードの設定

ターゲットトランスフォーメーションのSnowflakeの詳細ターゲットプロパティにある[追加の書き込みランタイムパラメータ]フィールドで、Snowflake Data Cloudにデータをロードするための書き込みプロパティを設定できます。
次の表に、Snowflakeにデータをロードする場合に指定できる追加のランタイムパラメータの一部を一覧で示します。
プロパティ
説明
oneBatch
すべてのデータを1回のバッチで処理します。
タイプはブール値です。
デフォルトはfalseです。
remoteStage
内部ステージを使用するように指定します。外部ステージは使用されません。
タイプは文字列です。
デフォルトは「~」(ユーザーステージ)です。
onError
ファイルからデータを読み込むときにエラーが発生した場合に実行するアクションを指定します。
例: onError option ABORT_STATEMENT|CONTINUE|SKIP_FILE
タイプは文字列です。
デフォルトはCONTINUEです。
compressFileByPut
PUTでファイルを圧縮します。
タイプはブール値です。
デフォルトはfalseです。
compressDataBeforePut
PUTの前にデータを圧縮します。
ローダーは、アップロードする前にデータをgzip形式に圧縮します。
タイプはブール値です。
デフォルトはtrueです。
copyEmptyFieldAsEmpty
空の受信フィールドをNULLに設定するためのCOPYコマンドオプション。
タイプはブール値です。
enablePGZIP
ファイル圧縮の並列処理を有効にします。
タイプはブール値です。
デフォルトはtrueです。
onError=ABORT_STATEMENT&oneBatch=true
すべてのデータを単一バッチにロードし、エラーが発生した場合はタスクを停止します。同時に、ユーザーが指定した拒否ファイルパスを検証し、エラーレコードをこのファイルとセッションログに書き込みます。
タイプはonError - 文字列またはoneBatch - ブール値です。
追加のランタイムパラメータフィールドの値を設定するときに、設定済みの各パーティションは新しいローダーインスタンスを初期化し、設定した値はすべてのパーティションに渡って同様に適用されます。

例1.ファイルを圧縮する

データをSnowflakeにロードする前に、Putコマンドを使用してファイルを圧縮する例を考えます。
次の圧縮オプションを指定します: compressDataBeforePut=false&compressFileByPut=true
両方のオプションにtrueを指定した場合、SnowflakeはcompressDataBeforePutオプションを考慮します。

例2.空の値をNULLに置き換える

データをSnowflakeにロードしている間に、空の値を持つ受信フィールドをNULLに置き換える例を考えます。
copyEmptyFieldAsEmpty Booleanオプションを指定して、要件に応じて値をtrueまたはfalseに設定します。
copyEmptyFieldAsEmpty Booleanパラメータを設定する前に、次のシナリオを考慮します。

例3.単一バッチでデータを書き込む

単一バッチ内のソースデータをSnowflakeに書き込む例を考えます。ロード中にエラーが発生した場合、そのエラーをキャプチャする必要もあります。
要件に応じて、onError=ABORT_STATEMENT&oneBatch=trueプロパティを指定します。
単一バッチでロード中にエラーが発生した場合、Secure Agentは指定された拒否ファイル名があるかどうかをチェックし、COPYコマンドを実行し、拒否ファイルを検証してから、エラー(ある場合)をキャプチャするためのファイル名を渡します。

詳細モードのマッピングを実行するための追加のランタイムパラメータの設定

詳細モードでは、Snowflakeにデータを書き込む追加のプロパティを設定できます。ターゲットトランスフォーメーションの[追加の書き込みランタイムパラメータ]フィールドにパラメータを指定します。
Snowflakeにデータを書き込む場合、次のような追加のランタイムパラメータを設定できます。
autopushdown
オプション。自動クエリSQL ELTが有効かどうかを指定します。
SQL ELTを有効にした状態でクエリがadvanced clusterで実行されると、クラスタアプリケーションはクエリの一部をプッシュしてSnowflakeで処理するため、これらのクエリのパフォーマンスが最適化されます。
コネクタがSparkの互換バージョンを使用する場合、デフォルトはonです。コネクタがSparkの互換バージョンを使用しない場合、デフォルト値はoffです。
continueOnError
オプション。有効ではないデータを入力するとCOPYコマンドが操作を強制終了するかどうかを決定します。例えば、有効ではないバリアントデータ型カラムにJSON形式を指定します。
値はonおよびoffです。値をonに指定した場合、エラーが発生した場合でもCOPYコマンドは続行されます。offを指定した場合、エラーが発生するとCOPYコマンドは強制終了します。デフォルトはoffです。
このオプションはoffにしておくことが推奨されます。そうしないと、データのSnowflakeへのコピー中にエラーが発生した場合、データの一部が失われる可能性があります。
並行処理
Secure AgentがSnowflakeとadvanced clusterの間でデータをアップロードまたはダウンロードするときに使用するスレッドプールのサイズ。デフォルトは4です。
スループットを増加または減少させる必要がない限り、デフォルト値を変更しないでください。高いスループットが必要な場合、並列処理を任意の大きい数に設定しないでください。並列処理を大きい値にすると、望ましくない出力になる可能性があり、操作が遅くなります。
パージ
advanced clusterからSnowflakeへ外部データ転送を経由してデータを転送するときに、作成された一時ファイルをSecure Agentが削除するかどうかを指定します。使用できる値はonおよびoffです。デフォルトはoffです。
このパラメータをoffに設定すると、Secure Agentは一時ファイルを自動的に削除します。パージは、advanced clusterからSnowflakeへのデータ転送に対してのみ機能し、Snowflakeからadvanced clusterへのデータ転送では機能しません。このパラメータをonに設定すると、Secure Agentは一時ファイルを自動的に削除しません。
usestagingtable
オプション。データのロード操作でステージングテーブルを使用するかどうかを決定します。
Snowflakeは、一時的な名前でステージングテーブルを作成します。データのロード操作が正常に実行されると、Snowflakeは元のターゲットテーブルを削除し、ステージングテーブルの名前を元のターゲットテーブル名に変更します。データのロード操作が失敗すると、Snowflakeはステージングテーブルを削除し、操作前にターゲットテーブルに含まれていたデータは保持されます。
Snowflakeではステージングテーブルを使用することを強く推奨します。ステージングテーブルを作成するには、テーブルを作成するためのCOPYコマンドを実行するのに十分な権限が必要です。テーブルを作成するための権限がない場合は、ステージングテーブルを使用せずに直接ロードすることができます。
値はonおよびoffです。usestagingtableパラメータをonに指定した場合、Snowflakeはステージングテーブルを使用します。この値をoffに指定した場合、Snowflakeはデータをターゲットテーブルに直接ロードします。デフォルトはonです。