Saturday, October 13, 2018

Tweet Read Example with PySpark streaming analytics

===================TweetsRead.py===================

import tweepy
from tweepy import Stream
from tweepy.auth import OAuthHandler
from tweepy.streaming import StreamListener
import socket
import json

consumer_key = ''
consumer_secret = ''
access_token = ''
access_secret = ''

class TweetsListener(StreamListener):

    def __init__(self, csocket):
        self.client_socket = csocket

    def on_data(self, data):
        try:
            msg = json.loads(data)
            print(msg['text'].encode('utf-8'))
            self.client_socket.send(msg['text'].encode('utf-8'))
            return True
        except BaseException as e:
            print("Error on data: %s" % str(e))
        return True

    def on_error(self, status):
        print(status)
        return True

def sendData(c_socket):
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)

    twitter_stream = Stream(auth, TweetsListener(c_socket))
    twitter_stream.filter(track=['china'])


if __name__ == "__main__":
    s = socket.socket()  # create socket object
    host = "127.0.0.1"
    port = 5555
    s.bind((host, port))
    print("Listening on port: %s" % str(port))

    s.listen(5)
    c, addr = s.accept()
    print("Received request from: " + str(addr))

    sendData(c)

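To run the example, start TweetsRead.py first so it listens on port 5555; TwitterAnalytics.py below then connects to that socket as a client. If you want to confirm the socket is serving tweet text before involving Spark, a minimal test client (a sketch only; note TweetsRead.py accepts a single connection, so restart it before starting the Spark job) looks like this:

# Minimal test client for the socket opened by TweetsRead.py
# (assumes TweetsRead.py is already running on 127.0.0.1:5555)
import socket

test_client = socket.socket()
test_client.connect(("127.0.0.1", 5555))
print(test_client.recv(4096).decode("utf-8", errors="ignore"))
test_client.close()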

===================TwitterAnalytics.py===================

import findspark

findspark.init('/Users/donghua/spark-2.3.2-bin-hadoop2.7')

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc

# Not required if running in Pyspark integrated notebook
sc = SparkContext()

ssc = StreamingContext(sc, 10)
sqlContext = SQLContext(sc)

socket_stream = ssc.socketTextStream("127.0.0.1",5555)


lines = socket_stream.window(20)

from collections import namedtuple
fields = ("tag","count")
Tweet = namedtuple('Tweet', fields)

# use () for multiple lines
(lines.flatMap(lambda text: text.split(" "))
.filter(lambda word: word.startswith("#"))
.map(lambda word: (word.lower(), 1))
.reduceByKey(lambda a, b : a + b)
.map(lambda rec: Tweet(rec[0],rec[1]))
.foreachRDD(lambda rdd: rdd.toDF().sort(desc("count"))
.limit(10).registerTempTable("tweets")))

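The chain above is essentially a word count restricted to hashtags, applied to each 20-second window and written to the "tweets" temp table. A minimal sketch of the same logic on a static RDD (reusing the sc created above; the sample strings are made up) can help when debugging the transformations in isolation:

# Same hashtag-count logic on a static RDD (assumes `sc` from above; sample text is made up)
sample = sc.parallelize(["RT #China trade news #china", "more #news about #China"])
top_tags = (sample.flatMap(lambda text: text.split(" "))
            .filter(lambda word: word.startswith("#"))
            .map(lambda word: (word.lower(), 1))
            .reduceByKey(lambda a, b: a + b)
            .sortBy(lambda rec: rec[1], ascending=False)
            .collect())
print(top_tags)  # e.g. [('#china', 3), ('#news', 1)]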

import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
import pandas
# get_ipython().run_line_magic('matplotlib', 'inline')

ssc.start()

count = 0
while count < 10:
    time.sleep(10)
    top_10_tweets = sqlContext.sql("select tag,count from tweets")
    top_10_df = top_10_tweets.toPandas()
    display.clear_output(wait = True)
    plt.figure(figsize= (10, 8))
    sns.barplot(x="count", y="tag",data=top_10_df)
    plt.show()
    count = count+1
ssc.stop()
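Note that ssc.stop() also stops the underlying SparkContext by default. If you want to keep sc alive for further interactive work in the notebook, an optional alternative is to stop only the streaming context:

# Stop streaming but keep the SparkContext alive (alternative to the plain ssc.stop() above)
ssc.stop(stopSparkContext=False, stopGraceFully=True)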

Wednesday, October 10, 2018

Use Jupyter Notebook with Spark2 on Apache Spark on MacOS

Reference URL: http://www.dbaglobe.com/2018/07/use-jupyter-notebook-with-spark2-on.html

Below are changes specific for apache spark, anaconda3 on MacOS

mkdir /Users/donghua/anaconda3/share/jupyter/kernels/pyspark2/

cat /Users/donghua/anaconda3/share/jupyter/kernels/pyspark2/kernel.json
    {
      "argv": [
        "python3.6",
        "-m",
        "ipykernel_launcher",
        "-f",
        "{connection_file}"
      ],
      "display_name": "Python3.6 + Pyspark(Spark 2.3.2)",
      "language": "python",
      "env": {
        "PYSPARK_PYTHON": "python",
        "SPARK_HOME": "/Users/donghua/spark-2.3.2-bin-hadoop2.7",
        "SPARK_CONF_DIR": "/Users/donghua/spark-2.3.2-bin-hadoop2.7/conf",
        "PYTHONPATH": "/Users/donghua/spark-2.3.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip:/Users/donghua/spark-2.3.2-bin-hadoop2.7/python/:",
        "PYTHONSTARTUP": "/Users/donghua/spark-2.3.2-bin-hadoop2.7/python/pyspark/shell.py",
        "PYSPARK_SUBMIT_ARGS": "--master spark://Donghuas-MacBook-Air.local:7077 --name PySparkShell pyspark-shell"
      }
   }

/Users/donghua/anaconda3/bin/jupyter-notebook --ip=Donghuas-MacBook-Air.local --port 9999

#export PYSPARK_DRIVER_PYTHON=ipython
#export SPARK_HOME=/Users/donghua/spark-2.3.2-bin-hadoop2.7
#export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
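With this kernel selected in Jupyter, PYTHONSTARTUP runs pyspark/shell.py, which pre-creates the SparkContext sc and SparkSession spark. A quick sanity check in the first notebook cell (a minimal sketch, assuming the kernel definition above) is:

# First-cell sanity check for the "Python3.6 + Pyspark(Spark 2.3.2)" kernel;
# `sc` and `spark` are created by pyspark/shell.py via PYTHONSTARTUP
print(sc.version)       # expected: 2.3.2
print(sc.master)        # expected: spark://Donghuas-MacBook-Air.local:7077
spark.range(5).show()   # trivial DataFrame to confirm the session works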





Tuesday, October 9, 2018

Modify default slaves.sh to start apache-spark on MacOS


By default, Apache Spark's sbin/start-all.sh tries to start each worker by SSH-ing into the slave node, even when everything runs on a single laptop. The modification below launches the worker with a local bash invocation instead of SSH.

Donghuas-MacBook-Air:sbin donghua$ diff slaves.sh slaves.sh.old
92,94c92,93
<       cmd="${@// /\\ } 2>&1"
<       echo $cmd
<       bash -c "$cmd"
---
>     ssh $SPARK_SSH_OPTS "$slave" $"${@// /\\ }" \
>       2>&1 | sed "s/^/$slave: /"
96,98c95,96
<       cmd="${@// /\\ } 2>&1"
<       echo $cmd
<       bash -c "$cmd"
---
>     ssh $SPARK_SSH_OPTS "$slave" $"${@// /\\ }" \
>       2>&1 | sed "s/^/$slave: /" &


The revised slaves.sh can be found here:

https://github.com/luodonghua/bigdata/blob/master/slaves.sh

A modified version of spark-config.sh to start multiple workers can be found here:

https://github.com/luodonghua/bigdata/blob/master/spark-config.sh
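With the modified slaves.sh (and spark-config.sh for multiple workers) in place, the standalone cluster can be started locally without SSH. A minimal check, assuming a default standalone setup, is to start the daemons and confirm the Master and Worker JVMs with jps (or open the master web UI at http://Donghuas-MacBook-Air.local:8080):

sbin/start-all.sh
jps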