Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First sketch at a sketch join #758

Merged
merged 12 commits into from
Jan 30, 2014
Merged

Conversation

avibryant
Copy link
Contributor

Note: no test yet.

This join is appropriate where you have a larged, skewed left hand side that you want to join to a small right hand side: for example, joining page views against pages where the most popular pages each make up a significant fraction the total. It builds a CountMinSketch of the keys in the left hand side, then uses that to decide how many replicas of each item in the right side to produce, then does a normal join.

The semantics are similar to hash joins: you have to go directly from (TypedPipe[K,V],TypedPipe[K,V2]) to some TypedPipe[K,R], with no guarantee of ever seeing the entirety of a group at once (and in fact, we reuse the hashInner2 and hashLeft2 joiner functions).

Rough usage:

  left
   .groupBy{_.foo}
   .sketch()
   .join(right.groupBy{_.bar})
   .withReducers(20)
   .values

eps: Double,
seed: Int,
reducers: Option[Int])
(implicit serialization: K => Array[Byte],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a type that does not extend function here? When you have an implicit function in scope, it can be used for implicit conversions, which will give K all the methods of Array[Byte](there are a lot: http://www.scala-lang.org/api/current/index.html#scala.Array) and might be confusing.

trait Encoder[K, T] {
  def apply(k: K): T
}

or we can punt, add this to bijection, and make bijection a dep of scalding.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just copying what SketchMap did. It seems like importing bijection provides something this can use for String, at least, though TBH I'm not sure exactly what's going on there. I'm +1 on an explicit Encoder typeclass though, and I think having Scalding depend on bijection is inevitable and fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... BTW since really this is about hashing, it could also be an argument for a Hashable typeclass in scalding, which I know has come up before.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is probably cleaner, or Hashable in Algebird.

There was a proposal, but I could never get something looked really good. Something like:

trait Hashable32[K] {
  def hash(k: K): Int
}

trait Hashable64[K] extends Hashable32[K] {
  def hash2(k: K): Long
  def hash(k: K): Int = {
    val l = hash2(k)
    ((l >>> 32) ^ l).toInt
   } 
}

trait Hashable128[K] extends Hashable64[K] {
  def hash4(k: K): (Long, Long)
  def hash2(k: K) = {
    val h = hash4(k)
    h._1 ^ h._2
  }
}

might do. I went overboard last time (shocker).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks pretty good, though I might use hash64 and hash128 for the method names, for consistency with the trait names?

So... does this go in algebird or in bijection?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess algebird. algebird-hash? and then depend on that in algebird-core?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds right. Do we want to block this PR on that? My vote is that we get this into 0.9.0 as is (once I've addressed other feedback), then try to do algebird-hash "right" for the next round of releases.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. Let's get this in with (K => Array[Byte]) and fix all later.

@johnynek
Copy link
Collaborator

As for tests, it would be great to have some that did something like: given a distribution: K => Int for how many times K is to appear on the left, run the tests and be sure that you get the right result (done for instance in a non-sketch way).

As for the distributions: testing the case where you have a lot of keys mission on the left that appear on the right, would be good. Also the case where every key appears exactly once. Lastly, cases where we have a key that is half the total count.

@johnynek
Copy link
Collaborator

By the way: Dmitriy explained why bucketing on rand causes problems without a seed: if reducer R1 thinks it has all the input it needs, it runs and finishes. Now, Mapper M1 restarts because R2 failed, but this means we bucket differently and the proof that this algorithm works (and indeed, it does not work). (and @dvryaboy wrote something similar last time).

//the most of any one reducer we want to try to take up with a single key
val maxReducerFraction = 0.1

private def flatMapWithReplicas[V](pipe: TypedPipe[(K,V)])(fn: Int => Iterable[Int]) =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this V parameter is confusing since there is also a V on the class. Are they distinct? I think so. Can you make this W or something not used in the class?

@avibryant
Copy link
Contributor Author

Think this is probably in decent shape, but still needs a good test.

@johnynek
Copy link
Collaborator

Yep. As soon as we have tests, it's good. This is really great. Hopefully this + composable joins + optimizer in matrix 2 + bug fix for fields skewJoin will mean 0.9.0 is a really good release.

import Dsl._

val rng = new java.util.Random
def generateInput(size: Int, max: Int, dist: (Int) => Int): List[(Int,Int)] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dist is unused for now, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is super essential that we use dist. We could just run tests whece size ~ max and where max << size (so we can expect keys to be repeated many times).

@johnynek
Copy link
Collaborator

I'm confortable merging if you remove the unused diet parameter. What do you think? This test (being random) seems like if it is wrong we will eventually stumble over it.

@avibryant
Copy link
Contributor Author

I was just pushing WIP here for the tests. I actually would like to add some with different dists.

Also: this test was cargo culted somewhat from the skew join tests, and what I discovered is that because ++ does not actually mutate the Buffer objects, it's always just comparing List() == List(). When I switch it to ++=, I get a failure with run but a pass with runHadoop. So I want to at least fix that before merging. Separately, we should fix the skew join test.

If you have any ideas about why local mode would fail lemme know. Is the ValuePipe stuff known not to work there?

@johnynek
Copy link
Collaborator

Okay.

Good catch. Actually, I was concerned how your code works with both .run and .runHadoop. They are sharing a mutable buffer, right?

@johnynek
Copy link
Collaborator

ValuePipe is believed to work (it is just a type wrapper on something that was the result of a total aggregation to a single, or possibly empty) value.

@avibryant
Copy link
Contributor Author

Yeah, it's not great (again, cargo culted from the skew join tests) - the results just get combined into the same buffer, so if you have a problem in one but not the other, it shows up, but it can be hard to tell what's going on.

@avibryant
Copy link
Contributor Author

@johnynek see avibryant@099e69a for a more minimal example of the problem I'm seeing in local mode. Looks like something pretty deep in HashJoin...

@johnynek
Copy link
Collaborator

Looks like a cascading bug. I guess we need to report it to @cwensel

On Wednesday, January 29, 2014, avibryant [email protected] wrote:

@johnynek https://github.com/johnynek see avibryant@099e69ahttps://github.com/avibryant/scalding/commit/099e69a480a368de554da2ca6c0359f68359525afor a more minimal example of the problem I'm seeing in local mode. Looks
like something pretty deep in HashJoin...

Reply to this email directly or view it on GitHubhttps://github.com//pull/758#issuecomment-33660888
.

Oscar Boykin :: @posco :: http://twitter.com/posco

@avibryant
Copy link
Contributor Author

See latest commit - works ok if you just use cross.

@avibryant
Copy link
Contributor Author

@johnynek not totally clear it's a cascading bug - it could be in the typed hashJoin stuff, which cross() bypasses.

@avibryant
Copy link
Contributor Author

I think we should be good to merge this PR now though, at least.

//if the frequency is 0, maxReplicas.ceil will be 0 so we will filter out this key entirely
//if it's < maxPerReducer, the ceil will round maxReplicas up to 1 to ensure we still see it
val replicas = fn(maxReplicas.ceil.toInt.min(numReducers))
replicas.toList.map{i => (i,v._1) -> v._2}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why toList here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, this was needed at one point but no longer.

@johnynek
Copy link
Collaborator

@avibryant about leftCross: if it is not a cascading bug, it seems strange that the job works in hadoop mode but not local mode. leftCross is just calling hashJoin with a particular joiner. I'll look at it after the merge.

johnynek added a commit that referenced this pull request Jan 30, 2014
@johnynek johnynek merged commit 7b321e2 into twitter:develop Jan 30, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants