diff --git a/business/ck_rebalance.go b/business/ck_rebalance.go index b8cb9ef7..ba49ae74 100644 --- a/business/ck_rebalance.go +++ b/business/ck_rebalance.go @@ -232,6 +232,14 @@ func (this *CKRebalance) ExecutePlan(database string, tbl *TblPartitions) (err e fmt.Sprintf("ALTER TABLE %s FETCH PARTITION '%s' FROM '%s'", tbl.Table, patt, tbl.ZooPath), fmt.Sprintf("ALTER TABLE %s ATTACH PARTITION '%s'", tbl.Table, patt), } + if strings.Contains(patt, "(") && strings.Contains(patt, ")") { + // This implies that the partition name contains tuple. Removing quotes + dstQuires = []string{ + fmt.Sprintf("ALTER TABLE %s DROP DETACHED PARTITION %s ", tbl.Table, patt), + fmt.Sprintf("ALTER TABLE %s FETCH PARTITION %s FROM '%s'", tbl.Table, patt, tbl.ZooPath), + fmt.Sprintf("ALTER TABLE %s ATTACH PARTITION %s", tbl.Table, patt), + } + } for _, query := range dstQuires { log.Logger.Infof("host %s: query: %s", dstHost, query) if _, err = dstChConn.Exec(query); err != nil { @@ -246,6 +254,10 @@ func (this *CKRebalance) ExecutePlan(database string, tbl *TblPartitions) (err e return fmt.Errorf("can't get connection: %s", tbl.Host) } query := fmt.Sprintf("ALTER TABLE %s DROP PARTITION '%s'", tbl.Table, patt) + if strings.Contains(patt, "(") && strings.Contains(patt, ")") { + // This implies that the partition name contains tuple. Removing quotes + query = fmt.Sprintf("ALTER TABLE %s DROP PARTITION %s", tbl.Table, patt) + } if _, err = srcChConn.Exec(query); err != nil { log.Logger.Infof("host %s: query: %s", tbl.Host, query) err = errors.Wrapf(err, "") @@ -271,6 +283,10 @@ func (this *CKRebalance) ExecutePlan(database string, tbl *TblPartitions) (err e srcDir := dstDir + "/" query := fmt.Sprintf("ALTER TABLE %s DETACH PARTITION '%s'", tbl.Table, patt) + if strings.Contains(patt, "(") && strings.Contains(patt, ")") { + // This implies that the partition name contains tuple. Removing quotes + query = fmt.Sprintf("ALTER TABLE %s DETACH PARTITION %s", tbl.Table, patt) + } log.Logger.Infof("host: %s, query: %s", tbl.Host, query) if _, err = srcCkConn.Exec(query); err != nil { err = errors.Wrapf(err, "") @@ -297,8 +313,10 @@ func (this *CKRebalance) ExecutePlan(database string, tbl *TblPartitions) (err e return } log.Logger.Debugf("host: %s, output: %s", tbl.Host, out) - query = fmt.Sprintf("ALTER TABLE %s ATTACH PARTITION '%s'", tbl.Table, patt) + if strings.Contains(patt, "(") && strings.Contains(patt, ")") { + query = fmt.Sprintf("ALTER TABLE %s ATTACH PARTITION %s", tbl.Table, patt) + } log.Logger.Infof("host: %s, query: %s", dstHost, query) if _, err = dstCkConn.Exec(query); err != nil { err = errors.Wrapf(err, "") @@ -308,6 +326,9 @@ func (this *CKRebalance) ExecutePlan(database string, tbl *TblPartitions) (err e lock.Unlock() query = fmt.Sprintf("ALTER TABLE %s DROP DETACHED PARTITION '%s'", tbl.Table, patt) + if strings.Contains(patt, "(") && strings.Contains(patt, ")") { + query = fmt.Sprintf("ALTER TABLE %s DROP DETACHED PARTITION %s", tbl.Table, patt) + } log.Logger.Infof("host: %s, query: %s", tbl.Host, query) if _, err = srcCkConn.Exec(query); err != nil { err = errors.Wrapf(err, "") diff --git a/controller/clickhouse.go b/controller/clickhouse.go index 85b67760..c3cb1603 100644 --- a/controller/clickhouse.go +++ b/controller/clickhouse.go @@ -306,15 +306,19 @@ func (ck *ClickHouseController) CreateTable(c *gin.Context) { } // sync zookeeper path - err = clickhouse.GetReplicaZkPath(&conf) - if err != nil { - model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err) - return - } + if conf.IsReplica { + path, err := clickhouse.GetZkPath(ckService.DB, params.DB, params.Name) + if err != nil { + model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err) + return + } + tableName := fmt.Sprintf("%s.%s", params.DB, params.Name) + conf.ZooPath[tableName] = path - if err = repository.Ps.UpdateCluster(conf); err != nil { - model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err) - return + if err = repository.Ps.UpdateCluster(conf); err != nil { + model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err) + return + } } if req.DryRun { @@ -844,10 +848,12 @@ func (ck *ClickHouseController) StopCluster(c *gin.Context) { we cant't get zookeeper path by querying ck, so need to save the ZooKeeper path before stopping the cluster. */ - err = clickhouse.GetReplicaZkPath(&conf) - if err != nil { - model.WrapMsg(c, model.STOP_CK_CLUSTER_FAIL, err) - return + if conf.IsReplica { + err = clickhouse.GetReplicaZkPath(&conf) + if err != nil { + model.WrapMsg(c, model.STOP_CK_CLUSTER_FAIL, err) + return + } } common.CloseConns(conf.Hosts) diff --git a/deploy/ck.go b/deploy/ck.go index 086f9c95..7e8cb5dc 100644 --- a/deploy/ck.go +++ b/deploy/ck.go @@ -116,14 +116,13 @@ func (d *CKDeploy) Init() error { NeedSudo: d.Conf.NeedSudo, AuthenticateType: d.Conf.AuthenticateType, } - cmd := "hostname -f" - output, err := common.RemoteExecute(sshOpts, cmd) - if err != nil { - lastError = err - return - } + cmd := "hostname" + output, _ := common.RemoteExecute(sshOpts, cmd) hostname := strings.Trim(output, "\n") + if hostname == "" { + hostname = innerReplica.Ip + } d.Conf.Shards[innerShardIndex].Replicas[innerReplicaIndex].HostName = hostname lock.Lock() HostNameMap[hostname] = true diff --git a/service/clickhouse/clickhouse_service.go b/service/clickhouse/clickhouse_service.go index 58959ad4..cb51e5e0 100644 --- a/service/clickhouse/clickhouse_service.go +++ b/service/clickhouse/clickhouse_service.go @@ -898,7 +898,7 @@ func GetReplicaZkPath(conf *model.CKManClickHouseConfig) error { for _, database := range databases { if tables, ok := dbtables[database]; ok { for _, table := range tables { - path, err := getReplicaZkPath(service.DB, database, table) + path, err := GetZkPath(service.DB, database, table) if err != nil { return err } @@ -910,7 +910,7 @@ func GetReplicaZkPath(conf *model.CKManClickHouseConfig) error { return nil } -func getReplicaZkPath(db *sql.DB, database, table string) (string, error) { +func GetZkPath(db *sql.DB, database, table string) (string, error) { var err error var path string var rows *sql.Rows diff --git a/service/runner/ck.go b/service/runner/ck.go index 17c1a3c3..11047e20 100644 --- a/service/runner/ck.go +++ b/service/runner/ck.go @@ -106,34 +106,36 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip index++ } - service, err := zookeeper.NewZkService(conf.ZkNodes, conf.ZkPort) - if err != nil { - return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN) - } - _ = clickhouse.GetReplicaZkPath(conf) - var zooPaths []string - for _, path := range conf.ZooPath { - zooPath := strings.Replace(path, "{cluster}", conf.Cluster, -1) - zooPath = strings.Replace(zooPath, "{shard}", fmt.Sprintf("%d", shardNum+1), -1) - zooPaths = append(zooPaths, zooPath) - } + if conf.IsReplica { + service, err := zookeeper.NewZkService(conf.ZkNodes, conf.ZkPort) + if err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN) + } + _ = clickhouse.GetReplicaZkPath(conf) + var zooPaths []string + for _, path := range conf.ZooPath { + zooPath := strings.Replace(path, "{cluster}", conf.Cluster, -1) + zooPath = strings.Replace(zooPath, "{shard}", fmt.Sprintf("%d", shardNum+1), -1) + zooPaths = append(zooPaths, zooPath) + } - for _, path := range zooPaths { - if ifDeleteShard { - //delete the shard - shardNode := fmt.Sprintf("%d", shardNum+1) - err = service.DeletePathUntilNode(path, shardNode) - if err != nil { - return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN) - } - } else { - // delete replica path - replicaName := conf.Shards[shardNum].Replicas[replicaNum].Ip - replicaPath := fmt.Sprintf("%s/replicas/%s", path, replicaName) - log.Logger.Debugf("replicaPath: %s", replicaPath) - err = service.DeleteAll(replicaPath) - if err != nil { - return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN) + for _, path := range zooPaths { + if ifDeleteShard { + //delete the shard + shardNode := fmt.Sprintf("%d", shardNum+1) + err = service.DeletePathUntilNode(path, shardNode) + if err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN) + } + } else { + // delete replica path + replicaName := conf.Shards[shardNum].Replicas[replicaNum].Ip + replicaPath := fmt.Sprintf("%s/replicas/%s", path, replicaName) + log.Logger.Debugf("replicaPath: %s", replicaPath) + err = service.DeleteAll(replicaPath) + if err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN) + } } } } @@ -143,12 +145,12 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip d := deploy.NewCkDeploy(*conf) d.Packages = deploy.BuildPackages(conf.Version, conf.PkgType, conf.Cwd) d.Conf.Hosts = []string{ip} - if err = d.Stop(); err != nil { + if err := d.Stop(); err != nil { log.Logger.Warnf("can't stop node %s, ignore it", ip) } deploy.SetNodeStatus(task, model.NodeStatusUninstall, model.ALL_NODES_DEFAULT) - if err = d.Uninstall(); err != nil { + if err := d.Uninstall(); err != nil { log.Logger.Warnf("can't uninsatll node %s, ignore it", ip) } @@ -179,10 +181,10 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip d = deploy.NewCkDeploy(*conf) d.Conf.Hosts = hosts d.Conf.Shards = shards - if err = d.Init(); err != nil { + if err := d.Init(); err != nil { return errors.Wrapf(err, "[%s]", model.NodeStatusConfigExt.EN) } - if err = d.Config(); err != nil { + if err := d.Config(); err != nil { return errors.Wrapf(err, "[%s]", model.NodeStatusConfigExt.EN) } diff --git a/service/zookeeper/zookeeper_service.go b/service/zookeeper/zookeeper_service.go index 14ef7157..fc015959 100644 --- a/service/zookeeper/zookeeper_service.go +++ b/service/zookeeper/zookeeper_service.go @@ -69,6 +69,9 @@ func GetZkService(clusterName string) (*ZkService, error) { } func (z *ZkService) GetReplicatedTableStatus(conf *model.CKManClickHouseConfig) ([]model.ZkReplicatedTableStatus, error) { + if !conf.IsReplica { + return nil, nil + } err := clickhouse.GetReplicaZkPath(conf) if err != nil { return nil, err