Transform Redshift User Activity Log using Spark (AWS Glue) to Query via Presto
Redshift is Amazon's fully managed, reliable, and fast data warehouse product. Its ability to handle analytic workloads on big data sets, stored using a column-oriented DBMS principle, sets it apart from Amazon's other hosted database offerings.
Amazon Redshift logs information about connections and user activities in the cluster's databases. These logs help you monitor the database for security and troubleshooting purposes, a process often referred to as database auditing. Audit logging is not enabled by default in Amazon Redshift. When enabled, it creates logs for authentication attempts (Connection log), user-level changes (User log), and the queries run on the database (User activity log).
The user activity log has the following structure:
Column name | Description |
---|---|
recordtime | Time the event occurred. |
db | Database name. |
user | User name. |
pid | Process ID associated with the statement. |
userid | User ID. |
xid | Transaction ID. |
query | A prefix of LOG: followed by the text of the query, including newlines. |
'2019-09-13T07:13:08Z UTC [ db=mydb user=myusr pid=12345 userid=123 xid=76543210 ]' LOG: set client_encoding to 'UTF8'
'2019-09-13T07:13:08Z UTC [ db=mydb user=myusr pid=12345 userid=123 xid=76543211 ]' LOG: SET application_name = 'Amazon Redshift ODBC Driver 1.4.2.1010'
'2019-09-13T07:13:08Z UTC [ db=mydb user=myusr pid=12345 userid=123 xid=76543212 ]' LOG: select count(*) from mytbl;
'2019-09-13T08:39:35Z UTC [ db=mydb user=myusr pid=12345 userid=123 xid=76543213 ]' LOG: select name, count(distinct id) as my_id
from mytbl2
group by 1
order by 1
limit 1000
The sample logs above show a clear structure. However, because the value of query can span multiple lines, it is not feasible to define a grok pattern that lets a Glue Crawler catalog this data for querying with Athena. To query the Redshift User Activity Log via Hive/Presto, you therefore need to perform an ETL transformation. The Glue ETL script below does this and transforms the logs into a more structured form.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
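# Set a custom record delimiter for Hadoop's text input format: a newline followed by a
# single quote marks the start of a new log record, so multi-line queries stay in one record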
conf = sc._jsc.hadoopConfiguration()
conf.set("textinputformat.record.delimiter", "\n'")
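# Read the logs as a text file into an RDD (one element per log record), restore the
# leading quote consumed by the delimiter, and convert the result to a Spark DataFrame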
dataframe0 = sc.textFile("s3://path/to/useractivitylog_file")
dataframe1 = dataframe0.map(lambda x: ("'" + x, )).toDF(['data'])
# Convert the Spark DataFrame to Glue's DynamicFrame
dynamicframe2 = DynamicFrame.fromDF(dataframe1, glueContext, "dynamicframe2")
# Fetch the data column from each DynamicRecord and create separate column to conform with the User Activity Logs' structure
def map_function(dynamicRecord):
    dynamicRecordList = dynamicRecord["data"].split(" ")
    transformedRecord = {}
    transformedRecord['recordtime'] = dynamicRecordList[0].replace("'", '') + " " + dynamicRecordList[1]
    transformedRecord['db'] = dynamicRecordList[3].partition("=")[2]
    transformedRecord['user'] = dynamicRecordList[4].partition("=")[2]
    transformedRecord['pid'] = dynamicRecordList[5].partition("=")[2]
    transformedRecord['userid'] = dynamicRecordList[6].partition("=")[2]
    transformedRecord['xid'] = dynamicRecordList[7].partition("=")[2]
    transformedRecord['query'] = dynamicRecord["data"].partition("LOG:")[2]
    return transformedRecord
# Apply the function to all the DynamicRecord
mapping3 = Map.apply(frame = dynamicframe2, f = map_function, transformation_ctx = "mapping3")
# Write the transformed dataset to S3
datasink4 = glueContext.write_dynamic_frame.from_options(frame = mapping3, connection_type = "s3", connection_options = {"path": "s3://path/for/transformed/data"}, format = "json", transformation_ctx = "datasink4")
job.commit()
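The splitting and parsing logic can be checked locally before running the job. The following is a minimal sketch in plain Python, with no Glue or Spark dependency, that mimics the custom record delimiter and the field extraction on two of the sample records shown earlier. The names `SAMPLE_LOG`, `split_records`, and `parse_record` are hypothetical helpers introduced only for this illustration; they are not part of the Glue job.

```python
import json

# Two records copied from the sample logs; the second one spans several lines.
SAMPLE_LOG = (
    "'2019-09-13T07:13:08Z UTC [ db=mydb user=myusr pid=12345 userid=123 "
    "xid=76543212 ]' LOG: select count(*) from mytbl;\n"
    "'2019-09-13T08:39:35Z UTC [ db=mydb user=myusr pid=12345 userid=123 "
    "xid=76543213 ]' LOG: select name, count(distinct id) as my_id\n"
    "from mytbl2\n"
    "group by 1\n"
    "order by 1\n"
    "limit 1000\n"
)


def split_records(text):
    """Split raw log text on newline + single quote (hypothetical helper).

    This imitates the effect of the custom Hadoop record delimiter.
    """
    chunks = text.split("\n'")
    # Every chunk after the first lost its leading quote to the delimiter.
    records = [chunks[0]] + ["'" + chunk for chunk in chunks[1:]]
    return [record for record in records if record.strip()]


def parse_record(record):
    """Extract the fields with the same positional logic as map_function."""
    parts = record.split(" ")
    return {
        "recordtime": parts[0].replace("'", "") + " " + parts[1],
        "db": parts[3].partition("=")[2],
        "user": parts[4].partition("=")[2],
        "pid": parts[5].partition("=")[2],
        "userid": parts[6].partition("=")[2],
        "xid": parts[7].partition("=")[2],
        # Everything after the first "LOG:" marker, including newlines.
        "query": record.partition("LOG:")[2],
    }


# Print each parsed record as one JSON object.
for raw_record in split_records(SAMPLE_LOG):
    print(json.dumps(parse_record(raw_record)))
```

Note that this approach assumes a newline followed by a single quote only ever marks the start of a new record. A query whose continuation line happens to begin with a single quote would be split incorrectly, so it is worth spot-checking the output for such cases.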
After the job has completed successfully, we can crawl the data in the output S3 path and then query the resulting Data Catalog table using Amazon Athena.