更新时间:2023年07月28日16时16分 来源:传智教育 浏览次数:
DataFrame对象可以从RDD转换而来,都是分布式数据集 其实就是转换一下内部存储的结构,转换为二维表结构。
将RDD转换为DataFrame方式1:
调用spark
# 首先构建一个RDD rdd[(name, age), ()] rdd = sc.textFile("../data/sql/people.txt").\ map(lambda x: x.split(',')).\ map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测 # 构建DF方式1 df = spark.createDataFrame(rdd, schema = ['name', 'age'])
通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。
# coding:utf8 # 演示DataFrame创建的三种方式 from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession.builder.\ appName("create df").\ master("local[*]").\ getOrCreate() sc = spark.sparkContext # 首先构建一个RDD rdd[(name, age), ()] rdd = sc.textFile("../data/sql/people.txt").\ map(lambda x: x.split(',')).\ map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测 # 构建DF方式1 df = spark.createDataFrame(rdd, schema = ['name', 'age']) # 打印表结构 df.printSchema() # 打印20行数据 df.show() df.createTempView("ttt") spark.sql("select * from ttt where age< 30").show()将RDD转换为DataFrame方式2:
通过StructType对象来定义DataFrame的“表结构”转换RDD
# 创建DF , 首先创建RDD 将RDD转DF rdd = sc.textFile("../data/sql/stu_score.txt").\ map(lambda x:x.split(',')).\ map(lambda x:(int(x[0]), x[1], int(x[2]))) # StructType 类 # 这个类 可以定义整个DataFrame中的Schema schema = StructType().\ add("id", IntegerType(), nullable=False).\ add("name", StringType(), nullable=True).\ add("score", IntegerType(), nullable=False) # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add, 每一个add代表一个StructField # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空 df = spark.createDataFrame(rdd, schema) # coding:utf8 # 需求: 基于StructType的方式构建DataFrame 同样是RDD转DF from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType if __name__ == '__main__': spark = SparkSession.builder.\ appName("create_df"). \ config("spark.sql.shuffle.partitions", "4"). \ getOrCreate() # SparkSession对象也可以获取 SparkContext sc = spark.sparkContext # 创建DF , 首先创建RDD 将RDD转DF rdd = sc.textFile("../data/sql/stu_score.txt").\ map(lambda x:x.split(',')).\ map(lambda x:(int(x[0]), x[1], int(x[2]))) # StructType 类 # 这个类 可以定义整个DataFrame中的Schema schema = StructType().\ add("id", IntegerType(), nullable=False).\ add("name", StringType(), nullable=True).\ add("score", IntegerType(), nullable=False) # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空 df = spark.createDataFrame(rdd, schema) df.printSchema() df.show()
将RDD转换为DataFrame方式3:
使用RDD的toDF方法转换RDD
# StructType 类 # 这个类 可以定义整个DataFrame中的Schema schema = StructType().\ add("id", IntegerType(), nullable=False).\ add("name", StringType(), nullable=True).\ add("score", IntegerType(), nullable=False) # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空 # 方式1: 只传列名, 类型靠推断, 是否允许为空是true df = rdd.toDF(['id', 'subject', 'score']) df.printSchema() df.show() # 方式2: 传入完整的Schema描述对象StructType df = rdd.toDF(schema) df.printSchema() df.show() # coding:utf8 # 需求: 使用toDF方法将RDD转换为DF from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType if __name__ == '__main__': spark = SparkSession.builder.\ appName("create_df"). \ config("spark.sql.shuffle.partitions", "4"). \ getOrCreate() # SparkSession对象也可以获取 SparkContext sc = spark.sparkContext # 创建DF , 首先创建RDD 将RDD转DF rdd = sc.textFile("../data/sql/stu_score.txt").\ map(lambda x:x.split(',')).\ map(lambda x:(int(x[0]), x[1], int(x[2]))) # StructType 类 # 这个类 可以定义整个DataFrame中的Schema schema = StructType().\ add("id", IntegerType(), nullable=False).\ add("name", StringType(), nullable=True).\ add("score", IntegerType(), nullable=False) # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空 # 方式1: 只传列名, 类型靠推断, 是否允许为空是true df = rdd.toDF(['id', 'subject', 'score']) df.printSchema() df.show() # 方式2: 传入完整的Schema描述对象StructType df = rdd.toDF(schema) df.printSchema() df.show()