apache-spark - tutorial - spark sql이란
Spark는 분할 된 데이터에서도 모든 리프 노드를 나열합니다. (2)
나는 마루 데이터를 date
와 hour
, 폴더 구조별로 분류했다 :
events_v3
-- event_date=2015-01-01
-- event_hour=2015-01-1
-- part10000.parquet.gz
-- event_date=2015-01-02
-- event_hour=5
-- part10000.parquet.gz
스파크를 통해 raw_events
테이블을 raw_events
만 쿼리를 시도 할 때 모든 디렉터리에서 바닥 글을 검색하고 초기 쿼리가 느려지더라도 데이터를 하루 만 쿼리합니다.
쿼리 : select * from raw_events where event_date='2016-01-01'
비슷한 문제 : http://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%[email protected].com%3E (그러나 이전 버전)
로그:
App > 16/09/15 03:14:03 main INFO HadoopFsRelation: Listing leaf files and directories in parallel under: s3a://bucket/events_v3/
350 일 분량의 데이터가 있기 때문에 350 개의 작업이 생성됩니다.
schemaMerge
를 사용하지 않도록 설정하고 읽을 스키마를 지정 했으므로 내가보고있는 파티션으로 갈 수 있으며 모든 리프 파일을 인쇄해야하는 이유는 무엇입니까? 2 명의 실행자가있는 리프 파일을 나열하는 데 10 분이 걸리고 실제 실행은 20 초가 걸립니다.
코드 샘플 :
val sparkSession = org.apache.spark.sql.SparkSession.builder.getOrCreate()
val df = sparkSession.read.option("mergeSchema","false").format("parquet").load("s3a://bucket/events_v3")
df.createOrReplaceTempView("temp_events")
sparkSession.sql(
"""
|select verb,count(*) from temp_events where event_date = "2016-01-01" group by verb
""".stripMargin).show()
Gaurav의 답을 명확히하기 위해 코드를 잘라내는 것은 Hadoop 브랜치 -2에서 나온 것입니다. 아마도 Hadoop 2.9 ( HADOOP-13208 참조)가 될 때까지 표면을 HADOOP-13208 않을 것입니다. 누군가가 Spark를 업데이트하여 해당 기능을 사용해야합니다 (HDFS를 사용하는 코드에 해를 끼치 지 않으므로 속도가 향상되지 않습니다).
고려해야 할 한 가지는 : 객체 저장소에 적합한 파일 레이아웃을 만드는 것입니다.
- 디렉토리 당 몇 개의 파일 만있는 깊은 디렉토리 트리가 없습니다.
- 파일이 많은 얕은 나무가 있습니까?
- 가장 중요한 값 (예 : 요일 / 시간)은 파일의 처음 몇 문자를 사용하는 것이 좋습니다. 왜? 일부 객체 저장소는 해싱에 대한 선행 문자를 사용하는 것으로 보이지만 후행은 아닙니다. 이름을 좀 더 독창적으로 지정하면 더 많은 대역폭과 스로틀 위험없이 더 많은 서버에 퍼집니다.
- Hadoop 2.7 라이브러리를 사용하는 경우 s3n : //을 통해 s3a : //로 전환하십시오. ASF 소스 트리에서 이미 더 빨라지고 매주 개선되고 있습니다.
마지막으로 Apache Hadoop, Apache Spark 및 관련 프로젝트는 모두 오픈 소스입니다. 기부를 환영합니다. 이것은 코드뿐 아니라 문서화, 테스트 및 실제 데이터 세트에 대한 테스트입니다. 문제의 원인 (및 데이터 집합 레이아웃)에 대한 세부 정보를 제공하는 것도 흥미 롭습니다.
spark가 listLeafFiles
그것을 읽을 디렉토리가 listLeafFiles
(org / apache / spark / sql / execution / datasources / fileSourceInterfaces.scala)를 호출합니다. 그러면 fs.listStatus
가 호출되어 api 호출이 만들어 파일 및 디렉토리 목록을 가져옵니다. 이제 각 디렉토리에 대해이 메소드가 다시 호출됩니다. 이것은 디렉토리가 남지 않을 때까지 재귀 적으로 발생합니다. 이 디자인은 HDFS 시스템에서 잘 작동합니다. 하지만 목록 파일은 RPC 호출이므로 s3에서 작동하지 않습니다. S3에 대한 지원은 모든 파일을 접두사로 가져와서 우리가 필요로하는 것입니다.
예를 들어, 1 시간 동안 각 디렉토리에 1 년 분의 데이터가있는 디렉토리 구조가 있고 하위 디렉토리가 10 개 인 경우 (365 * 24 * 10 = 87k API 호출 인 경우), 138 API 호출로 줄일 수 있습니다. 137000 파일 만. 각 s3 api 호출은 1000 개의 파일을 반환합니다.
코드 : org/apache/hadoop/fs/s3a/S3AFileSystem.java
public FileStatus[] listStatusRecursively(Path f) throws FileNotFoundException,
IOException {
String key = pathToKey(f);
if (LOG.isDebugEnabled()) {
LOG.debug("List status for path: " + f);
}
final List<FileStatus> result = new ArrayList<FileStatus>();
final FileStatus fileStatus = getFileStatus(f);
if (fileStatus.isDirectory()) {
if (!key.isEmpty()) {
key = key + "/";
}
ListObjectsRequest request = new ListObjectsRequest();
request.setBucketName(bucket);
request.setPrefix(key);
request.setMaxKeys(maxKeys);
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: doing listObjects for directory " + key);
}
ObjectListing objects = s3.listObjects(request);
statistics.incrementReadOps(1);
while (true) {
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
// Skip over keys that are ourselves and old S3N _$folder$ files
if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring: " + keyPath);
}
continue;
}
if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
result.add(new S3AFileStatus(true, true, keyPath));
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: fd: " + keyPath);
}
} else {
result.add(new S3AFileStatus(summary.getSize(),
dateToLong(summary.getLastModified()), keyPath,
getDefaultBlockSize(f.makeQualified(uri, workingDir))));
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: fi: " + keyPath);
}
}
}
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
if (keyPath.equals(f)) {
continue;
}
result.add(new S3AFileStatus(true, false, keyPath));
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd: " + keyPath);
}
}
if (objects.isTruncated()) {
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: list truncated - getting next batch");
}
objects = s3.listNextBatchOfObjects(objects);
statistics.incrementReadOps(1);
} else {
break;
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd (not a dir): " + f);
}
result.add(fileStatus);
}
return result.toArray(new FileStatus[result.size()]);
}
/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
logTrace(s"Listing ${status.getPath}")
val name = status.getPath.getName.toLowerCase
if (shouldFilterOut(name)) {
Array.empty[FileStatus]
}
else {
val statuses = {
val stats = if(fs.isInstanceOf[S3AFileSystem]){
logWarning("Using Monkey patched version of list status")
println("Using Monkey patched version of list status")
val a = fs.asInstanceOf[S3AFileSystem].listStatusRecursively(status.getPath)
a
// Array.empty[FileStatus]
}
else{
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
}
if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
}
// statuses do not have any dirs.
statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
case f: LocatedFileStatus => f
// NOTE:
//
// - Although S3/S3A/S3N file system can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
// be a big deal since we always use to `listLeafFilesInParallel` when the number of
// paths exceeds threshold.
case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
}
}
}