Running a Fourth MapReduce Program: Implementing Matrix Multiplication in MapReduce

MapReduce Matrix Multiplication: Implementation Code

I previously wrote a post analyzing the algorithm for matrix multiplication in MapReduce: "The Algorithmic Approach to Matrix Multiplication in MapReduce".

To give a more concrete picture of how the program executes, this post provides the implementation code for reference.

Development environment:

  • java version "1.7.0_40"
  • Eclipse Kepler
  • Windows 7 x64
  • Ubuntu 12.04 LTS
  • Hadoop 2.2.0
  • VMware 9.0.0 build-812388

Input data:

Matrix A location: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA/matrixa

Contents of matrix A:
3 4 6
4 0 8

Matrix B location: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB/matrixb

Contents of matrix B:
2 3
3 0
4 1
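
For reference, the product C = A × B that the job should produce is:

42 15
40 20

(for example, C[0][0] = 3×2 + 4×3 + 6×4 = 42).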

Implementation code:

There are three classes in total:

  • Driver class: MMDriver
  • Map class: MMMapper
  • Reduce class: MMReducer

You can merge them into a single class if that better suits your workflow.
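
To make the data flow concrete before reading the code, consider output cell (0,0) of C. Using the matrices above, the mappers emit the first row of A and the first column of B under the shared key "0,0", each value tagged with its source matrix and its position along the shared dimension:

key 0,0 receives: a,0,3  a,1,4  a,2,6  (row 0 of A)
                  b,0,2  b,1,3  b,2,4  (column 0 of B)

The reducer then pairs the values by that shared index and sums the products: 3×2 + 4×3 + 6×4 = 42.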

MMDriver.java

package dataguru.matrixmultiply;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MMDriver {

    public static void main(String[] args) throws Exception {

        // set configuration
        Configuration conf = new Configuration();

        // create job
        Job job = new Job(conf, "MatrixMultiply");
        job.setJarByClass(dataguru.matrixmultiply.MMDriver.class);

        // specify Mapper & Reducer
        job.setMapperClass(dataguru.matrixmultiply.MMMapper.class);
        job.setReducerClass(dataguru.matrixmultiply.MMReducer.class);

        // specify output types of mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // specify input and output DIRECTORIES
        Path inPathA = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA");
        Path inPathB = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB");
        Path outPath = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC");
        FileInputFormat.addInputPath(job, inPathA);
        FileInputFormat.addInputPath(job, inPathB);
        FileOutputFormat.setOutputPath(job, outPath);

        // delete the output directory if it already exists
        try {
            FileSystem hdfs = outPath.getFileSystem(conf);
            if (hdfs.exists(outPath))
                hdfs.delete(outPath, true); // recursive delete
            // do NOT close the FileSystem here: instances are cached,
            // and job submission needs the same instance afterwards
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
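
To run the driver on the cluster, package the three classes into a jar and submit it with the standard hadoop jar command (the jar name below is just a placeholder):

bin/hadoop jar matrixmultiply.jar dataguru.matrixmultiply.MMDriver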

MMMapper.java

package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MMMapper extends Mapper<Object, Text, Text, Text> {
    private String tag; // which matrix the current split belongs to

    private int crow = 2; // number of rows of matrix A
    private int ccol = 2; // number of columns of matrix B

    // NOTE: these static counters assume each matrix file is read by a
    // single map task (one split); with multiple splits the row indices
    // would be wrong
    private static int arow = 0; // current row of A
    private static int brow = 0; // current row of B

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // derive the tag ("matrixA" or "matrixB") from the parent
        // directory of the input split
        FileSplit fs = (FileSplit) context.getInputSplit();
        tag = fs.getPath().getParent().getName();
    }

    /**
     * The input data consist of the two matrix files.
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer str = new StringTokenizer(value.toString());

        if ("matrixA".equals(tag)) { // left matrix, output key: x,y
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken(); // current x,y = line,col
                // A[arow][col] contributes to every cell in row arow of C
                for (int i = 0; i < ccol; i++) {
                    Text outkey = new Text(arow + "," + i);
                    Text outvalue = new Text("a," + col + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
                col++;
            }
            arow++;

        } else if ("matrixB".equals(tag)) {
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken(); // current x,y = line,col
                // B[brow][col] contributes to every cell in column col of C
                for (int i = 0; i < crow; i++) {
                    Text outkey = new Text(i + "," + col);
                    Text outvalue = new Text("b," + brow + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
                col++;
            }
            brow++;
        }
    }
}

MMReducer.java

package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MMReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        Map<String, String> matrixa = new HashMap<String, String>();
        Map<String, String> matrixb = new HashMap<String, String>();

        for (Text val : values) { // values example: b,0,2 or a,0,4
            StringTokenizer str = new StringTokenizer(val.toString(), ",");
            String sourceMatrix = str.nextToken();
            if ("a".equals(sourceMatrix)) {
                matrixa.put(str.nextToken(), str.nextToken()); // (0,4)
            }
            if ("b".equals(sourceMatrix)) {
                matrixb.put(str.nextToken(), str.nextToken()); // (0,2)
            }
        }

        // dot product of a row of A and a column of B for this cell
        int result = 0;
        Iterator<String> iter = matrixa.keySet().iterator();
        while (iter.hasNext()) {
            String mapkey = iter.next();
            result += Integer.parseInt(matrixa.get(mapkey))
                    * Integer.parseInt(matrixb.get(mapkey));
        }

        context.write(key, new Text(String.valueOf(result)));
    }
}
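
With the sample matrices above, the matrixC output directory should end up with a part-r-00000 file along these lines (key and value are tab-separated):

0,0	42
0,1	15
1,0	40
1,1	20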


Setting Up Eclipse for Hadoop Development

1. Install Eclipse

If Eclipse fails to start after installation, reconfigure the Java path (assuming Java was configured previously).

2. Download and install the Hadoop plugin for Eclipse

Make sure the plugin version matches your Hadoop version, and place it under /usr/lib/eclipse/plugins.

3. Create a new MapReduce project (this option appears automatically once the plugin is installed)

4. Add a new Hadoop location (node)

This is done in the Map/Reduce perspective.
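
The host/port values entered for the new location must match your cluster configuration. As a sketch (the master:9001 JobTracker address is an assumption; check your own mapred-site.xml), the relevant Hadoop 1.x settings look like this:

<!-- core-site.xml -->
<property>
  <name>fs.default.name</name>
  <value>hdfs://master:9000</value>
</property>

<!-- mapred-site.xml -->
<property>
  <name>mapred.job.tracker</name>
  <value>master:9001</value>
</property>

Set the DFS Master in the plugin dialog to the fs.default.name host and port, and the Map/Reduce Master to the mapred.job.tracker address.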

5. Upload the input folder to HDFS

Command: bin/hadoop fs -put input input01
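
To verify the upload, list the directory (a standard HDFS check):

bin/hadoop fs -ls input01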

If the upload errors out, edit the VERSION files under the namenode and datanode directories so they are consistent (typically a namespaceID mismatch).

6. Write the code

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static class WordCountMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split each line into whitespace-separated tokens and
            // emit (word, 1) for every token
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordCountReduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // sum the counts for each word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);
        job.setJobName("wordcount");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

7. Run the program

Run configuration arguments: hdfs://master:9000/user/wyh/input01 hdfs://master:9000/user/wyh/output01
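
When the job completes, the word counts can be read straight from HDFS; for a single-reducer job the results land in part-r-00000:

bin/hadoop fs -cat output01/part-r-00000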
