私信
兜兜
文章
203
评论
12
点赞
98
原创 177
翻译 4
转载 22

文章
关注
粉丝
收藏

个人分类:

兜兜    2021-11-15 15:04:03    2022-01-25 09:27:04   

kerberos CDH
阅读 218 评论 0 收藏 0
阅读 218
评论 0
收藏 0


兜兜    2021-11-11 11:35:05    2021-11-12 00:29:27   

kakfa
### `版本:kafka_2.11-0.10.1.0和kafka_2.12-2.2.0` ### 初始化操作 `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh $ export KAFKA_HOME=/usr/local/kafka/ $ cd $KAFKA_HOME ``` ### 启动zookeeper `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh $ bin/zookeeper-server-start.sh config/zookeeper.properties ``` ### 启动kafka `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh $ bin/kafka-server-start.sh config/server.properties ``` ### 创建topic `kafka_2.11-0.10.1.0` ```sh $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test ``` `kafka_2.12-2.2.0` ```sh bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test ``` ### 查看topic列表 `kafka_2.11-0.10.1.0` ```sh $ bin/kafka-topics.sh --list --zookeeper localhost:2181 ``` `kafka_2.12-2.2.0` ```sh $ bin/kafka-topics.sh --list --bootstrap-server localhost:9092 ``` ### 查看topic详细 `kafka_2.11-0.10.1.0` ```sh $ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test ``` `kafka_2.12-2.2.0` ```sh $ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test ``` ### producer发送消息 `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test ``` 输入 ```sh This is a message This is another message ``` ### consumer消费消息 `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ``` 输出 ```sh This is a message This is another message ```` #### 管理消费组(Consumer Groups) `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` ```sh # 查看消费组列表 $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list # 查看消费组offsets详情 $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group_name ``` `kafka_2.11-0.10.1.0/kafka_2.12-2.2.0` 如果使用的旧版消费者,组的元数据存在zookeeper(offsets.storage=zookeeper),请使用下面的命令 ```sh # 查看消费组列表 $ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list # 查看消费组offsets详情 $ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group group_name ``` #### 查看topic消息统计 `kafka_2.11-0.10.1.0` ```sh $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test ```
阅读 311 评论 0 收藏 0
阅读 311
评论 0
收藏 0


兜兜    2021-11-10 09:55:43    2022-01-25 09:27:33   

spark
#### 创建DF的流程 定义RDD ```sh 方式一: sc.textFile(文件.txt)-->RDD 方式二: nums = [1,2,3,4,5] rdd = sc.parallelize(nums) ``` 定义rowRDD ```sh rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1])) ``` 定义structField结构 ```sh StructField(fieldName, StringType(), nullable = True) ``` 定义structType结构 ```sh struct_type = StructType([StructField('name', StringType(), nullable = True),StructField('age', StringType(), nullable = True)]) ``` 定义DataFrame ```sh PeopleDF=spark.createDataFrame(rowRDD, struct_type) ``` #### tempView和DF、RDD之间的转换 DF到tempView ```sh peopleTempView=peopleDF.createOrReplaceTempView("people") ``` tempView到DataFrame ```sh PeopleDF = spark.sql("SELECT * FROM people") ``` DataFrame到RDD ```sh PeopleDF.rdd.map(lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]) ``` #### 加载json文件生成DataFrame ```sh peopleDF2 = spark.read.format("json").load("people.json") ``` #### DataFrame保持csv文件 ```sh peopleDF2.select("name", "age").write.format("csv").save("newpeople.csv") ``` #### 读取parquet文件生成DataFrame ```sh parquetFileDF = spark.read.parquet("users.parquet") ``` #### DataFrame保存成parquet文件 ```sh parquetFileDF.write.parquet("newpeople.parquet") ``` #### 读取jdbc生成DataFrame ```sh jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/test").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "root").load() jdbcDF.show() ``` ```sh +---+----------+------+----+ | id| name|gender| age| +---+----------+------+----+ | 1| test| null|null| | 2|sheyisnong| null|null| +---+----------+------+----+ ``` #### DataFrame数据写入jdbc ```sh from pyspark.sql.types import Row from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType from pyspark.sql.types import IntegerType studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" ")) //下面要设置模式信息 schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)]) rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3]))) //建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来 studentDF = spark.createDataFrame(rowRDD, schema) prop = {} prop['user'] = 'root' prop['password'] = 'root' prop['driver'] = "com.mysql.jdbc.Driver" studentDF.write.jdbc("jdbc:mysql://localhost:3306/test",'student','append', prop) ``` 查看数据库数据 ```sql MariaDB [test]> select * from student; +----+------------+--------+------+ | id | name | gender | age | +----+------------+--------+------+ | 1 | test | NULL | NULL | | 2 | sheyisnong | NULL | NULL | | 3 | Rongcheng | M | 26 | | 4 | Guanhua | M | 27 | +----+------------+--------+------+ 4 rows in set (0.00 sec) ```
阅读 205 评论 0 收藏 0
阅读 205
评论 0
收藏 0


