-
Notifications
You must be signed in to change notification settings - Fork 193
/
Copy pathsklearn.py
118 lines (97 loc) · 4.39 KB
/
sklearn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import joblib
from typing import List
from sklearn.pipeline import Pipeline
from mlserver.codecs import NumpyCodec, NumpyRequestCodec, PandasCodec
from mlserver.errors import InferenceError
from mlserver.model import MLModel
from mlserver.types import (
InferenceRequest,
InferenceResponse,
ResponseOutput,
RequestOutput,
)
from mlserver.utils import get_model_uri
# Key in the model settings' `parameters.extra` dict that selects which
# estimator method to call when a request doesn't name any outputs.
PREDICT_FN_KEY = "predict_fn"
# Names of the scikit-learn estimator methods exposed as inference outputs.
PREDICT_OUTPUT = "predict"
PREDICT_PROBA_OUTPUT = "predict_proba"
PREDICT_TRANSFORM = "transform"
VALID_OUTPUTS = [PREDICT_OUTPUT, PREDICT_PROBA_OUTPUT, PREDICT_TRANSFORM]
# Artefact filenames probed when the model URI doesn't point at a file directly.
WELLKNOWN_MODEL_FILENAMES = ["model.joblib", "model.pickle", "model.pkl"]
class SKLearnModel(MLModel):
    """
    Implementation of the MLModel interface to load and serve `scikit-learn`
    models persisted with `joblib`.
    """

    async def load(self) -> bool:
        """Deserialise the model artefact referenced by the model settings.

        Resolves the artefact URI (falling back to the well-known
        filenames) and loads it with `joblib`.

        Returns:
            True once the model is loaded.
        """
        # TODO: Log info message
        model_uri = await get_model_uri(
            self._settings, wellknown_filenames=WELLKNOWN_MODEL_FILENAMES
        )
        self._model = joblib.load(model_uri)
        return True

    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
        """Validate the request, run the requested estimator method(s) and
        wrap the encoded results in an `InferenceResponse`."""
        payload = self._check_request(payload)
        outputs = self._get_model_outputs(payload)
        return InferenceResponse(
            model_name=self.name,
            model_version=self.version,
            outputs=outputs,
        )

    def _check_request(self, payload: InferenceRequest) -> InferenceRequest:
        """Normalise and validate the request's outputs in place.

        When no outputs are requested, defaults to the predict function
        configured under the `predict_fn` extra setting (or `predict`
        otherwise).

        Raises:
            InferenceError: if an unknown output name is requested, or if
                `predict_proba` is requested from an estimator that does
                not implement it (e.g. a regressor).
        """
        if not payload.outputs:
            # By default, only return the result of `predict()`, unless a
            # custom predict function was configured in the model settings.
            predict_fn = PREDICT_OUTPUT
            parameters = self.settings.parameters
            if (
                parameters
                and parameters.extra
                and PREDICT_FN_KEY in parameters.extra
            ):
                predict_fn = parameters.extra[PREDICT_FN_KEY]
            payload.outputs = [RequestOutput(name=predict_fn)]
        else:
            for request_output in payload.outputs:
                if request_output.name not in VALID_OUTPUTS:
                    raise InferenceError(
                        f"SKLearnModel only supports '{PREDICT_OUTPUT}', "
                        f"'{PREDICT_PROBA_OUTPUT}' and "
                        f"'{PREDICT_TRANSFORM}' as outputs "
                        f"({request_output.name} was received)"
                    )

        # Regression models do not support `predict_proba`
        output_names = [o.name for o in payload.outputs]  # type: ignore
        if PREDICT_PROBA_OUTPUT in output_names:
            # For pipelines, the capability check must target the final
            # estimator rather than the Pipeline wrapper itself.
            maybe_regressor = self._model
            if isinstance(maybe_regressor, Pipeline):
                maybe_regressor = maybe_regressor.steps[-1][-1]

            if not hasattr(maybe_regressor, PREDICT_PROBA_OUTPUT):
                # NOTE: fixed previously-unterminated quote in the message
                raise InferenceError(
                    f"{type(maybe_regressor)} models do not support "
                    f"'{PREDICT_PROBA_OUTPUT}'"
                )

        return payload

    def _get_model_outputs(self, payload: InferenceRequest) -> List[ResponseOutput]:
        """Call each requested estimator method and encode its result.

        DataFrame results are encoded through `PandasCodec` and must be
        the only requested output, since their columns expand into
        multiple response outputs.
        """
        decoded_request = self.decode_request(payload, default_codec=NumpyRequestCodec)

        outputs = []
        for request_output in payload.outputs:  # type: ignore
            predict_fn = getattr(self._model, request_output.name)
            y = predict_fn(decoded_request)

            # If output is a Pandas DataFrame, encode it manually
            if PandasCodec.can_encode(y):
                if len(payload.outputs) > 1:  # type: ignore
                    all_output_names = [o.name for o in payload.outputs]  # type: ignore
                    raise InferenceError(
                        f"{request_output.name} returned columnar data of type"
                        f" {type(y)} and {all_output_names} were"
                        f" requested. Cannot encode multiple columnar data responses"
                        f" into one response."
                    )
                # Use PandasCodec manually, to obtain the list of outputs
                return PandasCodec.encode_outputs(y)

            output = self.encode(y, request_output, default_codec=NumpyCodec)
            outputs.append(output)

        return outputs