Skip to content

Commit

Permalink
Added new schema test that creates and drops tables with a configurable
Browse files Browse the repository at this point in the history
number of fields.

Closes #36
  • Loading branch information
rustyrazorblade committed Feb 28, 2025
1 parent d992530 commit 8bcde4d
Show file tree
Hide file tree
Showing 24 changed files with 147 additions and 78 deletions.
49 changes: 4 additions & 45 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ dependencies {
implementation group: 'org.apache.commons', name: 'commons-text', version: '1.3'

// https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-core
implementation group: 'com.datastax.cassandra', name: 'cassandra-driver-core', version: '3.11.5'
implementation "org.apache.cassandra:cassandra-driver-core:3.12.1"
implementation 'com.fasterxml.jackson.module:jackson-module-kotlin:2.13.4'

// https://mvnrepository.com/artifact/org.reflections/reflections
Expand Down Expand Up @@ -94,10 +94,10 @@ dependencies {
}

compileKotlin {
kotlinOptions.jvmTarget = "1.8"
kotlinOptions.jvmTarget = JavaVersion.VERSION_11
}
compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
kotlinOptions.jvmTarget = JavaVersion.VERSION_11
}

sourceSets {
Expand Down Expand Up @@ -141,7 +141,7 @@ ospackage {
vendor "Rustyrazorblade Consulting"
url "http://rustyrazorblade.com/easy-cass-stress/"
license "Apache License 2.0"
description "Stress Tool for Apache Cassandra by The Last Pickle"
description "Stress Tool for Apache Cassandra by Rustyrazorblade Consulting"
}

buildDeb {
Expand Down Expand Up @@ -174,47 +174,6 @@ applicationDistribution.from("LICENSE.txt") {
into ""
}

//
//task uploadDeb(type: Exec) {
// group = "Publish"
// workingDir 'build/distributions'
//
// def debPackage = "easy-cass-stress_${version}_all.deb"
//
// // get the deb package
//
// logger.info("Uploading DEB $debPackage")
// commandLine "curl", "-T", debPackage, "-u", System.getenv("BINTRAY_USER") + ":" + System.getenv("BINTRAY_KEY"), "https://api.bintray.com/content/rustyrazorblade/tlp-tools-deb/easy-cass-stress/${version}/$debPackage;deb_distribution=weezy,bionic,jessie,xenial;deb_component=main;deb_architecture=amd64;publish=1"
//
//}
//
//task uploadRpm(type: Exec) {
// group = "Publish"
// workingDir 'build/distributions'
//
// def rpmPackage = "easy-cass-stress-${version}.noarch.rpm"
//
// logger.info("Uploading RPM $rpmPackage")
// commandLine "curl", "-T", rpmPackage, "-u", System.getenv("BINTRAY_USER") + ":" + System.getenv("BINTRAY_KEY"), "https://api.bintray.com/content/rustyrazorblade/tlp-tools-rpm/easy-cass-stress/${version}/$rpmPackage;publish=1"
//}
//
//task uploadTar(type:Exec) {
// group = "Publish"
// workingDir 'build/distributions'
// def tarball = "easy-cass-stress-${version}.tar"
//
// logger.info("Uploading Tar $tarball")
// commandLine "curl", "-T", tarball, "-u", System.getenv("BINTRAY_USER") + ":" + System.getenv("BINTRAY_KEY"), "https://api.bintray.com/content/rustyrazorblade/tlp-tools-tarball/easy-cass-stress/${version}/$tarball;publish=1"
//}
//
//task uploadAll {
// group = "Publish"
// dependsOn "jib"
// dependsOn "uploadDeb"
// dependsOn "uploadRpm"
// dependsOn "uploadTar"
//}

wrapper {
distributionType = Wrapper.DistributionType.ALL
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package com.rustyrazorblade.easycassstress

import com.datastax.driver.core.ResultSet
import com.google.common.util.concurrent.FutureCallback
import com.rustyrazorblade.easycassstress.profiles.IStressRunner
import com.rustyrazorblade.easycassstress.profiles.Operation
import com.rustyrazorblade.easycassstress.workloads.IStressRunner
import com.rustyrazorblade.easycassstress.workloads.Operation
import org.apache.logging.log4j.kotlin.logger

/**
Expand Down Expand Up @@ -59,6 +59,12 @@ class OperationCallback(
context.metrics.selectHistogram.recordValue(time)
}
}
is Operation.DDL -> {
if (writeHdr) {
context.metrics.mutationHistogram.recordValue(time)
}
runner.onSuccess(op, result)
}
is Operation.Stop -> {
throw OperationStopException()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class PartitionKeyGenerator(
fun normal(prefix: String = "test"): PartitionKeyGenerator {
val generator = RandomDataGenerator()
return PartitionKeyGenerator({ max ->
var result = 0L
var result: Long
while (true) {
val mid = (max / 2).toDouble()
result = generator.nextGaussian(mid, mid / 4.0).toLong()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.rustyrazorblade.easycassstress

import com.rustyrazorblade.easycassstress.profiles.IStressProfile
import com.rustyrazorblade.easycassstress.workloads.IStressProfile
import org.apache.logging.log4j.kotlin.logger
import org.reflections.Reflections
import java.util.Optional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package com.rustyrazorblade.easycassstress

import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.MoreExecutors
import com.rustyrazorblade.easycassstress.profiles.IStressProfile
import com.rustyrazorblade.easycassstress.workloads.IStressProfile
import com.rustyrazorblade.easycassstress.workloads.Operation
import org.apache.logging.log4j.kotlin.logger
import java.time.Duration
import java.time.LocalTime
import java.time.format.DateTimeFormatter
import java.time.format.FormatStyle

class PartitionKeyGeneratorException(e: String) : Exception()
class PartitionKeyGeneratorException : Exception()

/**
* Single threaded profile runner.
Expand Down Expand Up @@ -43,7 +44,7 @@ class ProfileRunner(
"normal" -> PartitionKeyGenerator.normal(prefix)
"random" -> PartitionKeyGenerator.random(prefix)
"sequence" -> PartitionKeyGenerator.sequence(prefix)
else -> throw PartitionKeyGeneratorException("not a valid generator")
else -> throw PartitionKeyGeneratorException()
}
return partitionKeyGenerator
}
Expand Down Expand Up @@ -122,19 +123,30 @@ class ProfileRunner(

// pull requests off the queue instead of using generateKey
// move the getNextOperation into the queue thing
var paginate = context.mainArguments.paginate
for (op in queue.getNextOperation()) {
val future = context.session.executeAsync(op.bound)
val future = when (op) {
is Operation.DDL -> {
paginate = false
context.session.executeAsync(op.statement)
}
else -> {
context.session.executeAsync(op.bound)

}
}
Futures.addCallback(
future,
OperationCallback(
context,
runner,
op,
paginate = context.mainArguments.paginate,
paginate = paginate,
writeHdr = context.mainArguments.hdrHistogramPrefix != "",
),
MoreExecutors.directExecutor(),
)

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.rustyrazorblade.easycassstress

import com.codahale.metrics.Timer
import com.rustyrazorblade.easycassstress.profiles.IStressRunner
import com.rustyrazorblade.easycassstress.profiles.Operation
import com.rustyrazorblade.easycassstress.workloads.IStressRunner
import com.rustyrazorblade.easycassstress.workloads.Operation
import org.apache.logging.log4j.kotlin.logger
import java.time.LocalDateTime
import java.util.concurrent.ArrayBlockingQueue
Expand Down Expand Up @@ -51,6 +51,8 @@ class RequestQueue(
is Operation.Mutation -> context.metrics.mutations
is Operation.Deletion -> context.metrics.deletions
is Operation.Stop -> throw OperationStopException()
// maybe this should be under DDL, it's a weird case.
is Operation.DDL -> context.metrics.mutations
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.ResultSet
import com.datastax.driver.core.Session
import com.rustyrazorblade.easycassstress.PartitionKey
import com.rustyrazorblade.easycassstress.StressContext
import com.rustyrazorblade.easycassstress.WorkloadParameter
import org.apache.logging.log4j.kotlin.logger
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.IntStream
import kotlin.collections.ArrayDeque

/**
* Creates and drops unique tables.
* Most useful when paired with another workload, to determine if
* impact of schema operations on running workloads.
*/
class CreateDrop : IStressProfile {
@WorkloadParameter("Number of fields in each table.")
var fields = 1

@WorkloadParameter("Number of tables to keep active")
var activeTables = 10

lateinit var currentTables : ArrayDeque<String>

var tableCount = AtomicInteger(0)

var logger = logger()

override fun prepare(session: Session) {
}

override fun schema(): List<String> {
return listOf()
}

override fun getDefaultReadRate() = 0.0


override fun getRunner(context: StressContext): IStressRunner {
currentTables = ArrayDeque(activeTables * 2)

return object : IStressRunner {
override fun getNextMutation(partitionKey: PartitionKey): Operation {
// consider removing this to be able to test concurrent schema modifications

synchronized(this) {
if (currentTables.size > activeTables) {
val name = currentTables.removeFirst()
val drop = "DROP TABLE IF EXISTS $name"
return Operation.DDL(statement = drop)
} else {
var next = tableCount.addAndGet(1)
var name = "create_drop_$next"
// create `fields` fields in the table
var fieldsStr = (0 until fields).map { "f${it} text" }.joinToString(", ")

var create = """CREATE TABLE $name ( id text, $fieldsStr, primary key (id) )"""
logger.debug(create)
currentTables.addLast(name)

return Operation.DDL(statement = create)
}
}
}


override fun getNextSelect(partitionKey: PartitionKey): Operation {
TODO("Not yet implemented")
}

override fun getNextDelete(partitionKey: PartitionKey): Operation {
TODO("Not yet implemented")
}

}
}


}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.codahale.metrics.Timer.Context
import com.datastax.driver.core.BoundStatement
Expand Down Expand Up @@ -38,6 +38,11 @@ interface IStressRunner {
op: Operation.Mutation,
result: ResultSet?,
) { }

fun onSuccess(
op: Operation.DDL,
result: ResultSet?
) { }
}

/**
Expand Down Expand Up @@ -79,7 +84,7 @@ interface IStressProfile {
fun getRunner(context: StressContext): IStressRunner

/**
* returns a map of generators cooresponding to the different fields
* returns a map of generators corresponding to the different fields
* it's required to specify all fields that use a generator
* some fields don't, like TimeUUID or the first partition key
* This is optional, but encouraged
Expand All @@ -100,7 +105,9 @@ interface IStressProfile {
fun getPopulatePartitionKeyGenerator(): Optional<PartitionKeyGenerator> = Optional.empty()
}

sealed class Operation(val bound: BoundStatement?) {
sealed class Operation(val bound: BoundStatement? = null,
val statement: String? = null) {

// we're going to track metrics on the mutations differently
// inserts will also carry data that might be saved for later validation
// clustering keys won't be realistic to compute in the framework
Expand All @@ -113,4 +120,5 @@ sealed class Operation(val bound: BoundStatement?) {
class Deletion(bound: BoundStatement) : Operation(bound)

class Stop : Operation(null)
class DDL(statement: String) : Operation(null, statement=statement)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.ResultSet
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rustyrazorblade.easycassstress.profiles
package com.rustyrazorblade.easycassstress.workloads

import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.Session
Expand Down
Loading

0 comments on commit 8bcde4d

Please sign in to comment.