About the ADK sample: how can I integrate it with MCP? #281

Open
johnson7788 opened this issue Apr 23, 2025 · 2 comments
Comments

@johnson7788
code path:
samples/python/agents/google_adk

@johnson7788
Author

I tried to integrate the sample with MCP, but I get errors.

MCP server:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date  : 2025/4/22 20:15
# @File  : invoice.py.py
# @Author: johnson
# @Contact : github: johnson7788
# @Desc  :


import json
import random
from typing import Any, AsyncIterable, Dict, Optional

from fastmcp import FastMCP

# Local cache of created request_ids for demo purposes.
request_ids = set()

# Create server
mcp = FastMCP("Expense Reimbursement Server")

@mcp.tool()
def create_request_form(date: Optional[str] = None, amount: Optional[str] = None, purpose: Optional[str] = None) -> dict[str, Any]:
    """
    Create a request form for the employee to fill out.

    Args:
        date (str): The date of the request. Can be an empty string.
        amount (str): The requested amount. Can be an empty string.
        purpose (str): The purpose of the request. Can be an empty string.

    Returns:
        dict[str, Any]: A dictionary containing the request form data.
    """
    request_id = "request_id_" + str(random.randint(1000000, 9999999))
    request_ids.add(request_id)
    print(f"Created request form with ID: {request_id}") # Added for logging
    return {
        "request_id": request_id,
        "date": "<transaction date>" if not date else date,
        "amount": "<transaction dollar amount>" if not amount else amount,
        "purpose": "<business justification/purpose of the transaction>" if not purpose else purpose,
    }

@mcp.tool()
def return_form(
    form_request: Dict[str, Any], # Use Dict for type hinting
    instructions: Optional[str] = None) -> str: # Return type is str (JSON string)
    """
    Returns a structured json object indicating a form to complete.
    Args:
        form_request (dict[str, Any]): The request form data.
        instructions (str): Instructions for processing the form. Can be an empty string.

    Returns:
        str: A JSON dictionary string for the form response.
    """
    # FastMCP should handle deserialization, but keeping the check for robustness
    if isinstance(form_request, str):
        try:
            form_request = json.loads(form_request)
        except json.JSONDecodeError:
            return json.dumps({"error": "Invalid JSON format for form_request"})
    print(f"Returning form with data: {form_request}") # Added for logging
    form_dict = {
        'type': 'form',
        'form': {
            'type': 'object',
            'properties': {
                'date': {
                    'type': 'string',
                    'format': 'date',
                    'description': 'Date of expense',
                    'title': 'Date',
                },
                'amount': {
                    'type': 'string',
                    'format': 'number',
                    'description': 'Amount of expense',
                    'title': 'Amount',
                },
                'purpose': {
                    'type': 'string',
                    'description': 'Purpose of expense',
                    'title': 'Purpose',
                },
                 'request_id': {
                    'type': 'string',
                    'description': 'Request id',
                    'title': 'Request ID',
                },
            },
            # Ensure 'request_id' is included in required if it's always expected
            'required': list(form_request.keys()),
        },
        'form_data': form_request,
        'instructions': instructions,
    }
    return json.dumps(form_dict)

@mcp.tool()
def reimburse(request_id: str) -> dict[str, Any]:
    """Reimburse the amount of money to the employee for a given request_id.模拟报销审批"""
    print(f"Attempting to reimburse request ID: {request_id}") # Added for logging
    if request_id not in request_ids:
        print(f"Request ID {request_id} not found.") # Added for logging
        return {"request_id": request_id, "status": "Error: Invalid request_id."}
    print(f"Request ID {request_id} approved.") # Added for logging
    return {"request_id": request_id, "status": "approved"}

# The agent below launches this file with `fastmcp run`, so no entry point is required.
# To start the server directly instead, FastMCP's built-in runner can be used:
# if __name__ == "__main__":
#     mcp.run()

samples/python/agents/google_adk/agent.py

import json
import random
import asyncio
from typing import Any, AsyncIterable, Dict, Optional
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.tool_context import ToolContext
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, SseServerParams, StdioServerParameters

async def get_tools_async():
  """Gets tools from the File System MCP Server."""
  print("Attempting to connect to MCP Filesystem server...")
  tools, exit_stack = await MCPToolset.from_server(
      # Use StdioServerParameters for local process communication
      connection_params=StdioServerParameters(
          command='uv', # Command to run the server
          args=["run",    # Arguments for the command
                "--with",
                "fastmcp",
                "fastmcp",
                "run",
                "/Users/admin/git/fastmcp/examples/invoice.py"
                ],
      )
      # For remote servers, you would use SseServerParams instead:
      # connection_params=SseServerParams(url="http://remote-server:port/path", headers={...})
  )
  print("MCP Toolset created successfully.")
  # MCP requires maintaining a connection to the local MCP Server.
  # exit_stack manages the cleanup of this connection.
  return tools, exit_stack

class ReimbursementAgent:
  """An agent that handles reimbursement requests."""

  SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

  def __init__(self):
    loop = asyncio.get_event_loop()
    if loop.is_running():
      print("Warning: An event loop is already running. Blocking __init__.")
      pass
    print("Running async connection setup synchronously...")
    # Run the async function and wait for it to complete
    self.tools, self.exit_stack = loop.run_until_complete(get_tools_async())
    print("__init__ finished: Tools obtained.")
    self._agent = self._build_agent()
    self._user_id = "remote_agent"
    self._runner = Runner(
        app_name=self._agent.name,
        agent=self._agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )

  def invoke(self, query, session_id) -> str:
    # Takes the user's query and a session ID, gets or creates the session,
    # runs one interaction (an LLM call) through the Runner, and extracts the final response text.
    session = self._runner.session_service.get_session(
        app_name=self._agent.name, user_id=self._user_id, session_id=session_id
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=query)]
    )
    if session is None:
      session = self._runner.session_service.create_session(
          app_name=self._agent.name,
          user_id=self._user_id,
          state={},
          session_id=session_id,
      )
    events = list(self._runner.run(
        user_id=self._user_id, session_id=session.id, new_message=content
    ))
    if not events or not events[-1].content or not events[-1].content.parts:
      return ""
    return "\n".join([p.text for p in events[-1].content.parts if p.text])

  async def stream(self, query, session_id) -> AsyncIterable[Dict[str, Any]]:
    # Streaming response mode (async).
    session = self._runner.session_service.get_session(
        app_name=self._agent.name, user_id=self._user_id, session_id=session_id
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=query)]
    )
    if session is None:
      session = self._runner.session_service.create_session(
          app_name=self._agent.name,
          user_id=self._user_id,
          state={},
          session_id=session_id,
      )
    async for event in self._runner.run_async(
        user_id=self._user_id, session_id=session.id, new_message=content
    ):
      if event.is_final_response():
        response = ""
        if (
            event.content
            and event.content.parts
            and event.content.parts[0].text
        ):
          response = "\n".join([p.text for p in event.content.parts if p.text])
        elif (
            event.content
            and event.content.parts
            and any(p.function_response for p in event.content.parts)):
          # Take the first part that actually carries a function response.
          response = next(
              p.function_response.model_dump()
              for p in event.content.parts
              if p.function_response
          )
        yield {
            "is_task_complete": True,
            "content": response,
        }
      else:
        yield {
            "is_task_complete": False,
            "updates": "Processing the reimbursement request...",
        }

  def _build_agent(self) -> LlmAgent:
    """Builds the LLM agent for the reimbursement agent."""
    return LlmAgent(
        model="gemini-2.0-flash-001",
        name="reimbursement_agent",
        description=(
            "This agent handles the reimbursement process for the employees"
            " given the amount and purpose of the reimbursement."
        ),
        instruction="""
    You are an agent who handle the reimbursement process for employees.

    When you receive an reimbursement request, you should first create a new request form using create_request_form(). Only provide default values if they are provided by the user, otherwise use an empty string as the default value.
      1. 'Date': the date of the transaction.
      2. 'Amount': the dollar amount of the transaction.
      3. 'Business Justification/Purpose': the reason for the reimbursement.

    Once you created the form, you should return the result of calling return_form with the form data from the create_request_form call.

    Once you received the filled-out form back from the user, you should then check the form contains all required information:
      1. 'Date': the date of the transaction.
      2. 'Amount': the value of the amount of the reimbursement being requested.
      3. 'Business Justification/Purpose': the item/object/artifact of the reimbursement.

    If you don't have all of the information, you should reject the request directly by calling the request_form method, providing the missing fields.


    For valid reimbursement requests, you can then use reimburse() to reimburse the employee.
      * In your response, you should include the request_id and the status of the reimbursement request.

    """,
        tools=self.tools,
    )
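
A side note on `__init__` above: `run_until_complete()` raises `RuntimeError` when the loop is already running, so the warning branch does not actually protect the call. One possible way around that is an async factory method, so the setup is awaited on the caller's loop. This is only a sketch: `get_tools_async()` here is a stand-in that returns placeholder tool names, and `ReimbursementAgentSketch` is a hypothetical class, not ADK API.

```python
import asyncio
from contextlib import AsyncExitStack
from typing import Any


async def get_tools_async() -> tuple[list[Any], AsyncExitStack]:
    """Hypothetical stand-in for the MCP connection setup shown above."""
    exit_stack = AsyncExitStack()
    tools = ["create_request_form", "return_form", "reimburse"]  # placeholder names
    return tools, exit_stack


class ReimbursementAgentSketch:
    """Keeps __init__ synchronous; the async setup lives in create()."""

    def __init__(self, tools: list[Any], exit_stack: AsyncExitStack) -> None:
        self.tools = tools
        self.exit_stack = exit_stack

    @classmethod
    async def create(cls) -> "ReimbursementAgentSketch":
        # Awaited on the caller's loop, so no nested run_until_complete is needed.
        tools, exit_stack = await get_tools_async()
        return cls(tools, exit_stack)

    async def aclose(self) -> None:
        # Releases whatever the exit stack holds (the MCP connection in a real setup).
        await self.exit_stack.aclose()


async def main() -> list[Any]:
    agent = await ReimbursementAgentSketch.create()
    try:
        return agent.tools
    finally:
        await agent.aclose()
```

A caller that is already inside a coroutine would use `await ReimbursementAgentSketch.create()`; a synchronous entry point would wrap `main()` in `asyncio.run(...)`.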

errors:

INFO:google.adk.models.google_llm:Sending out request, model: gemini-2.0-flash-001, backend: ml_dev, stream: False
INFO:google.adk.models.google_llm:
LLM Request:
-----------------------------------------------------------
System Instruction:
    You are an agent who handle the reimbursement process for employees.
    When you receive an reimbursement request, you should first create a new request form using create_request_form(). Only provide default values if they are provided by the user, otherwise use an empty string as the default value.
      1. 'Date': the date of the transaction.
      2. 'Amount': the dollar amount of the transaction.
      3. 'Business Justification/Purpose': the reason for the reimbursement.
    Once you created the form, you should return the result of calling return_form with the form data from the create_request_form call.
    Once you received the filled-out form back from the user, you should then check the form contains all required information:
      1. 'Date': the date of the transaction.
      2. 'Amount': the value of the amount of the reimbursement being requested.
      3. 'Business Justification/Purpose': the item/object/artifact of the reimbursement.
    If you don't have all of the information, you should reject the request directly by calling the request_form method, providing the missing fields.
    For valid reimbursement requests, you can then use reimburse() to reimburse the employee.
      * In your response, you should include the request_id and the status of the reimbursement request.
    
You are an agent. Your internal name is "reimbursement_agent".
 The description about you is "This agent handles the reimbursement process for the employees given the amount and purpose of the reimbursement."
-----------------------------------------------------------
Contents:
{"parts":[{"text":"Can you reimburse me $20 for my lunch"}],"role":"user"}
{"parts":[{"text":"Can you reimburse me $20 for my lunch"}],"role":"user"}
{"parts":[{"text":"Can you reimburse me $20 for my lunch"}],"role":"user"}
{"parts":[{"text":"Can you reimburse me $20 for my lunch"}],"role":"user"}
-----------------------------------------------------------
Functions:
create_request_form: {'date': {'any_of': [{'type': <Type.STRING: 'STRING'>}, {'type': <Type.null: 'null'>}], 'properties': {'dummy_DO_NOT_GENERATE': {'type': <Type.STRING: 'STRING'>}}, 'type': <Type.OBJECT: 'OBJECT'>}, 'amount': {'any_of': [{'type': <Type.STRING: 'STRING'>}, {'type': <Type.null: 'null'>}], 'properties': {'dummy_DO_NOT_GENERATE': {'type': <Type.STRING: 'STRING'>}}, 'type': <Type.OBJECT: 'OBJECT'>}, 'purpose': {'any_of': [{'type': <Type.STRING: 'STRING'>}, {'type': <Type.null: 'null'>}], 'properties': {'dummy_DO_NOT_GENERATE': {'type': <Type.STRING: 'STRING'>}}, 'type': <Type.OBJECT: 'OBJECT'>}} -> None
return_form: {'form_request': {'properties': {'dummy_DO_NOT_GENERATE': {'type': <Type.STRING: 'STRING'>}}, 'type': <Type.OBJECT: 'OBJECT'>}, 'instructions': {'any_of': [{'type': <Type.STRING: 'STRING'>}, {'type': <Type.null: 'null'>}], 'properties': {'dummy_DO_NOT_GENERATE': {'type': <Type.STRING: 'STRING'>}}, 'type': <Type.OBJECT: 'OBJECT'>}} -> None
reimburse: {'request_id': {'type': <Type.STRING: 'STRING'>}} -> None
-----------------------------------------------------------
INFO:google_genai.models:AFC is enabled with max remote calls: 10.
INFO:httpx:HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-001:generateContent "HTTP/1.1 400 Bad Request"
ERROR:task_manager:An error occurred while streaming the response: 400 INVALID_ARGUMENT. {'error': {'code': 400, 'message': "Unable to submit request because `create_request_form` functionDeclaration `parameters.date` schema didn't specify the schema type field. Learn more: https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/function-calling", 'status': 'INVALID_ARGUMENT'}}
INFO:common.server.task_manager:Getting task 5bc53a9060d1460a90443a5324701a67
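
Reading the log above, the `Optional[str]` parameters are the ones rendered with `any_of` (string or null), while the plain `str` parameter of `reimburse` is rendered with a direct `type`; the 400 error complains about one of the former (`parameters.date`). A possible workaround, offered as an untested sketch and not a confirmed fix, is to declare those parameters as plain `str` with an empty-string default, which also matches the docstring's "Can be an empty string". The `@mcp.tool()` decorator is left out so the snippet runs on its own.

```python
import random
from typing import Any

# Local cache of created request_ids, as in the server above.
request_ids: set[str] = set()


def create_request_form(date: str = "", amount: str = "", purpose: str = "") -> dict[str, Any]:
    """Create a request form; empty strings fall back to placeholder text."""
    request_id = "request_id_" + str(random.randint(1000000, 9999999))
    request_ids.add(request_id)
    return {
        "request_id": request_id,
        "date": date or "<transaction date>",
        "amount": amount or "<transaction dollar amount>",
        "purpose": purpose or "<business justification/purpose of the transaction>",
    }


form = create_request_form(amount="20", purpose="lunch")
```

The `instructions` parameter of `return_form` is declared the same way in the server and might need the same change.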

@djsamseng
Contributor

djsamseng commented Apr 25, 2025

Try the Python tutorial for a much simpler A2A server.

Then, to integrate MCP, make the following change:

+ from langchain_mcp_adapters.tools import load_mcp_tools


+ tools = await load_mcp_tools(session)
- agent = create_react_agent(ollama_chat_llm, tools=[])
+ agent = create_react_agent(ollama_chat_llm, tools=tools)

For example like this
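
The diff above leaves `session` undefined. In the `langchain_mcp_adapters` examples it is an MCP `ClientSession`, so the surrounding code could look roughly like the sketch below. It assumes the `mcp`, `langchain-mcp-adapters`, and `langgraph` packages are installed and that the server is launched the same way as in the issue. The function name `run_with_mcp_tools`, its parameters, and the lazy imports are illustrative choices, not part of the original comment.

```python
from typing import Any


async def run_with_mcp_tools(llm: Any, server_script: str, query: str) -> Any:
    """Connects to an MCP server over stdio and runs a ReAct agent with its tools."""
    # Imported here so the module still loads when the optional packages are absent.
    from langchain_mcp_adapters.tools import load_mcp_tools
    from langgraph.prebuilt import create_react_agent
    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    # Assumption: same launch command as in the issue's StdioServerParameters.
    server_params = StdioServerParameters(
        command="uv",
        args=["run", "--with", "fastmcp", "fastmcp", "run", server_script],
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await load_mcp_tools(session)
            agent = create_react_agent(llm, tools=tools)
            # The session has to stay open while the agent calls the tools.
            return await agent.ainvoke({"messages": [("user", query)]})
```

It would be driven with something like `asyncio.run(run_with_mcp_tools(ollama_chat_llm, path_to_invoice_py, "Can you reimburse me $20 for my lunch"))`, where `path_to_invoice_py` is a placeholder for the server file's location.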
