Skip to content

Commit

Permalink
feat(assistants, orchestrator, utils): enhance logging and error handling
Browse files Browse the repository at this point in the history

- Add logging setup with rotating file handler and rich tracebacks
- Create custom exception classes for better error handling
- Refactor assistants module to use new logging and exceptions
- Refactor orchestrator to use new logging, exceptions, and helper methods
- Update tests to handle new exceptions
- Add .gitignore entries for logs directory and output files
  • Loading branch information
jeblister committed Jul 4, 2024
1 parent bdf709a commit eb0ce98
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 124 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,7 @@ concatenated_files.txt
copy_py_files.py

# An empty output folder
output/
output/

# Logs
logs/
2 changes: 1 addition & 1 deletion src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
"settings",
"Orchestrator",
"Task",
"TaskExchange"
"TaskExchange",
]
57 changes: 36 additions & 21 deletions src/assistants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import logging
import os
import time

Expand All @@ -11,9 +10,13 @@
from phi.tools.tavily import TavilyTools

from src.config import settings
from src.utils.exceptions import AssistantError
from src.utils.logging import setup_logging

load_dotenv() # This loads the variables from .env

logger = setup_logging()

# Ensure the output directory exists
output_dir = os.path.join(os.getcwd(), "output")
os.makedirs(output_dir, exist_ok=True)
Expand All @@ -22,7 +25,7 @@
try:
vertexai.init(project=settings.PROJECT_ID, location=settings.LOCATION)
except Exception as e:
logging.error(f"Error initializing VertexAI: {str(e)}")
logger.error(f"Error initializing VertexAI: {str(e)}")


def create_file(file_path: str, content: str):
Expand All @@ -44,27 +47,37 @@ def read_file(file_path: str):
def list_files(directory: str = ""):
    """List the files in *directory* (relative to the module-level output dir).

    Args:
        directory: Sub-directory of ``output_dir`` to list; defaults to the
            output directory itself.

    Returns:
        str: A comma-separated list of file names, ``"No files found"`` when
        the directory is empty, or ``"Directory not found: <path>"`` when the
        path does not exist. A string is returned (rather than a list) so the
        result can be fed directly back to the assistant as tool output.
    """
    full_path = os.path.join(output_dir, directory)
    if os.path.exists(full_path):
        files = os.listdir(full_path)
        return ", ".join(files) if files else "No files found"
    return f"Directory not found: {full_path}"


# Create assistants
def create_assistant(name: str, model: str):
    """Create an :class:`Assistant` backed by the LLM matching *model*.

    The LLM backend is selected by model-name prefix: ``gemini`` -> Gemini,
    ``claude`` -> Claude, ``gpt`` -> OpenAIChat.

    Args:
        name: Display name for the assistant.
        model: Model identifier whose prefix selects the backend.

    Returns:
        Assistant: Configured with Tavily search plus the local file tools.

    Raises:
        AssistantError: If the model is unsupported or backend construction
            fails (any underlying exception — including the ``ValueError``
            for an unknown prefix — is wrapped and chained).
    """
    try:
        if model.startswith("gemini"):
            llm = Gemini(model=model)
        elif model.startswith("claude"):
            llm = Claude(model=model, api_key=settings.ANTHROPIC_API_KEY)
        elif model.startswith("gpt"):
            llm = OpenAIChat(model=model, api_key=settings.OPENAI_API_KEY)
        else:
            raise ValueError(f"Unsupported model: {model}")

        return Assistant(
            name=name,
            llm=llm,
            description="You are a helpful assistant.",
            tools=[
                TavilyTools(api_key=settings.TAVILY_API_KEY),
                create_file,
                read_file,
                list_files,
            ],
        )
    except Exception as e:
        logger.error(f"Error creating assistant {name} with model {model}: {str(e)}")
        # Chain the original exception so the root cause survives wrapping.
        raise AssistantError(f"Error creating assistant {name} with model {model}: {str(e)}") from e


# Create assistants
Expand All @@ -84,10 +97,12 @@ def get_full_response(assistant: Assistant, prompt: str, max_retries=3, delay=2)
else:
return str(response)
except Exception as e:
logging.error(f"Attempt {attempt + 1} failed: {str(e)}")
logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1:
time.sleep(delay)
else:
raise
raise AssistantError(
f"Max retries reached. Could not get a response from the assistant: {str(e)}"
)

raise Exception("Max retries reached. Could not get a response from the assistant.")
raise AssistantError("Max retries reached. Could not get a response from the assistant.")
130 changes: 63 additions & 67 deletions src/orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import json
import logging
import os
from typing import Any, Dict, List, Literal

from pydantic import BaseModel, Field
from rich import print as rprint

from .assistants import create_assistant, get_full_response
from .config import settings
from .utils.exceptions import WorkflowError, AssistantError
from .utils.logging import setup_logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = setup_logging()


