AWS Glue - ETL job to sync data from source to target data store
AWS Glue is an ETL service that helps you extract, transform and load data from a source database into a target data store. It is very useful when you have a data warehouse and need to load data from your application's source database into it, or when you have multiple files or different data stores that you want to read, transform and load into the data warehouse or another target data store. To achieve this you need an ETL tool, and AWS Glue provides exactly that: an ETL job to sync data from source to target data store.
While I was working with AWS Glue, there were multiple steps required to set it up, and some important details you must be aware of, such as how to do an incremental load and how to override existing data in the target data store. Below are some tips I would like to share; they may help you if you are planning to use AWS Glue.
- Create connections for your source and target, and test each connection.
- Make sure you have a VPC endpoint for S3 of type Gateway, attached to the same route table as the subnet used by your source, target and Glue connection.
- For the security group you create for Glue and attach to the connection, make sure it has an inbound rule allowing all TCP traffic from itself (a self-referencing security group rule).
- In the security group of your database server, allow the source database port (for example 1433 if your source is SQL Server) in an inbound rule from the CIDR of the subnet in which the Glue connection was created.
- While testing the connection, if you get a timeout or connection failure, try the private IP of your SQL Server if it is hosted on an AWS EC2 instance.
- The IAM role you use should have the "AWSGlueServiceRole" permission policy attached, with Glue as the trusted service.
- Create a crawler, i.e. add your data source and target.
- While creating the crawler, it asks for a source and for a database where the crawler stores the source schema. On each crawler run it syncs the schema into that catalog database, and the job later uses it to read from the source and push the data to the target. It also asks for other configuration, such as how future source schema changes should be handled on crawler runs (apply them automatically, ignore them, etc.).
- If you want the table/schema to be created and updated automatically in the crawler database you created, make sure you select that option in the crawler configuration; in the backend it is treated as "UPDATE_IN_DATABASE".
- Once the crawler is created, run it so that it syncs the table schemas into the crawler database you created during setup; you can check this under Databases in the left navigation. The number of tables added is also shown in the grid on the crawler screen.
- Create a job to pull data from the source, transform it and push it to the target.
- While creating the job with the visual editor, add your provider (e.g. SQL Server) as the source node, then add transform nodes such as Filter or Mapping with the source as their parent node, and finally add the target node with the last filter/mapping node as its parent.
- You need a Filter node if you want to filter the source data, like a WHERE condition.
- The Mapping node is for applying field mappings between the source and target schemas.
- While creating the job, there is a bookmark option which is disabled by default; if you want to load data incrementally, you should enable it. With the bookmark enabled, the job only syncs new records. But as of today, "03-Sep-2022", it does not support overriding/updating records that were already synced to your target data store and have since changed in the source DB. There is a workaround, which you can find in the sample solution below (see also the job-script skeleton after this list for how the bookmark ties into the script).
- Once you create the job, you get a generated code/script which you can change if you want to process data before sending it to the target (the sample code below is applied in that same script).
- Now you can run the job manually.
- Create a trigger to schedule the job if you want it to run at a specified time/interval.
- By default, the trigger is created in a disabled state, so select your trigger and enable it from the Actions menu.
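As mentioned in the bookmark bullet above, the incremental behaviour lives in the generated job script. Below is a minimal sketch of what that skeleton looks like, using placeholder catalog and node names (my_source_db, my_source_table), not the exact script Glue generates for you. The parts that matter for incremental load are the unique transformation_ctx on each node and the final job.commit(), which persists the bookmark state so the next run skips records that were already synced.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)  # loads the bookmark state saved by the previous run

# Source node - "my_source_db" / "my_source_table" are placeholder catalog names
SQLServertable_source_node = glueContext.create_dynamic_frame.from_catalog(
    database="my_source_db",
    table_name="my_source_table",
    transformation_ctx="SQLServertable_source_node",  # bookmarks track progress per ctx
)

# ... transform and target nodes go here ...

job.commit()  # commits the bookmark so already-synced records are skipped next run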
Custom code/solutions:
- Problem 1: How can I replace/override existing data if it got updated in the source data store?
- As explained above, Glue currently only supports incremental load, i.e. new records only; it cannot replace/override records that were already synced. To do that, you should create a staging table in the target data store, filter only the updated records (say, based on a last update date) and sync them into that staging table. To run a SQL query that filters updated records by last update date, you can use the "SQL" transform available under the transform options in the AWS Glue UI and write your SQL query there; refer to the sample code below:
# LastRunDateTime and CurrentDateTime are Python string variables set earlier in the script (see Problem 2 below)
SqlQuery0 = (
    "select * from myDataSource "
    "where update_date between '" + LastRunDateTime + "' and '" + CurrentDateTime + "'"
)
# sparkSqlQuery is the helper function Glue Studio generates for the SQL transform node
UpdatedRecordSet = sparkSqlQuery(
    glueContext,
    query=SqlQuery0,
    mapping={"myDataSource": SQLServertable_source_node},
    transformation_ctx="UpdatedRecordSet",
)
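Once you have UpdatedRecordSet, you can push it into the staging table using the target connection created earlier. The snippet below is a minimal sketch; the connection name, database and staging table name are placeholders you would replace with your own:

# Write the filtered/updated records into the staging table on the target data store
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=UpdatedRecordSet,
    catalog_connection="<your-target-connection-name-you-had-created-on-glue>",
    connection_options={
        "dbtable": "dbo.staging_table",  # placeholder staging table name
        "database": "targetdb",
    },
    transformation_ctx="StagingTableSink",
)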
- Once the data is synced to the staging table, call a stored procedure that exists in your target data store; it uses the staging table and the actual target table to update the records. Below is sample code showing how you can call a stored procedure from the Glue script:
# Pull the JDBC settings of the target connection you created in Glue
source_jdbc_conf = glueContext.extract_jdbc_conf('<your-target-connection-name-you-had-created-on-glue>')

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm, "java.sql.Connection")
java_import(sc._gateway.jvm, "java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm, "java.sql.DriverManager")
java_import(sc._gateway.jvm, "java.sql.SQLException")
java_import(sc._gateway.jvm, "java.sql.ResultSet")

# Making a JDBC connection to the target data store
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
logger.info('Connected to ' + conn.getMetaData().getDatabaseProductName() + ', ' + source_jdbc_conf.get('url'))
try:
    # Execute the stored procedure, where test_transform is the SP name taking two parameters
    cstmt = conn.prepareCall("{call targetdb.dbo.test_transform(?,?)}")
    cstmt.setString(1, parameter1)  # SP parameter number 1
    cstmt.setString(2, parameter2)  # SP parameter number 2
    cstmt.execute()
except Exception as e:
    logger.info("error - " + str(e))
finally:
    conn.close()
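The reason for dropping down to py4j and a raw JDBC connection here is that the built-in DynamicFrame writers only insert rows; they do not give you a straightforward way to run a stored procedure on SQL Server, so the merge from the staging table into the real target table is driven through a direct JDBC call like the one above. Note that parameter1 and parameter2 are placeholders for whatever values your stored procedure expects.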
- Problem 2: How do you get a field/column value from the source or target data store? For example, you have stored your last run date time, and while running the Glue job you want to pull it so that you can fetch only the records updated/created since that last run.
- To get this, you can simply use the same source or target node, apply a Filter or SQL transform if you need specific records (like a WHERE condition), and then use the code below to read that specific column by iterating over the result set:
dfrecord = SQLServertableTarget_node_filtered_data.toDF()
pandasDF = dfrecord.toPandas()
for index, row in pandasDF.iterrows():
    LastRunDateTime = row['LastRunDateTime']  # note - column name is case sensitive
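A minimal sketch (assuming an update_date column and reusing the variable names from Problem 1) of how the value fetched above plugs back into the incremental filter:

from datetime import datetime, timezone

# Upper bound for this run; LastRunDateTime comes from the loop above
CurrentDateTime = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
SqlQuery0 = (
    "select * from myDataSource "
    "where update_date between '" + str(LastRunDateTime) + "' and '" + CurrentDateTime + "'"
)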
Categories/Tags: aws glue~glue~etl