IndexedRDD.join slower than vanilla PairRDD.join #17

Open
addisonj opened this issue Feb 4, 2016 · 0 comments
addisonj commented Feb 4, 2016

I am testing out IndexedRDD and noticing some performance problems that I wouldn't expect based on the README and what I saw in your Spark Summit presentation. My use case for IndexedRDD is near-real-time denormalization for a data warehouse using Spark Streaming. My hope was that I could join a small subset of data (the most recent group of changed records) against an IndexedRDD, avoiding a full scan of the RDD and processing the records much faster.

I have tested this with both real and generated data. When doing an inner join with a small dataset (about 100 records) on the left-hand side and a large dataset (100,000,000 records) on the right-hand side, the vanilla Spark RDD performs as fast as or faster than the IndexedRDD, even when both datasets are cached and share a partitioner beforehand to avoid the cost of a repartition.

Across the few runs of this test with generated data (code follows), the IndexedRDD implementation was 15-20% slower. Digging into the code, it looks like the join doesn't actually prune the partitions it scans; instead it zips every partition from both sides, even when some partitions on one side of the join are empty. I know that PartitionPruningRDD can inform the scheduler that only a subset of partitions needs to be processed, but I am curious whether I am just misunderstanding some details and applying the wrong tool for the job.

As mentioned, here is the code that I used to generate the results:

import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._
import org.apache.spark.HashPartitioner
import java.util.Random

class RecordBuilder extends Serializable {
  val r = new Random
  // Build a random string of exactly `len` characters.
  def buildStringLen(len: Int): String = {
    (0 until len).map(_ => r.nextInt().toChar).mkString
  }
  def buildRecord(id: Long): (Long, Map[String, String]) = {
    (id, Map(
      "id" -> id.toString,
      "rand1" -> buildStringLen(20),
      "rand2" -> buildStringLen(30),
      "rand3" -> buildStringLen(40)
    ))
  }
}
val rb = new RecordBuilder

val myHashPart = new HashPartitioner(200)
val r = new Random


// Right-hand side: 100,000,000 records with keys 0 until 100,000,000,
// built in groups of 1,000 to keep the driver-side range small.
val rhs = sc.parallelize(0L until (100000000L / 1000))
  .flatMap(i => (i * 1000) until (i * 1000 + 1000))
  .map(i => rb.buildRecord(i))
  .partitionBy(myHashPart)
  .cache()

// Left-hand side: 100 records with random keys in [0, 100000000)
// so every key can match a key on the right-hand side.
val lhs = sc.parallelize((0 until 100)
                         .map(_ => rb.buildRecord(math.abs(r.nextLong() % 100000000L))))
                         .partitionBy(myHashPart)
                         .cache()

// Materialize both sides in the cache before timing the joins.
rhs.count()
lhs.count()

// Baseline: vanilla PairRDD join.
lhs.join(rhs).collect()

// The same data as IndexedRDDs, also cached and materialized.
val lhsi = IndexedRDD(lhs).cache()
val rhsi = IndexedRDD(rhs).cache()
lhsi.count()
rhsi.count()

// IndexedRDD join, keeping the left-hand value.
lhsi.join(rhsi)((k, v1, v2) => v1).collect()
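
For reference, each collect() above can be wrapped in a minimal wall-clock timer along these lines (just a sketch; the time helper is illustrative, not part of IndexedRDD):

def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  // Report elapsed wall-clock time in seconds.
  println(f"$label: ${(System.nanoTime() - start) / 1e9}%.3f s")
  result
}

time("PairRDD join") { lhs.join(rhs).collect() }
time("IndexedRDD join") { lhsi.join(rhsi)((k, v1, v2) => v1).collect() }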

I may try adding in PartitionPruningRDD to see what perf this gives me, but I would love to get some feedback on my experiment. The pruning I have in mind looks roughly like the sketch below.
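
This is untested and reuses myHashPart and the RDDs from above. One caveat: PartitionPruningRDD renumbers the surviving partitions, so the pruned RDD loses its partitioner and the final join will still shuffle.

import org.apache.spark.rdd.PartitionPruningRDD

// Partition ids (under myHashPart) that hold at least one key from the small side.
val wantedPartitions = lhs.keys
  .map(k => myHashPart.getPartition(k))
  .collect()
  .toSet

// Scan only the partitions of the large side that can contain matching keys.
val prunedRhs = PartitionPruningRDD.create(rhs, (id: Int) => wantedPartitions.contains(id))

// Note: prunedRhs has no partitioner, so this join repartitions both sides.
lhs.join(prunedRhs).collect()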
