Artifacts Handling on Agentspace #2886

@ssingh5bmcd

Hi folks,

I have an ADK agent that uploads a file to a RAG corpus. The agent accesses the file as an artifact through tool_context.
It works well locally on ADK Web, where the file is uploaded to the corpus as expected. When I deploy the agent on Agent Engine and use it from the Agentspace app, it fails silently in _upload_artifact. Unfortunately, I could not find any documentation about this. Has anyone faced a similar issue? Is there any information on how a custom agent on Agentspace can access an uploaded file? What code changes are required for Agentspace? Does Agentspace not provide properties like inline_data for getting the data?
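
To answer the inline_data question empirically, it may help to log which payload fields the loaded artifact actually carries in each environment. The following is a minimal sketch, not an ADK API: `describe_part` is a hypothetical helper, and it assumes the object returned by `load_artifact` looks like a `google.genai` `types.Part` with optional `inline_data`, `file_data`, and `text` attributes. How Agentspace actually delivers uploaded files is not confirmed here.

```python
from types import SimpleNamespace
from typing import Any


def describe_part(part: Any) -> dict:
    """Report which payload field a Part-like object carries (hypothetical helper)."""
    if part is None:
        # load_artifact found nothing under the requested filename
        return {"kind": "missing"}

    inline = getattr(part, "inline_data", None)
    if inline is not None:
        data = getattr(inline, "data", None) or b""
        return {
            "kind": "inline_data",
            "mime_type": getattr(inline, "mime_type", None),
            "display_name": getattr(inline, "display_name", None),
            "size": len(data),
        }

    file_ref = getattr(part, "file_data", None)
    if file_ref is not None:
        # The bytes are referenced by URI rather than embedded in the Part
        return {
            "kind": "file_data",
            "mime_type": getattr(file_ref, "mime_type", None),
            "file_uri": getattr(file_ref, "file_uri", None),
        }

    text = getattr(part, "text", None)
    if text:
        return {"kind": "text", "size": len(text)}

    return {"kind": "unknown"}


# Stand-in object for illustration; in the tool this would be the loaded artifact
example = SimpleNamespace(
    inline_data=SimpleNamespace(data=b"abc", mime_type="text/plain", display_name=None),
    file_data=None,
    text=None,
)
print(describe_part(example))
```

Calling this on the result of `tool_context.load_artifact(...)` both locally and on Agent Engine, and comparing the two outputs, would show whether the artifact is missing, empty, or delivered in a different field.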

from google.adk.agents import LlmAgent
from ..tools import upload_file_tool
from ..utils import AI_MODEL
from google.adk.tools.load_artifacts_tool import load_artifacts_tool

upload_file_agent = LlmAgent(
    name="upload_file_agent",
    model=AI_MODEL,
    description="This AI agent's sole responsibility is to upload a received file by invoking the upload_file_tool.",
    instruction="""
You are an upload-only agent. Your task is simple and focused:
- Receive the user's uploaded file.
- Use the provided tool `upload_file_tool` to upload it.
- Do not generate any additional content or reports.
- Do not perform any actions other than calling the tool to upload the file.
- After calling the tool, respond with a brief confirmation message such as
    "File uploaded successfully.", but only if the upload succeeds.
""",
    tools=[load_artifacts_tool, upload_file_tool],
)

import json

import httpx
from google.adk.tools import LongRunningFunctionTool, ToolContext
from google.auth.transport.requests import Request

from ..utils.variables import GOOGLE_CLOUD_PROJECT, GOOGLE_CLOUD_LOCATION, GOOGLE_CLOUD_CORPUS_ID

async def _upload_to_rag_rest_api(bytes_data: bytes, filename: str, mime_type: str):
    """Upload a file to the RAG corpus using an asynchronous multipart REST API call."""

    from ..utils.auth import credential

    if credential is None:
        return {"status": "error", "error": "Server credentials are not available"}
    
    try:
        # Get an access token (note: refresh() is a synchronous, blocking call)
        request = Request()
        credential.refresh(request)
        access_token = credential.token
        
        if not access_token:
            return {"status": "error", "error": "Failed to get access token"}
        
        print(f"📤 Uploading {filename} via ASYNC REST API...")
        
        # REST API endpoint for multipart upload
        project_id = GOOGLE_CLOUD_PROJECT
        location = GOOGLE_CLOUD_LOCATION
        corpus_id = GOOGLE_CLOUD_CORPUS_ID
        url = f"https://{location}-aiplatform.googleapis.com/upload/v1beta1/projects/{project_id}/locations/{location}/ragCorpora/{corpus_id}/ragFiles:upload"
    
        headers = {
            "Authorization": f"Bearer {access_token}",
        }

        # Prepare multipart form data
        metadata = {
            "ragFile": {
                "displayName": filename,
                "description": "File uploaded via REST API from web interface"
            }
        }
        
        files_for_upload = {
            "metadata": (None, json.dumps(metadata), "application/json; charset=UTF-8"),
            "file": (filename, bytes_data, mime_type)
        }
    
        # Use httpx.AsyncClient for the asynchronous network call
        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(url, headers=headers, files=files_for_upload)
        
        if response.status_code == 200:
            print("✅ REST API upload successful!")
            return {"status": "success", "message": f"File {filename} uploaded to RAG corpus via REST API!"}
        else:
            print(f"❌ REST API failed: {response.status_code} - {response.text}")
            return {"status": "error", "error": f"REST API failed: {response.status_code} - {response.text}"}
            
    except Exception as e:
        print(f"❌ REST API error: {e}")
        return {"status": "error", "error": str(e)}


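async def _upload_artifact(tool_context: ToolContext):
    """Upload the single artifact in the current session to the RAG corpus."""
    try:
        # list_artifacts() returns artifact filenames, not the artifacts themselves
        artifacts = await tool_context.list_artifacts()

        if not artifacts or len(artifacts) != 1:
            return {"status": "error", "error": "Only a single artifact is supported."}

        artifact_name = artifacts[0]
        artifact = await tool_context.load_artifact(filename=artifact_name)

        # Guard against a missing artifact or one without inline bytes, so the
        # failure is reported explicitly instead of surfacing as an AttributeError
        if artifact is None or artifact.inline_data is None:
            return {"status": "error", "error": f"Artifact {artifact_name} has no inline_data."}

        blob = artifact.inline_data
        # display_name can be unset; fall back to the artifact filename
        name = blob.display_name or artifact_name
        mime_type = blob.mime_type
        data = blob.data

        result = await _upload_to_rag_rest_api(data, name, mime_type)
        return result

    except Exception as e:
        return {"status": "error", "error": str(e)}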

upload_file_tool = LongRunningFunctionTool(_upload_artifact)

Metadata

Labels

agent engine: [Component] This issue is related to Vertex AI Agent Engine
needs review: [Status] The PR/issue is awaiting review from the maintainer
