Monday, May 16, 2022

Spark Parallelism Use Case Analysis

 



In this post we will review a use case of performance improvement in a Spark cluster.


The Spark cluster was used to read JSON objects from many files stored in S3.

However, due to a bug in the application that creates the files, we had to remove duplicate JSON objects.

The original implementation used the following code:


# RDD of all JSON objects
rdd = load_all_json_files_from_s3()
# convert (JSON) to (string_of_JSON,JSON)
rdd = rdd.map(lambda x: (str(x), x))
# remove duplicates by the string_of_JSON
rdd = rdd.reduceByKey(lambda x, _: x, 10)
# get back just the objects
rdd = rdd.map(lambda x: x[1])


The problem in this case is that reduceByKey has to shuffle the objects across the cluster, keyed by their full JSON strings, and only 10 reduce partitions share the work, so the cost is very high. In addition, this implementation completely ignores the fact that each JSON object includes a timestamp, which can help partition the data better.
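To make the de-duplication rule concrete, here is a minimal plain-Python sketch of what the map and reduceByKey steps compute, without Spark. The function name `dedupe_by_string` and the sample records are hypothetical and only illustrate the "keep the first object per string key" logic.

```python
def dedupe_by_string(objects):
    """Keep the first object seen for each distinct string form."""
    seen = {}
    for obj in objects:
        key = str(obj)  # same kind of key the map step builds
        seen.setdefault(key, obj)  # keep the first value, ignore later ones
    return list(seen.values())


# Hypothetical sample records; the third repeats the first
records = [
    {"id": 1, "timestamp": "2022-05-16T10:15:00"},
    {"id": 2, "timestamp": "2022-05-16T10:16:00"},
    {"id": 1, "timestamp": "2022-05-16T10:15:00"},
]
unique = dedupe_by_string(records)
print(len(unique))
```

In the cluster the same rule is applied per key, but the objects first have to be moved to the partition that owns their key, which is where the cost comes from.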

To fix this, we've updated the implementation to the following:


partitions_count = 100

def get_partition_by_time(transaction_time):
    minutes_since_midnight = transaction_time.hour * 60 + transaction_time.minute
    partition = minutes_since_midnight % partitions_count
    return partition


# RDD of all JSON objects
rdd = load_all_json_files_from_s3()
# convert RDD: transaction_json_object -> RDD: (time,transaction_json_object)
rdd = rdd.map(lambda t: (t['timestamp'], t))


# re-partition according to the time of the transaction
rdd = rdd.partitionBy(partitions_count, get_partition_by_time)

# convert RDD: (time,transaction_json_object) -> RDD: ((time,transaction_json_string), transaction_json_object)
rdd = rdd.map(lambda t: ((t[0], str(t[1])), t[1]))

# reduce by the key tuple to prevent duplicates
rdd = rdd.reduceByKey(lambda t1, t2: t1,
                      partitions_count,
                      lambda key: get_partition_by_time(key[0]))

# convert RDD: ((time,transaction_json_string), transaction_json_object) -> RDD: transaction_json_object
rdd = rdd.map(lambda t: t[1])



Notice that we use the timestamp, which is part of the key, not only to reduce the number of objects each object is compared with, but also to determine the partition in which the object is handled. Hence, all the de-duplication work for a given object is done within a single partition.
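The following sketch, runnable without Spark, shows why duplicates end up in the same partition. It assumes the timestamp is a datetime-like object exposing `hour` and `minute`, as the partition function above implies; the sample values are hypothetical.

```python
from datetime import datetime

partitions_count = 100


def get_partition_by_time(transaction_time):
    # Same rule as in the post: minute of the day, modulo the partition count
    minutes_since_midnight = transaction_time.hour * 60 + transaction_time.minute
    return minutes_since_midnight % partitions_count


# Hypothetical timestamps: a record, its duplicate, and an unrelated record
original = datetime(2022, 5, 16, 10, 15, 7)
duplicate = datetime(2022, 5, 16, 10, 15, 7)
other = datetime(2022, 5, 16, 10, 16, 0)

p_original = get_partition_by_time(original)
p_duplicate = get_partition_by_time(duplicate)
p_other = get_partition_by_time(other)
print(p_original, p_duplicate, p_other)
```

Because the partition depends only on the timestamp, two objects with the same timestamp always map to the same partition, so a duplicate can never hide in a different one.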


After this change, the run time of the application dropped from 4 hours to 20 minutes. This is a great example of how being aware of the actual work done by each partition is critical to getting good performance from a Spark cluster.
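One way to reason about the work done by each partition is to count how many records a partition function assigns to each one. The sketch below does this in plain Python for a hypothetical sample with exactly one record per minute of a day; `records_per_partition` is an illustrative helper, not part of the original job.

```python
from collections import Counter
from datetime import datetime, timedelta

partitions_count = 100


def get_partition_by_time(transaction_time):
    minutes_since_midnight = transaction_time.hour * 60 + transaction_time.minute
    return minutes_since_midnight % partitions_count


def records_per_partition(timestamps):
    """Count how many records each partition would receive."""
    return Counter(get_partition_by_time(t) for t in timestamps)


# Hypothetical sample: one record for every minute of a single day
start = datetime(2022, 5, 16)
timestamps = [start + timedelta(minutes=m) for m in range(24 * 60)]

load = records_per_partition(timestamps)
print(len(load), min(load.values()), max(load.values()))
```

With this uniform sample all 100 partitions are used and each gets 14 or 15 of the 1440 minutes, so the load is well balanced. Real traffic is rarely uniform over the day, so on an actual RDD something like `rdd.glom().map(len).collect()` can be used to check the real per-partition counts.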






