Create a Twitter Stream

Gabriel Fajardo
Dec 16, 2017

Hey there, welcome to the first part of the series. We’re going to be conducting a little science experiment to figure out the “happiness levels” of different countries.

To do this we’ll construct a data pipeline with Apache Beam that will perform a sentiment analysis on each tweet and then compile the results to a real-time dashboard.

In this first part we’ll establish the data source.

Setting Up

We’re going to use the Twitter Streaming API to receive tweets in real time, as they’re published. To do that, we first need to create some credentials so we can access Twitter’s data.

Creating the App

The first step is to create a Twitter account. Make sure to link your phone number with it or Twitter won’t let you create the application.

After you’ve signed in with your Twitter account, go to apps.twitter.com and click on the “Create New App” button in the top right. We won’t be using a website to connect so we can just set a placeholder value like https://www.my-url.com for the website field.

Finish filling out the form on the next page and click “Create”.

Getting Access Tokens

On the next page, go to the “Keys and Access Tokens” tab and click on “Create My Access Token”. Once you have the access tokens you’ll have everything you need to authenticate your calls to Twitter’s APIs. We’re going to be using these in the code as follows:

  • TWITTER_APP_KEY = 'your-consumer-key'
  • TWITTER_APP_SECRET = 'your-consumer-secret'
  • TWITTER_KEY = 'your-access-token'
  • TWITTER_SECRET = 'your-access-token-secret'
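
One way to keep these four values out of source control is to read them from environment variables. The sketch below is a minimal example under that assumption; the function name load_twitter_credentials and the idea of naming the environment variables after the constants above are made up for illustration and aren't part of the original code.

```python
import os

# Assumption for this sketch: the environment variables share their names
# with the constants listed above.
REQUIRED_VARS = (
    "TWITTER_APP_KEY",
    "TWITTER_APP_SECRET",
    "TWITTER_KEY",
    "TWITTER_SECRET",
)


def load_twitter_credentials(env=None):
    """Return the four credentials as a dict, or raise if any are missing."""
    env = os.environ if env is None else env
    # Treat unset and empty values the same way: both count as missing
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing Twitter credentials: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling load_twitter_credentials() with no arguments reads from the real environment; passing a plain dict instead makes the function easy to try out without touching your shell.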

Opening the Twitter Stream

To connect to the Twitter Streaming API we need to open a persistent connection to the endpoint. There are a few things to know:

  • retry the connection with the right back-off strategy
  • don’t open and close the connection too many times
  • get the data off the message queue quickly

We’ll take the simplest approach for now: just print the tweets to the console. We won’t be implementing the details above ourselves. Instead, we’ll let Tweepy handle the streaming for us and have it hand us the tweets.
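
To give a sense of what the back-off item in the list above involves, here is a minimal sketch of an exponential back-off schedule. The function names and the default numbers (start at 5 seconds, double each time, cap at 320 seconds) are assumptions chosen for illustration; they are not taken from this post's code or from Tweepy.

```python
import itertools


def backoff_delays(initial=5.0, maximum=320.0, factor=2.0):
    """Yield wait times in seconds, growing by `factor` and capped at `maximum`."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay = min(delay * factor, maximum)


def first_delays(count):
    """Return the first `count` wait times as a list, handy for inspection."""
    return list(itertools.islice(backoff_delays(), count))


if __name__ == "__main__":
    # A reconnect loop would sleep for each value in turn before retrying.
    print(first_delays(8))
```

A real client would sleep for each delay before reconnecting and reset the schedule after a successful connection. In this post that work is left to Tweepy.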

To use Tweepy we need to implement its StreamListener class.

Listening for Tweets

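# twitter_streamer.py
from tweepy import StreamListener  # Tweepy 3.x; removed in Tweepy 4


class TweetStreamListener(StreamListener):
    def on_status(self, status):
        # Called for every tweet received on the stream
        print(status.text)

    def on_error(self, status_code):
        # Called when Twitter responds with an error status code
        print(status_code)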

Now remember, our end goal is to calculate a country’s overall happiness based on the positivity of its tweets. For that, we need to know where in the world a tweet came from, so we skip any tweet whose author hasn’t set a location.

def on_status(self, status):
    if not status.user.location:
        return

    print(status.text)

Initializing the Stream

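# twitter_publisher.py
import tweepy  # Tweepy 3.x API

from twitter_streamer import TweetStreamListener


def publish_twitter_stream():
    # Authenticate with the credentials from "Getting Access Tokens"
    auth = tweepy.OAuthHandler(TWITTER_APP_KEY, TWITTER_APP_SECRET)
    auth.set_access_token(TWITTER_KEY, TWITTER_SECRET)
    api = tweepy.API(auth)
    stream_listener = TweetStreamListener()
    stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
    # Only receive tweets that match these keywords
    stream.filter(track=["soccer", "world cup"])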

As you can see, to initialize the stream we needed to supply the credentials from our Twitter application and an implementation of the StreamListener class. Run the code now and you should see tweets being printed to the console. Pretty cool.

Customizing the Stream

The API gives us all the data we could possibly want. But for our purposes we just want the data we can calculate a sentiment on. This of course includes the tweet message itself, but a user’s bio could also be a useful piece of information.

We’re also going to grab each tweet’s retweet count. Since we’re capturing tweets as they’re published, the count won’t be of much use yet because it will always be zero. I have some good plans for it later down the road, though.

And of course, as mentioned before, we need to know the tweet’s location. Most tweets aren’t geo-enabled, though, so I’ll just use the location the user set on their profile.

def on_status(self, status):
    if not status.user.location:
        return
    text = status.text
    retweets = status.retweet_count
    loc = status.user.location
    bio = status.user.description
    # Keep only the fields we need for the sentiment analysis
    tw = dict(text=text, retweets=retweets, location=loc,
              description=bio)

    print(tw)
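
If you’d like to try the field extraction without opening a live stream, the same logic can be pulled into a plain function and fed a stand-in object. This is only a sketch: tweet_to_dict and fake_status are hypothetical names introduced here, and the SimpleNamespace object merely imitates the handful of attributes the listener reads from a real Tweepy status.

```python
import json
from types import SimpleNamespace


def tweet_to_dict(status):
    """Pick the fields used in this post; return None if no profile location is set."""
    if not status.user.location:
        return None
    return dict(
        text=status.text,
        retweets=status.retweet_count,
        location=status.user.location,
        description=status.user.description,
    )


# Hypothetical stand-in for a Tweepy status object, for illustration only.
fake_status = SimpleNamespace(
    text="What a goal!",
    retweet_count=0,
    user=SimpleNamespace(location="Miami, FL", description="Soccer fan"),
)

tweet = tweet_to_dict(fake_status)
# Serializing to JSON is one convenient way to inspect or pass the record along
print(json.dumps(tweet))
```

Keeping the extraction in its own function also means on_status only has to call it and skip the tweet when the result is None.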

Conclusion

That is all for now. Hopefully creating that stream was fun. Next time we’ll see how to get these tweets stored into Cloud BigQuery. Thanks for reading and look out for the next one very soon!

Written by Gabriel Fajardo

Computer Science grad from FIU. Back-end software engineer curious about data engineering. Currently doing contracting work for Cricket Wireless.