The problem to solve:

Suppose we have two kinds of data: users and transactions. The user data contains each user's location, while the transaction data identifies the user but does not directly contain the user's location. The users and transactions datasets are given as:
users(user_id,location_id)
transactions(transaction_id,product_id,user_id,quantity,amount)
Our goal is to compute, for each product, the number of distinct user locations it was sold to. For example, p1 is bought by users in GA and UT, so its count is 2.
Input (fields within each line are tab-separated):
users.txt
u1 UT
u2 GA
u3 CA
u4 CA
u5 GA
transactions.txt
t1 p3 u1 3 330
t2 p1 u2 1 440
t3 p1 u1 3 600
t4 p2 u2 10 1000
t5 p4 u4 9 90
t6 p1 u1 4 120
t7 p4 u1 8 160
t8 p4 u5 2 40
How do we perform the left outer join?

We join the two datasets on user_id. Here are some SQL queries that illustrate the left outer join:
Query 1: select * from transactions left outer join users on transactions.user_id = users.user_id
Result:
t1 p3 u1 3 330 u1 UT
t2 p1 u2 1 440 u2 GA
t3 p1 u1 3 600 u1 UT
t4 p2 u2 10 1000 u2 GA
t5 p4 u4 9 90 u4 CA
t6 p1 u1 4 120 u1 UT
t7 p4 u1 8 160 u1 UT
t8 p4 u5 2 40 u5 GA

Query 2: select product_id, location_id from transactions left outer join users on transactions.user_id = users.user_id
Result:
p3 UT
p1 GA
p1 UT
p2 GA
p4 CA
p1 UT
p4 UT
p4 GA

Query 3: select product_id, count(location_id) from transactions left outer join users on transactions.user_id = users.user_id group by product_id
Result:
p1 3
p2 1
p3 1
p4 3

Query 4: select product_id, count(distinct location_id) from transactions left outer join users on transactions.user_id = users.user_id group by product_id
Result:
p1 2
p2 1
p3 1
p4 3

MapReduce implementation of the left outer join:
Phase 1: find all products sold and the location associated with each sale, similar to SQL Query 2.
Phase 2: for each product sold, count the number of distinct associated locations, similar to SQL Query 4.
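To make the plan concrete, here is how product p1 flows through the two phases (a trace derived from the sample data above):

Phase 1:  t2(p1, u2) joined with u2(GA) -> p1 GA
          t3(p1, u1) joined with u1(UT) -> p1 UT
          t6(p1, u1) joined with u1(UT) -> p1 UT
Phase 2:  p1 -> {GA, UT} -> p1 2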
Phase 1: find all products sold and their associated locations (similar to SQL Query 2). The implementation consists of six parts:
1. User mapper: LeftJoinUserMapper

package LeftOutJoin_hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang.StringUtils;
import edu.umd.cloud9.io.pair.PairOfStrings;

public class LeftJoinUserMapper extends Mapper<LongWritable, Text, PairOfStrings, PairOfStrings> {

    PairOfStrings outputKey = new PairOfStrings();
    PairOfStrings outputValue = new PairOfStrings();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String[] tokens = StringUtils.split(value.toString(), "\t");
        if (tokens.length == 2) {
            // tokens[0] = user_id, tokens[1] = location_id
            // the "1" in the key makes location records sort before product
            // records (tagged "2"), so the location arrives at the reducer first
            outputKey.set(tokens[0], "1");    // set user_id
            outputValue.set("L", tokens[1]);  // set location_id
            context.write(outputKey, outputValue);
        }
    }
}
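For the sample users.txt this mapper emits the following key/value pairs (a trace derived from the code, not actual program output):

(u1, "1") -> ("L", "UT")
(u2, "1") -> ("L", "GA")
(u3, "1") -> ("L", "CA")
(u4, "1") -> ("L", "CA")
(u5, "1") -> ("L", "GA")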
2. Transaction mapper: LeftJoinTransactionMapper

package LeftOutJoin_hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang.StringUtils;
import edu.umd.cloud9.io.pair.PairOfStrings;

public class LeftJoinTransactionMapper extends Mapper<LongWritable, Text, PairOfStrings, PairOfStrings> {

    PairOfStrings outputKey = new PairOfStrings();
    PairOfStrings outputValue = new PairOfStrings();

    /**
     * @param key   system generated, ignored here
     * @param value a line of transactions.txt: transaction_id product_id user_id quantity amount
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String[] tokens = StringUtils.split(value.toString(), "\t");
        String productID = tokens[1];
        String userID = tokens[2];
        // the "2" in the key makes product records arrive at the reducer
        // after the user's location record (tagged "1")
        outputKey.set(userID, "2");
        outputValue.set("P", productID);
        context.write(outputKey, outputValue);
    }
}
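The transaction mapper's corresponding trace for the first three sample transactions:

(u1, "2") -> ("P", "p3")   // t1
(u2, "2") -> ("P", "p1")   // t2
(u1, "2") -> ("P", "p1")   // t3

Both mappers key on user_id, and because the tag "1" sorts before "2", each user's location pair reaches the reducer ahead of all of that user's product pairs.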
3. Partitioner: SecondarySortPartitioner
package LeftOutJoin_hadoop;

import org.apache.hadoop.mapreduce.Partitioner;
import edu.umd.cloud9.io.pair.PairOfStrings;

public class SecondarySortPartitioner extends Partitioner<PairOfStrings, Object> {

    @Override
    public int getPartition(PairOfStrings key, Object value, int numberOfPartitions) {
        // partition on the natural key (user_id) only, so the location record
        // and all product records of a user go to the same reducer;
        // "& Integer.MAX_VALUE" clears the sign bit to keep the result non-negative
        return (key.getLeftElement().hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
    }
}
4. Grouping comparator: SecondarySortGroupComparator

package LeftOutJoin_hadoop;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.DataInputBuffer;
import edu.umd.cloud9.io.pair.PairOfStrings;

public class SecondarySortGroupComparator implements RawComparator<PairOfStrings> {

    /**
     * Group by userID (the left element) only.
     */
    @Override
    public int compare(PairOfStrings first, PairOfStrings second) {
        return first.getLeftElement().compareTo(second.getLeftElement());
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        DataInputBuffer buffer = new DataInputBuffer();
        PairOfStrings a = new PairOfStrings();
        PairOfStrings b = new PairOfStrings();
        try {
            // deserialize both keys, then delegate to the object-level compare
            buffer.reset(b1, s1, l1);
            a.readFields(buffer);
            buffer.reset(b2, s2, l2);
            b.readFields(buffer);
            return compare(a, b);
        } catch (Exception ex) {
            return -1;
        }
    }
}
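Deserializing both keys on every byte-level compare works, but Hadoop's WritableComparator can do that bookkeeping for you when constructed with createInstances = true. A minimal sketch of an equivalent grouping comparator (the class name is mine, not from the original code):

package LeftOutJoin_hadoop;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import edu.umd.cloud9.io.pair.PairOfStrings;

// hypothetical alternative to SecondarySortGroupComparator
public class UserIdGroupComparator extends WritableComparator {

    protected UserIdGroupComparator() {
        // true: create key instances so byte-level compares are deserialized for us
        super(PairOfStrings.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // group by the left element (user_id) only, as above
        return ((PairOfStrings) a).getLeftElement()
                .compareTo(((PairOfStrings) b).getLeftElement());
    }
}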
5. Reducer: LeftJoinReducer

package LeftOutJoin_hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import edu.umd.cloud9.io.pair.PairOfStrings;
import java.util.Iterator;

public class LeftJoinReducer extends Reducer<PairOfStrings, PairOfStrings, Text, Text> {

    Text productID = new Text();
    Text locationID = new Text();

    @Override
    public void reduce(PairOfStrings key, Iterable<PairOfStrings> values, Context context)
            throws java.io.IOException, InterruptedException {
        // reset per key: if this user has no location record, "undefined" is kept,
        // which is what makes this a LEFT outer join
        locationID.set("undefined");
        Iterator<PairOfStrings> iterator = values.iterator();
        if (iterator.hasNext()) {
            // thanks to secondary sort, the location pair (if any) arrives first
            PairOfStrings firstPair = iterator.next();
            if (firstPair.getLeftElement().equals("L")) {
                locationID.set(firstPair.getRightElement());
            } else {
                // no location record: the first pair is already a product,
                // so emit it rather than silently dropping it
                productID.set(firstPair.getRightElement());
                context.write(productID, locationID);
            }
        }
        while (iterator.hasNext()) {
            // the remaining pairs are all products
            PairOfStrings productPair = iterator.next();
            productID.set(productPair.getRightElement());
            context.write(productID, locationID);
        }
    }
}
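For user u1, the reduce call therefore sees (a derived trace; the relative order of the product pairs within the group is not guaranteed by the framework):

key    = (u1, "1")
values = ("L", "UT"), ("P", "p3"), ("P", "p1"), ("P", "p1"), ("P", "p4")

and writes:

p3 UT
p1 UT
p1 UT
p4 UT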
6. main: LeftJoinDriver

package LeftOutJoin_hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import edu.umd.cloud9.io.pair.PairOfStrings;

public class LeftJoinDriver {

    public static void main(String[] args) throws Exception {
        Path transactions = new Path("input/transactions.txt"); // input
        Path users = new Path("input/users.txt");               // input
        Path output = new Path("output/1");                     // output

        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(LeftJoinDriver.class);
        job.setJobName("Phase-1: Left Outer Join");

        // "secondary sort" is handled by the following 3 plug-ins:
        // 1. how the mapper-generated keys are partitioned
        job.setPartitionerClass(SecondarySortPartitioner.class);
        // 2. how the natural keys (generated by the mappers) are grouped
        job.setGroupingComparatorClass(SecondarySortGroupComparator.class);
        // 3. how PairOfStrings keys are sorted (this orders tag "1" before "2")
        job.setSortComparatorClass(PairOfStrings.Comparator.class);

        job.setReducerClass(LeftJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // define multiple mappers: one for transactions and one for users
        MultipleInputs.addInputPath(job, transactions, TextInputFormat.class, LeftJoinTransactionMapper.class);
        MultipleInputs.addInputPath(job, users, TextInputFormat.class, LeftJoinUserMapper.class);
        job.setMapOutputKeyClass(PairOfStrings.class);
        job.setMapOutputValueClass(PairOfStrings.class);

        FileOutputFormat.setOutputPath(job, output);
        if (!job.waitForCompletion(true)) {
            throw new Exception("Phase-1: Left Outer Join Job Failed");
        }
    }
}
Phase 1 output (output/1):

p4 UT
p1 UT
p1 UT
p3 UT
p2 GA
p1 GA
p4 CA
p4 GA

Phase 2 code:
package LeftOutJoin_hadoop;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class LocationCountMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // each phase-1 output line is "product_id<TAB>location_id"
        String[] tokens = value.toString().split("\t");
        outputKey.set(tokens[0]);
        outputValue.set(tokens[1]);
        context.write(outputKey, outputValue);
    }
}
package LeftOutJoin_hadoop;

import java.io.IOException;
import java.util.Set;
import java.util.HashSet;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class LocationCountReducer extends Reducer<Text, Text, Text, LongWritable> {

    @Override
    public void reduce(Text productID, Iterable<Text> locations, Context context)
            throws IOException, InterruptedException {
        // count distinct locations by collecting them into a set
        Set<String> set = new HashSet<String>();
        for (Text location : locations) {
            set.add(location.toString());
        }
        context.write(productID, new LongWritable(set.size()));
    }
}
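An optional optimization: LocationCountReducer cannot double as a combiner, because its output types (Text, LongWritable) differ from its input types, but a combiner that merely de-duplicates locations per product is safe for distinct counting and shrinks the shuffle. A minimal sketch (the class name is mine; it would be wired in with job.setCombinerClass(LocationDedupCombiner.class) in the driver):

package LeftOutJoin_hadoop;

import java.io.IOException;
import java.util.Set;
import java.util.HashSet;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// hypothetical combiner: emits each (product, location) pair at most once per map task
public class LocationDedupCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text productID, Iterable<Text> locations, Context context)
            throws IOException, InterruptedException {
        Set<String> seen = new HashSet<String>();
        for (Text location : locations) {
            // forward only the first occurrence of each location
            if (seen.add(location.toString())) {
                context.write(productID, location);
            }
        }
    }
}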
package LeftOutJoin_hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LocationCountDriver {

    public static void main(String[] args) throws Exception {
        Path input = new Path("output/1");  // phase-1 output
        Path output = new Path("output/2"); // final output

        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(LocationCountDriver.class);
        job.setJobName("Phase-2: LocationCountDriver");

        FileInputFormat.addInputPath(job, input);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(LocationCountMapper.class);
        job.setReducerClass(LocationCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, output);
        if (!job.waitForCompletion(true)) {
            throw new Exception("LocationCountDriver Failed");
        }
    }
}
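Since phase 2 reads exactly what phase 1 writes (output/1), the two jobs can be chained from a single entry point. A minimal sketch (the class name is mine, not from the original code):

package LeftOutJoin_hadoop;

// hypothetical convenience entry point that runs both phases in order
public class LeftJoinPipeline {

    public static void main(String[] args) throws Exception {
        // each driver throws if its job fails, so phase 2 only runs after phase 1 succeeds
        LeftJoinDriver.main(args);
        LocationCountDriver.main(args);
    }
}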
Phase 2 output (output/2):

p1 2
p2 1
p3 1
p4 3

This matches the result of SQL Query 4 above, as expected.