估计阅读时间: 11 分钟
AWS胶是AWS云上的一个无服务器ETL(提取、转换和加载)服务. 这使得客户很容易准备数据进行分析. 在本文中,我将简要介绍AWS胶和其他AWS服务的基础知识.
然后,我将介绍如何从Amazon S3提取和转换CSV文件. 我们还将了解这些csv如何转换为数据目录,并使用Amazon Athena查询它们,而不需要任何EC2实例或服务器. 接近尾声时, 我们将把转换后的数据加载到亚马逊红移中,稍后可以用于分析.
AWS胶组件
- 数据目录: The data catalog holds the metadata and the structure of the data.
- 数据库: 它用于为源和目标创建或访问数据库.
- 表: 在数据库中创建一个或多个可由源和目标使用的表.
- 履带和分类器: 爬虫用于使用内置或自定义分类器从源检索数据. 它创建/使用数据目录中预定义的元数据表.
- 工作: A 工作 is business logic that carries out an ETL task. 在内部,Apache 火花使用python或scala语言编写这个业务逻辑.
- 触发: 触发器按需或在特定时间启动ETL作业执行.
- 开发终端: 它创建了一个开发环境,可以在其中测试、开发和调试ETL作业脚本.
端到端的AWS胶水ETL
本教程帮助您了解AWS胶如何与Amazon S3和亚马逊红移一起工作. 本实用指南将展示如何从不同的来源读取数据(我们将在本指南中介绍Amazon S3) 应用一些必须的数据转换,如表上的连接和过滤,并最终在亚马逊红移加载转换后的数据.
JDBC连接使用Amazon S3连接数据源和目标, Amazon RDS, 亚马逊红移 or any external database.
在执行任何ETL作业之前,您需要创建一个IAM角色并将数据上传到Amazon S3.
Create an 我的角色 to access AWS胶 + Amazon S3:
- 打开Amazon IAM控制台
- 点击 角色 在左边窗格. 然后点击 创建角色.
- 从“选择受信任实体类型”部分选择AWS服务
- 选择“胶水服务”选择 the service that will use this role”部分
- 从“选择胶水”选择你的用例”部分
- 点击 下一个:权限
- Select AWSGlueServiceRole 从 附加权限策略 section. AWSGlueServiceRole policy包含Glue、CloudWatch、EC2、S3和IAM的访问权限
- 点击 下一个:标签. 离开 添加标签 部分空白,点击 下一个:审查
- Provide a 的名字 to identify the service role AWSGlueServiceRole -
( for simplicity add prefix ‘AWSGlueServiceRole -’ in the role 的名字)。 - 单击Create role
- 您已经创建了可以完全访问AWS胶和限制访问Amazon S3的角色
Upload source CSV files to Amazon S3:
- 在 Amazon S3 控制台,单击 创建一个桶 where you can store files and folders.
- 输入一个 bucket名称,选择一个 地区 并点击 下一个
- 创建S3桶的其他配置为可选配置. 点击 下一个 来创建S3桶.
- 在bucket中创建一个新文件夹,并上传源CSV文件
Create a connection 为 target database into 亚马逊红移:
- Prerequisite: You must have an existing cluster, 数据库名称 and user 为 database in 亚马逊红移.
- 在 AWS胶 控制台,单击 Add 连接 在左边窗格.
在 dialog box, enter the connection 的名字 under 连接名 和选择 连接类型 as 亚马逊红移. 点击 下一个 转到下一个画面.

- 中选择您的现有集群 亚马逊红移 as the cluster for your connection
- 输入集群的数据库名称、用户名和密码,单击 下一个

Review the details of the connection and then click 完成您的亚马逊红移连接现在已经创建,并且可以通过 测试连接.

添加一个爬行器来创建使用Amazon S3作为数据源的数据目录
的左窗格 AWS胶 控制台,点击 Crawlers -> Add Crawler
Enter the crawler 的名字 in the dialog box 并点击 下一个

选择 S3 as the data store 从 drop-down 列表
Select the folder where your CSVs are stored in the 包括路径 场

If you have any other data source, click on 是的 并重复上述步骤. 在本指南中,我们没有另一个例子,我们将点击 No. 然后,单击 下一个.

Select 请选择已有的IAM角色 的下拉列表中选择先前创建的角色名 我的角色 并点击 下一个

保留默认选项 上运行的需求 并点击 下一个

选择现有数据库. 如果没有,请单击 添加数据库 to create a new database on the fly.
Table prefixes are optional and left to the user to customer. 系统也会在运行爬虫后自动创建这些. 点击 下一个.

Once you are satisfied with the configuration, click on 完成

You can now run the crawler by clicking 现在运行它

数据库 在左边的窗格中,您可以验证这些表是否是由爬虫自动创建的. Amazon Athena enables you to view the data in the tables.

了解更多关于我们的 先进的AWS服务!
添加一个工作来执行ETL工作
在左侧窗格中,单击 工作,然后点击 添加工作
输入一个 的名字 ,然后选择 我的角色 之前为AWS胶创建的
Select 火花 为 类型 并选择 Python或Scala. 为了我们的目的,我们正在使用 Python.
可编辑“DPU(数据处理单元)个数”的值 最大容量 领域的 Security configuration, script libraries, and 工作 parameters (可选).
其余的配置是可选的,默认值可以正常工作.
点击 下一个

选择一个 源表数据 从 选择一个数据源 section. You can choose only a single data source. 点击 下一个.

