Python SDK Examples
Examples for using the Agentuity Python SDK
This section contains examples for using the Agentuity Python SDK.
Basic Agent
A simple agent that returns a greeting.
from agentuity import AgentRequest, AgentResponse, AgentContext
from datetime import datetime
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
# Get the request data
data = request.data.json
name = data.get("name", "Guest")
# Log the request
context.logger.info(f"Received greeting request for {name}")
# Return a personalized greeting
return response.json({
"message": f"Hello, {name}! Welcome to Agentuity.",
"timestamp": datetime.now().isoformat()
})
Key-Value Storage
An agent that uses key-value storage to store and retrieve user preferences.
from agentuity import AgentRequest, AgentResponse, AgentContext
import json
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
data = request.data.json
action = data.get("action")
user_id = data.get("userId")
preferences = data.get("preferences")
if action == "get":
# Retrieve user preferences
result = await context.kv.get("user-preferences", user_id)
if not result.exists:
return response.json({"message": "No preferences found"})
# Access the data
user_prefs = result.data.json
return response.json({"preferences": user_prefs})
elif action == "set":
# Store user preferences
await context.kv.set(
"user-preferences",
user_id,
preferences,
{"ttl": 60 * 60 * 24 * 30} # 30 days in seconds
)
return response.json({"message": "Preferences saved successfully!"})
elif action == "delete":
# Delete user preferences
await context.kv.delete("user-preferences", user_id)
return response.json({"message": "Preferences deleted successfully!"})
else:
return response.json({"error": 'Invalid action. Use "get", "set", or "delete".'})
Vector Storage
An agent that uses vector storage for semantic search.
from agentuity import AgentRequest, AgentResponse, AgentContext
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
data = request.data.json
action = data.get("action")
query = data.get("query")
products = data.get("products")
if action == "index":
# Index products in vector storage
if not isinstance(products, list) or len(products) == 0:
return response.json({"error": "No products to index"})
# Prepare documents for vector storage
documents = [
{
"key": product["id"],
"document": product["description"],
"metadata": {
"id": product["id"],
"name": product["name"],
"price": product["price"],
"category": product["category"]
}
}
for product in products
]
# Store in vector database
ids = await context.vector.upsert("products", documents)
return response.json({
"message": f"Indexed {len(ids)} products successfully",
"ids": ids
})
elif action == "search":
# Search for products by semantic similarity
if not query:
return response.json({"error": "Query is required for search"})
# Perform semantic search
results = await context.vector.search(
"products",
query,
limit=5,
similarity=0.5,
metadata={}
)
# Format results
formatted_results = [
{
"id": result.id,
"key": result.key,
"similarity": 1.0 - result.distance,
"metadata": result.metadata
}
for result in results
]
return response.json({
"message": f"Found {len(results)} matching products",
"results": formatted_results
})
elif action == "delete":
# Delete products from vector storage
if not isinstance(products, list) or len(products) == 0:
return response.json({"error": "No product IDs to delete"})
# Extract product IDs
product_ids = [p["id"] for p in products]
# Delete from vector database
count = await context.vector.delete("products", product_ids[0])
return response.json({
"message": f"Deleted {count} products successfully",
"ids": product_ids
})
else:
return response.json({"error": 'Invalid action. Use "index", "search", or "delete".'})
Agent Communication
An agent that demonstrates communication between agents.
from agentuity import AgentRequest, AgentResponse, AgentContext
from datetime import datetime
async def run(request: AgentRequest, response: AgentResponse, context: AgentContext):
data = request.data.json
action = data.get("action")
message = data.get("message")
agent_id = data.get("agentId")
if action == "send":
# Send a message to another agent
if not agent_id:
return response.json({"error": "Agent ID is required"})
if not message:
return response.json({"error": "Message is required"})
# Send message to the specified agent using handoff
result = await response.handoff(
{"id": agent_id},
{
"message": message,
"sender": context.agent.id,
"timestamp": datetime.now().isoformat()
}
)
return result
elif action == "receive":
# This is a handler for receiving messages from other agents
data = request.data.json
context.logger.info(f"Received message from agent {data.get('sender')}: {data.get('message')}")
# Process the received message
return response.json({
"message": "Message received and processed",
"echo": data.get("message"),
"receivedAt": datetime.now().isoformat()
})
else:
return response.json({"error": 'Invalid action. Use "send" or "receive".'})