diff --git a/src/planai/graph.py b/src/planai/graph.py index 1f897bc..7b9e1b2 100644 --- a/src/planai/graph.py +++ b/src/planai/graph.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import math import shutil import time from concurrent.futures import ThreadPoolExecutor @@ -292,7 +291,7 @@ def _terminal_display_thread(self): try: while not self._stop_terminal_display_event.is_set(): self.display_terminal_status() - time.sleep(1) # Update interval + time.sleep(0.25) # Update interval finally: self._clear_terminal() self._print_log() @@ -303,7 +302,7 @@ def _clear_terminal(self): def _print_log(self): print("\nLog:") - for line in self._log_lines[-10:]: + for line in self._log_lines[-15:]: print(line) def display_terminal_status(self): @@ -327,70 +326,43 @@ def display_terminal_status(self): failed = sum(1 for t in data["failed"] if t["worker"] == worker) total_tasks = completed + active + queued + failed - # Including space for worker name and separators, and accounting for emoji width - available_width = (terminal_width - 48) // 2 + available_width = ( + terminal_width - 24 + ) // 2 # Adjust for worker name and separator if total_tasks > available_width: - # Scale down if total tasks exceed available width scale_factor = available_width / total_tasks - completed_scaled = math.floor(completed * scale_factor) - active_scaled = math.floor(active * scale_factor) - queued_scaled = math.floor(queued * scale_factor) - failed_scaled = math.floor(failed * scale_factor) - - # Ensure at least one emoji for non-zero values - completed_scaled = max(1, completed_scaled) if completed > 0 else 0 - active_scaled = max(1, active_scaled) if active > 0 else 0 - queued_scaled = max(1, queued_scaled) if queued > 0 else 0 - failed_scaled = max(1, failed_scaled) if failed > 0 else 0 - - # Adjust to fit exactly within available width - total_scaled = ( - completed_scaled + active_scaled + queued_scaled + failed_scaled + completed_scaled = ( + max(1, int(completed * scale_factor)) if completed > 0 else 0 ) - while total_scaled > available_width: - if completed_scaled > 1 and completed_scaled == max( - completed_scaled, active_scaled, queued_scaled, failed_scaled - ): - completed_scaled -= 1 - elif active_scaled > 1 and active_scaled == max( - active_scaled, queued_scaled, failed_scaled - ): - active_scaled -= 1 - elif queued_scaled > 1 and queued_scaled == max( - queued_scaled, failed_scaled - ): - queued_scaled -= 1 - elif failed_scaled > 1: - failed_scaled -= 1 - total_scaled = ( - completed_scaled + active_scaled + queued_scaled + failed_scaled - ) + active_scaled = max(1, int(active * scale_factor)) if active > 0 else 0 + queued_scaled = max(1, int(queued * scale_factor)) if queued > 0 else 0 + failed_scaled = max(1, int(failed * scale_factor)) if failed > 0 else 0 else: - # No scaling needed - completed_scaled = completed - active_scaled = active - queued_scaled = queued - failed_scaled = failed + completed_scaled, active_scaled, queued_scaled, failed_scaled = ( + completed, + active, + queued, + failed, + ) - # Create bars based on scaled values + # Create bars completed_bar = Fore.GREEN + "🟩" * completed_scaled active_bar = Fore.BLUE + "🔵" * active_scaled queued_bar = Fore.LIGHTYELLOW_EX + "🟠" * queued_scaled failed_bar = Fore.RED + "❌" * failed_scaled - # Construct the status line + # First print: worker name and bars status_line = f"{worker:20} | {completed_bar}{active_bar}{queued_bar}{failed_bar}{Style.RESET_ALL}" + print(status_line, end="") - # Add task counts - counts = f" C:{completed} A:{active} Q:{queued} F:{failed}" - status_line += counts + # Calculate and print counts at the right edge + counts = f"C:{completed} A:{active} Q:{queued} F:{failed}" - # Truncate if still too long - if len(status_line) > terminal_width: - status_line = status_line[: terminal_width - 3] + "..." + # Move cursor to the right edge minus the length of counts + cursor_move = f"\033[{terminal_width - len(counts)}G" - print(status_line) + print(f"{cursor_move}{counts}") self._print_log() @@ -409,6 +381,8 @@ def __repr__(self) -> str: def main(): + import random + # Define custom Task classes class Task1WorkItem(Task): data: str @@ -425,25 +399,29 @@ class Task1Worker(TaskWorker): def consume_work(self, task: Task1WorkItem): self.print(f"Task1 consuming: {task.data}") - time.sleep(1) - processed = f"Processed: {task.data.upper()}" - self.publish_work(Task2WorkItem(processed_data=processed), input_task=task) + time.sleep(random.uniform(0.2, 0.7)) + for i in range(5): + processed = f"Processed: {task.data.upper()} at iteration {i}" + self.publish_work( + Task2WorkItem(processed_data=processed), input_task=task + ) class Task2Worker(TaskWorker): output_types: List[Type[Task]] = [Task3WorkItem] def consume_work(self, task: Task2WorkItem): self.print(f"Task2 consuming: {task.processed_data}") - time.sleep(1) - final = f"Final: {task.processed_data}!" - self.publish_work(Task3WorkItem(final_result=final), input_task=task) + time.sleep(random.uniform(0.3, 1.5)) + for i in range(7): + final = f"Final: {task.processed_data} at iteration {i}!" + self.publish_work(Task3WorkItem(final_result=final), input_task=task) class Task3Worker(TaskWorker): output_types: Set[Type[Task]] = set() def consume_work(self, task: Task3WorkItem): self.print(f"Task3 consuming: {task.final_result}") - time.sleep(1) + time.sleep(random.uniform(0.4, 1.2)) self.print("Workflow complete!") # Create Graph @@ -460,12 +438,11 @@ def consume_work(self, task: Task3WorkItem): # Set dependencies graph.set_dependency(task1, task2).next(task3) - # Validate Graph - execution_order = graph.validate_graph() - print(f"Execution order: {execution_order}") - # Prepare initial work item - initial_work = [(task1, Task1WorkItem(data="Hello, Graph!"))] + initial_work = [ + (task1, Task1WorkItem(data="Hello, Graph v1!")), + (task1, Task1WorkItem(data="Hello, Graph v2!")), + ] # Run the Graph graph.run(initial_work)