动手系列在Apache Spark和Python中建立您的第一个ETL管道

动手系列:在Apache Spark和Python中建立您的第一个ETL管道

点选上方关注,All in AI中国

在这篇文章中,我将讨论Apache Spark以及如何在其中建立简单但强大的ETL管道。您将了解Spark如何提供API以将不同的资料格式转换为资料帧和SQL以进行分析,以及如何将一个数据源转换为另一个数据源。

什么是Apache Spark?

根据维基百科:

Apache Spark是一个开源的分散式通用丛集计算框架。 Spark提供了一个界面,用于使用隐式资料并行和容错来程式设计整个丛集。

官方网站:

Apache Spark是用于大规模资料处理的统一分析引擎。

简而言之,Apache Spark是一个用于处理、查询和分析大资料的框架。由于计算是在内存中完成的,因此它比MapReduce等竞争对手要快好几倍。以每天产生数TB的资料的速率,需要一种能够以高速提供实时分析的解决方案。一些Spark功能是:

它比传统的大规模资料处理框架快100倍。易于使用,因为您可以在Python、R和Scala中编写Spark应用程序。它为SQL、Steaming和Graph计算提供了库。Apache Spark元件

Spark 核心

它包含Spark的基本功能,如任务排程、内存管理,与储存的互动等。

Spark SQL

它是一组用于与结构化资料互动的库。它使用类似SQL的界面与各种格式的资料进行互动,如CSV、JSON、Parquet等。

Spark Streaming

Spark Streaming是一个Spark元件,支援处理实时资料流。实时流,如股票资料、天气资料、日志和其他各种。

MLIB

MLib是Spark提供的一套机器学习算法,用于监督和无监督学习

GraphX

它是Apache Spark用于图形和图形平行计算的API。它扩充套件了Spark RDD API,允许我们建立一个带有附加到每个顶点和边缘的任意属性的有向图。它为ETL、探索性分析和迭代图计算提供了统一的工具。

Spark丛集管理器

Spark支援以下资源/丛集管理器:

Spark Standalone - Spark附带的简单丛集管理器Apache Mesos - 一个也可以执行Hadoop应用程序的通用丛集管理器。Apache Hadoop YARN - Hadoop 2中的资源管理器Kubernetes - 一个开源系统,用于自动化容器化应用程序的部署、扩充套件和管理。设定和安装

从这里下载Apache Spark的二进位制档案。您必须在系统上安装Scala,并且还应设定其路径。

对于本教程,我们使用的是在2019年5月释出的2.4.3版。将资料夹移到/ usr / local中

mv spark-2.4.3-bin-hadoop2.7 / usr / local / spark

然后汇出Scala和Spark的路径。

#Scala Path

export PATH=/usr/local/scala/bin:$PATH

#Apache Spark path

export PATH=/usr/local/spark/bin:$PATH

通过在终端上执行spark-shell命令来呼叫Spark Shell。如果一切顺利,你会看到如下所示:

它载入基于Scala的shell。由于我们将使用Python语言,因此我们必须安装PySpark。

pip install pyspark

安装完成后,您可以通过在终端中执行命令pyspark来呼叫它:

您找到了一个典型的Python shell,但它载入了Spark库。

用Python开发

让我们开始编写我们的第一个程式。

from pyspark.sql import SparkSession

from pyspark.sql import SQLContext

if __name__ == \__main__\:

scSpark = SparkSession

.builder

.appName(reading csv)

.getOrCreate()

我们汇入了两个库:SparkSession和SQLContext。

SparkSession是编写Spark应用程序的入口点。它允许您与Spark提供的DataSet和DataFrame API进行互动。我们通过呼叫appName来设定应用程序名称。 getOrCreate()方法返回应用程序的新SparkSession或返回现有的SparkSession。

我们的下一个目标是读取CSV档案。我建立了一个示例CSV档案,名为data.csv,如下所示:

name,age,country

adnan,40,Pakistan

maaz,9,Pakistan

musab,4,Pakistan

ayesha,32,Pakistan

和程式码:

if __name__ == \__main__\:

scSpark = SparkSession

.builder

.appName(reading csv)

.getOrCreate()

data_file = \/Development/PetProjects/LearningSpark/data.csv\

sdfData = scSpark.read.csv(data_file, header=True, sep=,).cache()

print(\Total Records = {}\.format(sdfData.count()))

sdfData.show()

我设定了档案路径,然后呼叫.read.csv来读取CSV档案。引数是不言自明的。 .cache()快取返回resultset,从而提高效能。当我执行该程式时,它返回如下所示的内容:

看起来有趣,不是吗?现在,如果我想读取资料帧中的多个档案,该怎么办?让我们建立另一个档案,我将其称为data1.csv,它如下所示:

1

2

3

4

姓名,年龄,国家

诺琳,23,英格兰

