diff --git a/gramex/dl.py b/gramex/dl.py
new file mode 100644
index 000000000..064b7d157
--- /dev/null
+++ b/gramex/dl.py
@@ -0,0 +1,17 @@
+from torch.utils.data import Dataset
+import torch
+
+
+class SentimentDataset(Dataset):
+    """Wrap tokenized encodings and labels as a torch Dataset for the transformers Trainer."""
+    def __init__(self, encodings, labels):
+        self.encodings = encodings
+        self.labels = labels
+
+    def __getitem__(self, idx):
+        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
+        item['labels'] = torch.tensor(self.labels[idx]).to(torch.int64)
+        return item
+
+    def __len__(self):
+        return len(self.labels)
diff --git a/gramex/handlers/mlhandler.py b/gramex/handlers/mlhandler.py
index ac35a2c9a..4431dc86f 100644
--- a/gramex/handlers/mlhandler.py
+++ b/gramex/handlers/mlhandler.py
@@ -16,7 +16,8 @@
 import pandas as pd
 from sklearn.compose import ColumnTransformer
 from sklearn.pipeline import Pipeline
-from sklearn.preprocessing import OneHotEncoder, StandardScaler
+from sklearn.metrics import roc_auc_score
+from sklearn.preprocessing import OneHotEncoder, StandardScaler, LabelEncoder
 from slugify import slugify
 from tornado.gen import coroutine
 from tornado.web import HTTPError
@@ -42,8 +43,16 @@
     'cats': [],
     'target_col': None
 }
+TRANSFORMERS_DEFAULTS = dict(
+    num_train_epochs=1,
+    per_device_train_batch_size=16,
+    per_device_eval_batch_size=32,
+    weight_decay=0.01,
+    warmup_steps=100,
+)
 ACTIONS = ['predict', 'score', 'append', 'train', 'retrain']
 DEFAULT_TEMPLATE = op.join(op.dirname(__file__), '..', 'apps', 'mlhandler', 'template.html')
+SENTIMENT_LENC = LabelEncoder().fit(['NEGATIVE', 'POSITIVE'])


 search_modelclass = lambda x: locate(x, MLCLASS_MODULES)  # NOQA: E731
@@ -59,11 +68,45 @@ def _fit(model, x, y, path=None, name=None):
     return model


+def _train_transformer(model, data, model_path, **kwargs):
+    enc = model.tokenizer(data['_text'].values.tolist(), truncation=True, padding=True)
+    labels = SENTIMENT_LENC.transform(data['label'])
+    from gramex.dl import SentimentDataset
+    train_dataset = SentimentDataset(enc, labels)
+    model_output_dir = op.join(op.dirname(model_path), 'results')
+    model_log_dir = op.join(op.dirname(model_path), 'logs')
+    from transformers import Trainer, TrainingArguments
+    trargs = TrainingArguments(
+        output_dir=model_output_dir, logging_dir=model_log_dir, **kwargs)
+    Trainer(model=model.model, args=trargs, train_dataset=train_dataset).train()
+    model.save_pretrained(model_path)
+    move_to_cpu(model)
+    pred = model(data['_text'].values.tolist())  # NOTE: scored on the training data itself
+    res = {
+        'roc_auc': roc_auc_score(
+            labels, SENTIMENT_LENC.transform([c['label'] for c in pred]))
+    }
+    return res
+
+
+def _score_transformer(model, data):
+    pred = model(data['_text'].values.tolist())
+    score = roc_auc_score(
+        *map(SENTIMENT_LENC.transform, (data['label'], [c['label'] for c in pred])))
+    return {'roc_auc': score}
+
+
+def move_to_cpu(model):
+    getattr(model, 'model', model).to('cpu')  # pipelines hold the torch model in `.model`
+
+
 class MLHandler(FormHandler):

     @classmethod
-    def setup(cls, data=None, model={}, config_dir='', **kwargs):
+    def setup(cls, data=None, model={}, backend="", config_dir='', **kwargs):
         cls.slug = slugify(cls.name)
+        cls.backend = model.get('backend', backend)
+        cls.sentiment_df = pd.DataFrame()
         # Create the config store directory
         if not config_dir:
             config_dir = op.join(gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler',
@@ -74,7 +117,9 @@ def setup(cls, data=None, model={}, config_dir='', **kwargs):
         cls.data_store = op.join(cls.config_dir, 'data.h5')

         cls.template = kwargs.pop('template', DEFAULT_TEMPLATE)
+        cls.mclass = model.get('class')
         super(MLHandler, cls).setup(**kwargs)
+
         try:
             if 'transform' in data:
                 data['transform'] = build_transform(
@@ -91,9 +136,6 @@ def setup(cls, data=None, model={}, config_dir='', **kwargs):
             data = None
             cls._built_transform = staticmethod(lambda x: x)

-        default_model_path = op.join(cls.config_dir, slugify(cls.name) + '.pkl')
-        cls.model_path = model.pop('path', default_model_path)
-
         # store the model kwargs from gramex.yaml into the store
         for key in TRANSFORMS:
             cls.set_opt(key, model.get(key, cls.get_opt(key)))
@@ -104,20 +146,31 @@ def setup(cls, data=None, model={}, config_dir='', **kwargs):
         cls.set_opt('class', model.get('class'))
         cls.set_opt('params', model.get('params', {}))
-        if op.exists(cls.model_path):  # If the pkl exists, load it
-            cls.model = joblib.load(cls.model_path)
-        elif data is not None:
-            mclass = cls.get_opt('class', model.get('class', False))
-            params = cls.get_opt('params', {})
-            data = cls._filtercols(data)
-            data = cls._filterrows(data)
-            cls.model = cls._assemble_pipeline(data, mclass=mclass, params=params)
-
-            # train the model
-            target = data[target_col]
-            train = data[[c for c in data if c != target_col]]
-            gramex.service.threadpool.submit(
-                _fit, cls.model, train, target, cls.model_path, cls.name)
+
+        if cls.backend == "transformers":
+            cls.load_transformer(cls.mclass, model)
+            if data is not None:
+                data = cls._filtercols(data)
+                data = cls._filterrows(data)
+                cls._concatenate(data)
+        else:
+            default_model_path = op.join(cls.config_dir, slugify(cls.name) + '.pkl')
+            cls.model_path = model.pop('path', default_model_path)
+
+            if op.exists(cls.model_path):  # If the pkl exists, load it
+                cls.model = joblib.load(cls.model_path)
+            elif data is not None:
+                mclass = cls.get_opt('class', model.get('class', False))
+                params = cls.get_opt('params', {})
+                data = cls._filtercols(data)
+                data = cls._filterrows(data)
+                cls.model = cls._assemble_pipeline(data, mclass=mclass, params=params)
+
+                # train the model
+                target = data[target_col]
+                train = data[[c for c in data if c != target_col]]
+                gramex.service.threadpool.submit(
+                    _fit, cls.model, train, target, cls.model_path, cls.name)
         cls.config_store.flush()

     @classmethod
@@ -128,6 +181,25 @@ def load_data(cls, default=pd.DataFrame()):
             df = default
         return df

+    @classmethod
+    def load_transformer(cls, task, _model={}):
+        default_model_path = op.join(
+            gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler',
+            slugify(cls.name))
+        cls.model_path = _model.get('path', default_model_path)
+        # Try a fine-tuned model from model_path; else pipeline() falls back to its default model
+        kwargs = {}
+        from transformers import pipeline
+        from transformers import AutoModelForSequenceClassification, AutoTokenizer
+        try:
+            kwargs['model'] = AutoModelForSequenceClassification.from_pretrained(cls.model_path)
+            kwargs['tokenizer'] = AutoTokenizer.from_pretrained(cls.model_path)
+        except Exception as err:
+            app_log.warning(f'Could not load model from {cls.model_path}.')
+            app_log.warning(f'{err}')
+        model = pipeline(task, **kwargs)
+        cls.model = model
+
     @classmethod
     def store_data(cls, df, append=False):
         df.to_hdf(cls.data_store, format="table", key="data", append=append)
@@ -193,6 +265,11 @@ def _parse_data(self, _cache=True, append=False):
         self.store_data(data, append)
         return data

+    def _coerce_transformers_opts(self):
+        kwargs = {k: self.get_arg(k, TRANSFORMERS_DEFAULTS.get(k)) for k in TRANSFORMERS_DEFAULTS}
+        kwargs = {k: type(TRANSFORMERS_DEFAULTS.get(k))(v) for k, v in kwargs.items()}
+        return kwargs
+
     @classmethod
     def _filtercols(cls, data, **kwargs):
         include = kwargs.get('include', cls.get_opt('include', []))
@@ -253,6 +330,29 @@ def _assemble_pipeline(cls, data, force=False, mclass='', params=None):
             return Pipeline([('transform', ct), (model.__class__.__name__, model)])
         return cls.model

+    @classmethod
+    def _concatenate(cls, data):
+        cats = set(cls.get_opt('cats', []))
+        for cat in cats:
+            if not data[cat].map(lambda s: isinstance(s, str)).all():
+                raise HTTPError(BAD_REQUEST,
+                                reason=f"Column {cat} should contain strings.")
+
+        data.insert(0, column='_text', value='')
+
+        for col in data:
+            if col in cats:
+                data['_text'] += data[col]
+
+        cls.sentiment_df = data['_text'].copy()
+        cls.sentiment_df = cls.sentiment_df.to_frame(name='_text')
+        if 'label' in data.columns:
+            cls.sentiment_df['label'] = data['label']
+        else:
+            app_log.error("Column: 'label' missing, training and scoring not available!")
+        data = data.drop('_text', axis=1)  # drop() returns a copy; reassign before storing
+        cls.store_data(data)
+
     def _transform(self, data, **kwargs):
         orgdata = self.load_data()
         for col in data:
@@ -307,6 +407,10 @@ def get(self, *path_args, **path_kwargs):
             self.write(json.dumps(params, indent=2))
         elif '_cache' in self.args:
             self.write(self.load_data().to_json(orient='records'))
+        elif self.backend == "transformers" and 'text' in self.args:
+            text = self.get_arguments('text')
+            result = yield gramex.service.threadpool.submit(self.model, text)
+            self.write(json.dumps(result, indent=2))
         else:
             self._check_model_path()
             if '_download' in self.args:
@@ -369,8 +473,33 @@ def post(self, *path_args, **path_kwargs):
         action = self.args.pop('_action', 'predict')
         if action not in ACTIONS:
             raise HTTPError(BAD_REQUEST, f'Action {action} not supported.')
-        res = yield gramex.service.threadpool.submit(getattr(self, f"_{action}"))
-        self.write(json.dumps(res, indent=2, cls=CustomJSONEncoder))
+        if self.backend == "transformers":
+            data = self.sentiment_df
+            move_to_cpu(self.model)
+            kwargs = {}
+            if action == 'train':
+                if 'label' not in data.columns:
+                    app_log.error("Column: 'label' missing, training and scoring not available!")
+                    raise HTTPError(BAD_REQUEST,
+                                    reason="Missing column 'label' (target values) in data.")
+                kwargs = self._coerce_transformers_opts()
+                kwargs['model_path'] = self.model_path
+                args = _train_transformer, self.model, data
+            elif action == 'score':
+                if 'label' not in data.columns:
+                    app_log.error("Column: 'label' missing, training and scoring not available!")
+                    raise HTTPError(BAD_REQUEST,
+                                    reason="Missing column 'label' (target values) in data.")
+                args = _score_transformer, self.model, data
+            elif action == 'predict':
+                args = self.model, data['_text'].values.tolist()
+            else:
+                raise HTTPError(BAD_REQUEST, f'Action {action} not supported for transformers.')
+            res = yield gramex.service.threadpool.submit(*args, **kwargs)
+            self.write(json.dumps(res, indent=2, cls=CustomJSONEncoder))
+        else:
+            res = yield gramex.service.threadpool.submit(getattr(self, f"_{action}"))
+            self.write(json.dumps(res, indent=2, cls=CustomJSONEncoder))
         super(MLHandler, self).post(*path_args, **path_kwargs)

     def get_cached_arg(self, argname):
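
Reviewer note: below is a minimal sketch of how this backend might be wired up in gramex.yaml. The endpoint name, data path, and column names are hypothetical and not part of this PR; `backend: transformers` selects the new code path in `setup()`, and `class` is passed as the `task` argument to `transformers.pipeline()`:

    url:
      sentiment:
        pattern: /sentiment
        handler: MLHandler
        kwargs:
          data:
            url: $YAMLPATH/sentiment.csv   # hypothetical dataset with text columns and a 'label' column
          model:
            backend: transformers          # routes setup() through load_transformer()
            class: sentiment-analysis      # the transformers.pipeline() task to load
            cats: [text]                   # columns that _concatenate() joins into '_text'

Assuming such a config, `GET /sentiment?text=...` returns the pipeline's predictions, and `POST /sentiment?_action=train` fine-tunes with TRANSFORMERS_DEFAULTS, each default overridable as a URL argument (e.g. `?num_train_epochs=2`) and coerced back to the default's type by `_coerce_transformers_opts`.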