===================TweetsRead.py===================
"""Stream live tweets matching a keyword filter to one TCP client.

Listens on 127.0.0.1:5555, waits for a single client (for example a Spark
socketTextStream) to connect, then forwards the text of each matching tweet
as one newline-terminated, UTF-8 encoded line.
"""
import tweepy
from tweepy import Stream
from tweepy.auth import OAuthHandler
from tweepy.streaming import StreamListener
import socket
import json
import os

# Credentials come from the environment so secrets never have to be committed;
# the empty-string fallbacks match the original placeholder values.
consumer_key = os.environ.get('TWITTER_CONSUMER_KEY', '')
consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET', '')
access_token = os.environ.get('TWITTER_ACCESS_TOKEN', '')
access_secret = os.environ.get('TWITTER_ACCESS_SECRET', '')


class TweetsListener(StreamListener):
    """Forward the text of every received tweet to a connected client socket."""

    def __init__(self, csocket):
        """Remember *csocket*, the already-connected socket tweets are sent to."""
        super(TweetsListener, self).__init__()
        self.client_socket = csocket

    def on_data(self, data):
        """Send the 'text' field of the JSON message *data* as one line.

        Messages that are not valid JSON or carry no 'text' are logged and
        skipped (returns True so the stream keeps running). If writing to the
        client socket fails, the error is logged and False is returned, which
        tells tweepy to stop the stream since nobody is listening any more.
        """
        try:
            msg = json.loads(data)
            text = msg['text']
        except (ValueError, KeyError, TypeError) as e:
            # e.g. malformed JSON, or a non-tweet notice without 'text'.
            print("Error on data: %s" % str(e))
            return True
        # The receiver splits records on '\n': flatten newlines inside the
        # tweet and terminate the record explicitly, otherwise consecutive
        # tweets run together and words at the boundaries get merged.
        line = text.replace('\r', ' ').replace('\n', ' ') + '\n'
        payload = line.encode('utf-8')
        print(payload)
        try:
            # sendall() keeps writing until every byte is sent; send() may not.
            self.client_socket.sendall(payload)
        except socket.error as e:
            print("Error on data: %s" % str(e))
            return False
        return True

    def on_error(self, status):
        """Log a stream error status; disconnect when rate limited (420)."""
        print(status)
        if status == 420:
            # Reconnecting while rate limited prolongs the limit, so
            # returning False here disconnects instead of retrying.
            return False
        return True


def sendData(c_socket, track=('china',)):
    """Stream tweets matching any keyword in *track* to *c_socket*.

    *track* defaults to the single keyword 'china'. The call does not return
    while the stream is running (tweepy's filter() runs synchronously unless
    asked otherwise).
    """
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    twitter_stream = Stream(auth, TweetsListener(c_socket))
    twitter_stream.filter(track=list(track))


if __name__ == "__main__":
    host = "127.0.0.1"
    port = 5555
    s = socket.socket()  # create socket object
    # Allow an immediate restart without waiting for TIME_WAIT to expire.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        s.bind((host, port))
        print("Listening on port: %s" % str(port))
        s.listen(5)
        c, addr = s.accept()
        print("Received request from: " + str(addr))
        try:
            sendData(c)
        finally:
            c.close()
    finally:
        s.close()
===================TwitterAnalytics.py===================
"""Plot the ten most frequent hashtags from a live, socket-fed tweet stream.

Reads newline-delimited tweet text from 127.0.0.1:5555, counts hashtags over
a 20-second window that slides with every 10-second batch, and redraws a bar
chart of the current top 10 about every 10 seconds, ten times, then stops.
"""
import os
import time
from collections import namedtuple

import findspark
# SPARK_HOME, when set, overrides the author's hard-coded local install path.
# findspark.init() must run before any pyspark import.
findspark.init(os.environ.get('SPARK_HOME', '/Users/donghua/spark-2.3.2-bin-hadoop2.7'))

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from pyspark.sql.types import StructType, StructField, StringType, LongType

from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
import pandas
# get_ipython().run_line_magic('matplotlib', 'inline')

# Not required if running in Pyspark integrated notebooks
sc = SparkContext()
ssc = StreamingContext(sc, 10)  # 10-second batch interval
sqlContext = SQLContext(sc)

socket_stream = ssc.socketTextStream("127.0.0.1", 5555)
lines = socket_stream.window(20)  # each RDD covers the last 20 seconds

fields = ("tag", "count")
Tweet = namedtuple('Tweet', fields)

# Explicit schema: rdd.toDF() infers the schema from the first row and raises
# on an empty RDD (a window with no hashtags), which would abort the stream.
_TAG_SCHEMA = StructType([
    StructField("tag", StringType()),
    StructField("count", LongType()),
])


def _save_top_tags(rdd):
    """Register the 10 most frequent (tag, count) rows of *rdd* as view "tweets"."""
    (sqlContext.createDataFrame(rdd, _TAG_SCHEMA)
        .sort(desc("count"))
        .limit(10)
        .createOrReplaceTempView("tweets"))


# use () for multiple lines
(lines.flatMap(lambda text: text.split())  # split on any whitespace
    .filter(lambda word: word.startswith("#") and len(word) > 1)  # skip bare "#"
    .map(lambda word: (word.lower(), 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda rec: Tweet(rec[0], rec[1]))
    .foreachRDD(_save_top_tags))

ssc.start()
try:
    for _ in range(10):
        time.sleep(10)
        # The first window may not have been processed yet; until then the
        # "tweets" view does not exist and querying it would raise.
        if "tweets" not in sqlContext.tableNames():
            continue
        top_10_tweets = sqlContext.sql("select tag,count from tweets")
        top_10_df = top_10_tweets.toPandas()
        if top_10_df.empty:
            continue  # no hashtags in the current window; nothing to plot
        display.clear_output(wait=True)
        plt.figure(figsize=(10, 8))
        sns.barplot(x="count", y="tag", data=top_10_df)
        plt.show()
finally:
    # Stop the streaming context (and its SparkContext) even if plotting fails.
    ssc.stop()