diff --git a/api/agent_api_test.py b/api/agent_api_test.py index 066efc4f6..2ad4e0590 100644 --- a/api/agent_api_test.py +++ b/api/agent_api_test.py @@ -1,107 +1,294 @@ import requests from loguru import logger import time - -# Configure loguru -logger.add( - "api_tests_{time}.log", - rotation="100 MB", - level="DEBUG", - format="{time} {level} {message}", -) +from typing import Dict, Optional, Tuple +from uuid import UUID +from datetime import datetime +import sys BASE_URL = "http://localhost:8000/v1" +def check_api_server() -> bool: + """Check if the API server is running and accessible.""" + try: + response = requests.get(f"{BASE_URL}/docs") + return response.status_code == 200 + except requests.exceptions.ConnectionError: + logger.error("API server is not running at {BASE_URL}") + logger.error("Please start the API server first with:") + logger.error(" python main.py") + return False + except Exception as e: + logger.error(f"Error checking API server: {str(e)}") + return False + +class TestSession: + """Manages test session state and authentication.""" + + def __init__(self): + self.user_id: Optional[UUID] = None + self.api_key: Optional[str] = None + self.test_agents: list[UUID] = [] + + @property + def headers(self) -> Dict[str, str]: + """Get headers with authentication.""" + return {"api-key": self.api_key} if self.api_key else {} + +def create_test_user(session: TestSession) -> Tuple[bool, str]: + """Create a test user and store credentials in session.""" + logger.info("Creating test user") + + try: + response = requests.post( + f"{BASE_URL}/users", + json={"username": f"test_user_{int(time.time())}"} + ) + + if response.status_code == 200: + data = response.json() + session.user_id = data["user_id"] + session.api_key = data["api_key"] + logger.success(f"Created user with ID: {session.user_id}") + return True, "Success" + else: + logger.error(f"Failed to create user: {response.text}") + return False, response.text + except Exception as e: + logger.exception("Exception during user creation") + return False, str(e) + +def create_additional_api_key(session: TestSession) -> Tuple[bool, str]: + """Test creating an additional API key.""" + logger.info("Creating additional API key") + + try: + response = requests.post( + f"{BASE_URL}/users/{session.user_id}/api-keys", + headers=session.headers, + json={"name": "Test Key"} + ) + + if response.status_code == 200: + logger.success("Created additional API key") + return True, response.json()["key"] + else: + logger.error(f"Failed to create API key: {response.text}") + return False, response.text + except Exception as e: + logger.exception("Exception during API key creation") + return False, str(e) -def test_create_agent(): +def test_create_agent(session: TestSession) -> Tuple[bool, Optional[UUID]]: """Test creating a new agent.""" logger.info("Testing agent creation") payload = { - "agent_name": "Test Agent", + "agent_name": f"Test Agent {int(time.time())}", "system_prompt": "You are a helpful assistant", "model_name": "gpt-4", "description": "Test agent", - "tags": ["test"], + "tags": ["test", "automated"] } - response = requests.post(f"{BASE_URL}/agent", json=payload) - logger.debug(f"Create response: {response.json()}") + try: + response = requests.post( + f"{BASE_URL}/agent", + headers=session.headers, + json=payload + ) + + if response.status_code == 200: + agent_id = response.json()["agent_id"] + session.test_agents.append(agent_id) + logger.success(f"Created agent with ID: {agent_id}") + return True, agent_id + else: + logger.error(f"Failed to create agent: {response.text}") + return False, None + except Exception as e: + logger.exception("Exception during agent creation") + return False, None - if response.status_code == 200: - logger.success("Successfully created agent") - return response.json()["agent_id"] - else: - logger.error(f"Failed to create agent: {response.text}") - return None +def test_list_user_agents(session: TestSession) -> bool: + """Test listing user's agents.""" + logger.info("Testing user agent listing") + try: + response = requests.get( + f"{BASE_URL}/users/me/agents", + headers=session.headers + ) + + if response.status_code == 200: + agents = response.json() + logger.success(f"Found {len(agents)} user agents") + return True + else: + logger.error(f"Failed to list user agents: {response.text}") + return False + except Exception as e: + logger.exception("Exception during agent listing") + return False -def test_list_agents(): - """Test listing all agents.""" - logger.info("Testing agent listing") +def test_agent_operations(session: TestSession, agent_id: UUID) -> bool: + """Test various operations on an agent.""" + logger.info(f"Testing operations for agent {agent_id}") + + # Test update + try: + update_response = requests.patch( + f"{BASE_URL}/agent/{agent_id}", + headers=session.headers, + json={ + "description": "Updated description", + "tags": ["test", "updated"] + } + ) + if update_response.status_code != 200: + logger.error(f"Failed to update agent: {update_response.text}") + return False + + # Test metrics + metrics_response = requests.get( + f"{BASE_URL}/agent/{agent_id}/metrics", + headers=session.headers + ) + if metrics_response.status_code != 200: + logger.error(f"Failed to get agent metrics: {metrics_response.text}") + return False + + logger.success("Successfully performed agent operations") + return True + except Exception as e: + logger.exception("Exception during agent operations") + return False - response = requests.get(f"{BASE_URL}/agents") - logger.debug(f"List response: {response.json()}") - - if response.status_code == 200: - logger.success(f"Found {len(response.json())} agents") - else: - logger.error(f"Failed to list agents: {response.text}") - - -def test_completion(agent_id): +def test_completion(session: TestSession, agent_id: UUID) -> bool: """Test running a completion.""" logger.info("Testing completion") payload = { "prompt": "What is the weather like today?", "agent_id": agent_id, + "max_tokens": 100 } - response = requests.post( - f"{BASE_URL}/agent/completions", json=payload - ) - logger.debug(f"Completion response: {response.json()}") - - if response.status_code == 200: - logger.success("Successfully got completion") - else: - logger.error(f"Failed to get completion: {response.text}") + try: + response = requests.post( + f"{BASE_URL}/agent/completions", + headers=session.headers, + json=payload + ) + + if response.status_code == 200: + completion_data = response.json() + logger.success( + f"Got completion, used {completion_data['token_usage']['total_tokens']} tokens" + ) + return True + else: + logger.error(f"Failed to get completion: {response.text}") + return False + except Exception as e: + logger.exception("Exception during completion") + return False +def cleanup_test_resources(session: TestSession): + """Clean up all test resources.""" + logger.info("Cleaning up test resources") + + # Delete test agents + for agent_id in session.test_agents: + try: + response = requests.delete( + f"{BASE_URL}/agent/{agent_id}", + headers=session.headers + ) + if response.status_code == 200: + logger.debug(f"Deleted agent {agent_id}") + else: + logger.warning(f"Failed to delete agent {agent_id}: {response.text}") + except Exception as e: + logger.exception(f"Exception deleting agent {agent_id}") -def test_delete_agent(agent_id): - """Test deleting an agent.""" - logger.info("Testing agent deletion") + # Revoke API keys + if session.user_id: + try: + response = requests.get( + f"{BASE_URL}/users/{session.user_id}/api-keys", + headers=session.headers + ) + if response.status_code == 200: + for key in response.json(): + try: + revoke_response = requests.delete( + f"{BASE_URL}/users/{session.user_id}/api-keys/{key['key']}", + headers=session.headers + ) + if revoke_response.status_code == 200: + logger.debug(f"Revoked API key {key['name']}") + else: + logger.warning(f"Failed to revoke API key {key['name']}") + except Exception as e: + logger.exception(f"Exception revoking API key {key['name']}") + except Exception as e: + logger.exception("Exception getting API keys for cleanup") - response = requests.delete(f"{BASE_URL}/agent/{agent_id}") - logger.debug(f"Delete response: {response.json()}") - - if response.status_code == 200: - logger.success("Successfully deleted agent") - else: - logger.error(f"Failed to delete agent: {response.text}") - - -def run_tests(): - """Run all tests in sequence.""" +def run_test_workflow(): + """Run complete test workflow.""" logger.info("Starting API tests") - - # Create agent and get ID - agent_id = test_create_agent() - if not agent_id: - logger.error("Cannot continue tests without agent ID") - return - - # Wait a bit for agent to be ready - time.sleep(1) - - # Run other tests - test_list_agents() - test_completion(agent_id) - test_delete_agent(agent_id) - - logger.info("Tests completed") - + + # Check if API server is running first + if not check_api_server(): + return False + + session = TestSession() + success = True + + try: + # Create user + user_success, message = create_test_user(session) + if not user_success: + logger.error(f"User creation failed: {message}") + return False + + # Create additional API key + key_success, key = create_additional_api_key(session) + if not key_success: + logger.error(f"API key creation failed: {key}") + return False + + # Create agent + agent_success, agent_id = test_create_agent(session) + if not agent_success or not agent_id: + logger.error("Agent creation failed") + return False + + # Test user agent listing + if not test_list_user_agents(session): + logger.error("Agent listing failed") + return False + + # Test agent operations + if not test_agent_operations(session, agent_id): + logger.error("Agent operations failed") + return False + + # Test completion + if not test_completion(session, agent_id): + logger.error("Completion test failed") + return False + + logger.success("All tests completed successfully") + return True + + except Exception as e: + logger.exception("Exception during test workflow") + return False + finally: + cleanup_test_resources(session) if __name__ == "__main__": - run_tests() + success = run_test_workflow() + sys.exit(0 if success else 1) diff --git a/api/main.py b/api/main.py index 768e8d962..cfc5e1b2f 100644 --- a/api/main.py +++ b/api/main.py @@ -1,41 +1,34 @@ import os +import secrets +import traceback +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timedelta +from enum import Enum +from pathlib import Path +from typing import Any, Dict, List, Optional +from uuid import UUID, uuid4 + +import uvicorn +from dotenv import load_dotenv from fastapi import ( + BackgroundTasks, + Depends, FastAPI, + Header, HTTPException, - status, Query, - BackgroundTasks, + Request, + status, ) from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel, Field -from typing import Optional, Dict, Any, List from loguru import logger -import uvicorn -from datetime import datetime, timedelta -from uuid import UUID, uuid4 -from enum import Enum -from pathlib import Path -from concurrent.futures import ThreadPoolExecutor -import traceback +from pydantic import BaseModel, Field from swarms import Agent -from dotenv import load_dotenv -print ("starting") # Load environment variables load_dotenv() -# Configure Loguru -logger.add( - "logs/api_{time}.log", - rotation="500 MB", - retention="10 days", - level="INFO", - format="{time} {level} {message}", - backtrace=True, - diagnose=True, -) - class AgentStatus(str, Enum): """Enum for agent status.""" @@ -44,6 +37,28 @@ class AgentStatus(str, Enum): PROCESSING = "processing" ERROR = "error" MAINTENANCE = "maintenance" + + +# Security configurations +API_KEY_LENGTH = 32 # Length of generated API keys + +class APIKey(BaseModel): + key: str + name: str + created_at: datetime + last_used: datetime + is_active: bool = True + +class APIKeyCreate(BaseModel): + name: str # A friendly name for the API key + +class User(BaseModel): + id: UUID + username: str + is_active: bool = True + is_admin: bool = False + api_keys: Dict[str, APIKey] = {} # key -> APIKey object + class AgentConfig(BaseModel): @@ -105,6 +120,7 @@ class AgentConfig(BaseModel): ) + class AgentUpdate(BaseModel): """Model for updating agent configuration.""" @@ -173,6 +189,9 @@ class AgentStore: def __init__(self): self.agents: Dict[UUID, Agent] = {} self.agent_metadata: Dict[UUID, Dict[str, Any]] = {} + self.users: Dict[UUID, User] = {} # user_id -> User + self.api_keys: Dict[str, UUID] = {} # api_key -> user_id + self.user_agents: Dict[UUID, List[UUID]] = {} # user_id -> [agent_ids] self.executor = ThreadPoolExecutor(max_workers=4) self._ensure_directories() @@ -180,8 +199,56 @@ def _ensure_directories(self): """Ensure required directories exist.""" Path("logs").mkdir(exist_ok=True) Path("states").mkdir(exist_ok=True) + + def create_api_key(self, user_id: UUID, key_name: str) -> APIKey: + """Create a new API key for a user.""" + if user_id not in self.users: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found" + ) - async def create_agent(self, config: AgentConfig) -> UUID: + # Generate a secure random API key + api_key = secrets.token_urlsafe(API_KEY_LENGTH) + + # Create the API key object + key_object = APIKey( + key=api_key, + name=key_name, + created_at=datetime.utcnow(), + last_used=datetime.utcnow() + ) + + # Store the API key + self.users[user_id].api_keys[api_key] = key_object + self.api_keys[api_key] = user_id + + return key_object + + async def verify_agent_access(self, agent_id: UUID, user_id: UUID) -> bool: + """Verify if a user has access to an agent.""" + if agent_id not in self.agents: + return False + return ( + self.agent_metadata[agent_id]["owner_id"] == user_id + or self.users[user_id].is_admin + ) + + def validate_api_key(self, api_key: str) -> Optional[UUID]: + """Validate an API key and return the associated user ID.""" + user_id = self.api_keys.get(api_key) + if not user_id or api_key not in self.users[user_id].api_keys: + return None + + key_object = self.users[user_id].api_keys[api_key] + if not key_object.is_active: + return None + + # Update last used timestamp + key_object.last_used = datetime.utcnow() + return user_id + + async def create_agent(self, config: AgentConfig, user_id: UUID) -> UUID: """Create a new agent with the given configuration.""" try: @@ -220,7 +287,11 @@ async def create_agent(self, config: AgentConfig) -> UUID: "successful_completions": 0, } - logger.info(f"Created agent with ID: {agent_id}") + # Add to user's agents list + if user_id not in self.user_agents: + self.user_agents[user_id] = [] + self.user_agents[user_id].append(agent_id) + return agent_id except Exception as e: @@ -465,6 +536,35 @@ async def process_completion( finally: metadata["status"] = AgentStatus.IDLE +class StoreManager: + _instance = None + + @classmethod + def get_instance(cls) -> 'AgentStore': + if cls._instance is None: + cls._instance = AgentStore() + return cls._instance + +# Modify the dependency function +def get_store() -> AgentStore: + """Dependency to get the AgentStore instance.""" + return StoreManager.get_instance() + +# Security utility function using the new dependency +async def get_current_user( + api_key: str = Header(..., description="API key for authentication"), + store: AgentStore = Depends(get_store) +) -> User: + """Validate API key and return current user.""" + user_id = store.validate_api_key(api_key) + if not user_id: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or expired API key", + headers={"WWW-Authenticate": "ApiKey"}, + ) + return store.users[user_id] + class SwarmsAPI: """Enhanced API class for Swarms agent integration.""" @@ -477,7 +577,9 @@ def __init__(self): docs_url="/v1/docs", redoc_url="/v1/redoc", ) - self.store = AgentStore() + # Initialize the store using the singleton manager + self.store = StoreManager.get_instance() + # Configure CORS self.app.add_middleware( CORSMiddleware, @@ -493,11 +595,102 @@ def __init__(self): def _setup_routes(self): """Set up API routes.""" + + # In your API code + @self.app.post("/v1/users", response_model=Dict[str, Any]) + async def create_user(request: Request): + """Create a new user and initial API key.""" + try: + body = await request.json() + username = body.get("username") + if not username or len(username) < 3: + raise HTTPException(status_code=400, detail="Invalid username") + + user_id = uuid4() + user = User(id=user_id, username=username) + self.store.users[user_id] = user + initial_key = self.store.create_api_key(user_id, "Initial Key") + return {"user_id": user_id, "api_key": initial_key.key} + except Exception as e: + logger.error(f"Error creating user: {str(e)}") + raise HTTPException(status_code=400, detail=str(e)) + + + + @self.app.post("/v1/users/{user_id}/api-keys", response_model=APIKey) + async def create_api_key( + user_id: UUID, + key_create: APIKeyCreate, + current_user: User = Depends(get_current_user) + ): + """Create a new API key for a user.""" + if current_user.id != user_id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not authorized to create API keys for this user" + ) + + return self.store.create_api_key(user_id, key_create.name) + @self.app.get("/v1/users/{user_id}/api-keys", response_model=List[APIKey]) + async def list_api_keys( + user_id: UUID, + current_user: User = Depends(get_current_user) + ): + """List all API keys for a user.""" + if current_user.id != user_id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not authorized to view API keys for this user" + ) + + return list(self.store.users[user_id].api_keys.values()) + + @self.app.delete("/v1/users/{user_id}/api-keys/{key}") + async def revoke_api_key( + user_id: UUID, + key: str, + current_user: User = Depends(get_current_user) + ): + """Revoke an API key.""" + if current_user.id != user_id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not authorized to revoke API keys for this user" + ) + + if key in self.store.users[user_id].api_keys: + self.store.users[user_id].api_keys[key].is_active = False + del self.store.api_keys[key] + return {"status": "API key revoked"} + + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="API key not found" + ) + + @self.app.get("/v1/users/me/agents", response_model=List[AgentSummary]) + async def list_user_agents( + current_user: User = Depends(get_current_user), + tags: Optional[List[str]] = Query(None), + status: Optional[AgentStatus] = None, + ): + """List all agents owned by the current user.""" + user_agents = self.store.user_agents.get(current_user.id, []) + return [ + agent for agent in await self.store.list_agents(tags, status) + if agent.agent_id in user_agents + ] + + + # Modify existing routes to use API key authentication @self.app.post("/v1/agent", response_model=Dict[str, UUID]) - async def create_agent(config: AgentConfig): + async def create_agent( + config: AgentConfig, + current_user: User = Depends(get_current_user) + ): """Create a new agent with the specified configuration.""" - agent_id = await self.store.create_agent(config) + agent_id = await self.store.create_agent(config, current_user.id) return {"agent_id": agent_id} @self.app.get("/v1/agents", response_model=List[AgentSummary]) @@ -611,28 +804,27 @@ async def _cleanup_old_metrics(self, agent_id: UUID): if k > cutoff } - def create_app() -> FastAPI: """Create and configure the FastAPI application.""" - print("create app") + logger.info("Creating FastAPI application") api = SwarmsAPI() - return api.app + app = api.app + logger.info("FastAPI application created successfully") + return app +app = create_app() -#if __name__ == "__main__": if __name__ == '__main__': - #freeze_support() - print("yes in main") - # Configure uvicorn logging - logger.info("API Starting") - - uvicorn.run( - "main:create_app", - host="0.0.0.0", - port=8000, - # reload=True, - # workers=4, - ) -else: - print("not in main") - + try: + logger.info("Starting API server...") + print("Starting API server on http://0.0.0.0:8000") + + uvicorn.run( + app, # Pass the app instance directly + host="0.0.0.0", + port=8000, + log_level="info" + ) + except Exception as e: + logger.error(f"Failed to start API: {str(e)}") + print(f"Error starting server: {str(e)}") diff --git a/api/requirements.txt b/api/requirements.txt new file mode 100644 index 000000000..4bd48f33e --- /dev/null +++ b/api/requirements.txt @@ -0,0 +1,6 @@ +fastapi +uvicorn +pydantic +loguru +python-dotenv +swarms # Specify the version or source if it's not on PyPI diff --git a/api/skypilot.yaml b/api/skypilot.yaml new file mode 100644 index 000000000..8cd25d907 --- /dev/null +++ b/api/skypilot.yaml @@ -0,0 +1,41 @@ +service: + readiness_probe: + path: /docs + initial_delay_seconds: 300 + timeout_seconds: 30 + + replica_policy: + min_replicas: 1 + max_replicas: 50 + target_qps_per_replica: 5 + upscale_delay_seconds: 180 + downscale_delay_seconds: 600 + +resources: + ports: 8000 # FastAPI default port + cpus: 16 + memory: 64 + disk_size: 100 + use_spot: true + +workdir: /app + +setup: | + git clone https://github.com/kyegomez/swarms.git + cd swarms/api + pip install -r requirements.txt + pip install swarms + +run: | + cd swarms/api + uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 + +# env: +# PYTHONPATH: /app/swarms +# LOG_LEVEL: "INFO" +# # MAX_WORKERS: "4" + +# metadata: +# name: swarms-api-service +# version: "1.0.0" +# environment: production diff --git a/swarms/structs/auto_swarm_builder.py b/swarms/structs/auto_swarm_builder.py index 93e542fd4..16e1f5b95 100644 --- a/swarms/structs/auto_swarm_builder.py +++ b/swarms/structs/auto_swarm_builder.py @@ -50,13 +50,11 @@ class SwarmConfig(BaseModel): name="Research-Agent", description="Gathers information", system_prompt="You are a research agent...", - max_loops=2, ), AgentConfig( name="Writing-Agent", description="Writes content", system_prompt="You are a writing agent...", - max_loops=1, ), ], ) @@ -195,7 +193,7 @@ def _create_agents(self, task: str, *args, **kwargs): self.name = agents_dictionary.name self.description = agents_dictionary.description self.max_loops = getattr( - agents_dictionary, "max_loops", 1 + agents_dictionary ) # Default to 1 if not set logger.info( @@ -213,7 +211,6 @@ def _create_agents(self, task: str, *args, **kwargs): agent_name=agent_config.name, agent_description=agent_config.description, agent_system_prompt=agent_config.system_prompt, - # max_loops=agent_config.max_loops, ) agents.append(agent)