ストリーミング取り込みおよびレプリケーション > ストリーミング取り込みとレプリケーション > ストリーミング取り込みとレプリケーションソース
  

ストリーミング取り込みとレプリケーションソース

サポートされるストリーミングソースからストリーミング取り込みとレプリケーションがサポートするオンプレミスおよびクラウドターゲットに大量のリアルタイムデータを取り込むことができます。イベントまたはメッセージの形式でデータを取り込むことができます。
ストリーミング取り込みとレプリケーションタスクでは次のデータソースを使用できます。
これらのソースタイプに使用するコネクタを判断するには、「コネクタと接続」 > 「ストリーミング取り込みおよびレプリケーションコネクタ」を参照してください。

Amazon Kinesis Streamsソース

Kinesis Streamsソースを使用して、Amazon Kinesis Streamからデータを読み取ります。Kinesis Streamsソース接続を作成するには、Kinesis接続タイプを使用します。
Kinesis Streamsは、Amazon KinesisがAWSエコシステム内で提供するリアルタイムのデータストリーム処理サービスです。Kinesis Streamsは、ストリーミングデータを処理および分析するためのカスタムアプリケーションを構築するために使用できるカスタマイズ可能なオプションです。Kinesis Streamsはデータのインフローデマンドに対応するために自動的にスケーリングできないため、システムのニーズを満たすために十分な容量を手動でプロビジョニングする必要があります。
Kinesisストリームソースを使用する前に、次のタスクを実行します。
  1. 1Amazon Kinesisコンソールで、Amazon Kinesisストリームを作成して設定します。
  2. 2Amazon Web Services(AWS)Identity and Access Management(IAM)サービスで、ユーザーを作成します。
  3. 3ユーザー作成プロセス中に生成されたアクセスキーとシークレットアクセスキーをダウンロードします。
  4. 4Kinesisストリームへの書き込み権限を持つグループにユーザーを関連付けます。
ストリーミング取り込みとレプリケーションは、プロファイルベースのクロスアカウント認証はサポートしていません。Amazon Kinesisに使用されるAmazon Web Services資格情報には、Amazon DynamoDBサービスおよびAmazon CloudWatchサービスにアクセスするための権限が必要です。

AMQPソース

Advanced Message Queuing Protocol(AMQP)ソースを使用して、AMQPメッセージキューからメッセージを読み取ります。AMQPソース接続を作成するには、AMQP接続タイプを使用します。
AMQPは、キュー、ルーティング、信頼性、およびセキュリティ機能を備えたメッセージ指向の規格です。AMQPは、リアルタイムのメッセージストリームを渡すことでビジネストランザクションを容易にするために使用できる、ワイヤレベルのプラットフォームに依存しないプロトコルです。
AMQPコネクタを使用すると、AMQPブローカからのメッセージの読み取り、メッセージキューの監視、およびブローカメッセージングのサブスクライブパターンの処理を行うことができます。ストリーミング取り込みとレプリケーションタスクは、AMQPブローカとしてRabbitMQを使用します。RabbitMQは、高速かつ拡張可能で堅牢な分散型のメッセージブローカシステムです。RabbitMQは、メッセージの安全な転送のためにAMQP0-9-1メッセージングプロトコルを使用します。
ストリーミング取り込みとレプリケーションタスクでは、AMQPソースを使用して、受信メッセージのストリームをサブスクライブできます。AMQPブローカは、ストリーミング取り込みとレプリケーションジョブがキューからメッセージを受信するまで、メッセージをメッセージキューに保存します。ストリーミング取り込みとレプリケーションジョブがメッセージを受信すると、ジョブはメッセージの受信を確認します。応答済みメッセージは、メッセージキューから削除されます。
信頼できるバックグラウンドジョブとして実行する長期間実行タスクがある場合は、AMQPソースを使用できます。また、Webショップでの注文処理など、システムの一部が別の部分に通知する必要があるアプリケーション間の通信にAMQPソースを使用することもできます。

Azure Event Hubs Kafkaソース

