Skip to content
This repository was archived by the owner on Mar 10, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,5 @@ matrix:
include:
- os: linux
jdk: oraclejdk8
- os: osx
osx_image: xcode8
script:
- mvn clean package -DargLine="-DCosmosDBEndpoint=$CosmosDBEndpoint -DCosmosDBKey=$CosmosDBKey"
38 changes: 4 additions & 34 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,42 +125,19 @@ limitations under the License.
<testSourceDirectory>./src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<!-- mvn assembly:single -->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<!-- mvn dependency:copy-dependencies -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/alternateLocation</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<excludeArtifactIds>::*</excludeArtifactIds>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -219,17 +196,10 @@ limitations under the License.
</executions>
</plugin>
<plugin>
<!-- mvn source:jar-no-fork -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
Expand All @@ -244,7 +214,7 @@ limitations under the License.
<execution>
<id>test</id>
<goals>
<!--<goal>test</goal>-->
<goal>test</goal>
</goals>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.internal._
import com.microsoft.azure.documentdb.rx._
import org.codehaus.jackson.map.ObjectMapper

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
Expand Down Expand Up @@ -162,6 +163,8 @@ private[spark] case class CosmosDBConnection(config: Config) extends LoggingTrai
while (feedResponse.getQueryIterator.hasNext) {
cfDocuments.addAll(feedResponse.getQueryIterable.fetchNextBlock())
}
val objectMapper = new ObjectMapper()
logDebug(s"change feed (partition=${changeFeedOptions.getPartitionKeyRangeId}, token=${changeFeedOptions.getRequestContinuation}): documents with id: ${objectMapper.writeValueAsString(cfDocuments.map(x => x.getId).toArray)}")
Tuple2.apply(cfDocuments.iterator(), feedResponse.getResponseContinuation)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ case class HdfsUtils(configMap: Map[String, String]) extends LoggingTrait {
val os = fs.create(path)
os.writeUTF(content)
os.close()
logInfo(s"Write $content for $path")
logDebug(s"Write $content for $path")
}

def read(base: String, filePath: String): String = {
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ log4j.category.org.apache.http.wire=INFO
log4j.category.org.apache.http.headers=INFO

log4j.category.com.microsoft.azure.documentdb=INFO
log4j.category.com.microsoft.azure.cosmosdb.spark=DEBUG

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,22 @@ object CosmosDBDefaults {

class CosmosDBDefaults extends LoggingTrait {

val CosmosDBEndpoint: String = System.getProperty("CosmosDBEndpoint")
val CosmosDBKey: String = System.getProperty("CosmosDBKey")
val CosmosDBEndpoint: String = {
val cosmosDBEndpoint = "CosmosDBEndpoint"
val endpoint = System.getProperty(cosmosDBEndpoint)
if (endpoint != null)
endpoint
else
System.getenv(cosmosDBEndpoint)
}
val CosmosDBKey: String = {
val cosmosDBKey = "CosmosDBKey"
val key = System.getProperty(cosmosDBKey)
if (key != null)
key
else
System.getenv(cosmosDBKey)
}
val DatabaseName = "cosmosdb-spark-connector-test"
val PartitionKeyName = "pkey"

Expand Down Expand Up @@ -78,7 +92,7 @@ class CosmosDBDefaults extends LoggingTrait {
documentDBClient.createDatabase(database, null)
logInfo(s"Created collection with Id ${database.getId}")
} catch {
case NonFatal(e) => logError(s"Failed to create database '$databaseName'", e)
case NonFatal(e) => logWarning(s"Failed to create database '$databaseName'", e)
}
}

Expand All @@ -88,7 +102,7 @@ class CosmosDBDefaults extends LoggingTrait {
documentDBClient.deleteDatabase(databaseLink, null)
logInfo(s"Deleted collection with link '$databaseLink'")
} catch {
case NonFatal(e) => logError(s"Failed to delete database '$databaseLink'", e)
case NonFatal(e) => logWarning(s"Failed to delete database '$databaseLink'", e)
}
}

Expand Down Expand Up @@ -125,7 +139,7 @@ class CosmosDBDefaults extends LoggingTrait {
documentDBClient.deleteCollection(collectionLink, null)
logInfo(s"Deleted collection with link '$collectionLink'")
} catch {
case NonFatal(e) => logError(s"Failed to delete collection '$collectionLink'", e)
case NonFatal(e) => logWarning(s"Failed to delete collection '$collectionLink'", e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ trait RequiresCosmosDB extends FlatSpecLike with Matchers with BeforeAndAfterAll
override def beforeAll(): Unit = {
// if running against localhost emulator
HttpClientFactory.DISABLE_HOST_NAME_VERIFICATION = true

cosmosDBDefaults.createDatabase(cosmosDBDefaults.DatabaseName)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ class CosmosDBDataFrameSpec extends RequiresCosmosDB {
val streamingGapMs = TimeUnit.SECONDS.toMillis(10)
val insertIntervalMs = TimeUnit.SECONDS.toMillis(1) / 2
// There is a delay from starting the writing to the stream to the first data being written
val streamingSinkDelayMs = TimeUnit.SECONDS.toMillis(7)
val streamingSinkDelayMs = TimeUnit.SECONDS.toMillis(8)
val insertIterations: Int = ((streamingGapMs * 2 + streamingTimeMs) / insertIntervalMs).toInt

val cfCheckpointPath = "./changefeedcheckpoint"
Expand Down Expand Up @@ -623,7 +623,6 @@ class CosmosDBDataFrameSpec extends RequiresCosmosDB {
newDoc.set(cosmosDBDefaults.PartitionKeyName, i)
newDoc.set("content", s"sample content for document with ID $i")
documentClient.createDocument(sourceCollectionLink, newDoc, null, true)
logInfo(s"Created document with ID $i")
TimeUnit.MILLISECONDS.sleep(insertIntervalMs)
})
docIdIndex = docIdIndex + insertIterations
Expand Down Expand Up @@ -759,6 +758,8 @@ class CosmosDBDataFrameSpec extends RequiresCosmosDB {

streamingQuery.stop()
CosmosDBRDDIterator.resetCollectionContinuationTokens()

cosmosDBDefaults.deleteCollection(databaseName, sinkCollection)
}

it should "work with a slow source" in withSparkSession() { spark =>
Expand Down