Apache Spark 是一个用于大规模数据处理的开源分布式计算系统。它提供了丰富的API和工具来处理结构化和半结构化数据,包括JSON数据。在本文中,我们将深入探讨如何使用Spark 将JSON 数据转换为DataFrame 并展示一些实用的代码示例。
Spark DataFrame 是Spark 中用于处理结构化数据的主要API。它提供了类似于关系数据库中表的概念,支持SQL查询、数据操作和数据分析。 DataFrame 可以从各种数据源创建,包括CSV、JSON、Parquet 等。
Spark 的DataFrame API 提供了强大的功能来处理和转换复杂的数据结构(例如JSON)。下面我们将给出一个详细的示例来展示如何将JSON 数据转换为DataFrame。
假设我们有一个文件data.json 包含以下JSON 数据:
{'姓名': '爱丽丝', '年龄': 25, '城市': '纽约'}
{'姓名': '鲍勃', '年龄': 30, '城市': '旧金山'}
{'name': '查理', '年龄': 35, '城市': '洛杉矶'}
在开始之前,我们需要创建一个SparkSession 对象。 SparkSession是Spark 2.0中引入的一个新的API,用于与Spark交互。它是Spark SQL、DataFrame 和Dataset API 的入口点。
使用以下代码创建SparkSession 对象:
导入org.apache.spark.sql.SparkSession
val Spark=SparkSession.builder()
.appName('JSON 到DataFrame')
.master('本地')
.getOrCreate()
接下来,我们使用.read.json() 方法从文件中读取JSON 数据并创建一个DataFrame。
val df=Spark.read.json('path/to/data.json')
在上面的代码中,我们将文件路径传递给.read.json() 方法,它将返回一个包含JSON 数据的DataFrame。您可以将实际文件路径替换为您自己的文件路径。
我们可以使用.show() 方法来查看DataFrame 的内容。
df.show()
运行上面的代码将输出DataFrame 的前20 行:
+--------+----+-------------+
|姓名|年龄|城市|
+--------+----+-------------+
|爱丽丝| 25|纽约|
|鲍勃| 30|旧金山|
|查理| 35|洛杉矶|
+--------+----+-------------+
DataFrame 架构是对DataFrame 中列的名称和数据类型的描述。我们可以使用.printSchema() 方法来查看DataFrame 的模式。
df.printSchema()
运行上面的代码将输出DataFrame 的架构:
根
|-- name: 字符串(可空=true)
|--age: 整数(可空=true)
|-- city: 字符串(可空=true)
使用Spark DataFrame,我们可以执行SQL查询和数据操作。以下是一些常见示例:
要选择DataFrame 中的特定列,可以使用.select() 方法。
df.select('姓名', '年龄').show()
运行上面的代码将输出DataFrame 中“name”和“age”列的内容:
+--------+---+
|姓名|年龄|
+--------+---+
|爱丽丝| 25|
|鲍勃| 30|
|查理| 35|
+--------+---+
要向DataFrame 添加新列,可以使用.withColumn() 方法。
val dfWithNewColumn=df.withColumn('age_plus_10', df('age') + 10)
dfWithNewColumn.show()
运行上面的代码将向DataFrame 添加一个名为“age_plus_10”的新列,其值是“age”列值加10:
+--------+---+-------------+----------------+
|姓名|年龄|城市|age_plus_10|
+--------+---+-------------+----------------+
|爱丽丝| 25|纽约| 35|
|鲍勃| 30|旧金山| 40|
|查理| 35|洛杉矶| 45|
+--------+---+-------------+----------------+
要过滤DataFrame 中的行,可以使用.filter() 方法。
valfilteredDf=df.filter(df('年龄') 30)
过滤Df.show()
运行上面的代码将输出年龄大于30 的行