apache-spark - tutorial - spark sql이란




Spark는 분할 된 데이터에서도 모든 리프 노드를 나열합니다. (2)

나는 마루 데이터를 datehour , 폴더 구조별로 분류했다 :

events_v3
  -- event_date=2015-01-01
    -- event_hour=2015-01-1
      -- part10000.parquet.gz
  -- event_date=2015-01-02
    -- event_hour=5
      -- part10000.parquet.gz

스파크를 통해 raw_events 테이블을 raw_events 만 쿼리를 시도 할 때 모든 디렉터리에서 바닥 글을 검색하고 초기 쿼리가 느려지더라도 데이터를 하루 만 쿼리합니다.

쿼리 : select * from raw_events where event_date='2016-01-01'

비슷한 문제 : http://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%[email protected].com%3E (그러나 이전 버전)

로그:

App > 16/09/15 03:14:03 main INFO HadoopFsRelation: Listing leaf files and directories in parallel under: s3a://bucket/events_v3/

350 일 분량의 데이터가 있기 때문에 350 개의 작업이 생성됩니다.

schemaMerge 를 사용하지 않도록 설정하고 읽을 스키마를 지정 했으므로 내가보고있는 파티션으로 갈 수 있으며 모든 리프 파일을 인쇄해야하는 이유는 무엇입니까? 2 명의 실행자가있는 리프 파일을 나열하는 데 10 분이 걸리고 실제 실행은 20 초가 걸립니다.

코드 샘플 :

val sparkSession = org.apache.spark.sql.SparkSession.builder.getOrCreate()
val df = sparkSession.read.option("mergeSchema","false").format("parquet").load("s3a://bucket/events_v3")
    df.createOrReplaceTempView("temp_events")
    sparkSession.sql(
      """
        |select verb,count(*) from temp_events where event_date = "2016-01-01" group by verb
      """.stripMargin).show()

Gaurav의 답을 명확히하기 위해 코드를 잘라내는 것은 Hadoop 브랜치 -2에서 나온 것입니다. 아마도 Hadoop 2.9 ( HADOOP-13208 참조)가 될 때까지 표면을 HADOOP-13208 않을 것입니다. 누군가가 Spark를 업데이트하여 해당 기능을 사용해야합니다 (HDFS를 사용하는 코드에 해를 끼치 지 않으므로 속도가 향상되지 않습니다).

고려해야 할 한 가지는 : 객체 저장소에 적합한 파일 레이아웃을 만드는 것입니다.

  • 디렉토리 당 몇 개의 파일 만있는 깊은 디렉토리 트리가 없습니다.
  • 파일이 많은 얕은 나무가 있습니까?
  • 가장 중요한 값 (예 : 요일 / 시간)은 파일의 처음 몇 문자를 사용하는 것이 좋습니다. 왜? 일부 객체 저장소는 해싱에 대한 선행 문자를 사용하는 것으로 보이지만 후행은 아닙니다. 이름을 좀 더 독창적으로 지정하면 더 많은 대역폭과 스로틀 위험없이 더 많은 서버에 퍼집니다.
  • Hadoop 2.7 라이브러리를 사용하는 경우 s3n : //을 통해 s3a : //로 전환하십시오. ASF 소스 트리에서 이미 더 빨라지고 매주 개선되고 있습니다.

마지막으로 Apache Hadoop, Apache Spark 및 관련 프로젝트는 모두 오픈 소스입니다. 기부를 환영합니다. 이것은 코드뿐 아니라 문서화, 테스트 및 실제 데이터 세트에 대한 테스트입니다. 문제의 원인 (및 데이터 집합 레이아웃)에 대한 세부 정보를 제공하는 것도 흥미 롭습니다.


