class SparkListener(object):
    """No-op base implementation of the Spark listener callbacks.
    Subclass it and override only the events you want to observe;
    Py4J forwards the JVM-side listener events to these methods."""
    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockManagerAdded(self, blockManagerAdded):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass

    class Java:
        # Tells Py4J which JVM interface this Python object implements.
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        # taskEnd is a JVM SparkListenerTaskEnd object; taskMetrics()
        # exposes the metrics the executor reported for this task.
        print("executorRunTime : " + str(taskEnd.taskMetrics().executorRunTime()))
        print("resultSize : " + str(taskEnd.taskMetrics().resultSize()))
        print("executorCpuTime : " + str(taskEnd.taskMetrics().executorCpuTime()))
spark = init_spark("SparkPerfListeners")  # init_spark is assumed to build and return a SparkSession
# Py4J's callback server is what lets the JVM call back into the Python listener.
spark.sparkContext._gateway.start_callback_server()
te_listener = TaskEndListener()
spark.sparkContext._jsc.sc().addSparkListener(te_listener)
numRdd = spark.sparkContext.parallelize(range(6), 2)  # 2 partitions, so 2 tasks per stage
for i in numRdd.collect():  # stage-1
    print(i)
sumRes = numRdd.reduce(lambda x, y: x + y)  # stage-2
print("Sum : " + str(sumRes))
spark.sparkContext._gateway.shutdown_callback_server()
spark.sparkContext.stop()
Output of the above program
executorRunTime : 823
resultSize : 1483
executorCpuTime : 27362130
executorRunTime : 823
resultSize : 1483
executorCpuTime : 38787637
0
1
2
3
4
5
executorRunTime : 45
resultSize : 1418
executorCpuTime : 2543456
executorRunTime : 48
resultSize : 1418
executorCpuTime : 2636528
Sum : 15
Decoding the output
Stage-1 (collect), Task-1 metrics:
    executorRunTime : 823 (in milliseconds)
    resultSize : 1483 (in bytes)
    executorCpuTime : 27362130 (in nanoseconds)

Stage-1 (collect), Task-2 metrics:
    executorRunTime : 823 (in milliseconds)
    resultSize : 1483 (in bytes)
    executorCpuTime : 38787637 (in nanoseconds)

Stage-2 (reduce), Task-1 metrics:
    executorRunTime : 45 (in milliseconds)
    resultSize : 1418 (in bytes)
    executorCpuTime : 2543456 (in nanoseconds)

Stage-2 (reduce), Task-2 metrics:
    executorRunTime : 48 (in milliseconds)
    resultSize : 1418 (in bytes)
    executorCpuTime : 2636528 (in nanoseconds)

Mind the units: executorCpuTime is in nanoseconds, so 27362130 ns is roughly 27 ms of CPU time within 823 ms of wall-clock run time for that task.

Here I have collected metrics for each task. You can get aggregated metrics for each stage by implementing "onStageCompleted", as sketched below. For more TaskMetrics to report on, refer to:
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/scheduler/SparkListenerTaskEnd.html
https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/status/api/v1/TaskMetrics.html
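A minimal sketch of that per-stage aggregation, reusing the SparkListener base class above (the listener name is illustrative; in the Spark 2.x API, StageInfo exposes stageId(), name(), numTasks(), and a taskMetrics() aggregated across the stage's tasks):

class StageCompletedListener(SparkListener):
    def onStageCompleted(self, stageCompleted):
        info = stageCompleted.stageInfo()
        # taskMetrics() on StageInfo is the aggregate over all tasks in the stage
        print("Stage " + str(info.stageId()) + " (" + info.name() + ") completed")
        print("numTasks : " + str(info.numTasks()))
        print("executorRunTime : " + str(info.taskMetrics().executorRunTime()))
        print("executorCpuTime : " + str(info.taskMetrics().executorCpuTime()))

It is registered exactly like the task-level listener, e.g. spark.sparkContext._jsc.sc().addSparkListener(StageCompletedListener()), and the same callback server must already be running.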