Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ lazy val sharing = (project in file("sharing"))
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",

"io.delta" %% "delta-sharing-client" % "1.3.6",
"io.delta" %% "delta-sharing-client" % "1.3.7",

// Test deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ private[sharing] class DeltaSharingDataSource
logInfo(s"sourceSchema with parquet format for table path:$path, parameters:$parameters")
val deltaLog = RemoteDeltaLog(
path,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = true,
responseFormat = options.responseFormat
)
Expand All @@ -94,9 +95,11 @@ private[sharing] class DeltaSharingDataSource
)
}
// 1. create delta sharing client
val parsedPath = DeltaSharingRestClient.parsePath(path)
val parsedPath =
DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions)
val client = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = true,
responseFormat = options.responseFormat,
// comma separated delta reader features, used to tell delta sharing server what delta
Expand Down Expand Up @@ -185,7 +188,12 @@ private[sharing] class DeltaSharingDataSource

if (options.responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET) {
logInfo(s"createSource with parquet format for table path:$path, parameters:$parameters")
val deltaLog = RemoteDeltaLog(path, forStreaming = true, options.responseFormat)
val deltaLog = RemoteDeltaLog(
path,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = true,
responseFormat = options.responseFormat
)
DeltaSharingSource(SparkSession.active, deltaLog, options)
} else if (options.responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
logInfo(s"createSource with delta format for table path:$path, parameters:$parameters")
Expand All @@ -195,9 +203,11 @@ private[sharing] class DeltaSharingDataSource
)
}
// 1. create delta sharing client
val parsedPath = DeltaSharingRestClient.parsePath(path)
val parsedPath =
DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions)
val client = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = true,
responseFormat = options.responseFormat,
// comma separated delta reader features, used to tell delta sharing server what delta
Expand Down Expand Up @@ -245,6 +255,7 @@ private[sharing] class DeltaSharingDataSource
logInfo(s"createRelation with parquet format for table path:$path, parameters:$parameters")
val deltaLog = RemoteDeltaLog(
path,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = false,
responseFormat = options.responseFormat
)
Expand All @@ -258,9 +269,11 @@ private[sharing] class DeltaSharingDataSource
// delta features.
logInfo(s"createRelation with delta format for table path:$path, parameters:$parameters")
// 1. create delta sharing client
val parsedPath = DeltaSharingRestClient.parsePath(path)
val parsedPath =
DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions)
val client = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = false,
responseFormat = options.responseFormat,
// comma separated delta reader features, used to tell delta sharing server what delta
Expand Down Expand Up @@ -316,8 +329,10 @@ private[sharing] class DeltaSharingDataSource
options: DeltaSharingOptions,
sqlContext: SQLContext): BaseRelation = {
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
logInfo(s"autoResolving BaseRelation for path:${path}, with options:${options.options}.")
val parsedPath = DeltaSharingRestClient.parsePath(path)
logInfo(s"autoResolving BaseRelation for path:${path}, " +
s"with options:${DeltaSharingDataSource.redactOptions(options.options)}.")
val parsedPath =
DeltaSharingRestClient.parsePath(path, options.shareCredentialsOptions)

val responseFormat = {
if (sqlContext.sparkSession.sessionState.conf.getConf(
Expand All @@ -336,6 +351,7 @@ private[sharing] class DeltaSharingDataSource

val client = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = false,
// Indicating that the client is able to process response format in both parquet and delta.
responseFormat = responseFormat,
Expand All @@ -357,22 +373,26 @@ private[sharing] class DeltaSharingDataSource
)

if (deltaTableMetadata.respondedFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET) {
logInfo(s"Resolved as parquet format for table path:$path, parameters:${options.options}")
logInfo(s"Resolved as parquet format for table path:$path, " +
s"parameters:${DeltaSharingDataSource.redactOptions(options.options)}")
val deltaLog = RemoteDeltaLog(
path = path,
options.shareCredentialsOptions,
forStreaming = false,
responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET,
initDeltaTableMetadata = Some(deltaTableMetadata)
)
deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions)
} else if (deltaTableMetadata.respondedFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
logInfo(s"Resolved as delta format for table path:$path, parameters:${options.options}")
logInfo(s"Resolved as delta format for table path:$path, " +
s"parameters:${DeltaSharingDataSource.redactOptions(options.options)}")
val deltaSharingTableMetadata = DeltaSharingUtils.getDeltaSharingTableMetadata(
table = dsTable,
deltaTableMetadata = deltaTableMetadata
)
val deltaOnlyClient = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = options.shareCredentialsOptions,
forStreaming = false,
// Indicating that the client request delta format in response.
responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
Expand Down Expand Up @@ -467,4 +487,11 @@ private[sharing] object DeltaSharingDataSource {
)
PreSignedUrlCache.registerIfNeeded(SparkEnv.get)
}

def redactOptions(options: Map[String, String]): Map[String, String] = {
options.map {
case (k, _) if k.equalsIgnoreCase("bearerToken") => (k, "REDACTED")
case (k, v) => (k, v)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ class DeltaFormatSharingSourceSuite
"path",
throw DeltaSharingErrors.pathNotSpecifiedException
)
val parsedPath = DeltaSharingRestClient.parsePath(path)
val parsedPath = DeltaSharingRestClient.parsePath(path, Map.empty)
val client = DeltaSharingRestClient(
profileFile = parsedPath.profileFile,
shareCredentialsOptions = Map.empty,
forStreaming = true,
responseFormat = "delta",
readerFeatures = DeltaSharingUtils.STREAMING_SUPPORTED_READER_FEATURES.mkString(",")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class DeltaSharingCDFUtilsSuite
def test(): Unit = {
val profilePath = profileFile.getCanonicalPath
val tablePath = new Path(s"$profilePath#$shareName.$schemaName.$sharedTableName")
val client = DeltaSharingRestClient(profilePath, false, "delta")
val client = DeltaSharingRestClient(profilePath, Map.empty, false, "delta")
val dsTable = Table(share = shareName, schema = schemaName, name = sharedTableName)

val options = new DeltaSharingOptions(Map("path" -> tablePath.toString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,43 @@ trait DeltaSharingDataSourceDeltaSuiteBase
}
}
}

test("DeltaSharingDataSource able to read data with inline credentials") {
withTempDir { tempDir =>
val deltaTableName = "delta_table_inline_creds"
withTable(deltaTableName) {
createSimpleTable(deltaTableName, enableCdf = false)
sql(s"""INSERT INTO $deltaTableName VALUES (1, "one"), (2, "two")""")

val sharedTableName = "shared_table_inline_creds"
prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)
prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)

val map = Map(
"shareCredentialsVersion" -> "1",
"bearerToken" -> "xxx",
"endpoint" -> "https://xxx/delta-sharing/",
"expirationTime" -> "2099-01-01T00:00:00.000Z"
)

withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
val expectedSchema: StructType = new StructType()
.add("c1", IntegerType)
.add("c2", StringType)

val df = spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.options(map)
.load(s"share1.default.$sharedTableName")

assert(expectedSchema == df.schema)
val expected = spark.read.format("delta").table(deltaTableName)
checkAnswer(df, expected)
}
}
}
}
}

class DeltaSharingDataSourceDeltaSuite extends DeltaSharingDataSourceDeltaSuiteBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class DeltaSharingFileIndexSuite
profilePath: String,
metaData: String): (Path, DeltaSharingFileIndex, DeltaSharingClient) = {
val tablePath = new Path(s"$profilePath#$shareName.$schemaName.$sharedTableName")
val client = DeltaSharingRestClient(profilePath, false, "delta")
val client = DeltaSharingRestClient(profilePath, Map.empty, false, "delta")

val spark = SparkSession.active
val params = new DeltaSharingFileIndexParams(
Expand Down
Loading