diff --git a/gramex/handlers/mlhandler.py b/gramex/handlers/mlhandler.py
index b1a9e662b..12098e61a 100644
--- a/gramex/handlers/mlhandler.py
+++ b/gramex/handlers/mlhandler.py
@@ -98,9 +98,14 @@ def setup(cls, data=None, model={}, config_dir='', template=DEFAULT_TEMPLATE, **
         if op.exists(cls.store.model_path):  # If the pkl exists, load it
             if op.isdir(cls.store.model_path):
                 mclass, wrapper = ml.search_modelclass(mclass)
-                cls.model = locate(wrapper).from_disk(mclass, cls.store.model_path)
+                cls.model = locate(wrapper).from_disk(cls.store.model_path, mclass)
             else:
-                cls.model = get_model(cls.store.model_path, {})
+                try:
+                    cls.model = get_model(cls.store.model_path, {})
+                except Exception as err:
+                    app_log.warning(err)
+                    mclass, wrapper = ml.search_modelclass(mclass)
+                    cls.model = locate(wrapper).from_disk(cls.store.model_path, mclass)
         elif data is not None:
             data = cls._filtercols(data)
             data = cls._filterrows(data)
@@ -172,7 +177,12 @@ def _filterrows(cls, data, **kwargs):
             action = kwargs.get(method, cls.store.load(method, True))
             if action:
                 subset = action if isinstance(action, list) else None
-                data = getattr(data, method)(subset=subset)
+                try:
+                    data = getattr(data, method)(subset=subset)
+                except TypeError as exc:
+                    # The label column for an NER dataset is a nested list.
+                    # Can't do drop_duplicates on that.
+                    app_log.warning(exc)
         return data
 
     def _transform(self, data, **kwargs):
@@ -200,7 +210,7 @@ def _predict(self, data=None, score_col=''):
         # Set data in the same order as the transformer requests
         try:
             tcol = self.store.load('target_col', '_prediction')
-            data = self.model.predict(data, target_col=tcol)
+            data = self.model.predict(data, target_col=tcol, **self.args)
         except Exception as exc:
             app_log.exception(exc)
         return data
diff --git a/gramex/ml_api.py b/gramex/ml_api.py
index 98d3380b0..442414fba 100644
--- a/gramex/ml_api.py
+++ b/gramex/ml_api.py
@@ -41,10 +41,11 @@
         "sklearn.decomposition",
         "gramex.ml",
     ],
-    "gramex.sm_api.StatsModel": [
+    "gramex.timeseries.StatsModel": [
         "statsmodels.tsa.api",
         "statsmodels.tsa.statespace.sarimax",
     ],
+    "gramex.timeseries.Prophet": ["prophet"],
     "gramex.ml_api.HFTransformer": ["gramex.transformers"],
 }
 
