java - tutorials - wordcount tutorial



감속기 코드에서 카운터가 작동하지 않습니다. (1)

Big hadoop 프로젝트를 진행하고 있으며 작은 KPI가 있습니다. 여기서는 상위 10 개 값만 축소 출력해야합니다. 이 요구 사항을 완료하기 위해 카운터를 사용하고 카운터가 11 일 때 루프를 중단했지만 감속기는 모든 값을 HDFS에 씁니다.

이것은 매우 간단한 자바 코드이지만, 나는 붙어있다 : (

테스트를 위해, 나는 이것을하기 위해 하나의 독립 실행 형 클래스 (Java 애플리케이션)를 만들었다. 나는 감속기 코드에서 작동하지 않는 이유가 궁금합니다.

제발 좀 도와주세요. 뭔가 놓친다면 제안 해주세요.

지도 - 코드 줄이기

package comparableTest;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ValueSortExp2 {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration(true);

        String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = new Job(conf, "Test commond");
        job.setJarByClass(ValueSortExp2.class);

        // Setup MapReduce
        job.setMapperClass(MapTask2.class);
        job.setReducerClass(ReduceTask2.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setSortComparatorClass(IntComparator2.class);
        // Input
        FileInputFormat.addInputPath(job, new Path(arguments[0]));
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
        job.setOutputFormatClass(TextOutputFormat.class);


        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);

    }

    public static class IntComparator2 extends WritableComparator {

        public IntComparator2() {
            super(IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

            Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
            Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();

            return v1.compareTo(v2) * (-1);
        }
    }

    public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {

            public void  map(LongWritable key,Text value, Context context) throws IOException, InterruptedException {

                String tokens[]= value.toString().split("\\t");

            //    int empId = Integer.parseInt(tokens[0])    ;    
                int count = Integer.parseInt(tokens[2])    ;

                context.write(new IntWritable(count), new Text(value));

            }    

        }


    public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {
        int cnt=0;
        public void reduce(IntWritable key, Iterable<Text> list, Context context)
                throws java.io.IOException, InterruptedException {


            for (Text value : list ) {
                cnt ++;

                if (cnt==11)
                {
                    break;    
                }

                context.write(new IntWritable(cnt), value);




            }

        }
}
}  

자바 코드 단순화

package comparableTest;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class TestData {

    //static int cnt=0;


    public static void main(String args[]) throws IOException, InterruptedException {

        ArrayList<String> list = new ArrayList<String>() {{
            add("A");
            add("B");
            add("C");
            add("D");
        }};


        reduce(list);


    }

    public static void reduce(Iterable<String> list)
            throws java.io.IOException, InterruptedException {


        int cnt=0;
        for (String value : list ) {
            cnt ++;

            if (cnt==3)
            {
                break;    
            }

            System.out.println(value);    


        }

    }
}

샘플 데이터 - 헤더는 더 많은 정보입니다. 실제 데이터는 두 번째 라인에서입니다.

` ID NAME COUNT (상위 10 개 내림차순을 표시해야 함)

1 토이 스토리 (1995) 2077

10 GoldenEye (1995) 888

100 시청 (1996) 128

1000 curdled (1996) 20

1001 준회원, (L' Associe) (1982) 0

1002 에드의 다음 행동 (1996) 8

1003 극한 척도 (1996) 121

1004 Glimmer Man, The (1996) 101

1005 D3 : The Mighty Ducks (1996) 142

1006 회의소, (1996) 78

1007 애플 덤 플링 갱, The (1975) 232

1008 데비 크로켓, 와일드 프론티어의 왕 (1955) 97

1009 마녀의 산으로 탈출하다 (1975) 291

101 보틀 로켓 (1996) 253

1010 Love Bug, The (1969) 242

1011 Herbie Rides Again (1974) 135

1012 올드 옐러 (1957) 301

1013 부모 함정, The (1961) 258

1014 Pollyanna (1960) 136

1015 귀가 바운드 경계 : 믿어지지 않는 여정 (1993) 234

1016 털 복숭아 개, The (1959) 156

1017 스위스 가족 Robinson (1960) 276

1018 그 고양이! (1965) 123

1019 20,000 바다 밑의 리그 (1954) 575

102 Mr. Wrong (1996) 60

1020 쿨 러닝 (1993) 392

1021 천사 외야 (1994) 247

1022 신데렐라 (1950) 577

1023 푸우와 방목의 날 (1968) 221

1024 세명의 Caballeros, The (1945) 126

돌에있는 1025 칼, The (1963) 293

1026 내 마음에 Dear (1949) 8

1027 Robin Hood : 도둑의 왕자 (1991) 344

1028 Mary Poppins (1964) 1011

1029 덤보 (1941) 568

103 잊을 수없는 (1996) 33

1030 피트의 용 (1977) 323

1031 Bedknobs and Broomsticks (1971) 319`


int cnt=0; reduce 메서드 (이 메서드의 첫 번째 문) 내에서 각 키의 처음 10 개 값을 가져옵니다 (원하는 것 같군요).

그렇지 않으면 현재와 같이 카운터가 계속 증가하고 키와 관계없이 11 번째 값만 건너 뛰고 12 번째로 계속됩니다.

키에 관계없이 10 개의 값만 인쇄하려면 cnt 초기화를 그대로두고 if 조건을 if (cnt > 10) 로 변경하십시오. 그러나 이것은 좋은 방법이 아니므로 알고리즘을 재검토해야합니다. (임의의 10 개의 값을 원하지 않는다고 가정 할 때, 분산 환경에서 어느 키가 처음으로 처리되는지, 둘 이상의 감속기와 해시 분할기가있을 때 어떻게 알 수 있습니까?)





hadoop2