1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
// AMMapper.java
package ArrayMultiply;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Map stage of a MapReduce matrix multiplication C = A x B.
 *
 * <p>Each input line is a comma-separated record {@code tag,row,col,value}, where {@code tag}
 * is {@code a} for an element of matrix A or {@code b} for an element of matrix B.
 *
 * <ul>
 *   <li>An A element at (row, col) is emitted once for every column {@code 1..p} of B, keyed by
 *       {@code "row,i"}, with value {@code "a,col,value"}.
 *   <li>A B element at (row, col) is emitted once for every row {@code 1..m} of A, keyed by
 *       {@code "i,col"}, with value {@code "b,row,value"}.
 * </ul>
 *
 * <p>The second component of each emitted value is the index on which the reduce stage pairs
 * A elements with B elements.
 *
 * <p>The dimensions default to the sample data (A is 4x3, B is 3x3) and can be overridden
 * through the job configuration with {@link #A_ROWS_KEY} and {@link #B_COLS_KEY}.
 */
public class AMMapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Job configuration key that overrides the number of rows of matrix A. */
    public static final String A_ROWS_KEY = "arraymultiply.a.rows";

    /** Job configuration key that overrides the number of columns of matrix B. */
    public static final String B_COLS_KEY = "arraymultiply.b.cols";

    /** Number of comma-separated fields a well-formed record must have. */
    private static final int FIELD_COUNT = 4;

    // Defaults describe the sample input: A is 4x3, B is 3x3.
    private int m = 4; // number of rows of matrix A
    private int p = 3; // number of columns of matrix B

    // Reused across calls; context.write() serializes the contents before returning.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /**
     * Reads the matrix dimensions from the job configuration, keeping the built-in defaults
     * when a key is not set.
     *
     * @param context task context giving access to the job configuration
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        m = context.getConfiguration().getInt(A_ROWS_KEY, m);
        p = context.getConfiguration().getInt(B_COLS_KEY, p);
    }

    /**
     * Fans one matrix element out to every result cell it contributes to.
     *
     * <p>Records with fewer than four fields are skipped and counted under the
     * {@code ArrayMultiply / MALFORMED_RECORDS} counter instead of failing the task. Records
     * whose tag is neither {@code a} nor {@code b} produce no output.
     *
     * @param key     input key supplied by the framework (not used)
     * @param value   one input line in the form {@code tag,row,col,value}
     * @param context sink for the emitted key/value pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length < FIELD_COUNT) {
            context.getCounter("ArrayMultiply", "MALFORMED_RECORDS").increment(1);
            return;
        }

        if ("a".equals(fields[0])) {
            // The value is the same for every target cell, so build it once per record.
            outValue.set("a," + fields[2] + "," + fields[3]);
            for (int i = 1; i <= p; i++) {
                outKey.set(fields[1] + "," + i);
                context.write(outKey, outValue);
            }
        } else if ("b".equals(fields[0])) {
            outValue.set("b," + fields[1] + "," + fields[3]);
            for (int i = 1; i <= m; i++) {
                outKey.set(i + "," + fields[2]);
                context.write(outKey, outValue);
            }
        }
    }
}
// AMReducer.java
package ArrayMultiply;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException; import java.util.HashMap; import java.util.Iterator;
/**
 * Reduce stage of a MapReduce matrix multiplication.
 *
 * <p>All values arriving for one key are tagged strings of the form {@code a,index,value} or
 * {@code b,index,value}. The reducer pairs the {@code a} and {@code b} entries that share the
 * same index, multiplies their values and writes the sum of those products for the key.
 */
public class AMReducer extends Reducer<Text, Text, Text, IntWritable> {

    /**
     * Sums the products of the matching {@code a} and {@code b} entries received for a key.
     *
     * <p>If the same index occurs more than once for a tag, the last value seen is used. An
     * index present for only one of the two tags contributes nothing. Values are parsed as
     * {@code int} only for indices present on both sides, and the sum uses plain {@code int}
     * arithmetic.
     *
     * @param key     key of the group, written back unchanged with the result
     * @param values  tagged entries for this key
     * @param context sink for the single output pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        HashMap<String, String> fromA = new HashMap<>();
        HashMap<String, String> fromB = new HashMap<>();

        // Split the incoming entries by tag, indexed by the shared join index.
        for (Text tagged : values) {
            String[] parts = tagged.toString().split(",");
            switch (parts[0]) {
                case "a":
                    fromA.put(parts[1], parts[2]);
                    break;
                case "b":
                    fromB.put(parts[1], parts[2]);
                    break;
                default:
                    // Entries with any other tag are ignored.
                    break;
            }
        }

        int sum = 0;
        for (String index : fromA.keySet()) {
            String right = fromB.get(index);
            if (right != null) {
                sum += Integer.parseInt(fromA.get(index)) * Integer.parseInt(right);
            }
        }

        context.write(key, new IntWritable(sum));
    }
}
|