Capture the Input File Name in AWS Glue ETL Job

As described on the Wikipedia page, "extract, transform, load (ETL) is the general procedure of copying data from one or more sources into a destination system which represents the data differently from the source(s)". Even though the crux of all ETL workflows is the same, there are various implementations of it, each specific to a particular use case.

AWS Glue provides an ETL platform with various templates that cover a wide range of use cases. However, there are certain use cases that cannot currently be achieved with Glue's DynamicFrame implementation. Because Glue runs Spark jobs under the hood, it also lets us work with Spark's DataFrame API, which can help work around these limitations.

One such use case is reading files from a data source (e.g., Amazon S3) and capturing the input file name in the ETL job. Currently, there is no built-in function for DynamicFrames that achieves this, so we fall back to DataFrames.

In this blog post, I describe how we can use a Spark DataFrame together with the input_file_name() function to achieve the aforementioned use case. In the example, we perform the following operations.

  1. Create multiple JSON files, each containing an individual JSON record.
  2. Load these files to an S3 bucket prefix (a boto3 sketch of these two steps follows the list).
  3. Crawl the S3 input path using Glue Crawler.
  4. Use the Glue Data Catalog table created on crawling the S3 path as the data source for Glue ETL Job.

Following is the Glue ETL script that I used to achieve this use case:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import input_file_name
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "<table-name>", transformation_ctx = "datasource0")

# Create a DataFrame and add a new column containing the file name of every record
dataframe1 = datasource0.toDF().withColumn("filename", input_file_name())
dataframe1.show()

# Convert the DataFrame back to DynamicFrame
dynamicframe2 = DynamicFrame.fromDF(dataframe1, glueContext, "dynamicframe2")

# Write the transformed dataset to S3
datasink3 = glueContext.write_dynamic_frame.from_options(frame = dynamicframe2, connection_type = "s3", connection_options = {"path": "s3://path/to/s3"}, format = "json", transformation_ctx = "datasink3")
job.commit()

Output

+---+-------+--------------------+
| id|   name|            filename|
+---+-------+--------------------+
|  1| Ujjwal|   s3://path/to/s...|
|  2|Utkarsh|   s3://path/to/s...|
+---+-------+--------------------+

Each JSON record was stored in a separate file, and the input_file_name()[1] function returned the name of the file from which each record was read.
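
If only the object name is needed rather than the full s3:// URI, the column can be trimmed with substring_index() from pyspark.sql.functions. This is a minimal sketch that assumes the dataframe1 variable from the script above.

from pyspark.sql.functions import substring_index

# Keep only the part of the path after the last "/" (e.g., "record-1.json")
dataframe_with_basename = dataframe1.withColumn(
    "filename", substring_index(dataframe1["filename"], "/", -1)
)
dataframe_with_basename.show()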

References:

  1. http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html?highlight=explode#pyspark.sql.functions.input_file_name
