# package imports
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
import tensorflow as tf
from google.cloud import bigquery
from google.cloud import aiplatform
import argparse
import os

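# Overview: read tabular training data from BigQuery with TensorFlow I/O, train a
# single-layer (logistic-regression-style) Keras classifier, and log parameters and
# metrics to a Vertex AI Experiment run; the trained model is saved to the output
# directory Vertex AI provides to the job.
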
# parse arguments into local variables
parser = argparse.ArgumentParser()
# for each flag - dest: attribute name on args, default: value used when the flag is absent, type: type to convert to, help: description of the argument
parser.add_argument('--epochs', dest = 'epochs', default = 10, type = int, help = 'Number of Epochs')
parser.add_argument('--batch_size', dest = 'batch_size', default = 32, type = int, help = 'Batch Size')
parser.add_argument('--var_target', dest = 'var_target', type = str, help = 'Name of the target column')
parser.add_argument('--var_omit', dest = 'var_omit', type = str, help = 'Comma-separated list of columns to omit') #, nargs='*'
parser.add_argument('--project_id', dest = 'project_id', type = str)
parser.add_argument('--bq_project', dest = 'bq_project', type = str)
parser.add_argument('--bq_dataset', dest = 'bq_dataset', type = str)
parser.add_argument('--bq_table', dest = 'bq_table', type = str)
parser.add_argument('--region', dest = 'region', type = str)
parser.add_argument('--experiment', dest = 'experiment', type = str)
parser.add_argument('--series', dest = 'series', type = str)
parser.add_argument('--experiment_name', dest = 'experiment_name', type = str)
parser.add_argument('--run_name', dest = 'run_name', type = str)
args = parser.parse_args()

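# Example invocation (all values below are placeholders, not from the original source):
#   python train.py --project_id my-project --bq_project my-project \
#     --bq_dataset my_dataset --bq_table my_table --region us-central1 \
#     --var_target target_column --var_omit id_column,splits \
#     --experiment my-experiment --series 01 \
#     --experiment_name my-experiment-01 --run_name run-001 \
#     --epochs 10 --batch_size 32
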
# clients
bq = bigquery.Client(project = args.project_id)
aiplatform.init(project = args.project_id, location = args.region)

# Vertex AI Experiment: reuse the run if one with this name already exists, otherwise create it
if args.run_name in [run.name for run in aiplatform.ExperimentRun.list(experiment = args.experiment_name)]:
    expRun = aiplatform.ExperimentRun(run_name = args.run_name, experiment = args.experiment_name)
else:
    expRun = aiplatform.ExperimentRun.create(run_name = args.run_name, experiment = args.experiment_name)
expRun.log_params({'experiment': args.experiment, 'series': args.series, 'project_id': args.project_id})

# get the column schema for the source table from BigQuery INFORMATION_SCHEMA
query = f"SELECT * FROM {args.bq_project}.{args.bq_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{args.bq_table}'"
schema = bq.query(query).to_dataframe()

# get the number of classes by counting the distinct non-null target values
nclasses = bq.query(query = f'SELECT DISTINCT {args.var_target} FROM {args.bq_project}.{args.bq_dataset}.{args.bq_table} WHERE {args.var_target} is not null').to_dataframe()
nclasses = nclasses.shape[0]
expRun.log_params({'data_source': f'bq://{args.bq_project}.{args.bq_dataset}.{args.bq_table}', 'nclasses': nclasses, 'var_split': 'splits', 'var_target': args.var_target})

# make a list of columns to omit from the model inputs
OMIT = [x for x in args.var_omit.split(',') if x != '']

# use the schema to prepare the list of columns to read from BigQuery
selected_fields = schema[~schema.column_name.isin(OMIT)].column_name.tolist()

# the columns in this data source are either FLOAT64 or INT64 - map them to the matching TensorFlow dtypes
output_types = [dtypes.float64 if x == 'FLOAT64' else dtypes.int64 for x in schema[~schema.column_name.isin(OMIT)].data_type.tolist()]

# remap each input row to a (features, target) pair for TensorFlow
def transTable(row_dict):
    # the target column is assumed to hold integer class indices in [0, nclasses), as required by tf.one_hot
    target = row_dict.pop(args.var_target)
    target = tf.one_hot(tf.cast(target, tf.int64), nclasses)
    target = tf.cast(target, tf.float32)
    return row_dict, target

# function to set up a BigQuery read session with TensorFlow I/O, restricted to one data split
def bq_reader(split):
    reader = BigQueryClient()

    session = reader.read_session(
        parent = f"projects/{args.project_id}",
        project_id = args.bq_project,
        table_id = args.bq_table,
        dataset_id = args.bq_dataset,
        selected_fields = selected_fields,
        output_types = output_types,
        row_restriction = f"splits='{split}'",
        requested_streams = 3
    )

    return session

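# note: parallel_read_rows() (used below) reads the session's streams in parallel and
# combines them into a single tf.data.Dataset of {column_name: tensor} row dictionaries
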
# set up the input feeds for train, validate, and test
train = bq_reader('TRAIN').parallel_read_rows().prefetch(1).map(transTable).shuffle(args.batch_size*10).batch(args.batch_size)
validate = bq_reader('VALIDATE').parallel_read_rows().prefetch(1).map(transTable).batch(args.batch_size)
test = bq_reader('TEST').parallel_read_rows().prefetch(1).map(transTable).batch(args.batch_size)
expRun.log_params({'training.batch_size': args.batch_size, 'training.shuffle': 10*args.batch_size, 'training.prefetch': 1})
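# each of these datasets yields batches of ({feature_name: tensor}, one-hot target) pairs,
# matching the Keras inputs and the categorical cross-entropy loss configured below
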
# Logistic Regression

# feature list: all columns except the omitted ones and the target
numeric_features = schema[~schema.column_name.isin(OMIT + [args.var_target])]['column_name'].to_list()

# one Keras input per feature
features = [tf.keras.Input(shape = (1,), dtype = dtypes.float64, name = feature) for feature in numeric_features]

# (optional) per-feature normalization adapted before training - left disabled here in favor of batch normalization below
#normalized_features = []
#for feature in features:
#    normalizer = tf.keras.layers.Normalization(axis = None, name = feature.name + '_normalized')
#    feature_data = train.map(lambda x, y: x[feature.name])
#    normalizer.adapt(feature_data)
#    normalized_features.append(normalizer(feature))

# concatenate features into a single layer
all_features = tf.keras.layers.Concatenate(name = 'feature_layer')(features)
#all_features = tf.keras.layers.Concatenate(name = 'feature_layer')(normalized_features) # use this if the normalization above is enabled

# batch normalization of inputs - applied during training
all_features = tf.keras.layers.BatchNormalization(name = 'batch_normalization_layer')(all_features)

# logistic output - softmax activation over nclasses
logistic = tf.keras.layers.Dense(nclasses, activation = tf.nn.softmax, name = 'logistic')(all_features)

# the model
model = tf.keras.Model(
    inputs = features,
    outputs = logistic,
    name = args.experiment
)

# compile the model
model.compile(
    optimizer = tf.keras.optimizers.SGD(), # SGD or Adam
    loss = tf.keras.losses.CategoricalCrossentropy(),
    metrics = ['accuracy', tf.keras.metrics.AUC(curve = 'PR', name = 'auprc')]
)
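# CategoricalCrossentropy expects the one-hot targets produced by transTable and pairs with the
# softmax output layer; AUC with curve = 'PR' reports area under the precision-recall curve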

# set up TensorBoard logging (Vertex AI supplies AIP_TENSORBOARD_LOG_DIR) and train
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir = os.environ['AIP_TENSORBOARD_LOG_DIR'], histogram_freq = 1)
history = model.fit(train, epochs = args.epochs, callbacks = [tensorboard_callback], validation_data = validate)
expRun.log_params({'training.epochs': history.params['epochs']})

# log per-epoch metrics to the experiment run as time series
for e in range(history.params['epochs']):
    expRun.log_time_series_metrics(
        {
            'train_loss': history.history['loss'][e],
            'train_accuracy': history.history['accuracy'][e],
            'train_auprc': history.history['auprc'][e],
            'val_loss': history.history['val_loss'][e],
            'val_accuracy': history.history['val_accuracy'][e],
            'val_auprc': history.history['val_auprc'][e]
        }
    )

# test evaluations:
loss, accuracy, auprc = model.evaluate(test)
expRun.log_metrics({'test_loss': loss, 'test_accuracy': accuracy, 'test_auprc': auprc})

# validation evaluations:
loss, accuracy, auprc = model.evaluate(validate)
expRun.log_metrics({'val_loss': loss, 'val_accuracy': accuracy, 'val_auprc': auprc})

# training evaluations:
loss, accuracy, auprc = model.evaluate(train)
expRun.log_metrics({'train_loss': loss, 'train_accuracy': accuracy, 'train_auprc': auprc})

# save the model to the output location Vertex AI provides via AIP_MODEL_DIR
model.save(os.getenv("AIP_MODEL_DIR"))
expRun.log_params({'model.save': os.getenv("AIP_MODEL_DIR")})
expRun.end_run()