选择一个 目标表数据 从表列表中. Either you can create new tables or choose an existing one.

If you haven’t created any target table, select Create tables in your data target option
我们的目标数据库是 亚马逊红移 因此我们应该选择 JDBC 的下拉列表 数据存储 and the connection created earlier 从 连接 列表
输入一个 数据库名称 它一定存在于 目标数据存储. 点击 下一个.

您可以将源表的列映射到目标表的列. For this tutorial, we are going ahead with the default mapping. The business logic can also later modify this.
点击 保存作业和编辑脚本.

Open the Python script by selecting the recently created 工作 的名字. 点击 Action -> Edit Script.
The left pane shows a visual representation of the ETL process. 右边的窗格显示了脚本代码,在它的下面您可以看到正在运行的工作的日志.

让我们了解在AWS胶上执行提取、转换和加载过程的脚本.
我们从导入创建ETL作业所需的python库开始.
导入系统 从awsglue.转换导入* 从awsglue.utils import getResolvedOptions 从pyspark.上下文进口火花Context 从awsglue.上下文进口GlueContext 从awsglue.工作导入工作
Get the 的名字 of 工作 through the command line.
args = getResolvedOptions (sys.argv [' TempDir ', ' JOB_NAME '])
Initialize the GlueContext and 火花Context 为 工作.
sc = 火花Context () glueContext = glueContext (sc) 火花= glueContext.spark_session 工作= (glueContext) 工作.init (args(“JOB_NAME”),args)
从数据目录中提取tbl_syn_source_1_csv和tbl_syn_source_2_csv表的数据. AWS胶 supports Dynamic Frames of the data.
datasource1 = glueContext.create_dynamic_frame.从_catalog(database = "db_demo1", table_的名字 = "tbl_syn_source_1_csv", transformation_ctx = "datasource1") datasource2 = glueContext.create_dynamic_frame.从_catalog(database = "db_demo1", table_的名字 = "tbl_syn_source_2_csv", transformation_ctx = "datasource2")
Now, Apply transformation on the source tables. You can join both the tables on statecode column of tbl_syn_source_1_csv 和代码列 tbl_syn_source_2_csv.
在AWS胶中有几个转换可用,如Re的名字Field、SelectField、Join等. 请参考, http://docs.aws.amazon.com/glue/latest/dg/built-in-transforms.html.
join1 =加入.(第一帧= datasource1申请, frame2 = datasource2, keys1 = " statecode ", keys2 =“代码”, transformation_ctx = " join1”)
Load the joined Dynamic Frame in 亚马逊红移 (Database=dev and Schema=shc_demo_1).
datasink1 = glueContext.write_dynamic_frame.从_jdbc_conf(帧= join1, catalog_connection = "my-redshift-1", connection_options = {"dbtable": "sch_demo_1.tbl_joined”, “数据库”:“开发”}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")
最后,提交你的工作.
工作.commit ()
Save and execute the 工作 by clicking on Run 工作.
代码示例
导入系统 从awsglue.转换导入* 从awsglue.utils import getResolvedOptions 从pyspark.上下文进口火花Context 从awsglue.上下文进口GlueContext 从awsglue.工作导入工作 args = getResolvedOptions (sys.argv [' TempDir ', ' JOB_NAME ']) ## Initialize the GlueContext and 火花Context sc = 火花Context () glueContext = glueContext (sc) 火花= glueContext.spark_session 工作= (glueContext) 工作.init (args(“JOB_NAME”),args) ##从Amazon S3读取数据,并在数据目录中有它们的结构. datasource1 = glueContext.create_dynamic_frame.从_catalog(database = "db_demo1", table_的名字 = "tbl_syn_source_1_csv", transformation_ctx = "datasource1") datasource2 = glueContext.create_dynamic_frame.从_catalog(database = "db_demo1", table_的名字 = "tbl_syn_source_2_csv", transformation_ctx = "datasource2") ## Apply transformation, join the tables join1 =加入.(第一帧= datasource1申请, frame2 = datasource2, keys1 = " statecode ", keys2 =“代码”, transformation_ctx = " join1”) ## Write the transformed data into 亚马逊红移 datasink1 = glueContext.write_dynamic_frame.从_jdbc_conf(帧= join1, catalog_connection = "my-redshift-1", connection_options = {"dbtable": "sch_demo_1.tbl_joined”, “数据库”:“开发”}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1") 工作.commit ()
More on transformation with AWS胶
AWS胶对UNION、LEFT JOIN、RIGHT JOIN等转换有一些限制. To overcome this issue, we can use 火花. 将AWS胶的动态框架转换为火花 DataFrame,然后可以应用火花函数进行各种转换.
Example: Union transformation is not available in AWS胶. However, you can use spark union() to achieve Union on two tables.
## Convert Glue Dynamic frame to 火花 DataFrame spark_data_frame_1 = glue_dynamic_frame_1.toDF () spark_data_frame_2 = glue_dynamic_frame_2.toDF () ## Apply UNION Transformation on 火花 DataFrame spark_data_frame_union = spark_data_frame_1.联盟(spark_data_frame_2).不同的() ## Again, convert 火花 DataFrame back to Glue Dynamic Frame glue_dynamic_frame_union = DynamicFrame.从DF (spark_data_frame_union glueContext,“spark_data_frame_union”)
Github源代码链接: http://gist.github.com/nitinmlvya/ba4626e8ec40dc546119bb14a8349b45