diff --git a/java/connector-node/README.md b/java/connector-node/README.md index 1651315fe0331..bf69478f1acfe 100644 --- a/java/connector-node/README.md +++ b/java/connector-node/README.md @@ -80,7 +80,7 @@ Downloading and launching MinIO is a straightforward process. For PostgreSQL, I ```shell # create postgresql in docker -docker run --name my-postgres -e POSTGRES_PASSWORD=connector -e POSTGRES_DB=test -e POSTGRES_USER=test -d -p 5432:5432 postgres +docker run --name my-postgres -e POSTGRES_PASSWORD=connector -e POSTGRES_DB=test -e POSTGRES_USER=test -d -p 5432:5432 postgres:16 # connect postgresql psql -h localhost -p 5432 -U test -d postgres ``` diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 98f0a39a2a3dd..233694142b392 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -55,6 +55,7 @@ public class DbzConnectorConfig { public static final String PG_PUB_CREATE = "publication.create.enable"; public static final String PG_SCHEMA_NAME = "schema.name"; public static final String PG_SSL_ROOT_CERT = "ssl.root.cert"; + public static final String PG_TEST_ONLY_FORCE_RDS = "test.only.force.rds"; /* Sql Server configs */ public static final String SQL_SERVER_SCHEMA_NAME = "schema.name"; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 50d6423ce3bf7..eeac1f66f4b36 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -65,7 +65,9 @@ public PostgresValidator( var password = userProps.get(DbzConnectorConfig.PASSWORD); this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password); - this.isAwsRds = dbHost.contains(AWS_RDS_HOST); + this.isAwsRds = + dbHost.contains(AWS_RDS_HOST) + || userProps.get(DbzConnectorConfig.PG_TEST_ONLY_FORCE_RDS).equals("true"); this.dbName = dbName; this.user = user; this.schemaName = userProps.get(DbzConnectorConfig.PG_SCHEMA_NAME); @@ -86,8 +88,8 @@ public PostgresValidator( @Override public void validateDbConfig() { try { - if (pgVersion > 16) { - throw ValidatorUtils.failedPrecondition("Postgres version should be less than 16."); + if (pgVersion >= 17) { + throw ValidatorUtils.failedPrecondition("Postgres version should be less than 17."); } try (var stmt = jdbcConnection.createStatement()) { @@ -254,24 +256,17 @@ private void validatePrivileges() throws SQLException { boolean isSuperUser = false; if (this.isAwsRds) { // check privileges for aws rds postgres - boolean hasReplicationRole; + boolean hasReplicationRole = false; try (var stmt = jdbcConnection.prepareStatement( ValidatorUtils.getSql("postgres.rds.role.check"))) { stmt.setString(1, this.user); + stmt.setString(2, this.user); var res = stmt.executeQuery(); - var hashSet = new HashSet(); while (res.next()) { - // check rds_superuser role or rds_replication role is granted - var memberof = res.getArray("memberof"); - if (memberof != null) { - var members = (String[]) memberof.getArray(); - hashSet.addAll(Arrays.asList(members)); - } - LOG.info("rds memberof: {}", hashSet); + isSuperUser = res.getBoolean(1); + hasReplicationRole = res.getBoolean(2); } - isSuperUser = hashSet.contains("rds_superuser"); - hasReplicationRole = hashSet.contains("rds_replication"); } if (!isSuperUser && !hasReplicationRole) { @@ -320,9 +315,8 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException { try (var stmt = jdbcConnection.prepareStatement( ValidatorUtils.getSql("postgres.table_read_privilege.check"))) { - stmt.setString(1, this.schemaName); - stmt.setString(2, this.tableName); - stmt.setString(3, this.user); + stmt.setString(1, this.user); + stmt.setString(2, this.schemaName + "." + this.tableName); var res = stmt.executeQuery(); while (res.next()) { if (!res.getBoolean(1)) { diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 04eaf227b65d7..679264fb3845d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -13,7 +13,7 @@ postgres.slot_limit.check=SELECT CASE WHEN (SELECT count(*) FROM pg_replication_ postgres.role.check=SELECT rolreplication OR rolsuper FROM pg_roles WHERE rolname = ? postgres.superuser.check=SELECT rolsuper FROM pg_roles WHERE rolname = ? postgres.database_privilege.check=SELECT has_database_privilege(?, ?, 'create') FROM pg_roles WHERE rolname = ? -postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schema.role_table_grants WHERE table_schema = ? AND table_name = ? AND grantee = ? and privilege_type = 'SELECT' +postgres.table_read_privilege.check=SELECT has_table_privilege(?, ?, 'SELECT') postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ? postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames' postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? @@ -51,4 +51,4 @@ sqlserver.has.perms=SELECT HAS_PERMS_BY_NAME('cdc.' + ct.capture_instance + '_CT sqlserver.sql.agent.enabled=SELECT sys.fn_cdc_get_max_lsn() sqlserver.case.sensitive=WITH collations AS (SELECT name, CASE WHEN description like '%case-insensitive%' THEN 0 WHEN description like '%case-sensitive%' THEN 1 END isCaseSensitive FROM sys.fn_helpcollations()) SELECT * FROM collations WHERE name = CONVERT(varchar, DATABASEPROPERTYEX( ? ,'collation')); citus.distributed_table=select citus_table_type from citus_tables where table_name=?::regclass -postgres.rds.role.check=SELECT r.rolname, r.rolsuper, r.rolinherit, r.rolcreaterole, r.rolcreatedb, r.rolcanlogin, r.rolconnlimit, r.rolvaliduntil, ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid) as memberof , r.rolreplication , r.rolbypassrls FROM pg_catalog.pg_roles r WHERE r.rolname = ? +postgres.rds.role.check=SELECT pg_has_role(?, 'rds_superuser', 'member') as is_rds_superuser, pg_has_role(?, 'rds_replication', 'member') as is_rds_replication; diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java index 3e7a02ca1c111..4b49ec93c21c7 100644 --- a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java @@ -159,27 +159,221 @@ public void testLines() throws Exception { // test whether validation catches permission errors @Test - public void testPermissionCheck() throws SQLException { + public void testUserPermissionCheck() throws SQLException { // user Postgres creates a superuser debezium Connection connPg = SourceTestClient.connect(pgDataSource); - String query = "CREATE USER debezium"; + String query = "CREATE ROLE rds_replication"; SourceTestClient.performQuery(connPg, query); - query = "ALTER USER debezium SUPERUSER REPLICATION"; + query = "CREATE ROLE rds_superuser"; + SourceTestClient.performQuery(connPg, query); + query = "CREATE USER debezium"; SourceTestClient.performQuery(connPg, query); query = "ALTER USER debezium WITH PASSWORD '" + pg.getPassword() + "'"; SourceTestClient.performQuery(connPg, query); + query = + "CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))"; + SourceTestClient.performQuery(connPg, query); + // user debezium connects to Postgres DataSource dbzDataSource = SourceTestClient.getDataSource( pg.getJdbcUrl(), "debezium", pg.getPassword(), pg.getDriverClassName()); Connection connDbz = SourceTestClient.connect(dbzDataSource); + + ConnectorServiceProto.TableSchema tableSchema = + ConnectorServiceProto.TableSchema.newBuilder() + .addColumns( + PlanCommon.ColumnDesc.newBuilder() + .setName("o_key") + .setColumnType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT64) + .build()) + .build()) + .addColumns( + PlanCommon.ColumnDesc.newBuilder() + .setName("o_val") + .setColumnType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT32) + .build()) + .build()) + .addPkIndices(0) + .build(); + + try { + var resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + assertEquals( + "INVALID_ARGUMENT: Postgres user must be superuser or replication role to start walsender.", + resp.getError().getErrorMessage()); + query = "ALTER USER debezium REPLICATION"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + assertEquals( + "INVALID_ARGUMENT: Postgres user must have select privilege on table 'public.orders'", + resp.getError().getErrorMessage()); + query = "GRANT SELECT ON orders TO debezium"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + assertEquals( + "INVALID_ARGUMENT: Postgres user must have create privilege on database 'test'", + resp.getError().getErrorMessage()); + + query = "GRANT CREATE ON DATABASE test TO debezium"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + + assertEquals( + "INVALID_ARGUMENT: Postgres user must be the owner of table 'orders' to create/alter publication", + resp.getError().getErrorMessage()); + + query = "ALTER TABLE orders OWNER TO debezium"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + + assertEquals("", resp.getError().getErrorMessage()); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals( + "INVALID_ARGUMENT: Postgres user must be superuser or replication role to start walsender.", + resp.getError().getErrorMessage()); + query = "GRANT rds_replication TO debezium"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals("", resp.getError().getErrorMessage()); + + query = "REVOKE rds_replication FROM dbz_group"; + SourceTestClient.performQuery(connPg, query); + query = "GRANT rds_superuser TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals("", resp.getError().getErrorMessage()); + + } catch (Exception e) { + Assert.fail("validate rpc fail: " + e.getMessage()); + } finally { + // cleanup + query = testClient.sqlStmts.getProperty("tpch.drop.orders"); + SourceTestClient.performQuery(connPg, query); + query = "DROP OWNED BY debezium"; + SourceTestClient.performQuery(connPg, query); + query = "DROP ROLE rds_replication"; + SourceTestClient.performQuery(connPg, query); + query = "DROP ROLE rds_superuser"; + SourceTestClient.performQuery(connPg, query); + query = "DROP USER debezium"; + SourceTestClient.performQuery(connPg, query); + connDbz.close(); + connPg.close(); + } + } + + @Test + public void testGroupPermissionCheck() throws SQLException { + // user Postgres creates a superuser debezium + Connection connPg = SourceTestClient.connect(pgDataSource); + String query = "CREATE ROLE rds_replication"; + SourceTestClient.performQuery(connPg, query); + query = "CREATE ROLE rds_superuser"; + SourceTestClient.performQuery(connPg, query); + query = "CREATE USER debezium"; + SourceTestClient.performQuery(connPg, query); + query = "ALTER USER debezium REPLICATION"; + SourceTestClient.performQuery(connPg, query); + query = "ALTER USER debezium WITH PASSWORD '" + pg.getPassword() + "'"; + SourceTestClient.performQuery(connPg, query); + query = "CREATE GROUP dbz_group WITH USER debezium"; + SourceTestClient.performQuery(connPg, query); query = "CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))"; - SourceTestClient.performQuery(connDbz, query); - // create a partial publication, check whether error is reported - query = - "CREATE PUBLICATION rw_publication FOR TABLE orders (o_key) WITH ( publish_via_partition_root = true );"; - SourceTestClient.performQuery(connDbz, query); + SourceTestClient.performQuery(connPg, query); + + // user debezium connects to Postgres + DataSource dbzDataSource = + SourceTestClient.getDataSource( + pg.getJdbcUrl(), "debezium", pg.getPassword(), pg.getDriverClassName()); + Connection connDbz = SourceTestClient.connect(dbzDataSource); + ConnectorServiceProto.TableSchema tableSchema = ConnectorServiceProto.TableSchema.newBuilder() .addColumns( @@ -213,13 +407,10 @@ public void testPermissionCheck() throws SQLException { "test", "orders"); assertEquals( - "INVALID_ARGUMENT: The publication 'rw_publication' does not cover all columns of the table 'public.orders'", + "INVALID_ARGUMENT: Postgres user must have select privilege on table 'public.orders'", resp.getError().getErrorMessage()); - query = "DROP PUBLICATION dbz_publication"; - SourceTestClient.performQuery(connDbz, query); - // revoke superuser and replication, check if reports error - query = "ALTER USER debezium nosuperuser noreplication"; - SourceTestClient.performQuery(connDbz, query); + query = "GRANT SELECT ON orders TO dbz_group"; + SourceTestClient.performQuery(connPg, query); resp = testClient.validateSource( @@ -231,16 +422,105 @@ public void testPermissionCheck() throws SQLException { tableSchema, "test", "orders"); + assertEquals( + "INVALID_ARGUMENT: Postgres user must have create privilege on database 'test'", + resp.getError().getErrorMessage()); + query = "GRANT CREATE ON DATABASE test TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + + assertEquals( + "INVALID_ARGUMENT: Postgres user must be the owner of table 'orders' to create/alter publication", + resp.getError().getErrorMessage()); + + query = "ALTER TABLE orders OWNER TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + + assertEquals("", resp.getError().getErrorMessage()); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); assertEquals( "INVALID_ARGUMENT: Postgres user must be superuser or replication role to start walsender.", resp.getError().getErrorMessage()); + query = "GRANT rds_replication TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals("", resp.getError().getErrorMessage()); + + query = "REVOKE rds_replication FROM dbz_group"; + SourceTestClient.performQuery(connPg, query); + query = "GRANT rds_superuser TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals("", resp.getError().getErrorMessage()); + } catch (Exception e) { Assert.fail("validate rpc fail: " + e.getMessage()); } finally { // cleanup query = testClient.sqlStmts.getProperty("tpch.drop.orders"); - SourceTestClient.performQuery(connDbz, query); + SourceTestClient.performQuery(connPg, query); + query = "DROP OWNED BY dbz_group"; + SourceTestClient.performQuery(connPg, query); + query = "DROP GROUP dbz_group"; + SourceTestClient.performQuery(connPg, query); + query = "DROP ROLE rds_replication"; + SourceTestClient.performQuery(connPg, query); + query = "DROP ROLE rds_superuser"; + SourceTestClient.performQuery(connPg, query); query = "DROP USER debezium"; SourceTestClient.performQuery(connPg, query); connDbz.close(); diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java index 58c72c688dfef..9ea4b33057b60 100644 --- a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java @@ -102,6 +102,28 @@ protected ConnectorServiceProto.ValidateSourceResponse validateSource( ConnectorServiceProto.TableSchema tableSchema, String databaseName, String tableName) { + return validateSource( + jdbcUrl, + host, + username, + password, + sourceType, + tableSchema, + databaseName, + tableName, + false); + } + + protected ConnectorServiceProto.ValidateSourceResponse validateSource( + String jdbcUrl, + String host, + String username, + String password, + ConnectorServiceProto.SourceType sourceType, + ConnectorServiceProto.TableSchema tableSchema, + String databaseName, + String tableName, + boolean forceRds) { String port = String.valueOf(URI.create(jdbcUrl.substring(5)).getPort()); ConnectorServiceProto.ValidateSourceRequest req = ConnectorServiceProto.ValidateSourceRequest.newBuilder() @@ -119,6 +141,7 @@ protected ConnectorServiceProto.ValidateSourceResponse validateSource( .putProperties("server.id", "1") // mysql only .putProperties("publication.name", "rw_publication") // pg only .putProperties("publication.create.enable", "true") // pg only + .putProperties("test.only.force.rds", forceRds ? "true" : "false") .build(); return blockingStub.validateSource(req); }