Azure Event Hubsに接続するようにKafkaソースを設定できます。Azure Event Hubs Kafkaソース接続を作成するには、Kafka接続タイプを使用します。
標準層または専用層のEvent Hubs名前空間を作成すると、名前空間のKafkaエンドポイントがデフォルトで有効になります。次に、Azure Event Hubs対応のKafka接続を、ストリーミング取り込みとレプリケーションタスクの設定中にソース接続として使用できます。トピック名としてEvent Hubs名を入力します。
ストリーミング取り込みとレプリケーションタスクの設定中に入力するAzure Event Hubsソース情報は、通常のKafkaソース設定の情報と同じです。Azure Event Hubs Kafkaソースプロパティの詳細については、 Azure Event Hubs Kafkaソースのプロパティを参照してください。
AdministratorでKafka接続を作成するときに、次のプロパティを設定します。
Azure Event Hubs Kafkaソース接続の作成の詳細については、接続ヘルプを参照してください。
注: Kafka用のEvent Hubsは、標準層と専用層でのみ使用できます。基本層は、Event HubsでKafkaをサポートしていません。

Business 360 Eventsソース

Business 360 Eventsを使用して、Business 360アプリケーションから、サポートされているターゲット(Kafka、Amazon S3、フラットファイルなど)にイベントをパブリッシュできます。Business 360 Eventsソース接続を作成するには、Business 360 Events接続タイプを使用します。
Business 360 Eventsソースを使用する前に、次のタスクを実行します。

フラットファイルソース

受信リアルタイムデータを読み取るためのソースとしてフラットファイルを使用します。同じディレクトリに保存されているフラットファイルからデータを読み取るようにフラットファイル接続を設定します。
ストリーミング取り込みとレプリケーションタスクはフラットファイルソースの各行を読み取り、設定されたターゲットにデータを取り込みます。フラットファイルがリアルタイムで継続的に更新されると、ストリーミング取り込みとレプリケーションタスクは、ファイル全体を再度読み取るのではなく、新しく追加されたコンテンツのみを読み取ります。
ストリーミング取り込みとレプリケーションは、区切られたフラットファイルからデータを読み取ることができます。区切り文字はキャリッジリターン(\r)、ラインフィード(\n )、または両方の組み合わせである必要があります。

Google PubSubソース

Google PubSubソースを使用して、設定済みのGoogle Cloud PubSubサブスクリプションからメッセージを読み取ります。Google PubSubソース接続を作成するには、Google PubSub接続タイプを使用します。
Google PubSubは、イベントを処理するサービスからイベントを生成するサービスを分離する非同期メッセージングサービスです。Google PubSubは、メッセージング指向ミドルウェアとして、またはストリーミング分析パイプラインのイベントの取り込みと配信用に使用できます。Google PubSubは、高可用性と大規模で一貫したパフォーマンスを備えた、耐久性のあるメッセージストレージとリアルタイムのメッセージ配信を提供します。Google PubSubサーバーは、世界中の利用可能なすべてのGoogle Cloudリージョンで実行できます。
Google PubSubコネクタを使用する前に、次の前提条件を満たしていることを確認する必要があります。
ストリーミング取り込みとレプリケーションタスクでは、Google PubSubソースを使用して、Google PubSubトピックからのメッセージをサブスクライブできます。

JMSソース

JMSソースを使用して、JMSプロバイダからデータを読み取ります。JMSソース接続を作成するには、JMS接続タイプを使用します。
JMSプロバイダは、メッセージベースのミドルウェアシステムであり、JMSメッセージを送信します。JMSソースは、メッセージトピックに基づいてJMSプロバイダメッセージキューまたはJMSプロバイダからJMSメッセージを読み取ります。ユーザーは、JMSソースをIBM MQ、Oracle Weblogic JMS、Universal Messaging、およびTIBCO JMSで使用することができます。
JMSソースは、次のJMSメッセージタイプを読み取ることができます。
JMSメッセージ配信先タイプ
次のいずれかの接続JMSメッセージ配信先タイプを選択できます。

Kafkaソース

Kafkaソースを使用して、Kafkaトピックからのメッセージを読み取ります。Kafkaソース接続を作成するには、Kafka接続タイプを使用します。
KafkaはPublish-Subscribeメッセージシステムです。これは、Kafkaトピックのストリーミングデータを永続化するオープンソースの分散ストリーミングプラットフォームです。任意のトピックを、リアルタイムでデータを必要とする任意の数のシステムで読み取ることができます。Kafkaは、さまざまなダウンストリームコンシューマアプリケーションが使用できるストリーミングデータの中間ステージング領域として使用できます。
Kafkaは、ブローカと呼ばれる1つまたは複数のサーバーで構成されるクラスタとして動作します。Kafkaブローカはメッセージ形式でデータをストリーミングします。このメッセージはトピックにパブリッシュされます。Kafkaソースを作成するときは、Kafkaコンシューマを作成してKafkaトピックからメッセージを読み取ります。
ストリーミング取り込みとレプリケーションタスクでは、Kafkaソースを使用して、受信メッセージのストリームをサブスクライブできます。Kafkaトピックから読み取るようにKafkaソースを設定する場合、トピック名を指定するか、Javaでサポートされている正規表現を使用して、指定したパターンに一致するすべてのトピックをサブスクライブできます。
同じKafka接続を使用して、Apache Kafka用のAmazon Managed Streaming(Amazon MSK)またはConfluent Kafkaソース接続を作成できます。これで、ストリーミング取り込みとレプリケーションタスクでAmazon MSKソースまたはConfluent Kafkaソースを使用して、Apache KafkaまたはConfluent Kafkaトピックからのメッセージを読み取ることができます。

