Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ lazy val sharing = (project in file("sharing"))
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",

"io.delta" %% "delta-sharing-client" % "1.3.6",
"io.delta" %% "delta-sharing-client" % "1.3.8",

// Test deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ private[sharing] class DeltaSharingDataSource
logInfo(s"sourceSchema with parquet format for table path:$path, parameters:$parameters")
val deltaLog = RemoteDeltaLog(
path,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = true,
responseFormat = options.responseFormat
)
Expand All @@ -94,9 +95,11 @@ private[sharing] class DeltaSharingDataSource
)
}
// 1. create delta sharing client
val parsedPath = DeltaSharingRestClient.parsePath(path)
val parsedPath =
DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions)
val client = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = true,
responseFormat = options.responseFormat,
// comma separated delta reader features, used to tell delta sharing server what delta
Expand Down Expand Up @@ -185,7 +188,12 @@ private[sharing] class DeltaSharingDataSource

if (options.responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET) {
logInfo(s"createSource with parquet format for table path:$path, parameters:$parameters")
val deltaLog = RemoteDeltaLog(path, forStreaming = true, options.responseFormat)
val deltaLog = RemoteDeltaLog(
path,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = true,
responseFormat = options.responseFormat
)
DeltaSharingSource(SparkSession.active, deltaLog, options)
} else if (options.responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
logInfo(s"createSource with delta format for table path:$path, parameters:$parameters")
Expand All @@ -195,9 +203,11 @@ private[sharing] class DeltaSharingDataSource
)
}
// 1. create delta sharing client
val parsedPath = DeltaSharingRestClient.parsePath(path)
val parsedPath =
DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions)
val client = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = true,
responseFormat = options.responseFormat,
// comma separated delta reader features, used to tell delta sharing server what delta
Expand Down Expand Up @@ -245,6 +255,7 @@ private[sharing] class DeltaSharingDataSource
logInfo(s"createRelation with parquet format for table path:$path, parameters:$parameters")
val deltaLog = RemoteDeltaLog(
path,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = false,
responseFormat = options.responseFormat
)
Expand All @@ -258,9 +269,11 @@ private[sharing] class DeltaSharingDataSource
// delta features.
logInfo(s"createRelation with delta format for table path:$path, parameters:$parameters")
// 1. create delta sharing client
val parsedPath = DeltaSharingRestClient.parsePath(path)
val parsedPath =
DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions)
val client = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = false,
responseFormat = options.responseFormat,
// comma separated delta reader features, used to tell delta sharing server what delta
Expand Down Expand Up @@ -316,8 +329,10 @@ private[sharing] class DeltaSharingDataSource
options: DeltaSharingOptions,
sqlContext: SQLContext): BaseRelation = {
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
logInfo(s"autoResolving BaseRelation for path:${path}, with options:${options.options}.")
val parsedPath = DeltaSharingRestClient.parsePath(path)
logInfo(s"autoResolving BaseRelation for path:${path}, " +
s"with options:${DeltaSharingDataSource.redactOptions(options.options)}.")
val parsedPath =
DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions)

