Contents: Integration structure, Preparation, Data download, pom.xml, Main.java, Reduce.java, Map.java, Operation, Summary

Integration structure
The structure is much the same as in Case 1, except that HBase moves to the front (it is now the input source) and the MR program follows it. The input K1/V1 therefore change accordingly, as sketched below.
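A rough sketch of that change, assuming Case 1 read a plain text file with the usual TextInputFormat word-count mapper (the class bodies are elided; the real Map class for this case appears later in the article):

// Case 1 (file input): K1 is the byte offset of a line, V1 is the line of text
//   public class Map extends Mapper<LongWritable, Text, Text, IntWritable> { ... }
//
// This case (HBase input): TableMapper<KEYOUT, VALUEOUT> fixes the input types to
//   K1 = ImmutableBytesWritable (the row key) and V1 = Result (the whole row)
//   public class Map extends TableMapper<Text, IntWritable> { ... }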
Preparation
Create the table in HBase and write data into it:
create 'wunaiieq:sentence','colf'

Upload the data file to the Linux system (the code below reads it from /opt/module/jar/data.txt), then load it into the table with datain3.java.
datain3.java
package org.wunaiieq.hbase2hdfs;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.wunaiieq.HBaseConnection;
import org.wunaiieq.HbaseDML;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class datain3 {
    public static Connection connection = HBaseConnection.connection;

    public static void main(String[] args) throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new FileReader("/opt/module/jar/data.txt"));
        String line = null;
        Table table = connection.getTable(TableName.valueOf("wunaiieq", "sentence"));
        int rowkey = 1;
        // one row per line of data.txt: row key = line number, text stored in colf:line
        while ((line = bufferedReader.readLine()) != null) {
            Put put = new Put(Bytes.toBytes(rowkey));
            put.addColumn(Bytes.toBytes("colf"), Bytes.toBytes("line"), Bytes.toBytes(line));
            table.put(put);
            rowkey++;
        }
        bufferedReader.close();
    }
}
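To confirm the rows actually landed in the table, you can scan it before moving on. The sketch below is a minimal, illustrative check that reuses the same HBaseConnection helper; the class name Check3 is hypothetical and not part of the original project.

package org.wunaiieq.hbase2hdfs;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.wunaiieq.HBaseConnection;

import java.io.IOException;

// Illustrative check: print every colf:line value stored in wunaiieq:sentence
public class Check3 {
    public static void main(String[] args) throws IOException {
        Connection connection = HBaseConnection.connection;
        Table table = connection.getTable(TableName.valueOf("wunaiieq", "sentence"));
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result result : scanner) {
            byte[] value = result.getValue(Bytes.toBytes("colf"), Bytes.toBytes("line"));
            System.out.println(Bytes.toString(value));
        }
        scanner.close();
        table.close();
    }
}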
Data download
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.hbase</groupId>
    <artifactId>hbase2hdfs</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>3.1.3</hadoop.version>
        <hbase.version>2.2.3</hbase.version>
    </properties>

    <dependencies>
        <!-- Hadoop dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-api</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-streaming</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- HBase dependencies -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- Other dependencies -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.19.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <!-- Plugin declaration -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <!-- Plugin configuration -->
                <configuration>
                    <archive>
                        <manifest>
                            <!-- Entry point of the executable jar -->
                            <mainClass>org.wunaiieq.hbase2hdfs.Main</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <!-- Predefined descriptor: build an executable JAR containing all project
                             dependencies; custom descriptors can also control the jar contents -->
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <!-- Execution configuration -->
                <executions>
                    <execution>
                        <!-- Execution id (can be changed) -->
                        <id>make-assembly</id>
                        <!-- Lifecycle phase to bind to -->
                        <phase>package</phase>
                        <goals>
                            <!-- The "single" goal builds the assembly -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
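With this assembly-plugin configuration, packaging the project (for example with mvn clean package) should produce hbase2hdfs-1.0-SNAPSHOT-jar-with-dependencies.jar under target/, with org.wunaiieq.hbase2hdfs.Main as its entry point; that is the jar used in the Operation section below.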
Main.java
package org.wunaiieq.hbase2hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Main {
    public static void main(String[] args) throws Exception {
        // The configuration files live under the resources directory
        Job job = Job.getInstance(new Configuration());
        // Entry class
        job.setJarByClass(Main.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(
                "wunaiieq:sentence",                // table name
                scan,                               // table input; column families, columns, row filters etc. can be configured here
                org.wunaiieq.hbase2hdfs.Map.class,  // mapper class
                Text.class,                         // K2
                IntWritable.class,                  // V2
                job,
                false
        );
        job.setOutputKeyClass(Text.class);          // K3
        job.setOutputValueClass(IntWritable.class); // V3
        job.setReducerClass(org.wunaiieq.hbase2hdfs.Reduce.class);
        // The output path is supplied manually as a command-line argument
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        job.waitForCompletion(true);
    }
}
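As the comment on the scan parameter notes, the Scan passed to initTableMapperJob can be narrowed or tuned before the job is submitted. The fragment below is an illustrative example only (the values are assumptions, and it needs an import of org.apache.hadoop.hbase.util.Bytes); it would replace the bare new Scan() in Main.main:

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("colf"), Bytes.toBytes("line")); // only fetch the colf:line column
scan.setCaching(500);        // rows fetched per RPC to the region server
scan.setCacheBlocks(false);  // commonly recommended for MR scans to avoid churning the block cache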
Reduce.java
package org.wunaiieq.hbase2hdfs;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

//                            K3    V3            K4    V4
public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable v4 = new IntWritable();
    private Text k4 = new Text();

    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context)
            throws IOException, InterruptedException {
        // sum the counts for one word
        int sum = 0;
        for (IntWritable v30 : v3) {
            sum += v30.get();
        }
        v4.set(sum);
        k4 = k3;
        context.write(k4, v4);
    }
}
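Note that the TableReducer, Put, Mutation and Bytes imports are unused here, since the output goes to HDFS through a plain Reducer. They hint at the opposite direction: if the counts were written back into an HBase table instead, the reducer would extend TableReducer and emit Put mutations, and Main would call TableMapReduceUtil.initTableReducerJob instead of setting a file output path. A minimal sketch of that variant (the class name ReduceToHBase and the target column colf:count are hypothetical, not part of this project):

package org.wunaiieq.hbase2hdfs;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

// Hypothetical variant: write the word counts back to an HBase table instead of HDFS
public class ReduceToHBase extends TableReducer<Text, IntWritable, NullWritable> {
    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : v3) {
            sum += v.get();
        }
        // row key = the word, value = its count, stored in colf:count (illustrative names)
        Put put = new Put(Bytes.toBytes(k3.toString()));
        put.addColumn(Bytes.toBytes("colf"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
        context.write(NullWritable.get(), put);
    }
}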
Map.java
package org.wunaiieq.hbase2hdfs;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// K1 = ImmutableBytesWritable (row key), V1 = Result (one HBase row); K2/V2 are Text/IntWritable
public class Map extends TableMapper<Text, IntWritable> {
    private Text k2 = new Text();
    private IntWritable v2 = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable k1, Result v1, Context context)
            throws IOException, InterruptedException {
        System.out.println("k1:" + k1.toString());
        // read the colf:line cell of the current row
        byte[] data = v1.getValue(Bytes.toBytes("colf"), Bytes.toBytes("line"));
        String line = Bytes.toString(data);
        String[] words = line.split(" ");
        for (String word : words) {
            k2.set(word);
            context.write(k2, v2);
        }
    }
}
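As a worked example with made-up data: if one row's colf:line cell held the text "hello world hello", the mapper would emit (hello,1), (world,1), (hello,1); after the shuffle, the reducer would receive hello with [1,1] and world with [1], and write "hello 2" and "world 1" to the output file.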
Operation
Package the project and upload the jar to the Linux system, then run:
hadoop jar hbase2hdfs-1.0-SNAPSHOT-jar-with-dependencies.jar /output/test
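Here /output/test is args[0], the HDFS directory passed to FileOutputFormat.setOutputPath in Main.java; it must not already exist, or the job will abort with an "output directory already exists" error.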
Check the output file:

hdfs dfs -cat /output/test/part-r-00000

Summary
Nothing special here; these two cases are simply worth keeping on record. To switch between them, you only need to swap in the corresponding Mapper and Reducer in the MR program.