1010
1111import java .time .Instant ;
1212import java .util .Arrays ;
13+ import java .util .HashMap ;
1314import java .util .List ;
1415import java .util .Map ;
1516import java .util .stream .Collectors ;
1617
1718import org .opensearch .ExceptionsHelper ;
1819import org .opensearch .OpenSearchStatusException ;
1920import org .opensearch .action .ActionRequest ;
20- import org .opensearch .action .bulk .BulkRequest ;
21+ import org .opensearch .action .bulk .BulkItemResponse ;
2122import org .opensearch .action .bulk .BulkResponse ;
2223import org .opensearch .action .search .SearchRequest ;
2324import org .opensearch .action .search .SearchResponse ;
2425import org .opensearch .action .support .ActionFilters ;
2526import org .opensearch .action .support .HandledTransportAction ;
2627import org .opensearch .action .support .WriteRequest ;
27- import org .opensearch .action .update .UpdateRequest ;
2828import org .opensearch .cluster .service .ClusterService ;
2929import org .opensearch .common .inject .Inject ;
3030import org .opensearch .common .settings .Settings ;
5555import org .opensearch .ml .task .MLTaskManager ;
5656import org .opensearch .ml .utils .RestActionUtils ;
5757import org .opensearch .ml .utils .TenantAwareHelper ;
58+ import org .opensearch .remote .metadata .client .BulkDataObjectRequest ;
5859import org .opensearch .remote .metadata .client .SdkClient ;
5960import org .opensearch .remote .metadata .client .SearchDataObjectRequest ;
61+ import org .opensearch .remote .metadata .client .UpdateDataObjectRequest ;
6062import org .opensearch .remote .metadata .common .SdkClientUtils ;
6163import org .opensearch .search .SearchHit ;
6264import org .opensearch .search .builder .SearchSourceBuilder ;
6668import org .opensearch .transport .client .Client ;
6769
6870import com .google .common .annotations .VisibleForTesting ;
69- import com .google .common .collect .ImmutableMap ;
7071
7172import lombok .extern .log4j .Log4j2 ;
7273
@@ -217,8 +218,8 @@ private void undeployModels(
217218 return modelCacheMissForModelIds ;
218219 });
219220 if (response .getNodes ().isEmpty () || modelNotFoundInNodesCache ) {
220- log .warn ("No node found running the model(s): {}" , Arrays . toString ( modelIds ) );
221- bulkSetModelIndexToUndeploy (modelIds , listener , response );
221+ log .warn ("No nodes service these models, performing manual `UNDEPLOY` write to model index" );
222+ bulkSetModelIndexToUndeploy (modelIds , tenantId , listener , response );
222223 return ;
223224 }
224225 log .info ("Successfully undeployed model(s) from nodes: {}" , Arrays .toString (modelIds ));
@@ -228,34 +229,39 @@ private void undeployModels(
228229
229230 private void bulkSetModelIndexToUndeploy (
230231 String [] modelIds ,
232+ String tenantId ,
231233 ActionListener <MLUndeployModelsResponse > listener ,
232- MLUndeployModelNodesResponse response
234+ MLUndeployModelNodesResponse mlUndeployModelNodesResponse
233235 ) {
234- BulkRequest bulkUpdateRequest = new BulkRequest ();
236+ BulkDataObjectRequest bulkRequest = BulkDataObjectRequest .builder ().globalIndex (ML_MODEL_INDEX ).build ();
237+
235238 for (String modelId : modelIds ) {
236- UpdateRequest updateRequest = new UpdateRequest ();
237239
238- ImmutableMap .Builder <String , Object > builder = ImmutableMap .builder ();
239- builder .put (MLModel .MODEL_STATE_FIELD , MLModelState .UNDEPLOYED .name ());
240+ Map <String , Object > updateDocument = new HashMap <>();
240241
241- builder .put (MLModel .PLANNING_WORKER_NODES_FIELD , List .of ());
242- builder .put (MLModel .PLANNING_WORKER_NODE_COUNT_FIELD , 0 );
242+ updateDocument .put (MLModel .MODEL_STATE_FIELD , MLModelState .UNDEPLOYED .name ());
243+ updateDocument .put (MLModel .PLANNING_WORKER_NODES_FIELD , List .of ());
244+ updateDocument .put (MLModel .PLANNING_WORKER_NODE_COUNT_FIELD , 0 );
245+ updateDocument .put (MLModel .LAST_UPDATED_TIME_FIELD , Instant .now ().toEpochMilli ());
246+ updateDocument .put (MLModel .CURRENT_WORKER_NODE_COUNT_FIELD , 0 );
243247
244- builder .put (MLModel .LAST_UPDATED_TIME_FIELD , Instant .now ().toEpochMilli ());
245- builder .put (MLModel .CURRENT_WORKER_NODE_COUNT_FIELD , 0 );
246- updateRequest .index (ML_MODEL_INDEX ).id (modelId ).doc (builder .build ());
247- bulkUpdateRequest .add (updateRequest );
248+ UpdateDataObjectRequest updateRequest = UpdateDataObjectRequest
249+ .builder ()
250+ .id (modelId )
251+ .tenantId (tenantId )
252+ .dataObject (updateDocument )
253+ .build ();
254+ bulkRequest .add (updateRequest ).setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
248255 }
249256
250- bulkUpdateRequest .setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
251257 log .info ("No nodes running these models: {}" , Arrays .toString (modelIds ));
252258
253259 try (ThreadContext .StoredContext threadContext = client .threadPool ().getThreadContext ().stashContext ()) {
254260 ActionListener <MLUndeployModelsResponse > listenerWithContextRestoration = ActionListener
255261 .runBefore (listener , () -> threadContext .restore ());
262+
256263 ActionListener <BulkResponse > bulkResponseListener = ActionListener .wrap (br -> {
257- log .debug ("Successfully set the following modelId(s) to UNDEPLOY in index: {}" , Arrays .toString (modelIds ));
258- listenerWithContextRestoration .onResponse (new MLUndeployModelsResponse (response ));
264+ listenerWithContextRestoration .onResponse (new MLUndeployModelsResponse (mlUndeployModelNodesResponse ));
259265 }, e -> {
260266 String modelsNotFoundMessage = String
261267 .format ("Failed to set the following modelId(s) to UNDEPLOY in index: %s" , Arrays .toString (modelIds ));
@@ -268,7 +274,40 @@ private void bulkSetModelIndexToUndeploy(
268274 listenerWithContextRestoration .onFailure (exception );
269275 });
270276
271- client .bulk (bulkUpdateRequest , bulkResponseListener );
277+ sdkClient .bulkDataObjectAsync (bulkRequest ).whenComplete ((response , exception ) -> {
278+ if (exception != null ) {
279+ Exception cause = SdkClientUtils .unwrapAndConvertToException (exception , OpenSearchStatusException .class );
280+ bulkResponseListener .onFailure (cause );
281+ return ;
282+ }
283+
284+ try {
285+ BulkResponse bulkResponse = BulkResponse .fromXContent (response .parser ());
286+ log
287+ .info (
288+ "Executed {} bulk operations with {} failures, Took: {}" ,
289+ bulkResponse .getItems ().length ,
290+ bulkResponse .hasFailures ()
291+ ? Arrays .stream (bulkResponse .getItems ()).filter (BulkItemResponse ::isFailed ).count ()
292+ : 0 ,
293+ bulkResponse .getTook ()
294+ );
295+ List <String > unemployedModelIds = Arrays
296+ .stream (bulkResponse .getItems ())
297+ .filter (bulkItemResponse -> !bulkItemResponse .isFailed ())
298+ .map (BulkItemResponse ::getId )
299+ .collect (Collectors .toList ());
300+ log
301+ .debug (
302+ "Successfully set the following modelId(s) to UNDEPLOY in index: {}" ,
303+ Arrays .toString (unemployedModelIds .toArray ())
304+ );
305+
306+ bulkResponseListener .onResponse (bulkResponse );
307+ } catch (Exception e ) {
308+ bulkResponseListener .onFailure (e );
309+ }
310+ });
272311 } catch (Exception e ) {
273312 log .error ("Unexpected error while setting the following modelId(s) to UNDEPLOY in index: {}" , Arrays .toString (modelIds ), e );
274313 listener .onFailure (e );
0 commit comments