val responseFormat = {
if (sqlContext.sparkSession.sessionState.conf.getConf(
Expand All @@ -336,6 +351,7 @@ private[sharing] class DeltaSharingDataSource

val client = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = false,
// Indicating that the client is able to process response format in both parquet and delta.
responseFormat = responseFormat,
Expand All @@ -357,22 +373,26 @@ private[sharing] class DeltaSharingDataSource
)

if (deltaTableMetadata.respondedFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET) {
logInfo(s"Resolved as parquet format for table path:$path, parameters:${options.options}")
logInfo(s"Resolved as parquet format for table path:$path, " +
s"parameters:${DeltaSharingDataSource.redactOptions(options.options)}")
val deltaLog = RemoteDeltaLog(
path = path,
options.shareCredentialsOptions,
forStreaming = false,
responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET,
initDeltaTableMetadata = Some(deltaTableMetadata)
)
deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions)
} else if (deltaTableMetadata.respondedFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
logInfo(s"Resolved as delta format for table path:$path, parameters:${options.options}")
logInfo(s"Resolved as delta format for table path:$path, " +
s"parameters:${DeltaSharingDataSource.redactOptions(options.options)}")
val deltaSharingTableMetadata = DeltaSharingUtils.getDeltaSharingTableMetadata(
table = dsTable,
deltaTableMetadata = deltaTableMetadata
)
val deltaOnlyClient = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = false,
// Indicating that the client request delta format in response.
responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
Expand Down Expand Up @@ -467,4 +487,14 @@ private[sharing] object DeltaSharingDataSource {
)
PreSignedUrlCache.registerIfNeeded(SparkEnv.get)
}

/**
 * Returns a copy of `options` that can be written to logs.
 *
 * The values of the options named `bearerToken`, `clientId`, `clientSecret` and `scope`
 * (matched case-insensitively) are replaced with the literal `"REDACTED"`; every other
 * entry is passed through unchanged.
 *
 * @param options the data source options to sanitize
 * @return the same keys, with the values of the sensitive options masked
 */
def redactOptions(options: Map[String, String]): Map[String, String] = {
  // Option names whose values must never appear in log output.
  val sensitiveNames = Seq("bearerToken", "clientId", "clientSecret", "scope")
  options.map { case (key, value) =>
    val isSensitive = sensitiveNames.exists(name => key.equalsIgnoreCase(name))
    key -> (if (isSensitive) "REDACTED" else value)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ class DeltaFormatSharingSourceSuite
"path",
throw DeltaSharingErrors.pathNotSpecifiedException
)
val parsedPath = DeltaSharingRestClient.parsePath(path)
val parsedPath = DeltaSharingRestClient.parsePath(path, Map.empty)
val client = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = Map.empty,
forStreaming = true,
responseFormat = "delta",
readerFeatures = DeltaSharingUtils.STREAMING_SUPPORTED_READER_FEATURES.mkString(",")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class DeltaSharingCDFUtilsSuite
def test(): Unit = {
val profilePath = profileFile.getCanonicalPath
val tablePath = new Path(s"$profilePath#$shareName.$schemaName.$sharedTableName")
val client = DeltaSharingRestClient(profilePath, false, "delta")
val client = DeltaSharingRestClient(profilePath, Map.empty, false, "delta")
val dsTable = Table(share = shareName, schema = schemaName, name = sharedTableName)

val options = new DeltaSharingOptions(Map("path" -> tablePath.toString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,43 @@ trait DeltaSharingDataSourceDeltaSuiteBase
}
}
}

// Reads a shared table in delta response format when the share credentials are supplied
// inline as reader options (no profile file in the load path), and checks that both the
// schema and the rows match the backing delta table.
test("DeltaSharingDataSource able to read data with inline credentials") {
  withTempDir { _ =>
    val sourceTable = "delta_table_inline_creds"
    withTable(sourceTable) {
      createSimpleTable(sourceTable, enableCdf = false)
      sql(s"""INSERT INTO $sourceTable VALUES (1, "one"), (2, "two")""")

      val sharedTable = "shared_table_inline_creds"
      prepareMockedClientAndFileSystemResult(sourceTable, sharedTable)
      prepareMockedClientGetTableVersion(sourceTable, sharedTable)

      // Credentials passed as reader options instead of through a profile file.
      val inlineCredentials = Map(
        "shareCredentialsVersion" -> "1",
        "bearerToken" -> "xxx",
        "endpoint" -> "https://xxx/delta-sharing/",
        "expirationTime" -> "2099-01-01T00:00:00.000Z"
      )

      withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
        val expectedSchema: StructType =
          new StructType().add("c1", IntegerType).add("c2", StringType)

        // The load path carries only share.schema.table, with no profile file prefix.
        val reader = spark.read
          .format("deltaSharing")
          .option("responseFormat", "delta")
          .options(inlineCredentials)
        val sharedDf = reader.load(s"share1.default.$sharedTable")

        assert(expectedSchema == sharedDf.schema)
        val localDf = spark.read.format("delta").table(sourceTable)
        checkAnswer(sharedDf, localDf)
      }
    }
  }
}
}

class DeltaSharingDataSourceDeltaSuite extends DeltaSharingDataSourceDeltaSuiteBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class DeltaSharingFileIndexSuite
profilePath: String,
metaData: String): (Path, DeltaSharingFileIndex, DeltaSharingClient) = {
val tablePath = new Path(s"$profilePath#$shareName.$schemaName.$sharedTableName")
val client = DeltaSharingRestClient(profilePath, false, "delta")
val client = DeltaSharingRestClient(profilePath, Map.empty, false, "delta")

val spark = SparkSession.active
val params = new DeltaSharingFileIndexParams(
Expand Down
Loading