阿米尔,9,巴基斯坦

诺曼,4,巴基斯坦

拉希德,12,巴基斯坦

我只需要这样做:

data_file =\/Development/PetProjects/LearningSpark/data*.csv\它将读取所有以CSV型别的资料开头的档案。

它将如何读取与模式和转储结果匹配的所有CSV档案:

如您所见,它将CSV中的所有资料转储到单个数据帧中。

但有一点,只有当所有CSV都遵循某种模式时,这种转储才有效。如果您有一个具有不同列名的CSV,那么它将返回以下讯息。

19/06/04 18:59:05 WARN CSVDataSource: Number of column in CSV header is not equal to number of fields in the schema:

Header length: 3, schema size: 17

CSV file: file:///Development/PetProjects/LearningSpark/data.csv

如您所见,Spark抱怨不能处理不同的CSV档案。

您可以使用DataFrame执行许多操作,但Spark为您提供了更简单、更熟悉的界面来使用SQLContext操作资料。它是SparkSQL的闸道器,它允许您使用类似SQL的查询来获得所需的结果。

在我们进一步发展之前,让我们先玩一些真实的资料。为此,我们使用的是从Kaggle得到的超市销售资料。在我们尝试SQL查询之前,让我们尝试按性别对记录进行分组。我们正在处理ETL的提取部分。

data_file = \/Development/PetProjects/LearningSpark/supermarket_sales.csv\

sdfData = scSpark.read.csv(data_file, header=True, sep=,).cache()

gender = sdfData.groupBy(\Gender\).count()

print(gender.show())

当你执行时,它会返回如下内容:

groupBy()按给定列对资料进行分组。在我们的例子中,它是性别列。

SparkSQL允许您使用类似SQL的查询来访问资料。

sdfData.registerTempTable(sales)

output = scSpark.sql(\SELECT * from sales\)

output.show()

首先,我们从dataframe中建立一个临时表。为此,使用了registerTampTable。在我们的例子中,表名是sales。完成后,您可以在其上使用典型的SQL查询。在我们的例子中,它是Select * from sales。

或者类似下面的内容:

output = scSpark.sql(\SELECT * from sales WHERE `Unit Price` output.show()

甚至是聚合值。

output = scSpark.sql(\SELECT COUNT(*) as total, City from sales GROUP BY City\)

output.show()

非常灵活,对吗?

我们刚刚完成了ETL的变换部分。

最后是ETL的载入部分。如果要储存转换后的资料怎么办?您会有很多的可用的选项,RDBMS、XML或JSON。

output.write.format(\json\).save(\filtered.json\)

执行时,Sparks会建立以下资料夹/档案结构。

它建立了一个具有档名称的资料夹,在我们的例子中是filtered.json。然后,名为SUCCESStells的档案是否成功执行。如果失败,则生成名为FAILURE的档案。然后,您在此处找到多个档案。多个档案的原因是每个工作都涉及在档案中写入的操作。如果要建立单个档案(不建议使用),则可以使用合并来收集所有分割槽中的资料并将其减少到单个数据帧。

output.coalesce(1).write.format(\json\).save(\filtered.json\)

它将输出以下资料:

{ “总”:328, “城市”: “内比都”}

{ “总”:332, “城市”: “曼德勒”}

{ “总”:340, “城市”: “仰光”}

MySQL和Apache Spark整合

上述资料帧包含转换后的资料。我们希望将这些资料载入到MYSQL中,以便进一步使用,例如视觉化或显示在应用程序上。

首先,我们需要MySQL联结器库来与Spark进行互动。我们将从MySQL网站下载联结器并将其放在一个资料夹中。我们将修改SparkSession以包含JAR档案。

scSpark = SparkSession

.builder

.appName(reading csv)

.config(spark.driver.extraClassPath, /usr/local/spark/jars/mysql-connector-java-8.0.16.jar)

.getOrCreate()

输出现在如下所示:

output = scSpark.sql(\SELECT COUNT(*) as total, City from sales GROUP BY City\)

output.show()

output.write.format(\jdbc\).options(

url=\jdbc:mysql://localhost/spark\,

driver=\com.mysql.cj.jdbc.Driver\,

dbtable=\city_info\,

user=\root\,

password=\root\).mode(\append\).save()

在执行指令码之前,我在数据库中建立了所需的Db和表。如果一切顺利,您应该看到如下结果:

如您所见,Spark可以更轻松地将资料从一个数据源传输到另一个数据源。

结论

Apache Spark是一个非常苛刻且有用的大资料工具,可以帮助您轻松编写ETL。 您可以载入pb级的资料,并且可以通过设定多个节点的丛集轻松地处理这些资料。本教程只是为您提供Apache Spark编写ETL的基本思想。 您应该检查文件和其他资源以深入挖掘。

猜你喜欢