(🚧🚜 WIP 👷🚧 ) Split fabric-specific code into individual back-ends #1617
base: cascading3
- create 4 empty fabric jars
- break & split Mode across the "storage" and "execution tech" axes
- intentionally retire some of the Mode-related type names to ensure things DO move
- fix what breaks at compile time. TESTS NOT YET RUN AT THIS POINT
… toxic to hadoop-commons 2.6.0)
…terns across fabrics)
…ster-test is requested
Great work.
One question: is there any way we can reduce the size of the PR?
For instance, can we keep flink out and just do the existing fabrics? That will make it easier to review. Once we get it in, we can add more.
I like this approach and think it is the most sensible way to get the most out of cascading 3 without breaking users.
import org.slf4j.LoggerFactory

import scala.annotation.meta.param
import scala.collection.{ Map, mutable }
Can we avoid `scala.collection.Map` and explicitly use `Map` for the immutable map (standard) and `mutable.Map` when we need a mutable one?
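The import convention suggested above might look like the following minimal sketch. The object and member names (`MapImportSketch`, `defaults`, `cache`, `record`) are hypothetical and only illustrate the idea; they are not part of this PR.

```scala
import scala.collection.mutable

object MapImportSketch {
  // Unqualified Map resolves to scala.Predef.Map, i.e. the immutable Map.
  val defaults: Map[String, List[String]] = Map("mode" -> List("local"))

  // Mutable state is visibly qualified at every use site.
  val cache: mutable.Map[String, Int] = mutable.Map.empty[String, Int]

  // Counts how many times a key has been recorded (illustrative helper).
  def record(key: String): Int = {
    val next = cache.getOrElse(key, 0) + 1
    cache(key) = next
    next
  }
}
```

With only `mutable` imported, a reader can tell from the type alone whether a given map can change.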
/**
 * This is a Args and a Mode together. It is used purely as
 * a work-around for the fact that Job only accepts an Args object,
 * but needs a Mode inside.
 */
private class ArgsWithMode(argsMap: Map[String, List[String]], val mode: Mode) extends Args(argsMap) {
private class ArgsWithMode(argsMap: scala.Predef.Map[String, List[String]], val mode: Mode) extends Args(argsMap) {
I'd rather not hide `Map` with an import or use `scala.collection.immutable.Map` here.
Hdfs(strictSources, config)
} else
  throw ArgsException("[ERROR] Mode must be one of --local, --hadoop1, --hadoop2-mr1, --hadoop2-tez or --hdfs, you provided none")
lazy val autoMode = if (args.boolean("autoCluster")) {
Can we prefix new scalding options with `scalding.`?
"hdfs" -> "com.twitter.scalding.LegacyHadoopMode", | ||
"flink" -> "com.twitter.scalding.FlinkMode") | ||

val KnownTestModesMap = Seq(
can we make these maps instead?
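A minimal sketch of what declaring these lookups as maps could look like. Only the three entries visible in the diff are reproduced; the name `KnownModesMap` and the helper `classNameFor` are assumptions made for illustration.

```scala
object KnownModesSketch {
  // Entries copied from the diff above; the real tables contain more.
  // The name KnownModesMap is an assumption.
  val KnownModesMap: Map[String, String] = Map(
    "hdfs" -> "com.twitter.scalding.LegacyHadoopMode",
    "flink" -> "com.twitter.scalding.FlinkMode")

  val KnownTestModesMap: Map[String, String] = Map(
    "flink-test" -> "com.twitter.scalding.FlinkTestMode")

  // Hypothetical helper: resolve a command-line flag to a Mode class name.
  def classNameFor(flag: String): Option[String] =
    KnownModesMap.get(flag).orElse(KnownTestModesMap.get(flag))
}
```

Declaring the values as `Map` gives direct lookup by flag and makes it obvious that keys are expected to be unique.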
"flink-test" -> "com.twitter.scalding.FlinkTestMode") | ||
// TODO: define hadoop2-mr1 (easy), tez and flink (less easy) classes.

private def getModeConstructor[M <: Mode](clazzName: String, types: Class[_]*) =
I wonder if we should just have a contract that a `Mode` needs a constructor that takes exactly one `Args` and exactly one `Configuration`, or even, possibly, `Config`, which is immutable. Wouldn't that work? I don't want to add too much flexibility if we don't need it.
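The fixed-constructor contract proposed above could be sketched roughly as follows. `Args`, `Config`, and `Mode` here are simplified stand-ins defined inside the example, not the real Scalding types, and `newMode` is a hypothetical replacement for the variadic `getModeConstructor`.

```scala
object ModeContractSketch {
  // Stand-ins for illustration only; not the real Scalding classes.
  final case class Args(m: Map[String, List[String]])
  final case class Config(toMap: Map[String, String])
  trait Mode { def name: String }

  // A Mode honouring the contract: exactly one Args and one Config.
  class LocalSketchMode(val args: Args, val config: Config) extends Mode {
    val name = "local-sketch"
  }

  // With a single agreed signature, the reflective lookup needs no
  // caller-supplied parameter types.
  def newMode(className: String, args: Args, config: Config): Mode = {
    val ctor = Class.forName(className)
      .getConstructor(classOf[Args], classOf[Config])
    ctor.newInstance(args, config).asInstanceOf[Mode]
  }
}
```

A class that does not expose the agreed constructor fails with `NoSuchMethodException` at lookup time, which is arguably easier to diagnose than a mismatch in a variadic type list.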
}

trait HadoopMode extends Mode {
we have user code that assumes this. Can we find a way not to break those folks?
 * The "HadoopMode" is actually an alias for "a mode running on a fabric that ultimately runs using an execution
 * engine compatible with some of the Hadoop technology stack (may or may not include Hadoop 1.x, YARN, etc.)
 */
trait HadoopExecutionModeBase[ConfigType <: Configuration]
Maybe we actually want an abstract type here:
trait HadoopExecutionModeBase {
  type ConfigType <: Configuration
}
since then we can refer to it: `mode.ConfigType`. We might do this to put more of the cascading types and not use `_` so much.
 * engine compatible with some of the Hadoop technology stack (may or may not include Hadoop 1.x, YARN, etc.)
 */
trait HadoopExecutionModeBase[ConfigType <: Configuration]
  extends ExecutionMode {
  def jobConf: Configuration
Should this not return `ConfigType`?
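The two suggestions on this trait (an abstract type member, and `jobConf` returning `ConfigType`) can be sketched together as below. `Configuration` and `JobConfLike` are stand-in classes defined for the example, and `SketchMode` and `confOf` are hypothetical names; the real trait also extends `ExecutionMode`, which is omitted here.

```scala
object ConfigTypeSketch {
  // Stand-ins for the Hadoop Configuration hierarchy (illustration only).
  class Configuration
  class JobConfLike extends Configuration { val engine = "mr1" }

  trait HadoopExecutionModeBase {
    // Abstract type member instead of a type parameter.
    type ConfigType <: Configuration
    // Returning ConfigType keeps the precise configuration type.
    def jobConf: ConfigType
  }

  object SketchMode extends HadoopExecutionModeBase {
    type ConfigType = JobConfLike
    val jobConf: ConfigType = new JobConfLike
  }

  // Path-dependent result type: callers can write mode.ConfigType
  // without threading an extra type parameter or using `_`.
  def confOf(mode: HadoopExecutionModeBase): mode.ConfigType = mode.jobConf
}
```

Because the result type depends on the argument, `confOf(SketchMode)` is statically a `JobConfLike`, so members such as `engine` are available without a cast.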
val memo = scala.collection.mutable.Map[String, Constructor[CounterImpl]]()
val ctor = memo.synchronized {
  memo.getOrElse(klassName,
Let's use `getOrElseUpdate` rather than the nested put.
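A minimal sketch of the `getOrElseUpdate` form of this memo. To stay self-contained it caches no-argument constructors of arbitrary classes rather than `Constructor[CounterImpl]`; the names `CtorMemoSketch`, `ctorFor`, and `lookups` are hypothetical.

```scala
import java.lang.reflect.Constructor
import scala.collection.mutable

object CtorMemoSketch {
  private val memo = mutable.Map[String, Constructor[_]]()

  // Counts reflective lookups so the caching is observable.
  var lookups = 0

  def ctorFor(klassName: String): Constructor[_] = memo.synchronized {
    // The by-name second argument runs only when the key is absent,
    // and its result is stored before being returned.
    memo.getOrElseUpdate(klassName, {
      lookups += 1
      Class.forName(klassName).getConstructor()
    })
  }
}
```

Compared with `getOrElse` plus an explicit `put` inside the default branch, the lookup and the insertion are expressed as one call.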
}

private[scalding] def upcast[T <: FlowProcess[_]](fp: FlowProcess[_])(implicit ev: TypeTag[T]): T = fp match {
  case hfp: T @unchecked if (ev == typeTag[T]) => hfp // see below
This `typeTag[T]` will always match `ev`, no? I think so. Why not just explicitly cast in this method:
def downCast[T <: U](u: U): T = u.asInstanceOf[T]
Also, this is a downcast, no?
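The explicit cast suggested above can be sketched as follows, using stand-in types instead of Cascading's `FlowProcess`. The `downCastChecked` variant, which uses a `ClassTag` to verify the runtime class, is an additional assumption and is not something proposed in this thread.

```scala
import scala.reflect.ClassTag

object DowncastSketch {
  // Stand-ins for FlowProcess and its fabric-specific subclasses.
  trait FlowProcessLike
  final class HadoopFlowProcessLike extends FlowProcessLike
  final class LocalFlowProcessLike extends FlowProcessLike

  // The plain cast: T is erased, so nothing is checked inside this
  // method; a wrong T only surfaces later, at the use site.
  def downCast[T <: FlowProcessLike](fp: FlowProcessLike): T =
    fp.asInstanceOf[T]

  // Assumed alternative: ClassTag carries the runtime class, so the
  // match is real and a mismatch yields None instead of a late failure.
  def downCastChecked[T <: FlowProcessLike](fp: FlowProcessLike)(
      implicit ct: ClassTag[T]): Option[T] = ct.unapply(fp)
}
```

The guard `ev == typeTag[T]` compares the implicit tag with itself, which is why it cannot reject anything; a `ClassTag` test compares against the value's actual class.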
Cyrille Chépélov (TP12) seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it.
(as being discussed on scalding-dev)
Among the things I've left out for now (in addition to the list in the e-mail): scalding-hadoop-test (platform-specific tests on a real minicluster).
Perhaps it'd make sense to have run / runHadoop (→runFabricLocal ?) / runOnMiniCluster in JobTest, and the relevant Mode implementation assortment?