A Practical Guide to AWS Glue

Encora | May 02, 2019

Estimated reading time: 11 minutes

AWS Glue is a serverless ETL (Extract, Transform, and Load) service on the AWS cloud. It makes it easier for customers to prepare their data for analytics. In this article, I will briefly cover the basics of AWS Glue and the other AWS services involved.

Then I will walk through how to extract and transform CSV files from Amazon S3. We will also see how these CSVs are turned into a data catalog and queried with Amazon Athena, without any EC2 instances or servers. Towards the end, we will load the transformed data into Amazon Redshift so it can be used for analytics later.

Components of AWS Glue

  • Data catalog: The data catalog holds the metadata and the structure of the data (a short boto3 sketch of inspecting the catalog follows this list).
  • Database: It is used to create or access the database for the sources and targets.
  • Table: One or more tables in the database that can be used by the source and target.
  • Crawler and Classifier: A crawler retrieves data from the source using built-in or custom classifiers. It creates/uses metadata tables that are pre-defined in the data catalog.
  • Job: A job is the business logic that carries out the ETL task. Internally, Apache Spark with Python or Scala is used to write this business logic.
  • Trigger: A trigger starts the ETL job execution on demand or at a specific time.
  • Development endpoint: It creates a development environment where the ETL job script can be tested, developed, and debugged.
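
The Data Catalog can also be inspected programmatically. Below is a minimal boto3 sketch, not part of the original walkthrough; the database name db_demo1 and the region are assumptions matching the example used later in this guide.

import boto3

glue = boto3.client("glue", region_name="us-east-1")  # assumed region

# List the tables that crawlers have registered in one catalog database.
response = glue.get_tables(DatabaseName="db_demo1")
for table in response["TableList"]:
    cols = [c["Name"] for c in table["StorageDescriptor"]["Columns"]]
    print(table["Name"], cols)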

End-to-End ETL on AWS Glue

This tutorial helps you understand how AWS Glue works along with Amazon S3 and Amazon Redshift. This practical guide shows how to read data from different data sources (we will cover Amazon S3 in this guide), apply some required transformations such as joins and filtering on the tables, and finally load the transformed data into Amazon Redshift.
A JDBC connection connects the data sources and targets using Amazon S3, Amazon RDS, Amazon Redshift, or any external database.
Before implementing any ETL job, you need to create an IAM role and upload the data into Amazon S3.

Create an IAM role to access AWS Glue + Amazon S3:

  • Open the Amazon IAM console
  • Click on Roles in the left pane. Then click on Create Role.
  • Choose AWS service from the "Select type of trusted entity" section
  • Choose Glue from the "Choose the service that will use this role" section
  • Choose Glue from the "Select your use case" section
  • Click on Next: Permissions
  • Select AWSGlueServiceRole from the Attach Permissions Policies section. The AWSGlueServiceRole policy contains permissions to access Glue, CloudWatch, EC2, S3, and IAM.
  • Click on Next: Tags. Leave the Add tags section blank and click on Next: Review
  • Provide a name to identify the service role (for simplicity, add the prefix "AWSGlueServiceRole-" to the role name)
  • Click on Create role
  • Your role with full access to AWS Glue and limited access to Amazon S3 has now been created (a boto3 sketch of the same setup follows this list)
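
The same role setup can be scripted. Here is a hedged boto3 sketch; the role name is hypothetical and simply follows the suggested prefix, and the trust policy is the standard one for the Glue service.

import json
import boto3

iam = boto3.client("iam")

# Trust policy that lets the AWS Glue service assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "glue.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName="AWSGlueServiceRole-demo",  # hypothetical name using the suggested prefix
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
# Attach the managed policy mentioned in the steps above.
iam.attach_role_policy(
    RoleName="AWSGlueServiceRole-demo",
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
)
# Access to your S3 bucket (e.g. a bucket-scoped policy) would still be attached separately.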

Upload source CSV files to Amazon S3:

  • On the Amazon S3 console, click on Create bucket to create a bucket where you can store files and folders.
  • Enter a bucket name, select a Region and click on Next
  • The rest of the configuration settings for creating an S3 bucket are optional. Click Next to create your S3 bucket.
  • Create a new folder in your bucket and upload the source CSV files into it (a boto3 sketch of the upload follows this list)
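
For reference, here is a minimal boto3 sketch of the same upload; the bucket name, region, and file names are assumptions, not values from the article.

import boto3

s3 = boto3.client("s3", region_name="us-east-1")  # assumed region

s3.create_bucket(Bucket="my-glue-demo-bucket")  # hypothetical bucket name
# Upload the source CSVs into a "source/" folder (prefix) inside the bucket.
s3.upload_file("tbl_syn_source_1.csv", "my-glue-demo-bucket", "source/tbl_syn_source_1.csv")
s3.upload_file("tbl_syn_source_2.csv", "my-glue-demo-bucket", "source/tbl_syn_source_2.csv")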

Create a connection to Amazon Redshift for the target database:

  • Prerequisite: You must have an existing cluster, database name, and database user in Amazon Redshift.
  • In the AWS Glue console, click on Add Connection in the left pane.

In the dialog box, enter the connection name under Connection name and choose the Connection type as Amazon Redshift. Click Next to move to the next screen.

  • Select your existing cluster in Amazon Redshift as the cluster for your connection
  • Enter the database name, username, and password of the cluster and click Next

Review the details of the connection and then click Finish. Your Amazon Redshift connection is now created and can be verified through Test Connection.
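
If you prefer to script the connection, here is a hedged boto3 sketch; the connection name matches the one used later by the job, while the JDBC URL and credentials are placeholders.

import boto3

glue = boto3.client("glue")

glue.create_connection(
    ConnectionInput={
        "Name": "my-redshift-1",  # the name referenced later when the job writes to Redshift
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:redshift://<cluster-endpoint>:5439/dev",
            "USERNAME": "<redshift-user>",
            "PASSWORD": "<redshift-password>",
        },
        # A cluster inside a VPC usually also needs PhysicalConnectionRequirements
        # (subnet, security groups, availability zone); omitted here for brevity.
    }
)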

Add a crawler to create a Data Catalog using Amazon S3 as a data source

On the left pane in the AWS Glue console, click on Crawlers -> Add Crawler

Enter the crawler name in the dialog box and click Next

Choose S3 as the data store from the drop-down list

Select the folder where your CSV files are stored in the Include path field

If you have any other data source, click on Yes and repeat the steps above. In this guide, we have only one, so we will click No. Then, click Next.

Select Choose an existing IAM role, choose the role created earlier from the drop-down list of IAM roles, and click Next

Leave the default option of Run on Demand and click Next

Choose an existing database. If you do not have one, click Add Database to create a new database on the fly.

The table prefix is optional and left up to you; the tables themselves are created automatically once the crawler runs. Click Next.

Once you are satisfied with the configuration, click Finish

You can now run the crawler by clicking Run it now

Under Databases in the left pane, you can verify that the tables were created automatically by the crawler. Amazon Athena lets you view the data in these tables.
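
The crawler can also be defined and started with boto3. Below is a hedged sketch; the crawler name, role name, S3 path, and table prefix are assumptions that mirror the console walkthrough, not values from the article.

import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="csv-source-crawler",                      # hypothetical crawler name
    Role="AWSGlueServiceRole-demo",                 # the IAM role created earlier (assumed name)
    DatabaseName="db_demo1",                        # catalog database used later by the job
    Targets={"S3Targets": [{"Path": "s3://my-glue-demo-bucket/source/"}]},
    TablePrefix="tbl_syn_",                         # optional, as noted above
)
glue.start_crawler(Name="csv-source-crawler")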


Add a Job to execute ETL work

In the left pane, click on Jobs, then click on Add Job

Enter a name for the Job and then select an IAM role previously created for AWS Glue

Select Spark for the Type and select Python or Scala. For our purposes, we are using Python.

You can edit the number of DPUs (Data Processing Units) in the Maximum capacity field under Security configuration, script libraries, and job parameters (optional).

The rest of the configuration is optional; the default values are fine.

Click Next

Choose a data source table from the Choose a data source section. You can select only a single data source. Click Next.

Choose a data target table from the list of tables. You can either create a new table or choose an existing one.

If you have not created any target table yet, select the Create tables in your data target option

Our target database is Amazon Redshift, so select JDBC from the Data store dropdown and choose the connection created earlier from the Connection list.

Enter a database name that must exist in the target data store. Click Next.

You can map the columns of the source table to the columns of the target table. In this tutorial, we keep the default mapping; the mapping can also be modified later in the business logic.
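
In the script that AWS Glue generates, this column mapping typically appears as an ApplyMapping transform. A hedged illustration follows; the column names and types are assumptions, not the actual schema of this guide.

mapped1 = ApplyMapping.apply(
    frame=datasource1,
    mappings=[
        ("statecode", "string", "statecode", "string"),   # (source col, source type, target col, target type)
        ("population", "long", "population", "long"),
    ],
    transformation_ctx="mapped1",
)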

Click Save job and edit script.

Open the Python script by selecting the recently created job name, then click on Action -> Edit Script.

The left pane shows a visual representation of the ETL process. The right pane shows the script code, and just below it you can see the logs of the running job.

Let's walk through the script that performs the extract, transform, and load steps on AWS Glue.

We begin by importing the Python libraries needed to create the ETL job.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

Get the name of the job from the command-line arguments.

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

Initialize the GlueContext and SparkContext for the job.

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

Extract the data of the tbl_syn_source_1_csv and tbl_syn_source_2_csv tables from the data catalog. AWS Glue works with Dynamic Frames of the data.

datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db_demo1", table_name = "tbl_syn_source_1_csv", transformation_ctx = "datasource1")
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "db_demo1", table_name = "tbl_syn_source_2_csv", transformation_ctx = "datasource2")

Now, apply a transformation on the source tables: join the two tables on the statecode column of tbl_syn_source_1_csv and the code column of tbl_syn_source_2_csv.

Several transformations are available within AWS Glue, such as RenameField, SelectFields, Join, etc. (a small illustration follows the join example below). Refer to http://docs.aws.amazon.com/glue/latest/dg/built-in-transforms.html.

join1 = Join.apply(frame1 = datasource1, frame2 = datasource2, keys1 = "statecode", keys2 = "code", transformation_ctx = "join1")
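
For comparison, the other built-in transforms follow the same pattern of taking and returning a Dynamic Frame. The snippet below is illustrative only and not part of this job; the state value and field names are assumptions.

filtered = Filter.apply(
    frame=join1,
    f=lambda row: row["statecode"] == "CA",   # keep rows for a single state
    transformation_ctx="filtered",
)
trimmed = DropFields.apply(
    frame=filtered,
    paths=["code"],                           # drop the duplicated join key
    transformation_ctx="trimmed",
)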

Load the joined Dynamic Frame into Amazon Redshift (Database = dev and Schema = sch_demo_1).

datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = join1, catalog_connection = "my-redshift-1", connection_options = {"dbtable": "sch_demo_1.tbl_joined", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

Finally, commit your Job.

job.commit()

Save and execute the job by clicking Run Job.
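
The job can also be started programmatically instead of from the console. A hedged boto3 sketch, with a hypothetical job name:

import boto3

glue = boto3.client("glue")

run = glue.start_job_run(JobName="csv-to-redshift-job")    # hypothetical job name
status = glue.get_job_run(JobName="csv-to-redshift-job", RunId=run["JobRunId"])
print(status["JobRun"]["JobRunState"])                     # e.g. RUNNING, SUCCEEDED, FAILED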

Code Sample

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

## Initialize the GlueContext and SparkContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## Read the data from Amazon S3 whose structure was put into the Data Catalog by the crawler
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db_demo1", table_name = "tbl_syn_source_1_csv", transformation_ctx = "datasource1")
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "db_demo1", table_name = "tbl_syn_source_2_csv", transformation_ctx = "datasource2")

## Apply the transformation: join the tables
join1 = Join.apply(frame1 = datasource1, frame2 = datasource2, keys1 = "statecode", keys2 = "code", transformation_ctx = "join1")

## Write the transformed data into Amazon Redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = join1, catalog_connection = "my-redshift-1", connection_options = {"dbtable": "sch_demo_1.tbl_joined", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()

More on transformation with AWS Glue

AWS Glue has some limitations on transformations such as UNION, LEFT JOIN, RIGHT JOIN, etc. To overcome these, we can use Spark: convert the AWS Glue Dynamic Frame into a Spark DataFrame, and then apply Spark functions for the various transformations.

Example: a Union transformation is not available in AWS Glue. However, you can use Spark's union() to achieve a union of two tables.

## Convert the Glue Dynamic Frames into Spark DataFrames
from awsglue.dynamicframe import DynamicFrame

spark_data_frame_1 = glue_dynamic_frame_1.toDF()
spark_data_frame_2 = glue_dynamic_frame_2.toDF()
## Apply the UNION transformation on the Spark DataFrames
spark_data_frame_union = spark_data_frame_1.union(spark_data_frame_2).distinct()
## Again, convert the Spark DataFrame back into a Glue Dynamic Frame
glue_dynamic_frame_union = DynamicFrame.fromDF(spark_data_frame_union, glueContext, "spark_data_frame_union")

Github link for source code: http://gist.github.com/nitinmlvya/ba4626e8ec40dc546119bb14a8349b45
