Now, we'll define our transformation logic. First we'll split all the tweets into words and put them in a words RDD. Then we'll filter the words to keep only hashtags, map each hashtag to a pair of (hashtag, 1), and put them in a hashtags RDD.

Then we need to calculate how many times each hashtag has been mentioned. We could do that by using the function reduceByKey, but this function calculates the count per batch, i.e. the counts would reset with every new batch. In our case, we need to calculate the counts across all the batches, so we'll use another function called updateStateByKey, as this function allows you to maintain the state of the RDD while updating it with new data. This approach is called a Stateful Transformation.

Note that in order to use updateStateByKey, you've got to configure a checkpoint, and that is what we did in the previous step.

updateStateByKey takes a function as a parameter, called the update function. It runs on each item in the RDD and performs the desired logic. In our case, we've created an update function called aggregate_tags_count that sums all the new_values for each hashtag, adds them to total_sum (the sum across all the batches), and saves the data into the tags_totals RDD:

```python
def aggregate_tags_count(new_values, total_sum):
    return sum(new_values) + (total_sum or 0)

# split each tweet into words
words = dataStream.flatMap(lambda line: line.split(" "))
# filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag, 1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# adding the count of each hashtag to its last count
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# do processing for each RDD generated in each interval
```
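To see what the update function actually does, here is a minimal pure-Python simulation of updateStateByKey's per-key behavior across two batches. The `run_batch` helper below is purely illustrative (it is not part of Spark's API); it just groups a batch's (hashtag, 1) pairs and applies the update function per key, the way Spark does:

```python
def aggregate_tags_count(new_values, total_sum):
    # sum this batch's counts for a hashtag and add them to the running total
    return sum(new_values) + (total_sum or 0)

def run_batch(state, batch_pairs):
    # illustrative stand-in for updateStateByKey: group this batch's
    # (hashtag, 1) pairs, then apply the update function to each key
    grouped = {}
    for tag, count in batch_pairs:
        grouped.setdefault(tag, []).append(count)
    for tag, new_values in grouped.items():
        state[tag] = aggregate_tags_count(new_values, state.get(tag))
    return state

state = {}
state = run_batch(state, [("#spark", 1), ("#python", 1), ("#spark", 1)])
state = run_batch(state, [("#spark", 1)])
# state is now {"#spark": 3, "#python": 1}: counts accumulate across batches
```

Note that `total_sum` is `None` the first time a hashtag is seen, which is exactly why the update function guards with `(total_sum or 0)`.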
After that, it sends every tweet to the Spark Streaming instance (to be discussed later) through a TCP connection:

```python
def send_tweets_to_spark(http_resp, tcp_connection):
    ...
```

Now we'll write the main part, which makes the app host a socket connection that Spark will connect to. We'll configure the IP to be localhost, as everything will run on the same machine, and the port to be 9009. Then we'll call the get_tweets method we made above to get the tweets from Twitter, and pass its response along with the socket connection to send_tweets_to_spark for sending the tweets to Spark:

```python
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
```

Setting Up Our Apache Spark Streaming Application

Let's build up our Spark Streaming app, which will do real-time processing on the incoming tweets, extract the hashtags from them, and calculate how many times each hashtag has been mentioned.

First, we create an instance of the Spark Context sc, then we create the Streaming Context ssc from sc with a batch interval of two seconds; it will run the transformations on all streams received every two seconds. Notice that we set the log level to ERROR in order to disable most of the logs that Spark writes. We also define a checkpoint here to allow periodic RDD checkpointing; this is mandatory for our app, as we'll use stateful transformations (discussed later in this section).

Then we define our main DStream, dataStream, which connects to the socket server we created before on port 9009 and reads the tweets from that port. Each record in the DStream will be one tweet.

```python
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# create spark context with the above configuration
conf = SparkConf()
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with interval size 2 seconds
ssc = StreamingContext(sc, 2)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")
dataStream = ssc.socketTextStream("localhost", 9009)
```
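The body of send_tweets_to_spark isn't shown above, so here is a minimal sketch of what it needs to do: read the streamed response line by line, parse each line as a tweet's JSON object, and write the tweet text (newline-terminated, so socketTextStream sees one tweet per record) to the TCP connection. The `FakeResponse` and `FakeConnection` classes below are illustrative test doubles, not part of the app:

```python
import json

def send_tweets_to_spark(http_resp, tcp_connection):
    # each streamed line is one tweet's full JSON object
    for line in http_resp.iter_lines():
        try:
            full_tweet = json.loads(line)
            tweet_text = full_tweet['text']  # keep only the tweet's text
            # newline-delimit so Spark's socketTextStream yields one tweet per record
            tcp_connection.send((tweet_text + '\n').encode('utf-8'))
        except Exception as e:
            print("Error:", e)

# --- illustrative test doubles, not part of the real app ---
class FakeResponse:
    def iter_lines(self):
        yield b'{"text": "hello #spark"}'

class FakeConnection:
    def __init__(self):
        self.sent = b""
    def send(self, data):
        self.sent += data

conn = FakeConnection()
send_tweets_to_spark(FakeResponse(), conn)
```

The broad `except` mirrors the defensive style a long-running stream reader needs: one malformed line (e.g. a keep-alive blank line) shouldn't kill the whole feed.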
In this step, I'll show you how to build a simple client that gets the tweets from the Twitter API using Python and passes them to the Spark Streaming instance. It should be easy to follow for any professional Python developer.

First, let's create a file called twitter_app.py, and then we'll add the code to it together as below.

Import the libraries that we'll use:

```python
import socket
import requests
import requests_oauthlib
```

And add the variables that will be used in OAuth for connecting to Twitter:

```python
# Replace the values below with yours
ACCESS_TOKEN = '...'
ACCESS_SECRET = '...'
CONSUMER_KEY = '...'
CONSUMER_SECRET = '...'
my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET)
```

Now, we'll create a new function called get_tweets that calls the Twitter API URL and returns the response for a stream of tweets:

```python
def get_tweets():
    # query_url (the streaming endpoint plus its filter parameters) is built before this call
    response = requests.get(query_url, auth=my_auth, stream=True)
    return response
```

Then, create a function that takes the response from the above one and extracts the tweets' text from the whole tweet's JSON object.