class TaskExchange(BaseModel):
Expand Down Expand Up @@ -37,10 +37,6 @@ def to_dict(self) -> Dict[str, Any]:


class Orchestrator(BaseModel):
"""
Manages the workflow of AI assistants to accomplish complex tasks.
"""

state: State = State()
output_dir: str = Field(default_factory=lambda: os.path.join(os.getcwd(), "output"))

Expand All @@ -58,79 +54,79 @@ def run_workflow(self, objective: str) -> str:
Returns:
str: The final refined output of the workflow.
"""
rprint(f"[bold green]Starting workflow with objective:[/bold green] {objective}")
logging.info(f"Starting workflow with objective: {objective}")
self.state.task_exchanges.append(TaskExchange(role="user", content=objective))

logger.info(f"Starting workflow with objective: {objective}")
self.state.task_exchanges.append(TaskExchange(role="user", content=objective))
task_counter = 1
while True:
rprint(f"\n[bold blue]--- Task {task_counter} ---[/bold blue]")
logging.info(f"Starting task {task_counter}")
main_prompt = (
f"Objective: {objective}\n\n"
f"Current progress:\n{json.dumps(self.state.to_dict(), indent=2)}\n\n"
"Break down this objective into the next specific sub-task, or if the objective is fully achieved, "
"start your response with 'ALL DONE:' followed by the final output."
)
logging.debug(f"Main prompt: {main_prompt}")
main_response = get_full_response(
create_assistant("MainAssistant", settings.MAIN_ASSISTANT), main_prompt
)
try:
while True:
logger.info(f"Starting task {task_counter}")
main_prompt = self._generate_main_prompt(objective)
main_response = self._get_assistant_response("main", main_prompt)

if main_response.startswith("ALL DONE:"):
logger.info("Workflow completed")
break

sub_task_prompt = self._generate_sub_task_prompt(main_response)
sub_response = self._get_assistant_response("sub", sub_task_prompt)

self.state.tasks.append(Task(task=main_response, result=sub_response))
task_counter += 1

refined_output = self._get_refined_output(objective)
self._save_exchange_log(objective, refined_output)
logger.info("Exchange log saved")

return refined_output
except AssistantError as e:
logger.error(f"Assistant error: {str(e)}")
raise WorkflowError(f"Workflow failed due to assistant error: {str(e)}")
except Exception as e:
logger.exception("Unexpected error in workflow execution")
raise WorkflowError(f"Unexpected error in workflow execution: {str(e)}")

def _generate_main_prompt(self, objective: str) -> str:
return (
f"Objective: {objective}\n\n"
f"Current progress:\n{json.dumps(self.state.to_dict(), indent=2)}\n\n"
"Break down this objective into the next specific sub-task, or if the objective is fully achieved, "
"start your response with 'ALL DONE:' followed by the final output."
)

logging.info("Main assistant response received")
self.state.task_exchanges.append(
TaskExchange(role="main_assistant", content=main_response)
)
rprint(f"[green]MAIN_ASSISTANT response:[/green] {main_response[:100]}...")

if main_response.startswith("ALL DONE:"):
rprint("[bold green]Workflow completed![/bold green]")
logging.info("Workflow completed")
break

sub_task_prompt = (
f"Previous tasks: {json.dumps([task.to_dict() for task in self.state.tasks], indent=2)}\n\n"
f"Current task: {main_response}\n\n"
"Execute this task and provide the result. Use the provided functions to create, read, or list files as needed. "
f"All file operations should be relative to the '{self.output_dir}' directory."
)
logging.debug(f"Sub-task prompt: {sub_task_prompt}")
sub_response = get_full_response(
create_assistant("SubAssistant", settings.SUB_ASSISTANT), sub_task_prompt
)
def _generate_sub_task_prompt(self, main_response: str) -> str:
return (
f"Previous tasks: {json.dumps([task.to_dict() for task in self.state.tasks], indent=2)}\n\n"
f"Current task: {main_response}\n\n"
"Execute this task and provide the result. Use the provided functions to create, read, or list files as needed. "
f"All file operations should be relative to the '{self.output_dir}' directory."
)

logging.info("Sub-assistant response received")
def _get_assistant_response(self, assistant_type: str, prompt: str) -> str:
try:
assistant_model = getattr(settings, f"{assistant_type.upper()}_ASSISTANT")
assistant = create_assistant(f"{assistant_type.capitalize()}Assistant", assistant_model)
response = get_full_response(assistant, prompt)
logger.info(f"{assistant_type.capitalize()} assistant response received")
self.state.task_exchanges.append(
TaskExchange(role="sub_assistant", content=sub_response)
TaskExchange(role=f"{assistant_type}_assistant", content=response)
)
return response
except Exception as e:
logger.error(f"Error getting response from {assistant_type} assistant: {str(e)}")
raise AssistantError(
f"Error getting response from {assistant_type} assistant: {str(e)}"
)
self.state.tasks.append(Task(task=main_response, result=sub_response))
rprint(f"[green]SUB_ASSISTANT response:[/green] {sub_response[:100]}...")

task_counter += 1

def _get_refined_output(self, objective: str) -> str:
refiner_prompt = (
f"Original objective: {objective}\n\n"
f"Task breakdown and results: {json.dumps([task.to_dict() for task in self.state.tasks], indent=2)}\n\n"
"Please refine these results into a coherent final output, summarizing the project structure created. "
f"You can use the provided functions to list and read files if needed. All files are in the '{self.output_dir}' directory. "
"Provide your response as a string, not a list or dictionary."
)
logging.debug(f"Refiner prompt: {refiner_prompt}")
refined_output = get_full_response(
create_assistant("RefinerAssistant", settings.REFINER_ASSISTANT), refiner_prompt
)

logging.info("Refiner assistant response received")
self.state.task_exchanges.append(
TaskExchange(role="refiner_assistant", content=refined_output)
)
rprint(f"[green]REFINER_ASSISTANT response:[/green] {refined_output[:100]}...")

self._save_exchange_log(objective, refined_output)
rprint("[bold blue]Exchange log saved to 'exchange_log.md'[/bold blue]")
logging.info("Exchange log saved")

return refined_output
return self._get_assistant_response("refiner", refiner_prompt)

def _save_exchange_log(self, objective: str, final_output: str):
"""
Expand All @@ -152,4 +148,4 @@ def _save_exchange_log(self, objective: str, final_output: str):
log_file_path = os.path.join(self.output_dir, "exchange_log.md")
with open(log_file_path, "w") as f:
f.write(log_content)
rprint(f"[blue]Exchange log saved to:[/blue] {log_file_path}")
logger.info(f"Exchange log saved to: {log_file_path}")
18 changes: 18 additions & 0 deletions src/utils/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class SAAOrchestratorError(Exception):
    """Base exception class for SAA Orchestrator.

    All package-specific exceptions derive from this class, so callers can
    catch any orchestrator failure with a single ``except SAAOrchestratorError``.
    """


