Software Development Engineer

Blog PostsResume

Partition Data in S3 from DateTime column using AWS Glue

Partitioning is an important technique for organizing datasets so they can be queried efficiently. It organizes data in a hierarchical directory structure based on the distinct values of one or more columns.

By default, a DynamicFrame is not partitioned when it is written and all the output files are written at the top level of the specified output path. However, DynamicFrames support native partitioning using a sequence of keys, using the partitionKeys option when you create a sink. From there, you can process these partitions using other systems, such as Amazon Athena.

If your source dataset has a DateTime column in the format YYYY-MM-DD HH: MI: SS you can split the column to year, month, day, etc., and realize them as separate columns using the Map.apply() trasformation. You can then partition the data by these columns while writing the DynamicFrame to S3.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "source_partition", transformation_ctx = "datasource0")

 # Fetch the date of every DynamicRecord and create new column 'year', 'month', and 'day' with their respective value
def map_function(dynamicRecord):
    date = dynamicRecord["datetime"].split(" ")[0]
    date_arr = date.split("-")

    dynamicRecord["year"] = date_arr[0]
    dynamicRecord["month"] = date_arr[1]
    dynamicRecord["day"]= date_arr[2]
    return dynamicRecord

# Apply the function to all the DynamicRecord 
mapping1 = Map.apply(frame = datasource0, f = map_function, transformation_ctx = "mapping1")

 # Write the transformed dataset to S3 with Paritioning
datasink5 = glueContext.write_dynamic_frame.from_options(frame = applymapping4, connection_type = "s3", connection_options = {"path": "s3://<bucket-name>/destination_parition", "partitionKeys": ["year", "month", "day"]}, format = "parquet", transformation_ctx = "datasink5")
job.commit()

© 2024 Ujjwal Bhardwaj. All Rights Reserved.