spark가 listLeafFiles 그것을 읽을 디렉토리가 listLeafFiles (org / apache / spark / sql / execution / datasources / fileSourceInterfaces.scala)를 호출합니다. 그러면 fs.listStatus 가 호출되어 api 호출이 만들어 파일 및 디렉토리 목록을 가져옵니다. 이제 각 디렉토리에 대해이 메소드가 다시 호출됩니다. 이것은 디렉토리가 남지 않을 때까지 재귀 적으로 발생합니다. 이 디자인은 HDFS 시스템에서 잘 작동합니다. 하지만 목록 파일은 RPC 호출이므로 s3에서 작동하지 않습니다. S3에 대한 지원은 모든 파일을 접두사로 가져와서 우리가 필요로하는 것입니다.

예를 들어, 1 시간 동안 각 디렉토리에 1 년 분의 데이터가있는 디렉토리 구조가 있고 하위 디렉토리가 10 개 인 경우 (365 * 24 * 10 = 87k API 호출 인 경우), 138 API 호출로 줄일 수 있습니다. 137000 파일 만. 각 s3 api 호출은 1000 개의 파일을 반환합니다.

코드 : org/apache/hadoop/fs/s3a/S3AFileSystem.java

public FileStatus[] listStatusRecursively(Path f) throws FileNotFoundException,
            IOException {
        String key = pathToKey(f);
        if (LOG.isDebugEnabled()) {
            LOG.debug("List status for path: " + f);
        }

        final List<FileStatus> result = new ArrayList<FileStatus>();
        final FileStatus fileStatus =  getFileStatus(f);

        if (fileStatus.isDirectory()) {
            if (!key.isEmpty()) {
                key = key + "/";
            }

            ListObjectsRequest request = new ListObjectsRequest();
            request.setBucketName(bucket);
            request.setPrefix(key);
            request.setMaxKeys(maxKeys);

            if (LOG.isDebugEnabled()) {
                LOG.debug("listStatus: doing listObjects for directory " + key);
            }

            ObjectListing objects = s3.listObjects(request);
            statistics.incrementReadOps(1);

            while (true) {
                for (S3ObjectSummary summary : objects.getObjectSummaries()) {
                    Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
                    // Skip over keys that are ourselves and old S3N _$folder$ files
                    if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Ignoring: " + keyPath);
                        }
                        continue;
                    }

                    if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
                        result.add(new S3AFileStatus(true, true, keyPath));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding: fd: " + keyPath);
                        }
                    } else {
                        result.add(new S3AFileStatus(summary.getSize(),
                                dateToLong(summary.getLastModified()), keyPath,
                                getDefaultBlockSize(f.makeQualified(uri, workingDir))));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding: fi: " + keyPath);
                        }
                    }
                }

                for (String prefix : objects.getCommonPrefixes()) {
                    Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
                    if (keyPath.equals(f)) {
                        continue;
                    }
                    result.add(new S3AFileStatus(true, false, keyPath));
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding: rd: " + keyPath);
                    }
                }

                if (objects.isTruncated()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("listStatus: list truncated - getting next batch");
                    }

                    objects = s3.listNextBatchOfObjects(objects);
                    statistics.incrementReadOps(1);
                } else {
                    break;
                }
            }
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Adding: rd (not a dir): " + f);
            }
            result.add(fileStatus);
        }

        return result.toArray(new FileStatus[result.size()]);
    }

/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala

def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
    logTrace(s"Listing ${status.getPath}")
    val name = status.getPath.getName.toLowerCase
    if (shouldFilterOut(name)) {
      Array.empty[FileStatus]
    }
    else {
      val statuses = {
        val stats = if(fs.isInstanceOf[S3AFileSystem]){
          logWarning("Using Monkey patched version of list status")
          println("Using Monkey patched version of list status")
          val a = fs.asInstanceOf[S3AFileSystem].listStatusRecursively(status.getPath)
          a
//          Array.empty[FileStatus]
        }
        else{
          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))

        }
        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
      }
      // statuses do not have any dirs.
      statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
        case f: LocatedFileStatus => f

        // NOTE:
        //
        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
        //   operations, calling `getFileBlockLocations` does no harm here since these file system
        //   implementations don't actually issue RPC for this method.
        //
        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
        //   paths exceeds threshold.
        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
      }
    }
  }




parquet