diff --git a/build.sbt b/build.sbt index f5381b19344..b128351fd50 100644 --- a/build.sbt +++ b/build.sbt @@ -734,7 +734,7 @@ lazy val sharing = (project in file("sharing")) libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", - "io.delta" %% "delta-sharing-client" % "1.3.6", + "io.delta" %% "delta-sharing-client" % "1.3.8", // Test deps "org.scalatest" %% "scalatest" % scalaTestVersion % "test", diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala index 11230fcfe60..34cbef4b36b 100644 --- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala @@ -73,6 +73,7 @@ private[sharing] class DeltaSharingDataSource logInfo(s"sourceSchema with parquet format for table path:$path, parameters:$parameters") val deltaLog = RemoteDeltaLog( path, + shareCredentialsOptions = options.shareCredentialsOptions, forStreaming = true, responseFormat = options.responseFormat ) @@ -94,9 +95,11 @@ private[sharing] class DeltaSharingDataSource ) } // 1. create delta sharing client - val parsedPath = DeltaSharingRestClient.parsePath(path) + val parsedPath = + DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions) val client = DeltaSharingRestClient( profileFile = parsedPath.profileFile, + shareCredentialsOptions = options.shareCredentialsOptions, forStreaming = true, responseFormat = options.responseFormat, // comma separated delta reader features, used to tell delta sharing server what delta @@ -185,7 +188,12 @@ private[sharing] class DeltaSharingDataSource if (options.responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET) { logInfo(s"createSource with parquet format for table path:$path, parameters:$parameters") - val deltaLog = RemoteDeltaLog(path, forStreaming = true, options.responseFormat) + val deltaLog = RemoteDeltaLog( + path, + shareCredentialsOptions = options.shareCredentialsOptions, + forStreaming = true, + responseFormat = options.responseFormat + ) DeltaSharingSource(SparkSession.active, deltaLog, options) } else if (options.responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) { logInfo(s"createSource with delta format for table path:$path, parameters:$parameters") @@ -195,9 +203,11 @@ private[sharing] class DeltaSharingDataSource ) } // 1. create delta sharing client - val parsedPath = DeltaSharingRestClient.parsePath(path) + val parsedPath = + DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions) val client = DeltaSharingRestClient( profileFile = parsedPath.profileFile, + shareCredentialsOptions = options.shareCredentialsOptions, forStreaming = true, responseFormat = options.responseFormat, // comma separated delta reader features, used to tell delta sharing server what delta @@ -245,6 +255,7 @@ private[sharing] class DeltaSharingDataSource logInfo(s"createRelation with parquet format for table path:$path, parameters:$parameters") val deltaLog = RemoteDeltaLog( path, + shareCredentialsOptions = options.shareCredentialsOptions, forStreaming = false, responseFormat = options.responseFormat ) @@ -258,9 +269,11 @@ private[sharing] class DeltaSharingDataSource // delta features. logInfo(s"createRelation with delta format for table path:$path, parameters:$parameters") // 1. create delta sharing client - val parsedPath = DeltaSharingRestClient.parsePath(path) + val parsedPath = + DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions) val client = DeltaSharingRestClient( profileFile = parsedPath.profileFile, + shareCredentialsOptions = options.shareCredentialsOptions, forStreaming = false, responseFormat = options.responseFormat, // comma separated delta reader features, used to tell delta sharing server what delta @@ -316,8 +329,10 @@ private[sharing] class DeltaSharingDataSource options: DeltaSharingOptions, sqlContext: SQLContext): BaseRelation = { val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) - logInfo(s"autoResolving BaseRelation for path:${path}, with options:${options.options}.") - val parsedPath = DeltaSharingRestClient.parsePath(path) + logInfo(s"autoResolving BaseRelation for path:${path}, " + + s"with options:${DeltaSharingDataSource.redactOptions(options.options)}.") + val parsedPath = + DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions) val responseFormat = { if (sqlContext.sparkSession.sessionState.conf.getConf( @@ -336,6 +351,7 @@ private[sharing] class DeltaSharingDataSource val client = DeltaSharingRestClient( profileFile = parsedPath.profileFile, + shareCredentialsOptions = options.shareCredentialsOptions, forStreaming = false, // Indicating that the client is able to process response format in both parquet and delta. responseFormat = responseFormat, @@ -357,22 +373,26 @@ private[sharing] class DeltaSharingDataSource ) if (deltaTableMetadata.respondedFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET) { - logInfo(s"Resolved as parquet format for table path:$path, parameters:${options.options}") + logInfo(s"Resolved as parquet format for table path:$path, " + + s"parameters:${DeltaSharingDataSource.redactOptions(options.options)}") val deltaLog = RemoteDeltaLog( path = path, + options.shareCredentialsOptions, forStreaming = false, responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET, initDeltaTableMetadata = Some(deltaTableMetadata) ) deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions) } else if (deltaTableMetadata.respondedFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) { - logInfo(s"Resolved as delta format for table path:$path, parameters:${options.options}") + logInfo(s"Resolved as delta format for table path:$path, " + + s"parameters:${DeltaSharingDataSource.redactOptions(options.options)}") val deltaSharingTableMetadata = DeltaSharingUtils.getDeltaSharingTableMetadata( table = dsTable, deltaTableMetadata = deltaTableMetadata ) val deltaOnlyClient = DeltaSharingRestClient( profileFile = parsedPath.profileFile, + shareCredentialsOptions = options.shareCredentialsOptions, forStreaming = false, // Indicating that the client request delta format in response. responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA, @@ -467,4 +487,14 @@ private[sharing] object DeltaSharingDataSource { ) PreSignedUrlCache.registerIfNeeded(SparkEnv.get) } + + def redactOptions(options: Map[String, String]): Map[String, String] = { + options.map { + case (k, _) if k.equalsIgnoreCase("bearerToken") => (k, "REDACTED") + case (k, _) if k.equalsIgnoreCase("clientId") => (k, "REDACTED") + case (k, _) if k.equalsIgnoreCase("clientSecret") => (k, "REDACTED") + case (k, _) if k.equalsIgnoreCase("scope") => (k, "REDACTED") + case (k, v) => (k, v) + } + } } diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala index 12ad7dfcaca..6f26f8d2370 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala @@ -60,9 +60,10 @@ class DeltaFormatSharingSourceSuite "path", throw DeltaSharingErrors.pathNotSpecifiedException ) - val parsedPath = DeltaSharingRestClient.parsePath(path) + val parsedPath = DeltaSharingRestClient.parsePath(path, Map.empty) val client = DeltaSharingRestClient( profileFile = parsedPath.profileFile, + shareCredentialsOptions = Map.empty, forStreaming = true, responseFormat = "delta", readerFeatures = DeltaSharingUtils.STREAMING_SUPPORTED_READER_FEATURES.mkString(",") diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala index 5a024d34aa8..18f116e9e89 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala @@ -195,7 +195,7 @@ class DeltaSharingCDFUtilsSuite def test(): Unit = { val profilePath = profileFile.getCanonicalPath val tablePath = new Path(s"$profilePath#$shareName.$schemaName.$sharedTableName") - val client = DeltaSharingRestClient(profilePath, false, "delta") + val client = DeltaSharingRestClient(profilePath, Map.empty, false, "delta") val dsTable = Table(share = shareName, schema = schemaName, name = sharedTableName) val options = new DeltaSharingOptions(Map("path" -> tablePath.toString)) diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala index 74a87b34cce..1cd22b47df0 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala @@ -1572,6 +1572,43 @@ trait DeltaSharingDataSourceDeltaSuiteBase } } } + + test("DeltaSharingDataSource able to read data with inline credentials") { + withTempDir { tempDir => + val deltaTableName = "delta_table_inline_creds" + withTable(deltaTableName) { + createSimpleTable(deltaTableName, enableCdf = false) + sql(s"""INSERT INTO $deltaTableName VALUES (1, "one"), (2, "two")""") + + val sharedTableName = "shared_table_inline_creds" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + val map = Map( + "shareCredentialsVersion" -> "1", + "bearerToken" -> "xxx", + "endpoint" -> "https://xxx/delta-sharing/", + "expirationTime" -> "2099-01-01T00:00:00.000Z" + ) + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val expectedSchema: StructType = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + + val df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .options(map) + .load(s"share1.default.$sharedTableName") + + assert(expectedSchema == df.schema) + val expected = spark.read.format("delta").table(deltaTableName) + checkAnswer(df, expected) + } + } + } + } } class DeltaSharingDataSourceDeltaSuite extends DeltaSharingDataSourceDeltaSuiteBase {} diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala index 0856c891f99..35e933934b4 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala @@ -229,7 +229,7 @@ class DeltaSharingFileIndexSuite profilePath: String, metaData: String): (Path, DeltaSharingFileIndex, DeltaSharingClient) = { val tablePath = new Path(s"$profilePath#$shareName.$schemaName.$sharedTableName") - val client = DeltaSharingRestClient(profilePath, false, "delta") + val client = DeltaSharingRestClient(profilePath, Map.empty, false, "delta") val spark = SparkSession.active val params = new DeltaSharingFileIndexParams(