Aug 5, 2020

Programmatically get Spark Performance Metrics using SparkListener in PySpark

Code for accessing Spark performance metrics programmatically in PySpark using a SparkListener.

# Base no-op implementation of all SparkListenerInterface callbacks.
# The inner "Java" class tells py4j which Java interface this Python object
# implements, so it can be registered as a listener on the JVM-side SparkContext.
class SparkListener(object):
       
    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockManagerAdded(self, blockManagerAdded):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]


# Prints a few task-level metrics every time a task finishes.
class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print("executorRunTime : " + str(taskEnd.taskMetrics().executorRunTime()))
        print("resultSize : " + str(taskEnd.taskMetrics().resultSize()))
        print("executorCpuTime : " + str(taskEnd.taskMetrics().executorCpuTime()))

spark = init_spark("SparkPerfListeners")
spark.sparkContext._gateway.start_callback_server()
te_listener = TaskEndListener()
spark.sparkContext._jsc.sc().addSparkListener(te_listener)
numRdd = spark.sparkContext.parallelize(range(6), 2)  # 2 partitions and 2 tasks per stage
for i in numRdd.collect(): print(i) # stage-1
sumRes = numRdd.reduce(lambda x,y : x+y) # stage-2
print("Sum : " + str(sumRes))
spark.sparkContext._gateway.shutdown_callback_server()
spark.sparkContext.stop()

Output of the above program:

executorRunTime : 823
resultSize : 1483
executorCpuTime : 27362130
executorRunTime : 823
resultSize : 1483
executorCpuTime : 38787637
0
1
2
3
4
5
executorRunTime : 45
resultSize : 1418
executorCpuTime : 2543456
executorRunTime : 48
resultSize : 1418
executorCpuTime : 2636528
Sum : 15

Decoding the output

Stage-1 (Collect) Task-1 Metrics
executorRunTime : 823       (in milliseconds)
resultSize : 1483           (in bytes)
executorCpuTime : 27362130  (in nanoseconds)

Stage-1 (Collect) Task-2 Metrics
executorRunTime : 823       (in milliseconds)
resultSize : 1483           (in bytes)
executorCpuTime : 38787637  (in nanoseconds)

Stage-2 (Reduce) Task-1 Metrics
executorRunTime : 45        (in milliseconds)
resultSize : 1418           (in bytes)
executorCpuTime : 2543456   (in nanoseconds)

Stage-2 (Reduce) Task-2 Metrics
executorRunTime : 48        (in milliseconds)
resultSize : 1418           (in bytes)
executorCpuTime : 2636528   (in nanoseconds)

Here I have collected metrics for each task. You can get aggregated metrics for each stage by implementing "onStageCompleted", "onJobEnd", and so on, as sketched below.
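
For example, here is a minimal sketch of such a stage-level listener. The StageCompletedListener name and the particular fields printed are my own choices; stageInfo().taskMetrics() holds the metrics accumulated over the stage's tasks. Register it with addSparkListener exactly like TaskEndListener above.

class StageCompletedListener(SparkListener):
    def onStageCompleted(self, stageCompleted):
        # StageInfo carries the stage id, name, task count and aggregated TaskMetrics.
        info = stageCompleted.stageInfo()
        print("Stage " + str(info.stageId()) + " (" + info.name() + ")")
        print("  numTasks        : " + str(info.numTasks()))
        print("  executorRunTime : " + str(info.taskMetrics().executorRunTime()))
        print("  executorCpuTime : " + str(info.taskMetrics().executorCpuTime()))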

For more TaskMetrics to report on, refer to:
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/scheduler/SparkListenerTaskEnd.html
https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/status/api/v1/TaskMetrics.html