(Note - this post is extracted from an email on the topic and has been posted on labs.imaginea.com retroactively.)

TLDR: There are things you can do to improve your RDD join performance on Spark in some cases.


For one of the clients, we would get data as rows of tuple [ID, v1, v2, ... vn] from S3. (This will be more than a TB each day). After everyday, we needed to find the aggregate of values by their IDs till that day and store it to HDFS. That is -- we would need to join the existing aggregates as on day T-1 (stored on HDFS) with the incremental data for day T (from S3) and store the new aggregates for day T (again onto HDFS) to be used as aggregate next day. And so on.

One of the problems faced was -- as we performed join between the existing data and the incremental data on Spark, there would be huge amount of data shuffle across the cluster. We wanted to see if we could reduce this shuffle and thus reduce the job run time.


One observation that could be made immediately was -- given that for aggregation each ID in one dataset needed to be matched only with the same ID in the other data set, we should ensure that our data sets are partitioned in such a way that the rows with same ID go to the same partition and thus on the same Spark worker. In that case, all rows could be joined locally and the costly shuffle over network could be avoided.

An obvious implementation of this approach is to use an HashPartitioner on the RDDs which partition data based on the key hash.

But there were couple of problems --

  1. First, even though the HashPartitioner will divide data based on keys, it will not enforce the node affinity. HashPatitioner will only say -- "put these rows with keys K1 in partition "part-0" but if for one data set "part-0" goes to Node1 and for another to Node2, we will again have to shuffle to join key K1.

  2. Once the above was solved i.e. data for each ID is always on the same node, we still observed that the amount of data shuffle still did not reduce. Finally, we dug into the Spark code a figured out a way to achieve this.

The remainder of this post will deal with explaining the actual implementation
for 1) and 2) above.

Solution Details

  1. For node affinity, there are couple of things to keep in mind:

    • When aggregates are stored on HDFS after each day, the data partitions should be deterministically assigned based on key hashes.

    • When any worker tries to save a partition onto HDFS, it asks Namenode about the location of datanode to which it should write. By default, and as expected, Namenode will always see if the requesting client is itself a datanode and if it is then namenode will ask client to write data locally.

    • But how are partitions of RDD assigned to workers in Spark? This is important because, given the point above if we could ensure this then we will be assured that rows with specific IDs always go to same spark worker and also the same data node. In Spark, this is controlled by the following method in RDD:

      def getPreferredLocations(split: Partition): Seq[String]

      For any given split, this method tells the TaskScheduler the preferred worker nodes (Seq[String] will be a seq of hostnames) to which this partition needs to be assigned. (TaskScheduler actually waits for a configurable time for a slot on preferred node to be freed after which it can assign the task to some other worker node). So if we could override this method in our new wrapper RDD and use that instead the we can ensure that data always goes to same node.

This can be implemented as following (delegate everything to underlying RDD but re-implement getPrefferedLocations):

class NodeAffinityRDD[U: ClassTag](prev: RDD[U]) extends RDD[U](prev){
  val nodeIPs = Array("","","")

  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(nodeIPs(split.index % nodeIPs.length))

This way we ensure that for any given partition index we always choose the same node. So whenever we need that functionality, we wrap our underlying RDD with the NodeAffinityRDD above.

  1. Now a slightly more fun part. First let's replicate the problem on smaller datasets. For this, I created two csv data sets [ID, v1, v2...vn] of size ~ 51mb each and using the method described in 1) put them on HDFS. (As a way to sanity check our logic in (1) above, I set the replication level to 1 and then verified if the same partitions were stored on same datanodes. It was as expected.)

Now let's run a job which is very simple -- we read two datasets (dsRdd and devRdd), do a couple of transformations and finally do dsRdd.join(devRdd).

val r1 = sc.textFile("hdfs://")
val r2 = sc.textFile("hdfs://")

val dsRdd = r1.map(line => <some transformation>).map(tokens => <some more>)
val devRDD = r2.map(line => <some transformation>).map(tokens => <some more>)

// finally join and materialize
dsRdd.join(devRDD, dummy).count

Now let's see what did spark do with this job. First thing we look at is the stage statistics from Spark UI --

Completed stages (3)

Couple of points:

