Software Development Engineer

Blog PostsResume

Transform Redshift User Activity Log using Spark (AWS Glue) to Query via Presto

Redshift is Amazon's data warehouse product that is fully managed, reliable, and fast and its ability to handle analytic workloads on big data data sets stored by a column-oriented DBMS principle sets differs it from Amazon's other hosted database offering.

Amazon Redshift logs information about connections and user activities in the clusters' databases. These logs help you to monitor the database for security and troubleshooting purposes, which is a process often referred to as database auditing. This audit logging is not enabled by default in Amazon Redshift. When enabled, it creates logs for authentication attempts (Connection log), user level changes (User log) as well as the queries ran on the database (User activity log).

The user activity logs have the below structure.

Column name Description
recordtime Time the event occurred.
db Database name.
user User name.
pid Process ID associated with the statement.
userid User ID.
xid Transaction ID.
query A prefix of LOG: followed by the text of the query, including newlines.
'2019-09-13T07:13:08Z UTC [ db=mydb user=myusr pid=12345 userid=123 xid=76543210 ]' LOG: set client_encoding to 'UTF8'
'2019-09-13T07:13:08Z UTC [ db=mydb user=myusr pid=12345 userid=123 xid=76543211 ]' LOG: SET application_name = 'Amazon Redshift ODBC Driver 1.4.2.1010'
'2019-09-13T07:13:08Z UTC [ db=mydb user=myusr pid=12345 userid=123 xid=76543212 ]' LOG: select count(*) from mytbl;
'2019-09-13T08:39:35Z UTC [ db=mydb user=myusr pid=12345 userid=123 xid=76543213 ]' LOG: select name, count(distinct id) as my_id 
    from mytbl2
    group by 1
    order by 1
    limit 1000

From the sample logs above, we can notice that there is a structure in these logs. However, as the value for query can span mutiple lines, it becomes impossible to define a grok pattern to Crawl this data using Glue Crawlers before being queried by Athena. Thus, to achieve the use case of queries the Redshift User Activity Log via Hive/Presto, you require to perform an ETL transformation. The below Glue ETL script addresses this and transforms the logs to a more structured form.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
    
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Setting a custom text input format record delimiter for Hadoop
conf = sc._jsc.hadoopConfiguration()
conf.set("textinputformat.record.delimiter", "\n'")

# Reading the logs as a text file into a Spark dataframe
dataframe0 = sc.textFile("s3://path/to/useractivitylog_file")
dataframe1 = dataframe0.map(lambda x: ("'" + x, )).toDF(['data'])

# Convert the Spark DataFrame to Glue's DynamicFrame
dynamicframe2 = DynamicFrame.fromDF(dataframe1, glueContext, "dynamicframe2")

# Fetch the data column from each DynamicRecord and create separate column to conform with the User Activity Logs' structure
def map_function(dynamicRecord):
  dynamicRecordList = dynamicRecord["data"].split(" ")
    
  transformedRecord = {}
  transformedRecord['recordtime'] = dynamicRecordList[0].replace("'", '') + " " +  dynamicRecordList[1]
  transformedRecord['db'] = dynamicRecordList[3].partition("=")[2]
  transformedRecord['user'] = dynamicRecordList[4].partition("=")[2]
  transformedRecord['pid'] = dynamicRecordList[5].partition("=")[2]
  transformedRecord['userid'] = dynamicRecordList[6].partition("=")[2]
  transformedRecord['xid'] = dynamicRecordList[7].partition("=")[2]
  transformedRecord['query'] = dynamicRecord["data"].partition("LOG:")[2]
    
  return transformedRecord
    
# Apply the function to all the DynamicRecord 
mapping3 = Map.apply(frame = dynamicframe2, f = map_function, transformation_ctx = "mapping3")

# Write the transformed dataset to S3
datasink4 = glueContext.write_dynamic_frame.from_options(frame = mapping2, connection_type = "s3", connection_options = {"path": "s3://path/for/transformed/data"}, format = "json", transformation_ctx = "datasink4")
job.commit()

After the job is completed succesfully, we can crawl the data in the output S3 path and then query the created Data Catalog table using Amazon Athena.


© 2024 Ujjwal Bhardwaj. All Rights Reserved.