Create a Twitter Stream
Hey there, welcome to the first part of the series. We’re going to be conducting a little science experiment to figure out the “happiness levels” of different countries.
To do this, we’ll construct a data pipeline with Apache Beam that performs sentiment analysis on each tweet and then compiles the results into a real-time dashboard.
In this first part we’ll establish the data source.
Setting Up
We’re going to use the Twitter Streaming API to get tweets as they are published, in real time. To access Twitter’s data, we first need to create some credentials.
Creating the App
The first step is to create a Twitter account. Make sure to link your phone number with it or Twitter won’t let you create the application.
After you’ve signed in with your Twitter account, go to apps.twitter.com and click on the “Create New App” button in the top right. We won’t be using a website to connect so we can just set a placeholder value like https://www.my-url.com for the website field.
Finish filling out the form on the next page and click “Create”.
Getting Access Tokens
On the next page, go to the “Keys and Access Tokens” tab and click on “Create My Access Token”. Once you have the access tokens you’ll have everything you need to authenticate your calls to Twitter’s APIs. We’re going to be using these in the code as follows:
- TWITTER_APP_KEY = 'your-consumer-key'
- TWITTER_APP_SECRET = 'your-consumer-secret'
- TWITTER_KEY = 'your-access-token'
- TWITTER_SECRET = 'your-access-token-secret'
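Hard-coding secrets in a source file makes them easy to leak, so one option is to read them from environment variables instead. The sketch below assumes the four values are exported under the same names listed above. The helper name load_twitter_credentials is hypothetical and not part of Tweepy or Twitter's tooling.

```python
import os

# Names match the constants listed above; exporting them as environment
# variables is an assumption of this sketch, not something Twitter requires.
CREDENTIAL_NAMES = (
    "TWITTER_APP_KEY",
    "TWITTER_APP_SECRET",
    "TWITTER_KEY",
    "TWITTER_SECRET",
)


def load_twitter_credentials(env=None):
    """Return the four credentials as a dict, or raise if any are missing."""
    env = os.environ if env is None else env
    missing = [name for name in CREDENTIAL_NAMES if not env.get(name)]
    if missing:
        raise KeyError("Missing credentials: " + ", ".join(missing))
    return {name: env[name] for name in CREDENTIAL_NAMES}
```

Failing early with a list of the missing names tends to be easier to debug than an authentication error from Twitter later on.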
Opening the Twitter Stream
To connect to the Twitter Streaming API we need to open a persistent connection to the endpoint. There are a few things to know:
- retry the connection with the right back-off strategy
- don’t open and close the connection too many times
- get the data off the message queue quickly
We’re going to take a simple first pass at this, which for now means just printing tweets to the console. We won’t be implementing all of the details above ourselves. Instead, we’ll let Tweepy handle the streaming for us and have it hand us the tweets.
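For a sense of what Tweepy takes off our hands, here is a rough sketch of an exponential back-off schedule. The function name and the default values (start at 5 seconds, double each time, cap at 320 seconds) are illustrative assumptions, not a description of Tweepy's internals.

```python
import itertools


def backoff_delays(initial=5.0, factor=2.0, maximum=320.0):
    """Yield an endless series of wait times that grow until they hit a cap."""
    delay = initial
    while True:
        yield min(delay, maximum)
        # Grow the delay for the next failed attempt, never past the cap.
        delay = min(delay * factor, maximum)


# The first few waits, in seconds, using the illustrative defaults above.
first_waits = list(itertools.islice(backoff_delays(), 8))
print(first_waits)
```

A reconnect loop would sleep for the next value after each failed attempt and start a fresh generator once a connection succeeds.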
To use Tweepy we need to implement its StreamListener class.
Listening for Tweets
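# twitter_streamer.py
from tweepy import StreamListener


class TwitterStreamListener(StreamListener):
    def on_status(self, status):
        print(status.text)  # called for every incoming tweet

    def on_error(self, status_code):
        print(status_code)  # called when Twitter returns an error status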
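Printing the status code is enough for a first pass. As a possible refinement, the sketch below stops the stream when Twitter signals rate limiting with status 420. It relies on Tweepy 3.x treating a False return value from on_error as a request to disconnect. The mixin name is hypothetical, and it is written without the Tweepy base class so it can be tried in isolation.

```python
# Status code Twitter's streaming API uses to signal rate limiting.
RATE_LIMITED = 420


class StopOnRateLimitMixin:
    """Hypothetical mixin: combine with a listener class to stop on 420."""

    def on_error(self, status_code):
        print(status_code)
        if status_code == RATE_LIMITED:
            # Returning False asks Tweepy 3.x to disconnect the stream.
            return False
        # Any other value lets Tweepy keep retrying the connection.
        return True
```

Disconnecting on 420 avoids hammering the endpoint, since repeated attempts while rate limited can lengthen the wait before Twitter lets us back in.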
Now remember, our end goal is to calculate a country’s overall happiness based on the positivity of its tweets. For that, we need to know where in the world a tweet came from, so we’ll skip any tweet whose author hasn’t set a location.
    def on_status(self, status):
        # Skip tweets whose author has no profile location
        if not status.user.location:
            return
        print(status.text)
Initializing the Stream
# twitter_publisher.py
import tweepy

from twitter_streamer import TwitterStreamListener


def publish_twitter_stream():
    auth = tweepy.OAuthHandler(TWITTER_APP_KEY, TWITTER_APP_SECRET)
    auth.set_access_token(TWITTER_KEY, TWITTER_SECRET)
    api = tweepy.API(auth)
    stream_listener = TwitterStreamListener()
    stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
    stream.filter(track=["soccer", "world cup"])
As you can see, to initialize the stream we needed to supply the authentication details from our Twitter application and an implementation of the StreamListener class. We can now run the code and, hopefully, see tweets being printed to the console. Pretty cool.
Customizing the Stream
The API gives us all the data we could possibly want. But for our purposes we just want the data we can calculate a sentiment on. This of course includes the tweet message itself, but a user’s bio could also be a useful piece of information.
We’re also going to get a tweet’s retweet count. Since we’re capturing tweets as they get published, it won’t be much use yet because it will always be zero. I have some good plans for it later down the road though.
And of course, as mentioned before, we need to know the tweet’s location. Most tweets aren’t geo-enabled though, so I’ll just use the location the user set on their profile.
    def on_status(self, status):
        if not status.user.location:
            return

        text = status.text
        retweets = status.retweet_count
        loc = status.user.location
        bio = status.user.description

        # Keep only the fields needed for sentiment analysis
        tw = dict(text=text, retweets=retweets, location=loc,
                  description=bio)
        print(tw)
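To try the field extraction without opening a live connection, the same logic can be pulled into a plain function and fed a stand-in object. This is only a sketch: extract_tweet_fields is a hypothetical helper, and the fake status below mimics just the attributes used above, with made-up values.

```python
import json
from types import SimpleNamespace


def extract_tweet_fields(status):
    """Return the fields we care about, or None if the user has no location."""
    if not status.user.location:
        return None
    return dict(
        text=status.text,
        retweets=status.retweet_count,
        location=status.user.location,
        description=status.user.description,
    )


# A stand-in for Tweepy's status object, with made-up values.
fake_status = SimpleNamespace(
    text="What a goal!",
    retweet_count=0,
    user=SimpleNamespace(location="Lisbon, Portugal", description="Football fan"),
)

print(json.dumps(extract_tweet_fields(fake_status)))
```

Keeping the extraction separate from the listener also means on_status could shrink to a single call to this helper, followed by whatever we do with the result.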
Conclusion
That is all for now. Hopefully creating that stream was fun. Next time we’ll see how to get these tweets stored into Cloud BigQuery. Thanks for reading and look out for the next one very soon!