  1. There are three stages. Why?
  2. Each of the first two stages produce/write shuffle data (6.4 + 6.4 MB) which is then consumed/read by final stage ( = 12.8 MB)

To understand why there are 3 stages above we will need to look at the DAG for each stage. (Remember: Spark's DAG scheduler mark the stage boundaries at ShuffleDependencies.)

DAG for stage 0 is --

Stage 0

So basically we start with reading the "random1" file, then apply the two map functions on that RDD. The "textfile" and two "map" marked boxes in the DAG above are exactly conveying that. (You can tally these function in the code snippet pasted above). Exact same will be the case for file "random2" -- it will have same DAG in stage 1.

Now a bit of sanity check: we started with 51MB of files and did some clean up to produce intermediate results for next task (join). We can see that from our clean up we trimmed the data to 6.4 MB which was written for shuffle.

Finally let's look at the final stage for join --

Stage 2

In it there's only one block corresponding to our "join" method call on dsRdd. So this corresponds to the dsRdd.join(devRDD, dummy) bit.

Now this all sort of makes sense, so what is the problem? The problem, to recap, is there are shuffle boundaries involved in the job when ideally there's no reason for them to be. Since all the data for any ID is already present on the same node why should shuffle kick in? (BTW, shuffle can involve two costs -- one in sending data over network to other workers that may need this data and the other in writing the intermediate blocks to disk, their serialization and de-serialization.)

So we look at CoGrouped RDD to see what is causing the shuffle boundary between it and previous stages. Here's the relvant code from CoGroupedRDD --

override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] =>
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)

So basically if your rdds being joined do not have exact same partitioner (note object equality) as the one for this RDD then they are marked as ShuffleDependency (in else block). Clearly our dsRdd and devRdd went through this path -- and both were marked as separate stages coming into "join". Hence three stages. So obviously we would want to set the same partitioner on all three of these rdds so that there are only one to one dependencies and everyone is a one big happy family.

Initial impulse maybe to repartition both rdds using some partitioner. But we don't really need to repartition our data (we already stored the exact partitionings we want it to be) and also repartitioning introduces its own stage boundary (so we fall from sky and get stuck on coconut tree sort of thing).

What else can be done? A bit of ugly hacking, surely. So we wrap our two rdd again delegating everything to the underlying rdds but plugging in our dummy partitioner. This will make CoGroupedRdd to report that there are no stage boundaries and the DagScheduler will schedule everything locally on each worker. Here's the (very small) code for WrapRDD:

class WrapRDD[T: ClassTag](rdd: RDD[T], part: Partitioner) extends RDD[T](rdd.sparkContext, rdd.dependencies) {

  override def compute(split: Partition,
                       context: TaskContext): Iterator[T] = rdd.compute(split, context)

  // ********* main thing/hack ******* ///
  override val partitioner = Some(part)

  override protected def getPartitions: Array[Partition] = rdd.partitions

Sounds like a plan, let's see how it works.

First the stages --


Perfect! Now there's only one stage where whole of the data is read (51 X 2 = 102 MB) and nothing is written or read from shuffle.

Let's look at DAG now --


Again, there's only one stage. But notice how the two rdds that were computed in separate stages (0 and 1) earlier are now part of the same stage because they were wrapped in WrapRDD (which convinced CoGroupedRDD to mark them as OneToOneDependency).

Also, notice the execution time difference: in first case it was 4s and now it is 3 secs. A cool 25% improvement. Just because you don't have to go to disk for writing the shuffle and then reading back again immediately. Instead we managed to work on intermediate data right away.

Code for this can be found on Gitlab here. Data for replicating these is already present on Kodebeagle HDFS cluster.