以下のYouTubeで、AWS EMRによるspark streaming の実装を学ぶ
- Real Time Analytics on Spark Tutorial [Part 10] | Spark Amazon EMR Elastic MapReduce -Great Learning
- Real Time Analytics on Spark Tutorial [Part 11] Spark Demo on EMR Elastic MapReduce | Great Learning
■EMR nobebook利用時の注意点
emr-5.30.0以降のを使うと、kernel起動エラーが起こるので、EMRのnotebook使うときは、emr-5.29.0を選択する。
詳しくはリンク参照
■仕様
EMR上に置いたcsvファイルから1レコードづつkafkaに流し(ストリーミングデータの代わり)、kafkaからpysparkでデータ取得して可視化用に前処理した結果をnodejsで可視化する
■EMR起動までの各設定
- security group作成
EC2 -> セキュリティグループ
※ インバウンドルールは、すべてのipアドレスOK(anywhere)に設定すると、EMRでセキュリティエラーでクラスターが作成されないので、すべてのipをOKにしたいときは、SSHのみOKの設定にすること!!!(エラーについてはリンク1, リンク2参照) - key pairsの作成
windowsからコンソールにログインする予定なので、キーファイルは.ppkで作成
(プライベートkeyは作成時に自動ダウンロードされる) - puttyダウンロード(リンク)
windowsからAWSコンソールへログイン時に使うSSHツール - EMRクラスター作成(以下は、詳細設定で設定)
・リリースは、spark3.1.1を使っているので、『emr-6.3.0』を選択し、ソフト一覧でsparkも選択入れる
・選択するソフトウエアは、今回はHadoopとsparkのみ
(zookeeper使うソフト選択すると、zookeeperがインストールされるので、自分でkafkaインストールするときに、kafkaのzookeeperと被るので注意
・master node:1、slave node:2でクラスター数は3でとりあえず設定
・EC2 キーペアは、2で作成したkeyを選択
・EC2セキュリティグループの、マスタ・コアそれぞれに1で作成したsセキュリティグループを選択
・ハードウェアは、m4.largeをとりあえず選択
※クラスターの作成がうまくいくと『待機中』になる。 - AWS EMR コンソールへの接続TEST
(1) 作成したクラスターの「マスターパブリック DNS」をコピー
(2) 「Connect to the Master Node Using SSH」のリンクをクリック
(3) 接続するローカルpcで、PuTTYを立ち上げ、Host Nameにコピーした「マスターパブリック DNS」を貼り付け
(4) PuTTYのCategoryからSSH -> Authを選ぶ、Browseでダウンロードしたppkファイルを選択
(5) コンソール立ち上がったら、hadoopと入力してログイン
ログイン出来たら、EMRと大きく表示される。
※EMRのクラスタ作成時に、Zookeeperを使用するソフト選択すると、zookeeperもインストールされるので、kafkaをEMR上でインストールするなら、注意すること。
EMRでのLISTENポートは、「sudo lsof -i -n -P | grep zookeeper」で確認 - コンソールからkafkaをEMRにインストール
今回は、ストリーミングデータをkafkaで取得して、pysparkで読み込んで前処理して、kafkaに戻すシステムなので、まずはkafkaをEMRにインストールする。
kafkaのサイトリンク
(1) kafkaのダウンロード(※必ずBinaryをダウンロードすること!!!)
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
(2) kafkaの解凍
tar -xzf kafka_2.12-2.8.0.tgz
(3) フォルダ名の変更
mv kafka_2.12-2.8.0 kafka
- kafkaの起動
kafkaの起動は、メタデータ管理のzookeeperを起動してから、kafka Brokerを起動する。
(1) zookeeperの起動
kafkaのファルダに入ってから起動
cd kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
(2) Kafka Brokerの起動
最初のコンソールはzookeeperが起動中なので、別コンソールを開いて起動する。
kafkaフォルダ内で、
bin/kafka-server-start.sh config/server.properties
(3) テスト用のtopic生成
別コンソールを開いて、kafkaが正しく起動しているか確認するため、topicを作成してみる
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
※ kafkaのversionが3.0.0以上は、 --zookeeperオプションが無くなったので、3.0.0以降をダウンロードしたら、topic作成方法を確認すること!
以下コマンドで、作成したtestトピックの設定状況確認
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
以上までが、EMRの起動とコンソールへのアクセス
■sparkによるkafka操作用設定
repositoryから、Spark Integration For Kafkaをダウンロード
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.1.2/spark-streaming-kafka-0-10_2.12-3.1.2.jar
■kafkaでのストリーミングデータの送受信
送信はコンソールからデータ流す(ストリーミングデータの変わり)
- トピックの作成
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic orders_topic
ストリーミングデータ送信用トピック:order_topic - ストリーミングデータの送信
実際のストリーミングデータの代わりに、s3ファイルのデータを取得して、コンソールから kafka-console-producer.sh でデータをkafkaに送る
push_orders_data_in_topic.sh
■sparkによる前処理および、kafkaへの再送信
kafkaから受け取ったデータをpysparkで可視化用に前処理した後、その結果を受け取り用kafkaトピックに送る
- 別のEMRコンソール開いて、前処理後用のtopic作成
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic orders_ten_sec_data
前処理後データ送信用トピック:orders_ten_sec_data - pykfkaのインストール
pysparkからトピック作るライブラリ
pip install pykafka
■sparkによる前処理とnode.jsによるストリーミングデータの可視化
- 別のコンソール開く
- node.jsのインストール
※8系はすでにサポート対象外だが、youtubeの仕様をそのままつかう
curl --silent --location https://rpm.nodesource.com/setup_8.x | sudo bash -
sudo yum -y install nodejs
npm install express
npm install socket.io
npm install kafka-node - csvファイルを読み込んでコンソールからストリーミングデータの代わりとしてkafkaに流す(push_orders_data_in_topic.shスクリプトでストリーミングデータの代わりを送信)
/bin/bash push_orders_data_in_topic.sh ../data/ordersdata ip--172-xxxx:9092 orders_data - spark用コンソール開いて、spark-submitで、kafkaからデータ取得して前処理して返すプログラム(spark_streaming_order_status.py)を実行
spark-submit --jars spark-streaming-kafka-0-10-assembly_2.12-3.0.1.jar spark_streaming_order_status.py localhost:2181 orders_data - ローカルPCから、emrのurlで見れないか試したら、ssh接続じゃないとダメなの気づいた・・・
上記YouTubeのプログラムファイル場所:リンク
0コメント