Examples

Practical examples of using the Agentuity Python SDK.

Basic Agent

A simple agent that returns a greeting.

from agentuity import AgentRequest, AgentResponse, AgentContext
from datetime import datetime
 
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Greet the caller by name.

    Reads the optional ``name`` field from the JSON request body (falling
    back to ``"Guest"``), logs the request, and replies with a JSON object
    holding the greeting and the current time in ISO 8601 format.
    """
    payload = request.data.json
    visitor = payload.get("name", "Guest")

    # Record who asked before building the reply
    context.logger.info(f"Received greeting request for {visitor}")

    greeting = f"Hello, {visitor}! Welcome to Agentuity."
    issued_at = datetime.now().isoformat()
    return response.json({"message": greeting, "timestamp": issued_at})

Key-Value Storage

An agent that uses key-value storage to store and retrieve user preferences.

from agentuity import AgentRequest, AgentResponse, AgentContext
import json
 
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Get, set, or delete a user's preferences in key-value storage.

    The JSON request body is expected to carry:

    * ``action``      - one of ``"get"``, ``"set"`` or ``"delete"``.
    * ``userId``      - the key under which the preferences are stored.
    * ``preferences`` - the value to store (only used by ``"set"``).

    Returns:
        A JSON response with the stored preferences or a status message, or
        an ``error`` field when the action is unknown or a required field is
        missing.
    """
    data = request.data.json
    action = data.get("action")
    user_id = data.get("userId")
    preferences = data.get("preferences")

    # Reject unknown actions first so this error takes precedence over
    # missing-field errors.
    if action not in ("get", "set", "delete"):
        return response.json({"error": 'Invalid action. Use "get", "set", or "delete".'})

    # Every action addresses a single user's entry; without a key there is
    # nothing meaningful to read, write, or delete.
    if not user_id:
        return response.json({"error": "User ID is required"})

    if action == "get":
        # Retrieve user preferences
        result = await context.kv.get("user-preferences", user_id)

        if not result.exists:
            return response.json({"message": "No preferences found"})

        # Access the data
        return response.json({"preferences": result.data.json})

    if action == "set":
        # Avoid silently storing an empty entry when no value was supplied
        if preferences is None:
            return response.json({"error": "Preferences are required"})

        # Store user preferences
        thirty_days_in_seconds = 60 * 60 * 24 * 30
        await context.kv.set(
            "user-preferences",
            user_id,
            preferences,
            {"ttl": thirty_days_in_seconds},
        )

        return response.json({"message": "Preferences saved successfully!"})

    # Only "delete" remains at this point: delete user preferences
    await context.kv.delete("user-preferences", user_id)

    return response.json({"message": "Preferences deleted successfully!"})

Welcome Function Example

This example demonstrates how to customize the initial appearance of DevMode when it starts interacting with your agents:

# agent.py
 
def welcome():
    """Describe the welcome message and sample prompts shown for this agent.

    Returns:
        A dict with a ``welcome`` message and a ``prompts`` list, where each
        prompt is a plain-text sample input.
    """
    sample_prompts = ("What can you do?", "Tell me about yourself")
    return {
        "welcome": "Welcome to my Python Agent! How can I help you today?",
        "prompts": [
            {"data": text, "contentType": "text/plain"}
            for text in sample_prompts
        ],
    }
 
def run(request, response, context):
    """Handle a request by replying with a fixed plain-text greeting."""
    # Agent logic goes here; this placeholder always sends the same text
    reply = response.text("Hello, World!")
    return reply

Real-World Example

Here's a more comprehensive example inspired by the React Miami 2025 Concierge template:

# concierge_agent.py
 
def welcome():
    """Describe the concierge's welcome message and sample prompts.

    Returns:
        A dict with a ``welcome`` message and a ``prompts`` list, where each
        prompt is a plain-text sample question.
    """
    questions = [
        "Where should I go for dinner in Miami, tonight?",
        "What sessions about Python are happening today?",
        "Tell me more about the conference speakers",
        "I'm looking for good restaurants near the venue",
        "What is Agentuity all about?",
    ]

    prompts = []
    for question in questions:
        prompts.append({"data": question, "contentType": "text/plain"})

    return {
        "welcome": "Welcome to the Python Concierge! How can I help you today?",
        "prompts": prompts,
    }
 
def run(request, response, context):
    """Handle a request by replying with the concierge's fixed greeting."""
    # Agent logic implementation goes here; this placeholder sends a fixed text
    reply = response.text("Hello, I'm your Python Concierge!")
    return reply

Vector Storage

An agent that uses vector storage for semantic search: it can index products, search them by semantic similarity, and delete them.

