tcp - hdp yarn port



Apache Spark Job을 실행하는 동안 피어의 연결 재설정 (0)

두 개의 HDP 클러스터 설정을 A와 B라고합시다.

클러스터 노드 :

  • 여기에는 총 20 대의 상품 기계가 포함됩니다.
  • 20 개의 데이터 노드가 있습니다.
  • namenode HA가 구성되었으므로 하나의 활성 namenode와 하나의 대기 namenode가 있습니다.

클러스터 B 노드 :

  • 여기에는 총 5 개의 상품 기계가 포함됩니다.
  • 5 개의 데이터 노드가 있습니다.
  • HA가 구성되어 있지 않으며이 클러스터에는 1 차 및 1 차 namenode가 있습니다.

들어오는 파일에 대해 ETL (Extract, Transform 및 Load) 작업을 수행하는 세 가지 주요 구성 요소가 있습니다. 이 구성 요소를 각각 E, T 및 L로 지칭합니다.

구성 요소 전자 특성 :

  • 이 구성 요소는 Apache Spark Job이며 클러스터 B에서만 실행됩니다.
  • NAS 스토리지에서 파일을 가져와 클러스터 B의 HDFS에 저장하는 것이 일입니다.

구성 요소 T 특성 :

  • 이 구성 요소는 Apache Spark Job이기도하며 클러스터 B에서 실행됩니다.
  • 작업은 구성 요소 E로 작성된 HDFS의 파일을 선택하여 변환 한 다음 변형 된 파일을 클러스터 A의 HDFS에 기록하는 것입니다.

구성 요소 L 특성 :

  • 이 구성 요소는 Apache Spark 작업이며 클러스터 A에서만 실행됩니다.
  • 구성 요소 T가 작성한 파일을 선택하고 클러스터 A에있는 하이브 테이블에 데이터를로드하는 작업이 있습니다.

구성 요소 L은 세 가지 구성 요소 중 하나이며 우리는 그 구성 요소에 결함이 없었습니다. 구성 요소 E에는 설명 할 수없는 작은 결함이 있지만 구성 요소 T는 가장 귀찮은 결함입니다.

구성 요소 E와 T는 모두 DFS 클라이언트를 사용하여 namenode와 통신합니다.

다음은 구성 요소 T를 실행하는 동안 간헐적으로 관찰 한 예외의 일부입니다.

clusterA.namenode.com/10.141.160.141:8020. Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "clusterB.datanode.com"; destination host is: "clusterA.namenode.com":8020;
            at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:782)
            at org.apache.hadoop.ipc.Client.call(Client.java:1459)
            at org.apache.hadoop.ipc.Client.call(Client.java:1392)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
            at com.sun.proxy.$Proxy15.complete(Unknown Source)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:464)
            at sun.reflect.GeneratedMethodAccessor1240.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
            at com.sun.proxy.$Proxy16.complete(Unknown Source)
            at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2361)
            at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2338)
            at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2303)
            at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
            at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
            at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:109)
            at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
            at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
            at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
            at com.abc.xyz.io.CounterWriter.close(CounterWriter.java:34)
            at com.abc.xyz.common.io.PathDataSink.close(PathDataSink.java:47)
            at com.abc.xyz.diamond.parse.map.node.AbstractOutputNode.finalise(AbstractOutputNode.java:142)
            at com.abc.xyz.diamond.parse.map.application.spark.node.SparkOutputNode.finalise(SparkOutputNode.java:239)
            at com.abc.xyz.diamond.parse.map.DiamondMapper.onParseComplete(DiamondMapper.java:1072)
            at com.abc.xyz.diamond.parse.decode.decoder.DiamondDecoder.parse(DiamondDecoder.java:956)
            at com.abc.xyz.parsing.functions.ProcessorWrapper.process(ProcessorWrapper.java:96)
            at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:131)
            at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:45)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
            at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
            at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
            at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
            at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:123)
            at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:82)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
            at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
            at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:89)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
            at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
            at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
            at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
            at sun.nio.ch.IOUtil.read(IOUtil.java:197)
            at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
            at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
            at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
            at java.io.FilterInputStream.read(FilterInputStream.java:133)
            at java.io.FilterInputStream.read(FilterInputStream.java:133)
            at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:554)
            at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
            at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
            at java.io.DataInputStream.readInt(DataInputStream.java:387)
            at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1116)
            at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1011)   

언급 한 바와 같이, 우리는이 예외를 매우 간헐적으로 다루며, 어플리케이션이 발생하면 어플리케이션이 멈추어 서 다시 시작하게됩니다.

