Skip to content

Commit

Permalink
[BitSail]Supprt bitsail table catalog. (#181)
Browse files Browse the repository at this point in the history
Support table catalog in bitsail.
  • Loading branch information
hk-lrzy authored Nov 17, 2022
1 parent bb3f423 commit 7bd7aef
Show file tree
Hide file tree
Showing 82 changed files with 2,321 additions and 1,944 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.bytedance.bitsail.base.catalog;

import com.bytedance.bitsail.base.component.ComponentBuilder;
import com.bytedance.bitsail.base.connector.BuilderGroup;
import com.bytedance.bitsail.base.execution.ExecutionEnviron;
import com.bytedance.bitsail.common.catalog.table.TableCatalog;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;

import java.io.Serializable;

/**
* Created 2022/5/23
*/
public interface TableCatalogFactory extends Serializable, ComponentBuilder {

/**
* Create a table catalog.
*
* @param executionEnviron execution environment
* @param connectorConfiguration configuration for the reader/writer
*/
TableCatalog createTableCatalog(BuilderGroup builderGroup,
ExecutionEnviron executionEnviron,
BitSailConfiguration connectorConfiguration);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.bytedance.bitsail.base.catalog;

import com.bytedance.bitsail.base.component.DefaultComponentBuilderLoader;

import org.apache.commons.lang3.StringUtils;

public class TableCatalogFactoryHelper {

public static TableCatalogFactory getTableCatalogFactory(String connectorName) {
DefaultComponentBuilderLoader<TableCatalogFactory> loader =
new DefaultComponentBuilderLoader<>(TableCatalogFactory.class);

return loader.loadComponent(StringUtils.lowerCase(connectorName), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,21 @@ public DefaultComponentBuilderLoader(Class<T> clazz) {
}

public T loadComponent(String componentName) {
return this.loadComponent(componentName, true);
}

public T loadComponent(String componentName, boolean failOnMiss) {
if (!loaded) {
loadAllComponents();
loaded = true;
}
componentName = StringUtils.lowerCase(componentName);
if (!components.containsKey(componentName)) {
throw new BitSailException(CommonErrorCode.CONFIG_ERROR,
String.format("Component %s not in interface %s support until now.", componentName, clazz));
if (failOnMiss) {
throw new BitSailException(CommonErrorCode.CONFIG_ERROR,
String.format("Component %s not in interface %s support until now.", componentName, clazz));
}
return null;
}
return components.get(componentName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.bytedance.bitsail.base.connector;

public enum BuilderGroup {

READER,
WRITER,
TRANSFORMER;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.bytedance.bitsail.base.connector.reader.v1;

import com.bytedance.bitsail.base.execution.ExecutionEnviron;
import com.bytedance.bitsail.base.extension.TypeInfoConverterFactory;
import com.bytedance.bitsail.base.serializer.BinarySerializer;
import com.bytedance.bitsail.base.serializer.SimpleBinarySerializer;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
Expand All @@ -29,7 +30,8 @@
import java.io.IOException;
import java.io.Serializable;

public interface Source<T, SplitT extends SourceSplit, StateT extends Serializable> extends Serializable {
public interface Source<T, SplitT extends SourceSplit, StateT extends Serializable>
extends Serializable, TypeInfoConverterFactory {

/**
* Run in client side for source initialize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.bytedance.bitsail.base.connector.writer.v1;

import com.bytedance.bitsail.base.extension.TypeInfoConverterFactory;
import com.bytedance.bitsail.base.serializer.BinarySerializer;
import com.bytedance.bitsail.base.serializer.SimpleBinarySerializer;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
Expand All @@ -31,7 +32,8 @@
/**
* Created 2022/6/10
*/
public interface Sink<InputT, CommitT extends Serializable, WriterStateT extends Serializable> extends Serializable {
public interface Sink<InputT, CommitT extends Serializable, WriterStateT extends Serializable>
extends Serializable, TypeInfoConverterFactory {

/**
* @return The name of writer operation.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.bytedance.bitsail.base.extension;

import com.bytedance.bitsail.common.type.TypeInfoConverter;

import java.io.Serializable;

public interface TypeInfoConverterFactory extends Serializable {

TypeInfoConverter createTypeInfoConverter();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.bytedance.bitsail.common.catalog;

import com.bytedance.bitsail.common.exception.ErrorCode;

public enum TableCatalogErrorCode implements ErrorCode {

TABLE_CATALOG_TABLE_NOT_EXISTS("Table-Catalog-1", "Catalog table not exists."),

;
private final String code;

private final String description;

TableCatalogErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}
}
Loading

0 comments on commit 7bd7aef

Please sign in to comment.