Python SDK Examples

Examples for using the Agentuity Python SDK

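This section contains example agents built with the Agentuity Python SDK, covering a basic greeting agent, key-value storage, vector storage, and communication between agents.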

Basic Agent

A simple agent that returns a greeting.

from agentuity import AgentRequest, AgentResponse, AgentContext
from datetime import datetime
 
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Greet the caller by name.

    Reads the optional "name" field from the JSON request payload (falling
    back to "Guest" when it is absent), logs the request, and replies with a
    JSON object holding the greeting and the current time in ISO 8601 format.
    """
    payload = request.data.json
    recipient = payload.get("name", "Guest")

    # Record who asked before building the reply.
    context.logger.info(f"Received greeting request for {recipient}")

    greeting = {
        "message": f"Hello, {recipient}! Welcome to Agentuity.",
        "timestamp": datetime.now().isoformat(),
    }
    return response.json(greeting)

Key-Value Storage

An agent that uses key-value storage to store and retrieve user preferences.

from agentuity import AgentRequest, AgentResponse, AgentContext
import json
 
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Get, set, or delete a user's preferences in key-value storage.

    The JSON request payload is expected to carry:

    * "action": one of "get", "set", or "delete".
    * "userId": the key under which the preferences are stored.
    * "preferences": the value to store (only used by "set").

    Entries live in the "user-preferences" store and are written with a
    30-day TTL.

    Returns:
        A JSON response with the stored preferences or a confirmation
        message, or a JSON object with an "error" entry when the action is
        unknown or a required field is missing.
    """
    data = request.data.json
    action = data.get("action")
    user_id = data.get("userId")
    preferences = data.get("preferences")

    if action not in ("get", "set", "delete"):
        return response.json({"error": 'Invalid action. Use "get", "set", or "delete".'})

    # Every action addresses a single user's entry; without a user ID the
    # store would be called with a None key.
    if not user_id:
        return response.json({"error": "User ID is required"})

    if action == "get":
        # Retrieve user preferences
        result = await context.kv.get("user-preferences", user_id)

        if not result.exists:
            return response.json({"message": "No preferences found"})

        return response.json({"preferences": result.data.json})

    if action == "set":
        # Refuse to overwrite an entry with nothing; an empty dict or list
        # is still accepted as a deliberate "no preferences" value.
        if preferences is None:
            return response.json({"error": "Preferences are required"})

        # Store user preferences
        await context.kv.set(
            "user-preferences",
            user_id,
            preferences,
            {"ttl": 60 * 60 * 24 * 30},  # 30 days in seconds
        )

        return response.json({"message": "Preferences saved successfully!"})

    # Only "delete" remains after the action check above.
    await context.kv.delete("user-preferences", user_id)

    return response.json({"message": "Preferences deleted successfully!"})

Vector Storage

An agent that uses vector storage for semantic search.

from agentuity import AgentRequest, AgentResponse, AgentContext
 
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Index, search, or delete products in the "products" vector store.

    The "action" field of the JSON request payload selects the operation:

    * "index": upsert every entry of "products". Each entry must provide
      "id", "description", "name", "price", and "category"; the description
      is the indexed document and the remaining fields become metadata.
    * "search": run a semantic search for "query", requesting at most 5
      results with a similarity threshold of 0.5.
    * "delete": delete every entry of "products" by its "id".

    Returns:
        A JSON response describing the outcome, or a JSON object with an
        "error" entry when the action is unknown or its input is missing.
    """
    data = request.data.json
    action = data.get("action")
    query = data.get("query")
    products = data.get("products")

    if action == "index":
        # Index products in vector storage
        if not isinstance(products, list) or not products:
            return response.json({"error": "No products to index"})

        # Prepare documents for vector storage
        documents = [
            {
                "key": product["id"],
                "document": product["description"],
                "metadata": {
                    "id": product["id"],
                    "name": product["name"],
                    "price": product["price"],
                    "category": product["category"],
                },
            }
            for product in products
        ]

        # Store in vector database
        ids = await context.vector.upsert("products", documents)

        return response.json({
            "message": f"Indexed {len(ids)} products successfully",
            "ids": ids,
        })

    if action == "search":
        # Search for products by semantic similarity
        if not query:
            return response.json({"error": "Query is required for search"})

        # Perform semantic search
        results = await context.vector.search(
            "products",
            query,
            limit=5,
            similarity=0.5,
            metadata={},
        )

        # Format results; the reported similarity is derived from the
        # distance attached to each result.
        formatted_results = [
            {
                "id": result.id,
                "key": result.key,
                "similarity": 1.0 - result.distance,
                "metadata": result.metadata,
            }
            for result in results
        ]

        return response.json({
            "message": f"Found {len(results)} matching products",
            "results": formatted_results,
        })

    if action == "delete":
        # Delete products from vector storage
        if not isinstance(products, list) or not products:
            return response.json({"error": "No product IDs to delete"})

        # Extract product IDs
        product_ids = [product["id"] for product in products]

        # delete() is called with one key at a time, so issue one call per
        # ID. Deleting only the first ID would leave the others in the
        # store while the response reports all of them as removed.
        # NOTE(review): assumes delete() returns the number of removed
        # vectors as an int - confirm against the SDK reference.
        count = 0
        for product_id in product_ids:
            count += await context.vector.delete("products", product_id)

        return response.json({
            "message": f"Deleted {count} products successfully",
            "ids": product_ids,
        })

    return response.json({"error": 'Invalid action. Use "index", "search", or "delete".'})

Agent Communication

An agent that sends a message to another agent using a handoff and handles messages it receives from other agents.

from agentuity import AgentRequest, AgentResponse, AgentContext
from datetime import datetime
 
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Send a message to another agent or acknowledge a received one.

    The "action" field of the JSON request payload selects the behavior:

    * "send" hands the request off to the agent identified by "agentId",
      passing along "message" together with this agent's ID and a timestamp.
    * "receive" logs the incoming "sender" and "message" and echoes the
      message back with the time it was received.

    Any other action, or a "send" without "agentId" or "message", yields a
    JSON object with an "error" entry.
    """
    payload = request.data.json
    action = payload.get("action")
    message = payload.get("message")
    agent_id = payload.get("agentId")

    if action == "send":
        # Both the target agent and the message body are mandatory.
        if not agent_id:
            return response.json({"error": "Agent ID is required"})
        if not message:
            return response.json({"error": "Message is required"})

        # Hand the message off to the specified agent and relay its result.
        target = {"id": agent_id}
        envelope = {
            "message": message,
            "sender": context.agent.id,
            "timestamp": datetime.now().isoformat(),
        }
        return await response.handoff(target, envelope)

    if action == "receive":
        # Handler for messages arriving from other agents.
        incoming = request.data.json

        context.logger.info(f"Received message from agent {incoming.get('sender')}: {incoming.get('message')}")

        # Process the received message
        acknowledgement = {
            "message": "Message received and processed",
            "echo": incoming.get("message"),
            "receivedAt": datetime.now().isoformat(),
        }
        return response.json(acknowledgement)

    return response.json({"error": 'Invalid action. Use "send" or "receive".'})

On this page