
兜兜    2021-11-11 11:35:05    2021-11-12 00:29:27   

kafka
### `Versions: kafka_2.11-0.10.1.0 and kafka_2.12-2.2.0`

### Initial setup

`kafka_2.11-0.10.1.0/kafka_2.12-2.2.0`

```sh
$ export KAFKA_HOME=/usr/local/kafka/
$ cd $KAFKA_HOME
```

### Start ZooKeeper

`kafka_2.11-0.10.1.0/kafka_2.12-2.2.0`

```sh
$ bin/zookeeper-server-start.sh config/zookeeper.properties
```

### Start Kafka

`kafka_2.11-0.10.1.0/kafka_2.12-2.2.0`

```sh
$ bin/kafka-server-start.sh config/server.properties
```

### Create a topic

`kafka_2.11-0.10.1.0`

```sh
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```

`kafka_2.12-2.2.0`

```sh
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
```

### List topics

`kafka_2.11-0.10.1.0`

```sh
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
```

`kafka_2.12-2.2.0`

```sh
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

### Describe a topic

`kafka_2.11-0.10.1.0`

```sh
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
```

`kafka_2.12-2.2.0`

```sh
$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
```

### Send messages with the producer

`kafka_2.11-0.10.1.0/kafka_2.12-2.2.0`

```sh
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
```

Input:

```sh
This is a message
This is another message
```

### Consume messages with the consumer

`kafka_2.11-0.10.1.0/kafka_2.12-2.2.0`

```sh
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
```

Output:

```sh
This is a message
This is another message
```

### Manage consumer groups

`kafka_2.11-0.10.1.0/kafka_2.12-2.2.0`

```sh
# List consumer groups
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Show offset details for a consumer group
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group_name
```

`kafka_2.11-0.10.1.0`

If you are using the old consumer, whose group metadata is stored in ZooKeeper (offsets.storage=zookeeper), use the commands below instead. The old consumer was removed in Kafka 2.0, so this applies only to the 0.10.1.0 release.

```sh
# List consumer groups
$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
# Show offset details for a consumer group
$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group group_name
```

### Show per-topic message statistics

`kafka_2.11-0.10.1.0`

```sh
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test
```
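The `GetOffsetShell` command prints one line per partition, so a topic with several partitions needs a little arithmetic to get a total. The following is a minimal sketch, assuming each output line has the form `topic:partition:offset` with a single offset per line; the sample text and the function names `parse_offsets` and `total_offset` are made up for illustration. Note that the sum of log-end offsets equals the number of messages only if nothing has been deleted by retention yet.

```python
def parse_offsets(output):
    """Parse GetOffsetShell-style text into {(topic, partition): offset}.

    Assumes each non-empty line looks like 'topic:partition:offset'.
    """
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split from the right so only the last two fields are treated as numbers.
        topic, partition, offset = line.rsplit(":", 2)
        offsets[(topic, int(partition))] = int(offset)
    return offsets


def total_offset(offsets, topic):
    """Sum the offsets of every partition that belongs to `topic`."""
    return sum(off for (name, _), off in offsets.items() if name == topic)


# Hypothetical output for a two-partition topic named "test".
sample = "test:0:42\ntest:1:58\n"
offsets = parse_offsets(sample)
print(total_offset(offsets, "test"))  # 100
```

In practice the text would come from the command's standard output, for example by piping it into a script that reads `sys.stdin`.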


兜兜    2018-09-20 17:44:04    2019-11-14 14:31:45   

hdfs hadoop


兜兜    2018-09-20 17:12:38    2019-11-14 14:31:53   

hdfs hadoop
### Environment

System: `CentOS7`

Software:

- `hadoop`: `2.7.7`

Servers:

- `Hadoop Master`: `172.16.0.3(master)` `NameNode` `SecondaryNameNode` `ResourceManager` `DataNode` `NodeManager`
- `Hadoop Slave`: `172.16.0.4(slave1)` `DataNode` `NodeManager`
- `Hadoop Slave`: `172.16.0.5(slave2)` `DataNode` `NodeManager`
- `Hadoop Slave`: `172.16.0.6(slave3)` `DataNode` `NodeManager`
- `Hadoop Slave`: `172.16.0.7(slave4)` `DataNode` `NodeManager`

### Initial setup

#### Configure hostname resolution

`All hosts`

```bash
cat >> /etc/hosts << EOF
172.16.0.3 master
172.16.0.4 slave1
172.16.0.5 slave2
172.16.0.6 slave3
172.16.0.7 slave4
EOF
```

#### Create an SSH key and set up passwordless login to the slaves

`master`

Run this step after the `hadoop` user exists on every host (see "Create the user" below), because the key is generated for that user.

```bash
su - hadoop
ssh-keygen -t rsa
ssh-copy-id slave1
ssh-copy-id slave2
ssh-copy-id slave3
ssh-copy-id slave4
```

#### Download and install Java

Download: https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

`All hosts`

```bash
rpm -ivh jdk-8u221-linux-x64.rpm
```

### Install the Hadoop cluster

#### Create the user

`All hosts`

```bash
useradd -d /opt/hadoop hadoop
echo "password"|passwd --stdin hadoop   # set the user's password non-interactively
```

#### Download Hadoop

`master`

```bash
curl -O http://apache.javapipe.com/hadoop/common/hadoop-2.7.7/hadoop-2.7.7.tar.gz
tar xfz hadoop-2.7.7.tar.gz
cp -rf hadoop-2.7.7/* /opt/hadoop/
chown -R hadoop:hadoop /opt/hadoop/
```

#### Configure environment variables

`master`

```bash
su - hadoop
cat >> .bash_profile << EOF
## JAVA env variables
export JAVA_HOME=/usr/java/default
export PATH=\$PATH:\$JAVA_HOME/bin
export CLASSPATH=.:\$JAVA_HOME/jre/lib:\$JAVA_HOME/lib:\$JAVA_HOME/lib/tools.jar
## HADOOP env variables
export HADOOP_HOME=/opt/hadoop
export HADOOP_COMMON_HOME=\$HADOOP_HOME
export HADOOP_HDFS_HOME=\$HADOOP_HOME
export HADOOP_MAPRED_HOME=\$HADOOP_HOME
export HADOOP_YARN_HOME=\$HADOOP_HOME
export HADOOP_OPTS="-Djava.library.path=\$HADOOP_HOME/lib/native"
export HADOOP_COMMON_LIB_NATIVE_DIR=\$HADOOP_HOME/lib/native
export PATH=\$PATH:\$HADOOP_HOME/sbin:\$HADOOP_HOME/bin
EOF
source .bash_profile
```

### Configure the Hadoop cluster

#### Edit core-site.xml

`master`

```bash
su - hadoop
vi etc/hadoop/core-site.xml
```

```xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://master:9000/</value>
  </property>
</configuration>
```

#### Edit hdfs-site.xml

`master`

```bash
vi etc/hadoop/hdfs-site.xml
```

`dfs.data.dir` and `dfs.name.dir` are the deprecated names of `dfs.datanode.data.dir` and `dfs.namenode.name.dir`; Hadoop 2.7 still accepts them.

```xml
<configuration>
  <property>
    <name>dfs.data.dir</name>
    <value>file:///opt/volume/datanode</value>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>file:///opt/volume/namenode</value>
  </property>
</configuration>
```

#### Edit mapred-site.xml

`master`

```bash
vi etc/hadoop/mapred-site.xml
```

```xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>master:9001</value>
  </property>
</configuration>
```

#### Edit yarn-site.xml

`master`

```bash
vi etc/hadoop/yarn-site.xml
```

```xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>master</value>
  </property>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>${yarn.resourcemanager.hostname}:8032</value>
  </property>
  <property>
    <name>yarn.resourcemanager.bind-host</name>
    <value>0.0.0.0</value>
  </property>
</configuration>
```

#### Edit hadoop-env.sh

`master`

```bash
vi etc/hadoop/hadoop-env.sh
```

```bash
export JAVA_HOME=/usr/java/default/
```

#### Edit masters

`master`

```bash
cat > etc/hadoop/masters << EOF
master
EOF
```

#### Edit slaves

`master`

```bash
cat > etc/hadoop/slaves << EOF
master
slave1
slave2
slave3
slave4
EOF
```

#### Copy Hadoop to the slave nodes

`master`

```bash
su - hadoop
scp -r * slave1:/opt/hadoop/
scp -r * slave2:/opt/hadoop/
scp -r * slave3:/opt/hadoop/
scp -r * slave4:/opt/hadoop/
```

### Format the NameNode

`master`

```bash
su - hadoop
hdfs namenode -format
```

### Start and stop the cluster

`master`

```bash
start-all.sh   # start the Hadoop cluster
stop-all.sh    # stop the Hadoop cluster
```

### Check the running processes

`master`

```bash
jps
```

```
21078 Jps
3922 ResourceManager
4050 NodeManager
3431 NameNode
3577 DataNode
3755 SecondaryNameNode
```

`Slave nodes`

```bash
jps
```

```
7517 Jps
21298 DataNode
21422 NodeManager
```

### Test the HDFS cluster

```bash
hdfs dfs -mkdir /my_storage                # create a directory
hdfs dfs -put LICENSE.txt /my_storage      # upload a file
hdfs dfs -cat /my_storage/LICENSE.txt      # print the file
hdfs dfs -ls /my_storage/                  # list the directory
hdfs dfs -get /my_storage/ ./              # download the directory
```

### Monitor the cluster services

`master`

```bash
http://master:50070
```

#### Browse the HDFS file system

```bash
http://master:50070/explorer.html
```

#### Cluster and application information

```bash
http://master:8088
```

#### NodeManager information

```bash
http://master:8042
```

### Start on boot

`master`

```bash
vi /etc/rc.local
```

```bash
su - hadoop -c "/opt/hadoop/sbin/start-all.sh"
```

```bash
chmod +x /etc/rc.d/rc.local
systemctl enable rc-local
systemctl start rc-local
```

### Run MapReduce with Python

`Note: this job finds the maximum temperature for each year in the NOAA data for 1901-1909. In each record, characters 15-18 (0-based) hold the year, characters 87-91 hold the temperature, and character 92 is the quality code. The mapper processes each line and emits "year temperature" pairs (for example, 1901 56 for a raw reading of +0056); the reducer reads the mapper output and computes the maximum for each year.`

Mapper program

```bash
cat mapper_noaa.py
```

```python
#!/usr/bin/env python
import sys
import re

# Quality codes that mark a reading as usable.
pattern = re.compile(r'[01459]')

for line in sys.stdin:
    year, temperature, q = line[15:19], int(line[87:92]), line[92:93]
    # 9999 marks a missing temperature reading.
    if pattern.match(q) and temperature != 9999:
        print("{0}\t{1}".format(year, temperature))
```

Reducer program

```bash
cat reducer_noaa.py
```

```python
#!/usr/bin/env python
import sys

current_year = None
current_temp_max = None

# Input arrives sorted by year, so each year's records are contiguous.
for line in sys.stdin:
    year, temperature = line.strip().split('\t')
    try:
        temperature = int(temperature)
    except ValueError:
        continue  # skip records whose temperature is not an integer
    if current_year == year:
        if current_temp_max < temperature:
            current_temp_max = temperature
    else:
        # A new year starts: emit the maximum of the previous one.
        if current_year:
            print("{0} {1}".format(current_year, current_temp_max))
        current_year = year
        current_temp_max = temperature

# Emit the last year.
if current_year:
    print("{0} {1}".format(current_year, current_temp_max))
```

#### Download the data

```bash
ftp://ftp.ncdc.noaa.gov/pub/data/noaa/   # put the downloaded data for each year into the noaa folder
```

#### Upload the data to HDFS

```bash
su - hadoop
hdfs dfs -mkdir /test/                    # create the test directory
hdfs dfs -copyFromLocal noaa /test/noaa   # noaa is the downloaded weather data
```

#### Run MapReduce

```bash
su - hadoop
hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar \
  -file ./mapper_noaa.py -file ./reducer_noaa.py \
  -mapper ./mapper_noaa.py -reducer ./reducer_noaa.py \
  -input /test/noaa/190[0-9]/ \
  -output /test/noaa_1901_1909_results
```

#### View the results

```bash
hdfs dfs -cat /test/noaa_1901_1909_results/part-00000
```

```
1901 317
1902 244
1903 289
1904 256
1905 283
1906 294
1907 283
1908 289
1909 278
```

`Note: temperatures are stored multiplied by 10, so the maximum temperature for 1901 is 31.7°C.`
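Before submitting a streaming job, it can help to check the map and reduce logic locally. The sketch below imitates the three phases in plain Python: map each record, sort by key (standing in for Hadoop's shuffle and sort), then take the maximum per year. It is an illustration under stated assumptions, not the job itself: `make_record`, `map_line`, and `run_job` are hypothetical helper names, and the records are synthetic lines padded only so that the year, temperature, and quality code sit at the offsets the mapper reads.

```python
import re
from itertools import groupby
from operator import itemgetter

# Same quality-code filter as the mapper.
QUALITY = re.compile(r"[01459]")


def make_record(year, temp, quality):
    """Hypothetical helper: build a synthetic 93-character record."""
    chars = ["0"] * 93
    chars[15:19] = "{0:04d}".format(year)   # year at offsets 15-18
    chars[87:92] = "{0:+05d}".format(temp)  # signed temperature at 87-91
    chars[92:93] = quality                  # quality code at offset 92
    return "".join(chars)


def map_line(line):
    """Return (year, temperature), or None if the reading is unusable."""
    year, temp, q = line[15:19], int(line[87:92]), line[92:93]
    if QUALITY.match(q) and temp != 9999:
        return year, temp
    return None


def run_job(lines):
    """Map, sort by year, then reduce each group to its maximum."""
    mapped = [kv for kv in map(map_line, lines) if kv is not None]
    mapped.sort(key=itemgetter(0))  # stands in for the shuffle/sort phase
    return {
        year: max(temp for _, temp in group)
        for year, group in groupby(mapped, key=itemgetter(0))
    }


records = [
    make_record(1901, 56, "1"),
    make_record(1901, 317, "1"),
    make_record(1901, 9999, "9"),  # missing reading, dropped
    make_record(1902, 244, "0"),
    make_record(1902, 300, "2"),   # quality code not accepted, dropped
]
print(run_job(records))  # {'1901': 317, '1902': 244}
```

The explicit sort matters: like the reducer script, `groupby` only merges adjacent keys, so it relies on the input being ordered by year.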