diff --git a/README.md b/README.md index 2737ee94..c4e49316 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,14 @@ class MyBot(sc2.BotAI): self.raw_affects_selection = True ``` +### `enable_feature_layer` +Setting this to true allows interaction with the UI +```python +class MyBot(sc2.BotAI): + def __init__(self): + self.enable_feature_layer = True +``` + ### `distance_calculation_method` The distance calculation method: - 0 for raw python diff --git a/examples/protoss/single_unit_unload_test.py b/examples/protoss/single_unit_unload_test.py new file mode 100644 index 00000000..96cb76db --- /dev/null +++ b/examples/protoss/single_unit_unload_test.py @@ -0,0 +1,123 @@ +import sys, os + +sys.path.append(os.path.join(os.path.dirname(__file__), "../..")) + +from typing import Union +from loguru import logger + +import sc2 +from sc2 import Race, Difficulty +from sc2.ids.unit_typeid import UnitTypeId +from sc2.player import Bot, Computer +from sc2.unit import Unit +from sc2.units import Units + +from s2clientprotocol import raw_pb2 as raw_pb +from s2clientprotocol import sc2api_pb2 as sc_pb +from s2clientprotocol import ui_pb2 as ui_pb + + +class SingleUnitUnloadBot(sc2.BotAI): + def __init__(self): + self.raw_affects_selection = True + self.enable_feature_layer = True + + async def on_start(self): + self.client.game_step = 8 + self.load_unit_types = { + UnitTypeId.ZEALOT, + UnitTypeId.STALKER, + UnitTypeId.DARKTEMPLAR, + UnitTypeId.HIGHTEMPLAR, + } + + async def unload_unit(self, transporter_unit: Unit, unload_unit: Union[int, Unit]): + assert isinstance(transporter_unit, Unit) + assert isinstance(unload_unit, (int, Unit)) + assert hasattr(self, "raw_affects_selection") and self.raw_affects_selection is True + assert hasattr(self, "enable_feature_layer") and self.enable_feature_layer is True + if isinstance(unload_unit, Unit): + unload_unit_tag = unload_unit.tag + else: + unload_unit_tag = unload_unit + + # TODO Change unit.py passengers to return a List[Unit] instead of Set[Unit] ? Then I don't have to loop over '._proto' + unload_unit_index = next( + (index for index, unit in enumerate(transporter_unit._proto.passengers) if unit.tag == unload_unit_tag), + None + ) + + if unload_unit_index is None: + logger.info(f"Unable to find unit {unload_unit} in transporter {transporter_unit}") + return + + logger.info(f"Unloading unit at index: {unload_unit_index}") + await self.client._execute( + action=sc_pb.RequestAction( + actions=[ + sc_pb.Action( + action_raw=raw_pb.ActionRaw( + unit_command=raw_pb.ActionRawUnitCommand(ability_id=0, unit_tags=[transporter_unit.tag]) + ) + ), + sc_pb.Action( + action_ui=ui_pb.ActionUI( + cargo_panel=ui_pb.ActionCargoPanelUnload(unit_index=unload_unit_index) + ) + ), + ] + ) + ) + + async def on_step(self, iteration): + # Spawn units + logger.info(f"Spawning units") + await self.client.debug_create_unit( + [ + [UnitTypeId.WARPPRISM, 1, self.game_info.map_center, 1], + [UnitTypeId.ZEALOT, 1, self.game_info.map_center, 1], + [UnitTypeId.STALKER, 1, self.game_info.map_center, 1], + [UnitTypeId.DARKTEMPLAR, 1, self.game_info.map_center, 1], + [UnitTypeId.HIGHTEMPLAR, 1, self.game_info.map_center, 1], + ] + ) + # Load units into prism + await self._advance_steps(50) + prism = self.units(UnitTypeId.WARPPRISM)[0] + my_zealot = self.units(UnitTypeId.ZEALOT)[0] + my_units = self.units(self.load_unit_types) + logger.info(f"Loading units into prism: {my_units}") + for unit in my_units: + unit.smart(prism) + + # Unload single unit - here: zealot + await self._advance_steps(50) + assert self.units(self.load_unit_types).amount == 0 + prism: Unit = self.units(UnitTypeId.WARPPRISM)[0] + await self.unload_unit(prism, my_zealot) + # Also works: + # await self.unload_unit(prism, my_zealot.tag) + + await self._advance_steps(50) + my_units = self.units(self.load_unit_types) + assert my_units.amount == 1, f"{my_units}" + my_zealots = self.units(UnitTypeId.ZEALOT) + assert my_zealots.amount == 1, f"{my_zealots}" + assert my_zealots[0].tag == my_zealot.tag + + logger.info("Everything ran as expected. Terminating.") + await self.client.leave() + + +def main(): + sc2.run_game( + sc2.maps.get("2000AtmospheresAIE"), + [Bot(Race.Protoss, SingleUnitUnloadBot()), + Computer(Race.Terran, Difficulty.Medium)], + realtime=False, + save_replay_as="PvT.SC2Replay", + ) + + +if __name__ == "__main__": + main() diff --git a/examples/zerg/expand_everywhere.py b/examples/zerg/expand_everywhere.py index e9ea2811..339866a2 100644 --- a/examples/zerg/expand_everywhere.py +++ b/examples/zerg/expand_everywhere.py @@ -3,7 +3,6 @@ sys.path.append(os.path.join(os.path.dirname(__file__), "../..")) -import numpy as np from sc2.position import Point2, Point3 import sc2 @@ -76,7 +75,7 @@ async def on_building_construction_complete(self, unit: Unit): def main(): sc2.run_game( - sc2.maps.get("AcropolisLE"), + sc2.maps.get("2000AtmospheresAIE"), [Bot(Race.Zerg, ExpandEverywhere()), Computer(Race.Terran, Difficulty.Medium)], realtime=False, save_replay_as="ZvT.SC2Replay", diff --git a/sc2/client.py b/sc2/client.py index fe852176..6f6939bf 100644 --- a/sc2/client.py +++ b/sc2/client.py @@ -5,6 +5,7 @@ from s2clientprotocol import query_pb2 as query_pb from s2clientprotocol import raw_pb2 as raw_pb from s2clientprotocol import sc2api_pb2 as sc_pb +from s2clientprotocol import common_pb2 as common_pb from .action import combine_actions from .data import ActionResult, ChatChannel, Race, Result, Status @@ -41,12 +42,19 @@ def __init__(self, ws): self._renderer = None self.raw_affects_selection = False + self.enable_feature_layer = False @property def in_game(self): return self._status in {Status.in_game, Status.in_replay} async def join_game(self, name=None, race=None, observed_player_id=None, portconfig=None, rgb_render_config=None): + feature_layer = None + if self.enable_feature_layer: + feature_layer = sc_pb.SpatialCameraSetup( + resolution=common_pb.Size2DI(x=1, y=1), + minimap_resolution=common_pb.Size2DI(x=1, y=1), + ) ifopts = sc_pb.InterfaceOptions( raw=True, score=True, @@ -55,6 +63,7 @@ async def join_game(self, name=None, race=None, observed_player_id=None, portcon raw_affects_selection=self.raw_affects_selection, raw_crop_to_playable_area=False, show_placeholders=True, + feature_layer=feature_layer, ) if rgb_render_config: @@ -197,7 +206,9 @@ async def actions(self, actions, return_successes=False): return [ActionResult(r) for r in res.action.result if ActionResult(r) != ActionResult.Success] async def query_pathing( - self, start: Union[Unit, Point2, Point3], end: Union[Point2, Point3] + self, + start: Union[Unit, Point2, Point3], + end: Union[Point2, Point3], ) -> Optional[Union[int, float]]: """Caution: returns "None" when path not found Try to combine queries with the function below because the pathing query is generally slow. @@ -261,7 +272,10 @@ async def _query_building_placement_fast( return [p.result == 1 for p in result.query.placements] async def query_building_placement( - self, ability: AbilityData, positions: List[Union[Point2, Point3]], ignore_resources: bool = True + self, + ability: AbilityData, + positions: List[Union[Point2, Point3]], + ignore_resources: bool = True ) -> List[ActionResult]: """This function might be deleted in favor of the function above (_query_building_placement_fast). @@ -341,7 +355,8 @@ async def toggle_autocast(self, units: Union[List[Unit], Units], ability: Abilit sc_pb.Action( action_raw=raw_pb.ActionRaw( toggle_autocast=raw_pb.ActionRawToggleAutocast( - ability_id=ability.value, unit_tags=(u.tag for u in units) + ability_id=ability.value, + unit_tags=(u.tag for u in units), ) ) ) @@ -373,8 +388,7 @@ async def debug_create_unit(self, unit_spawn_commands: List[List[Union[UnitTypeI pos=position.as_Point2D, quantity=amount_of_units, ) - ) - for unit_type, amount_of_units, position, owner_id in unit_spawn_commands + ) for unit_type, amount_of_units, position, owner_id in unit_spawn_commands ) ) ) @@ -593,13 +607,11 @@ async def _send_debug(self): debug_pb.DebugCommand( draw=debug_pb.DebugDraw( text=[text.to_proto() for text in self._debug_texts] if self._debug_texts else None, - lines=[line.to_proto() for line in self._debug_lines] - if self._debug_lines - else None, + lines=[line.to_proto() + for line in self._debug_lines] if self._debug_lines else None, boxes=[box.to_proto() for box in self._debug_boxes] if self._debug_boxes else None, - spheres=[sphere.to_proto() for sphere in self._debug_spheres] - if self._debug_spheres - else None, + spheres=[sphere.to_proto() + for sphere in self._debug_spheres] if self._debug_spheres else None, ) ) ] @@ -647,11 +659,9 @@ async def debug_set_unit_value(self, unit_tags: Union[Iterable[int], Units, Unit debug=sc_pb.RequestDebug( debug=( debug_pb.DebugCommand( - unit_value=debug_pb.DebugSetUnitValue( - unit_value=unit_value, value=float(value), unit_tag=unit_tag - ) - ) - for unit_tag in unit_tags + unit_value=debug_pb. + DebugSetUnitValue(unit_value=unit_value, value=float(value), unit_tag=unit_tag) + ) for unit_tag in unit_tags ) ) ) diff --git a/sc2/main.py b/sc2/main.py index aab8e579..a336e32a 100644 --- a/sc2/main.py +++ b/sc2/main.py @@ -472,8 +472,11 @@ async def _host_game( client = await _setup_host_game(server, map_settings, players, realtime, random_seed, disable_fog) # Bot can decide if it wants to launch with 'raw_affects_selection=True' - if not isinstance(players[0], Human) and getattr(players[0].ai, "raw_affects_selection", None) is not None: + if not isinstance(players[0], Human) and getattr(players[0].ai, "enable_feature_layer", None) is not None: client.raw_affects_selection = players[0].ai.raw_affects_selection + # And 'enable_feature_layer=True' + if not isinstance(players[0], Human) and getattr(players[0].ai, "enable_feature_layer", None) is not None: + client.enable_feature_layer = players[0].ai.enable_feature_layer try: result = await _play_game( @@ -510,6 +513,8 @@ async def _host_game_aiter( client = await _setup_host_game(server, map_settings, players, realtime) if not isinstance(players[0], Human) and getattr(players[0].ai, "raw_affects_selection", None) is not None: client.raw_affects_selection = players[0].ai.raw_affects_selection + if not isinstance(players[0], Human) and getattr(players[0].ai, "enable_feature_layer", None) is not None: + client.enable_feature_layer = players[0].ai.enable_feature_layer try: result = await _play_game(players[0], client, realtime, portconfig, step_time_limit, game_time_limit) @@ -548,6 +553,9 @@ async def _join_game( # Bot can decide if it wants to launch with 'raw_affects_selection=True' if not isinstance(players[1], Human) and getattr(players[1].ai, "raw_affects_selection", None) is not None: client.raw_affects_selection = players[1].ai.raw_affects_selection + # And 'enable_feature_layer=True' + if not isinstance(players[1], Human) and getattr(players[1].ai, "enable_feature_layer", None) is not None: + client.enable_feature_layer = players[1].ai.enable_feature_layer try: result = await _play_game(players[1], client, realtime, portconfig, step_time_limit, game_time_limit)