diff --git a/.gitignore b/.gitignore index 5e5de318..1daee3a1 100644 --- a/.gitignore +++ b/.gitignore @@ -13,11 +13,11 @@ dist/ .venv/ venv/ .idea/ -.vscode/ +mcp_server_debug.log # ---- macOS clutter ----------------------------------------------------- .DS_Store # ---- Secrets ----------------------------------------------------------- client_secret.json - +.credentials/*.json* diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..7d1ead2b --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,23 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Run in Debug Mode", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/main.py", + "args": [ + "--transport", + "streamable-http", + "--tools", + "docs", + "--single-user" + ], + "env": { + "WORKSPACE_MCP_PORT": "8000" + }, + "justMyCode": false, + "console": "integratedTerminal" + } + ] +} \ No newline at end of file diff --git a/.vscode/mcp.json b/.vscode/mcp.json new file mode 100644 index 00000000..404ecb53 --- /dev/null +++ b/.vscode/mcp.json @@ -0,0 +1,9 @@ +{ +"servers": { + "google_workspace": { + "type": "stdio", + "command": "uvx", + "args": ["workspace-mcp", "--single-user"] + } + } +} \ No newline at end of file diff --git a/auth/google_auth.py b/auth/google_auth.py index 03f39435..ed3ca1e9 100644 --- a/auth/google_auth.py +++ b/auth/google_auth.py @@ -1,18 +1,21 @@ # auth/google_auth.py -import os +import asyncio import json import logging -import asyncio -from typing import List, Optional, Tuple, Dict, Any, Callable import os +from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple +import jwt +from google.auth.exceptions import RefreshError +from google.auth.transport.requests import Request +from google.oauth2 import service_account from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import Flow, InstalledAppFlow -from google.auth.transport.requests import Request -from google.auth.exceptions import RefreshError from googleapiclient.discovery import build from googleapiclient.errors import HttpError + from auth.scopes import OAUTH_STATE_TO_SESSION_ID_MAP, SCOPES # Configure logging @@ -43,6 +46,7 @@ def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[C """ Find and load any valid credentials from the credentials directory. Used in single-user mode to bypass session-to-OAuth mapping. + Supports both OAuth2 and service account credentials. Returns: First valid Credentials object found, or None if none exist. @@ -56,8 +60,24 @@ def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[C if filename.endswith('.json'): filepath = os.path.join(base_dir, filename) try: + # Check if this is a service account file + if "iam.gserviceaccount.com" in filename: + logger.info(f"[single-user] Found service account file: {filepath}") + try: + credentials = service_account.Credentials.from_service_account_file(filepath,scopes=SCOPES) + logger.info(f"[single-user] Successfully loaded service account credentials from {filepath}") + + # Force refresh to get a token, since by default it's not set and the library considers then credentials to be invalid + credentials.refresh(Request()) + return credentials + except Exception as e: + logger.warning(f"[single-user] Error loading service account credentials from {filepath}: {e}") + continue + + # Handle OAuth2 credentials with open(filepath, 'r') as f: creds_data = json.load(f) + credentials = Credentials( token=creds_data.get('token'), refresh_token=creds_data.get('refresh_token'), @@ -66,7 +86,7 @@ def _find_any_credentials(base_dir: str = DEFAULT_CREDENTIALS_DIR) -> Optional[C client_secret=creds_data.get('client_secret'), scopes=creds_data.get('scopes') ) - logger.info(f"[single-user] Found credentials in {filepath}") + logger.info(f"[single-user] Found OAuth2 credentials in {filepath}") return credentials except (IOError, json.JSONDecodeError, KeyError) as e: logger.warning(f"[single-user] Error loading credentials from {filepath}: {e}") @@ -122,7 +142,6 @@ def load_credentials_from_file(user_google_email: str, base_dir: str = DEFAULT_C expiry = None if creds_data.get('expiry'): try: - from datetime import datetime expiry = datetime.fromisoformat(creds_data['expiry']) except (ValueError, TypeError) as e: logger.warning(f"Could not parse expiry time for {user_google_email}: {e}") @@ -494,7 +513,6 @@ async def get_authenticated_google_service( session_id=None, # Session ID not available in service layer ) - if not credentials or not credentials.valid: logger.warning( f"[{tool_name}] No valid credentials. Email: '{user_google_email}'." @@ -525,10 +543,13 @@ async def get_authenticated_google_service( service = build(service_name, version, credentials=credentials) log_user_email = user_google_email - # Try to get email from credentials if needed for validation - if credentials and credentials.id_token: + # For service accounts, use the service account email + if hasattr(credentials, 'service_account_email'): + log_user_email = credentials.service_account_email + logger.info(f"[{tool_name}] Using service account: {log_user_email}") + # For OAuth2 credentials, try to get email from id_token + elif credentials and hasattr(credentials, 'id_token') and credentials.id_token: try: - import jwt # Decode without verification (just to get email for logging) decoded_token = jwt.decode(credentials.id_token, options={"verify_signature": False}) token_email = decoded_token.get("email") diff --git a/gdocs/docs_tools.py b/gdocs/docs_tools.py index 7f330b71..a7942cd9 100644 --- a/gdocs/docs_tools.py +++ b/gdocs/docs_tools.py @@ -6,9 +6,8 @@ import logging import asyncio import io -from typing import List +from typing import List, Annotated, Optional, Dict -from mcp import types from googleapiclient.errors import HttpError from googleapiclient.http import MediaIoBaseDownload @@ -17,6 +16,8 @@ from core.utils import extract_office_xml_text, handle_http_errors from core.server import server +from pydantic import Field + logger = logging.getLogger(__name__) @server.tool() @@ -214,3 +215,144 @@ async def create_doc( msg = f"Created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}. Link: {link}" logger.info(f"Successfully created Google Doc '{title}' (ID: {doc_id}) for {user_google_email}. Link: {link}") return msg + + +@server.tool() +@require_google_service("drive", "drive_read") +async def copy_google_doc( + service, + user_google_email: str, + template_id: str, + new_title: str, + target_folder_id: Optional[str] = None, +) -> str: + """ + Creates a new Google Doc by making a copy of an existing document. This is useful for creating documents from templates + or duplicating existing documents while preserving their formatting and content. + + The tool will: + 1. Create an exact copy of the source document + 2. Give it the specified new title + 3. Place it in the specified folder (or root if no folder specified) + 4. Return the ID and view link of the new document + + Args: + service: Authenticated Google Drive service instance. + user_google_email: Email of the user making the request. + template_id: The Google Drive ID of the source document that will be used as a template. This is the document you want to copy from. + new_title: The title/name that will be given to the new copy of the document. This is what the document will be called in Google Drive. + target_folder_id: Optional Google Drive folder ID where the new document should be created. If not provided, the document will be created in the root of the user's Google Drive. + Returns: + str: A message containing the new document's ID and view link. + """ + logger.info(f"[copy_google_doc] Copying document {template_id} with new title {new_title}. Email: '{user_google_email}'") + + try: + # Prepare copy metadata + copy_metadata = { + 'name': new_title, + } + + if target_folder_id: + copy_metadata['parents'] = [target_folder_id] + + # Execute the copy + response = service.files().copy( + fileId=template_id, + body=copy_metadata, + fields='id,name,webViewLink' + ).execute() + + document_id = response['id'] + document_name = response['name'] + view_link = response.get('webViewLink') + + return f'Successfully created document "{document_name}" with ID: {document_id}\nView Link: {view_link}' + + except HttpError as e: + status = e.resp.status + logger.error(f"Error copying document: {str(e)}") + if status == 404: + raise Exception("Template document or parent folder not found. Check the IDs. HTTP Status: 404") + elif status == 403: + raise Exception("Permission denied. Make sure you have read access to the template and write access to the destination folder. HTTP Status: 403") + else: + raise Exception(f"Failed to copy document: {e._get_reason() or 'Unknown error'} HTTP Status: {status}") + + except Exception as e: + logger.error(f"Unhandled error: {str(e)}") + raise e + + +@server.tool() +@require_google_service("docs", "docs_write") +async def replace_text_in_google_doc( + service, + user_google_email: Annotated[str, Field(description="Email of the user making the request")], + document_id: Annotated[str, Field(description="The Google Drive ID of the document where text replacements should be performed")], + replacements: Annotated[Dict[str, str], Field( + description="Dictionary mapping text patterns to their replacements. Each key is the text to find (case-insensitive), and each value is the text to replace it with", + json_schema_extra={"additionalProperties": {"type": "string"}} + )], +) -> str: + """ + Performs multiple text replacements within a Google Doc in a single operation. This is useful for: + - Replacing template placeholders with actual content + - Updating multiple instances of the same text + - Making bulk text changes across the document + + The tool will: + 1. Find all instances of each specified text pattern (case-insensitive) + 2. Replace them with their corresponding replacement text + 3. Perform all replacements in a single batch operation + 4. Return a summary of how many replacements were made + + Args: + service: Authenticated Google Docs service instance. + user_google_email: Email of the user making the request. + document_id: The Google Drive ID of the document where text replacements should be performed. This is the document you want to modify. + replacements: A dictionary mapping text patterns to their replacements. Each key is the text to find (case-insensitive), + and each value is the text to replace it with. Example: {'{{NAME}}': 'John Doe', '(% DATE %)': '2025-01-01'} + will replace all instances of '{{NAME}}' with 'John Doe' and '(% DATE %)' with '2025-01-01'. + Returns: + str: A message confirming the number of replacements that were successfully applied. + """ + logger.info(f'Replacing text in document {document_id}. Amount of replacements: {len(replacements)}') + + try: + requests = [] + for search_text, replace_text in replacements.items(): + requests.append({ + "replaceAllText": { + "containsText": { + "text": search_text, + "matchCase": False + }, + "replaceText": replace_text + } + }) + + if not requests: + raise Exception("Error: The replacements dictionary is empty. Please provide at least one replacement.") + + service.documents().batchUpdate( + documentId=document_id, + body={"requests": requests} + ).execute() + + count = len(requests) + return f"Successfully applied {count} text replacement{'s' if count != 1 else ''} to the document." + + except HttpError as e: + status = e.resp.status + logger.error(f"Error replacing text in document: {str(e)}") + if status == 404: + raise Exception("Document not found. Check the document ID. HTTP Status: 404") from e + elif status == 403: + raise Exception("Permission denied. Make sure you have write access to the document. HTTP Status: 403") from e + else: + raise Exception(f"Failed to replace text: {e._get_reason() or 'Unknown error'} HTTP Status: {status}") from e + + except Exception as e: + logger.error(f"Unhandled error: {str(e)}") + raise e \ No newline at end of file