ストリーミング取り込みとレプリケーション ソースサポートされるストリーミングソースからストリーミング取り込みとレプリケーション がサポートするオンプレミスおよびクラウドターゲットに大量のリアルタイムデータを取り込むことができます。イベントまたはメッセージの形式でデータを取り込むことができます。
ストリーミング取り込みとレプリケーション タスクでは次のデータソースを使用できます。
• Amazon Kinesis Streams• AMQP• Azure Event Hubs Kafka• Business 360 Events • フラットファイル• Google PubSub• JMS• Kafka- Apache Kafka- Confluent Kafka- Amazon Managed Streaming(Amazon MSK)• MQTT• OPC UA• REST V2これらのソースタイプに使用するコネクタを判断するには、「コネクタと接続」 > 「ストリーミング取り込みおよびレプリケーションコネクタ」を参照してください。
Amazon Kinesis Streamsソース Kinesis Streamsソースを使用して、Amazon Kinesis Streamからデータを読み取ります。Kinesis Streamsソース接続を作成するには、Kinesis接続タイプを使用します。
Kinesis Streamsは、Amazon KinesisがAWSエコシステム内で提供するリアルタイムのデータストリーム処理サービスです。Kinesis Streamsは、ストリーミングデータを処理および分析するためのカスタムアプリケーションを構築するために使用できるカスタマイズ可能なオプションです。Kinesis Streamsはデータのインフローデマンドに対応するために自動的にスケーリングできないため、システムのニーズを満たすために十分な容量を手動でプロビジョニングする必要があります。
Kinesisストリームソースを使用する前に、次のタスクを実行します。
1 Amazon Kinesisコンソールで、Amazon Kinesisストリームを作成して設定します。2 Amazon Web Services(AWS)Identity and Access Management(IAM)サービスで、ユーザーを作成します。3 ユーザー作成プロセス中に生成されたアクセスキーとシークレットアクセスキーをダウンロードします。4 Kinesisストリームへの書き込み権限を持つグループにユーザーを関連付けます。ストリーミング取り込みとレプリケーション は、プロファイルベースのクロスアカウント認証はサポートしていません。Amazon Kinesisに使用されるAmazon Web Services資格情報には、Amazon DynamoDBサービスおよびAmazon CloudWatchサービスにアクセスするための権限が必要です。
AMQPソース Advanced Message Queuing Protocol(AMQP)ソースを使用して、AMQPメッセージキューからメッセージを読み取ります。AMQPソース接続を作成するには、AMQP接続タイプを使用します。
AMQPは、キュー、ルーティング、信頼性、およびセキュリティ機能を備えたメッセージ指向の規格です。AMQPは、リアルタイムのメッセージストリームを渡すことでビジネストランザクションを容易にするために使用できる、ワイヤレベルのプラットフォームに依存しないプロトコルです。
AMQPコネクタを使用すると、AMQPブローカからのメッセージの読み取り、メッセージキューの監視、およびブローカメッセージングのサブスクライブパターンの処理を行うことができます。ストリーミング取り込みとレプリケーション タスクは、AMQPブローカとしてRabbitMQを使用します。RabbitMQは、高速かつ拡張可能で堅牢な分散型のメッセージブローカシステムです。RabbitMQは、メッセージの安全な転送のためにAMQP0-9-1メッセージングプロトコルを使用します。
ストリーミング取り込みとレプリケーション タスクでは、AMQPソースを使用して、受信メッセージのストリームをサブスクライブできます。AMQPブローカは、ストリーミング取り込みとレプリケーション ジョブがキューからメッセージを受信するまで、メッセージをメッセージキューに保存します。ストリーミング取り込みとレプリケーション ジョブがメッセージを受信すると、ジョブはメッセージの受信を確認します。応答済みメッセージは、メッセージキューから削除されます。
信頼できるバックグラウンドジョブとして実行する長期間実行タスクがある場合は、AMQPソースを使用できます。また、Webショップでの注文処理など、システムの一部が別の部分に通知する必要があるアプリケーション間の通信にAMQPソースを使用することもできます。
Azure Event Hubs Kafkaソース Azure Event Hubsに接続するようにKafkaソースを設定できます。Azure Event Hubs Kafkaソース接続を作成するには、Kafka接続タイプを使用します。
標準層または専用層のEvent Hubs名前空間を作成すると、名前空間のKafkaエンドポイントがデフォルトで有効になります。次に、Azure Event Hubs対応のKafka接続を、ストリーミング取り込みとレプリケーション タスクの設定中にソース接続として使用できます。トピック名としてEvent Hubs名を入力します。
ストリーミング取り込みとレプリケーション タスクの設定中に入力するAzure Event Hubsソース情報は、通常のKafkaソース設定の情報と同じです。Azure Event Hubs Kafkaソースプロパティの詳細については、
Azure Event Hubs Kafkaソースのプロパティ を参照してください。
AdministratorでKafka接続を作成するときに、次のプロパティを設定します。
• Kafkaブローカリスト: NAMESPACENAME.servicebus.windows.net:9093 • 追加の接続プロパティ: security.protocol=SASL_SSL,sasl.mechanism=PLAIN,sasl.kerberos.service.name=Kafka • SSLモード: 一方向• SSL TrustStoreファイルパス: Secure Agentのインストール済み環境内で使用可能な次のいずれかの場所にある、ファイルシステム上の信頼されたルート証明書へのパス。- <AGENT_HOME>/jdk/jre/lib/security/cacerts - <AGENT_HOME>/jdk/lib/security/cacerts - <AGENT_HOME>/jdk8/jre/lib/security/cacerts • SSLトラストストアパスワード: トラストストアパスワード。• 追加のセキュリティプロパティ: sasl.jaas.config= org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX"; Azure Event Hubs Kafkaソース接続の作成の詳細については、接続 ヘルプを参照してください。
注: Kafka用のEvent Hubsは、標準層と専用層でのみ使用できます。基本層は、Event HubsでKafkaをサポートしていません。
Business 360 Eventsソース Business 360 Eventsを使用して、Business 360アプリケーションから、サポートされているターゲット(Kafka、Amazon S3、フラットファイルなど)にイベントをパブリッシュできます。Business 360 Eventsソース接続を作成するには、Business 360 Events接続タイプを使用します。
Business 360 Eventsソースを使用する前に、次のタスクを実行します。
• Business 360 Eventsコネクタのライセンスを有効にします。• 組織で送信プロキシサーバーを使用してインターネットに接続している場合は、プロキシサーバーを使用するようにSecure Agentを設定します。• フラットファイルまたはカスタム一時ディレクトリを使用して、Javaヒープスペースの増量、バルクデータの処理、大きなレコードのロギング、またはデータのステージングを行うための前提条件タスクを完了します。• ジョブごとの最大メッセージサイズは1 MBです。ペイロードサイズが1 MBを超えると、ジョブの読み取り、書き込み、またはパブリッシュはできなくなりますが、タスクは実行中の状態のままです。さらに、後続のジョブも、ペイロードサイズに関係なく、読み取り、書き込み、またはパブリッシュされません。 フラットファイルソース 受信リアルタイムデータを読み取るためのソースとしてフラットファイルを使用します。同じディレクトリに保存されているフラットファイルからデータを読み取るようにフラットファイル接続を設定します。
ストリーミング取り込みとレプリケーション タスクはフラットファイルソースの各行を読み取り、設定されたターゲットにデータを取り込みます。フラットファイルがリアルタイムで継続的に更新されると、ストリーミング取り込みとレプリケーション タスクは、ファイル全体を再度読み取るのではなく、新しく追加されたコンテンツのみを読み取ります。
ストリーミング取り込みとレプリケーション は、区切られたフラットファイルからデータを読み取ることができます。区切り文字はキャリッジリターン(\r )、ラインフィード(\n )、または両方の組み合わせである必要があります。
Google PubSubソース Google PubSubソースを使用して、設定済みのGoogle Cloud PubSubサブスクリプションからメッセージを読み取ります。Google PubSubソース接続を作成するには、Google PubSub接続タイプを使用します。
Google PubSubは、イベントを処理するサービスからイベントを生成するサービスを分離する非同期メッセージングサービスです。Google PubSubは、メッセージング指向ミドルウェアとして、またはストリーミング分析パイプラインのイベントの取り込みと配信用に使用できます。Google PubSubは、高可用性と大規模で一貫したパフォーマンスを備えた、耐久性のあるメッセージストレージとリアルタイムのメッセージ配信を提供します。Google PubSubサーバーは、世界中の利用可能なすべてのGoogle Cloudリージョンで実行できます。
Google PubSubコネクタを使用する前に、次の前提条件を満たしていることを確認する必要があります。
• 組織にGoogle PubSubコネクタライセンスがある。• Google PubSubにアクセスするためのGoogleサービスアカウントのJSONキーがある。• Googleサービスアカウントのclient_email、project_id、private_keyの値がある。AdministratorでGoogle PubSub接続を作成するときに、これらの詳細が必要です。ストリーミング取り込みとレプリケーション タスクでは、Google PubSubソースを使用して、Google PubSubトピックからのメッセージをサブスクライブできます。
JMSソース JMSソースを使用して、JMSプロバイダからデータを読み取ります。JMSソース接続を作成するには、JMS接続タイプを使用します。
JMSプロバイダは、メッセージベースのミドルウェアシステムであり、JMSメッセージを送信します。JMSソースは、メッセージトピックに基づいてJMSプロバイダメッセージキューまたはJMSプロバイダからJMSメッセージを読み取ります。ユーザーは、JMSソースをIBM MQ、Oracle Weblogic JMS、Universal Messaging、およびTIBCO JMSで使用することができます。
JMSソースは、次のJMSメッセージタイプを読み取ることができます。
• メッセージ。ヘッダーフィールドおよびプロパティフィールドのみが含まれます。• TextMessage。文字列オブジェクトが含まれます。TextMessagesにはXMLまたはJSONメッセージデータを含めることができます。• BytesMessage。解釈されていないバイトのストリームです。既存のメッセージフォーマットと一致するようにメッセージ本文をエンコードするときにBytesMessageを使用します。BytesMessagesには、通常、プロパティフィールドは含まれません。• MapMessage:名前または値ペアのセットが含まれます。名前は文字列形式です。値はJava基本データタイプです。JMSメッセージ配信先タイプ 次のいずれかの接続JMSメッセージ配信先タイプを選択できます。 - [キュー] 。JMSメッセージプロデューサは、単一のコンシューマにメッセージを配信します。キューからのメッセージをコンシュームするには、コンシューマを登録する必要があります。キューに登録されているコンシューマがない場合、キューは、コンシューマが登録されるまでメッセージを保持します。- [トピック] JMSメッセージプロデューサは、トピックにサブスクライブしているアクティブなすべてのコンシューマにメッセージを配信します。複数のプロデューサがトピックの接続先にメッセージを送信でき、各メッセージを複数のサブスクライバに配信できます。トピックに登録されているコンシューマがない場合、トピックはメッセージを保持しません。サブスクリプションを共有可能、継続、またはその両方にすることができます。共有可能サブスクリプションでは、単一または複数のコンシューマが単一のサブスクリプションにアクセスできます。継続サブスクリプションは、サブスクライバがメッセージをコンシュームするか、メッセージの有効期限が切れるまで、非アクティブなサブスクライバのメッセージを保持します。Kafkaソース Kafkaソースを使用して、Kafkaトピックからのメッセージを読み取ります。Kafkaソース接続を作成するには、Kafka接続タイプを使用します。
KafkaはPublish-Subscribeメッセージシステムです。これは、Kafkaトピックのストリーミングデータを永続化するオープンソースの分散ストリーミングプラットフォームです。任意のトピックを、リアルタイムでデータを必要とする任意の数のシステムで読み取ることができます。Kafkaは、さまざまなダウンストリームコンシューマアプリケーションが使用できるストリーミングデータの中間ステージング領域として使用できます。
Kafkaは、ブローカと呼ばれる1つまたは複数のサーバーで構成されるクラスタとして動作します。Kafkaブローカはメッセージ形式でデータをストリーミングします。このメッセージはトピックにパブリッシュされます。Kafkaソースを作成するときは、Kafkaコンシューマを作成してKafkaトピックからメッセージを読み取ります。
ストリーミング取り込みとレプリケーション タスクでは、Kafkaソースを使用して、受信メッセージのストリームをサブスクライブできます。Kafkaトピックから読み取るようにKafkaソースを設定する場合、トピック名を指定するか、Javaでサポートされている正規表現を使用して、指定したパターンに一致するすべてのトピックをサブスクライブできます。
同じKafka接続を使用して、Apache Kafka用のAmazon Managed Streaming(Amazon MSK)またはConfluent Kafkaソース接続を作成できます。これで、ストリーミング取り込みとレプリケーション タスクでAmazon MSKソースまたはConfluent Kafkaソースを使用して、Apache KafkaまたはConfluent Kafkaトピックからのメッセージを読み取ることができます。
MQTTソース MQTTソースを使用して、MQ Telemetry Transport(MQTT)ブローカからデータを読み取ります。MQTTソースを作成するには、MQTT接続タイプを使用します。
MQTTはPublish-Subscribeメッセージシステムです。これは、シンプルで軽量、かつ永続的なメッセージングプロトコルです。制約のあるデバイスや、低帯域幅、長い待機時間、または信頼性の低いネットワーク向けに設計されています。パブリッシャとサブスクライバはいずれもMQTTクライアントです。MQTTはパブリッシャをサブスクライバから切り離すため、ブローカがクライアント接続を管理します。
MQTTブローカは、すべてのメッセージを受信し、メッセージをフィルタし、各メッセージをサブスクライブしたクライアントを判別してから、サブスクライブしたクライアントにメッセージを送信します。複数のMQTTソースが1つのMQTTブローカに接続する場合、各接続には一意の識別子が必要です。ストリーミング取り込みとレプリケーション ジョブを実行してMQTTソースからデータを取り込む場合、ストリーミング取り込みとレプリケーション は、ターゲットにデータを書き込む前に、まずデータを内部キューに書き込みます。
注: MQTTソースには、一意のクライアントIDが必要です。2つのMQTTソースが同じクライアントIDを持っている場合、MQTTブローカはクライアントとストリーミング取り込みとレプリケーション ジョブが[実行中(警告あり)] 状態になるのを拒否します。
ストリーミング取り込みとレプリケーション はMQTT Quality of Service(QoS)レベル1をサポートします。レベル1は、クライアントがメッセージをブローカに少なくとも1回送信し、メッセージが複数回配信される可能性があることを示します。ブローカがメッセージの受信を確認した後、クライアントは送信キューからメッセージを削除します。QoSレベルは、クライアントからブローカへの通信またはブローカからクライアントへの通信に制限されています。
OPC UAソース OPC UAソースを使用して、OPC UAアプリケーションタグからメッセージを読み取ります。OPC UAソース接続を作成するには、OPCUA接続タイプを使用します。
オープンプラットフォームコミュニケーション(OPC)は、インダストリー4.0およびIIoT(Industrial Internet Of Things)の重要な通信プロトコルの1つです。OPCユニファイドアーキテクチャ(OPC UA)は、産業オートメーションに使用されるマシン間通信プロトコルです。OPC UAは、企業システム、監視デバイス、および実世界のデータと相互作用するセンサー間でデータを移動するための柔軟で適応性のあるメカニズムを提供します。OPC UAを使用して、単純なダウンタイムステータスまたは大量の非常に複雑なプラント全体の情報の通信を確立できます。
OPC UAソースは、OPCサーバーからデータを収集するクライアントです。OPCのデータポイントは、デバイスからのデータを表し、データへのリアルタイムアクセスを提供するタグです。ストリーミング取り込みとレプリケーション タスクでは、OPC UAソースを作成して、指定したタグのリストに基づいて受信データを読み取ることができます。JSON配列形式でタグをメンションする必要があります。
REST V2ソース REST V2ソースを使用して、Webサービスアプリケーションからデータを読み取ります。REST V2ソース接続を作成するには、REST V2接続タイプを使用します。
REST V2ソースコネクタは、REST APIを使用するクラウドアプリケーション用の汎用コネクタです。これはSwagger仕様バージョン2.0をサポートしています。Swagger仕様ファイルには、操作ID、パスパラメータ、クエリパラメータ、ヘッダフィールド、およびペイロードの詳細が含まれます。
Administratorでストリーミング取り込みとレプリケーション タスクのREST V2ソース接続を作成できたら、次のいずれかのREST認証タイプを設定できます。
• 基本• OAuth1.0• OAuth2.0クライアントの資格情報• OAuth2.0認証コード• JWTベアラートークン認証