MQTTソース

MQTTソースを使用して、MQ Telemetry Transport(MQTT)ブローカからデータを読み取ります。MQTTソースを作成するには、MQTT接続タイプを使用します。
MQTTはPublish-Subscribeメッセージシステムです。これは、シンプルで軽量、かつ永続的なメッセージングプロトコルです。制約のあるデバイスや、低帯域幅、長い待機時間、または信頼性の低いネットワーク向けに設計されています。パブリッシャとサブスクライバはいずれもMQTTクライアントです。MQTTはパブリッシャをサブスクライバから切り離すため、ブローカがクライアント接続を管理します。
MQTTブローカは、すべてのメッセージを受信し、メッセージをフィルタし、各メッセージをサブスクライブしたクライアントを判別してから、サブスクライブしたクライアントにメッセージを送信します。複数のMQTTソースが1つのMQTTブローカに接続する場合、各接続には一意の識別子が必要です。ストリーミング取り込みとレプリケーションジョブを実行してMQTTソースからデータを取り込む場合、ストリーミング取り込みとレプリケーションは、ターゲットにデータを書き込む前に、まずデータを内部キューに書き込みます。
注: MQTTソースには、一意のクライアントIDが必要です。2つのMQTTソースが同じクライアントIDを持っている場合、MQTTブローカはクライアントとストリーミング取り込みとレプリケーションジョブが[実行中(警告あり)]状態になるのを拒否します。
ストリーミング取り込みとレプリケーションはMQTT Quality of Service(QoS)レベル1をサポートします。レベル1は、クライアントがメッセージをブローカに少なくとも1回送信し、メッセージが複数回配信される可能性があることを示します。ブローカがメッセージの受信を確認した後、クライアントは送信キューからメッセージを削除します。QoSレベルは、クライアントからブローカへの通信またはブローカからクライアントへの通信に制限されています。

OPC UAソース

OPC UAソースを使用して、OPC UAアプリケーションタグからメッセージを読み取ります。OPC UAソース接続を作成するには、OPCUA接続タイプを使用します。
オープンプラットフォームコミュニケーション(OPC)は、インダストリー4.0およびIIoT(Industrial Internet Of Things)の重要な通信プロトコルの1つです。OPCユニファイドアーキテクチャ(OPC UA)は、産業オートメーションに使用されるマシン間通信プロトコルです。OPC UAは、企業システム、監視デバイス、および実世界のデータと相互作用するセンサー間でデータを移動するための柔軟で適応性のあるメカニズムを提供します。OPC UAを使用して、単純なダウンタイムステータスまたは大量の非常に複雑なプラント全体の情報の通信を確立できます。
OPC UAソースは、OPCサーバーからデータを収集するクライアントです。OPCのデータポイントは、デバイスからのデータを表し、データへのリアルタイムアクセスを提供するタグです。ストリーミング取り込みとレプリケーションタスクでは、OPC UAソースを作成して、指定したタグのリストに基づいて受信データを読み取ることができます。JSON配列形式でタグをメンションする必要があります。

REST V2ソース

REST V2ソースを使用して、Webサービスアプリケーションからデータを読み取ります。REST V2ソース接続を作成するには、REST V2接続タイプを使用します。
REST V2ソースコネクタは、REST APIを使用するクラウドアプリケーション用の汎用コネクタです。これはSwagger仕様バージョン2.0をサポートしています。Swagger仕様ファイルには、操作ID、パスパラメータ、クエリパラメータ、ヘッダフィールド、およびペイロードの詳細が含まれます。
Administratorでストリーミング取り込みとレプリケーションタスクのREST V2ソース接続を作成できたら、次のいずれかのREST認証タイプを設定できます。