java - tutorials - wordcount tutorial
감속기 코드에서 카운터가 작동하지 않습니다. (1)
Big hadoop 프로젝트를 진행하고 있으며 작은 KPI가 있습니다. 여기서는 상위 10 개 값만 축소 출력해야합니다. 이 요구 사항을 완료하기 위해 카운터를 사용하고 카운터가 11 일 때 루프를 중단했지만 감속기는 모든 값을 HDFS에 씁니다.
이것은 매우 간단한 자바 코드이지만, 나는 붙어있다 : (
테스트를 위해, 나는 이것을하기 위해 하나의 독립 실행 형 클래스 (Java 애플리케이션)를 만들었다. 나는 감속기 코드에서 작동하지 않는 이유가 궁금합니다.
제발 좀 도와주세요. 뭔가 놓친다면 제안 해주세요.
지도 - 코드 줄이기
package comparableTest;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class ValueSortExp2 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(true);
String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "Test commond");
job.setJarByClass(ValueSortExp2.class);
// Setup MapReduce
job.setMapperClass(MapTask2.class);
job.setReducerClass(ReduceTask2.class);
job.setNumReduceTasks(1);
// Specify key / value
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(IntComparator2.class);
// Input
FileInputFormat.addInputPath(job, new Path(arguments[0]));
job.setInputFormatClass(TextInputFormat.class);
// Output
FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
job.setOutputFormatClass(TextOutputFormat.class);
int code = job.waitForCompletion(true) ? 0 : 1;
System.exit(code);
}
public static class IntComparator2 extends WritableComparator {
public IntComparator2() {
super(IntWritable.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
return v1.compareTo(v2) * (-1);
}
}
public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {
public void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException {
String tokens[]= value.toString().split("\\t");
// int empId = Integer.parseInt(tokens[0]) ;
int count = Integer.parseInt(tokens[2]) ;
context.write(new IntWritable(count), new Text(value));
}
}
public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {
int cnt=0;
public void reduce(IntWritable key, Iterable<Text> list, Context context)
throws java.io.IOException, InterruptedException {
for (Text value : list ) {
cnt ++;
if (cnt==11)
{
break;
}
context.write(new IntWritable(cnt), value);
}
}
}
}
자바 코드 단순화
package comparableTest;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class TestData {
//static int cnt=0;
public static void main(String args[]) throws IOException, InterruptedException {
ArrayList<String> list = new ArrayList<String>() {{
add("A");
add("B");
add("C");
add("D");
}};
reduce(list);
}
public static void reduce(Iterable<String> list)
throws java.io.IOException, InterruptedException {
int cnt=0;
for (String value : list ) {
cnt ++;
if (cnt==3)
{
break;
}
System.out.println(value);
}
}
}
샘플 데이터 - 헤더는 더 많은 정보입니다. 실제 데이터는 두 번째 라인에서입니다.
` ID NAME COUNT (상위 10 개 내림차순을 표시해야 함)
1 토이 스토리 (1995) 2077
10 GoldenEye (1995) 888
100 시청 (1996) 128
1000 curdled (1996) 20
1001 준회원, (L' Associe) (1982) 0
1002 에드의 다음 행동 (1996) 8
1003 극한 척도 (1996) 121
1004 Glimmer Man, The (1996) 101
1005 D3 : The Mighty Ducks (1996) 142
1006 회의소, (1996) 78
1007 애플 덤 플링 갱, The (1975) 232
1008 데비 크로켓, 와일드 프론티어의 왕 (1955) 97
1009 마녀의 산으로 탈출하다 (1975) 291
101 보틀 로켓 (1996) 253
1010 Love Bug, The (1969) 242
1011 Herbie Rides Again (1974) 135
1012 올드 옐러 (1957) 301
1013 부모 함정, The (1961) 258
1014 Pollyanna (1960) 136
1015 귀가 바운드 경계 : 믿어지지 않는 여정 (1993) 234
1016 털 복숭아 개, The (1959) 156
1017 스위스 가족 Robinson (1960) 276
1018 그 고양이! (1965) 123
1019 20,000 바다 밑의 리그 (1954) 575
102 Mr. Wrong (1996) 60
1020 쿨 러닝 (1993) 392
1021 천사 외야 (1994) 247
1022 신데렐라 (1950) 577
1023 푸우와 방목의 날 (1968) 221
1024 세명의 Caballeros, The (1945) 126
돌에있는 1025 칼, The (1963) 293
1026 내 마음에 Dear (1949) 8
1027 Robin Hood : 도둑의 왕자 (1991) 344
1028 Mary Poppins (1964) 1011
1029 덤보 (1941) 568
103 잊을 수없는 (1996) 33
1030 피트의 용 (1977) 323
1031 Bedknobs and Broomsticks (1971) 319`
int cnt=0;
reduce 메서드 (이 메서드의 첫 번째 문) 내에서 각 키의 처음 10 개 값을 가져옵니다 (원하는 것 같군요).
그렇지 않으면 현재와 같이 카운터가 계속 증가하고 키와 관계없이 11 번째 값만 건너 뛰고 12 번째로 계속됩니다.
키에 관계없이 10 개의 값만 인쇄하려면 cnt
초기화를 그대로두고 if
조건을 if (cnt > 10)
로 변경하십시오. 그러나 이것은 좋은 방법이 아니므로 알고리즘을 재검토해야합니다. (임의의 10 개의 값을 원하지 않는다고 가정 할 때, 분산 환경에서 어느 키가 처음으로 처리되는지, 둘 이상의 감속기와 해시 분할기가있을 때 어떻게 알 수 있습니까?)