私信
兜兜
文章
203
评论
12
点赞
97
原创 177
翻译 4
转载 22

文章
关注
粉丝
收藏

个人分类:

兜兜    2021-11-30 16:34:06    2022-01-25 09:25:41   

H3C 交换机
阅读 270 评论 0 收藏 0
阅读 270
评论 0
收藏 0


兜兜    2021-11-23 23:13:52    2021-11-23 23:20:51   

frp 内网穿透
阅读 254 评论 0 收藏 0
阅读 254
评论 0
收藏 0


兜兜    2021-11-15 15:04:03    2022-01-25 09:27:04   

kerberos CDH
阅读 187 评论 0 收藏 0
阅读 187
评论 0
收藏 0


兜兜    2021-11-11 11:35:05    2021-11-12 00:29:27   

kakfa
### `版本:kafka_2.11-0.10.1.0和kafka_2.12-2.2.0` ### 初始化操作 `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh $ export KAFKA_HOME=/usr/local/kafka/ $ cd $KAFKA_HOME ``` ### 启动zookeeper `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh $ bin/zookeeper-server-start.sh config/zookeeper.properties ``` ### 启动kafka `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh $ bin/kafka-server-start.sh config/server.properties ``` ### 创建topic `kafka_2.11-0.10.1.0` ```sh $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test ``` `kafka_2.12-2.2.0` ```sh bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test ``` ### 查看topic列表 `kafka_2.11-0.10.1.0` ```sh $ bin/kafka-topics.sh --list --zookeeper localhost:2181 ``` `kafka_2.12-2.2.0` ```sh $ bin/kafka-topics.sh --list --bootstrap-server localhost:9092 ``` ### 查看topic详细 `kafka_2.11-0.10.1.0` ```sh $ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test ``` `kafka_2.12-2.2.0` ```sh $ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test ``` ### producer发送消息 `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test ``` 输入 ```sh This is a message This is another message ``` ### consumer消费消息 `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ``` 输出 ```sh This is a message This is another message ```` #### 管理消费组(Consumer Groups) `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh # 查看消费组列表 $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list # 查看消费组offsets详情 $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group_name ``` `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` 如果使用的旧版消费者,组的元数据存在zookeeper(offsets.storage=zookeeper),请使用下面的命令 ```sh # 查看消费组列表 $ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list # 查看消费组offsets详情 $ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group group_name ``` #### 查看topic消息统计 `kafka_2.11-0.10.1.0` ```sh $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test ```
阅读 271 评论 0 收藏 0
阅读 271
评论 0
收藏 0


兜兜    2021-11-10 09:55:43    2022-01-25 09:27:33   

spark
#### 创建DF的流程 定义RDD ```sh 方式一: sc.textFile(文件.txt)-->RDD 方式二: nums = [1,2,3,4,5] rdd = sc.parallelize(nums) ``` 定义rowRDD ```sh rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1])) ``` 定义structField结构 ```sh StructField(fieldName, StringType(), nullable = True) ``` 定义structType结构 ```sh struct_type = StructType([StructField('name', StringType(), nullable = True),StructField('age', StringType(), nullable = True)]) ``` 定义DataFrame ```sh PeopleDF=spark.createDataFrame(rowRDD, struct_type) ``` #### tempView和DF、RDD之间的转换 DF到tempView ```sh peopleTempView=peopleDF.createOrReplaceTempView("people") ``` tempView到DataFrame ```sh PeopleDF = spark.sql("SELECT * FROM people") ``` DataFrame到RDD ```sh PeopleDF.rdd.map(lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]) ``` #### 加载json文件生成DataFrame ```sh peopleDF2 = spark.read.format("json").load("people.json") ``` #### DataFrame保持csv文件 ```sh peopleDF2.select("name", "age").write.format("csv").save("newpeople.csv") ``` #### 读取parquet文件生成DataFrame ```sh parquetFileDF = spark.read.parquet("users.parquet") ``` #### DataFrame保存成parquet文件 ```sh parquetFileDF.write.parquet("newpeople.parquet") ``` #### 读取jdbc生成DataFrame ```sh jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/test").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "root").load() jdbcDF.show() ``` ```sh +---+----------+------+----+ | id| name|gender| age| +---+----------+------+----+ | 1| test| null|null| | 2|sheyisnong| null|null| +---+----------+------+----+ ``` #### DataFrame数据写入jdbc ```sh from pyspark.sql.types import Row from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType from pyspark.sql.types import IntegerType studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" ")) //下面要设置模式信息 schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)]) rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3]))) //建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来 studentDF = spark.createDataFrame(rowRDD, schema) prop = {} prop['user'] = 'root' prop['password'] = 'root' prop['driver'] = "com.mysql.jdbc.Driver" studentDF.write.jdbc("jdbc:mysql://localhost:3306/test",'student','append', prop) ``` 查看数据库数据 ```sql MariaDB [test]> select * from student; +----+------------+--------+------+ | id | name | gender | age | +----+------------+--------+------+ | 1 | test | NULL | NULL | | 2 | sheyisnong | NULL | NULL | | 3 | Rongcheng | M | 26 | | 4 | Guanhua | M | 27 | +----+------------+--------+------+ 4 rows in set (0.00 sec) ```
阅读 178 评论 0 收藏 0
阅读 178
评论 0
收藏 0


兜兜    2021-09-29 18:10:11    2021-10-27 15:15:56   

kubernetes rancher
阅读 406 评论 0 收藏 0
阅读 406
评论 0
收藏 0


第 1 页 / 共 15 页