From 2cc30d3057b2badedbda1bbfc36e91658e706d93 Mon Sep 17 00:00:00 2001 From: pm-bhatt Date: Wed, 25 Feb 2026 22:27:13 +0530 Subject: [PATCH] feat: add unlock_user tool for unlocking locked-out accounts Add a new MCP tool that unlocks Okta user accounts with LOCKED_OUT status, returning them to ACTIVE so they can sign in with their existing password. Includes input validation, tests, and README docs. Co-Authored-By: Claude Opus 4.6 --- README.md | 1 + src/okta_mcp_server/tools/users/users.py | 36 +++++++++++++++++ tests/conftest.py | 1 + tests/elicitation/test_users_elicitation.py | 44 ++++++++++++++++++++- 4 files changed, 81 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ddbc72f..fcc4b01 100644 --- a/README.md +++ b/README.md @@ -453,6 +453,7 @@ The Okta MCP Server provides the following tools for LLMs to interact with your | `update_user` | Update an existing user's profile information | - `Update John Doe's department to Engineering`
- `Change the phone number for user jane.smith@company.com`
- `Update the manager for this user` | | `deactivate_user` | Deactivate a user (prompts for confirmation) | - `Deactivate the user john.doe@company.com`
- `Disable access for former employee Jane Smith`
- `Suspend the contractor account temporarily` | | `delete_deactivated_user` | Permanently delete a deactivated user (prompts for confirmation) | - `Delete the deactivated user john.doe@company.com`
- `Remove former employee Jane Smith permanently`
- `Clean up old contractor accounts` | +| `unlock_user` | Unlock a user account with LOCKED_OUT status | - `Unlock the account for john.doe@company.com`
- `Unlock user 00u1234567890 so they can sign in again`
- `Restore access for locked-out user` | | `get_user_profile_attributes` | Retrieve all supported user profile attributes | - `What user profile fields are available?`
- `Show me all the custom attributes we can set`
- `List the standard Okta user attributes` | ### Groups diff --git a/src/okta_mcp_server/tools/users/users.py b/src/okta_mcp_server/tools/users/users.py index 060f4f6..95bdce6 100644 --- a/src/okta_mcp_server/tools/users/users.py +++ b/src/okta_mcp_server/tools/users/users.py @@ -257,6 +257,42 @@ async def update_user(user_id: str, profile: dict, ctx: Context = None) -> list: return [f"Exception: {e}"] +@mcp.tool() +@validate_ids("user_id") +async def unlock_user(user_id: str, ctx: Context = None) -> list: + """Unlock a user account that has a LOCKED_OUT status in the Okta organization. + + This tool unlocks a user whose account has been locked due to exceeding + the failed login attempt threshold. The user is returned to ACTIVE status + and can sign in with their existing password. + + Parameters: + user_id (str, required): The ID of the locked-out user to unlock. + + Returns: + List containing the result of the unlock operation. + """ + logger.info(f"Unlock requested for user: {user_id}") + + manager = ctx.request_context.lifespan_context.okta_auth_manager + + try: + client = await get_okta_client(manager) + logger.debug(f"Calling Okta API to unlock user {user_id}") + + _, err = await client.unlock_user(user_id) + + if err: + logger.error(f"Okta API error while unlocking user {user_id}: {err}") + return [f"Error: {err}"] + + logger.info(f"Successfully unlocked user: {user_id}") + return [f"User {user_id} unlocked successfully."] + except Exception as e: + logger.error(f"Exception while unlocking user {user_id}: {type(e).__name__}: {e}") + return [f"Exception: {e}"] + + @mcp.tool() @validate_ids("user_id") async def deactivate_user(user_id: str, ctx: Context = None) -> list: diff --git a/tests/conftest.py b/tests/conftest.py index 4024a8e..402a3fc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -195,6 +195,7 @@ def mock_okta_client(): client.delete_policy_rule.return_value = (None, None) client.deactivate_user.return_value = (None, None) client.deactivate_or_delete_user.return_value = (None, None) + client.unlock_user.return_value = (None, None) client.deactivate_application.return_value = (None, None) client.deactivate_policy.return_value = (None, None) client.deactivate_policy_rule.return_value = (None, None) diff --git a/tests/elicitation/test_users_elicitation.py b/tests/elicitation/test_users_elicitation.py index a6ee865..edd149e 100644 --- a/tests/elicitation/test_users_elicitation.py +++ b/tests/elicitation/test_users_elicitation.py @@ -5,7 +5,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -"""Tests for user deactivation and deletion with elicitation support.""" +"""Tests for user lifecycle operations including unlock, deactivation, and deletion.""" from __future__ import annotations @@ -16,12 +16,54 @@ from okta_mcp_server.tools.users.users import ( deactivate_user, delete_deactivated_user, + unlock_user, ) USER_ID = "00u1234567890ABCDEF" +# =================================================================== +# unlock_user +# =================================================================== + +class TestUnlockUser: + """Tests for unlock_user (non-destructive, no elicitation required).""" + + @pytest.mark.asyncio + @patch("okta_mcp_server.tools.users.users.get_okta_client") + async def test_unlock_success(self, mock_get_client, ctx_elicit_accept_true, mock_okta_client): + """Unlock succeeds and returns a success message.""" + mock_get_client.return_value = mock_okta_client + + result = await unlock_user(user_id=USER_ID, ctx=ctx_elicit_accept_true) + + mock_okta_client.unlock_user.assert_awaited_once_with(USER_ID) + assert "unlocked successfully" in result[0] + + @pytest.mark.asyncio + @patch("okta_mcp_server.tools.users.users.get_okta_client") + async def test_unlock_okta_api_error(self, mock_get_client, ctx_elicit_accept_true): + """Okta API error is surfaced to the caller.""" + client = AsyncMock() + client.unlock_user.return_value = (None, "API Error: user is not locked out") + mock_get_client.return_value = client + + result = await unlock_user(user_id=USER_ID, ctx=ctx_elicit_accept_true) + + assert "Error" in result[0] + + @pytest.mark.asyncio + @patch("okta_mcp_server.tools.users.users.get_okta_client") + async def test_unlock_exception(self, mock_get_client, ctx_elicit_accept_true): + """Generic exception is surfaced to the caller.""" + mock_get_client.side_effect = Exception("Connection refused") + + result = await unlock_user(user_id=USER_ID, ctx=ctx_elicit_accept_true) + + assert "Exception" in result[0] + + # =================================================================== # deactivate_user — elicitation flows # ===================================================================