Skip to content
21 changes: 18 additions & 3 deletions client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ int client::connect(void)
evbuffer_drain(m_write_buf, evbuffer_get_length(m_write_buf));

if (m_unix_sockaddr != NULL) {
m_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
m_sockfd = socket(AF_UNIX, m_config->use_udp ? SOCK_DGRAM : SOCK_STREAM, 0);
if (m_sockfd < 0) {
return -errno;
}
Expand All @@ -300,8 +300,10 @@ int client::connect(void)
error = setsockopt(m_sockfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
assert(error == 0);

error = setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
assert(error == 0);
if (!m_config->use_udp) {
error = setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
assert(error == 0);
}
}

// set non-blcoking behavior
Expand Down Expand Up @@ -555,6 +557,7 @@ void client::create_request(void)
assert(key != NULL);
assert(keylen > 0);

keylen = 6; // FIXME const
benchmark_debug_log("GET key=[%.*s]\n", keylen, key);
cmd_size = m_protocol->write_command_get(key, keylen, m_config->data_offset);

Expand Down Expand Up @@ -618,13 +621,25 @@ void client::handle_response(request *request, protocol_response *response)
{
switch (request->m_type) {
case rt_get:
if (m_config->transaction_latency) {
// NOTE using printf adds latency to the client because of the system call, but we're measuring transaction latency, not throughput.
// FIXME might be preferable to print to some user-specified file, rather than to stdout
printf("GET %lu\n", ts_diff_now(request->m_sent_time));
}

m_stats.update_get_op(NULL,
request->m_size + response->get_total_len(),
ts_diff_now(request->m_sent_time),
response->get_hits(),
request->m_keys - response->get_hits());
break;
case rt_set:
if (m_config->transaction_latency) {
// NOTE using printf adds latency to the client because of the system call, but we're measuring transaction latency, not throughput.
// FIXME might be preferable to print to some user-specified file, rather than to stdout
printf("SET %lu\n", ts_diff_now(request->m_sent_time));
}

m_stats.update_set_op(NULL,
request->m_size + response->get_total_len(),
ts_diff_now(request->m_sent_time));
Expand Down
6 changes: 3 additions & 3 deletions config_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ const char* config_weight_list::print(char *buf, int buf_len)
}


server_addr::server_addr(const char *hostname, int port) :
m_hostname(hostname), m_port(port), m_server_addr(NULL), m_used_addr(NULL), m_last_error(0)
server_addr::server_addr(const char *hostname, int port, transport_protocol protocol) :
m_hostname(hostname), m_port(port), m_protocol(protocol), m_server_addr(NULL), m_used_addr(NULL), m_last_error(0)
{
int error = resolve();

Expand All @@ -215,7 +215,7 @@ int server_addr::resolve(void)

memset(&hints, 0, sizeof(hints));
hints.ai_flags = AI_PASSIVE;
hints.ai_socktype = SOCK_STREAM;
hints.ai_socktype = m_protocol;
hints.ai_family = AF_INET; // Don't play with IPv6 for now...

snprintf(port_str, sizeof(port_str)-1, "%u", m_port);
Expand Down
4 changes: 3 additions & 1 deletion config_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ struct connect_info {
};

struct server_addr {
server_addr(const char *hostname, int port);
enum transport_protocol {TCP=SOCK_STREAM, UDP=SOCK_DGRAM};
server_addr(const char *hostname, int port, transport_protocol proto);
virtual ~server_addr();

int get_connect_info(struct connect_info *ci);
Expand All @@ -88,6 +89,7 @@ struct server_addr {

std::string m_hostname;
int m_port;
int m_protocol;
struct addrinfo *m_server_addr;
struct addrinfo *m_used_addr;
int m_last_error;
Expand Down
16 changes: 16 additions & 0 deletions libmemcached_protocol/binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,22 @@ extern "C"
PROTOCOL_BINARY_RAW_BYTES = 0x00
} protocol_binary_datatypes;

/**
* Definition of the extra header that is used when using the
* memcached binary protocol over UDP. This header lies between
* the UDP header and the memcached datagram.
* See https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L922
*/
typedef union {
struct {
uint16_t request_id;
uint16_t sequence_no;
uint16_t total_datagrams;
uint16_t reserved; // Must be set to 0
} header;
uint8_t bytes[8];
} protocol_binary_udp_header;

/**
* Definition of the header structure for a request packet.
* See section 2
Expand Down
47 changes: 40 additions & 7 deletions memtier_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
"multi_key_get = %u\n"
"authenticate = %s\n"
"select-db = %d\n"
"no-expiry = %s\n",
"no-expiry = %s\n"
"transaction_latency = %s\n"
"key-width = %u\n"
"udp = %s\n",
cfg->server,
cfg->port,
cfg->unix_socket,
Expand Down Expand Up @@ -155,7 +158,10 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
cfg->multi_key_get,
cfg->authenticate ? cfg->authenticate : "",
cfg->select_db,
cfg->no_expiry ? "yes" : "no");
cfg->no_expiry ? "yes" : "no",
cfg->transaction_latency ? "yes" : "no",
cfg->key_width,
cfg->use_udp ? "yes" : "no");
}

static void config_init_defaults(struct benchmark_config *cfg)
Expand Down Expand Up @@ -196,6 +202,8 @@ static void config_init_defaults(struct benchmark_config *cfg)
}
if (!cfg->requests && !cfg->test_time)
cfg->requests = 10000;
if (!cfg->key_width)
cfg->key_width = OBJECT_GENERATOR_KEY_WIDTH;
}

static int generate_random_seed()
Expand Down Expand Up @@ -241,7 +249,10 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
o_generate_keys,
o_multi_key_get,
o_select_db,
o_no_expiry
o_no_expiry,
o_transaction_latency,
o_key_width,
o_use_udp,
};

static struct option long_options[] = {
Expand Down Expand Up @@ -287,6 +298,9 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
{ "no-expiry", 0, 0, o_no_expiry },
{ "help", 0, 0, 'h' },
{ "version", 0, 0, 'v' },
{ "transaction_latency", 0, 0, o_transaction_latency },
{ "key-width", 1, 0, o_key_width },
{ "udp", 0, 0, o_use_udp },
{ NULL, 0, 0, 0 }
};

Expand Down Expand Up @@ -553,6 +567,22 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
case o_no_expiry:
cfg->no_expiry = true;
break;
case o_transaction_latency:
cfg->transaction_latency = true;
break;
case o_key_width:
endptr = NULL;
cfg->key_width = (unsigned short) strtoul(optarg, &endptr, 10);
if (!cfg->key_width || cfg->key_width > OBJECT_GENERATOR_KEY_WIDTH || !endptr || *endptr != '\0') {
fprintf(stderr, "error: key-width must be a number in the range [1-%u].\n", OBJECT_GENERATOR_KEY_WIDTH);
return -1;
}
break;
case o_use_udp:
//FIXME check that key and value size can fit in a datagram
fprintf(stderr, "Warning: generating traffic over UDP is still experimental. Check that the size of requests can fit in a UDP datagram.");
cfg->use_udp = true;
break;
default:
return -1;
break;
Expand All @@ -570,6 +600,7 @@ void usage() {
" -s, --server=ADDR Server address (default: localhost)\n"
" -p, --port=PORT Server port (default: 6379)\n"
" -S, --unix-socket=SOCKET UNIX Domain socket name (default: none)\n"
" --udp Connect using UDP rather than TCP (default: false)\n"
" -P, --protocol=PROTOCOL Protocol to use (default: redis). Other\n"
" supported protocols are memcache_text,\n"
" memcache_binary.\n"
Expand Down Expand Up @@ -597,6 +628,7 @@ void usage() {
" --select-db=DB DB number to select, when testing a redis server\n"
" --distinct-client-seed Use a different random seed for each client\n"
" --randomize random seed based on timestamp (default is constant value)\n"
" --transaction_latency Measure and report the latency of each transaction\n"
"\n"
"Object Options:\n"
" -d --data-size=SIZE Object data size (default: 32)\n"
Expand All @@ -619,6 +651,7 @@ void usage() {
" --no-expiry Ignore expiry information in imported data\n"
"\n"
"Key Options:\n"
" --key-width=NUMBER Maximum key size (default: \"250\")\n"
" --key-prefix=PREFIX Prefix for keys (default: \"memtier-\")\n"
" --key-minimum=NUMBER Key ID minimum value (default: 0)\n"
" --key-maximum=NUMBER Key ID maximum value (default: 10000000)\n"
Expand Down Expand Up @@ -651,7 +684,7 @@ struct cg_thread {
cg_thread(unsigned int id, benchmark_config* config, object_generator* obj_gen) :
m_thread_id(id), m_config(config), m_obj_gen(obj_gen), m_cg(NULL), m_protocol(NULL), m_finished(false)
{
m_protocol = protocol_factory(m_config->protocol);
m_protocol = protocol_factory(m_config->protocol, m_config->use_udp);
assert(m_protocol != NULL);

m_cg = new client_group(m_config, m_protocol, m_obj_gen);
Expand Down Expand Up @@ -868,7 +901,7 @@ int main(int argc, char *argv[])

if (cfg.server != NULL && cfg.port > 0) {
try {
cfg.server_addr = new server_addr(cfg.server, cfg.port);
cfg.server_addr = new server_addr(cfg.server, cfg.port, cfg.use_udp ? server_addr::UDP : server_addr::TCP);
} catch (std::runtime_error& e) {
benchmark_error_log("%s:%u: error: %s\n",
cfg.server, cfg.port, e.what());
Expand Down Expand Up @@ -903,7 +936,7 @@ int main(int argc, char *argv[])
exit(1);
}

obj_gen = new object_generator();
obj_gen = new object_generator(cfg.key_width);
assert(obj_gen != NULL);
} else {
// check paramters
Expand Down Expand Up @@ -1094,7 +1127,7 @@ int main(int argc, char *argv[])
// If needed, data verification is done now...
if (cfg.data_verify) {
struct event_base *verify_event_base = event_base_new();
abstract_protocol *verify_protocol = protocol_factory(cfg.protocol);
abstract_protocol *verify_protocol = protocol_factory(cfg.protocol, cfg.use_udp);
verify_client *client = new verify_client(verify_event_base, &cfg, verify_protocol, obj_gen);

fprintf(outfile, "\n\nPerforming data verification...\n");
Expand Down
3 changes: 3 additions & 0 deletions memtier_benchmark.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ struct benchmark_config {
int select_db;
bool no_expiry;
bool resolve_on_connect;
bool transaction_latency;
unsigned short key_width;
bool use_udp;
};


Expand Down
45 changes: 42 additions & 3 deletions obj_gen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <unistd.h>
#include <fcntl.h>
#include <math.h>
#include <stdexcept>

#ifdef HAVE_ASSERT_H
#include <assert.h>
Expand Down Expand Up @@ -117,6 +118,7 @@ int gaussian_noise::gaussian_distribution_range(double stddev, double median, in
}

object_generator::object_generator() :
m_key_width(OBJECT_GENERATOR_KEY_WIDTH),
m_data_size_type(data_size_unknown),
m_data_size_pattern(NULL),
m_random_data(false),
Expand All @@ -136,7 +138,31 @@ object_generator::object_generator() :
m_data_size.size_list = NULL;
}

object_generator::object_generator(unsigned int key_width) :
m_key_width(key_width),
m_data_size_type(data_size_unknown),
m_data_size_pattern(NULL),
m_random_data(false),
m_expiry_min(0),
m_expiry_max(0),
m_key_prefix(NULL),
m_key_min(0),
m_key_max(0),
m_key_stddev(0),
m_key_median(0),
m_value_buffer(NULL),
m_random_fd(-1)
{
assert(m_key_width <= OBJECT_GENERATOR_KEY_WIDTH);

for (int i = 0; i < OBJECT_GENERATOR_KEY_ITERATORS; i++)
m_next_key[i] = 0;

m_data_size.size_list = NULL;
}

object_generator::object_generator(const object_generator& copy) :
m_key_width(copy.m_key_width),
m_data_size_type(copy.m_data_size_type),
m_data_size(copy.m_data_size),
m_data_size_pattern(copy.m_data_size_pattern),
Expand All @@ -150,7 +176,6 @@ object_generator::object_generator(const object_generator& copy) :
m_key_median(copy.m_key_median),
m_value_buffer(NULL),
m_random_fd(-1)

{
if (m_data_size_type == data_size_weighted &&
m_data_size.size_list != NULL) {
Expand Down Expand Up @@ -304,15 +329,29 @@ void object_generator::set_expiry_range(unsigned int expiry_min, unsigned int ex
m_expiry_max = expiry_max;
}

void object_generator::check_key_size()
{
unsigned int width_of_key_prefix = m_key_prefix == NULL ? 0 : strlen(m_key_prefix);
unsigned int width_of_key_max = (unsigned)log10((double)m_key_max) + 1;
if (width_of_key_prefix + width_of_key_max > m_key_width) {
char str [200];
sprintf(str, "Key prefix '%s' (length %u) exceeds maximum key width (%u) when combined with the maximum key index (%u, length %u)", m_key_prefix, width_of_key_prefix, m_key_width, m_key_max, width_of_key_max);
throw std::logic_error(str);
}
}

void object_generator::set_key_prefix(const char *key_prefix)
{
m_key_prefix = key_prefix;
check_key_size();
}

void object_generator::set_key_range(unsigned int key_min, unsigned int key_max)
{
assert (key_min <= key_max);
m_key_min = key_min;
m_key_max = key_max;
check_key_size();
}

void object_generator::set_key_distribution(double key_stddev, double key_median)
Expand Down Expand Up @@ -360,9 +399,9 @@ const char* object_generator::get_key(int iter, unsigned int *len)
{
unsigned int l;
m_key_index = get_key_index(iter);

// format key
l = snprintf(m_key_buffer, sizeof(m_key_buffer)-1,
l = snprintf(m_key_buffer, m_key_width - 1,
"%s%u", m_key_prefix, m_key_index);
if (len != NULL) *len = l;

Expand Down
Loading