diff --git a/.travis.yml b/.travis.yml index d806a39a..6fc37524 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,5 @@ matrix: include: - os: linux jdk: oraclejdk8 - - os: osx - osx_image: xcode8 script: - mvn clean package -DargLine="-DCosmosDBEndpoint=$CosmosDBEndpoint -DCosmosDBKey=$CosmosDBKey" diff --git a/pom.xml b/pom.xml index 0f57d3f6..787212ea 100644 --- a/pom.xml +++ b/pom.xml @@ -125,42 +125,19 @@ limitations under the License. ./src/test/scala + maven-assembly-plugin jar-with-dependencies - - - make-assembly - package - - single - - - + org.apache.maven.plugins maven-dependency-plugin 2.8 - - - copy-dependencies - package - - copy-dependencies - - - ${project.build.directory}/alternateLocation - false - false - true - ::* - - - org.apache.maven.plugins @@ -219,17 +196,10 @@ limitations under the License. + org.apache.maven.plugins maven-source-plugin 2.2.1 - - - attach-sources - - jar-no-fork - - - org.scalatest @@ -244,7 +214,7 @@ limitations under the License. test - + test diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala index af769ab2..d9ecd9a9 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala @@ -27,6 +27,7 @@ import com.microsoft.azure.cosmosdb.spark.config._ import com.microsoft.azure.documentdb._ import com.microsoft.azure.documentdb.internal._ import com.microsoft.azure.documentdb.rx._ +import org.codehaus.jackson.map.ObjectMapper import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer @@ -162,6 +163,8 @@ private[spark] case class CosmosDBConnection(config: Config) extends LoggingTrai while (feedResponse.getQueryIterator.hasNext) { cfDocuments.addAll(feedResponse.getQueryIterable.fetchNextBlock()) } + val objectMapper = new ObjectMapper() + logDebug(s"change feed (partition=${changeFeedOptions.getPartitionKeyRangeId}, token=${changeFeedOptions.getRequestContinuation}): documents with id: ${objectMapper.writeValueAsString(cfDocuments.map(x => x.getId).toArray)}") Tuple2.apply(cfDocuments.iterator(), feedResponse.getResponseContinuation) } diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala index 2501689a..3d0d8fe8 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala @@ -45,7 +45,7 @@ case class HdfsUtils(configMap: Map[String, String]) extends LoggingTrait { val os = fs.create(path) os.writeUTF(content) os.close() - logInfo(s"Write $content for $path") + logDebug(s"Write $content for $path") } def read(base: String, filePath: String): String = { diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties index 577f6f91..9d0b6200 100644 --- a/src/test/resources/log4j.properties +++ b/src/test/resources/log4j.properties @@ -9,6 +9,7 @@ log4j.category.org.apache.http.wire=INFO log4j.category.org.apache.http.headers=INFO log4j.category.com.microsoft.azure.documentdb=INFO +log4j.category.com.microsoft.azure.cosmosdb.spark=DEBUG # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender diff --git a/src/test/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBDefaults.scala b/src/test/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBDefaults.scala index 70ba84b2..8cfe1643 100644 --- a/src/test/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBDefaults.scala +++ b/src/test/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBDefaults.scala @@ -35,8 +35,22 @@ object CosmosDBDefaults { class CosmosDBDefaults extends LoggingTrait { - val CosmosDBEndpoint: String = System.getProperty("CosmosDBEndpoint") - val CosmosDBKey: String = System.getProperty("CosmosDBKey") + val CosmosDBEndpoint: String = { + val cosmosDBEndpoint = "CosmosDBEndpoint" + val endpoint = System.getProperty(cosmosDBEndpoint) + if (endpoint != null) + endpoint + else + System.getenv(cosmosDBEndpoint) + } + val CosmosDBKey: String = { + val cosmosDBKey = "CosmosDBKey" + val key = System.getProperty(cosmosDBKey) + if (key != null) + key + else + System.getenv(cosmosDBKey) + } val DatabaseName = "cosmosdb-spark-connector-test" val PartitionKeyName = "pkey" @@ -78,7 +92,7 @@ class CosmosDBDefaults extends LoggingTrait { documentDBClient.createDatabase(database, null) logInfo(s"Created collection with Id ${database.getId}") } catch { - case NonFatal(e) => logError(s"Failed to create database '$databaseName'", e) + case NonFatal(e) => logWarning(s"Failed to create database '$databaseName'", e) } } @@ -88,7 +102,7 @@ class CosmosDBDefaults extends LoggingTrait { documentDBClient.deleteDatabase(databaseLink, null) logInfo(s"Deleted collection with link '$databaseLink'") } catch { - case NonFatal(e) => logError(s"Failed to delete database '$databaseLink'", e) + case NonFatal(e) => logWarning(s"Failed to delete database '$databaseLink'", e) } } @@ -125,7 +139,7 @@ class CosmosDBDefaults extends LoggingTrait { documentDBClient.deleteCollection(collectionLink, null) logInfo(s"Deleted collection with link '$collectionLink'") } catch { - case NonFatal(e) => logError(s"Failed to delete collection '$collectionLink'", e) + case NonFatal(e) => logWarning(s"Failed to delete collection '$collectionLink'", e) } } diff --git a/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala b/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala index 85b985ca..28481b22 100644 --- a/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala +++ b/src/test/scala/com/microsoft/azure/cosmosdb/spark/RequiresCosmosDB.scala @@ -68,7 +68,7 @@ trait RequiresCosmosDB extends FlatSpecLike with Matchers with BeforeAndAfterAll override def beforeAll(): Unit = { // if running against localhost emulator HttpClientFactory.DISABLE_HOST_NAME_VERIFICATION = true - + cosmosDBDefaults.createDatabase(cosmosDBDefaults.DatabaseName) } diff --git a/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala b/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala index f73f869d..28b295a3 100644 --- a/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala +++ b/src/test/scala/com/microsoft/azure/cosmosdb/spark/schema/CosmosDBDataFrameSpec.scala @@ -580,7 +580,7 @@ class CosmosDBDataFrameSpec extends RequiresCosmosDB { val streamingGapMs = TimeUnit.SECONDS.toMillis(10) val insertIntervalMs = TimeUnit.SECONDS.toMillis(1) / 2 // There is a delay from starting the writing to the stream to the first data being written - val streamingSinkDelayMs = TimeUnit.SECONDS.toMillis(7) + val streamingSinkDelayMs = TimeUnit.SECONDS.toMillis(8) val insertIterations: Int = ((streamingGapMs * 2 + streamingTimeMs) / insertIntervalMs).toInt val cfCheckpointPath = "./changefeedcheckpoint" @@ -623,7 +623,6 @@ class CosmosDBDataFrameSpec extends RequiresCosmosDB { newDoc.set(cosmosDBDefaults.PartitionKeyName, i) newDoc.set("content", s"sample content for document with ID $i") documentClient.createDocument(sourceCollectionLink, newDoc, null, true) - logInfo(s"Created document with ID $i") TimeUnit.MILLISECONDS.sleep(insertIntervalMs) }) docIdIndex = docIdIndex + insertIterations @@ -759,6 +758,8 @@ class CosmosDBDataFrameSpec extends RequiresCosmosDB { streamingQuery.stop() CosmosDBRDDIterator.resetCollectionContinuationTokens() + + cosmosDBDefaults.deleteCollection(databaseName, sinkCollection) } it should "work with a slow source" in withSparkSession() { spark =>