Map-Reduce 여러번 하기 - ChainMapper
Intoduction
Hadoop을 이용하다 보면, 원본 데이타(Source)의 상태에 따라 여러 번의 Mapper 함수가 필요한 경우가 있습니다. 이러한 경우를 위해서, ChainMapper라는 함수를 제공합니다.
ChainMapper/ChainReducer
재밌는 것은, Reduce는 한번만 가능한다는 점입니다. Mapper가 여러 번 작동되어도, 혹은 Reduce 이후에 Mapper를 다시 사용할 수도 있지만... Reducer는 꼭 한번만 존재해야 합니다. 정규식으로 이러한 관계를 나타내면, 다음과 같습니다.
Map+ Reduce Map*
정규 표현식의 +, *의 의미를 떠올려 보시면 금방 이해가 되시리라 생각됩니다. 이러한 형태로 Data Flow를 잡으셨다면, 최적의 선택일지도 모르겠습니다만, 조금 신가한 관계입니다. 사용법은 상당히 간단합니다. ChainMapper, ChainReducer라는 클래스로 이어주기만 하면 끝입니다. 다음 코드를 보시면...
JobConf conf = new JobConf(getConf(), Count.class); JobConf mapConf = new JobConf(false); ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class, Text.class, Text.class, true, mapConf); JobConf parseConf = new JobConf(false); ChainMapper.addMapper(conf, Parser.class, Text.class, Text.class, Text.class, Text.class, true, parseConf); JobConf keyReduceConf = new JobConf(false); ChainReducer.setReducer(conf, KeyReduce.class, Text.class, Text.class, Text.class, IntWritable.class, true, keyReduceConf); // setReduce는 한 번만 사용할 수 있습니다 // ChainReducer.setReducer(conf, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, null); JobClient.runJob(conf);
상당히 깔끔하고 사용하기 쉽습니다. 선언만 해주면, 나머지 코드는 수정할 부분이 없이 동작하게 됩니다. 저의 경우에는, Reduce를 여러 번 수행해야 했기 때문에, 적절한 방법은 아니었습니다.
여러 번의 Reduce = 여러 개의 Job
위의 경우로는, Map을 여러 번 진행시킬 수는 있지만, Reduce는 한 번만 실행되어야 합니다. 이러한 경우에는, 단순히 Job을 여러 개 만들어서 진행시킬 수 있겠습니다. 다만, 2개의 job 사이의 데이타의 흐름을 신경써야 하므로, Sequence 파일 형태로 진행시키는 것이 좋습니다.
JobConf mergeJob = new JobConf(getConf(), Find.class); try { // 1. Merge - 먼저, 병합하는 MR mergeJob.setJobName("MakingPrescription"); mergeJob.setMapOutputKeyClass(Text.class); mergeJob.setMapOutputValueClass(Text.class); mergeJob.setOutputKeyClass(Text.class); mergeJob.setOutputValueClass(LongWritable.class); // Grep Job의 Input을 SequenceFile로 내보냄 mergeJob.setOutputFormat(SequenceFileOutputFormat.class); mergeJob.setMapperClass(Tokenizer.class); mergeJob.setReducerClass(KeyReduce.class); FileInputFormat.setInputPaths(mergeJob, args[0]); FileOutputFormat.setOutputPath(mergeJob, mergeTemp); JobClient.runJob(mergeJob); // 2. Grep - 해당하는 찾아내는 MR JobConf grepJob = new JobConf(getConf(), Find.class); grepJob.setJobName("FilteringSelectivly"); grepJob.setOutputKeyClass(Text.class); grepJob.setOutputValueClass(LongWritable.class); grepJob.setMapperClass(FilterMapper.class); grepJob.set("mapred.mapper.regex", String.format("^%s.*%s", args[3], args[2])); // MR작업에 Parameter를 넘겨줌 grepJob.setReducerClass(FilterReduce.class); // Merge Job의 Output을 SequenceFile로 받음 grepJob.setInputFormat(SequenceFileInputFormat.class); FileInputFormat.setInputPaths(grepJob, mergeTemp); FileOutputFormat.setOutputPath(grepJob, new Path(args[1])); JobClient.runJob(grepJob); } finally { FileSystem.get(mergeJob).delete(mergeTemp, true); FileSystem.get(mergeJob).delete(grepTemp, true); }
위의 소스를 보시면, 간단히 2번의 job을 돌리는 것을 보실 수 있습니다. 중간에 임시 파일을 사용하여 2개의 Job이 연결되었으며, finally에서 정리하고 있습니다.