From e0d971cbd0e1138e8be94016b878bacb3e9c7618 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Prostko?= Date: Mon, 12 Sep 2022 12:51:41 +0200 Subject: [PATCH] python3 migration --- pytpcc/constants.py | 2 +- pytpcc/coordinator.py | 2 +- pytpcc/drivers/mongodbdriver.py | 36 ++++++++++++++++++--------------- pytpcc/runtime/executor.py | 4 ++-- pytpcc/runtime/loader.py | 6 +++--- pytpcc/tpcc.py | 24 ++++++++++++++++------ pytpcc/util/nurand.py | 16 +++++++-------- pytpcc/util/rand.py | 4 ++-- pytpcc/util/results.py | 2 +- 9 files changed, 56 insertions(+), 40 deletions(-) diff --git a/pytpcc/constants.py b/pytpcc/constants.py index b5a9bbe..3918b3c 100644 --- a/pytpcc/constants.py +++ b/pytpcc/constants.py @@ -91,7 +91,7 @@ MIN_CARRIER_ID = 1 MAX_CARRIER_ID = 10 # HACK: This is not strictly correct, but it works -NULL_CARRIER_ID = 0L +NULL_CARRIER_ID = 0 # o_id < than this value, carrier != null, >= -> carrier == null NULL_CARRIER_LOWER_BOUND = 2101 MIN_OL_CNT = 5 diff --git a/pytpcc/coordinator.py b/pytpcc/coordinator.py index 499b85f..412b2c2 100755 --- a/pytpcc/coordinator.py +++ b/pytpcc/coordinator.py @@ -160,7 +160,7 @@ def startExecution(scaleParameters, args, config,channels): ## Load Configuration file if args['config']: logging.debug("Loading configuration file '%s'" % args['config']) - cparser = SafeConfigParser() + cparser = ConfigParser() cparser.read(os.path.realpath(args['config'].name)) config = dict(cparser.items(args['system'])) else: diff --git a/pytpcc/drivers/mongodbdriver.py b/pytpcc/drivers/mongodbdriver.py index cd4dd32..2eba85e 100644 --- a/pytpcc/drivers/mongodbdriver.py +++ b/pytpcc/drivers/mongodbdriver.py @@ -40,7 +40,7 @@ import pymongo import constants -from abstractdriver import AbstractDriver +from .abstractdriver import AbstractDriver TABLE_COLUMNS = { constants.TABLENAME_ITEM: [ @@ -345,9 +345,9 @@ def loadConfig(self, config): logging.error("ConnectionFailure %d (%s) when connected to %s: ", exc.code, exc.details, display_uri) return - except pymongo.errors.PyMongoError, err: + except pymongo.errors.PyMongoError as err: logging.error("Some general error (%s) when connected to %s: ", str(err), display_uri) - print "Got some other error: %s" % str(err) + print("Got some other error: %s" % str(err)) return ## ---------------------------------------------- @@ -408,7 +408,7 @@ def loadTuples(self, tableName, tuples): tuple_dicts.append(dict([(columns[i], t[i]) for i in num_columns])) ## FOR - self.database[tableName].insert(tuple_dicts) + self.database[tableName].insert_many(tuple_dicts) ## IF return @@ -416,7 +416,7 @@ def loadTuples(self, tableName, tuples): def loadFinishDistrict(self, w_id, d_id): if self.denormalize: logging.debug("Pushing %d denormalized ORDERS records for WAREHOUSE %d DISTRICT %d into MongoDB", len(self.w_orders), w_id, d_id) - self.database[constants.TABLENAME_ORDERS].insert(self.w_orders.values()) + self.database[constants.TABLENAME_ORDERS].insert_many(self.w_orders.values()) self.w_orders.clear() ## IF @@ -593,7 +593,7 @@ def _doNewOrderTxn(self, s, params): session=s) if not d: d1 = self.district.find_one({"D_ID": d_id, "D_W_ID": w_id, "$comment": "new order did not find district"}) - print d1, w_id, d_id, c_id, i_ids, i_w_ids, s_dist_col + print(d1, w_id, d_id, c_id, i_ids, i_w_ids, s_dist_col) assert d, "Couldn't find district in new order w_id %d d_id %d" % (w_id, d_id) else: d = self.district.find_one({"D_ID": d_id, "D_W_ID": w_id, "$comment": comment}, @@ -621,7 +621,9 @@ def _doNewOrderTxn(self, s, params): #print constants.INVALID_ITEM_MESSAGE + ", Aborting transaction (ok for 1%)" return None ## IF - items = sorted(items, key=lambda x: i_ids.index(x['I_ID'])) + + xxi_ids = tuple(map(lambda o: o['I_ID'], items)) + items = sorted(items, key=lambda x: xxi_ids.index(x['I_ID'])) # getWarehouseTaxRate w = self.warehouse.find_one({"W_ID": w_id, "$comment": comment}, {"_id":0, "W_TAX": 1}, session=s) @@ -676,7 +678,9 @@ def _doNewOrderTxn(self, s, params): session=s)) ## IF assert len(all_stocks) == ol_cnt, "all_stocks len %d != ol_cnt %d" % (len(all_stocks), ol_cnt) - all_stocks = sorted(all_stocks, key=lambda x: item_w_list.index((x['S_I_ID'], x["S_W_ID"]))) + + xxxi_ids = tuple(map(lambda o: (o['S_I_ID'], o['S_W_ID']), all_stocks)) + all_stocks = sorted(all_stocks, key=lambda x: xxxi_ids.index((x['S_I_ID'], x["S_W_ID"]))) ## ---------------- ## Insert Order Line, Stock Item Information @@ -820,7 +824,7 @@ def _doOrderStatusTxn(self, s, params): all_customers = list(self.customer.find(search_fields, return_fields, session=s)) namecnt = len(all_customers) assert namecnt > 0, "No matching customer for last name %s!" % c_last - index = (namecnt-1)/2 + index = (namecnt-1)//2 c = all_customers[index] c_id = c["C_ID"] ## IF @@ -891,7 +895,7 @@ def _doPaymentTxn(self, s, params): session=s) if not d: d1 = self.district.find_one({"D_ID": d_id, "D_W_ID": w_id, "$comment": "payment did not find district"}) - print d1, w_id, d_id, h_amount, c_w_id, c_d_id, c_id, c_last, h_date + print(d1, w_id, d_id, h_amount, c_w_id, c_d_id, c_id, c_last, h_date) assert d, "Couldn't find district in payment w_id %d d_id %d" % (w_id, d_id) else: d = self.district.find_one({"D_W_ID": w_id, "D_ID": d_id, "$comment": comment}, @@ -942,7 +946,7 @@ def _doPaymentTxn(self, s, params): all_customers = list(self.customer.find(search_fields, return_fields, session=s)) namecnt = len(all_customers) assert namecnt > 0, "No matching customer w %d d %d clast %s" % (w_id, d_id, c_last) - index = (namecnt-1)/2 + index = (namecnt-1)//2 c = all_customers[index] c_id = c["C_ID"] ## IF @@ -1075,9 +1079,9 @@ def _doStockLevelTxn(self, s, params): ol_ids.add(ol["OL_I_ID"]) ## FOR - result = self.stock.find({"S_W_ID": w_id, + result = self.stock.count_documents({"S_W_ID": w_id, "S_I_ID": {"$in": list(ol_ids)}, - "S_QUANTITY": {"$lt": threshold}, "$comment": comment}).count() + "S_QUANTITY": {"$lt": threshold}, "$comment": comment}) return int(result) @@ -1095,11 +1099,11 @@ def run_transaction(self, txn_callback, session, name, params): exc.code, exc.details, name) return (False, None) logging.error("Failed with unknown OperationFailure: %d", exc.code) - print "Failed with unknown OperationFailure: %d" % exc.code - print exc.details + print("Failed with unknown OperationFailure: %d" % exc.code) + print(exc.details) raise except pymongo.errors.ConnectionFailure: - print "ConnectionFailure during %s: " % name + print("ConnectionFailure during %s: " % name) return (False, None) ## TRY diff --git a/pytpcc/runtime/executor.py b/pytpcc/runtime/executor.py index 0ce3a74..7026b10 100644 --- a/pytpcc/runtime/executor.py +++ b/pytpcc/runtime/executor.py @@ -68,10 +68,10 @@ def execute(self, duration): (val, retries) = self.driver.executeTransaction(txn, params) except KeyboardInterrupt: return -1 - except (Exception, AssertionError), ex: + except (Exception, AssertionError) as ex: logging.warn("Failed to execute Transaction '%s': %s" % (txn, ex)) traceback.print_exc(file=sys.stdout) - print "Aborting some transaction with some error %s %s" % (txn, ex) + print("Aborting some transaction with some error %s %s" % (txn, ex)) global_result.abortTransaction(global_txn_id) batch_result.abortTransaction(batch_txn_id) if self.stop_on_error: raise diff --git a/pytpcc/runtime/loader.py b/pytpcc/runtime/loader.py index ad9d930..3e8795f 100644 --- a/pytpcc/runtime/loader.py +++ b/pytpcc/runtime/loader.py @@ -73,7 +73,7 @@ def execute(self): ## ============================================== def loadItems(self): ## Select 10% of the rows to be marked "original" - originalRows = rand.selectUniqueIds(self.scaleParameters.items / 10, 1, self.scaleParameters.items) + originalRows = rand.selectUniqueIds(self.scaleParameters.items // 10, 1, self.scaleParameters.items) ## Load all of the items tuples = [ ] @@ -112,7 +112,7 @@ def loadWarehouse(self, w_id): h_tuples = [ ] ## Select 10% of the customers to have bad credit - selectedRows = rand.selectUniqueIds(self.scaleParameters.customersPerDistrict / 10, 1, self.scaleParameters.customersPerDistrict) + selectedRows = rand.selectUniqueIds(self.scaleParameters.customersPerDistrict // 10, 1, self.scaleParameters.customersPerDistrict) ## TPC-C 4.3.3.1. says that o_c_id should be a permutation of [1, 3000]. But since it ## is a c_id field, it seems to make sense to have it be a permutation of the @@ -160,7 +160,7 @@ def loadWarehouse(self, w_id): ## Select 10% of the stock to be marked "original" s_tuples = [ ] - selectedRows = rand.selectUniqueIds(self.scaleParameters.items / 10, 1, self.scaleParameters.items) + selectedRows = rand.selectUniqueIds(self.scaleParameters.items // 10, 1, self.scaleParameters.items) total_tuples = 0 for i_id in range(1, self.scaleParameters.items+1): original = (i_id in selectedRows) diff --git a/pytpcc/tpcc.py b/pytpcc/tpcc.py index 8cc6900..1205b90 100755 --- a/pytpcc/tpcc.py +++ b/pytpcc/tpcc.py @@ -35,7 +35,7 @@ import glob import time import multiprocessing -from ConfigParser import SafeConfigParser +from configparser import ConfigParser from pprint import pprint, pformat from util import results, scaleparameters @@ -84,12 +84,18 @@ def startLoading(driverClass, scaleParameters, args, config): # Split the warehouses into chunks w_ids = [[] for _ in range(args['clients'])] + for w_id in range(scaleParameters.starting_warehouse, scaleParameters.ending_warehouse+1): idx = w_id % args['clients'] w_ids[idx].append(w_id) ## FOR loader_results = [] + try: + del args['config'] + except KeyError: + print() + for i in range(args['clients']): r = pool.apply_async(loaderFunc, (driverClass, scaleParameters, args, config, w_ids[i])) loader_results.append(r) @@ -104,6 +110,7 @@ def startLoading(driverClass, scaleParameters, args, config): ## loaderFunc ## ============================================== def loaderFunc(driverClass, scaleParameters, args, config, w_ids): + driver = driverClass(args['ddl']) assert driver != None, "Driver in loadFunc is none!" logging.debug("Starting client execution: %s [warehouses=%d]", driver, len(w_ids)) @@ -122,7 +129,7 @@ def loaderFunc(driverClass, scaleParameters, args, config, w_ids): driver.loadFinish() except KeyboardInterrupt: return -1 - except (Exception, AssertionError), ex: + except (Exception, AssertionError) as ex: logging.warn("Failed to load data: %s", ex) raise @@ -136,6 +143,11 @@ def startExecution(driverClass, scaleParameters, args, config): pool = multiprocessing.Pool(args['clients']) debug = logging.getLogger().isEnabledFor(logging.DEBUG) + try: + del args['config'] + except KeyError: + print() + worker_results = [] for _ in range(args['clients']): r = pool.apply_async(executorFunc, (driverClass, scaleParameters, args, config, debug,)) @@ -184,7 +196,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): aparser = argparse.ArgumentParser(description='Python implementation of the TPC-C Benchmark') aparser.add_argument('system', choices=getDrivers(), help='Target system driver') - aparser.add_argument('--config', type=file, + aparser.add_argument('--config', type=open, help='Path to driver configuration file') aparser.add_argument('--reset', action='store_true', help='Instruct the driver to reset the contents of the database') @@ -221,14 +233,14 @@ def executorFunc(driverClass, scaleParameters, args, config, debug): assert driver != None, "Failed to create '%s' driver" % args['system'] if args['print_config']: config = driver.makeDefaultConfig() - print driver.formatConfig(config) - print + print(driver.formatConfig(config)) + print() sys.exit(0) ## Load Configuration file if args['config']: logging.debug("Loading configuration file '%s'", args['config']) - cparser = SafeConfigParser() + cparser = ConfigParser() cparser.read(os.path.realpath(args['config'].name)) config = dict(cparser.items(args['system'])) else: diff --git a/pytpcc/util/nurand.py b/pytpcc/util/nurand.py index 361cec7..6c428a8 100644 --- a/pytpcc/util/nurand.py +++ b/pytpcc/util/nurand.py @@ -29,13 +29,13 @@ # OTHER DEALINGS IN THE SOFTWARE. # ----------------------------------------------------------------------- -import rand +import random def makeForLoad(): """Create random NURand constants, appropriate for loading the database.""" - cLast = rand.number(0, 255) - cId = rand.number(0, 1023) - orderLineItemId = rand.number(0, 8191) + cLast = random.randint(0, 255) + cId = random.randint(0, 1023) + orderLineItemId = random.randint(0, 8191) return NURandC(cLast, cId, orderLineItemId) def validCRun(cRun, cLoad): @@ -45,13 +45,13 @@ def validCRun(cRun, cLoad): def makeForRun(loadC): """Create random NURand constants for running TPC-C. TPC-C 2.1.6.1. (page 20) specifies the valid range for these constants.""" - cRun = rand.number(0, 255) + cRun = random.randint(0, 255) while validCRun(cRun, loadC.cLast) == False: - cRun = rand.number(0, 255) + cRun = random.randint(0, 255) assert validCRun(cRun, loadC.cLast) - cId = rand.number(0, 1023) - orderLineItemId = rand.number(0, 8191) + cId = random.randint(0, 1023) + orderLineItemId = random.randint(0, 8191) return NURandC(cRun, cId, orderLineItemId) class NURandC: diff --git a/pytpcc/util/rand.py b/pytpcc/util/rand.py index 27c5592..89b9796 100644 --- a/pytpcc/util/rand.py +++ b/pytpcc/util/rand.py @@ -30,7 +30,7 @@ # ----------------------------------------------------------------------- import random -import nurand +from . import nurand SYLLABLES = [ "BAR", "OUGHT", "ABLE", "PRI", "PRES", "ESE", "ANTI", "CALLY", "ATION", "EING" ] @@ -129,7 +129,7 @@ def makeLastName(number): """A last name as defined by TPC-C 4.3.2.3. Not actually random.""" global SYLLABLES assert 0 <= number and number <= 999 - indicies = [ number/100, (number/10)%10, number%10 ] + indicies = [ number//100, (number//10)%10, number%10 ] return "".join(map(lambda x: SYLLABLES[x], indicies)) ## DEF diff --git a/pytpcc/util/results.py b/pytpcc/util/results.py index 6572494..60e2c09 100644 --- a/pytpcc/util/results.py +++ b/pytpcc/util/results.py @@ -126,7 +126,7 @@ def append(self, r): self.txn_times[txn_name] = orig_time + r.txn_times[txn_name] self.txn_retries[txn_name] = orig_retries + r.txn_retries[txn_name] self.txn_aborts[txn_name] = orig_aborts + r.txn_aborts[txn_name] - print "%s [cnt=%d, time=%d]" % (txn_name, self.txn_counters[txn_name], self.txn_times[txn_name]) + print("%s [cnt=%d, time=%d]" % (txn_name, self.txn_counters[txn_name], self.txn_times[txn_name])) # logging.debug("%s [cnt=%d, time=%d]" % (txn_name, self.txn_counters[txn_name], self.txn_times[txn_name])) if txn_name not in self.latencies: self.latencies[txn_name] = []