@@ -368,7 +369,11 @@ def predict(
         """
         p = self._predict(X, **kwargs)
         if target_col:
-            X[target_col] = p
+            try:
+                X[target_col] = p
+            except ValueError:
+                # This happens for NER: predictions of a single sample can be multiple entities.
+                X[target_col] = [p]
             return X
         return p
 
diff --git a/gramex/sm_api.py b/gramex/timeseries.py
similarity index 72%
rename from gramex/sm_api.py
rename to gramex/timeseries.py
index d838210f5..e7e0b8abd 100644
--- a/gramex/sm_api.py
+++ b/gramex/timeseries.py
@@ -1,6 +1,7 @@
 import pandas as pd
 import numpy as np
 import joblib
+from typing import Union
 from gramex.config import app_log
 from gramex import cache
 from statsmodels import api as sm
@@ -10,7 +11,6 @@
 
 
 class StatsModel(AbstractModel):
-
     @classmethod
     def from_disk(cls, path, **kwargs):
         model = cache.open(path, joblib.load)
@@ -51,8 +51,14 @@ def _get_stl(self, endog):
         return pd.Series(result, index=endog.index)
 
     def fit(
-        self, X, y=None, model_path=None, name=None, index_col=None, target_col=None,
-        **kwargs
+        self,
+        X,
+        y=None,
+        model_path=None,
+        name=None,
+        index_col=None,
+        target_col=None,
+        **kwargs,
     ):
         """Only a dataframe is accepted. Index and target columns are both expected to be in it."""
         params = self.params.copy()
@@ -106,3 +112,47 @@ def get_attributes(self):
         if not result:
             return {}
         return result.summary().as_html()
+
+
+class Prophet(StatsModel):
+    def fit(
+        self,
+        X: Union[pd.DataFrame, np.ndarray],
+        y: Union[pd.Series, np.ndarray],
+        model_path: str = "",
+        name: str = "",
+        **kwargs,
+    ):
+        X["y"] = y
+        self.model = self.mclass.fit(X)
+        from prophet.serialize import model_to_json
+
+        with open(model_path, "w") as fout:
+            fout.write(model_to_json(self.model))
+        score = self.score(X[["ds"]], y)
+        return score
+
+    @classmethod
+    def from_disk(cls, path, *args, **kwargs):
+        from prophet.serialize import model_from_json
+
+        with open(path, "r") as fin:
+            model = model_from_json(fin.read())
+        return cls(model, params={})
+
+    def score(self, X, y_true, **kwargs):
+        return mean_absolute_error(y_true, self.mclass.predict(X)["yhat"])
+
+    def predict(
+        self,
+        X: Union[pd.DataFrame, np.ndarray] = None,
+        n_periods=None,
+        include_history=False,
+        **kwargs,
+    ):
+        if n_periods is not None:
+            future = self.mclass.make_future_dataframe(
+                periods=int(n_periods), include_history=include_history
+            )
+            return self.mclass.predict(future)
+        return self.mclass.predict(X)
diff --git a/gramex/transformers.py b/gramex/transformers.py
index 12514dee8..007513059 100644
--- a/gramex/transformers.py
+++ b/gramex/transformers.py
@@ -1,15 +1,107 @@
 import os.path as op
+from typing import List
 import pandas as pd
+import spacy
 import transformers as trf
-from datasets import Dataset
+from datasets import Dataset, load_metric
 from gramex.config import app_log
 from gramex import cache
 from sklearn.metrics import roc_auc_score
 
 _CACHE = {}
 
-DEFAULT_MODEL = DEFAULT_TOKENIZER = "distilbert-base-uncased-finetuned-sst-2-english"
+
+def biluo2iob(tags: List[str]) -> List[str]:
+    """Convert BILOU tags to IOB tags.
+
+    spaCy insists on BILOU tags, but most transformers models use IOB tags.
+
+    Parameters
+    ----------
+    tags : list
+        List of BILOU tags
+
+    Returns
+    -------
+    list
+        List of IOB tags.
+
+    Example
+    -------
+    >>> # "Joe R Biden is President of the United States ."
+    >>> tags = ['B-PER', 'I-PER', 'L-PER', 'O', 'U-PER', 'O', 'O', 'B-LOC', 'L-LOC', 'O']
+    >>> biluo2iob(tags)
+    ['B-PER', 'I-PER', 'I-PER', 'O', 'B-PER', 'O', 'O', 'B-LOC', 'I-LOC', 'O']
+    """
+    # Replace L
+    tags = [t.replace("L-", "I-") for t in tags]
+    # Replace U
+    tags = [t.replace("U-", "B-") for t in tags]
+    return tags
+
+
+def offsets2iob(text: spacy.tokens.Doc, entities: List[dict]) -> List[str]:
+    """Convert named entity offsets to a sequence of IOB tags.
+
+    Parameters
+    ----------
+    text : spacy.tokens.Doc
+        spaCy document of the original text
+    entities : list
+        Named entities present in the document as a list of dicts.
+        Each dict represents one named entity and must contain three keys:
+        1. "start": the start offset of the entity
+        2. "end": the end offset of the entity
+        3. "label": the label of the entity
+
+    Returns
+    -------
+    list
+        A list of IOB tags for the document.
+
+    Example
+    -------
+    >>> import spacy
+    >>> nlp = load('en')
+    >>> doc = nlp('Narendra Modi is the PM of India.')
+    >>> entities = [{'start': 0, 'end': 13, 'label': 'PER'},
+    ...             {'start': 27, 'end': 32, 'label': 'LOC'}]
+    >>> offsets2iob(doc, entities)
+    ['B-PER', 'I-PER', 'O', 'O', 'O', 'O', 'B-LOC', 'O']
+    """
+    entities = [(ent["start"], ent["end"], ent["label"]) for ent in entities]
+    tags = spacy.training.offsets_to_biluo_tags(text, entities)
+    return biluo2iob(tags)
+
+
+def tokenize_and_align_labels(examples, tokenizer):
+    tokenized_inputs = tokenizer(
+        examples["text"], truncation=True, is_split_into_words=True
+    )
+
+    labels = []
+    for i, label in enumerate(examples["ner_tags"]):
+        word_ids = tokenized_inputs.word_ids(
+            batch_index=i
+        )  # Map tokens to their respective word.
+        previous_word_idx = None
+        label_ids = []
+        for word_idx in word_ids:  # Set the special tokens to -100.
+            if word_idx is None:
+                label_ids.append(-100)
+            elif (
+                word_idx != previous_word_idx
+            ):  # Only label the first token of a given word.
+                label_ids.append(label[word_idx])
+            else:
+                label_ids.append(-100)
+            previous_word_idx = word_idx
+        labels.append(label_ids)
+
+    tokenized_inputs["labels"] = labels
+    return tokenized_inputs
 
 
 def load_pretrained(klass, path, default, **kwargs):
@@ -27,22 +119,41 @@ class BaseTransformer(object):
-    def __init__(self, model=DEFAULT_MODEL, tokenizer=DEFAULT_TOKENIZER, **kwargs):
+    def __init__(self, model=None, tokenizer=None, **kwargs):
+        if model is None:
+            model = self.DEFAULT_MODEL
+        if tokenizer is None:
+            tokenizer = self.DEFAULT_TOKENIZER
         self._model = model
         self._tokenizer = tokenizer
-        self.model = load_pretrained(
-            trf.AutoModelForSequenceClassification, model, DEFAULT_MODEL
-        )
+        self.model = load_pretrained(self.AUTO_CLASS, model, self.DEFAULT_MODEL)
         self.tokenizer = load_pretrained(
-            trf.AutoTokenizer, tokenizer, DEFAULT_TOKENIZER
+            trf.AutoTokenizer, tokenizer, self.DEFAULT_TOKENIZER
+        )
+        self.pipeline_kwargs = kwargs
+        self.pipeline = trf.pipeline(
+            self.task, model=self.model, tokenizer=self.tokenizer, **kwargs
         )
+
+    def post_train(self, model_path):
+        """Move the model to the CPU, save it with the tokenizer, recreate the pipeline."""
+        self.model.to("cpu")
+        self.model.save_pretrained(op.join(model_path, "model"))
+        self.tokenizer.save_pretrained(op.join(model_path, "tokenizer"))
         self.pipeline = trf.pipeline(
-            self.task, model=self.model, tokenizer=self.tokenizer
+            self.task,
+            model=self.model,
+            tokenizer=self.tokenizer,
+            **self.pipeline_kwargs,
         )
 
 
 class SentimentAnalysis(BaseTransformer):
     task = "sentiment-analysis"
+    DEFAULT_MODEL = (
+        DEFAULT_TOKENIZER
+    ) = "distilbert-base-uncased-finetuned-sst-2-english"
+    AUTO_CLASS = trf.AutoModelForSequenceClassification
 
     def fit(self, text, labels, model_path, **kwargs):
         if pd.api.types.is_object_dtype(labels):
@@ -57,14 +168,7 @@ def fit(self, text, labels, model_path, **kwargs):
             model=self.model, train_dataset=tokenized, args=train_args
         )
         trainer.train()
-        self.model.to("cpu")
-        self.model.save_pretrained(op.join(model_path, "model"))
-        self.tokenizer.save_pretrained(op.join(model_path, "tokenizer"))
-        self.pipeline = trf.pipeline(
-            self.task,
-            model=self.model,
-            tokenizer=self.tokenizer,
-        )
+        self.post_train(model_path)
 
     def predict(self, text, **kwargs):
         text = text.tolist()
@@ -75,4 +179,82 @@ def score(self, X, y_true, **kwargs):
         y_true = [self.model.config.label2id[x] for x in y_true]
         y_pred = self.predict(X.squeeze("columns"))
         y_pred = [self.model.config.label2id[x] for x in y_pred]
-        return roc_auc_score(y_true, y_pred)
+        try:
+            score = roc_auc_score(y_true, y_pred)
+        # Can't find roc_auc_scores for single samples, or when only one class is present.
+        except ValueError:
+            score = 0
+        return score
+
+
+class NER(BaseTransformer):
+    task = "ner"
+    DEFAULT_TOKENIZER = (
+        DEFAULT_MODEL
+    ) = "dbmdz/bert-large-cased-finetuned-conll03-english"
+    AUTO_CLASS = trf.AutoModelForTokenClassification
+
+    def __init__(self, model=None, tokenizer=None, **kwargs):
+        self.nlp = spacy.blank("en")
+        super(NER, self).__init__(
+            model=model, tokenizer=tokenizer, aggregation_strategy="first", **kwargs
+        )
+
+    @property
+    def labels(self):
+        return set([k.split("-")[-1] for k in self.model.config.label2id])
+
+    def predict(self, text, **kwargs):
+        text = text.tolist()
+        return self.pipeline(text)
+
+    def score(self, X, y_true, **kwargs):
+        try:
+            metric = load_metric("seqeval")
+        except ImportError:
+            app_log.error("Could not load the seqeval metric. Scoring not supported.")
+            return 0
+        # Get references and predictions
+        X = X.squeeze("columns")
+        predictions = self.predict(X)
+        for pred in predictions:
+            for ent in pred:
+                ent.update({"label": ent.pop("entity_group")})
+        preds = []
+        refs = []
+        for doc, pred, ref in zip(self.nlp.pipe(X.tolist()), predictions, y_true):
+            preds.append(offsets2iob(doc, pred))
+            refs.append(offsets2iob(doc, ref))
+        metrics = metric.compute(references=refs, predictions=preds)
+        return pd.DataFrame(
+            {k: v for k, v in metrics.items() if k in self.labels}
+        ).reset_index()
+
+    def fit(self, text, labels, model_path, **kwargs):
+        texts = []
+        ner_tags = []
+        for doc, ents in zip(self.nlp.pipe(text.tolist()), labels):
+            texts.append([t.text for t in doc])
+            ner_tags.append(offsets2iob(doc, ents))
+
+        label2id = self.model.config.label2id
+        ner_tags = [[label2id.get(k, 0) for k in tags] for tags in ner_tags]
+
+        dataset = Dataset.from_dict({"text": texts, "ner_tags": ner_tags})
+        tokenized = dataset.map(
+            lambda x: tokenize_and_align_labels(x, self.tokenizer), batched=True
+        )
+        collator = trf.DataCollatorForTokenClassification(tokenizer=self.tokenizer)
+        args = trf.TrainingArguments(
+            save_strategy="no", output_dir=model_path, evaluation_strategy="epoch"
+        )
+        trainer = trf.Trainer(
+            model=self.model,
+            args=args,
+            train_dataset=tokenized,
+            eval_dataset=tokenized,
+            tokenizer=self.tokenizer,
+            data_collator=collator,
+        )
+        trainer.train()
+        self.post_train(model_path)
diff --git a/setup.cfg b/setup.cfg
index de2ecab31..189297afe 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -19,7 +19,7 @@ per-file-ignores =
     testlib/test_scale.py:E912
     ; ML libraries use capital "X" as a function argument or a variable. That's OK
     gramex/ml_api.py:N803,N806
-    gramex/sm_api.py:N803,N806
+    gramex/timeseries.py:N803,N806
 
 
 [nosetests]
diff --git a/tests/gramex.yaml b/tests/gramex.yaml
index d1bdabd64..308f63f38 100644
--- a/tests/gramex.yaml
+++ b/tests/gramex.yaml
@@ -1279,6 +1279,22 @@ url:
         class: SentimentAnalysis
       xsrf_cookies: false
 
+  mlhandler/huggingface/ner:
+    pattern: /ner
+    handler: MLHandler
+    kwargs:
+      model:
+        class: NER
+      xsrf_cookies: false
+
+  mlhandler/prophet:
+    pattern: /prophet
+    handler: MLHandler
+    kwargs:
+      model:
+        class: Prophet
+      xsrf_cookies: false
+
   capture:
     pattern: /capture
     handler: CaptureHandler
diff --git a/tests/test_mlhandler.py b/tests/test_mlhandler.py
index 3851f4deb..6e5016702 100644
--- a/tests/test_mlhandler.py
+++ b/tests/test_mlhandler.py
@@ -21,24 +21,82 @@
 from . import TestGramex, folder, tempfiles
 
+STATSMODELS_INSTALLED = PROPHET_INSTALLED = TRANSFORMERS_INSTALLED = True
+
 try:
     from statsmodels.datasets.interest_inflation import load as infl_load
-    STATSMODELS_INSTALLED = True
 except ImportError:
     STATSMODELS_INSTALLED = False
 
 try:
     logging.getLogger("tensorflow").disabled = True
     import transformers as trf  # NOQA: F401
-    TRANSFORMERS_INSTALLED = True
 except ImportError:
     TRANSFORMERS_INSTALLED = False
 
+try:
+    import prophet  # NOQA: F401
+except ImportError:
+    PROPHET_INSTALLED = False
+
 op = os.path
 
 
+@skipUnless(PROPHET_INSTALLED, "Please install Prophet to run these tests.")
+class TestProphet(TestGramex):
+    @classmethod
+    def setUpClass(cls):
+        df = pd.read_csv(
+            "https://bit.ly/39d7Y6r", index_col="ds", parse_dates=["ds"]
+        )  # Peyton Manning dataset
+        train, test = df.loc[:"2014"], df.loc["2015":]
+        train, test = train.reset_index(), test.reset_index()
+        train["ds"] = train["ds"].astype(str)
+        test["ds"] = test["ds"].astype(str)
+        cls.train, cls.test = train, test
+
+    @classmethod
+    def tearDownClass(cls):
+
+        path = op.join(
+            gramex.config.variables["GRAMEXDATA"],
+            "apps",
+            "mlhandler",
+            "mlhandler-prophet",
+        )
+        if op.isdir(path):
+            shutil.rmtree(path)
+
+    def setUp(self):
+        resp = self.get(
+            "/prophet?_action=train&target_col=y",
+            method="post",
+            data=self.train.to_json(orient="records"),
+            headers={"Content-Type": "application/json"},
+        )
+        self.assertTrue(resp.json()["score"] < 0.4)
+
+    def test_default(self):
+
+        # Get predictions
+        resp = self.get(
+            "/prophet?_action=predict",
+            method="post",
+            data=self.test[["ds"]].to_json(orient="records"),
+            headers={"Content-Type": "application/json"},
+        )
+        yhat = pd.DataFrame.from_records(resp.json())["yhat"]
+        self.assertTrue(mean_absolute_error(self.test["y"], yhat) < 0.5)
+
+    def test_forecast(self):
+        n_periods = self.test.shape[0]
+        resp = self.get(f"/prophet?_action=predict&n_periods={n_periods}")
+        yhat = pd.DataFrame.from_records(resp.json())["yhat"]
+        self.assertTrue(mean_absolute_error(self.test["y"], yhat) < 0.5)
+
+
 @skipUnless(TRANSFORMERS_INSTALLED, "Please install transformers to run these tests.")
 class TestTransformers(TestGramex):
     @classmethod
@@ -52,12 +110,12 @@ def tearDownClass(cls):
         if op.isdir(path):
             shutil.rmtree(path)
 
-    def test_blank_predictions(self):
+    def test_default_sentiment(self):
         """Ensure that the default model predicts something."""
         resp = self.get("/sentiment?text=This is bad.&text=This is good.", timeout=60)
         self.assertEqual(resp.json(), ["NEGATIVE", "POSITIVE"])
 
-    def test_train(self):
+    def test_train_sentiment(self):
         """Train with some vague sentences."""
         warnings.warn("This test takes a LONG time. Leave while you can.")
         df = pd.read_json("https://bit.ly/3NesHFs")
@@ -70,6 +128,38 @@ def test_train(self):
         )
         self.assertGreaterEqual(resp.json()['score'], 0.9)
 
+    def test_default_ner(self):
+        """Ensure that the default model predicts something."""
+        resp = self.get("/ner?text=Narendra Modi is the PM of India.", timeout=300)
+        labels = [c["labels"] for c in resp.json()]
+        ents = [[(r["word"], r["entity_group"]) for r in label] for label in labels]
+        self.assertListEqual(ents, [[("Narendra Modi", "PER"), ("India", "LOC")]])
+
+        resp = self.get(
+            "/ner?text=Narendra Modi is the PM of India.&text=Joe Biden is POTUS.",
+            timeout=300,
+        )
+        labels = [c["labels"] for c in resp.json()]
+        ents = [[(r["word"], r["entity_group"]) for r in label] for label in labels]
+        self.assertListEqual(
+            ents, [[("Narendra Modi", "PER"), ("India", "LOC")], [("Joe Biden", "PER")]]
+        )
+
+    def test_train_ner(self):
+        warnings.warn("This test takes a LONG time. Leave while you can.")
+        df = pd.read_json("https://bit.ly/3wZYsf5")
+        resp = self.get(
+            "/ner?_action=train&target_col=labels",
+            method="post",
+            data=df.to_json(orient="records"),
+            headers={"Content-Type": "application/json"},
+            timeout=300,
+        )
+        # Ensure that f1, precision and recall are > 0.6 for all NEs
+        metrics = pd.DataFrame.from_records(resp.json()["score"]).set_index("index")
+        metrics = metrics.drop(["number"], axis=0).mean(axis=1)
+        self.assertTrue((metrics > 0.6).all())
+
 
 @skipUnless(STATSMODELS_INSTALLED, "Please install statsmodels to run these tests.")
 class TestStatsmodels(TestGramex):