兜兜    2021-11-10 09:55:43    2022-01-25 09:27:33   

spark
#### Steps to create a DataFrame

Define the RDD

```python
# Option 1: read a text file into an RDD ("file.txt" is a placeholder path)
peopleRDD = sc.textFile("file.txt")
# Option 2: parallelize a local collection
nums = [1, 2, 3, 4, 5]
rdd = sc.parallelize(nums)
```

Define the rowRDD

```python
from pyspark.sql import Row

rowRDD = peopleRDD.map(lambda line: line.split(',')).map(lambda attributes: Row(attributes[0], attributes[1]))
```

Define a StructField

```python
StructField(fieldName, StringType(), nullable=True)
```

Define the StructType

```python
from pyspark.sql.types import StructType, StructField, StringType

struct_type = StructType([StructField('name', StringType(), nullable=True), StructField('age', StringType(), nullable=True)])
```

Create the DataFrame

```python
peopleDF = spark.createDataFrame(rowRDD, struct_type)
```

#### Converting between temp views, DataFrames, and RDDs

DataFrame to temp view

```python
# createOrReplaceTempView returns None, so there is nothing to assign
peopleDF.createOrReplaceTempView("people")
```

Temp view to DataFrame

```python
peopleDF = spark.sql("SELECT * FROM people")
```

DataFrame to RDD

```python
peopleDF.rdd.map(lambda attributes: "name: " + attributes[0] + "," + "age:" + attributes[1])
```

#### Loading a JSON file into a DataFrame

```python
peopleDF2 = spark.read.format("json").load("people.json")
```

#### Saving a DataFrame as CSV

```python
# Spark writes a directory named newpeople.csv that contains the part files
peopleDF2.select("name", "age").write.format("csv").save("newpeople.csv")
```

#### Reading a Parquet file into a DataFrame

```python
parquetFileDF = spark.read.parquet("users.parquet")
```

#### Saving a DataFrame as Parquet

```python
parquetFileDF.write.parquet("newpeople.parquet")
```

#### Reading from JDBC into a DataFrame

```python
jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/test").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "root").load()
jdbcDF.show()
```

```text
+---+----------+------+----+
| id|      name|gender| age|
+---+----------+------+----+
|  1|      test|  null|null|
|  2|sheyisnong|  null|null|
+---+----------+------+----+
```

#### Writing DataFrame data to JDBC

```python
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26", "4 Guanhua M 27"]).map(lambda line: line.split(" "))
# Define the schema
schema = StructType([StructField("name", StringType(), True), StructField("gender", StringType(), True), StructField("age", IntegerType(), True)])
rowRDD = studentRDD.map(lambda p: Row(p[1].strip(), p[2].strip(), int(p[3])))
# Bind the Row objects to the schema, i.e. match the data to the schema
studentDF = spark.createDataFrame(rowRDD, schema)
prop = {}
prop['user'] = 'root'
prop['password'] = 'root'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/test", 'student', 'append', prop)
```

Check the data in the database

```sql
MariaDB [test]> select * from student;
+----+------------+--------+------+
| id | name       | gender | age  |
+----+------------+--------+------+
|  1 | test       | NULL   | NULL |
|  2 | sheyisnong | NULL   | NULL |
|  3 | Rongcheng  | M      |   26 |
|  4 | Guanhua    | M      |   27 |
+----+------------+--------+------+
4 rows in set (0.00 sec)
```
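The JDBC write example splits each input line on spaces and casts the age with `int(...)`. A minimal sketch of that parsing step as a plain Python function, so it can be checked without a Spark session, might look like the following. The name `parse_student` and the field-count check are assumptions added for illustration; they are not part of the original post.

```python
from typing import Tuple


def parse_student(line: str) -> Tuple[str, str, int]:
    """Parse an 'id name gender age' line into (name, gender, age).

    The leading id field is dropped, matching the post's schema,
    which only has name, gender and age columns.
    """
    fields = line.split()
    if len(fields) != 4:
        # Hypothetical guard: fail early on malformed input lines
        raise ValueError(f"expected 4 fields, got {len(fields)}: {line!r}")
    _id, name, gender, age = fields
    return name, gender, int(age)


# Same sample lines as in the post
lines = ["3 Rongcheng M 26", "4 Guanhua M 27"]
rows = [parse_student(line) for line in lines]
```

With Spark available, the same function could be mapped over the RDD, for example `spark.sparkContext.parallelize(lines).map(parse_student)`, and the resulting tuples passed to `spark.createDataFrame` together with the three-column schema.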