AWS EMRによる、streaming dataの分散処理実装

以下のYouTubeで、AWS EMRによるspark streaming の実装を学ぶ


■EMR nobebook利用時の注意点

emr-5.30.0以降のを使うと、kernel起動エラーが起こるので、EMRのnotebook使うときは、emr-5.29.0を選択する。

詳しくはリンク参照


■仕様

EMR上に置いたcsvファイルから1レコードづつkafkaに流し(ストリーミングデータの代わり)、kafkaからpysparkでデータ取得して可視化用に前処理した結果をnodejsで可視化する


■EMR起動までの各設定

  1. security group作成
     EC2 -> セキュリティグループ
      ※ インバウンドルールは、すべてのipアドレスOK(anywhere)に設定すると、EMRでセキュリティエラーでクラスターが作成されないので、すべてのipをOKにしたいときは、SSHのみOKの設定にすること!!!(エラーについてはリンク1, リンク2参照)

  2. key pairsの作成
        windowsからコンソールにログインする予定なので、キーファイルは.ppkで作成
    (プライベートkeyは作成時に自動ダウンロードされる)

  3. puttyダウンロード(リンク)
    windowsからAWSコンソールへログイン時に使うSSHツール

  4. EMRクラスター作成(以下は、詳細設定で設定)
     ・リリースは、spark3.1.1を使っているので、『emr-6.3.0』を選択し、ソフト一覧でsparkも選択入れる
     ・選択するソフトウエアは、今回はHadoopとsparkのみ
      (zookeeper使うソフト選択すると、zookeeperがインストールされるので、自分でkafkaインストールするときに、kafkaのzookeeperと被るので注意
     ・master node:1、slave node:2でクラスター数は3でとりあえず設定
     ・EC2 キーペアは、2で作成したkeyを選択
     ・EC2セキュリティグループの、マスタ・コアそれぞれに1で作成したsセキュリティグループを選択
     ・ハードウェアは、m4.largeをとりあえず選択
     ※クラスターの作成がうまくいくと『待機中』になる。

  5. AWS EMR コンソールへの接続TEST
     (1) 作成したクラスターの「マスターパブリック DNS」をコピー
     (2) 「Connect to the Master Node Using SSH」のリンクをクリック
     (3) 接続するローカルpcで、PuTTYを立ち上げ、Host Nameにコピーした「マスターパブリック DNS」を貼り付け
     (4) PuTTYのCategoryからSSH -> Authを選ぶ、Browseでダウンロードしたppkファイルを選択
     (5) コンソール立ち上がったら、hadoopと入力してログイン
      ログイン出来たら、EMRと大きく表示される。

    ※EMRのクラスタ作成時に、Zookeeperを使用するソフト選択すると、zookeeperもインストールされるので、kafkaをEMR上でインストールするなら、注意すること。
    EMRでのLISTENポートは、「sudo lsof -i -n -P | grep zookeeper」で確認

  6. コンソールからkafkaをEMRにインストール
    今回は、ストリーミングデータをkafkaで取得して、pysparkで読み込んで前処理して、kafkaに戻すシステムなので、まずはkafkaをEMRにインストールする。
     kafkaのサイトリンク
     (1) kafkaのダウンロード(※必ずBinaryをダウンロードすること!!!)
      wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
     (2) kafkaの解凍
      tar -xzf kafka_2.12-2.8.0.tgz
     (3) フォルダ名の変更
      mv kafka_2.12-2.8.0 kafka
     
  7. kafkaの起動
       kafkaの起動は、メタデータ管理のzookeeperを起動してから、kafka Brokerを起動する。
     (1) zookeeperの起動
      kafkaのファルダに入ってから起動
      cd kafka
      bin/zookeeper-server-start.sh config/zookeeper.properties


     (2) Kafka Brokerの起動 
      最初のコンソールはzookeeperが起動中なので、別コンソールを開いて起動する。
      kafkaフォルダ内で、
      bin/kafka-server-start.sh config/server.properties
     (3) テスト用のtopic生成
      別コンソールを開いて、kafkaが正しく起動しているか確認するため、topicを作成してみる
       bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
      ※ kafkaのversionが3.0.0以上は、 --zookeeperオプションが無くなったので、3.0.0以降をダウンロードしたら、topic作成方法を確認すること!
           以下コマンドで、作成したtestトピックの設定状況確認
      bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

  以上までが、EMRの起動とコンソールへのアクセス


■sparkによるkafka操作用設定

repositoryから、Spark Integration For Kafkaをダウンロード

wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.1.2/spark-streaming-kafka-0-10_2.12-3.1.2.jar


■kafkaでのストリーミングデータの送受信

送信はコンソールからデータ流す(ストリーミングデータの変わり)

  1. トピックの作成
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic orders_topic

    ストリーミングデータ送信用トピック:order_topic

  2. ストリーミングデータの送信
    実際のストリーミングデータの代わりに、s3ファイルのデータを取得して、コンソールから kafka-console-producer.sh でデータをkafkaに送る

    push_orders_data_in_topic.sh


■sparkによる前処理および、kafkaへの再送信

kafkaから受け取ったデータをpysparkで可視化用に前処理した後、その結果を受け取り用kafkaトピックに送る

  1. 別のEMRコンソール開いて、前処理後用のtopic作成
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic orders_ten_sec_data

    前処理後データ送信用トピック:orders_ten_sec_data

  2. pykfkaのインストール
     pysparkからトピック作るライブラリ
    pip install pykafka


■sparkによる前処理とnode.jsによるストリーミングデータの可視化

  1. 別のコンソール開く
  2. node.jsのインストール
    ※8系はすでにサポート対象外だが、youtubeの仕様をそのままつかう
    curl --silent --location https://rpm.nodesource.com/setup_8.x | sudo bash -
    sudo yum -y install nodejs
    npm install express
    npm install socket.io
    npm install kafka-node

  3. csvファイルを読み込んでコンソールからストリーミングデータの代わりとしてkafkaに流す(push_orders_data_in_topic.shスクリプトでストリーミングデータの代わりを送信)
    /bin/bash push_orders_data_in_topic.sh ../data/ordersdata ip--172-xxxx:9092 orders_data

  4. spark用コンソール開いて、spark-submitで、kafkaからデータ取得して前処理して返すプログラム(spark_streaming_order_status.py)を実行
    spark-submit --jars spark-streaming-kafka-0-10-assembly_2.12-3.0.1.jar spark_streaming_order_status.py localhost:2181 orders_data

  5. ローカルPCから、emrのurlで見れないか試したら、ssh接続じゃないとダメなの気づいた・・・

上記YouTubeのプログラムファイル場所:リンク





機械学習Tips保管庫

データ解析、機械学習のための学習内容の保管庫。復習用。

0コメント

  • 1000 / 1000