Examples
Examples of using the Agentuity Python SDK
Basic Agent
A simple agent that returns a greeting.
from agentuity import AgentRequest, AgentResponse, AgentContext
from datetime import datetime
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Greet the caller by name and report when the greeting was produced.

    Reads the optional ``name`` field from the JSON request payload (falling
    back to ``"Guest"``), logs the request, and responds with a JSON object
    holding the greeting ``message`` and an ISO-formatted ``timestamp``.
    """
    payload = request.data.json
    guest_name = payload.get("name", "Guest")

    # Record who asked before building the reply
    context.logger.info(f"Received greeting request for {guest_name}")

    greeting = {
        "message": f"Hello, {guest_name}! Welcome to Agentuity.",
        "timestamp": datetime.now().isoformat(),
    }
    return response.json(greeting)
Key-Value Storage
An agent that uses key-value storage to store and retrieve user preferences.
from agentuity import AgentRequest, AgentResponse, AgentContext
import json
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Get, set, or delete a user's preferences in key-value storage.

    The JSON request payload is read for:
      - ``action``: one of ``"get"``, ``"set"``, or ``"delete"``
      - ``userId``: the key under which the preferences are stored
      - ``preferences``: the value to store (used by ``"set"`` only)

    Returns a JSON response carrying the stored preferences, a status
    ``message``, or an ``error`` entry when the request is invalid.
    """
    data = request.data.json
    action = data.get("action")
    user_id = data.get("userId")
    preferences = data.get("preferences")

    if action not in ("get", "set", "delete"):
        return response.json({"error": 'Invalid action. Use "get", "set", or "delete".'})

    # Every supported action addresses one user's record, so never touch
    # storage with a missing key.
    if not user_id:
        return response.json({"error": "User ID is required"})

    if action == "get":
        # Retrieve user preferences
        result = await context.kv.get("user-preferences", user_id)
        if not result.exists:
            return response.json({"message": "No preferences found"})
        return response.json({"preferences": result.data.json})

    if action == "set":
        # An empty preferences object is a legitimate value; only a missing
        # one is rejected.
        if preferences is None:
            return response.json({"error": "Preferences are required for set"})

        thirty_days_in_seconds = 60 * 60 * 24 * 30
        await context.kv.set(
            "user-preferences",
            user_id,
            preferences,
            {"ttl": thirty_days_in_seconds},
        )
        return response.json({"message": "Preferences saved successfully!"})

    # Only "delete" remains at this point
    await context.kv.delete("user-preferences", user_id)
    return response.json({"message": "Preferences deleted successfully!"})
Welcome Function Example
This example demonstrates how to customize the initial appearance of DevMode when it starts interacting with your agents:
# agent.py
def welcome():
    """Describe the welcome screen DevMode shows for this agent.

    Returns a dict with a ``welcome`` message and a list of sample
    ``prompts``, each a plain-text entry.
    """
    sample_questions = (
        "What can you do?",
        "Tell me about yourself",
    )
    return {
        "welcome": "Welcome to my Python Agent! How can I help you today?",
        "prompts": [
            {"data": question, "contentType": "text/plain"}
            for question in sample_questions
        ],
    }
def run(request, response, context):
    """Reply to every request with a fixed plain-text greeting."""
    # Agent logic
    greeting = "Hello, World!"
    return response.text(greeting)
Real-World Example
Here's a more comprehensive example inspired by the React Miami 2025 Concierge template:
# concierge_agent.py
def welcome():
    """Describe the welcome screen for the concierge agent.

    Returns a dict with a ``welcome`` message and a list of sample
    ``prompts``, each a plain-text entry.
    """
    sample_questions = (
        "Where should I go for dinner in Miami, tonight?",
        "What sessions about Python are happening today?",
        "Tell me more about the conference speakers",
        "I'm looking for good restaurants near the venue",
        "What is Agentuity all about?",
    )
    return {
        "welcome": "Welcome to the Python Concierge! How can I help you today?",
        "prompts": [
            {"data": question, "contentType": "text/plain"}
            for question in sample_questions
        ],
    }
def run(request, response, context):
    """Reply to every request with the concierge's plain-text introduction."""
    # Agent logic implementation
    introduction = "Hello, I'm your Python Concierge!"
    return response.text(introduction)