우리가 시도한 해결책 :

  • 첫 번째 용의자는 구성 요소 T가 많은 DFS 클라이언트를 병렬로 열고 다른 파일에서 파일 작업을 수행하므로 (동일한 파일에 대한 경합 문제 없음) 클러스터 A에서 활성 namenode가 오버로드되고 있다는 것입니다. 이 문제를 해결하기 위해 namenode dfs.namenode.handler.countipc.server.listen.queue.size에 대한 두 가지 핵심 매개 변수를 살펴본 후 128을 기본값 인 1024로 바꿨 습니다.

  • 불행하게도 문제는 여전히 T 구성 요소에서 지속되었습니다. 문제에 대해 다른 접근 방식을 취하기 시작했습니다. 우리는 오직 Peer에 의한 Connection Reset의 발생 이유를 찾는 것에 중점을 두었습니다. 많은 기사와 스택 교환 토론에 따르면 문제는 다음과 같이 설명됩니다 . RST 플래그는 피어에 의해 설정되어 연결을 즉시 종료합니다 . 우리의 경우 우리는 피어가 클러스터 A의 namenode라는 것을 확인했습니다.

  • RST 플래그를 염두에두고, TCP 통신의 내부를 이해하는 데 깊이 파고 들었습니다. RST 플래그의 이유 때문일뿐입니다.

  • 리눅스 배포본 (BSD는 아님)의 모든 소켓에는 두 개의 대기열, 즉 accept와 backlog가있다.
  • TCP 핸드 셰이크 프로세스 동안 모든 요청은 연결을 설정하기 시작한 노드에서 ACK 패킷을 수신 할 때까지 백 로그 대기열에 보관됩니다. 수신되면 요청은 수락 큐로 전송되고 소켓을 연 응용 프로그램은 원격 클라이언트에서 패킷 수신을 시작할 수 있습니다.
  • 백 로그 대기열의 크기는 net.ipv4.tcp_max_syn_backlognet.core.somaxconn 이라는 두 커널 수준 매개 변수로 제어되는 반면 응용 프로그램 (이 경우에는 namenode)은 상한값으로 제한되는 원하는 대기열 크기의 커널을 요청할 수 있습니다 accept 큐 크기는 ipc.server.listen.queue.size에 의해 정의 된 큐 크기라고 생각합니다.
  • 또한 여기에서 주목할 또 다른 흥미로운 점은 net.ipv4.tcp_max_syn_backlog 의 크기가 net.core.somaxconn 보다 큰 경우 전자의 값이 후자의 값으로 잘리는 것입니다. 이 주장은 Linux 문서를 기반으로하며 https://linux.die.net/man/2/listen 에서 찾을 수 있습니다.
  • 백 로그가 완전히 채워지는 시점으로 되돌아 가면 TCP는 두 가지 방식으로 동작 하며이 동작은 net.ipv4.tcp_abort_on_overflow 라는 커널 매개 변수로 제어 할 수도 있습니다. 이것은 기본적으로 0으로 설정되고 백 로그가 꽉 차면 커널이 새로운 SYN 패킷을 버리도록합니다. 그러면 보낸 사람이 SYN 패킷을 다시 보낼 수 있습니다. 1로 설정하면 커널은 패킷의 RST 플래그를 표시하고이를 보낸 사람에게 보내서 갑자기 연결을 종료합니다.

  • 위에서 언급 한 커널 매개 변수의 값을 확인한 결과 net.core.somaxconn 이 1024로 설정되고 net.ipv4.tcp_abort_on_overflow 가 0으로 설정되고 net.ipv4.tcp_max_syn_backlog 가 4096으로 설정된 것으로 확인되었습니다. 클러스터.

  • 우리가 남긴 유일한 용의자는 클러스터 A에 클러스터 B를 연결하는 스위치입니다. 클러스터의 어느 머신도 net.ipv4.tcp_abort_on_overflow 매개 변수가 0으로 설정되어 있기 때문에 RST 플래그를 설정하지 않기 때문입니다.

내 질문

  • DFS 클라이언트가 파일 조작을 수행하기 위해 namenode와 통신하기 위해 RPC를 사용한다는 것은 HDFS 문서에서 분명합니다. 모든 RPC 호출에는 namenode에 대한 TCP 연결 설정이 필요합니까?
  • ipc.server.listen.queue.size 매개 변수가 namenode가 RPC 요청을 받아들이는 소켓의 수용 대기열 길이를 정의합니까?
  • namenode는 과부하 일 때 암시 적으로 DFS 클라이언트에 대한 연결을 닫을 수 있으므로 커널 매개 변수 net.ipv4.tcp_abort_on_overflow 가 0으로 설정되어 있어도 커널이 RST 플래그가 설정된 패킷을 보내도록 설정합니까?
  • 버스트 트래픽을 처리 할 수 ​​없기 때문에 RST 플래그를 설정할 수있는 L2 또는 L3 스위치 (두 클러스터의 시스템 연결에 사용)입니까?

이 문제에 대한 우리의 다음 접근 방식은 tcpdump 또는 wireshark를 사용하여 패킷을 분석하여 RST 플래그를 설정하는 머신 또는 스위치 (라우터가 관련되지 않음)를 식별하는 것입니다. 또한 버스트 트래픽을 효과적으로 처리하기 위해 위에서 언급 한 모든 대기열의 크기를 4096으로 늘립니다.

namenode 로그에는 Ambari에서 볼 수있는 네임 노드 연결로드가 특정 시점에 엿보여졌으며 피어 연결 재설정 예외가 발생할 때가 아니라는 것을 제외하고는 예외가 표시되지 않습니다.

결론을 내리면, 우리는이 문제를 해결하기 위해 올바른 방향으로 가고 있는지, 아니면 막 다른 길을 가고 있는지 알고 싶습니다.

추신 : 내 질문에 내용의 길이에 대해 사과드립니다. 어떤 도움이나 제안을 요청하기 전에 독자들에게 전체 문맥을 보여주고 싶었습니다. 양해 해 주셔서 감사합니다.