Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
sritchie committed Jul 18, 2012
2 parents 3a340d1 + 11681b8 commit 45a3f50
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions src/jvm/com/twitter/maple/hbase/HBaseTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ public class HBaseTap extends Tap<JobConf, RecordReader, OutputCollector> {
private transient HBaseAdmin hBaseAdmin;

/** Field hostName */
private String quorumNames = "localhost";
private String quorumNames;
/** Field tableName */
private String tableName;

/**
* Constructor HBaseTap creates a new HBaseTap instance.
*
*
* @param tableName
* of type String
* @param HBaseFullScheme
Expand All @@ -74,7 +74,7 @@ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme) {

/**
* Constructor HBaseTap creates a new HBaseTap instance.
*
*
* @param tableName
* of type String
* @param HBaseFullScheme
Expand All @@ -89,7 +89,7 @@ public HBaseTap(String tableName, HBaseScheme HBaseFullScheme, SinkMode sinkMode

/**
* Constructor HBaseTap creates a new HBaseTap instance.
*
*
* @param tableName
* of type String
* @param HBaseFullScheme
Expand All @@ -103,7 +103,7 @@ public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullSchem

/**
* Constructor HBaseTap creates a new HBaseTap instance.
*
*
* @param tableName
* of type String
* @param HBaseFullScheme
Expand All @@ -119,7 +119,7 @@ public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullSchem

/**
* Method getTableName returns the tableName of this HBaseTap object.
*
*
* @return the tableName (type String) of this HBaseTap object.
*/
public String getTableName() {
Expand All @@ -141,25 +141,28 @@ private HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException,

@Override
public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
conf.set("hbase.zookeeper.quorum", quorumNames);
if(quorumNames != null) {
conf.set("hbase.zookeeper.quorum", quorumNames);
}

LOG.debug("sinking to table: {}", tableName);

if (isReplace() && conf.get("mapred.task.partition") == null) {
try {
deleteResource(conf);

} catch (IOException e) {
throw new RuntimeException("could not delete resource: " + e);
}
}

else if (isUpdate()) {
try {
createResource(conf);
} catch (IOException e) {
throw new RuntimeException(tableName + " does not exist !");
}

}

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
Expand All @@ -186,7 +189,7 @@ public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess,
@Override
public boolean createResource(JobConf jobConf) throws IOException {
HBaseAdmin hBaseAdmin = getHBaseAdmin(jobConf);

if (hBaseAdmin.tableExists(tableName)) {
return true;
}
Expand Down Expand Up @@ -214,7 +217,7 @@ public boolean deleteResource(JobConf jobConf) throws IOException {
if (!hBaseAdmin.tableExists(tableName)) {
return true;
}

LOG.info("deleting hbase table: {}", tableName);

hBaseAdmin.disableTable(tableName);
Expand All @@ -236,7 +239,10 @@ public long getModifiedTime(JobConf jobConf) throws IOException {

@Override
public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
conf.set("hbase.zookeeper.quorum", quorumNames);
if(quorumNames != null) {
conf.set("hbase.zookeeper.quorum", quorumNames);
}

LOG.debug("sourcing from table: {}", tableName);
FileInputFormat.addInputPaths(conf, tableName);
super.sourceConfInit(process, conf);
Expand Down

0 comments on commit 45a3f50

Please sign in to comment.