Skip to content

Commit

Permalink
dbus版本升级到0.6.0
Browse files Browse the repository at this point in the history
canal升级到1.1.4,废弃mysql-extractor模块
sinker功能完善,添加根据table订阅功能
文档完善
  • Loading branch information
star-brilliant committed Apr 23, 2020
1 parent 68482a0 commit 4668e9d
Show file tree
Hide file tree
Showing 168 changed files with 2,925 additions and 1,854 deletions.
16 changes: 6 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,13 @@ DBus的主要潜在客户包括:

专注于数据的收集及实时数据流计算,通过简单灵活的配置,以无侵入的方式对源端数据进行采集,采用高可用的流式计算框架,对公司各个IT系统在业务流程中产生的数据进行汇聚,经过转换处理后成为统一JSON的数据格式(UMS),提供给不同数据使用方订阅和消费,充当数仓平台、大数据分析平台、实时报表和实时营销等业务的数据源。

### 快速开始

全套DBus包含诸多组件(Canal,zk,kafka,storm,mysql,influxdb,grafana),为了简单化,我们准备了All in One 包,包含了预先安装数据和一键启动脚本, 用于快速尝试。 请参考 [Quick Start](docs/quick-start.md)

### 相关文档

详细介绍 DBus请参考 [wiki](docs/index.md)
详细介绍 DBus请参考 [wiki](https://bridata.github.io/DBus/index.html)

常见问题可参考 [FAQ](docs/more-faq.md)
常见问题可参考 [FAQ](https://bridata.github.io/DBus/more-faq.html)

系统介绍参考 [system architecture](docs/more-system-architecture.md)
系统介绍参考 [system architecture](https://bridata.github.io/DBus/more-system-architecture.html)

### 系统架构和工作原理

Expand Down Expand Up @@ -110,19 +106,19 @@ DBUS源端数据采集大体来说分为2部分:

##### 编译打包代码

关于编译代码,参考 [compile](docs/more-compile-code.md)
关于编译代码,参考 [compile](https://bridata.github.io/DBus/more-compile-code.html)

##### 版本相关:

建议版本:0.6.0
建议版本:0.6.x

下载发布包:请参考:[downloads](https://github.com/BriData/DBus/releases)

##### 版权声明

DBus 自身使用 Apache v2.0 协议

关于DBus 自身协议,修改第三方包代码,以及三方包协议参考: [License](docs/more-license.md)
关于DBus 自身协议,修改第三方包代码,以及三方包协议参考: [License](https://bridata.github.io/DBus/more-license.html)

##### 其他相关资料:

Expand Down
2 changes: 1 addition & 1 deletion dbus-auto-check/allinone-auto-check/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dbus-auto-check</artifactId>
<groupId>com.creditease.dbus</groupId>
<version>0.6.0</version>
<version>0.6.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dbus-auto-check/dbus-canal-auto/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dbus-auto-check</artifactId>
<groupId>com.creditease.dbus</groupId>
<version>0.6.0</version>
<version>0.6.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,20 @@
package com.creditease.dbus.canal.auto;


import com.creditease.dbus.canal.utils.CanalUtils;
import org.apache.commons.cli.*;
import org.apache.commons.lang3.StringUtils;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static com.creditease.dbus.canal.utils.FileUtils.getValueFromFile;
import static com.creditease.dbus.canal.utils.FileUtils.writeProperties;

/**
* This is Description
Expand All @@ -35,27 +44,133 @@
* @date 2018/12/12
*/
public class AddLine {
public static String type = "newLine";
public static String dsName = null;
public static String zkString = null;
public static String address = null;
public static String user = null;
public static String pass = null;
public static Integer slaveId = null;
public static String bootstrapServers = null;
public static String tableNames = null;
public static String userDir = System.getProperty("user.dir");

public static String DEFAULT_FILTER = ".*\\\\..*";

public static void main(String[] args) {
try {
parseCommandArgs(args);
autoDeploy();
AutoDeployStart.main(null);
} catch (Exception e) {
e.printStackTrace();
}
}

private static void autoDeploy() throws Exception {
switch (type) {
case "newLine":
newLine();
AutoDeployStart.main(null);
break;
case "editFilter":
editFilter();
break;
case "initFilter":
initFilter();
break;
case "deleteFilter":
deleteFilter();
break;
}
}

private static void deleteFilter() throws Exception {
if (StringUtils.isNotBlank(tableNames)) {
System.out.println("delete filter." + tableNames);
deleteTableFromParamFile();
}
restart();
}

private static void initFilter() throws Exception {
if (StringUtils.isNotBlank(tableNames)) {
System.out.println("init filter." + tableNames);
addAllTableToParamFile();
}
restart();
}

private static void editFilter() throws Exception {
if (StringUtils.isNotBlank(tableNames)) {
System.out.println("edit filter." + tableNames);
addTableToParamFile();
}
restart();
}

private static void addAllTableToParamFile() throws Exception {
String userdir = System.getProperty("user.dir");
String paramFilePath = String.format("%s/canal-%s/conf/%s/instance.properties", userdir, dsName, dsName);
if (StringUtils.isBlank(tableNames)) {
tableNames = DEFAULT_FILTER;
}
writeProperties(paramFilePath, "canal.instance.filter.regex", "canal.instance.filter.regex=" + tableNames);
}

private static void deleteTableFromParamFile() throws Exception {
String userdir = System.getProperty("user.dir");
String paramFilePath = String.format("%s/canal-%s/conf/%s/instance.properties", userdir, dsName, dsName);
String filterRegex = getValueFromFile(paramFilePath, "canal.instance.filter.regex");
if (StringUtils.equals(DEFAULT_FILTER, filterRegex)) {
return;
}
List<String> oldFilterList = Arrays.asList(StringUtils.split(filterRegex, ","));
// 删除的表
List<String> tableNameList = Arrays.asList(StringUtils.split(tableNames, ","));
// 新的filter表
List<String> newFilterList = new ArrayList<>();
for (String tableName : oldFilterList) {
if (!tableNameList.contains(tableName)) {
newFilterList.add(tableName);
}
}
String newfilterRegex = newFilterList.stream().collect(Collectors.joining(","));
if (StringUtils.isBlank(newfilterRegex)) {
newfilterRegex = DEFAULT_FILTER;
}
writeProperties(paramFilePath, "canal.instance.filter.regex", "canal.instance.filter.regex=" + newfilterRegex);
}

private static void restart() throws Exception {
String canalPath = String.format("%s/canal-%s", System.getProperty("user.dir"), dsName);
CanalUtils.start(canalPath);
}

private static void addTableToParamFile() throws Exception {
String userdir = System.getProperty("user.dir");
String paramFilePath = String.format("%s/canal-%s/conf/%s/instance.properties", userdir, dsName, dsName);
String filterRegex = getValueFromFile(paramFilePath, "canal.instance.filter.regex");
if (StringUtils.equals(DEFAULT_FILTER, filterRegex)) {
filterRegex = "";
}
List<String> oldFilterList = Arrays.asList(StringUtils.split(filterRegex, ","));
// 新添加的表
List<String> tableNameList = Arrays.asList(StringUtils.split(tableNames, ","));
// 新的filter表
List<String> newFilterList = new ArrayList<>();
for (String tableName : tableNameList) {
if (!oldFilterList.contains(tableName)) {
newFilterList.add(tableName);
}
}
newFilterList.addAll(oldFilterList);
String newfilterRegex = newFilterList.stream().collect(Collectors.joining(","));
if (StringUtils.isBlank(newfilterRegex)) {
newfilterRegex = ".*\\\\..*";
}
writeProperties(paramFilePath, "canal.instance.filter.regex", "canal.instance.filter.regex=" + newfilterRegex);
}

private static void newLine() throws Exception {
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(userDir + "/conf/canal-auto.properties")));
Expand Down Expand Up @@ -89,18 +204,23 @@ private static void autoDeploy() throws Exception {
private static void parseCommandArgs(String[] args) throws Exception {
Options options = new Options();

options.addOption("t", "type", true, "newSchema");
options.addOption("dn", "dsName", true, "");
options.addOption("zk", "zkString", true, "");
options.addOption("a", "address", true, "");
options.addOption("u", "user", true, "");
options.addOption("p", "pass", true, "");
options.addOption("s", "slaveId", true, "");
options.addOption("bs", "bootstrap.servers", true, "");
options.addOption("tn", "tableNames", true, "");


CommandLineParser parser = new DefaultParser();
try {
CommandLine line = parser.parse(options, args);
if (line.hasOption("type")) {
type = line.getOptionValue("type");
}
if (line.hasOption("dsName")) {
dsName = line.getOptionValue("dsName");
}
Expand All @@ -122,6 +242,9 @@ private static void parseCommandArgs(String[] args) throws Exception {
if (line.hasOption("bootstrap.servers")) {
bootstrapServers = line.getOptionValue("bootstrap.servers");
}
if (line.hasOption("tableNames")) {
tableNames = line.getOptionValue("tableNames");
}

} catch (ParseException e) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,17 @@ public static void start(String canalPath) throws Exception {
try {
String startPath = canalPath + "/bin/" + "startup.sh";
String stopPath = canalPath + "/bin/" + "stop.sh";

String cmd = "sh " + stopPath;
writeAndPrint("exec: " + cmd);

//停止已存在
exec(cmd);

cmd = "sh " + startPath;
writeAndPrint("exec: " + cmd);

exec(cmd);

} catch (Exception e) {
writeAndPrint("************************************* START CANAL FAIL ************************************** ");

throw e;
}
}
Expand Down Expand Up @@ -100,7 +98,7 @@ public static String exec(Object cmd) throws Exception {

try {
if (cmd instanceof String) {
process = Runtime.getRuntime().exec(cmd.toString());
process = Runtime.getRuntime().exec(((String) cmd));
} else {
String[] cmd2 = (String[]) cmd;
process = Runtime.getRuntime().exec(cmd2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -45,9 +45,8 @@ public static void init(BufferedWriter bufferedWriter) {
}

public static DeployPropsBean readProps(String path) throws Exception {
try {
try (InputStream ins = new BufferedInputStream(new FileInputStream(path))) {
Properties deployProps = new Properties();
InputStream ins = new BufferedInputStream(new FileInputStream(path));
deployProps.load(ins);
DeployPropsBean props = new DeployPropsBean();
props.setDsName(deployProps.getProperty("dsname").trim());
Expand All @@ -68,6 +67,20 @@ public static DeployPropsBean readProps(String path) throws Exception {
}
}

public static String getValueFromFile(String path, String key) throws Exception {
try (BufferedReader br = new BufferedReader(new FileReader(path))) {
String line;
while ((line = br.readLine()) != null) {
if (line.matches(key + "\\s*=.*")) {
return StringUtils.trim(StringUtils.split(line, "=")[1]);
}
}
return null;
} catch (Exception e) {
throw e;
}
}

/**
* 更新properties文件某key的value
*/
Expand Down
6 changes: 1 addition & 5 deletions dbus-auto-check/dbus-ogg-auto/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dbus-auto-check</artifactId>
<groupId>com.creditease.dbus</groupId>
<version>0.6.0</version>
<version>0.6.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down Expand Up @@ -45,10 +45,6 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion dbus-auto-check/log-auto-check/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dbus-auto-check</artifactId>
<groupId>com.creditease.dbus</groupId>
<version>0.6.0</version>
<version>0.6.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
Expand Down
2 changes: 1 addition & 1 deletion dbus-auto-check/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dbus-main</artifactId>
<groupId>com.creditease.dbus</groupId>
<version>0.6.0</version>
<version>0.6.1</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
8 changes: 1 addition & 7 deletions dbus-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.creditease.dbus</groupId>
<artifactId>dbus-main</artifactId>
<version>0.6.0</version>
<version>0.6.1</version>
</parent>
<artifactId>dbus-commons</artifactId>

Expand Down Expand Up @@ -78,16 +78,10 @@
<!-- 你需要添加 mysql 依赖在这里 mysql-connector-java -->

<!-- 你需要添加 oracle 依赖在这里 ojdbc7 -->
<!--
dbcp从1.4升级到2.6.0对应的oracle需要升级,否在报以下错误
Exception in thread "emit-heartbeat-event-xinghuo" java.lang.AbstractMethodError
at org.apache.commons.dbcp2.DelegatingConnection.isValid(DelegatingConnection.java:874)
-->
<!-- mvn install:install-file -DgroupId=com.oracle -DartifactId=ojdbc7 -Dversion=12.1.0.2 -Dpackaging=jar -Dfile=ojdbc7-12.1.0.2.jar -DgeneratePom=true -->


<!-- 你需要添加 db2 依赖在这里 db2jcc4 -->
<!-- Install driver jar to your local repository with the flowing command.-->
<!-- mvn install:install-file -DgroupId=com.ibm.db2.jcc -DartifactId=db2jcc4 -Dversion=4.23.42 -Dpackaging=jar -Dfile=db2jcc4-4.23.42.jar -DgeneratePom=true-->

</dependencies>
Expand Down
Loading

0 comments on commit 4668e9d

Please sign in to comment.