
Commit

Added tests. Added type parameters. Fixed JDBCTap so it works inside of local joins.
sritchie committed Apr 20, 2012
1 parent 8ff841b commit d33ee81
Showing 4 changed files with 80 additions and 28 deletions.
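
For context, the "local joins" in the commit message are Cascading joins whose smaller side is accumulated in memory; during accumulation the planner re-opens the source tap cluster side via openForRead with a null RecordReader, which is the code path reworked below. A rough sketch of such a flow, assuming Cascading 2.0's HashJoin API and using MemorySourceTap for the accumulated side (pipe names, field names, and the omitted large-side taps are illustrative only, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;

    import cascading.pipe.HashJoin;
    import cascading.pipe.Pipe;
    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;
    import com.twitter.maple.tap.MemorySourceTap;

    public class LocalJoinSketch {
        public static void main(String[] args) {
            // Small side of the join, held in memory; during accumulation the
            // cluster re-reads it through openForRead(flowProcess, null).
            List<Tuple> lookup = new ArrayList<Tuple>();
            lookup.add(new Tuple(1, "a"));
            lookup.add(new Tuple(2, "b"));
            MemorySourceTap small = new MemorySourceTap(lookup, new Fields("id", "name"));

            Pipe large = new Pipe("large");
            Pipe smallPipe = new Pipe("small");

            // HashJoin accumulates the right-hand pipe on each task (the "local join").
            Pipe joined = new HashJoin(large, new Fields("id"),
                                       smallPipe, new Fields("id"),
                                       new Fields("id1", "value", "id2", "name"));

            // A real flow would bind 'small' to smallPipe, add a large source and a sink,
            // and run new HadoopFlowConnector().connect(...).complete(); omitted here.
        }
    }
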
project.clj (2 changes: 1 addition & 1 deletion)
@@ -6,6 +6,6 @@
   :dev-dependencies [[midje "1.3.1" :exclusions [org.clojure/clojure]]
                      [org.clojure/clojure "1.2.1"]
                      [org.apache.hadoop/hadoop-core "0.20.2-dev"]
-                     [cascading/cascading-hadoop "2.0.0-wip-271"
+                     [cascading/cascading-hadoop "2.0.0-wip-278"
                       :exclusions [org.codehaus.janino/janino
                                    org.apache.hadoop/hadoop-core]]])
src/jvm/com/twitter/maple/jdbc/JDBCTap.java (33 changes: 29 additions & 4 deletions)
@@ -13,6 +13,7 @@
 package com.twitter.maple.jdbc;

 import cascading.flow.hadoop.HadoopFlowProcess;
+import cascading.flow.hadoop.HadoopUtil;
 import cascading.tap.SinkMode;
 import cascading.tap.Tap;
 import cascading.tap.TapException;
@@ -35,6 +36,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;

 /**
  * Class JDBCTap is a {@link Tap} sub-class that provides read and write access to a RDBMS via JDBC drivers.
@@ -268,17 +270,40 @@ public boolean isWriteDirect() {
     return true;
   }

+  private JobConf getSourceConf( HadoopFlowProcess flowProcess, JobConf conf, String property )
+    throws IOException {
+    Map<String, String> priorConf = HadoopUtil.deserializeMapBase64( property, true );
+    return flowProcess.mergeMapIntoConfig( conf, priorConf );
+  }
+
   @Override
-  public TupleEntryIterator openForRead( HadoopFlowProcess flowProcess, RecordReader input ) throws IOException {
-    if (input != null)
+  public TupleEntryIterator openForRead( HadoopFlowProcess flowProcess, RecordReader input )
+    throws IOException {
+
+    // this is only called cluster task side when Hadoop is providing a RecordReader instance it owns
+    // during processing of an InputSplit
+    if( input != null )
       return new TupleEntrySchemeIterator( flowProcess, getScheme(), new RecordReaderIterator( input ) );

-    JobConf conf = new JobConf( flowProcess.getJobConf() );
+    Map<Object, Object> properties = HadoopUtil.createProperties(flowProcess.getJobConf());
+
+    properties.remove( "mapred.input.dir" );
+
+    JobConf conf = HadoopUtil.createJobConf( properties, null );
+
+    // allows client side config to be used cluster side
+    String property = flowProcess.getJobConf()
+      .getRaw( "cascading.step.accumulated.source.conf." + getIdentifier() );
+
+    if( property != null ) {
+      conf = getSourceConf( flowProcess, conf, property );
+      flowProcess = new HadoopFlowProcess( flowProcess, conf );
+    }

     LOG.info("Opening JDBCTap for read.");

     return new TupleEntrySchemeIterator( flowProcess, getScheme(),
-        new MultiRecordReaderIterator(flowProcess, this, conf ) );
+        new MultiRecordReaderIterator(flowProcess, this ) );
   }

   @Override
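
The reworked branch above (null RecordReader) is taken whenever the tap is opened outside a Hadoop-provided RecordReader, for example client side or while a join accumulates its smaller side. A minimal sketch of driving that branch directly, assuming an already configured JDBCTap instance; its constructor arguments and scheme setup are omitted here:

    import java.io.IOException;

    import cascading.flow.hadoop.HadoopFlowProcess;
    import cascading.tuple.TupleEntryIterator;
    import com.twitter.maple.jdbc.JDBCTap;
    import org.apache.hadoop.mapred.JobConf;

    public class JdbcTapReadSketch {
        // Prints every row of an already configured JDBCTap.
        public static void dump(JDBCTap jdbcTap) throws IOException {
            HadoopFlowProcess flowProcess = new HadoopFlowProcess(new JobConf());

            // A null RecordReader takes the MultiRecordReaderIterator branch shown above.
            TupleEntryIterator rows = jdbcTap.openForRead(flowProcess, null);
            try {
                while (rows.hasNext())
                    System.out.println(rows.next().getTuple());
            } finally {
                rows.close();
            }
        }
    }
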
src/jvm/com/twitter/maple/tap/MemorySourceTap.java (31 changes: 18 additions & 13 deletions)
@@ -13,7 +13,6 @@
 import cascading.tuple.Tuple;
 import cascading.tuple.TupleEntryIterator;
 import cascading.tuple.TupleEntrySchemeIterator;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -28,11 +27,13 @@
 import java.util.Map;
 import java.util.UUID;

-public class MemorySourceTap extends SourceTap<HadoopFlowProcess, JobConf, RecordReader> implements
+public class MemorySourceTap extends SourceTap<HadoopFlowProcess, JobConf,
+    RecordReader<TupleWrapper, NullWritable>> implements
     Serializable {
   private static final Logger logger = LoggerFactory.getLogger(MemorySourceTap.class);

-  public static class MemorySourceScheme extends Scheme<HadoopFlowProcess, JobConf, RecordReader, Void, Object[], Void> {
+  public static class MemorySourceScheme extends Scheme<HadoopFlowProcess, JobConf,
+      RecordReader<TupleWrapper, NullWritable>, Void, Object[], Void> {
     private static final Logger logger = LoggerFactory.getLogger(MemorySourceScheme.class);

     private transient List<Tuple> tuples;
@@ -49,8 +50,7 @@ public String getId() {
       return this.id;
     }

-    public List<Tuple> getTuples()
-    {
+    public List<Tuple> getTuples() {
       return this.tuples;
     }

@@ -67,15 +67,17 @@ public void sinkConfInit(HadoopFlowProcess flowProcess, Tap tap, JobConf jc) {
     }

     @Override
-    public void sourcePrepare( HadoopFlowProcess flowProcess, SourceCall<Object[], RecordReader> sourceCall ) {
+    public void sourcePrepare( HadoopFlowProcess flowProcess, SourceCall<Object[],
+        RecordReader<TupleWrapper, NullWritable>> sourceCall ) {
       sourceCall.setContext( new Object[ 2 ] );

       sourceCall.getContext()[ 0 ] = sourceCall.getInput().createKey();
       sourceCall.getContext()[ 1 ] = sourceCall.getInput().createValue();
     }

     @Override
-    public boolean source(HadoopFlowProcess flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
+    public boolean source(HadoopFlowProcess flowProcess, SourceCall<Object[],
+        RecordReader<TupleWrapper, NullWritable>> sourceCall) throws IOException {
       TupleWrapper key = (TupleWrapper) sourceCall.getContext()[ 0 ];
       NullWritable value = (NullWritable) sourceCall.getContext()[ 1 ];

@@ -89,7 +91,8 @@ public boolean source(HadoopFlowProcess flowProcess, SourceCall<Object[], Record
     }

     @Override
-    public void sourceCleanup( HadoopFlowProcess flowProcess, SourceCall<Object[], RecordReader> sourceCall ) {
+    public void sourceCleanup( HadoopFlowProcess flowProcess, SourceCall<Object[],
+        RecordReader<TupleWrapper, NullWritable>> sourceCall ) {
       sourceCall.setContext( null );
     }

@@ -101,7 +104,6 @@ public void sink(HadoopFlowProcess flowProcess, SinkCall<Void, Void> sinkCall )
  }

  private final String id;
-  private transient FileStatus[] statuses;

  public MemorySourceTap(List<Tuple> tuples, Fields fields) {
    super(new MemorySourceScheme(tuples, fields, "/" + UUID.randomUUID().toString()));
@@ -137,13 +139,15 @@ private JobConf getSourceConf( HadoopFlowProcess flowProcess, JobConf conf, Stri
  }

  @Override
-  public TupleEntryIterator openForRead( HadoopFlowProcess flowProcess, RecordReader input ) throws IOException {
+  public TupleEntryIterator openForRead( HadoopFlowProcess flowProcess,
+      RecordReader<TupleWrapper, NullWritable> input ) throws IOException {
    String identifier = (String) flowProcess.getProperty( "cascading.source.path" );

    // this is only called cluster task side when Hadoop is providing a RecordReader instance it owns
    // during processing of an InputSplit
    if( input != null )
-      return new TupleEntrySchemeIterator( flowProcess, getScheme(), new RecordReaderIterator( input ), identifier );
+      return new TupleEntrySchemeIterator( flowProcess, getScheme(),
+          new RecordReaderIterator( input ), identifier );

    Map<Object, Object> properties = HadoopUtil.createProperties(flowProcess.getJobConf());

@@ -152,7 +156,8 @@ public TupleEntryIterator openForRead( HadoopFlowProcess flowProcess, RecordRead
    JobConf conf = HadoopUtil.createJobConf( properties, null );

    // allows client side config to be used cluster side
-    String property = flowProcess.getJobConf().getRaw( "cascading.step.accumulated.source.conf." + getIdentifier() );
+    String property = flowProcess.getJobConf()
+      .getRaw( "cascading.step.accumulated.source.conf." + getIdentifier() );

    if( property != null ) {
      conf = getSourceConf( flowProcess, conf, property );
@@ -163,7 +168,7 @@ public TupleEntryIterator openForRead( HadoopFlowProcess flowProcess, RecordRead
    // MultiRecordReader will create a new RecordReader instance for use across any file parts
    // or on the cluster side during accumulation for a Join
    return new TupleEntrySchemeIterator( flowProcess, getScheme(),
-        new MultiRecordReaderIterator( flowProcess, this, conf ), "MemoryTap: " + getIdentifier() );
+        new MultiRecordReaderIterator( flowProcess, this ), "MemoryTap: " + getIdentifier() );
  }

  @Override
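
The Clojure test added below exercises this same round trip. Sketched in Java against only the constructor and openForRead signature shown above (field names and tuple values are illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import cascading.flow.hadoop.HadoopFlowProcess;
    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;
    import cascading.tuple.TupleEntryIterator;
    import com.twitter.maple.tap.MemorySourceTap;
    import org.apache.hadoop.mapred.JobConf;

    public class MemoryTapRoundTrip {
        public static void main(String[] args) throws IOException {
            List<Tuple> tuples = new ArrayList<Tuple>();
            tuples.add(new Tuple(1, 2));
            tuples.add(new Tuple(3, 4));

            MemorySourceTap tap = new MemorySourceTap(tuples, new Fields("a", "b"));

            // Client side there is no Hadoop-owned RecordReader, so pass null and let
            // the tap build its own JobConf and MultiRecordReaderIterator.
            HadoopFlowProcess flowProcess = new HadoopFlowProcess(new JobConf());
            TupleEntryIterator it = tap.openForRead(flowProcess, null);

            while (it.hasNext())
                System.out.println(it.next().getTuple()); // should print both tuples back in order

            it.close();
        }
    }
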
test/com/twitter/maple/tap/memory_test.clj (42 changes: 32 additions & 10 deletions)
@@ -1,8 +1,9 @@
 (ns com.twitter.maple.tap.memory-test
   (:use clojure.test)
+  (:require [clojure.string :as s])
   (:import [java.util ArrayList]
            [com.twitter.maple.tap MemorySourceTap]
-           [cascading.tuple Fields]
+           [cascading.tuple Fields Tuple]
            [cascading.flow.hadoop HadoopFlowProcess]
            [org.apache.hadoop.mapred JobConf]))

@@ -35,12 +36,33 @@
   (doall (for [wrapper (iterator-seq it)]
            (into [] (.getTuple wrapper))))))

-(comment
-  "TODO: Implement coerceToTuple and fields."
-  (defn memory-tap
-    ([tuples] (memory-tap Fields/ALL tuples))
-    ([fields-in tuple-seq]
-       (let [tuples (->> tuple-seq
-                         (map #(Util/coerceToTuple %))
-                         (ArrayList.))]
-         (MemorySourceTap. tuples (fields fields-in))))))
+(defn collectify [obj]
+  (if (or (sequential? obj)
+          (instance? java.util.List obj))
+    obj, [obj]))
+
+(defn fields
+  {:tag Fields}
+  [obj]
+  (if (or (nil? obj) (instance? Fields obj))
+    obj
+    (let [obj (collectify obj)]
+      (if (empty? obj)
+        Fields/ALL ; TODO: add Fields/NONE support
+        (Fields. (into-array String obj))))))
+
+(defn coerce-to-tuple [o]
+  (Tuple. (if (instance? java.util.List o)
+            (.toArray o)
+            0)))
+
+(defn memory-tap
+  ([tuples] (memory-tap Fields/ALL tuples))
+  ([fields-in tuple-seq]
+     (let [tuples (ArrayList. (map coerce-to-tuple tuple-seq))]
+       (MemorySourceTap. tuples (fields fields-in)))))
+
+(deftest round-trip-tuple-test
+  (are [coll] (= coll (tuple-seq (memory-tap coll)))
+    [[1] [2]]
+    [[1 2] [3 4]]))
