Aug 5, 2020

Programmatically get Spark Performance Metrics using SparkListener in PySpark

Code for accessing Spark performance measures or metrics programmatically in PySpark using SparkListener.

class SparkListener(object): 
       
    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockManagerAdded(self, blockManagerAdded):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]


class TaskEndListener(SparkListener): 
    def onTaskEnd(self, taskEnd): 
        print("executorRunTime : " + str(taskEnd.taskMetrics().executorRunTime()) ) 
        print("resultSize : " + str(taskEnd.taskMetrics().resultSize()) ) 
        print("executorCpuTime : " + str(taskEnd.taskMetrics().executorCpuTime()) )   

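from pyspark.sql import SparkSession

def init_spark(app_name):
    # Assumed helper: create (or reuse) a SparkSession with the given app name
    return SparkSession.builder.appName(app_name).getOrCreate()

spark = init_spark("SparkPerfListeners")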
spark.sparkContext._gateway.start_callback_server()
te_listener = TaskEndListener()
spark.sparkContext._jsc.sc().addSparkListener(te_listener)
numRdd = spark.sparkContext.parallelize(range(6), 2)  # 2 partitions and 2 tasks per stage
for i in numRdd.collect(): print(i) # stage-1
sumRes = numRdd.reduce(lambda x,y : x+y) # stage-2
print("Sum : " + str(sumRes))
spark.sparkContext._gateway.shutdown_callback_server()
spark.sparkContext.stop()

Output of the above program

executorRunTime : 823
resultSize : 1483
executorCpuTime : 27362130
executorRunTime : 823
resultSize : 1483
executorCpuTime : 38787637
0
1
2
3
4
5
executorRunTime : 45
resultSize : 1418
executorCpuTime : 2543456
executorRunTime : 48
resultSize : 1418
executorCpuTime : 2636528
Sum : 15

Decoding the output for understanding

Stage-1 (Collect) Task-1 Metrics
executorRunTime : 823          (in milliseconds)
resultSize : 1483              (in bytes)
executorCpuTime : 27362130     (in nanoseconds)

Stage-1 (Collect) Task-2 Metrics
executorRunTime : 823          (in milliseconds)
resultSize : 1483              (in bytes)
executorCpuTime : 38787637     (in nanoseconds)

Stage-2 (Reduce) Task-1 Metrics
executorRunTime : 45           (in milliseconds)
resultSize : 1418              (in bytes)
executorCpuTime : 2543456      (in nanoseconds)

Stage-2 (Reduce) Task-2 Metrics
executorRunTime : 48           (in milliseconds)
resultSize : 1418              (in bytes)
executorCpuTime : 2636528      (in nanoseconds)
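
Since executorRunTime is reported in milliseconds and executorCpuTime in nanoseconds, the two need a common unit before they can be compared. The following is a minimal sketch of that conversion; the helper name cpu_share is hypothetical, and the input values are copied from the Stage-1 Task-1 output above.

```python
def cpu_share(executor_run_time_ms, executor_cpu_time_ns):
    """Return (CPU time in ms, fraction of the run time spent on CPU)."""
    cpu_time_ms = executor_cpu_time_ns / 1_000_000  # nanoseconds -> milliseconds
    return cpu_time_ms, cpu_time_ms / executor_run_time_ms


# Stage-1 (Collect) Task-1: run time 823 ms, CPU time 27362130 ns
cpu_ms, share = cpu_share(823, 27362130)
print(f"CPU time: {cpu_ms:.2f} ms, share of run time: {share:.1%}")
```

A low share like this suggests the task spent most of its run time on something other than CPU work, for example waiting or start-up overhead in a small local run.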

Here I have collected metrics for each task. You can get aggregated metrics for each stage by implementing "onStageCompleted", and handle other events in the same way.
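
As a rough sketch of the stage-level variant, the function below reads metrics from a stage-completed event. It assumes the event exposes stageInfo(), and that the stage info exposes stageId(), numTasks() and taskMetrics(), as in Spark's Scala API; to my understanding the stage's task metrics hold values accumulated across its tasks. The name summarize_stage and the fake event are hypothetical: the fake stands in for the Java object so the snippet runs without a cluster, and its numbers are simply the two Stage-2 task values from above added together.

```python
from types import SimpleNamespace


def summarize_stage(stageCompleted):
    """Build a dict of stage-level metrics from a stage-completed event."""
    info = stageCompleted.stageInfo()
    metrics = info.taskMetrics()
    return {
        "stageId": info.stageId(),
        "numTasks": info.numTasks(),
        "executorRunTime": metrics.executorRunTime(),  # milliseconds
        "executorCpuTime": metrics.executorCpuTime(),  # nanoseconds
        "resultSize": metrics.resultSize(),            # bytes
    }


# Stand-in for the py4j event object (illustration only, not real Spark output)
fake_metrics = SimpleNamespace(
    executorRunTime=lambda: 45 + 48,
    executorCpuTime=lambda: 2543456 + 2636528,
    resultSize=lambda: 1418 + 1418,
)
fake_info = SimpleNamespace(
    stageId=lambda: 1,
    numTasks=lambda: 2,
    taskMetrics=lambda: fake_metrics,
)
fake_event = SimpleNamespace(stageInfo=lambda: fake_info)

print(summarize_stage(fake_event))
```

In a real run, a subclass of SparkListener would call summarize_stage(stageCompleted) from its onStageCompleted method and be registered with addSparkListener exactly like TaskEndListener.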

For more TaskMetrics to report on, refer to:
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/scheduler/SparkListenerTaskEnd.html
https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/status/api/v1/TaskMetrics.html 

May 6, 2020

Building a Deep Learning Model That Suggests Closest Emotions to Given Emotion


In this post, I will demonstrate how to build a model that, given an emotion and its effect/type (positive or negative), returns the three closest synonymous emotions.

I sometimes use an online dictionary, and it gave me an idea: why not pick some synonyms for “hate” and “love” and build a Deep Learning (DL) model, using Keras with a TensorFlow backend, that learns embedding vectors? (To learn more about entity embeddings and their advantages over one-hot encoded vectors, see my earlier post on transforming categorical variables into embedding vectors.) Below is the list of synonyms I picked from Dictionary.com.

emotionsList= ['like','antipathy','hostility','love','warmth','loathe','abhor','intimacy','dislike','venom','affection','tenderness','animosity','attachment','infatuation','fondness','hate']

But to build any good model we need representative data. That is when I realized I might need one more feature to help my DL model suggest closer emotions, so I came up with the “emoaffect” feature below. For example, if someone says “I like you”, it gives a positive impression; if someone says “I hate you”, it gives a negative one.

emoaffect= ['positive','negative','negative','positive','positive','negative','negative','positive','negative','negative','positive','positive','negative','positive','positive','positive','negative']

Here the objective is not to build the model with the best accuracy but to generate the best embeddings, so that we can use the embedding vectors to find the closest matches. We therefore treat this as a supervised task; the supervised task is just the method through which we train our network.
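
To make "closest matches" concrete, here is a minimal sketch of a nearest-neighbour lookup over embedding vectors. Cosine similarity is my assumption for the distance measure, the function names are hypothetical, and the 2-dimensional vectors are made up for illustration; they are not output from a trained model.

```python
import math


def cosine(u, v):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


def closest_emotions(target, embeddings, k=3):
    """Return the k emotions whose vectors are most similar to the target's."""
    scores = [
        (cosine(embeddings[target], vec), name)
        for name, vec in embeddings.items()
        if name != target
    ]
    scores.sort(reverse=True)  # highest similarity first
    return [name for _, name in scores[:k]]


# Made-up toy vectors (illustration only)
toy_embeddings = {
    "love": [1.0, 0.1],
    "like": [0.9, 0.2],
    "warmth": [0.8, 0.3],
    "hate": [-1.0, 0.1],
    "dislike": [-0.9, 0.2],
}

print(closest_emotions("love", toy_embeddings, k=2))
```

With trained embeddings, the same function would be called with the rows of the embedding layer's weight matrix, keyed by emotion name.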

The complete code with explanation is available on Github at:

Feb 12, 2020

Transforming Categorical Variable into Embedding Vectors using Deep Learning

In this post, we will go through how to transform categorical variable(s) into low-dimensional embedding vectors.

The complete code with sample output is available on Github at:
https://github.com/srichallla/DeepLearning/blob/master/CategoricalToEmbeddings.ipynb

First, we will transform the categorical variable into one-hot encodings.

For simplicity, we will use a collection of emotions in a list named "emotionsList". The code below transforms the contents of the list into one-hot encodings.

emotionsList= ['like','antipathy', 'hostility','love','warmth','loathe','abhor',
     'intimacy','dislike','venom','affection','tenderness','animosity','attachment',
     'infatuation','fondness','hate']

#One hot encode the above list
import numpy as np
from sklearn.preprocessing import OneHotEncoder

ohe = OneHotEncoder()
X = np.array(emotionsList, dtype=object).reshape(-1, 1)
transformed_X = ohe.fit_transform(X).toarray()
transformed_X #one-hot array of shape (17,17), mostly zeros

This transformation results in a high-dimensional sparse matrix of shape (17,17). The size of each one-hot encoding equals the number of unique elements in "emotionsList". The drawbacks of one-hot encoding are therefore high dimensionality, sparsity (mostly zeros), and no capture of semantics or meaning.
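
The point about semantics can be checked directly: any two distinct one-hot vectors are exactly the same distance apart, so the encoding cannot say that "like" is nearer to "love" than to "hate". A small sketch using a three-word subset of the list; the helper names are hypothetical.

```python
import math

emotions = ["like", "love", "hate"]


def one_hot(index, size):
    """Vector of zeros with a single 1 at the given index."""
    return [1 if i == index else 0 for i in range(size)]


def euclidean(u, v):
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(u, v)))


vectors = {name: one_hot(i, len(emotions)) for i, name in enumerate(emotions)}

# Every pair of distinct one-hot vectors differs in exactly two positions
for a, b in [("like", "love"), ("like", "hate"), ("love", "hate")]:
    print(a, b, euclidean(vectors[a], vectors[b]))
```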

Next, we will transform the same "emotionsList" into embedding vectors using an embedding layer in Keras with a TensorFlow backend, as below.

import numpy as np
from tensorflow.keras.layers import Input, Embedding, Reshape
from tensorflow.keras.models import Model

#Define Embedding Size
embedding_size = int(min(np.ceil((len(emotionsList))/2), 50 ))

#Converting the categorical input to an entity embedding using a neural network embedding layer
#to reduce the dimensions from 17 to 9
input_model = Input(shape=(1,))
output_model = Embedding(len(emotionsList), embedding_size, name='emotions_embedding')(input_model)
output_model = Reshape(target_shape=(embedding_size,))(output_model)

model = Model(inputs = input_model, outputs = output_model)
model.compile(optimizer = 'Adam', loss = 'binary_crossentropy', metrics = ['accuracy'])

# Here the weights are the embeddings, so retrieve the layer's weights
# (the model has not been fit yet, so these are the initial values)
emotions_layer = model.get_layer('emotions_embedding')
emotions_weights = emotions_layer.get_weights()[0]
print(emotions_weights) # embedding matrix of shape (17,9)

The above code transforms a single categorical variable, held in "emotionsList", into embedding (weight) vectors of shape (17,9). Each emotion from "emotionsList" is represented by an embedding vector of size 9, as opposed to 17 with one-hot encoding.
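
Two details of the code above can be worked through in plain Python: the sizing rule (half the number of categories, rounded up, capped at 50) and the fact that an embedding layer behaves like a lookup table, returning row i of its weight matrix for category index i. This is a sketch only; the function names and the tiny 3x2 weight matrix are made up for illustration.

```python
import math


def embedding_size(n_categories, cap=50):
    """Sizing rule from the post: min(ceil(n / 2), cap)."""
    return int(min(math.ceil(n_categories / 2), cap))


def lookup(weights, index):
    """Embedding lookup: pick the row for the given category index."""
    return weights[index]


def one_hot_times_matrix(index, weights):
    """Same result via an explicit one-hot vector multiplied by the matrix."""
    n_rows, n_cols = len(weights), len(weights[0])
    one_hot = [1.0 if r == index else 0.0 for r in range(n_rows)]
    return [sum(one_hot[r] * weights[r][c] for r in range(n_rows))
            for c in range(n_cols)]


print(embedding_size(17))   # the 17 emotions in emotionsList
print(embedding_size(200))  # a larger vocabulary hits the cap

toy_weights = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]  # made-up values
print(lookup(toy_weights, 1), one_hot_times_matrix(1, toy_weights))
```

The lookup form is why an embedding layer never needs the (17,17) one-hot matrix to be materialized: an integer index per category is enough.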

The advantages of converting categorical variables to embedding vectors are low dimensionality, dense packing, and, once the network has been trained on a task, capturing semantics.

In my next post, I will demonstrate how to train a neural network on emotions such as hate and love so that, once trained, it suggests the semantically closest emotion(s) to a given emotion.