Relationalize Nested JSON Schema into Star Schema using AWS Glue
AWS Glue is a fully managed ETL service from Amazon that makes it easy to extract data from one source, transform it, and load it into another. Among the transformations it provides is Relationalize[1]: when applied to a DynamicFrame, it flattens the nested schema and pivots out array columns from the flattened frame.
Because it can convert a NoSQL-style nested schema into a set of relationalized schemas, it is a great tool for turning nested JSON into a series of linked tables that can then be queried like tables in a relational database, with operations including but not limited to joins and aggregations.
In this example, we take a sample JSON source file, relationalize it, and store the result in a Redshift cluster for further analytics. The setup involves the following steps (a scripted boto3 alternative for the first two steps is sketched after the list):
- Store the JSON data source in S3.
- Add a crawler with an S3 data store and specify the S3 prefix in the include path. Run the crawler to create an external table in the Glue Data Catalog.
- Add a Glue connection with the connection type Amazon Redshift, preferably in the same region as the data store, and set up access to your data source.
- Create a Glue ETL job that runs "A new script to be authored by you" and specify the connection created in the previous step.
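For those who prefer scripting the setup, here is a minimal boto3 sketch of the first two steps (uploading the source file and creating/running the crawler). The bucket, prefix, crawler name, and IAM role below are placeholders; the Redshift connection and the ETL job are assumed to be created in the console as described above.

import boto3

s3 = boto3.client("s3")
glue = boto3.client("glue")

# Upload the JSON Lines source file to S3 (bucket and prefix are placeholders).
# The crawler typically names the table after the prefix, here "rel" to match the ETL script.
s3.upload_file("sample.json", "my-glue-demo-bucket", "rel/sample.json")

# Create and run a crawler over that prefix so the Glue Data Catalog gets an
# external table in the "default" database used by the ETL script below.
glue.create_crawler(
    Name="rel-json-crawler",            # placeholder crawler name
    Role="AWSGlueServiceRole-demo",     # placeholder IAM role with access to the bucket
    DatabaseName="default",
    Targets={"S3Targets": [{"Path": "s3://my-glue-demo-bucket/rel/"}]},
)
glue.start_crawler(Name="rel-json-crawler")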
Sample JSON file:
{"id": 1,"info": {"first_name": "NameA","last_name": "SurnameA"},"notes": [{"note_id": 1,"subject": "Sample Subject 1", "text": "Sample text 1"},{"note_id": 2,"subject": "Sample Subject 2", "text": "Sample text 2"}],"properties": [{"email": "sample1@sample.com"},{"password": "password1"}]}
{"id": 2, "info": { "first_name": "NameB","last_name": "SurnameB"},"notes": [{"note_id": 1,"subject": "Sample Subject 3", "text": "Sample text 3"},{"note_id": 2,"subject": "Sample Subject 4", "text": "Sample text 4"}],"properties": [{"email": "sample2@sample.com"},{"password": "password2"}]}
ETL Script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
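# Read the table created by the crawler from the Glue Data Catalog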
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "rel", transformation_ctx = "datasource0")
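# Flatten the nested fields and pivot the array columns out into separate frames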
relationalized_json = datasource0.relationalize(root_table_name = "root", staging_path = args["TempDir"])
root_df = relationalized_json.select('root')
root_notes_df = relationalized_json.select('root_notes')
root_properties_df = relationalized_json.select('root_properties')
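# Rename the flattened and pivoted fields before writing them out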
applymapping1 = ApplyMapping.apply(frame = root_df, mappings = [("id", "int", "id", "int"), ("`info.first_name`", "string", "first_name", "string"), ("`info.last_name`", "string", "last_name", "string"), ("notes", "long", "notes", "long"), ("properties", "long", "properties", "long")], transformation_ctx = "applymapping1")
applymapping2 = ApplyMapping.apply(frame = root_notes_df, mappings = [("id", "long", "id", "long"), ("`notes.val.note_id`", "int", "note_id", "int"), ("`notes.val.subject`", "string", "subject", "string"), ("`notes.val.text`", "string", "text", "string")], transformation_ctx = "applymapping2")
applymapping3 = ApplyMapping.apply(frame = root_properties_df, mappings = [("id", "long", "id", "long"), ("`properties.val.email`", "string", "email", "string"), ("`properties.val.password`", "string", "password", "string")], transformation_ctx = "applymapping3")
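# Write each frame to Redshift as its own table through the Glue connection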
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping1, catalog_connection = "connection-rel", connection_options = {"dbtable": "root", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")
datasink2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping2, catalog_connection = "connection-rel", connection_options = {"dbtable": "notes", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink2")
datasink3 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping3, catalog_connection = "connection-rel", connection_options = {"dbtable": "properties", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink3")
job.commit()
This ETL script reads the data store via the Glue Data Catalog. The data source is then transformed using the Relationalize transformation. The three DynamicFrames are extracted from the DynamicFrameCollection relationalized_json, and each is then transformed using the ApplyMapping transformation, which renames the fields before they are written to the destination. The resulting DynamicFrames are written to Redshift as separate tables using the from_jdbc_conf method.
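As a quick sanity check (not part of the job above), the collection and the resulting frames can be inspected, and the frames can already be queried like relational tables with Spark SQL. The snippet below is only an illustrative sketch that reuses the variable names from the script:

# Inspect what relationalize produced.
print(relationalized_json.keys())   # expect 'root', 'root_notes' and 'root_properties'
root_df.printSchema()

# Join the root table to its pivoted notes table, star-schema style.
root_df.toDF().createOrReplaceTempView("root")
root_notes_df.toDF().createOrReplaceTempView("root_notes")
spark.sql("""
    SELECT r.id, r.`info.first_name`, n.`notes.val.subject`
    FROM root r
    JOIN root_notes n ON n.id = r.notes
""").show()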
Notable Consideration:
If the data store grows over time (for example, a time series of new files arriving in S3), we can leverage Glue Job Bookmarks[2] so that each run reads only the part of the data store that was not processed in previous job runs. For job bookmarking to work, ensure that every transformation specifies the transformation_ctx argument:
relationalized_json = datasource0.relationalize(root_table_name = "root", staging_path = args["TempDir"], transformation_ctx = "relationalized_json")
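In addition to setting transformation_ctx, bookmarks must be enabled on the job itself. One way to do this, shown as a rough sketch with a placeholder job name, is to pass the special --job-bookmark-option argument when starting a run with boto3:

import boto3

glue = boto3.client("glue")
# Enable job bookmarks for this run; the job name below is a placeholder.
glue.start_job_run(
    JobName="relationalize-json-job",
    Arguments={"--job-bookmark-option": "job-bookmark-enable"},
)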
References:
[1] https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-Relationalize.html
[2] https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html
If you have questions or suggestions, please leave a comment below.