class AssistantError(SAAOrchestratorError):
    """Raised when there's an error with an AI assistant.

    Covers failures while constructing an assistant or obtaining a response
    from it (e.g. after exhausting retries).
    """


class WorkflowError(SAAOrchestratorError):
    """Raised when there's an error in the workflow execution.

    Used by the orchestrator to wrap lower-level failures (including
    ``AssistantError``) that abort a running workflow.
    """


class ConfigurationError(SAAOrchestratorError):
    """Raised when there's an error in the configuration."""


class PluginError(SAAOrchestratorError):
    """Raised when there's an error with a plugin."""
35 changes: 35 additions & 0 deletions src/utils/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging
import os
from logging.handlers import RotatingFileHandler

from rich.logging import RichHandler


def setup_logging(log_level=logging.INFO, log_file="saa_orchestrator.log", name=None):
    """Configure application-wide logging and return a logger.

    Installs two handlers on the root logger:
      * a ``RichHandler`` for console output with rich tracebacks, and
      * a ``RotatingFileHandler`` writing to ``logs/<log_file>``, rotating at
        ~10MB and keeping 5 backups.

    Args:
        log_level: Level applied to the root logger (default ``logging.INFO``).
        log_file: File name created inside the ``logs`` directory.
        name: Optional name of the logger to return. Defaults to this
            module's ``__name__``, preserving the previous behavior; callers
            may pass their own ``__name__`` to get a correctly-named logger.

    Returns:
        logging.Logger: The requested logger.

    Note:
        ``logging.basicConfig`` is a no-op once the root logger has handlers,
        so repeat calls with different ``log_level``/``log_file`` arguments
        are silently ignored — only the first call's settings take effect.
    """
    # Create logs directory if it doesn't exist
    logs_dir = os.path.join(os.getcwd(), "logs")
    os.makedirs(logs_dir, exist_ok=True)

    # Configure root logger (console + rotating file)
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[
            RichHandler(rich_tracebacks=True),
            RotatingFileHandler(
                os.path.join(logs_dir, log_file),
                maxBytes=10_000_000,  # 10MB per file before rotation
                backupCount=5,
            ),
        ],
    )

    return logging.getLogger(name if name is not None else __name__)


# Usage example
# NOTE(review): this module-level call configures logging (and creates the
# ./logs directory) as a side effect of importing this module — confirm that
# import-time configuration is intended.
logger = setup_logging()
Loading

0 comments on commit eb0ce98

Please sign in to comment.