Working with Twitter (complex JSON) data set
Amazon Athena is a serverless interactive query service that allows analytics using standard SQL on data residing in S3. Before Athena, querying data sets on S3 required installing Hive, Presto, Hue, or similar tools on top of the EMR service, or integrating with third-party partner products.
Athena also supports JDBC connectivity, so the managed service can be easily integrated with a wide variety of SQL and visualization tools.
Many customers are interested in exploring Amazon Athena for their use cases and are looking for ways to optimize performance and cost. As an APN Partner, NorthBay has been working with Athena, testing and exploring various customer use cases. This multi-part blog series shares our findings and gives readers a jumpstart on working with Amazon Athena.
Twitter use case
Unstructured and semi-structured data (typically JSON) is becoming typical for Big Data sets. We chose Twitter data to validate how Athena handles complex JSON. This post shares the details of querying Twitter data with Athena and running complex queries against the data set.
The following is the architecture followed for the implementation:
– Configure Twitter for API access
– Configure Kinesis Firehose to stream the output to S3
– Configure and run Tweepy to read Twitter feed and stream to Kinesis Firehose
– Define schema definition in Athena
– Query Twitter data from Athena Query Editor
– Query Twitter data using JDBC connection
– Query Twitter data from Quicksight
Configure Twitter for API access
To create this platform, you will need an AWS account and a Twitter application. Sign in with your Twitter account and create a new application at https://apps.twitter.com/. Make sure your application is set for ‘read-only’ access. Next, choose Create My Access Token at the bottom of the Keys and Access Tokens tab. By this point, you should have four Twitter application keys: consumer key (API key), consumer secret (API secret), access token, and access token secret. Take note of these keys.
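The feeder script later in this post reads these keys from a config module. The following is a minimal sketch of how such a module might load them, assuming the keys are supplied through environment variables. The environment variable names and the access_token / access_secret names are illustrative assumptions, not part of the original setup.

```python
import os

# Hypothetical environment variable names; adjust them to your own setup.
_REQUIRED = {
    "consumer_key": "TWITTER_CONSUMER_KEY",
    "consumer_secret": "TWITTER_CONSUMER_SECRET",
    "access_token": "TWITTER_ACCESS_TOKEN",
    "access_secret": "TWITTER_ACCESS_SECRET",
}


def load_twitter_keys(environ=os.environ):
    """Return the four Twitter keys as a dict, failing loudly if one is missing."""
    missing = [var for var in _REQUIRED.values() if not environ.get(var)]
    if missing:
        raise RuntimeError("Missing Twitter credentials: " + ", ".join(missing))
    return {name: environ[var] for name, var in _REQUIRED.items()}
```

Keeping the keys out of the source file makes it harder to commit them to version control by accident.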
Configure Kinesis Firehose to stream the output to S3
Create a Kinesis Firehose Delivery Stream as the destination for our data.
Step 1: Configure Destination: Choose “Amazon S3” as Destination and select the existing S3 bucket or create a new Bucket for Firehose to persist the data.
Choose “Create bucket”:
Once the bucket is created, add a prefix for the data. In this case, the json/ prefix is added so that all JSON data goes to the same bucket/prefix.
Step 2: Configuration: Kinesis Firehose allows tuning of buffer size, buffer interval, compression, encryption, and security policies. Choose these values based on the streaming ingest frequency and the desired size of the output files in S3.
We just have to click “Allow” in the new window without changing anything.
Review the configuration and click “Create Delivery Stream”.
In the Firehose Delivery Stream console, we can see our created Delivery Stream with status “CREATING”. Once the status changes to “ACTIVE” we can start using the delivery stream.
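The same console steps can also be scripted. The sketch below is one possible way to do it with boto3, not the procedure used in this post: it builds the S3 destination settings (prefix, buffering, compression) and polls until the stream reports ACTIVE. The function names, the default buffer values, and the ARNs in the usage note are illustrative assumptions. The client argument is expected to behave like boto3.client("firehose").

```python
import time


def build_s3_destination(bucket_arn, role_arn, prefix="json/",
                         buffer_mb=5, buffer_seconds=300,
                         compression="UNCOMPRESSED"):
    """Build the ExtendedS3DestinationConfiguration argument for Firehose."""
    return {
        "RoleARN": role_arn,
        "BucketARN": bucket_arn,
        "Prefix": prefix,
        "BufferingHints": {
            "SizeInMBs": buffer_mb,
            "IntervalInSeconds": buffer_seconds,
        },
        "CompressionFormat": compression,
    }


def wait_until_active(client, stream_name, poll_seconds=10, max_polls=60):
    """Poll the delivery stream until its status is ACTIVE; False on timeout."""
    for _ in range(max_polls):
        description = client.describe_delivery_stream(
            DeliveryStreamName=stream_name)
        status = description["DeliveryStreamDescription"]["DeliveryStreamStatus"]
        if status == "ACTIVE":
            return True
        time.sleep(poll_seconds)
    return False


def create_stream(client, stream_name, bucket_arn, role_arn):
    """Create a DirectPut delivery stream and wait for it to become ACTIVE."""
    client.create_delivery_stream(
        DeliveryStreamName=stream_name,
        DeliveryStreamType="DirectPut",
        ExtendedS3DestinationConfiguration=build_s3_destination(
            bucket_arn, role_arn),
    )
    return wait_until_active(client, stream_name)
```

A call might look like create_stream(boto3.client("firehose"), "my-stream", "arn:aws:s3:::my-bucket", "arn:aws:iam::123456789012:role/my-firehose-role"), where every name and ARN is a placeholder.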
Ingest Twitter feeds from the feeder system (Tweepy/Python)
We need a stream producer/feeder system to publish streaming data to Kinesis Firehose. Tweepy is an open-source Python library that enables communication with Twitter. The following code can be run on an EC2 instance (with an IAM role that allows access to Kinesis Firehose, and with the Twitter API credentials from the earlier step stored in a configuration file) to feed the stream that we created earlier.
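import argparse
import json
import string
import time

import boto3
import tweepy
from tweepy import OAuthHandler, Stream
from tweepy.streaming import StreamListener  # available in Tweepy 3.x only

import config  # local module holding the Twitter API keys

def get_parser():
    """Get parser for command line arguments."""
    parser = argparse.ArgumentParser(description="Twitter Downloader")
    # Flag names are assumed; only args.query and args.data_dir are used below.
    parser.add_argument("-q", "--query", dest="query", help="Query/Filter", default="-")
    parser.add_argument("-d", "--data-dir", dest="data_dir", help="Output/Data Directory")
    return parser

class MyListener(StreamListener):
    """Custom StreamListener for streaming data."""
    def __init__(self, data_dir, query):
        super().__init__()
        query_fname = format_filename(query)
        self.outfile = "%s/stream_%s.json" % (data_dir, query_fname)

    def on_data(self, data):
        try:
            send_record_to_firehose(data)
        except Exception as e:
            print("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        print("Error with status:" + str(status))
        if status == 420:
            print("You are being rate limited!!!.")
        return True  # returning False would disconnect the stream

def send_record_to_firehose(data):
    """Send the JSON response from Twitter to the Kinesis Firehose delivery stream.
    data -- JSON string from Twitter
    Returns the response from Kinesis Firehose.
    """
    client = boto3.client("firehose")
    response = client.put_record(
        DeliveryStreamName="<your-delivery-stream-name>",  # placeholder
        Record={"Data": data},
    )
    return response

def format_filename(fname):
    """Convert file name into a safe string.
    fname -- the file name to convert
    String -- converted file name
    """
    return "".join(convert_valid(one_char) for one_char in fname)

def convert_valid(one_char):
    """Convert a character into '_' if invalid.
    one_char -- the char to convert
    Character -- converted char
    """
    valid_chars = "-_.%s%s" % (string.ascii_letters, string.digits)
    if one_char in valid_chars:
        return one_char
    return "_"

@classmethod
def parse(cls, api, raw):
    status = cls.first_parse(api, raw)
    setattr(status, "json", json.dumps(raw))
    return status

if __name__ == "__main__":
    parser = get_parser()
    args = parser.parse_args()
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)  # assumed attribute names
    api = tweepy.API(auth)
    # Added this logic to reconnect if it fails
    while True:
        try:
            twitter_stream = Stream(auth, MyListener(args.data_dir, args.query))
            twitter_stream.filter(track=[args.query])
        except Exception:
            # Reconnect and keep tracking
            time.sleep(5)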
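Firehose writes records to S3 back to back without adding a delimiter, and the JSON SerDe used by Athena reads one JSON object per line. Each record sent to Firehose should therefore be a single line of JSON terminated by a newline. The following is a small sketch of a helper that normalizes a tweet payload this way before it is passed to put_record; the function name is hypothetical and not part of the original script.

```python
import json


def to_firehose_record(raw):
    """Return one tweet as single-line JSON bytes terminated by a newline.

    `raw` is the JSON text received from the Twitter stream. Re-serializing
    it removes any line breaks between tokens (line breaks inside string
    values stay escaped), and the trailing newline keeps consecutive
    records on separate lines in the S3 object.
    """
    tweet = json.loads(raw)
    line = json.dumps(tweet, ensure_ascii=False, separators=(",", ":"))
    return (line + "\n").encode("utf-8")
```

The result can be passed as the record payload, for example Record={"Data": to_firehose_record(data)}.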
Define schema definition in Athena
Athena provides a Catalog Manager UI for defining new tables. With complex JSON, however, it is easier to run the schema definition DDL in the query editor. The following DDL was built from the Twitter data stream:
CREATE EXTERNAL TABLE IF NOT EXISTS tweets (
  created_at string,
  id string,
  id_str string,
  text string,
  display_text_range ARRAY<int>,
  source string,
  truncated string,
  user struct<
    id:string, id_str:string, name:string, screen_name:string,
    location:string, description:string, protected:string, verified:string,
    followers_count:string, friends_count:string, listed_count:string,
    favourites_count:string, statuses_count:string, created_at:string,
    utc_offset:string, time_zone:string, geo_enabled:string, lang:string>,
  is_quote_status string,
  extended_tweet STRUCT<
    full_text:string,
    display_text_range:ARRAY<int>,
    entities:STRUCT<
      media:ARRAY<STRUCT<
        id:string, id_str:string, indices:ARRAY<int>,
        media_url:string, media_url_https:string, url:string,
        display_url:string, expanded_url:string, type:string,
        sizes:STRUCT<
          small:STRUCT<w:string, h:string, resize:string>,
          thumb:STRUCT<w:string, h:string, resize:string>>>>>>,
  retweet_count string,
  favorite_count string,
  retweeted_status STRUCT<
    retweet_count:string,
    text:string>,
  entities STRUCT<
    urls:ARRAY<STRUCT<url:string, expanded_url:string, display_url:string, indices:ARRAY<int>>>,
    user_mentions:ARRAY<STRUCT<screen_name:string>>,
    hashtags:ARRAY<STRUCT<text:string>>>,
  favorited string,
  retweeted string,
  possibly_sensitive string,
  filter_level string,
  lang string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://twitter-to-s3/json/';
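Writing such a DDL by hand for deeply nested JSON is tedious. The following is a rough sketch of how the column list could be derived from one sample tweet. It is not the method used to produce the DDL above, and the function names are hypothetical. It types every scalar as string, which is how the DDL above types most fields, and it assumes arrays are homogeneous; empty objects and reserved words would still need manual attention.

```python
def hive_type(value):
    """Map a parsed JSON value to a Hive/Athena column type string.

    Scalars are typed as string, objects become struct<...>, and arrays
    take the type of their first element (string if the array is empty).
    """
    if isinstance(value, dict):
        fields = ",".join(
            "%s:%s" % (name, hive_type(child)) for name, child in value.items())
        return "struct<%s>" % fields
    if isinstance(value, list):
        element = hive_type(value[0]) if value else "string"
        return "array<%s>" % element
    return "string"


def columns_for(sample):
    """Return the column list of a CREATE TABLE statement for one sample object."""
    return ",\n".join(
        "  %s %s" % (name, hive_type(child)) for name, child in sample.items())
```

Running columns_for on a parsed tweet gives a starting point that can then be trimmed to the fields of interest.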
Query Twitter data from Athena Query Editor
The Athena Query Editor provides a UI for submitting queries to Athena. The response also reports the run time and the amount of data scanned, which are useful for estimating costs and optimizing queries.
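As a rough illustration of how the data scanned figure translates into cost, the sketch below estimates the charge for a single query. The rate of 5 USD per TB, the 10 MB minimum per query, and the treatment of 1 TB as 1024^4 bytes are assumptions passed in as parameters or constants; check them against current Athena pricing for your region.

```python
import math

MB = 1024 ** 2
TB = 1024 ** 4


def estimate_query_cost(bytes_scanned, usd_per_tb=5.0, minimum_mb=10):
    """Estimate the charge in USD for one query from its data scanned figure.

    The scanned size is rounded up to whole megabytes and raised to the
    per-query minimum before the per-TB rate is applied.
    """
    billed_mb = max(math.ceil(bytes_scanned / MB), minimum_mb)
    return billed_mb * MB / TB * usd_per_tb
```

For example, estimate_query_cost(250 * 1024 ** 3) gives the estimated charge for a query that scanned 250 GB under these assumptions.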
Query 1: Total records in the table
SELECT count(*) FROM tweets
Query 2: Get sample of 10000 records
SELECT * FROM tweets LIMIT 10000
Query Twitter data from SQL client with JDBC connection
Detailed documentation is available from the following link to establish a connection to Athena from a client tool such as SQL Workbench:
Query 3: Top hashtags with more than 100 occurrences
SELECT ht.text,count(*) FROM tweets CROSS JOIN UNNEST (entities.hashtags) AS t(ht) GROUP BY ht.text HAVING count(*)>100 ORDER by count(*) desc
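To make the role of CROSS JOIN UNNEST concrete, here is a plain-Python sketch of what the hashtag query computes on a list of parsed tweets. The function name is hypothetical, and the code only mirrors the query's logic on in-memory data.

```python
from collections import Counter


def top_hashtags(tweets, min_count=100):
    """Count hashtag texts across tweets, keeping those seen more than min_count times.

    Each tweet is expanded into one row per hashtag (CROSS JOIN UNNEST),
    rows are grouped by hashtag text (GROUP BY), groups that do not exceed
    min_count are dropped (HAVING), and the rest are sorted by count,
    descending (ORDER BY).
    """
    counts = Counter(
        tag["text"]
        for tweet in tweets
        for tag in (tweet.get("entities") or {}).get("hashtags") or []
    )
    return [(text, n) for text, n in counts.most_common() if n > min_count]
```

As in the SQL, tweets with no hashtags contribute no rows at all.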
Query 4: Number of Tweets from verified accounts with the most followers
SELECT user.screen_name,user.name,max(cast(user.followers_count as integer)),count(*) FROM tweets WHERE user.verified='true' GROUP BY user.screen_name,user.name ORDER BY max(cast(user.followers_count as integer)) DESC
Query 5: Top URL mentions in Tweets
SELECT url_extract_host(u.expanded_url), count(*) FROM tweets CROSS JOIN UNNEST (entities.urls) AS t(u) GROUP BY url_extract_host(u.expanded_url) HAVING count(*)>100 ORDER by count(*) desc;
Query 6: Hashtags tweeted along with “Amazon”
WITH ht_list AS (SELECT entities.hashtags FROM tweets CROSS JOIN UNNEST (entities.hashtags) AS t(ht) WHERE ht.text LIKE 'amazon') SELECT t AS "hashtag",count(*) AS "occurrences" FROM ht_list CROSS JOIN UNNEST (hashtags) AS t(t) GROUP BY t ORDER BY count(*) desc;
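Queries like the ones above can also be submitted from a script. The sketch below uses the Athena API through boto3 rather than JDBC, so it is an alternative route and not the connection method described in this section. The function name, the database name, and the S3 output location are illustrative assumptions. The client argument is expected to behave like boto3.client("athena").

```python
import time


def run_query(client, sql, database, output_location,
              poll_seconds=1, max_polls=300):
    """Submit a query to Athena, wait for it to finish, and return its rows.

    Only the first page of results is returned.
    """
    started = client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = started["QueryExecutionId"]

    for _ in range(max_polls):
        execution = client.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            raise RuntimeError("Query %s ended in state %s" % (query_id, state))
        time.sleep(poll_seconds)
    else:
        raise TimeoutError("Query %s did not finish in time" % query_id)

    results = client.get_query_results(QueryExecutionId=query_id)
    rows = results["ResultSet"]["Rows"]
    # Each row has the shape {"Data": [{"VarCharValue": ...}, ...]};
    # for a SELECT, the first row holds the column names.
    return [[cell.get("VarCharValue") for cell in row["Data"]] for row in rows]
```

A call might look like run_query(boto3.client("athena"), "SELECT count(*) FROM tweets", "default", "s3://my-athena-results/"), where the database and bucket are placeholders.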
Visualize Twitter data from Quicksight using Athena
The following blogpost provides information on querying from Athena using Quicksight
Quicksight currently does not support complex JSON and expects the data types to be among the supported data types:
The current dashboard displays sensitive Tweets by language from the data set:
The underlying query:
Query 7: Find the number of Tweets by language and sensitive media content
SELECT lang,possibly_sensitive,count(*) FROM tweets GROUP BY lang, possibly_sensitive
Our Twitter analysis shows that Athena can be leveraged for use cases involving complex data formats (unstructured/semi-structured), can be automated through JDBC connections, and can surface basic insights through Quicksight (although the current lack of support for arrays limits analytics capabilities).
Athena can be an excellent tool for “S3 as a data lake” use cases where the data is already staged in S3. As a serverless managed service, Athena takes precedence over previously used methods such as Presto/Hive/Impala on EMR.
List of SQL statements supported by Athena
NorthBay Authors: Abdullah Jamshed & Ana Zamarron
About NorthBay – we are a fast-growing, 100% AWS focused onshore/offshore AWS advanced consulting partner, supporting our customers to accelerate the reinvention of their applications and data for a Cloud native world. Our >350 AWS certified employees excel in developing and deploying database & application migrations, data lakes and analytics, machine learning/AI, DevOps and application and data modernization/development that drive measurable business impact.