兜兜    2018-09-20 17:44:04    2019-11-14 14:31:45   

hdfs hadoop
阅读 281 评论 0 收藏 0
阅读 281
评论 0
收藏 0


兜兜    2018-09-20 17:12:38    2019-11-14 14:31:53   

hdfs haddop
### 环境准备 系统: `CentOS7` 软件: - `hadoop`:`2.7.7` &emsp; 服务器: `Hadoop Master`: `172.16.0.3(master)` `NameNode` `SecondaryNameNode` `ResourceManager` `DataNode` `NodeManager` `Hadoop Slave` : `172.16.0.4(slave1)` `DataNode` `NodeManager` `Hadoop Slave` : `172.16.0.5(slave2)` `DataNode` `NodeManager` `Hadoop Slave` : `172.16.0.6(slave3)` `DataNode` `NodeManager` `Hadoop Slave` : `172.16.0.7(slave4)` `DataNode` `NodeManager` &emsp; ### 初始化工作 #### 配置主机名解析 `所有主机` ```bash cat >> /etc/hosts << EOF 172.16.0.3 master 172.16.0.4 slave1 172.16.0.5 slave2 172.16.0.6 slave3 172.16.0.7 slave4 EOF ``` #### 创建私钥以及免密登陆slaves `master` ```bash su - hadoop ssh-keygen -t rsa ssh-copy-id slave1 ssh-copy-id slave2 ssh-copy-id slave3 ssh-copy-id slave4 ``` #### 下载安装java 下载地址: https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html `所有主机` ```bash rpm -ivh jdk-8u221-linux-x64.rpm ``` &emsp; ### 安装hadoop集群 #### 创建用户 `所有主机` ```bash useradd -d /opt/hadoop hadoop echo "password"|passwd --stdin hadoop #免交互设置用户密码 ``` #### 下载hadoop `master` ```bash curl -O http://apache.javapipe.com/hadoop/common/hadoop-2.7.7/hadoop-2.7.7.tar.gz tar xfz hadoop-2.7.7.tar.gz cp -rf hadoop-2.7.7/* /opt/hadoop/ chown -R hadoop:hadoop /opt/hadoop/ ``` #### 配置环境变量 `master` ```bash su - hadoop cat >> .bash_profile << EOF ## JAVA env variables export JAVA_HOME=/usr/java/default export PATH=\$PATH:\$JAVA_HOME/bin export CLASSPATH=.:\$JAVA_HOME/jre/lib:\$JAVA_HOME/lib:\$JAVA_HOME/lib/tools.jar ## HADOOP env variables export HADOOP_HOME=/opt/hadoop export HADOOP_COMMON_HOME=\$HADOOP_HOME export HADOOP_HDFS_HOME=\$HADOOP_HOME export HADOOP_MAPRED_HOME=\$HADOOP_HOME export HADOOP_YARN_HOME=\$HADOOP_HOME export HADOOP_OPTS="-Djava.library.path=\$HADOOP_HOME/lib/native" export HADOOP_COMMON_LIB_NATIVE_DIR=\$HADOOP_HOME/lib/native export PATH=\$PATH:\$HADOOP_HOME/sbin:\$HADOOP_HOME/bin EOF source .bash_profile ``` &emsp; ### 配置hadoop集群 #### 编辑core-site.xml `master` ```bash su - hadoop vi etc/hadoop/core-site.xml ``` ```xml <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://master:9000/</value> </property> </configuration> ``` #### 编辑hdfs-site.xml `master` ```bash vi etc/hadoop/hdfs-site.xml ``` ```xml <configuration> <property> <name>dfs.data.dir</name> <value>file:///opt/volume/datanode</value> </property> <property> <name>dfs.name.dir</name> <value>file:///opt/volume/namenode</value> </property> </configuration> ``` #### 编辑mapred-site.xml `master` ```bash vi etc/hadoop/mapred-site.xml ``` ```xml <configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <property> <name>mapred.job.tracker</name> <value>master:9001</value> </property> </configuration> ``` #### 编辑yarn-site.xml `master` ```bash vi etc/hadoop/yarn-site.xml ``` ```xml <configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.resourcemanager.hostname</name> <value>master</value> </property> <property> <name>yarn.resourcemanager.address</name> <value>${yarn.resourcemanager.hostname}:8032</value> </property> <property> <name>yarn.resourcemanager.bind-host</name> <value>0.0.0.0</value> </property> </configuration> ``` #### 编辑hadoop-env.sh `master` ```bash vi etc/hadoop/hadoop-env.sh ``` ```bash export JAVA_HOME=/usr/java/default/ ``` #### 编辑masters `master` ```bash cat > etc/hadoop/masters<EOF master EOF ``` #### 编辑slaves `master` ```bash cat > etc/hadoop/slaves <EOF master slave1 slave2 slave3 slave4 EOF ``` &emsp; #### 拷贝hadoop到slaves节点 ```bash su - hadoop scp -r * slave1:/opt/hadoop/* scp -r * slave2:/opt/hadoop/* scp -r * slave3:/opt/hadoop/* scp -r * slave4:/opt/hadoop/* ``` &emsp; ### 格式化Namenode `master` ```bash su - hadoop hdfs namenode -format ``` &emsp; ### 启动停止集群 `master` ```bash start-all.sh #启动hadoop集群 stop-all.sh #停止hadoop集群 ``` &emsp; ### 监控进程 `master` ```bash jps ``` ``` 21078 Jps 3922 ResourceManager 4050 NodeManager 3431 NameNode 3577 DataNode 3755 SecondaryNameNode ``` `slaves节点` ```bash jps ``` ``` 7517 Jps 21298 DataNode 21422 NodeManager ``` &emsp; ### 测试HDFS集群 ```bash hdfs dfs -mkdir /my_storage #创建目录 hdfs dfs -put LICENSE.txt /my_storage #上传文件 hdfs dfs -cat /my_storage/LICENSE.txt #查看文件 hdfs dfs -ls /my_storage/ hdfs dfs -get /my_storage/ ./ #获取文件 ``` &emsp; ### 监控集群服务 `master` ```bash http://master:50070 ``` #### 查看hdfs文件系统 ```bash http://master:50070/explorer.html ``` #### 集群和应用信息 ```bash http://master:8088 ``` #### NodeManager信息 ```bash http://master:8042 ``` &emsp; ### 开机启动 `master` ```bash vi /etc/rc.local ``` ```bash su - hadoop -c "/opt/hadoop/sbin/start-all.sh" ``` ```bash chmod +x /etc/rc.d/rc.local systemctl enable rc-local systemctl start rc-local ``` &emsp; ### Python执行MapReduce `说明:统计noaa数据1901-1909各个年份的最大温度,文件格式15-18位代表年份,87-91代表温度,92位为检验码。mapper对文件每一行内容进行处理,生成"年份 温度"的格式(例如:1901 +0056),reducer对mapper输出统计出每个年份的最大值.` Mapper程序 ```bash cat mapper_noaa.py ``` ```bash #!/usr/bin/env python import sys import re pattern = re.compile(r'[01459]') for line in sys.stdin: year,temperature,q = line[15:19],int(line[87:92]),line[92:93] if pattern.match(q) and temperature != 9999: print("{0}\t{1}".format(year,temperature)) ``` Reducer进程 ```bash cat reducer_noaa.py ``` ```bash #!/usr/bin/env python import sys import re current_year=None current_temp_max=None for line in sys.stdin: year,templature= line.strip().split('\t') try: templature=int(templature) except: continue if current_year == year: if current_temp_max < templature: current_temp_max=templature else: if current_year: print("{0} {1}".format(current_year,current_temp_max)) current_year=year current_temp_max=templature if current_year: print("{0} {1}".format(current_year,current_temp_max)) ``` #### 下载数据 ```bash ftp://ftp.ncdc.noaa.gov/pub/data/noaa/ #把下载的对应每年数据放到noaa文件夹 ``` #### 上传数据到hdfs ```bash su - hadoop hdfs dfs -mkdir /test/ #创建test目录 hdfs dfs -copyFromLocal noaa /test/noaa #noaa为下载的天气数据 ``` #### 运行MapReduce ```bash su - hadoop hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar -file ./mapper_noaa.py -file ./reducer_noaa.py -mapper ./mapper_noaa.py -reducer ./reducer_noaa.py -input /test/noaa/190[0-9]/ -output /test/noaa_1901_1909_results ``` #### 查看运行结果 ```bash hdfs dfs -cat /test/noaa_1901_1909_results/part-00000 ``` ``` 1901 317 1902 244 1903 289 1904 256 1905 283 1906 294 1907 283 1908 289 1909 278 ``` `注:由于气温被放大10倍,所以1901年的最高气温为31.7°`
阅读 299 评论 0 收藏 0
阅读 299
评论 0
收藏 0