Skip to content

Commit

Permalink
refactor: Improve task progress bar scaling and update terminal display
Browse files Browse the repository at this point in the history
- Adjusted the scaling of task progress bars to fit within the available terminal width.
- Updated the terminal display to show worker name and task counts at the right edge.
- Removed unnecessary code for scaling down the bars.
  • Loading branch information
provos committed Sep 12, 2024
1 parent d6d4648 commit 9e38ef2
Showing 1 changed file with 41 additions and 64 deletions.
105 changes: 41 additions & 64 deletions src/planai/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import math
import shutil
import time
from concurrent.futures import ThreadPoolExecutor
Expand Down Expand Up @@ -292,7 +291,7 @@ def _terminal_display_thread(self):
try:
while not self._stop_terminal_display_event.is_set():
self.display_terminal_status()
time.sleep(1) # Update interval
time.sleep(0.25) # Update interval
finally:
self._clear_terminal()
self._print_log()
Expand All @@ -303,7 +302,7 @@ def _clear_terminal(self):

def _print_log(self):
print("\nLog:")
for line in self._log_lines[-10:]:
for line in self._log_lines[-15:]:
print(line)

def display_terminal_status(self):
Expand All @@ -327,70 +326,43 @@ def display_terminal_status(self):
failed = sum(1 for t in data["failed"] if t["worker"] == worker)

total_tasks = completed + active + queued + failed
# Including space for worker name and separators, and accounting for emoji width
available_width = (terminal_width - 48) // 2
available_width = (
terminal_width - 24
) // 2 # Adjust for worker name and separator

if total_tasks > available_width:
# Scale down if total tasks exceed available width
scale_factor = available_width / total_tasks
completed_scaled = math.floor(completed * scale_factor)
active_scaled = math.floor(active * scale_factor)
queued_scaled = math.floor(queued * scale_factor)
failed_scaled = math.floor(failed * scale_factor)

# Ensure at least one emoji for non-zero values
completed_scaled = max(1, completed_scaled) if completed > 0 else 0
active_scaled = max(1, active_scaled) if active > 0 else 0
queued_scaled = max(1, queued_scaled) if queued > 0 else 0
failed_scaled = max(1, failed_scaled) if failed > 0 else 0

# Adjust to fit exactly within available width
total_scaled = (
completed_scaled + active_scaled + queued_scaled + failed_scaled
completed_scaled = (
max(1, int(completed * scale_factor)) if completed > 0 else 0
)
while total_scaled > available_width:
if completed_scaled > 1 and completed_scaled == max(
completed_scaled, active_scaled, queued_scaled, failed_scaled
):
completed_scaled -= 1
elif active_scaled > 1 and active_scaled == max(
active_scaled, queued_scaled, failed_scaled
):
active_scaled -= 1
elif queued_scaled > 1 and queued_scaled == max(
queued_scaled, failed_scaled
):
queued_scaled -= 1
elif failed_scaled > 1:
failed_scaled -= 1
total_scaled = (
completed_scaled + active_scaled + queued_scaled + failed_scaled
)
active_scaled = max(1, int(active * scale_factor)) if active > 0 else 0
queued_scaled = max(1, int(queued * scale_factor)) if queued > 0 else 0
failed_scaled = max(1, int(failed * scale_factor)) if failed > 0 else 0
else:
# No scaling needed
completed_scaled = completed
active_scaled = active
queued_scaled = queued
failed_scaled = failed
completed_scaled, active_scaled, queued_scaled, failed_scaled = (
completed,
active,
queued,
failed,
)

# Create bars based on scaled values
# Create bars
completed_bar = Fore.GREEN + "🟩" * completed_scaled
active_bar = Fore.BLUE + "🔵" * active_scaled
queued_bar = Fore.LIGHTYELLOW_EX + "🟠" * queued_scaled
failed_bar = Fore.RED + "❌" * failed_scaled

# Construct the status line
# First print: worker name and bars
status_line = f"{worker:20} | {completed_bar}{active_bar}{queued_bar}{failed_bar}{Style.RESET_ALL}"
print(status_line, end="")

# Add task counts
counts = f" C:{completed} A:{active} Q:{queued} F:{failed}"
status_line += counts
# Calculate and print counts at the right edge
counts = f"C:{completed} A:{active} Q:{queued} F:{failed}"

# Truncate if still too long
if len(status_line) > terminal_width:
status_line = status_line[: terminal_width - 3] + "..."
# Move cursor to the right edge minus the length of counts
cursor_move = f"\033[{terminal_width - len(counts)}G"

print(status_line)
print(f"{cursor_move}{counts}")

self._print_log()

Expand All @@ -409,6 +381,8 @@ def __repr__(self) -> str:


def main():
import random

# Define custom Task classes
class Task1WorkItem(Task):
data: str
Expand All @@ -425,25 +399,29 @@ class Task1Worker(TaskWorker):

def consume_work(self, task: Task1WorkItem):
self.print(f"Task1 consuming: {task.data}")
time.sleep(1)
processed = f"Processed: {task.data.upper()}"
self.publish_work(Task2WorkItem(processed_data=processed), input_task=task)
time.sleep(random.uniform(0.2, 0.7))
for i in range(5):
processed = f"Processed: {task.data.upper()} at iteration {i}"
self.publish_work(
Task2WorkItem(processed_data=processed), input_task=task
)

class Task2Worker(TaskWorker):
output_types: List[Type[Task]] = [Task3WorkItem]

def consume_work(self, task: Task2WorkItem):
self.print(f"Task2 consuming: {task.processed_data}")
time.sleep(1)
final = f"Final: {task.processed_data}!"
self.publish_work(Task3WorkItem(final_result=final), input_task=task)
time.sleep(random.uniform(0.3, 1.5))
for i in range(7):
final = f"Final: {task.processed_data} at iteration {i}!"
self.publish_work(Task3WorkItem(final_result=final), input_task=task)

class Task3Worker(TaskWorker):
output_types: Set[Type[Task]] = set()

def consume_work(self, task: Task3WorkItem):
self.print(f"Task3 consuming: {task.final_result}")
time.sleep(1)
time.sleep(random.uniform(0.4, 1.2))
self.print("Workflow complete!")

# Create Graph
Expand All @@ -460,12 +438,11 @@ def consume_work(self, task: Task3WorkItem):
# Set dependencies
graph.set_dependency(task1, task2).next(task3)

# Validate Graph
execution_order = graph.validate_graph()
print(f"Execution order: {execution_order}")

# Prepare initial work item
initial_work = [(task1, Task1WorkItem(data="Hello, Graph!"))]
initial_work = [
(task1, Task1WorkItem(data="Hello, Graph v1!")),
(task1, Task1WorkItem(data="Hello, Graph v2!")),
]

# Run the Graph
graph.run(initial_work)
Expand Down

0 comments on commit 9e38ef2

Please sign in to comment.