
Identify source schema changes using AWS Glue


In today's world, organizations are collecting an unprecedented amount of data from all kinds of different data sources, such as transactional data stores, clickstreams, log data, IoT data, and more. This data is often in different formats, such as structured or unstructured data, and is commonly described by the three Vs of big data (volume, velocity, and variety). To extract information from the data, it's usually stored in a data lake built on Amazon Simple Storage Service (Amazon S3). The data lake provides an important attribute called schema on read, which lets you bring data into the data lake without worrying about the schema or changes in the schema on the data source. This enables faster ingestion of data and faster building of data pipelines.

However, you may be reading and consuming this data for other use cases, such as pointing applications to it, building business intelligence (BI) dashboards in services like Amazon QuickSight, or doing data discovery using a serverless query engine like Amazon Athena. Additionally, you may have built an extract, transform, and load (ETL) data pipeline to populate your data store, such as a relational database, non-relational database, or data warehouse, for further operational and analytical needs. In these cases, you need to define the schema upfront and keep track of changes in the schema, such as adding new columns, deleting existing columns, changing the data type of existing columns, or renaming existing columns, to avoid any failures in your application or issues with your dashboards or reports.

In many use cases, we have found that the data teams responsible for building the data pipeline don't have any control over the source schema, and they need to build a solution that identifies changes in the source schema so that they can build a process or automation around it. This might include sending notifications of changes to the teams dependent on the source schema, building an auditing solution to log all the schema changes, or building an automation or change request process to propagate changes in the source schema to downstream applications such as an ETL tool or BI dashboard. Sometimes, to control the number of schema versions, you may also want to delete the older version of the schema when no changes are detected between it and the newer schema.

For example, assume you're receiving claim files from different external partners in the form of flat files, and you've built a solution to process claims based on these files. However, because these files are sent by external partners, you don't have much control over the schema and data format. For example, columns such as customer_id and claim_id were changed to customerid and claimid by one partner, and another partner added new columns such as customer_age and income and kept the rest of the columns the same. You need to identify such changes upfront so you can edit the ETL job to accommodate them, such as changing a column name or adding new columns to process the claims.

In this solution, we showcase a mechanism that simplifies the capture of schema changes in your data source using an AWS Glue crawler.

Solution overview

An AWS Glue crawler is configured to sync metadata based on existing data. After we identify the changes, we use Amazon CloudWatch to log them and Amazon Simple Notification Service (Amazon SNS) to notify the application team of the changes over email. You can expand this solution to solve other use cases, such as building an automation to propagate the changes to downstream applications or pipelines, which is out of scope for this post, to avoid failures in downstream applications because of schema changes. We also show a way to delete older versions of the schema if there are no changes between the compared schema versions.

If you want to capture the change in an event-driven way, you can do so by using Amazon EventBridge (a minimal sketch follows). However, if you want to capture the schema changes on multiple tables at the same time, based on a specific schedule, you can use the solution in this post.
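
As a rough illustration of the event-driven alternative, the following boto3 sketch creates an EventBridge rule that matches AWS Glue Data Catalog table change events and forwards them to an SNS topic. The rule name and topic ARN are placeholders, and the exact event pattern depends on the events your account emits, so treat this as a starting point rather than a drop-in configuration.

import json
import boto3

events = boto3.client('events')

# Placeholder rule name and SNS topic ARN -- replace with your own values
rule_name = 'glue-table-schema-change-rule'
sns_topic_arn = 'arn:aws:sns:us-east-1:111122223333:NotifySchemaChanges'

# Match Data Catalog table create/update events emitted by AWS Glue
event_pattern = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Data Catalog Table State Change"]
}

events.put_rule(
    Name=rule_name,
    EventPattern=json.dumps(event_pattern),
    State='ENABLED',
    Description='Notify when a Data Catalog table definition changes'
)

# Send matched events to the SNS topic
# (the topic's access policy must allow events.amazonaws.com to publish)
events.put_targets(
    Rule=rule_name,
    Targets=[{'Id': 'notify-sns', 'Arn': sns_topic_arn}]
)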

In our scenario, we have two files, each with a different schema, simulating data that has undergone a schema change. We use an AWS Glue crawler to extract the metadata from data in an S3 bucket. Then we use an AWS Glue ETL job to extract the changes in the schema from the AWS Glue Data Catalog.

AWS Glue provides a serverless environment to extract, transform, and load a large number of datasets from several sources for analytic purposes. The Data Catalog is a feature within AWS Glue that lets you create a centralized catalog of metadata by storing and annotating data from different data stores. Examples include object stores like Amazon S3, relational databases like Amazon Aurora PostgreSQL-Compatible Edition, and data warehouses like Amazon Redshift. You can then use that metadata to query and transform the underlying data. You use a crawler to populate the Data Catalog with tables. It can automatically discover new data, extract schema definitions, detect schema changes, and version tables. It can also detect Hive-style partitions on Amazon S3 (for example year=YYYY, month=MM, day=DD).
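
For reference, the table metadata the crawler writes to the Data Catalog can be read back with the AWS Glue API. The following sketch uses the database and table names created later in this post and prints the column names and types of the latest table definition:

import boto3

glue = boto3.client('glue')

# Database and table created by the crawler in this walkthrough
response = glue.get_table(DatabaseName='sscp-database', Name='sscp_sales_data')

# Columns live under the table's storage descriptor
for column in response['Table']['StorageDescriptor']['Columns']:
    print(f"{column['Name']}: {column['Type']}")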

Amazon S3 serves as the storage for our data lake. Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance.

The following diagram illustrates the architecture for this solution.

The workflow includes the following steps:

  1. Copy the first data file to the data folder of the S3 bucket and run the AWS Glue crawler to create a new table in the Data Catalog.
  2. Move the existing file from the data folder to the archived folder.
  3. Copy the second data file with the updated schema to the data folder, then rerun the crawler to create a new version of the table schema.
  4. Run the AWS Glue ETL job to check if there is a new version of the table schema.
  5. The AWS Glue job lists the changes in the schema against the previous version of the schema in CloudWatch Logs. If there are no changes in the schema and the flag to delete older versions is set to true, the job also deletes the older schema versions.
  6. The AWS Glue job notifies the application team of all changes in the schema over email using Amazon SNS.

To build the solution, complete the following steps:

  1. Create an S3 bucket with the data and archived folders to store the new and processed data files.
  2. Create an AWS Glue database and an AWS Glue crawler that crawls the data file in the data folder to create an AWS Glue table in the database.
  3. Create an SNS topic and add an email subscription.
  4. Create an AWS Glue ETL job to compare the two versions of the table schema, list the changes against the older version of the schema, and delete older versions of the schema if the flag to delete older versions is set to true. The job also publishes an event to Amazon SNS to notify the data teams of the changes in the schema.

For the purpose of this post, we manually perform the steps to move the data files from the data folder to the archived folder, and to trigger the crawler and ETL job. Depending on your application needs, you can automate and orchestrate this process through AWS Glue workflows.
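
As a rough illustration of that automation, the following sketch creates a Glue workflow with a scheduled trigger that starts the crawler and a conditional trigger that runs the ETL job once the crawl succeeds. The workflow and trigger names are hypothetical, the schedule is an example, and moving the files between folders is not covered here; adapt it to your needs.

import boto3

glue = boto3.client('glue')

workflow_name = 'sscp-schema-change-workflow'  # hypothetical name
glue.create_workflow(Name=workflow_name)

# Start the crawler on a daily schedule
glue.create_trigger(
    Name='sscp-start-crawler',
    WorkflowName=workflow_name,
    Type='SCHEDULED',
    Schedule='cron(0 6 * * ? *)',
    Actions=[{'CrawlerName': 'glue-crawler-sscp-sales-data'}],
    StartOnCreation=True
)

# Run the comparison job after the crawler finishes successfully
glue.create_trigger(
    Name='sscp-run-compare-job',
    WorkflowName=workflow_name,
    Type='CONDITIONAL',
    Predicate={'Conditions': [{
        'LogicalOperator': 'EQUALS',
        'CrawlerName': 'glue-crawler-sscp-sales-data',
        'CrawlState': 'SUCCEEDED'
    }]},
    Actions=[{'JobName': 'find-change-job-sscp'}],
    StartOnCreation=True
)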

Let's set up the infrastructure required to go through the solution, which compares an AWS Glue table version to a version updated with recent schema changes.

Create an S3 bucket and folders

To create an S3 bucket with the data and archived folders to store the new and processed data files, complete the following steps (a scripted alternative is sketched after the list):

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter a DNS-compliant unique name (for example, aws-blog-sscp-ng-202202).
  4. For Region, choose the Region where you want the bucket to reside.
  5. Keep all other settings as default and choose Create bucket.
  6. On the Buckets page, choose the newly created bucket.
  7. Choose Create folder.
  8. For Folder name, enter data.
  9. Leave server-side encryption at its default (disabled).
  10. Choose Create folder.
  11. Repeat these steps to create the archived folder in the same bucket.
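
If you prefer to script this step, the following boto3 sketch is roughly equivalent. The bucket name and Region are examples; bucket names must be globally unique.

import boto3

bucket_name = 'aws-blog-sscp-ng-202202'  # example name; choose your own unique name
region = 'us-east-1'

s3 = boto3.client('s3', region_name=region)

# us-east-1 does not accept a LocationConstraint; other Regions require one
if region == 'us-east-1':
    s3.create_bucket(Bucket=bucket_name)
else:
    s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={'LocationConstraint': region}
    )

# S3 has no real folders; zero-byte objects with a trailing slash act as folder placeholders
for folder in ('data/', 'archived/'):
    s3.put_object(Bucket=bucket_name, Key=folder)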

Create an AWS Glue database and crawler

Now we create an AWS Glue database and a crawler that crawls the data file in the data bucket to create an AWS Glue table in the new database. (A boto3 equivalent is sketched after the steps.)

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose Add database.
  3. Enter a name (for example, sscp-database) and description.
  4. Choose Create.
  5. Choose Crawlers in the navigation pane.
  6. Choose Add crawler.
  7. For Crawler name, enter a name (glue-crawler-sscp-sales-data).
  8. Choose Next.
  9. For the crawler source type, choose Data stores.
  10. To repeat crawls of the data stores, choose Crawl all folders.
  11. Choose Next.
  12. For Choose a data store, choose S3.
  13. For Include path, choose the S3 bucket and folder you created (s3://aws-blog-sscp-ng-202202/data).
  14. Choose Next.
  15. On the Add another data store page, choose No, then choose Next.
  16. Choose Create an IAM role and enter a name for the role (for example, sscp-blog).
  17. Choose Next.
  18. Choose Run on demand, then choose Next.
  19. For Database, choose your AWS Glue database (sscp-database).
  20. For Prefix added to tables, enter a prefix (for example, sscp_sales_).
  21. Expand the Configuration options section and choose Update the table definition in the data catalog.
  22. Leave all other settings as default and choose Next.
  23. Choose Finish to create the crawler.
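
The same database and crawler can also be created programmatically. The following boto3 sketch mirrors the console settings above and assumes the IAM role already exists with S3 read and AWS Glue permissions.

import boto3

glue = boto3.client('glue')

# Create the Glue database
glue.create_database(DatabaseInput={
    'Name': 'sscp-database',
    'Description': 'Database for the schema change detection walkthrough'
})

# Create the crawler pointing at the data folder of the S3 bucket
glue.create_crawler(
    Name='glue-crawler-sscp-sales-data',
    Role='AWSGlueServiceRole-sscp-blog',  # existing role with S3 and Glue permissions
    DatabaseName='sscp-database',
    TablePrefix='sscp_sales_',
    Targets={'S3Targets': [{'Path': 's3://aws-blog-sscp-ng-202202/data'}]},
    SchemaChangePolicy={'UpdateBehavior': 'UPDATE_IN_DATABASE',
                        'DeleteBehavior': 'LOG'}
)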

Create an SNS topic

To create an SNS topic and add an email subscription, complete the following steps:

  1. On the Amazon SNS console, choose Topics in the navigation pane.
  2. Choose Create topic.
  3. For Type, choose Standard.
  4. Enter a name for the topic (for example, NotifySchemaChanges).
  5. Leave all other settings as default and choose Create topic.
  6. In the navigation pane, choose Subscriptions.
  7. Choose Create subscription.
  8. For Topic ARN, choose the ARN of the created SNS topic.
  9. For Protocol, choose Email.
  10. For Endpoint, enter the email address to receive notifications.
  11. Leave all other defaults and choose Create subscription. You should receive an email to confirm the subscription.
  12. Choose the link in the email to confirm.
  13. Add the following permission policy to the AWS Glue service role created earlier as part of the crawler creation (AWSGlueServiceRole-sscp-blog) to allow publishing to the SNS topic. Make sure to change <$SNSTopicARN> in the policy to the actual ARN of the SNS topic. (A scripted alternative for these steps is sketched after the policy.)
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowEventPublishing",
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "<$SNSTopicARN>"
        }
    ]
}
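
A minimal boto3 sketch for these steps follows. The email address and inline policy name are placeholders, and the recipient still has to confirm the subscription from the email they receive.

import json
import boto3

sns = boto3.client('sns')
iam = boto3.client('iam')

# Create the topic and an email subscription (the recipient must confirm it)
topic_arn = sns.create_topic(Name='NotifySchemaChanges')['TopicArn']
sns.subscribe(TopicArn=topic_arn, Protocol='email', Endpoint='data-team@example.com')

# Allow the Glue service role to publish to the topic
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "AllowEventPublishing",
        "Effect": "Allow",
        "Action": "sns:Publish",
        "Resource": topic_arn
    }]
}
iam.put_role_policy(
    RoleName='AWSGlueServiceRole-sscp-blog',
    PolicyName='sscp-allow-sns-publish',  # placeholder policy name
    PolicyDocument=json.dumps(policy)
)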

Create an AWS Glue ETL job

Now you create an AWS Glue ETL job to compare two schema versions of a table and list the changes between them. If there are no changes in the schema and the flag to delete older versions is set to true, the job also deletes any older versions. If there are changes in the schema, the job lists the changes in the CloudWatch logs and publishes an event to Amazon SNS to notify the data team of the changes.

  1. On the AWS Glue console, choose AWS Glue Studio.
  2. Choose Create and manage jobs.
  3. Choose the Python Shell script editor.
  4. Choose Create to create a Python shell job.
  5. Enter the following code in the script editor field:
    import boto3
    import pandas as pd
    
    # Input parameters:
    # catalog_id - your AWS Glue Data Catalog ID; it is the same as your AWS account ID
    # db_name - name of the AWS Glue database in your Data Catalog
    # table_name - name of the table in the AWS Glue database that you want to check for schema changes
    # topic_arn - ARN of the SNS topic used to publish the changes in the table schema
    # versions_to_compare - the two versions to compare; 0 is the latest version and 1 is the version prior to the latest
    # delete_old_versions - if True and there are no changes between the compared versions, the job deletes all old versions except the latest "number_of_versions_to_retain" versions
    # number_of_versions_to_retain - number of latest versions to keep when delete_old_versions is True and no changes are detected
    
    catalog_id = '<$catalog_id>'
    db_name = '<$db_name>'
    table_name = '<$table_name>'
    topic_arn = '<$sns_topic_ARN>'
    versions_to_compare = [0, 1]
    delete_old_versions = False
    number_of_versions_to_retain = 2
    
    columns_modified = []
    
    # Compare the name and type of columns in the new column list with the old column list to
    # find newly added columns and columns with a changed data type
    def findAddedUpdated(new_cols_df, old_cols_df, old_col_name_list):
        for index, row in new_cols_df.iterrows():
            new_col_name = new_cols_df.iloc[index]['Name']
            new_col_type = new_cols_df.iloc[index]['Type']
    
            # Check if a column with the same name exists in the old table but the data type has changed
            if new_col_name in old_col_name_list:
                old_col_idx = old_cols_df.index[old_cols_df['Name'] == new_col_name][0]
                old_col_type = old_cols_df.iloc[old_col_idx]['Type']
    
                if old_col_type != new_col_type:
                    columns_modified.append(f"Data type changed for '{new_col_name}' from '{old_col_type}' to '{new_col_type}'")
            # If a column is only in the new column list, it is a newly added column
            else:
                columns_modified.append(f"Added new column '{new_col_name}' with data type as '{new_col_type}'")
    
    # Iterate through the list of old columns and check whether any column no longer exists in the new column list, to find dropped columns
    def findDropped(old_cols_df, new_col_name_list):
        for index, row in old_cols_df.iterrows():
            old_col_name = old_cols_df.iloc[index]['Name']
            old_col_type = old_cols_df.iloc[index]['Type']
    
            # Check if the column doesn't exist in the new column list
            if old_col_name not in new_col_name_list:
                columns_modified.append(f"Dropped old column '{old_col_name}' with data type as '{old_col_type}'")
    
    # Publish changes in schema to an SNS topic that can be subscribed to for email notifications when changes are detected
    def notifyChanges(message_to_send):
        sns = boto3.client('sns')
        # Publish a simple message to the specified SNS topic
        response = sns.publish(
            TopicArn=topic_arn,
            Message=message_to_send,
            Subject="DWH Notification: Changes in table schema"
        )
    
    # Convert VersionId to int to use for sorting the versions
    def version_id(json):
        try:
            return int(json['VersionId'])
        except KeyError:
            return 0
    
    # Delete the older table versions
    def delete_versions(glue_client, versions_list, number_of_versions_to_retain):
        print("deleting old versions...")
        if len(versions_list) > number_of_versions_to_retain:
            version_id_list = []
            for table_version in versions_list:
                version_id_list.append(int(table_version['VersionId']))
            # Sort the versions in descending order
            version_id_list.sort(reverse=True)
            versions_str_list = [str(x) for x in version_id_list]
            versions_to_delete = versions_str_list[number_of_versions_to_retain:]
            
            del_response = glue_client.batch_delete_table_version(
                DatabaseName=db_name,
                TableName=table_name,
                VersionIds=versions_to_delete
            )
            return del_response
    
    # Call the AWS Glue API to get the list of table versions. The solution assumes that the number of versions in the table is fewer than 100. If you have more than 100 versions, use pagination and loop through each page.
    glue = boto3.client('glue')
    response = glue.get_table_versions(
        CatalogId=catalog_id,
        DatabaseName=db_name,
        TableName=table_name,
        MaxResults=100
    )
    table_versions = response['TableVersions']
    table_versions.sort(key=version_id, reverse=True)
    
    version_count = len(table_versions)
    print(version_count)
    
    # Check that the table versions to compare exist. You need to pass the version numbers to compare to the job.
    if version_count > max(versions_to_compare):
    
        new_columns = table_versions[versions_to_compare[0]]['Table']['StorageDescriptor']['Columns']
        new_cols_df = pd.DataFrame(new_columns)
    
        old_columns = table_versions[versions_to_compare[1]]['Table']['StorageDescriptor']['Columns']
        old_cols_df = pd.DataFrame(old_columns)
    
        new_col_name_list = new_cols_df['Name'].tolist()
        old_col_name_list = old_cols_df['Name'].tolist()
        findAddedUpdated(new_cols_df, old_cols_df, old_col_name_list)
        findDropped(old_cols_df, new_col_name_list)
        if len(columns_modified) > 0: 
            email_msg = f"The following changes are identified in the '{table_name}' table of the '{db_name}' database of your data warehouse. Please review.\n\n"
            print("Job completed! -- below is the list of changes.")
            for column_modified in columns_modified:
                email_msg += f"\t{column_modified}\n"
    
            print(email_msg)
            notifyChanges(email_msg)
        else:
            if delete_old_versions:
                delete_versions(glue, table_versions, number_of_versions_to_retain)
            print("Job completed! -- There are no changes in the table schema.")
    else:
        print("Job completed! -- The selected table does not have the number of versions chosen to compare. Please verify the list passed in 'versions_to_compare'")

  6. Enter a name for the job (for example, find-change-job-sscp).
  7. For IAM Role, choose the AWS Glue service role (AWSGlueServiceRole-sscp-blog).
  8. Leave all other defaults and choose Save. (If you prefer to create the job programmatically, see the sketch that follows.)
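
If you would rather register the job with the API, the following boto3 sketch creates an equivalent Python shell job. The script location is a placeholder (upload the script above to your own bucket), and the 'library-set': 'analytics' default argument is assumed to make pandas available for Python 3.9 shell jobs.

import boto3

glue = boto3.client('glue')

glue.create_job(
    Name='find-change-job-sscp',
    Role='AWSGlueServiceRole-sscp-blog',
    Command={
        'Name': 'pythonshell',
        'PythonVersion': '3.9',
        # Placeholder location -- upload the comparison script to your own bucket
        'ScriptLocation': 's3://aws-blog-sscp-ng-202202/scripts/find-change-job-sscp.py'
    },
    MaxCapacity=0.0625,  # smallest Python shell capacity
    # Assumption: this library set bundles pandas for Python shell jobs
    DefaultArguments={'library-set': 'analytics'}
)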

Test the solution

We've configured the infrastructure to run the solution. Let's now see it in action. First we upload the first data file and run our crawler to create a new table in the Data Catalog.

  1. Create a CSV file called salesdata01.csv with the following contents:
    orderdate,region,rep,item,units,unitcost
    2022-01-06,US-West,Jones,Pencil,95,1.99
    2022-01-06,US-Central,Kivell,Binder,50,19.99
    2022-01-07,US-Central,Jardine,Pencil,36,4.99
    2022-01-07,US-Central,Gill,Pen,27,19.99
    2022-01-08,US-East,Sorvino,Pencil,56,2.99
    2022-01-08,US-West,Jones,Binder,60,4.99
    2022-01-09,US-Central,Andrews,Pencil,75,1.99
    2022-01-10,US-Central,Jardine,Pencil,90,4.99
    2022-01-11,US-East,Thompson,Pencil,32,1.99
    2022-01-20,US-West,Jones,Binder,60,8.99

  2. On the Amazon S3 console, navigate to the data folder and upload the CSV file.
  3. On the AWS Glue console, choose Crawlers in the navigation pane.
  4. Select your crawler and choose Run crawler. The crawler takes a few minutes to complete. It adds a table (sscp_sales_data) in the AWS Glue database (sscp-database).
  5. Verify the created table by choosing Tables in the navigation pane. Now we move the existing file in the data folder to the archived folder.
  6. On the Amazon S3 console, navigate to the data folder.
  7. Select the file you uploaded (salesdata01.csv) and on the Actions menu, choose Move.
  8. Move the file to the archived folder. Now we copy the second data file with the updated schema to the data folder and rerun the crawler.
  9. Create a CSV file called salesdata02.csv with the following contents. It contains the following changes from the previous version:
    1. The data in the region column is changed from region names to codes (for example, the data type is changed from string to bigint).
    2. The rep column is dropped.
    3. The new column total is added.
      orderdate,region,item,units,unitcost,total
      2022-02-01,01,Pencil,35,4.99,174.65
      2022-02-01,03,Desk,2,125,250
      2022-02-02,01,Pen Set,16,15.99,255.84
      2022-02-02,03,Binder,28,8.99,251.72
      2022-02-03,01,Pen,64,8.99,575.36
      2022-02-03,01,Pen,15,19.99,299.85
      2022-02-06,03,Pen Set,96,4.99,479.04
      2022-02-10,03,Pencil,67,1.29,86.43
      2022-02-11,01,Pen Set,74,15.99,183.26
      2022-02-11,03,Binder,46,8.99,413.54

  10. On the Amazon S3 console, upload the file to the data folder.
  11. On the AWS Glue console, choose Crawlers in the navigation pane.
  12. Select your crawler and choose Run crawler. The crawler takes roughly 2 minutes to complete. It updates the schema of the previously created table (sscp_sales_data).
  13. Verify that a new version of the table is created on the Tables page. Now we run the AWS Glue ETL job to check if there is a new version of the table schema and list the changes against the previous version of the schema in CloudWatch Logs.
  14. On the AWS Glue console, choose Jobs in the navigation pane.
  15. Select your job (find-change-job-sscp) and on the Actions menu, choose Edit script.
  16. Change the following input parameters for the job in the script to match your configuration:
  17. Choose Save.
  18. Close the script editor.
  19. Select the job again and on the Actions menu, choose Run job.
  20. Leave all default parameters and choose Run job.
  21. To monitor the job status, choose the job and review the History tab.
  22. When the job is complete, choose the Output link to open the CloudWatch logs for the job.

The log should show the changes identified by the AWS Glue job.

You should also receive an email with details on the changes in the schema. The following is an example of an email received.

We can now review the changes identified by the AWS Glue ETL job and make changes in the downstream data store accordingly before running the job that propagates the data from the S3 bucket to downstream applications. For example, if you have an Amazon Redshift table, after the job lists all the schema changes, you need to connect to the Amazon Redshift database and apply those schema changes. Follow the change request process set by your organization before making schema changes in your production system.
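
For example, if the downstream store is Amazon Redshift, a change script could apply a detected change with the Redshift Data API. The following is only illustrative: the cluster identifier, database, secret ARN, table, and column are placeholders, and in practice the statement should go through your change management process.

import boto3

redshift_data = boto3.client('redshift-data')

# Illustrative only: add the new column reported by the Glue job to the downstream table
redshift_data.execute_statement(
    ClusterIdentifier='my-redshift-cluster',  # placeholder
    Database='dev',                           # placeholder
    SecretArn='arn:aws:secretsmanager:us-east-1:111122223333:secret:redshift-creds',  # placeholder
    Sql="ALTER TABLE sales_data ADD COLUMN total DECIMAL(10,2);"
)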

The following table lists mappings between Apache Hive and Amazon Redshift data types. You can find similar mappings for other data stores and update your downstream data store accordingly.

The provided Python code takes care of the logic to compare the schema changes. The script takes the parameters of the AWS Glue Data Catalog ID, the AWS Glue database name, and the AWS Glue table name.

Hive Data Type | Description | Amazon Redshift Data Type | Description
TINYINT | 1-byte integer | - | -
SMALLINT | Signed two-byte integer | SMALLINT | Signed two-byte integer
INT | Signed four-byte integer | INT | Signed four-byte integer
BIGINT | Signed eight-byte integer | BIGINT | Signed eight-byte integer
DECIMAL | - | - | -
DOUBLE | - | - | -
STRING | - | VARCHAR, CHAR | -
VARCHAR | 1 to 65535, available starting with Hive 0.12.0 | VARCHAR | -
CHAR | 255 length, available starting with Hive 0.13.0 | CHAR | -
DATE | year/month/day | DATE | year/month/day
TIMESTAMP | No time zone | TIME | Time without time zone
- | - | TIMETZ | Time with time zone
ARRAY/STRUCTS | - | SUPER | -
BOOLEAN | - | BOOLEAN | -
BINARY | - | VARBYTE | Variable-length binary value
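
If you automate the downstream DDL generation, a simple lookup based on this table is usually enough. The following sketch covers only the unambiguous rows above; extend it for your own data stores.

# Minimal Hive-to-Amazon Redshift type mapping, derived from the table above
HIVE_TO_REDSHIFT = {
    'smallint': 'SMALLINT',
    'int': 'INT',
    'bigint': 'BIGINT',
    'string': 'VARCHAR',
    'varchar': 'VARCHAR',
    'char': 'CHAR',
    'date': 'DATE',
    'array': 'SUPER',
    'struct': 'SUPER',
    'boolean': 'BOOLEAN',
    'binary': 'VARBYTE',
}

def map_hive_type(hive_type: str) -> str:
    """Return the Redshift type for a Hive type, defaulting to VARCHAR when unknown."""
    base_type = hive_type.split('(')[0].split('<')[0].lower()
    return HIVE_TO_REDSHIFT.get(base_type, 'VARCHAR')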

Clean up

When you're finished exploring the solution, clean up the resources you created as part of this walkthrough:

  • AWS Glue ETL job (find-change-job-sscp)
  • AWS Glue crawler (glue-crawler-sscp-sales-data)
  • AWS Glue table (sscp_sales_data)
  • AWS Glue database (sscp-database)
  • IAM role for the crawler and ETL job (AWSGlueServiceRole-sscp-blog)
  • S3 bucket (aws-blog-sscp-ng-202202) with all the files in the data and archived folders
  • SNS topic and subscription (NotifySchemaChanges)

Conclusion

In this post, we showed you how to use AWS services together to detect schema changes in your source data, which you can then use to change your downstream data stores and run ETL jobs to avoid failures in your data pipeline. We used AWS Glue to understand and catalog the source data schema, AWS Glue APIs to identify schema changes, and Amazon SNS to notify the team about the changes. We also showed you how to delete older versions of your source data schema using AWS Glue APIs. We used Amazon S3 as our data lake storage tier.

You can learn more about AWS Glue in the AWS Glue documentation.


About the authors

Narendra Gupta is a Specialist Solutions Architect at AWS, helping customers on their cloud journey with a focus on AWS analytics services. Outside of work, Narendra enjoys learning new technologies, watching movies, and visiting new places.

Navnit Shukla is an AWS Specialist Solutions Architect in Analytics. He is passionate about helping customers uncover insights from their data, and has been building solutions to help organizations make data-driven decisions.
