#### Workflow for creating a DataFrame
Define an RDD:
```python
# Option 1: load a text file as an RDD
rdd = sc.textFile("file.txt")

# Option 2: parallelize a local Python collection
nums = [1, 2, 3, 4, 5]
rdd = sc.parallelize(nums)
```
Define a Row RDD:
```python
from pyspark.sql import Row

# peopleRDD is assumed to come from sc.textFile("people.txt"),
# where each line looks like "name,age"
rowRDD = peopleRDD.map(lambda line: line.split(',')) \
                  .map(lambda attributes: Row(attributes[0], attributes[1]))
```
Define a StructField:
```python
from pyspark.sql.types import StructField, StringType

# fieldName is a placeholder for the column name, e.g. "name"
StructField(fieldName, StringType(), nullable=True)
```
Define a StructType schema:
```python
from pyspark.sql.types import StructType, StructField, StringType

struct_type = StructType([
    StructField('name', StringType(), nullable=True),
    StructField('age', StringType(), nullable=True),
])
```
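As an aside, recent Spark versions (2.3+) also accept the schema as a DDL-formatted string in place of an explicit StructType:
```python
# DDL-string schema, equivalent to the StructType above
peopleDF = spark.createDataFrame(rowRDD, "name string, age string")
```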
Create the DataFrame:
```python
peopleDF = spark.createDataFrame(rowRDD, struct_type)
```
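Standard DataFrame methods can be used to verify the result:
```python
peopleDF.printSchema()  # both columns should show as string
peopleDF.show()         # prints the first rows as an ASCII table
```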
#### Converting between temp views, DataFrames, and RDDs
DataFrame to temp view:
```python
# createOrReplaceTempView returns None, so there is nothing to assign
peopleDF.createOrReplaceTempView("people")
```
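A temp view is tied to the current SparkSession. If the view must be visible across sessions in the same application, Spark also provides global temp views, which live under the reserved global_temp database:
```python
peopleDF.createOrReplaceGlobalTempView("people_g")   # "people_g" is an illustrative name
spark.sql("SELECT * FROM global_temp.people_g").show()
```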
Temp view to DataFrame:
```python
peopleDF = spark.sql("SELECT * FROM people")
```
DataFrame to RDD:
```python
peopleDF.rdd.map(lambda attributes: "name: " + attributes[0] + ", age: " + attributes[1])
```
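The map above is lazy; to actually see the formatted strings, collect them back to the driver (safe here because the data is tiny):
```python
lines = peopleDF.rdd \
    .map(lambda attributes: "name: " + attributes[0] + ", age: " + attributes[1]) \
    .collect()
for line in lines:
    print(line)
```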
#### Loading a JSON file into a DataFrame
```python
peopleDF2 = spark.read.format("json").load("people.json")
```
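`spark.read.json` is shorthand for the same format/load pair:
```python
peopleDF2 = spark.read.json("people.json")
peopleDF2.show()
```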
#### Saving a DataFrame as a CSV file
```python
peopleDF2.select("name", "age").write.format("csv").save("newpeople.csv")
```
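Note that save writes a directory named newpeople.csv containing part files, not a single file. Common writer settings such as a header row and an overwrite mode can be chained in, as in this sketch:
```python
peopleDF2.select("name", "age").write \
    .option("header", "true") \
    .mode("overwrite") \
    .format("csv") \
    .save("newpeople.csv")
```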
#### Reading a Parquet file into a DataFrame
```python
parquetFileDF = spark.read.parquet("users.parquet")
```
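The Parquet-backed DataFrame can be queried with SQL through a temp view, just like any other DataFrame. This sketch assumes users.parquet has a name column (as in Spark's bundled sample file):
```python
parquetFileDF.createOrReplaceTempView("users")
spark.sql("SELECT name FROM users").show()   # "name" assumed to exist in the schema
```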
#### Saving a DataFrame as a Parquet file
```python
parquetFileDF.write.parquet("newpeople.parquet")
```
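As with CSV, a save mode controls what happens when the target path already exists:
```python
# "overwrite" replaces an existing newpeople.parquet directory
parquetFileDF.write.mode("overwrite").parquet("newpeople.parquet")
```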
#### Reading from JDBC into a DataFrame
```python
jdbcDF = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/test") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "student") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
jdbcDF.show()
```
```
+---+----------+------+----+
| id|      name|gender| age|
+---+----------+------+----+
|  1|      test|  null|null|
|  2|sheyisnong|  null|null|
+---+----------+------+----+
```
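For larger tables the JDBC source can read in parallel by range-partitioning a numeric column with the standard partitionColumn/lowerBound/upperBound/numPartitions options. A sketch, assuming id is a suitable numeric key and the bounds roughly cover its range:
```python
jdbcDF = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/test") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "student") \
    .option("user", "root") \
    .option("password", "root") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "100") \
    .option("numPartitions", "4") \
    .load()
```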
#### Writing DataFrame data via JDBC
```python
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

studentRDD = spark.sparkContext \
    .parallelize(["3 Rongcheng M 26", "4 Guanhua M 27"]) \
    .map(lambda line: line.split(" "))
# Define the schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True),
])
# Map each record to a Row matching the schema; p[0] (the id) is skipped
# and left for the table's id column to supply
rowRDD = studentRDD.map(lambda p: Row(p[1].strip(), p[2].strip(), int(p[3])))
studentDF = spark.createDataFrame(rowRDD, schema)
prop = {}
prop['user'] = 'root'
prop['password'] = 'root'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/test", 'student', 'append', prop)
```
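The same append can be written with the option-based writer API instead of a properties dict:
```python
studentDF.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/test") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "student") \
    .option("user", "root") \
    .option("password", "root") \
    .mode("append") \
    .save()
```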
Check the data in the database:
```sql
MariaDB [test]> select * from student;
+----+------------+--------+------+
| id | name       | gender | age  |
+----+------------+--------+------+
|  1 | test       | NULL   | NULL |
|  2 | sheyisnong | NULL   | NULL |
|  3 | Rongcheng  | M      |   26 |
|  4 | Guanhua    | M      |   27 |
+----+------------+--------+------+
4 rows in set (0.00 sec)
```