@@ -182,6 +182,7 @@ int populate(FDBTransaction *transaction, mako_args_t *args, int worker_id,
182
182
int end = insert_end (args -> rows , worker_id , thread_id , args -> num_processes ,
183
183
args -> num_threads );
184
184
int xacts = 0 ;
185
+ int tracetimer = 0 ;
185
186
186
187
keystr = (char * )malloc (sizeof (char ) * args -> key_length + 1 );
187
188
if (!keystr )
@@ -200,8 +201,13 @@ int populate(FDBTransaction *transaction, mako_args_t *args, int worker_id,
200
201
201
202
for (i = begin ; i <= end ; i ++ ) {
202
203
203
- if ((thread_tps > 0 ) && (xacts >= thread_tps )) {
204
- /* throttling is on */
204
+ /* sequential keys */
205
+ genkey (keystr , i , args -> rows , args -> key_length + 1 );
206
+ /* random values */
207
+ randstr (valstr , args -> value_length + 1 );
208
+
209
+ if (((thread_tps > 0 ) && (xacts >= thread_tps )) /* throttle */ ||
210
+ (args -> txntrace ) /* txn tracing */ ){
205
211
206
212
throttle :
207
213
clock_gettime (CLOCK_MONOTONIC_COARSE , & timer_now );
@@ -212,17 +218,40 @@ int populate(FDBTransaction *transaction, mako_args_t *args, int worker_id,
212
218
xacts = 0 ;
213
219
timer_prev .tv_sec = timer_now .tv_sec ;
214
220
timer_prev .tv_nsec = timer_now .tv_nsec ;
221
+
222
+ /* enable transaction tracing */
223
+ if (args -> txntrace ) {
224
+ tracetimer ++ ;
225
+ if (tracetimer == args -> txntrace ) {
226
+ fdb_error_t err ;
227
+ tracetimer = 0 ;
228
+ fprintf (debugme , "DEBUG: txn tracing %s\n" , keystr );
229
+ err = fdb_transaction_set_option (transaction ,
230
+ FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER ,
231
+ (uint8_t * )keystr , strlen (keystr ));
232
+ if (err ) {
233
+ fprintf (stderr ,
234
+ "ERROR: fdb_transaction_set_option(FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER): %s\n" ,
235
+ fdb_get_error (err ));
236
+ }
237
+ err = fdb_transaction_set_option (transaction ,
238
+ FDB_TR_OPTION_LOG_TRANSACTION ,
239
+ (uint8_t * )NULL , 0 );
240
+ if (err ) {
241
+ fprintf (stderr ,
242
+ "ERROR: fdb_transaction_set_option(FDB_TR_OPTION_LOG_TRANSACTION): %s\n" ,
243
+ fdb_get_error (err ));
244
+ }
245
+ }
246
+ }
215
247
} else {
216
- /* 1 second not passed, throttle */
217
- usleep (1000 ); /* sleep for 1ms */
218
- goto throttle ;
248
+ if (thread_tps > 0 ) {
249
+ /* 1 second not passed, throttle */
250
+ usleep (1000 ); /* sleep for 1ms */
251
+ goto throttle ;
252
+ }
219
253
}
220
- } /* throttle */
221
-
222
- /* sequential keys */
223
- genkey (keystr , i , args -> rows , args -> key_length + 1 );
224
- /* random values */
225
- randstr (valstr , args -> value_length + 1 );
254
+ } /* throttle or txntrace */
226
255
227
256
/* insert (SET) */
228
257
fdb_transaction_set (transaction , (uint8_t * )keystr , strlen (keystr ),
@@ -427,8 +456,10 @@ int run_one_transaction(FDBTransaction *transaction, mako_args_t *args,
427
456
int randstrlen ;
428
457
int rangei ;
429
458
459
+ #if 0 /* this call conflicts with debug transaction */
430
460
/* make sure that the transaction object is clean */
431
461
fdb_transaction_reset (transaction );
462
+ #endif
432
463
433
464
clock_gettime (CLOCK_MONOTONIC , & timer_per_xact_start );
434
465
@@ -531,6 +562,8 @@ int run_one_transaction(FDBTransaction *transaction, mako_args_t *args,
531
562
stats -> errors [OP_COMMIT ]++ ;
532
563
}
533
564
if (rc == FDB_ERROR_ABORT ) {
565
+ /* make sure to reset transaction */
566
+ fdb_transaction_reset (transaction );
534
567
return rc ; /* abort */
535
568
}
536
569
goto retryTxn ;
@@ -562,6 +595,8 @@ int run_one_transaction(FDBTransaction *transaction, mako_args_t *args,
562
595
if (rc == FDB_ERROR_RETRY ) {
563
596
goto retryTxn ;
564
597
} else if (rc == FDB_ERROR_ABORT ) {
598
+ /* make sure to reset transaction */
599
+ fdb_transaction_reset (transaction );
565
600
return rc ; /* abort */
566
601
}
567
602
}
@@ -580,6 +615,8 @@ int run_one_transaction(FDBTransaction *transaction, mako_args_t *args,
580
615
stats -> errors [OP_COMMIT ]++ ;
581
616
}
582
617
if (rc == FDB_ERROR_ABORT ) {
618
+ /* make sure to reset transaction */
619
+ fdb_transaction_reset (transaction );
583
620
return rc ; /* abort */
584
621
}
585
622
goto retryTxn ;
@@ -612,6 +649,8 @@ int run_one_transaction(FDBTransaction *transaction, mako_args_t *args,
612
649
stats -> errors [OP_COMMIT ]++ ;
613
650
}
614
651
if (rc == FDB_ERROR_ABORT ) {
652
+ /* make sure to reset transaction */
653
+ fdb_transaction_reset (transaction );
615
654
return rc ; /* abort */
616
655
}
617
656
goto retryTxn ;
@@ -637,6 +676,8 @@ int run_one_transaction(FDBTransaction *transaction, mako_args_t *args,
637
676
stats -> errors [OP_COMMIT ]++ ;
638
677
}
639
678
if (rc == FDB_ERROR_ABORT ) {
679
+ /* make sure to reset transaction */
680
+ fdb_transaction_reset (transaction );
640
681
return rc ; /* abort */
641
682
}
642
683
goto retryTxn ;
@@ -645,24 +686,34 @@ int run_one_transaction(FDBTransaction *transaction, mako_args_t *args,
645
686
646
687
stats -> xacts ++ ;
647
688
689
+ /* make sure to reset transaction */
690
+ fdb_transaction_reset (transaction );
648
691
return 0 ;
649
692
}
650
693
651
694
652
695
int run_workload (FDBTransaction * transaction , mako_args_t * args ,
653
696
int thread_tps , volatile double * throttle_factor ,
654
- int thread_iters , volatile int * signal , mako_stats_t * stats ) {
697
+ int thread_iters , volatile int * signal , mako_stats_t * stats ,
698
+ int dotrace ) {
655
699
int xacts = 0 ;
700
+ int64_t total_xacts = 0 ;
656
701
int rc = 0 ;
657
702
struct timespec timer_prev , timer_now ;
658
703
char * keystr ;
659
704
char * keystr2 ;
660
705
char * valstr ;
661
706
int current_tps ;
707
+ char * traceid ;
708
+ int tracetimer = 0 ;
662
709
663
710
if (thread_tps < 0 )
664
711
return 0 ;
665
712
713
+ if (dotrace ) {
714
+ traceid = (char * )malloc (32 );
715
+ }
716
+
666
717
current_tps = (int )((double )thread_tps * * throttle_factor );
667
718
668
719
keystr = (char * )malloc (sizeof (char ) * args -> key_length + 1 );
@@ -685,25 +736,52 @@ int run_workload(FDBTransaction *transaction, mako_args_t *args,
685
736
/* main transaction loop */
686
737
while (1 ) {
687
738
688
- if ((thread_tps > 0 ) && (xacts >= current_tps )) {
689
- /* throttling is on */
739
+ if ((( thread_tps > 0 ) && (xacts >= current_tps )) /* throttle on */ ||
740
+ dotrace /* transaction tracing on */ ){
690
741
691
742
clock_gettime (CLOCK_MONOTONIC_COARSE , & timer_now );
692
743
if ((timer_now .tv_sec > timer_prev .tv_sec + 1 ) ||
693
744
((timer_now .tv_sec == timer_prev .tv_sec + 1 ) &&
694
745
(timer_now .tv_nsec > timer_prev .tv_nsec ))) {
695
746
/* more than 1 second passed, no need to throttle */
696
- xacts = 0 ;
747
+ xacts = 0 ;
697
748
timer_prev .tv_sec = timer_now .tv_sec ;
698
749
timer_prev .tv_nsec = timer_now .tv_nsec ;
750
+
699
751
/* update throttle rate */
700
- current_tps = (int )((double )thread_tps * * throttle_factor );
752
+ if (thread_tps > 0 ) {
753
+ current_tps = (int )((double )thread_tps * * throttle_factor );
754
+ }
755
+
756
+ /* enable transaction trace */
757
+ if (dotrace ) {
758
+ tracetimer ++ ;
759
+ if (tracetimer == dotrace ) {
760
+ fdb_error_t err ;
761
+ tracetimer = 0 ;
762
+ snprintf (traceid , 32 , "makotrace%019lld" , total_xacts );
763
+ fprintf (debugme , "DEBUG: txn tracing %s\n" , traceid );
764
+ err = fdb_transaction_set_option (transaction , FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER ,
765
+ (uint8_t * )traceid , strlen (traceid ));
766
+ if (err ) {
767
+ fprintf (stderr , "ERROR: FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER: %s\n" , fdb_get_error (err ));
768
+ }
769
+ err = fdb_transaction_set_option (transaction , FDB_TR_OPTION_LOG_TRANSACTION ,
770
+ (uint8_t * )NULL , 0 );
771
+ if (err ) {
772
+ fprintf (stderr , "ERROR: FDB_TR_OPTION_LOG_TRANSACTION: %s\n" , fdb_get_error (err ));
773
+ }
774
+ }
775
+ }
776
+
701
777
} else {
702
- /* 1 second not passed, throttle */
703
- usleep (1000 );
704
- continue ;
778
+ if (thread_tps > 0 ) {
779
+ /* 1 second not passed, throttle */
780
+ usleep (1000 );
781
+ continue ;
782
+ }
705
783
}
706
- }
784
+ } /* throttle or txntrace */
707
785
708
786
rc = run_one_transaction (transaction , args , stats , keystr , keystr2 , valstr );
709
787
if (rc ) {
@@ -721,10 +799,14 @@ int run_workload(FDBTransaction *transaction, mako_args_t *args,
721
799
break ;
722
800
}
723
801
xacts ++ ;
802
+ total_xacts ++ ;
724
803
}
725
804
free (keystr );
726
805
free (keystr2 );
727
806
free (valstr );
807
+ if (dotrace ) {
808
+ free (traceid );
809
+ }
728
810
729
811
return rc ;
730
812
}
@@ -742,6 +824,7 @@ void *worker_thread(void *thread_args) {
742
824
int thread_tps = 0 ;
743
825
int thread_iters = 0 ;
744
826
int op ;
827
+ int dotrace = (worker_id == 0 && thread_id == 0 && args -> txntrace ) ? args -> txntrace : 0 ;
745
828
volatile int * signal = & ((thread_args_t * )thread_args )-> process -> shm -> signal ;
746
829
volatile double * throttle_factor = & ((thread_args_t * )thread_args )-> process -> shm -> throttle_factor ;
747
830
volatile int * readycount =
@@ -801,7 +884,7 @@ void *worker_thread(void *thread_args) {
801
884
/* run the workload */
802
885
else if (args -> mode == MODE_RUN ) {
803
886
rc = run_workload (transaction , args , thread_tps , throttle_factor ,
804
- thread_iters , signal , stats );
887
+ thread_iters , signal , stats , dotrace );
805
888
if (rc < 0 ) {
806
889
fprintf (stderr , "ERROR: run_workload failed\n" );
807
890
}
@@ -859,9 +942,9 @@ int worker_process_main(mako_args_t *args, int worker_id, mako_shmhdr_t *shm) {
859
942
860
943
/* enable tracing if specified */
861
944
if (args -> trace ) {
862
- fprintf (debugme , "DEBUG: Enable Tracing (%s)\n" , ( args -> tracepath [ 0 ] == '\0' )
863
- ? "current directory"
864
- : args -> tracepath );
945
+ fprintf (debugme , "DEBUG: Enable Tracing in %s (%s)\n" ,
946
+ ( args -> traceformat == 0 ) ? "XML" : "JSON" ,
947
+ ( args -> tracepath [ 0 ] == '\0' ) ? "current directory" : args -> tracepath );
865
948
err = fdb_network_set_option (FDB_NET_OPTION_TRACE_ENABLE ,
866
949
(uint8_t * )args -> tracepath ,
867
950
strlen (args -> tracepath ));
@@ -871,6 +954,16 @@ int worker_process_main(mako_args_t *args, int worker_id, mako_shmhdr_t *shm) {
871
954
"ERROR: fdb_network_set_option(FDB_NET_OPTION_TRACE_ENABLE): %s\n" ,
872
955
fdb_get_error (err ));
873
956
}
957
+ if (args -> traceformat == 1 ) {
958
+ err = fdb_network_set_option (FDB_NET_OPTION_TRACE_FORMAT ,
959
+ (uint8_t * )"json" , 4 );
960
+ if (err ) {
961
+ fprintf (
962
+ stderr ,
963
+ "ERROR: fdb_network_set_option(FDB_NET_OPTION_TRACE_FORMAT): %s\n" ,
964
+ fdb_get_error (err ));
965
+ }
966
+ }
874
967
}
875
968
876
969
/* enable knobs if specified */
@@ -1019,6 +1112,8 @@ int init_args(mako_args_t *args) {
1019
1112
args -> knobs [0 ] = '\0' ;
1020
1113
args -> trace = 0 ;
1021
1114
args -> tracepath [0 ] = '\0' ;
1115
+ args -> traceformat = 0 ; /* default to client's default (XML) */
1116
+ args -> txntrace = 0 ;
1022
1117
for (i = 0 ; i < MAX_OP ; i ++ ) {
1023
1118
args -> txnspec .ops [i ][OP_COUNT ] = 0 ;
1024
1119
}
@@ -1148,40 +1243,42 @@ int parse_transaction(mako_args_t *args, char *optarg) {
1148
1243
1149
1244
void usage () {
1150
1245
printf ("Usage:\n" );
1151
- printf ("%-24s%s\n" , "-h, --help" , "Print this message" );
1152
- printf ("%-24s%s\n" , " --version" , "Print FDB version" );
1153
- printf ("%-24s%s\n" , "-v, --verbose" , "Specify verbosity" );
1154
- printf ("%-24s%s\n" , "-a, --api_version=API_VERSION" , "Specify API_VERSION to use" );
1155
- printf ("%-24s%s\n" , "-c, --cluster=FILE" , "Specify FDB cluster file" );
1156
- printf ("%-24s%s\n" , "-p, --procs=PROCS" ,
1246
+ printf ("%-24s %s\n" , "-h, --help" , "Print this message" );
1247
+ printf ("%-24s %s\n" , " --version" , "Print FDB version" );
1248
+ printf ("%-24s %s\n" , "-v, --verbose" , "Specify verbosity" );
1249
+ printf ("%-24s %s\n" , "-a, --api_version=API_VERSION" , "Specify API_VERSION to use" );
1250
+ printf ("%-24s %s\n" , "-c, --cluster=FILE" , "Specify FDB cluster file" );
1251
+ printf ("%-24s %s\n" , "-p, --procs=PROCS" ,
1157
1252
"Specify number of worker processes" );
1158
- printf ("%-24s%s\n" , "-t, --threads=THREADS" ,
1253
+ printf ("%-24s %s\n" , "-t, --threads=THREADS" ,
1159
1254
"Specify number of worker threads" );
1160
- printf ("%-24s%s\n" , "-r, --rows=ROWS" , "Specify number of records" );
1161
- printf ("%-24s%s\n" , "-s, --seconds=SECONDS" ,
1255
+ printf ("%-24s %s\n" , "-r, --rows=ROWS" , "Specify number of records" );
1256
+ printf ("%-24s %s\n" , "-s, --seconds=SECONDS" ,
1162
1257
"Specify the test duration in seconds\n" );
1163
- printf ("%-24s%s\n" , "" , "This option cannot be specified with --iteration." );
1164
- printf ("%-24s%s\n" , "-i, --iteration=ITERS" ,
1258
+ printf ("%-24s %s\n" , "" , "This option cannot be specified with --iteration." );
1259
+ printf ("%-24s %s\n" , "-i, --iteration=ITERS" ,
1165
1260
"Specify the number of iterations.\n" );
1166
- printf ("%-24s%s\n" , "" , "This option cannot be specified with --seconds." );
1167
- printf ("%-24s%s\n" , " --keylen=LENGTH" , "Specify the key lengths" );
1168
- printf ("%-24s%s\n" , " --vallen=LENGTH" , "Specify the value lengths" );
1169
- printf ("%-24s%s\n" , "-x, --transaction=SPEC" , "Transaction specification" );
1170
- printf ("%-24s%s\n" , " --tps|--tpsmax=TPS" , "Specify the target max TPS" );
1171
- printf ("%-24s%s\n" , " --tpsmin=TPS" , "Specify the target min TPS" );
1172
- printf ("%-24s%s\n" , " --tpsinterval=SEC" , "Specify the TPS change interval (Default: 10 seconds)" );
1173
- printf ("%-24s%s\n" , " --tpschange=<sin|square|pulse>" , "Specify the TPS change type (Default: sin)" );
1174
- printf ("%-24s%s\n" , " --sampling=RATE" ,
1261
+ printf ("%-24s %s\n" , "" , "This option cannot be specified with --seconds." );
1262
+ printf ("%-24s %s\n" , " --keylen=LENGTH" , "Specify the key lengths" );
1263
+ printf ("%-24s %s\n" , " --vallen=LENGTH" , "Specify the value lengths" );
1264
+ printf ("%-24s %s\n" , "-x, --transaction=SPEC" , "Transaction specification" );
1265
+ printf ("%-24s %s\n" , " --tps|--tpsmax=TPS" , "Specify the target max TPS" );
1266
+ printf ("%-24s %s\n" , " --tpsmin=TPS" , "Specify the target min TPS" );
1267
+ printf ("%-24s %s\n" , " --tpsinterval=SEC" , "Specify the TPS change interval (Default: 10 seconds)" );
1268
+ printf ("%-24s %s\n" , " --tpschange=<sin|square|pulse>" , "Specify the TPS change type (Default: sin)" );
1269
+ printf ("%-24s %s\n" , " --sampling=RATE" ,
1175
1270
"Specify the sampling rate for latency stats" );
1176
- printf ("%-24s%s\n" , "-m, --mode=MODE" ,
1271
+ printf ("%-24s %s\n" , "-m, --mode=MODE" ,
1177
1272
"Specify the mode (build, run, clean)" );
1178
- printf ("%-24s%s\n" , "-z, --zipf" ,
1273
+ printf ("%-24s %s\n" , "-z, --zipf" ,
1179
1274
"Use zipfian distribution instead of uniform distribution" );
1180
- printf ("%-24s%s\n" , " --commitget" , "Commit GETs" );
1181
- printf ("%-24s%s\n" , " --trace" , "Enable tracing" );
1182
- printf ("%-24s%s\n" , " --tracepath=PATH" , "Set trace file path" );
1183
- printf ("%-24s%s\n" , " --knobs=KNOBS" , "Set client knobs" );
1184
- printf ("%-24s%s\n" , " --flatbuffers" , "Use flatbuffers" );
1275
+ printf ("%-24s %s\n" , " --commitget" , "Commit GETs" );
1276
+ printf ("%-24s %s\n" , " --trace" , "Enable tracing" );
1277
+ printf ("%-24s %s\n" , " --tracepath=PATH" , "Set trace file path" );
1278
+ printf ("%-24s %s\n" , " --trace_format <xml|json>" , "Set trace format (Default: json)" );
1279
+ printf ("%-24s %s\n" , " --txntrace=sec" , "Specify transaction tracing interval (Default: 0)" );
1280
+ printf ("%-24s %s\n" , " --knobs=KNOBS" , "Set client knobs" );
1281
+ printf ("%-24s %s\n" , " --flatbuffers" , "Use flatbuffers" );
1185
1282
}
1186
1283
1187
1284
@@ -1214,6 +1311,8 @@ int parse_args(int argc, char *argv[], mako_args_t *args) {
1214
1311
{"mode" , required_argument , NULL , 'm' },
1215
1312
{"knobs" , required_argument , NULL , ARG_KNOBS },
1216
1313
{"tracepath" , required_argument , NULL , ARG_TRACEPATH },
1314
+ {"trace_format" , required_argument , NULL , ARG_TRACEFORMAT },
1315
+ {"txntrace" , required_argument , NULL , ARG_TXNTRACE },
1217
1316
/* no args */
1218
1317
{"help" , no_argument , NULL , 'h' },
1219
1318
{"json" , no_argument , NULL , 'j' },
@@ -1324,6 +1423,19 @@ int parse_args(int argc, char *argv[], mako_args_t *args) {
1324
1423
args -> trace = 1 ;
1325
1424
memcpy (args -> tracepath , optarg , strlen (optarg ) + 1 );
1326
1425
break ;
1426
+ case ARG_TRACEFORMAT :
1427
+ if (strncmp (optarg , "json" , 5 ) == 0 ) {
1428
+ args -> traceformat = 1 ;
1429
+ } else if (strncmp (optarg , "xml" , 4 ) == 0 ) {
1430
+ args -> traceformat = 0 ;
1431
+ } else {
1432
+ fprintf (stderr , "Error: Invalid trace_format %s\n" , optarg );
1433
+ exit (0 );
1434
+ }
1435
+ break ;
1436
+ case ARG_TXNTRACE :
1437
+ args -> txntrace = atoi (optarg );
1438
+ break ;
1327
1439
}
1328
1440
}
1329
1441
if ((args -> tpsmin == -1 ) || (args -> tpsmin > args -> tpsmax )) {
0 commit comments