Skip to content

Commit

Permalink
BIGTOP-4223: Support PostgreSQL for mybatis
Browse files Browse the repository at this point in the history
  • Loading branch information
fuxiaofengfu committed Sep 14, 2024
1 parent 5ad2b3e commit d062988
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,25 @@ public static <Entity> String insert(TableMetaData tableMetaData, Entity entity,
}
break;
}
case POSTGRESQL: {
sql.INSERT_INTO("\"" + tableMetaData.getTableName() + "\"");
for (Map.Entry<String, String> entry : fieldColumnMap.entrySet()) {
// Ignore primary key
if (Objects.equals(entry.getKey(), tableMetaData.getPkProperty())) {
continue;
}
PropertyDescriptor ps = BeanUtils.getPropertyDescriptor(entityClass, entry.getKey());
if (ps == null || ps.getReadMethod() == null) {
continue;
}
Object value = ReflectionUtils.invokeMethod(ps.getReadMethod(), entity);
if (!ObjectUtils.isEmpty(value)) {
sql.VALUES("\"" + entry.getValue() + "\"", getTokenParam(entry.getKey()));
}
}
break;
}

default: {
log.error("Unsupported data source");
}
Expand Down Expand Up @@ -105,6 +124,25 @@ public static <Entity> String update(TableMetaData tableMetaData, Entity entity,
sql.WHERE(getEquals(tableMetaData.getPkColumn(), tableMetaData.getPkProperty()));
break;
}
case POSTGRESQL: {
sql.UPDATE("\"" + tableMetaData.getTableName() + "\"");
for (Map.Entry<String, String> entry : fieldColumnMap.entrySet()) {
// Ignore primary key
if (Objects.equals(entry.getKey(), tableMetaData.getPkProperty())) {
continue;
}
PropertyDescriptor ps = BeanUtils.getPropertyDescriptor(entityClass, entry.getKey());
if (ps == null || ps.getReadMethod() == null) {
continue;
}
Object value = ReflectionUtils.invokeMethod(ps.getReadMethod(), entity);
if (!ObjectUtils.isEmpty(value)) {
sql.SET("\"" + getEquals(entry.getValue() + "\"", entry.getKey()));
}
}
sql.WHERE(getEquals(tableMetaData.getPkColumn(), tableMetaData.getPkProperty()));
break;
}
default: {
log.error("Unsupported data source");
}
Expand All @@ -123,6 +161,16 @@ public static String selectById(TableMetaData tableMetaData, String databaseId,
sql.WHERE(tableMetaData.getPkColumn() + " = '" + id + "'");
break;
}
case POSTGRESQL: {
String baseColumns = tableMetaData.getBaseColumns();
if (baseColumns.toLowerCase().contains("user.")) {
baseColumns = baseColumns.replace("user.", "\"user\".");
}
sql.SELECT(baseColumns);
sql.FROM("\"" + tableMetaData.getTableName() + "\"");
sql.WHERE(tableMetaData.getPkColumn() + " = " + id);
break;
}
default: {
log.error("Unsupported data source");
}
Expand All @@ -143,6 +191,17 @@ public static String selectByIds(
sql.WHERE(tableMetaData.getPkColumn() + " in ('" + idsStr + "')");
break;
}
case POSTGRESQL: {
String idsStr = ids.stream().map(String::valueOf).collect(Collectors.joining(","));
String baseColumns = tableMetaData.getBaseColumns();
if (baseColumns.toLowerCase().contains("user.")) {
baseColumns = baseColumns.replace("user.", "\"user\".");
}
sql.SELECT(baseColumns);
sql.FROM("\"" + tableMetaData.getTableName() + "\"");
sql.WHERE(tableMetaData.getPkColumn() + " in (" + idsStr + ")");
break;
}
default: {
log.error("Unsupported data source");
}
Expand All @@ -155,6 +214,14 @@ public static String selectAll(TableMetaData tableMetaData, String databaseId) {

SQL sql = new SQL();
switch (DBType.toType(databaseId)) {
case POSTGRESQL:
String baseColumns = tableMetaData.getBaseColumns();
if (baseColumns.toLowerCase().contains("user.")) {
baseColumns = baseColumns.replace("user.", "\"user\".");
}
sql.SELECT(baseColumns);
sql.FROM("\"" + tableMetaData.getTableName() + "\"");
break;
case MYSQL: {
sql.SELECT(tableMetaData.getBaseColumns());
sql.FROM(tableMetaData.getTableName());
Expand All @@ -176,6 +243,11 @@ public static String deleteById(TableMetaData tableMetaData, String databaseId,
sql.WHERE(tableMetaData.getPkColumn() + " = '" + id + "'");
break;
}
case POSTGRESQL: {
sql.FROM("\"" + tableMetaData.getTableName() + "\"");
sql.WHERE(tableMetaData.getPkColumn() + " = " + id);
break;
}
default: {
log.error("Unsupported data source");
}
Expand All @@ -194,6 +266,12 @@ public static String deleteByIds(
sql.WHERE(tableMetaData.getPkColumn() + " in ('" + idsStr + "')");
break;
}
case POSTGRESQL: {
String idsStr = ids.stream().map(String::valueOf).collect(Collectors.joining(", "));
sql.DELETE_FROM("\"" + tableMetaData.getTableName() + "\"");
sql.WHERE(tableMetaData.getPkColumn() + " in (" + idsStr + ")");
break;
}
default: {
log.error("Unsupported data source");
}
Expand All @@ -208,6 +286,8 @@ public static <Condition> String findByCondition(
log.info("databaseId: {}", databaseId);
SQL sql = new SQL();
switch (DBType.toType(databaseId)) {
case POSTGRESQL:
tableName = "\"" + tableName + "\"";
case MYSQL: {
sql = mysqlCondition(condition, tableName);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
</collection>
</resultMap>

<select id="findAllByClusterId" parameterType="java.lang.Long" resultMap="jobMap">
<select id="findAllByIdsJoin" parameterType="java.lang.Long" resultMap="jobMap">
select
<include refid="baseColumnsV2">
<property name="alias" value="h"/>
Expand All @@ -83,15 +83,19 @@
from
(select * from job
<where>
<if test="clusterId != 0">
cluster_id = #{clusterId}
<if test="ids != null">
id in
<foreach collection="ids" item="id" index="index" open="(" close=")" separator=",">
#{id}
</foreach>
</if>
</where>
) h
inner join stage st
on h.id = st.job_id
inner join task tk
on tk.job_id = h.id and st.id = tk.stage_id
order by h.id desc
</select>

<select id="findByIdJoin" parameterType="java.lang.Long" resultMap="jobMap">
Expand All @@ -115,16 +119,24 @@
on tk.job_id = h.id and st.id = tk.stage_id
</select>

<select id="findAllByClusterId" parameterType="java.lang.Long" resultType="org.apache.bigtop.manager.dao.po.JobPO">
select
<include refid="baseColumns"/>
from
job
<where>
<if test="clusterId != 0">
cluster_id = #{clusterId}
</if>
</where>
</select>

<select id="findAllByClusterIsNull"
resultType="org.apache.bigtop.manager.dao.po.JobPO">
select
<include refid="baseColumnsV2">
<property name="alias" value="h"/>
</include>
<include refid="baseColumns"/>
from
(select * from job where cluster_id is null) h
inner join stage st
on h.id = st.job_id
job where cluster_id is null
</select>

</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<mapper namespace="org.apache.bigtop.manager.dao.repository.PlatformDao">

<sql id="baseColumns">
id, name, credential, support_models
"id", "name", "credential", "support_models"
</sql>

<!-- Define the resultMap with the custom typeHandler for the credential column -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
from
(select * from service_config
<where>
selected = 1
selected is true
<if test="clusterId != 0">
and cluster_id = #{clusterId}
</if>
Expand All @@ -103,7 +103,7 @@
from
(select * from service_config
<where>
selected = 1
selected is true
<if test="clusterId != 0">
and cluster_id = #{clusterId}
</if>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<mapper namespace="org.apache.bigtop.manager.dao.repository.StackDao">

<sql id="baseColumns">
id, stack_name, stack_version
"id", "stack_name", "stack_version"
</sql>

<select id="findByStackNameAndStackVersion" resultType="org.apache.bigtop.manager.dao.po.StackPO">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
<mapper namespace="org.apache.bigtop.manager.dao.repository.StageDao">

<update id="updateStateByIds" parameterType="java.util.List">
UPDATE stage
SET `state` = CASE
UPDATE "stage"
SET "state" = CASE
<foreach collection="stages" item="item" index="index">
WHEN id = #{item.id} THEN #{item.state}
</foreach>
END
WHERE id IN
WHERE "id" IN
<foreach collection="stages" index="index" item="item" open="(" separator="," close=")">
#{item.id}
</foreach>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

<update id="updateStateByIds" parameterType="java.util.List">
UPDATE task
SET `state` = CASE
SET "state" = CASE
<foreach collection="tasks" item="item" index="index">
WHEN id = #{item.id} THEN #{item.state}
</foreach>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<mapper namespace="org.apache.bigtop.manager.dao.repository.UserDao">

<sql id="baseColumns">
id, username, password, nickname, status
"id", "username", "password", "nickname", "status"
</sql>

<select id="findByUsername" resultType="org.apache.bigtop.manager.dao.po.UserPO">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
import org.apache.bigtop.manager.server.command.task.Task;
import org.apache.bigtop.manager.server.holder.SpringContextHolder;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

@Slf4j
public abstract class AbstractJob implements Job {

protected StackDao stackDao;
Expand Down Expand Up @@ -106,6 +109,7 @@ public void run() {
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
success = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import org.apache.bigtop.manager.server.command.task.Task;
import org.apache.bigtop.manager.server.holder.SpringContextHolder;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Slf4j
public abstract class AbstractStage implements Stage {

protected StageDao stageDao;
Expand Down Expand Up @@ -93,13 +96,15 @@ public Boolean run() {
try {
return future.get();
} catch (Exception e) {
log.error("stage failed,", e);
return false;
}
})
.toList();

allTaskSuccess = taskResults.stream().allMatch(Boolean::booleanValue);
} catch (Exception e) {
log.error("stage failed", e);
allTaskSuccess = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.bigtop.manager.server.grpc.GrpcClient;
import org.apache.bigtop.manager.server.holder.SpringContextHolder;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class AbstractTask implements Task {

protected TaskDao taskDao;
Expand Down Expand Up @@ -85,6 +88,7 @@ public Boolean run() {

taskSuccess = reply != null && reply.getCode() == MessageConstants.SUCCESS_CODE;
} catch (Exception e) {
log.error("task failed", e);
taskSuccess = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ CREATE TABLE "user"
username VARCHAR(32) DEFAULT NULL,
password VARCHAR(32) DEFAULT NULL,
nickname VARCHAR(32) DEFAULT NULL,
status int DEFAULT 1,
status BOOLEAN DEFAULT TRUE,
create_time TIMESTAMP(0) DEFAULT NULL,
update_time TIMESTAMP(0) DEFAULT NULL,
create_by BIGINT,
Expand All @@ -57,7 +57,7 @@ CREATE TABLE cluster
cluster_name VARCHAR(255) DEFAULT NULL,
cluster_desc VARCHAR(255) DEFAULT NULL,
cluster_type SMALLINT CHECK (cluster_type > 0) DEFAULT 1,
selected int DEFAULT 1,
selected BOOLEAN DEFAULT TRUE,
create_time TIMESTAMP(0) DEFAULT NULL,
update_time TIMESTAMP(0) DEFAULT NULL,
create_by BIGINT,
Expand Down Expand Up @@ -265,7 +265,7 @@ CREATE TABLE service_config
config_desc VARCHAR(255),
create_by BIGINT,
create_time TIMESTAMP(0),
selected SMALLINT default 0,
selected BOOLEAN default FALSE,
update_by BIGINT,
update_time TIMESTAMP(0),
version INTEGER,
Expand Down Expand Up @@ -372,7 +372,7 @@ CREATE INDEX idx_thread_id ON llm_chat_message (thread_id);
CREATE INDEX idx_user_id ON llm_chat_message (user_id);

INSERT INTO "user" (create_time, update_time, nickname, password, status, username)
VALUES (now(), now(), 'Administrator', '21232f297a57a5a743894a0e4a801fc3', 1, 'admin');
VALUES (now(), now(), 'Administrator', '21232f297a57a5a743894a0e4a801fc3', true, 'admin');

INSERT INTO llm_platform (credential, NAME, support_models)
VALUES
Expand Down

0 comments on commit d062988

Please sign in to comment.