from agentuity import AgentRequest, AgentResponse, AgentContext
 
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Index, search, or delete products in the ``products`` vector store.

    The JSON request body selects the operation through its ``action`` field:

    * ``"index"``  - upsert every entry of ``products``; each entry must
      provide ``id``, ``description``, ``name``, ``price`` and ``category``.
    * ``"search"`` - run a semantic search for ``query``.
    * ``"delete"`` - delete every entry of ``products`` by its ``id``.

    Returns:
        A JSON response describing the outcome, or an ``error`` field when
        required input is missing or the action is unknown.
    """
    data = request.data.json
    action = data.get("action")
    query = data.get("query")
    products = data.get("products")

    # "index" and "delete" both need a non-empty list of products
    has_products = isinstance(products, list) and bool(products)

    if action == "index":
        # Index products in vector storage
        if not has_products:
            return response.json({"error": "No products to index"})

        # Prepare documents for vector storage: the description is the text
        # that gets embedded, the remaining fields travel along as metadata.
        documents = [
            {
                "key": product["id"],
                "document": product["description"],
                "metadata": {
                    "id": product["id"],
                    "name": product["name"],
                    "price": product["price"],
                    "category": product["category"],
                },
            }
            for product in products
        ]

        # Store in vector database
        ids = await context.vector.upsert("products", documents)

        return response.json({
            "message": f"Indexed {len(ids)} products successfully",
            "ids": ids,
        })

    if action == "search":
        # Search for products by semantic similarity
        if not query:
            return response.json({"error": "Query is required for search"})

        # Perform semantic search
        results = await context.vector.search(
            "products",
            query,
            limit=5,
            similarity=0.5,
            metadata={},
        )

        # Format results
        formatted_results = [
            {
                "id": result.id,
                "key": result.key,
                "similarity": 1.0 - result.distance,
                "metadata": result.metadata,
            }
            for result in results
        ]

        return response.json({
            "message": f"Found {len(results)} matching products",
            "results": formatted_results,
        })

    if action == "delete":
        # Delete products from vector storage
        if not has_products:
            return response.json({"error": "No product IDs to delete"})

        # Extract product IDs
        product_ids = [product["id"] for product in products]

        # Delete from vector database. delete() is called with one key at a
        # time, so every requested ID needs its own call; previously only the
        # first ID was deleted while all IDs were reported as removed.
        count = 0
        for product_id in product_ids:
            count += await context.vector.delete("products", product_id)

        return response.json({
            "message": f"Deleted {count} products successfully",
            "ids": product_ids,
        })

    return response.json({"error": 'Invalid action. Use "index", "search", or "delete".'})

Agent Communication

An agent that demonstrates communication between agents: it can send a message to another agent via handoff and handle messages it receives.

from agentuity import AgentRequest, AgentResponse, AgentContext
from datetime import datetime
 
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Send a message to another agent or acknowledge a received one.

    The JSON request body selects the operation through its ``action`` field:

    * ``"send"``    - hand off ``message`` to the agent identified by
      ``agentId``, tagged with this agent's ID and a timestamp.
    * ``"receive"`` - log the incoming ``sender`` and ``message`` and reply
      with an acknowledgement that echoes the message.

    Any other action, or a missing required field, yields a JSON response
    with an ``error`` field.
    """
    body = request.data.json
    action = body.get("action")
    text = body.get("message")
    target_id = body.get("agentId")

    if action == "send":
        # Both the recipient and the content are mandatory
        if not target_id:
            return response.json({"error": "Agent ID is required"})
        if not text:
            return response.json({"error": "Message is required"})

        # Payload delivered to the other agent through the handoff
        envelope = {
            "message": text,
            "sender": context.agent.id,
            "timestamp": datetime.now().isoformat(),
        }
        return await response.handoff({"id": target_id}, envelope)

    if action == "receive":
        # Handler for messages arriving from other agents
        incoming = request.data.json
        sender = incoming.get("sender")
        content = incoming.get("message")

        context.logger.info(f"Received message from agent {sender}: {content}")

        # Acknowledge the message and echo it back
        return response.json({
            "message": "Message received and processed",
            "echo": content,
            "receivedAt": datetime.now().isoformat(),
        })

    return response.json({"error": 'Invalid action. Use "send" or "receive".'})

Streaming Responses from OpenAI

This example demonstrates how to stream a response from OpenAI using the Agentuity Python SDK:

from openai import OpenAI
from agentuity import AgentRequest, AgentResponse, AgentContext
 
# Module-level OpenAI client: created once when this module is imported and
# reused by every call to run() below.
client = OpenAI()
 
 
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Stream an OpenAI chat completion back to the caller.

    The request's text is used as the user message, falling back to
    "Why is the sky blue?" when it is empty. The completion is requested
    with streaming enabled and passed to ``response.stream`` together with a
    lambda that pulls the text content out of each chunk.
    """
    user_prompt = request.data.text or "Why is the sky blue?"
    conversation = [
        {"role": "system", "content": "You are a friendly assistant!"},
        {"role": "user", "content": user_prompt},
    ]

    # stream=True makes the call return an iterable of chunks
    completion_stream = client.chat.completions.create(
        messages=conversation,
        model="gpt-4o",
        stream=True,
    )

    return response.stream(
        completion_stream,
        lambda chunk: chunk.choices[0].delta.content,
    )

This example:

  1. Imports the necessary libraries from OpenAI and Agentuity
  2. Creates an OpenAI client
  3. Defines an async run function that:
    • Creates a chat completion with streaming enabled
    • Uses the user's input text or defaults to "Why is the sky blue?"
    • Returns a streamed response using Agentuity's response.stream() method
    • Uses a lambda function to extract the content from each chunk of the stream

The response.stream() method handles the complexity of streaming the response back to the user, making it easy to implement streaming in your agents.

On this page