Troubleshooting Data Engineering Software

Hello, I’m Keiji Yoshida. I currently work as a data engineer at LINE Data Labs. In this post, I’d like to share a few troubleshooting cases involving data engineering software.

Troubleshooting failover issue in Apache Hadoop YARN ResourceManager

Let’s start with the failover issue in Apache Hadoop YARN ResourceManager (RM). Apache Hadoop is an open-source distributed processing framework, and YARN, one of its core components, is responsible for resource management and job scheduling. Failover is the process in which the Standby RM takes over when the Active RM goes down.

LINE Data Labs manages multiple Hadoop clusters, and the data from each LINE service is collected into one cluster. LINE service data is stored in HDFS (Hadoop Distributed File System) via Apache Sqoop, and applications such as MapReduce, Tez, and Spark run on the YARN cluster to aggregate and process the data. Our system is constructed as follows:

This particular Hadoop cluster was set up with HDP-2.6.2.0 (2.6.2.0-205) and YARN v2.7.3.

Issue discovery

After the Hadoop cluster was deployed and many applications began running on it, the YARN RM, which uses the High Availability (HA) feature, failed over a number of times. The RM logs at the time of a failover are shown below:

WARN  resourcemanager.EmbeddedElectorService (EmbeddedElectorService.java:enterNeutralMode(175)) - Lost contact with Zookeeper. Transitioning to standby in 10000 ms if connection is not reestablished.
INFO  resourcemanager.ResourceManager (ResourceManager.java:transitionToStandby(1070)) - Transitioning to standby state
INFO  resourcemanager.ResourceManager (ResourceManager.java:transitionToStandby(1077)) - Transitioned to standby state

The logs indicate that the failover was triggered because the active RM failed to send a heartbeat to the Apache ZooKeeper cluster within the specified session timeout. For this Hadoop cluster, the timeout was set to 10,000 milliseconds (10 seconds):

"yarn.resourcemanager.zk-timeout-ms = 10000"

I started to investigate why the active RM failed to send a heartbeat within the timeout period. 
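The log message above describes a simple grace-period rule: once contact with ZooKeeper is lost, the RM gives up the active role unless the connection comes back within the timeout. Below is a minimal Java sketch of that rule, driven by an explicit clock so it is easy to reason about. The class and method names (GracePeriodElector, lostContact, reconnected, advance) are hypothetical. This is not EmbeddedElectorService’s code, and the real RM relies on ZooKeeper session semantics rather than a local timer.

```java
// Hypothetical, clock-driven model of the grace period described by the log line
// "Transitioning to standby in 10000 ms if connection is not reestablished".
// Rejoining the election after becoming standby is not modeled.
class GracePeriodElector {
    private final long timeoutMs;   // e.g. yarn.resourcemanager.zk-timeout-ms = 10000
    private long lostAtMs = -1;     // -1 means the ZooKeeper connection is healthy
    private boolean active = true;

    GracePeriodElector(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Contact with ZooKeeper was lost at nowMs: start the grace period.
    void lostContact(long nowMs) {
        if (lostAtMs < 0) {
            lostAtMs = nowMs;
        }
    }

    // Connection reestablished at nowMs: cancel the pending transition if it has not fired yet.
    void reconnected(long nowMs) {
        advance(nowMs);
        lostAtMs = -1;
    }

    // Move the clock forward; give up the active role once the grace period has expired.
    void advance(long nowMs) {
        if (active && lostAtMs >= 0 && nowMs - lostAtMs >= timeoutMs) {
            active = false;
        }
    }

    boolean isActive() {
        return active;
    }
}
```

With a 10,000 ms timeout, reconnecting at 9,999 ms keeps the instance active, whereas staying disconnected until 10,000 ms triggers the transition to standby.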

Searching for the cause of YARN RM failover

Checking the logs

I ruled out garbage collection (GC) as the root cause because the active RM’s GC logs showed no long JVM pauses. Server resource statistics such as CPU usage were also within normal ranges. I tentatively concluded that the failover issue originated in the RM itself.

When I dug deeper into the active RM’s logs around the time of the failover, I found the following message repeated many times:

INFO  event.AsyncDispatcher (AsyncDispatcher.java:handle(243)) - Size of event-queue is 1000
INFO event.AsyncDispatcher (AsyncDispatcher.java:handle(243)) - Size of event-queue is 2000
INFO event.AsyncDispatcher (AsyncDispatcher.java:handle(243)) - Size of event-queue is 3000
INFO event.AsyncDispatcher (AsyncDispatcher.java:handle(243)) - Size of event-queue is 4000
INFO event.AsyncDispatcher (AsyncDispatcher.java:handle(243)) - Size of event-queue is 5000
INFO event.AsyncDispatcher (AsyncDispatcher.java:handle(243)) - Size of event-queue is 6000
INFO event.AsyncDispatcher (AsyncDispatcher.java:handle(243)) - Size of event-queue is 7000
INFO event.AsyncDispatcher (AsyncDispatcher.java:handle(243)) - Size of event-queue is 8000
...

These messages were printed by org.apache.hadoop.yarn.event.AsyncDispatcher, and the event-queue size reached approximately 400,000 immediately before the failover. Such messages rarely appear. However, since I couldn’t yet conclude that they were directly related to the failover, I decided to read the RM’s source code to better understand the event queue.

Event handling by RM

The YARN RM’s AsyncDispatcher handles various events of YARN applications (for example, starting and finishing an application, or launching and removing a container) using a LinkedBlockingQueue.

When an event occurs, such as an application starting or finishing, or a container being launched or removed, the event is passed to the GenericEventHandler#handle method, which adds it to the LinkedBlockingQueue. A separate thread takes the events from the LinkedBlockingQueue one at a time and processes each one by calling the corresponding event handler. From the RM logs, we identified the following events and their matching event handlers:

Note: The following events are under the org.apache.hadoop.yarn.server.resourcemanager package.

Event                        Event handler
RMFatalEventType             RMFatalEventDispatcher
RMStateStoreEventType        ForwardingEventHandler
NodesListManagerEventType    NodesListManager
SchedulerEventType           SchedulerEventDispatcher
RMAppEventType               ApplicationEventDispatcher
RMAppAttemptEventType        ApplicationAttemptEventDispatcher
RMNodeEventType              NodeEventDispatcher
RMAppManagerEventType        RMAppManager
AMLauncherEventType          ApplicationMasterLauncher
SystemMetricsEventType       ForwardingEventHandler
RMAppAttemptEventType        AppEventHandler
RMAppManagerEventType        AppManagerEventHandler
RMStateStoreEventType        StateStoreEventHandler
RMContainerEventType         ContainerEventHandler

The log message in question, "Size of event-queue is 1000", is printed by the GenericEventHandler#handle method just before it adds an event to the LinkedBlockingQueue. This means about 400,000 events were waiting in the LinkedBlockingQueue immediately before the failover.
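The following simplified Java model makes this producer/consumer flow concrete. It loosely follows the behavior described above: the producer reports the queue size at every multiple of 1,000 and then enqueues the event, and a consumer dispatches each event to the handler registered for its type. It is not the Hadoop source. MiniDispatcher, its EventType values, and the drain() method are illustrative assumptions. In the RM, the consumer runs on a dedicated thread that blocks on the queue, whereas drain() here simply empties the queue on the caller’s thread.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Simplified, single-threaded model of an AsyncDispatcher-style event loop.
class MiniDispatcher {
    enum EventType { APP_STARTED, CONTAINER_FINISHED }

    static final class Event {
        final EventType type;
        final String id;

        Event(EventType type, String id) {
            this.type = type;
            this.id = id;
        }
    }

    private final LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
    private final Map<EventType, Consumer<Event>> handlers = new EnumMap<>(EventType.class);
    final List<String> log = new ArrayList<>(); // collected here instead of a real logger

    void register(EventType type, Consumer<Event> handler) {
        handlers.put(type, handler);
    }

    // Producer side: report the backlog at every multiple of 1,000, then enqueue.
    void handle(Event event) {
        int qSize = eventQueue.size();
        if (qSize != 0 && qSize % 1000 == 0) {
            log.add("Size of event-queue is " + qSize);
        }
        eventQueue.add(event);
    }

    // Consumer side: take events one at a time and call the handler for each type.
    // Returns the number of events dispatched.
    int drain() {
        int dispatched = 0;
        Event event;
        while ((event = eventQueue.poll()) != null) {
            Consumer<Event> handler = handlers.get(event.type);
            if (handler != null) {
                handler.accept(event);
            }
            dispatched++;
        }
        return dispatched;
    }
}
```

If 2,500 events arrive before the consumer catches up, the model records "Size of event-queue is 1000" and "Size of event-queue is 2000", much like the repeated messages in the RM log.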

The RM’s ZooKeeper client, the ClientCnxn class, sends heartbeats from the RM to ZooKeeper on a thread named SendThread. The relationship between the queued events and this thread was not clear, so I needed to investigate further whether the large number of events in the LinkedBlockingQueue had anything to do with the RM failing to send a heartbeat to ZooKeeper within the timeout period.

Monitoring JMX metrics

To compare the state of the RM’s JVM before and after the failover, I used the Prometheus JMX Exporter to plot time series of various metrics in Grafana. The following graph shows that the number of JVM threads in the RM increased sharply at the time of the failover.

The failover occurred close to 12:20 as shown at the far right end of the graph. The number of threads was usually less than 1,000 but shot up to 15,000 immediately before the failover. 
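Thread counts like this are exposed by the JVM’s java.lang:type=Threading MBean, which is the kind of attribute the Prometheus JMX Exporter scrapes. As a sketch, the following Java code reads the same values in-process through the standard ThreadMXBean interface. The aboveThreshold helper is a hypothetical alert rule. A threshold near 1,000 would match the usual level mentioned above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Reads the thread-count attributes of the java.lang:type=Threading MBean for
// the current JVM via the standard ThreadMXBean interface.
class ThreadCountProbe {
    private static final ThreadMXBean THREADS = ManagementFactory.getThreadMXBean();

    // Live threads right now (daemon and non-daemon).
    static int liveThreads() {
        return THREADS.getThreadCount();
    }

    // Highest live-thread count since JVM start (or since the peak was reset).
    static int peakThreads() {
        return THREADS.getPeakThreadCount();
    }

    // Hypothetical alert rule: true when the live count exceeds the threshold.
    static boolean aboveThreshold(int threshold) {
        return liveThreads() > threshold;
    }
}
```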

Checking the thread data

Next, I analyzed the thread dump to see which threads had increased before the failover. From experience, I knew that a failover was likely once the event-queue size exceeded 10,000. The following table shows the thread counts by type.

Thread                          Quantity   Ratio
RegistryAdminService            13,989     97.6 %
IPC Server handler              155        1.1 %
ApplicationMasterLauncher       50         0.3 %
AsyncDispatcher event handler   13         0.1 %
Others                          127        0.9 %
Total                           14,334     100.0 %

As shown in this table, the RegistryAdminService thread accounted for 97.6 % of all threads before the failover. 
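You can produce a breakdown like the table above by grouping thread names after normalizing away their numeric parts. The following Java sketch does this for the current JVM using Thread.getAllStackTraces(). For a separate process such as the RM, you would apply the same grouping to the thread names in a jstack dump. Replacing every run of digits with "N" is an assumed normalization rule, so you may need to adapt it to your thread names.

```java
import java.util.Map;
import java.util.TreeMap;

// Counts live threads in this JVM by normalized name, e.g. "RegistryAdminService-12"
// and "RegistryAdminService-7" both become "RegistryAdminService-N".
class ThreadHistogram {

    // Assumed normalization rule: replace every run of digits with "N".
    static String pattern(String threadName) {
        return threadName.replaceAll("\\d+", "N");
    }

    // Maps each normalized name to the number of live threads that share it.
    static Map<String, Integer> countByPattern() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(pattern(t.getName()), 1, Integer::sum);
        }
        return counts;
    }
}
```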

RegistryAdminService Thread

Next, I checked what the RegistryAdminService threads do. A RegistryAdminService thread is created when the RMRegistryService class processes any of the following events:

  • RMStateStoreEventType.STORE_APP
  • RMAppAttemptEventType.UNREGISTERED
  • RMAppManagerEventType.APP_COMPLETED
  • RMAppAttemptEventType.CONTAINER_FINISHED, RMContainerEventType.FINISHED

See the following description for the event details.

RMStateStoreEventType.STORE_APP

This event is fired when application information is stored in RMStateStore. A RegistryAdminService thread is then created to make the user directory /registry/users/{username} in ZooKeeper. The handling code is in RMRegistryService.

RMAppAttemptEventType.UNREGISTERED

This event is fired when an application attempt is removed. A RegistryAdminService thread is then created to delete the data corresponding to the application attempt ID (for example, appattempt_1523336070074_12345_000001) under the /registry directory in ZooKeeper. The handling code is in RMRegistryService.

RMAppManagerEventType.APP_COMPLETED

This event is fired when an application completes. A RegistryAdminService thread is then created to delete the data corresponding to the application ID (for example, application_1523336070074_12345) under the /registry directory in ZooKeeper. The handling code is in RMRegistryService.

RMAppAttemptEventType.CONTAINER_FINISHED, RMContainerEventType.FINISHED

These events are fired when a container is removed. A RegistryAdminService thread is then created to delete the data corresponding to the container ID (for example, container_e102_1523336070074_12346_01_000221) under the /registry directory in ZooKeeper. Both events are handled in RMRegistryService.

It turned out that the container-removal events, RMAppAttemptEventType.CONTAINER_FINISHED and RMContainerEventType.FINISHED, were fired most frequently. For example, when a MapReduce job that uses 1,000 containers completes, all 1,000 containers stop, and 1,000 events are added to the LinkedBlockingQueue described in the Event handling by RM section above. The AsyncDispatcher processes these events one by one, and for each event a RegistryAdminService thread is created to send ZooKeeper a request to delete the corresponding data. However, all requests from the RM to ZooKeeper go through a single thread in the ClientCnxn instance (the ZooKeeper client). So when a large number of containers are removed at once, one RegistryAdminService thread is created per container, and all of them queue up behind that single thread. This bottleneck leaves a large number of RegistryAdminService threads pending.
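The following Java toy model shows why creating one thread per event and sending all requests through a single-threaded client makes threads pile up. It is not RM or ZooKeeper code. The model starts one short-lived thread per finished container, named after RegistryAdminService only for readability. Each thread blocks until a single-threaded executor has served its simulated delete request. That executor stands in for the single ClientCnxn thread described above. The per-request latency is an arbitrary assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model, not RM or ZooKeeper code.
class RegistryBottleneck {

    // Starts one thread per finished container. Each thread hands its simulated
    // "delete" request to a single-threaded client and blocks until it is served.
    // Returns the largest number of such threads in flight at the same time.
    static int peakWaitingThreads(int containers, long zkRequestMs) {
        ExecutorService zkClient = Executors.newSingleThreadExecutor(); // stands in for the single ClientCnxn thread
        AtomicInteger waiting = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(containers);

        for (int i = 0; i < containers; i++) {
            new Thread(() -> {
                peak.accumulateAndGet(waiting.incrementAndGet(), Math::max);
                try {
                    zkClient.submit(() -> pause(zkRequestMs)).get(); // requests are served one by one
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e);
                } finally {
                    waiting.decrementAndGet();
                    done.countDown();
                }
            }, "RegistryAdminService-" + i).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            zkClient.shutdown();
        }
        return peak.get();
    }

    // Simulated ZooKeeper round trip.
    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Creating a thread is much faster than a serialized request, so the peak typically approaches the number of containers.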

Solution for YARN RM failover

The RM source code shows that the four events above are processed by the RMRegistryService class, and RegistryAdminService threads are created, only when the YARN setting hadoop.registry.rm.enabled is true. When this value is true, these event handlers are registered so that the RM supports the YARN Service Registry, which enables communication between YARN applications. When the components of a Hadoop cluster are installed with Ambari, this value appears to be set to true automatically. Since our system does not use the YARN Service Registry, I set hadoop.registry.rm.enabled to false so that no RegistryAdminService threads are created even when a large number of containers stop at the same time, and then tested whether RM failovers still occurred.

After changing hadoop.registry.rm.enabled to false, I monitored the metrics for a while and also ran a MapReduce job using thousands of containers. As shown in the graph below, the number of JVM threads stayed around 300, and RM failovers no longer occurred.

Similar cases have been reported. This is a known RM failover issue when the YARN Service Registry is used: connectivity problems between the RM and ZooKeeper cause a heartbeat timeout, which triggers an RM failover.


Troubleshooting Apache Hadoop HDFS NameNode failover issue

HDFS (Hadoop Distributed File System) is a distributed file system designed to run on commodity hardware. The NameNode, a key component of HDFS, maintains the directory tree of all files in the file system namespace and tracks where each file’s data is located in the cluster. However, the NameNode itself does not store the file data.

One of the Hadoop clusters managed by LINE Data Labs collects server access logs and application logs using fluentd. This log data is loaded into Apache Hive tables every hour by ETL (Extract, Transform, Load) batch processing. The flow of data in this task is as follows:

This Hadoop cluster was set up with HDP-2.6.1.0 (2.6.1.0-129), and HDFS v2.7.3. 

The data flow of registering log data to a Hive table is described below:

  1. Move the log data files collected by fluentd to an HDFS directory.
  2. Run the hive> LOAD DATA command to move the log data files to the intermediate table directory.
  3. Run the hive> INSERT ... SELECT command to write data from the intermediate table to the final table. The file format is converted in this step.
  4. Run the hive> ALTER TABLE ... DROP PARTITION command to delete the log data files in the intermediate table.

Issue discovery

With HDFS running in HA mode, NameNode failovers occurred several times at a specific time of day (around 9 AM). We checked the ZooKeeper Failover Controller (ZKFC) logs at the time of the failover. Here is an excerpt:

09:03:08,165 WARN  ha.HealthMonitor (HealthMonitor.java:doHealthChecks(211)) - Transport-level exception trying to monitor health of NameNode at xxxx/xxx.xxx.xxx.xxx:xxxx: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/xxx.xxx.xxx.xxx:xxxx remote=xxxx/xxx.xxx.xxx.xxx:xxxx] Call From xxxx/xxx.xxx.xxx.xxx to xxxx:xxxx failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/xxx.xxx.xxx.xxx:xxxx remote=xxxx/xxx.xxx.xxx.xxx:xxxx]; For more details see:  http://wiki.apache.org/hadoop/SocketTimeout

The log indicates that the failover was triggered because the NameNode failed to respond to a ZKFC health check within the timeout period (60 seconds). So I looked into the NameNode logs and its processing details to find out why the health checks failed.

Searching for the cause of NameNode failover

The following NameNode logs were found immediately before the failover. They show that the deletion of each subdirectory under the .Trash directory had started. In other words, the log data files deleted by the ETL job had been moved to the cluster’s .Trash directory rather than removed immediately. While one of these subdirectories was being deleted, the FSNamesystem write lock was held for too long, approximately 36 seconds.

09:00:30,574 INFO  fs.TrashPolicyDefault (TrashPolicyDefault.java:deleteCheckpoint(319)) - TrashPolicyDefault#deleteCheckpoint for trashRoot: hdfs://xxxx/user/hive/.Trash
09:01:06,673 INFO namenode.FSNamesystem (FSNamesystem.java:writeUnlock(1659)) - FSNamesystem write lock held for 36094 ms via
java.lang.Thread.getStackTrace(Thread.java:1559)
org.apache.hadoop.util.StringUtils.getStackTrace(StringUtils.java:945)
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.writeUnlock(FSNamesystem.java:1659)
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3975)
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.delete(NameNodeRpcServer.java:1078)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.delete(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:422)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
org.apache.hadoop.ipc.Server$Handler.run(Server.java:2345)
Number of suppressed write-lock reports: 0
Longest write-lock held interval: 36094
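When you investigate similar lock contention in your own code, you can emulate this kind of "write lock held for N ms" report. The following Java sketch wraps a ReentrantReadWriteLock, measures how long each write-lock hold lasts, and records holds above a threshold. It only illustrates the idea and is not FSNamesystem’s implementation. TimedWriteLock and the threshold value are assumptions, the fair-mode lock is an arbitrary choice, and reentrant acquisition is not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative wrapper, not FSNamesystem code: records write-lock holds that
// last at least reportThresholdMs. Reentrant acquisition is not handled.
class TimedWriteLock {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final long reportThresholdMs;
    private long acquiredAtNanos;
    final List<Long> longHoldsMs = new ArrayList<>(); // a real system would log these with a stack trace

    TimedWriteLock(long reportThresholdMs) {
        this.reportThresholdMs = reportThresholdMs;
    }

    void writeLock() {
        lock.writeLock().lock();
        acquiredAtNanos = System.nanoTime();
    }

    void writeUnlock() {
        long heldMs = (System.nanoTime() - acquiredAtNanos) / 1_000_000;
        if (heldMs >= reportThresholdMs) {
            longHoldsMs.add(heldMs); // still under the write lock, so the list is not mutated concurrently
        }
        lock.writeLock().unlock();
    }
}
```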

Afterwards, the following logs were also recorded. GC pauses of 1 to 7 seconds occurred while blocks were being deleted, and the FSNamesystem write lock continued to be held.

09:01:17,955 INFO  util.JvmPauseMonitor (JvmPauseMonitor.java:run(196)) - Detected pause in JVM or host machine (eg GC): pause of approximately 1487ms
GC pool 'ParNew' had collection(s): count=1 time=1937ms
09:01:17,993 INFO  namenode.FSNamesystem (FSNamesystem.java:writeUnlock(1659)) - FSNamesystem write lock held for 2038 ms via
java.lang.Thread.getStackTrace(Thread.java:1559)
org.apache.hadoop.util.StringUtils.getStackTrace(StringUtils.java:945)
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.writeUnlock(FSNamesystem.java:1659)
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.removeBlocks(FSNamesystem.java:4009)
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3979)
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.delete(NameNodeRpcServer.java:1078)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.delete(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:422)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
org.apache.hadoop.ipc.Server$Handler.run(Server.java:2345)
Number of suppressed write-lock reports: 0
Longest write-lock held interval: 2038

Given this situation, I checked the length of the NameNode’s client RPC queue. Because the FSNamesystem write lock was held for a long time while file blocks were being deleted, a large number of RPC calls (about 3,500) were pending in the queue.

The ZKFC logs showed that the health checks were sent to the same NameNode port as the client RPC calls, so they waited in the same queue. Because a health check was stuck in the queue for a long time, it timed out, resulting in a NameNode failover.
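A toy Java model shows the effect of sharing one queue. The NameNode has many RPC handlers, all blocked on the same write lock. The model simplifies them into one FIFO single-threaded executor. Slow "delete" calls are queued first. The health check then goes either into the same queue or into a separate one, which is the idea behind a dedicated service RPC port. The class, method, and timing values are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model, not NameNode code: each executor is a single FIFO handler queue.
class HealthCheckQueue {

    // Queues `pendingDeletes` slow calls on the client RPC queue, then sends a
    // health check either to the same queue (dedicated == false) or to a
    // separate service queue (dedicated == true). Returns true if the health
    // check is answered within timeoutMs.
    static boolean healthCheckPasses(boolean dedicated, int pendingDeletes,
                                     long deleteMs, long timeoutMs) {
        ExecutorService clientRpc = Executors.newSingleThreadExecutor();
        ExecutorService serviceRpc = dedicated ? Executors.newSingleThreadExecutor() : clientRpc;
        try {
            for (int i = 0; i < pendingDeletes; i++) {
                // Stand-in for a delete call that holds the namespace lock for deleteMs.
                clientRpc.submit(() -> {
                    Thread.sleep(deleteMs);
                    return null;
                });
            }
            Future<String> health = serviceRpc.submit(() -> "OK");
            return "OK".equals(health.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return false; // roughly where ZKFC would treat the NameNode as not responding
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            clientRpc.shutdownNow();
            serviceRpc.shutdownNow();
        }
    }
}
```

With 50 queued calls of 10 ms each and a 200 ms timeout, the health check times out on the shared queue but is answered almost immediately on the dedicated one.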

Solution for NameNode failover

I decided to resolve this issue in the following ways:

  • Reducing the interval for deleting files in the subdirectory of the .Trash directory
  • Changing the port for NameNode health check by ZooKeeper

Reducing the interval for deleting files in the subdirectory of the .Trash directory

Initially, fs.trash.interval was set to 4320 (minutes), so files in the subdirectories of the .Trash directory were deleted every three days. I changed this interval to 1440 minutes so that the files would be deleted daily. Because each deletion then removes about one day’s worth of files instead of three days’ worth, the FSNamesystem write lock is held for a shorter time when the subdirectories of the .Trash directory are deleted.
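The following Java sketch spells out the arithmetic behind this change. It converts fs.trash.interval from minutes to days. It also estimates how many files one checkpoint deletion removes, under the simplifying assumption that the same number of files moves to the trash every day. The files-per-day figure is hypothetical and purely illustrative.

```java
import java.time.Duration;

// Back-of-the-envelope model of fs.trash.interval. Assumes (hypothetically) that
// the same number of files is moved to the trash every day.
class TrashInterval {
    // fs.trash.interval is given in minutes.
    static long days(long trashIntervalMinutes) {
        return Duration.ofMinutes(trashIntervalMinutes).toDays();
    }

    // Files removed by one checkpoint deletion: roughly one interval's worth of trash.
    static long filesPerDeletion(long trashIntervalMinutes, long filesTrashedPerDay) {
        return days(trashIntervalMinutes) * filesTrashedPerDay;
    }
}
```

For example, at a hypothetical 100,000 trashed files per day, filesPerDeletion(4320, 100_000) returns 300,000, while filesPerDeletion(1440, 100_000) returns 100,000.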

Changing the port for NameNode health check by ZooKeeper

You can assign a dedicated port for ZKFC health checks and DataNode requests (including block reports). With a dedicated port for these requests, the NameNode can respond to ZKFC health checks without delay even when many RPC calls, such as those for deleting a large number of files, are waiting in the client RPC port queue. I added the following settings to enable the service RPC port:

  • dfs.namenode.servicerpc-address.mycluster.nn1=mynamenode1:8040
  • dfs.namenode.servicerpc-address.mycluster.nn2=mynamenode2:8040

Troubleshooting Apache Zeppelin Notebook scheduler issue

Issue discovery

We currently use OASIS, an in-house data analysis application. Before OASIS, we used Apache Zeppelin v0.7.3 internally on a limited scale. While using Apache Zeppelin, we often found that notebooks failed to run on their schedules. Restarting Apache Zeppelin resolved the problem, but it soon came back.

Searching for the cause of scheduler issue

I couldn’t find any clues in the Apache Zeppelin logs, so I looked into the thread dump of the Apache Zeppelin JVM process. As shown below, the Quartz Scheduler worker threads that run scheduled notebooks were all in the TIMED_WAITING (sleeping) state.

"DefaultQuartzScheduler_Worker-10" #76 prio=5 os_prio=0 tid=0x00007fb41d3b4000 nid=0x1b521 sleeping[0x00007fb3daef1000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7dbf0> (a java.lang.Object)


   Locked ownable synchronizers:
        - None


"DefaultQuartzScheduler_Worker-9" #75 prio=5 os_prio=0 tid=0x00007fb41d3b2000 nid=0x1b520 waiting on condition [0x00007fb3daff2000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7a470> (a java.lang.Object)


   Locked ownable synchronizers:
        - None


...


"DefaultQuartzScheduler_Worker-2" #68 prio=5 os_prio=0 tid=0x00007fb41d3c8800 nid=0x1b519 waiting on condition [0x00007fb3da473000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7a7b0> (a java.lang.Object)


   Locked ownable synchronizers:
        - None


"DefaultQuartzScheduler_Worker-1" #67 prio=5 os_prio=0 tid=0x00007fb41d3cc800 nid=0x1b518 waiting on condition [0x00007fb3da372000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:889)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0a7dd90> (a java.lang.Object)


   Locked ownable synchronizers:
        - None

All threads were sleeping because each one was stuck at the Thread.sleep(1000) call (line 7 below) in the execute() method.

String noteId = context.getJobDetail().getJobDataMap().getString("noteId");
Note note = notebook.getNote(noteId);
note.runAll();

while (!note.isTerminated()) {
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    logger.error(e.toString(), e);
  }
}

On closer inspection, the scheduled notebook had started but never finished. It got stuck at a paragraph, so Thread.sleep(1000) was executed repeatedly. To find out why the loop never exited, I looked up the conditions the code uses to decide that a notebook and its jobs have completed.

For a notebook to be considered complete, none of its paragraphs may be in the ready, running, or pending state. If a notebook contains a disabled paragraph (one whose execution is disabled), that paragraph is skipped, so its status remains ready. As a result, the notebook is never considered complete, and the loop keeps calling Thread.sleep(1000) indefinitely.
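The following Java sketch captures this completion rule with simplified stand-in types, not Zeppelin’s classes. isTerminated follows the rule described above, so it never returns true while a disabled paragraph stays READY. isTerminatedIgnoringDisabled shows one possible way to exclude disabled paragraphs. That second method is an assumption for illustration and is not necessarily what the upstream fix changes.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Simplified stand-ins for a note's paragraphs and their job status.
class NoteTermination {
    enum Status { READY, PENDING, RUNNING, FINISHED, ERROR, ABORT }

    static final class Paragraph {
        final Status status;
        final boolean enabled;

        Paragraph(Status status, boolean enabled) {
            this.status = status;
            this.enabled = enabled;
        }
    }

    private static final Set<Status> UNFINISHED =
            EnumSet.of(Status.READY, Status.PENDING, Status.RUNNING);

    // Rule as described: any READY/PENDING/RUNNING paragraph means "not done".
    // A skipped (disabled) paragraph stays READY, so this never becomes true.
    static boolean isTerminated(List<Paragraph> paragraphs) {
        for (Paragraph p : paragraphs) {
            if (UNFINISHED.contains(p.status)) {
                return false;
            }
        }
        return true;
    }

    // Hypothetical variant: disabled paragraphs do not block completion.
    static boolean isTerminatedIgnoringDisabled(List<Paragraph> paragraphs) {
        for (Paragraph p : paragraphs) {
            if (p.enabled && UNFINISHED.contains(p.status)) {
                return false;
            }
        }
        return true;
    }
}
```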

Solution for Apache Zeppelin Notebook scheduler issue

I sent a pull request to the Apache Zeppelin GitHub repository to fix this issue and applied the same change to our internally managed Apache Zeppelin code. See the PR for details.


Troubleshooting Apache Zeppelin deadlock issue

Issue discovery

While running Apache Zeppelin v0.7.3, we often ran into cases where Zeppelin suddenly stopped responding, and nothing could be done except restart it.

Searching for the cause of Zeppelin deadlock issue

Again, I couldn’t find any clues in the Apache Zeppelin logs, so I looked into the thread dump of the Apache Zeppelin JVM process. As shown below, it revealed a deadlock.

Found one Java-level deadlock:
=============================
"pool-2-thread-63":
  waiting to lock monitor 0x00007fd2fc0073e8 (object 0x00000000c04f9c50, a java.util.concurrent.ConcurrentHashMap),
  which is held by "DefaultQuartzScheduler_Worker-2"
"DefaultQuartzScheduler_Worker-2":
  waiting to lock monitor 0x00007fd28c0036c8 (object 0x00000000c16f4738, a org.apache.zeppelin.notebook.Note),
  which is held by "DefaultQuartzScheduler_Worker-4"
"DefaultQuartzScheduler_Worker-4":
  waiting to lock monitor 0x00007fd2fc0073e8 (object 0x00000000c04f9c50, a java.util.concurrent.ConcurrentHashMap),
  which is held by "DefaultQuartzScheduler_Worker-2"


Java stack information for the threads listed above:
===================================================
"pool-2-thread-63":
        at org.apache.zeppelin.interpreter.InterpreterSettingManager.get(InterpreterSettingManager.java:981)
        - waiting to lock <0x00000000c04f9c50> (a java.util.concurrent.ConcurrentHashMap)
        at org.apache.zeppelin.interpreter.InterpreterSettingManager.getInterpreterSettings(InterpreterSettingManager.java:450)
        at org.apache.zeppelin.interpreter.InterpreterFactory.getInterpreter(InterpreterFactory.java:382)
        at org.apache.zeppelin.notebook.Paragraph.getRepl(Paragraph.java:255)
        at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:361)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:175)
        at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:329)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
"DefaultQuartzScheduler_Worker-2":
        at org.apache.zeppelin.notebook.Note.stopDelayedPersistTimer(Note.java:789)
        - waiting to lock <0x00000000c16f4738> (a org.apache.zeppelin.notebook.Note)
        at org.apache.zeppelin.notebook.Note.persist(Note.java:725)
        at org.apache.zeppelin.socket.NotebookServer$ParagraphListenerImpl.afterStatusChange(NotebookServer.java:2070)
        at org.apache.zeppelin.scheduler.Job.setStatus(Job.java:149)
        at org.apache.zeppelin.interpreter.InterpreterSettingManager.stopJobAllInterpreter(InterpreterSettingManager.java:966)
        at org.apache.zeppelin.interpreter.InterpreterSettingManager.restart(InterpreterSettingManager.java:942)
        - locked <0x00000000c04f9c50> (a java.util.concurrent.ConcurrentHashMap)
        at org.apache.zeppelin.interpreter.InterpreterSettingManager.restart(InterpreterSettingManager.java:956)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:907)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0ef6190> (a java.lang.Object)
"DefaultQuartzScheduler_Worker-4":
        at org.apache.zeppelin.interpreter.InterpreterSettingManager.get(InterpreterSettingManager.java:981)
        - waiting to lock <0x00000000c04f9c50> (a java.util.concurrent.ConcurrentHashMap)
        at org.apache.zeppelin.interpreter.InterpreterSettingManager.getInterpreterSettings(InterpreterSettingManager.java:450)
        at org.apache.zeppelin.interpreter.InterpreterFactory.getInterpreter(InterpreterFactory.java:382)
        at org.apache.zeppelin.notebook.Paragraph.isValidInterpreter(Paragraph.java:701)
        at org.apache.zeppelin.notebook.Paragraph.getMagic(Paragraph.java:690)
        at org.apache.zeppelin.notebook.Paragraph.isBlankParagraph(Paragraph.java:354)
        at org.apache.zeppelin.notebook.Note.run(Note.java:609)
        at org.apache.zeppelin.notebook.Note.runAll(Note.java:596)
        at org.apache.zeppelin.notebook.Note.runAll(Note.java:587)
        - locked <0x00000000c16f4738> (a org.apache.zeppelin.notebook.Note)
        at org.apache.zeppelin.notebook.Notebook$CronJob.execute(Notebook.java:885)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
        - locked <0x00000000c0f4b800> (a java.lang.Object)


Found 1 deadlock.

The deadlock occurred between DefaultQuartzScheduler_Worker-2 and DefaultQuartzScheduler_Worker-4, two threads that run notebooks on a schedule. We believe the sequence of events was roughly as follows:

  1. [DefaultQuartzScheduler_Worker-2] After finishing a scheduled notebook run, this thread restarts the interpreter and locks the Map<String, InterpreterSetting> interpreterSettings.
  2. [DefaultQuartzScheduler_Worker-4] This thread runs a scheduled notebook and locks the Note instance.
  3. [DefaultQuartzScheduler_Worker-2] To restart the interpreter, this thread stops the notebook’s running jobs by changing their status to ABORT, and the status change triggers saving (persisting) the Note. The thread blocks because saving requires the lock on the Note instance that DefaultQuartzScheduler_Worker-4 acquired in step 2.
  4. [DefaultQuartzScheduler_Worker-4] While looking up the interpreter needed to run the notebook, this thread blocks trying to lock the Map<String, InterpreterSetting> interpreterSettings, which DefaultQuartzScheduler_Worker-2 locked in step 1.
  5. As a result, DefaultQuartzScheduler_Worker-2 and DefaultQuartzScheduler_Worker-4 are deadlocked.

As shown above, a deadlock could occur between Apache Zeppelin threads because restarting an interpreter and running a notebook acquire the same two locks in opposite order. Restarting an interpreter locks interpreterSettings first and then a Note instance, whereas running a notebook locks a Note instance first and then interpreterSettings. So if an interpreter restart and a notebook run happen at the same time, they can deadlock.
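The general remedy for this pattern is to acquire locks in one global order. The following Java sketch reproduces the problem with two plain monitors standing in for interpreterSettings and a Note instance. When the two workers take the monitors in opposite order, they deadlock, and ThreadMXBean.findMonitorDeadlockedThreads() reports them. This is the same kind of monitor deadlock that the thread dump above reports. When both workers take the monitors in the same order, both finish. This is an illustrative model, not Zeppelin code and not the workaround we applied.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative stand-ins (not Zeppelin code): two monitors playing the roles of
// the interpreterSettings map and a Note instance.
class LockOrderingDemo {

    // Locks `first`, waits briefly so the other worker can lock its own first
    // monitor, then tries to lock `second`.
    private static Thread worker(String name, Object first, Object second, CountDownLatch bothStarted) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                bothStarted.countDown();
                try {
                    bothStarted.await(500, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                synchronized (second) {
                    // Work that needs both objects would go here.
                }
            }
        }, name);
        t.setDaemon(true); // deadlocked daemon threads do not keep the JVM alive
        return t;
    }

    // Returns true if the JVM reports these two workers as monitor-deadlocked.
    static boolean deadlocks(boolean oppositeOrder) {
        Object interpreterSettings = new Object();
        Object note = new Object();
        CountDownLatch bothStarted = new CountDownLatch(2);

        // "Restart interpreter": interpreterSettings first, then the Note.
        Thread restart = worker("restart", interpreterSettings, note, bothStarted);
        // "Run note": the Note first (opposite order) or the same global order.
        Thread run = oppositeOrder
                ? worker("run", note, interpreterSettings, bothStarted)
                : worker("run", interpreterSettings, note, bothStarted);
        restart.start();
        run.start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long deadline = System.currentTimeMillis() + 3_000;
        while (System.currentTimeMillis() < deadline) {
            long[] ids = mx.findMonitorDeadlockedThreads();
            if (ids != null) {
                for (long id : ids) {
                    if (id == restart.getId() || id == run.getId()) {
                        return true;
                    }
                }
            }
            if (!restart.isAlive() && !run.isAlive()) {
                return false; // both workers finished normally
            }
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```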

Solution for Apache Zeppelin deadlock issue

A fundamental fix for this issue would require significant changes to Apache Zeppelin’s source code, so I reported the issue via the Apache Zeppelin JIRA. For LINE’s Apache Zeppelin code, we decided to disable the auto-restart interpreter on cron execution option (highlighted in the red box), which restarts the interpreter after a notebook runs on a schedule.


Troubleshooting Apache Spark SQL performance issue

We currently use OASIS, an in-house data analysis application. On OASIS, users can freely run Apache Spark SQL queries to extract and analyze data from a Hadoop cluster. This Hadoop cluster was set up with HDP-2.6.2.0 (2.6.2.0-205) and Apache Spark v2.1.1.

Issue discovery

One user reported that a query never finished, so I investigated by analyzing, modifying, and testing the query. I found that the Spark driver process’s CPU usage reached 100% even for a simple query that specified all partition keys, such as the one below, and the query never completed.

select
    max(log_hour)
from
    event_log
where
    log_date = '20181101'
and log_hour = '00'
and log_name = 'xxxx'

The columns log_date, log_hour, and log_name are the partition keys of the event_log table. The table has approximately 4,368,000 partitions, calculated as follows:

Number of partitions = (log_date values) x (log_hour values) x (log_name values)
                     = 1300 x 24 x 140
                     = 4368000

Searching for the cause of Apache Spark SQL performance issue

The Spark Web UI showed no information about running jobs, so the query appeared to be stuck in the analysis and optimization phase on the driver, before any job was executed. The driver program logs (log level INFO) gave no clues either. I ran the following command to see which threads in the driver program were using the most CPU (13300 is the PID of the driver program process).

$ top -H -n 1 -p 13300
top - 00:32:28 up 96 days, 10:08,  1 user,  load average: 3.33, 3.44, 2.83
Tasks:  71 total,   4 running,  67 sleeping,   0 stopped,   0 zombie
Cpu(s):  0.5%us,  0.1%sy,  0.0%ni, 99.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Mem:   8193180k total,  7893796k used,   299384k free,   186164k buffers
Swap:  4194300k total,        0k used,  4194300k free,  1460808k cached


   PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
 13329 oasis     20   0 7961m 4.2g  25m R 96.6 53.7  25:32.43 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13331 oasis     20   0 7961m 4.2g  25m R 94.7 53.7  25:33.44 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13330 oasis     20   0 7961m 4.2g  25m R 92.7 53.7  25:33.37 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13332 oasis     20   0 7961m 4.2g  25m R 92.7 53.7  25:34.44 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13341 oasis     20   0 7961m 4.2g  25m S  2.0 53.7   0:00.73 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13300 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.01 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13328 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:06.24 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13333 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:21.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13334 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.01 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13335 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.02 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13336 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13337 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:07.10 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13338 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:06.49 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13339 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:03.08 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13340 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13347 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13358 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13359 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.11 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13360 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.07 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13361 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.14 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13362 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.07 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13363 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.24 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13364 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13365 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13366 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13367 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13368 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13369 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13370 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13371 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13377 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.01 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13378 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*
 13379 oasis     20   0 7961m 4.2g  25m S  0.0 53.7   0:00.00 /usr/java/jdk/bin/java -Dhdp.version=2.6.2.0-205 -cp /etc/spark2/conf/:/usr/hdp/current/spark2-client/jars/*

The threads with IDs 13329 (0x3411), 13330 (0x3412), 13331 (0x3413) and 13332 (0x3414) were using the most CPU. A thread dump, in which the nid field holds the native thread ID in hexadecimal, showed that these were GC task threads:

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007fe0a4024000 nid=0x3411 runnable
 
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007fe0a4026000 nid=0x3412 runnable
 
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007fe0a4028000 nid=0x3413 runnable
 
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007fe0a4029800 nid=0x3414 runnable

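`top -H` prints thread IDs in decimal, while a HotSpot thread dump shows the same ID in hexadecimal in its nid field. The following Java sketch converts one to the other and picks the matching header lines out of a dump. The class and method names are hypothetical helpers, and the sample lines are copied from the dumps in this post.

```java
import java.util.ArrayList;
import java.util.List;

public class NidLookup {

    /** Converts a decimal thread ID from `top -H` to the hex form used in a dump's nid= field. */
    static String toNid(long tid) {
        return "0x" + Long.toHexString(tid);
    }

    /** Returns the thread-dump header lines whose nid matches one of the given thread IDs. */
    static List<String> findThreads(String threadDump, long... tids) {
        List<String> matches = new ArrayList<>();
        for (String line : threadDump.split("\n")) {
            for (long tid : tids) {
                String needle = "nid=" + toNid(tid);
                int start = line.indexOf(needle);
                int end = start + needle.length();
                // Require a space or end of line after the match so 0x341 does not match 0x3411.
                if (start >= 0 && (end == line.length() || line.charAt(end) == ' ')) {
                    matches.add(line.trim());
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        String dump = String.join("\n",
                "\"GC task thread#0 (ParallelGC)\" os_prio=0 tid=0x00007fe0a4024000 nid=0x3411 runnable",
                "\"GC task thread#1 (ParallelGC)\" os_prio=0 tid=0x00007fe0a4026000 nid=0x3412 runnable",
                "\"Thread-62\" #118 prio=5 os_prio=0 tid=0x00007fe024007800 nid=0x352c runnable [0x00007fdffdfe8000]");
        // 13329 and 13330 are two of the busiest thread IDs reported by top -H above.
        for (String header : findThreads(dump, 13329, 13330)) {
            System.out.println(header);
        }
    }
}
```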
Next, I took a heap dump and analyzed it in Eclipse Memory Analyzer to see which thread of the driver program was using the most memory. It turned out that Thread-62 was by far the largest consumer.
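The post does not say how the heap dump was taken. From outside the process, JDK tools such as jmap or jcmd are the usual route. If you can run code inside the JVM, one option is the HotSpot-specific HotSpotDiagnosticMXBean. The sketch below assumes a HotSpot-based JDK, and the file name is hypothetical. The resulting .hprof file can be opened in Eclipse Memory Analyzer.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

public class HeapDumpSketch {

    /**
     * Writes a heap dump of the current JVM to `target`, which must not exist yet.
     * Use a .hprof suffix; newer JDKs reject other suffixes.
     * With live == true, only objects reachable from GC roots are written.
     */
    static Path dumpHeap(Path target, boolean live) throws IOException {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(target.toString(), live);
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("heapdump");
        Path file = dumpHeap(dir.resolve("driver.hprof"), true); // hypothetical file name
        System.out.println("wrote " + Files.size(file) + " bytes to " + file);
    }
}
```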

The thread dump of Thread-62 was as follows:

"Thread-62" #118 prio=5 os_prio=0 tid=0x00007fe024007800 nid=0x352c runnable [0x00007fdffdfe8000]
   java.lang.Thread.State: RUNNABLE
        at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:378)
        at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:372)
        at org.apache.hadoop.hive.metastore.api.Partition$PartitionStandardScheme.read(Partition.java:1009)
        at org.apache.hadoop.hive.metastore.api.Partition$PartitionStandardScheme.read(Partition.java:929)
        at org.apache.hadoop.hive.metastore.api.Partition.read(Partition.java:821)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_result$get_partitions_resultStandardScheme.read(ThriftHiveMetastore.java:64423)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_result$get_partitions_resultStandardScheme.read(ThriftHiveMetastore.java:64402)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_result.read(ThriftHiveMetastore.java:64336)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:88)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions(ThriftHiveMetastore.java:1994)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions(ThriftHiveMetastore.java:1979)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitions(HiveMetaStoreClient.java:1050)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)
        at com.sun.proxy.$Proxy30.listPartitions(Unknown Source)
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllPartitionsOf(Hive.java:2096)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.sql.hive.client.Shim_v0_13.getAllPartitions(HiveShim.scala:477)
        at org.apache.spark.sql.hive.client.HiveClientImpl$anonfun$getPartitions$1.apply(HiveClientImpl.scala:561)
        at org.apache.spark.sql.hive.client.HiveClientImpl$anonfun$getPartitions$1.apply(HiveClientImpl.scala:558)
        at org.apache.spark.sql.hive.client.HiveClientImpl$anonfun$withHiveState$1.apply(HiveClientImpl.scala:279)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:226)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:225)
        - locked <0x000000071665acc0> (a org.apache.spark.sql.hive.client.IsolatedClientLoader)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:268)
        at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:558)
        at org.apache.spark.sql.hive.client.HiveClient$class.getPartitions(HiveClient.scala:188)
        at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:78)
        at org.apache.spark.sql.hive.HiveExternalCatalog$anonfun$listPartitions$1.apply(HiveExternalCatalog.scala:995)
        at org.apache.spark.sql.hive.HiveExternalCatalog$anonfun$listPartitions$1.apply(HiveExternalCatalog.scala:993)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
        - locked <0x0000000716868228> (a org.apache.spark.sql.hive.HiveExternalCatalog)
        at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:993)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:797)
        at org.apache.spark.sql.execution.OptimizeMetadataOnlyQuery$anonfun$org$apache$spark$sql$execution$OptimizeMetadataOnlyQuery$replaceTableScanWithPartitionMetadata$1.applyOrElse(OptimizeMetadataOnlyQuery.scala:105)
        at org.apache.spark.sql.execution.OptimizeMetadataOnlyQuery$anonfun$org$apache$spark$sql$execution$OptimizeMetadataOnlyQuery$replaceTableScanWithPartitionMetadata$1.applyOrElse(OptimizeMetadataOnlyQuery.scala:95)
        at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$2.apply(TreeNode.scala:268)
        at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$2.apply(TreeNode.scala:268)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$4.apply(TreeNode.scala:307)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$4.apply(TreeNode.scala:307)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
        at org.apache.spark.sql.execution.OptimizeMetadataOnlyQuery.org$apache$spark$sql$execution$OptimizeMetadataOnlyQuery$replaceTableScanWithPartitionMetadata(OptimizeMetadataOnlyQuery.scala:95)
        at org.apache.spark.sql.execution.OptimizeMetadataOnlyQuery$anonfun$apply$1.applyOrElse(OptimizeMetadataOnlyQuery.scala:68)
        at org.apache.spark.sql.execution.OptimizeMetadataOnlyQuery$anonfun$apply$1.applyOrElse(OptimizeMetadataOnlyQuery.scala:49)
        at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$2.apply(TreeNode.scala:268)
        at org.apache.spark.sql.catalyst.trees.TreeNode$anonfun$2.apply(TreeNode.scala:268)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
        at org.apache.spark.sql.execution.OptimizeMetadataOnlyQuery.apply(OptimizeMetadataOnlyQuery.scala:49)
        at org.apache.spark.sql.execution.OptimizeMetadataOnlyQuery.apply(OptimizeMetadataOnlyQuery.scala:40)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$execute$1$anonfun$apply$1.apply(RuleExecutor.scala:85)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$execute$1$anonfun$apply$1.apply(RuleExecutor.scala:82)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
        at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
        at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$execute$1.apply(RuleExecutor.scala:82)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$execute$1.apply(RuleExecutor.scala:74)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
        - locked <0x000000071de9cd78> (a org.apache.spark.sql.execution.QueryExecution)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
        - locked <0x000000071de9cd78> (a org.apache.spark.sql.execution.QueryExecution)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
        - locked <0x000000071de9cd78> (a org.apache.spark.sql.execution.QueryExecution)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
        at org.apache.spark.sql.execution.QueryExecution$anonfun$simpleString$1.apply(QueryExecution.scala:218)
        at org.apache.spark.sql.execution.QueryExecution$anonfun$simpleString$1.apply(QueryExecution.scala:218)
        at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:112)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:218)
        at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:117)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
        - locked <0x000000071de9cef8> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
        at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:117)
        at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:117)
        at org.apache.spark.sql.execution.SparkPlan$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
        - locked <0x000000071de9cfa0> (a org.apache.spark.sql.execution.QueryExecution)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:600)
        at com.linecorp.oasis.spark.SqlExecutor.run(SqlExecutor.java:54)
        at java.lang.Thread.run(Thread.java:748)

This thread handles query analysis and optimization in the driver program. A class in the stack trace, org.apache.spark.sql.execution.OptimizeMetadataOnlyQuery, caught my attention, so I looked at its source code:

/**
 * This rule optimizes the execution of queries that can be answered by looking only at
 * partition-level metadata. This applies when all the columns scanned are partition columns, and
 * the query has an aggregate operator that satisfies the following conditions:
 * 1. aggregate expression is partition columns.
 *  e.g. SELECT col FROM tbl GROUP BY col.
 * 2. aggregate function on partition columns with DISTINCT.
 *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
 * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
 *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
 */
case class OptimizeMetadataOnlyQuery(
    catalog: SessionCatalog,
    conf: SQLConf) extends Rule[LogicalPlan] {

This optimization rule answers a query by looking only at partition-level metadata instead of reading the actual data files, and it applies when all the scanned columns are partition columns. The apply method of this class shows that the rule runs only when the Spark SQL setting spark.sql.optimizer.metadataOnly is true. This setting is described in the Spark documentation. Because its default value is true, the rule is applied unless it is explicitly disabled.
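A toy Java model of the two preconditions just described may help show why the example query qualifies. The rule requires that spark.sql.optimizer.metadataOnly is true and that every scanned column is a partition column. This is a simplification, not Spark's implementation. It ignores the aggregate conditions listed in the doc comment, the method name is hypothetical, and Set.of needs Java 9 or later.

```java
import java.util.Set;

public class MetadataOnlyGate {

    /**
     * Toy model (not Spark's code): true when the flag is on and all scanned
     * columns are partition columns. Aggregate-expression checks are ignored.
     */
    static boolean ruleMayApply(boolean metadataOnlyFlag,
                                Set<String> scannedColumns,
                                Set<String> partitionColumns) {
        return metadataOnlyFlag && partitionColumns.containsAll(scannedColumns);
    }

    public static void main(String[] args) {
        Set<String> partitionKeys = Set.of("log_date", "log_hour", "log_name");
        // The example query references only the partition keys, so the rule is considered.
        System.out.println(ruleMayApply(true, partitionKeys, partitionKeys));
        // With the flag set to false, the rule is skipped.
        System.out.println(ruleMayApply(false, partitionKeys, partitionKeys));
    }
}
```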

Solution for Apache Spark SQL performance issue

The table scanned by this query had a huge number of partitions (approximately 4,368,000), so the optimization incurred heavy overhead. As the Hive.getAllPartitionsOf and listPartitions calls in the thread dump show, the driver program fetched partition information for the whole table from the Apache Hive Metastore (the central repository of Apache Hive metadata). This caused frequent GC in the driver program, and the query took a very long time to complete.

Since we have other tables with similarly large numbers of partitions, I set spark.sql.optimizer.metadataOnly to false for the Spark applications running on OASIS so that this optimization is never applied.
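For reference, here is a minimal Java sketch of turning the rule off for a single application. It assumes Spark 2.x with Hive support on the classpath. The application name is hypothetical, the query is the one from this post, and this is not OASIS's actual code. The same setting can also be passed to spark-submit as `--conf spark.sql.optimizer.metadataOnly=false` or placed in spark-defaults.conf.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MetadataOnlyDisabled {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("metadata-only-disabled") // hypothetical application name
                // Disable OptimizeMetadataOnlyQuery for every query in this session.
                .config("spark.sql.optimizer.metadataOnly", "false")
                .enableHiveSupport()
                .getOrCreate();

        // Alternatively, because this is a SQL configuration, it can be changed on a
        // running session: spark.conf().set("spark.sql.optimizer.metadataOnly", "false");

        // The query from this post; event_log exists only in the author's environment.
        Dataset<Row> result = spark.sql(
                "select max(log_hour) from event_log "
                        + "where log_date = '20181101' and log_hour = '00' and log_name = 'xxxx'");
        result.show();
        spark.stop();
    }
}
```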

Closing

Resolving issues unique to large-scale distributed processing environments (issues that cannot be reproduced in a test environment) requires monitoring, source code review, thread dump and heap dump analysis, and participation in OSS communities. Of these, I’d like to emphasize participation in OSS communities. I hope more people will join and actively participate in OSS communities, sharing their experience and insights.