
Relationalize Nested JSON Schema into Star Schema using AWS Glue

AWS Glue is a fully managed ETL service from Amazon that makes it easy to extract and migrate data from one source to another while applying transformations to the source data. Amongst these transformations is Relationalize[1]. When applied to a DynamicFrame, it flattens the nested schema and pivots array columns out of the flattened frame.

Because it can convert a NoSQL-style nested schema into a set of relationalized schemas, it is a great tool for transforming nested JSON into a series of linked tables that can then be queried like a relational database, with operations including but not limited to joins and aggregations.

In this example, we take a sample JSON source file, relationalize it, and then store it in a Redshift cluster for further analytics.

  1. Store the JSON data source in S3.
  2. Add a crawler with "S3" as the data store and specify the S3 prefix in the include path. Run the crawler to create an external table in the Glue Data Catalog (a scripted version of this step is sketched after this list).
  3. Add a Glue connection with the connection type set to Amazon Redshift, preferably in the same region as the data store, and then set up access to your data source.
  4. Create a Glue ETL job that runs "A new script to be authored by you" and specifies the connection created in step 3.
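
For reference, step 2 can also be scripted with boto3. The sketch below is illustrative only: the region, crawler name, IAM role and S3 path are assumptions, so substitute your own values (the database name "default" matches the one used in the ETL script later).

  import boto3

  # Assumed region and resource names; replace with your own.
  glue = boto3.client("glue", region_name="us-east-1")

  # Step 2: crawl the S3 prefix that holds the JSON files so that an
  # external table is created in the Glue Data Catalog.
  glue.create_crawler(
      Name="rel-crawler",
      Role="AWSGlueServiceRole-rel",
      DatabaseName="default",
      Targets={"S3Targets": [{"Path": "s3://my-bucket/rel/"}]},
  )
  glue.start_crawler(Name="rel-crawler")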

Sample JSON file:

{"id": 1,"info": {"first_name": "NameA","last_name": "SurnameA"},"notes": [{"note_id": 1,"subject": "Sample Subject 1", "text": "Sample text 1"},{"note_id": 2,"subject": "Sample Subject 2", "text": "Sample text 2"}],"properties": [{"email": "sample1@sample.com"},{"password": "password1"}]}
{"id": 2, "info": { "first_name": "NameB","last_name": "SurnameB"},"notes": [{"note_id": 1,"subject": "Sample Subject 3", "text": "Sample text 3"},{"note_id": 2,"subject": "Sample Subject 4", "text": "Sample text 4"}],"properties": [{"email": "sample2@sample.com"},{"password": "password2"}]}

ETL Script:

  import sys
  from awsglue.transforms import *
  from awsglue.utils import getResolvedOptions
  from pyspark.context import SparkContext
  from awsglue.context import GlueContext
  from awsglue.job import Job

  ## @params: [TempDir, JOB_NAME]
  args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

  sc = SparkContext()
  glueContext = GlueContext(sc)
  spark = glueContext.spark_session
  job = Job(glueContext)
  job.init(args['JOB_NAME'], args)

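  # Read the source table created by the crawler from the Glue Data Catalog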
  datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "rel", transformation_ctx = "datasource0")

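  # Flatten the nested fields and pivot the array columns out into separate frames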
  relationalized_json = datasource0.relationalize(root_table_name = "root", staging_path = args["TempDir"])

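  # Pull the flattened root frame and the two pivoted array frames out of the collection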
  root_df = relationalized_json.select('root')
  root_notes_df = relationalized_json.select('root_notes')
  root_properties_df = relationalized_json.select('root_properties')

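  # Rename the dotted field names produced by Relationalize to flat column names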
  applymapping1 = ApplyMapping.apply(frame = root_df, mappings = [("id", "int", "id", "int"), ("`info.first_name`", "string", "first_name", "string"), ("`info.last_name`", "string", "last_name", "string"), ("notes", "long", "notes", "long"), ("properties", "long", "properties", "long")], transformation_ctx = "applymapping1")
  applymapping2 = ApplyMapping.apply(frame = root_notes_df, mappings = [("id", "long", "id", "long"), ("`notes.val.note_id`", "int", "note_id", "int"), ("`notes.val.subject`", "string", "subject", "string"), ("`notes.val.text`", "string", "text", "string")], transformation_ctx = "applymapping2")
  applymapping3 = ApplyMapping.apply(frame = root_properties_df, mappings = [("id", "long", "id", "long"), ("`properties.val.email`", "string", "email", "string"), ("`properties.val.password`", "string", "password", "string")], transformation_ctx = "applymapping3")

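  # Write each frame to Redshift as its own table through the Glue connection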
  datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping1, catalog_connection = "connection-rel", connection_options = {"dbtable": "root", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")
  datasink2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping2, catalog_connection = "connection-rel", connection_options = {"dbtable": "notes", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink2")
  datasink3 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping3, catalog_connection = "connection-rel", connection_options = {"dbtable": "properties", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink3")

  job.commit()
  

This ETL script reads the data store via the Glue Data Catalog. The source is then transformed using the Relationalize transformation. The three DynamicFrames are extracted from the DynamicFrameCollection 'relationalized_json', and each is then transformed using the ApplyMapping transformation, which renames the fields before they are written to the destination. The resulting DynamicFrames are then written to Redshift as separate tables using the from_jdbc_conf method.
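
Because the relationalized frames behave like ordinary relational tables, they can also be joined back together with Spark before (or instead of) loading them into Redshift. The snippet below is a minimal sketch that assumes it runs in the same job, after the ApplyMapping steps above; it simply verifies the link between the root table and its pivoted notes table.

  # Convert the mapped DynamicFrames to Spark DataFrames and join them on the
  # key generated by Relationalize: root.notes references root_notes.id.
  root = applymapping1.toDF().alias("r")
  notes = applymapping2.toDF().alias("n")

  joined = (
      root.join(notes, root["notes"] == notes["id"], "inner")
          .select("r.id", "r.first_name", "n.subject", "n.text")
  )
  joined.show(truncate=False)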

Notable Consideration:

If the data store is populated incrementally (for example, as a time series), we can leverage Glue Job Bookmarks[2] to read only the part of the data store that has not been processed in previous job runs. To ensure that job bookmarking works, make sure every transformation specifies the transformation_ctx argument:

  relationalized_json = datasource0.relationalize(root_table_name = "root", staging_path = args["TempDir"], transformation_ctx = "relationalized_json")
  
  

References:

[1] https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-Relationalize.html
[2] https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

If you have questions or suggestions, please leave a comment below.


© 2024 Ujjwal Bhardwaj. All Rights Reserved.