import ray
from ray.util.annotations import PublicAPI
+ from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

from xgboost_ray.main import (
    _handle_queue, RayXGBoostActor, LEGACY_MATRIX, RayDeviceQuantileDMatrix,
    _Checkpoint, _create_communication_processes, RayTaskError,
    RayXGBoostActorAvailable, RayXGBoostTrainingError, _create_placement_group,
    _shutdown, PlacementGroup, ActorHandle, combine_data, _trigger_data_load,
-     DEFAULT_PG, _autodetect_resources as _autodetect_resources_base)
+     DEFAULT_PG, _autodetect_resources as _autodetect_resources_base,
+     _ray_get_actor_cpus)
from xgboost_ray.session import put_queue
from xgboost_ray import RayDMatrix
@@ -329,9 +331,8 @@ def train(self, return_bst: bool, params: Dict[str, Any],
        local_params = _choose_param_value(
            main_param_name="num_threads",
            params=params,
-             default_value=num_threads if num_threads > 0 else
-             sum(num
-                 for _, num in ray.worker.get_resource_ids().get("CPU", [])))
+             default_value=num_threads
+             if num_threads > 0 else _ray_get_actor_cpus())

        if "init_model" in kwargs:
            if isinstance(kwargs["init_model"], bytes):
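For orientation, here is a minimal sketch of what a CPU-count helper along the lines of `_ray_get_actor_cpus` could look like when called inside an actor. The helper name, fallback behavior, and the use of `ray.get_runtime_context().get_assigned_resources()` (available in recent Ray releases) are assumptions for illustration, not the actual xgboost_ray implementation:

import math

import ray


def assigned_actor_cpus(default: int = 1) -> int:
    # Assumption: the runtime context exposes the resources granted to this
    # worker/actor, e.g. {"CPU": 2.0, "GPU": 0.0}.
    resources = ray.get_runtime_context().get_assigned_resources()
    cpus = resources.get("CPU", 0)
    # Round fractional CPU assignments up; never report fewer than `default`.
    return max(int(math.ceil(cpus)), default) if cpus else default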
@@ -537,19 +538,23 @@ def _create_actor(
    # Send DEFAULT_PG here, which changed in Ray > 1.4.0
    # If we send `None`, this will ignore the parent placement group and
    # lead to errors e.g. when used within Ray Tune
-     return _RemoteRayLightGBMActor.options(
+     actor_cls = _RemoteRayLightGBMActor.options(
        num_cpus=num_cpus_per_actor,
        num_gpus=num_gpus_per_actor,
        resources=resources_per_actor,
-         placement_group_capture_child_tasks=True,
-         placement_group=placement_group or DEFAULT_PG).remote(
-             rank=rank,
-             num_actors=num_actors,
-             model_factory=model_factory,
-             queue=queue,
-             checkpoint_frequency=checkpoint_frequency,
-             distributed_callbacks=distributed_callbacks,
-             network_params={"local_listen_port": port} if port else None)
+         scheduling_strategy=PlacementGroupSchedulingStrategy(
+             placement_group=placement_group or DEFAULT_PG,
+             placement_group_capture_child_tasks=True,
+         ))
+
+     return actor_cls.remote(
+         rank=rank,
+         num_actors=num_actors,
+         model_factory=model_factory,
+         queue=queue,
+         checkpoint_frequency=checkpoint_frequency,
+         distributed_callbacks=distributed_callbacks,
+         network_params={"local_listen_port": port} if port else None)


def _train(params: Dict,
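The hunk above moves actor placement from the deprecated `placement_group=` / `placement_group_capture_child_tasks=` options onto Ray's `scheduling_strategy` argument. A rough, self-contained sketch of that scheduling pattern outside this codebase (the actor class and bundle sizes are made up; assumes a Ray version that ships PlacementGroupSchedulingStrategy):

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy


@ray.remote
class Worker:
    def ping(self) -> str:
        return "pong"


ray.init()
# Reserve one 1-CPU bundle per worker, packed onto as few nodes as possible.
pg = placement_group([{"CPU": 1}] * 2, strategy="PACK")
ray.get(pg.ready())

workers = [
    Worker.options(
        num_cpus=1,
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            # Also keep tasks spawned by the actor inside the same group,
            # mirroring the capture_child_tasks behavior used above.
            placement_group_capture_child_tasks=True,
        ),
    ).remote()
    for _ in range(2)
]
print(ray.get([w.ping.remote() for w in workers]))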
@@ -734,7 +739,9 @@ def handle_actor_failure(actor_id):
    # confilict, it can try and choose a new one. Most of the times
    # it will complete in one iteration
    machines = None
-     for n in range(5):
+     max_attempts = 5
+     i = 0
+     for i in range(max_attempts):
        addresses = ray.get(
            [actor.find_free_address.remote() for actor in live_actors])
        if addresses:
@@ -750,7 +757,7 @@ def handle_actor_failure(actor_id):
        else:
            logger.debug("Couldn't obtain unique addresses, trying again.")
    if machines:
-         logger.debug(f"Obtained unique addresses in {n} attempts.")
+         logger.debug(f"Obtained unique addresses in {i} attempts.")
    else:
        raise ValueError(
            f"Couldn't obtain enough unique addresses for {len(live_actors)}."