Vector Storage
An agent that uses vector storage for semantic search.
from agentuity import AgentRequest, AgentResponse, AgentContext
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Index, search, or delete products in the ``products`` vector store.

    The JSON request payload selects the operation through ``action``:
      - ``"index"``: upsert every entry of ``products``
      - ``"search"``: run a semantic search for ``query``
      - ``"delete"``: delete the first entry of ``products`` by its ``id``

    Any other action, or a missing required field, yields a JSON response
    with an ``error`` entry.
    """
    payload = request.data.json
    action = payload.get("action")
    search_text = payload.get("query")
    items = payload.get("products")
    # "index" and "delete" both need a non-empty list of products
    has_items = isinstance(items, list) and len(items) > 0

    if action == "index":
        if not has_items:
            return response.json({"error": "No products to index"})

        # Prepare one document per product: the description is the embedded
        # text, the remaining fields travel along as metadata.
        documents = []
        for item in items:
            documents.append({
                "key": item["id"],
                "document": item["description"],
                "metadata": {
                    field: item[field]
                    for field in ("id", "name", "price", "category")
                },
            })

        # Store in vector database
        stored_ids = await context.vector.upsert("products", documents)
        return response.json({
            "message": f"Indexed {len(stored_ids)} products successfully",
            "ids": stored_ids,
        })

    if action == "search":
        if not search_text:
            return response.json({"error": "Query is required for search"})

        # Perform semantic search
        matches = await context.vector.search(
            "products",
            search_text,
            limit=5,
            similarity=0.5,
            metadata={},
        )

        # Format results
        formatted_results = []
        for match in matches:
            formatted_results.append({
                "id": match.id,
                "key": match.key,
                "similarity": 1.0 - match.distance,
                "metadata": match.metadata,
            })

        return response.json({
            "message": f"Found {len(matches)} matching products",
            "results": formatted_results,
        })

    if action == "delete":
        if not has_items:
            return response.json({"error": "No product ID to delete"})

        # Only the first product is deleted (single key deletion)
        product_id = items[0]["id"]
        deleted_count = await context.vector.delete("products", product_id)
        return response.json({
            "message": f"Deleted {deleted_count} product successfully",
            "id": product_id,
        })

    return response.json({"error": 'Invalid action. Use "index", "search", or "delete".'})
Agent Communication
An agent that demonstrates communication between agents.
from agentuity import AgentRequest, AgentResponse, AgentContext
from datetime import datetime
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Send a message to another agent, or acknowledge one that was received.

    The JSON request payload selects the operation through ``action``:
      - ``"send"``: hand off ``message`` to the agent identified by
        ``agentId``, tagged with this agent's id and a timestamp
      - ``"receive"``: log the incoming ``message`` and its ``sender`` and
        echo the message back

    Any other action, or a missing required field for ``"send"``, yields a
    JSON response with an ``error`` entry.
    """
    data = request.data.json
    action = data.get("action")
    message = data.get("message")
    agent_id = data.get("agentId")

    if action == "send":
        if not agent_id:
            return response.json({"error": "Agent ID is required"})
        if not message:
            return response.json({"error": "Message is required"})

        # Send message to the specified agent using handoff
        return await response.handoff(
            {"id": agent_id},
            {
                "message": message,
                "sender": context.agent.id,
                "timestamp": datetime.now().isoformat(),
            },
        )

    if action == "receive":
        # Handler for messages arriving from other agents. The payload was
        # already parsed above, so it is reused rather than read again.
        sender = data.get("sender")
        context.logger.info(f"Received message from agent {sender}: {message}")

        return response.json({
            "message": "Message received and processed",
            "echo": message,
            "receivedAt": datetime.now().isoformat(),
        })

    return response.json({"error": 'Invalid action. Use "send" or "receive".'})
Streaming Responses from OpenAI
This example demonstrates how to stream a response from OpenAI using the Agentuity Python SDK:
from openai import OpenAI
from agentuity import AgentRequest, AgentResponse, AgentContext
# OpenAI client created once at module level and reused by run()
client = OpenAI()


async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
    """Stream an OpenAI chat completion back to the caller.

    The user's message is the request's text payload; when that is empty,
    the prompt falls back to "Why is the sky blue?".
    """
    user_prompt = request.data.text or "Why is the sky blue?"
    conversation = [
        {
            "role": "system",
            "content": "You are a friendly assistant!",
        },
        {
            "role": "user",
            "content": user_prompt,
        },
    ]
    completion_stream = client.chat.completions.create(
        model="gpt-4o",
        messages=conversation,
        stream=True,
    )
    # The lambda pulls the text out of each streamed chunk
    return response.stream(
        completion_stream,
        lambda chunk: chunk.choices[0].delta.content,
    )
This example:
- Imports the necessary libraries from OpenAI and Agentuity
- Creates an OpenAI client
- Defines an async `run` function that:
  - Creates a chat completion with streaming enabled
  - Uses the user's input text or defaults to "Why is the sky blue?"
  - Returns a streamed response using Agentuity's `response.stream()` method
  - Uses a lambda function to extract the content from each chunk of the stream
The `response.stream()` method handles the complexity of streaming the response back to the user, making it easy to implement streaming in your agents.
Need Help?
Join our Community for assistance or just to hang with other humans building agents.
Send us an email at hi@agentuity.com if you'd like to get in touch.
Please follow us on social media.
If you haven't already, please sign up for your free account now and start building your first agent!