From 9517f867dff9a0ef20fc12b0466118a14a46fad7 Mon Sep 17 00:00:00 2001 From: Torin Etheridge Date: Sun, 17 Nov 2024 19:31:52 -0500 Subject: [PATCH] update package --- README.md | 117 ++++++++++++++++++++ atem/__init__.py | 5 + atem/core.py | 117 ++++++++++++++++++++ atem/model_train.py | 176 ++++++++++++++++++++++++++++++ {package => atem}/utils.py | 0 atem_core/adaptive/test_interp.py | 2 +- atem_core/adaptive/train2.py | 140 ------------------------ package/__init__.py | 5 - package/core.py | 31 ------ requirements.txt | 9 +- setup.py | 23 ++-- 11 files changed, 431 insertions(+), 194 deletions(-) create mode 100644 atem/__init__.py create mode 100644 atem/core.py create mode 100644 atem/model_train.py rename {package => atem}/utils.py (100%) delete mode 100644 atem_core/adaptive/train2.py delete mode 100644 package/__init__.py delete mode 100644 package/core.py diff --git a/README.md b/README.md index 6b4b792..feb091e 100644 --- a/README.md +++ b/README.md @@ -142,3 +142,120 @@ The Adaptive Task Prediction Model utilizes a TensorFlow Lite (TFLite) model for - Scores all potential tasks to indicate confidence levels. - The robot executes the predicted task with the highest score. + +# ATEM: Adaptive Task Execution and Machine Learning Package Documentation + +ATEM is a Python package designed for adaptive task execution in robotics and AI applications. It provides tools for training machine learning models, interpreting task sequences, and generating optimal task orders for various scenarios. + +## Features +- **Adaptive Task Model**: Predict the next task based on sensor data and task history. +- **Task Training**: Train custom machine learning models using a `tasks.json` file. +- **Real-time Adaptation**: Simulate real-world scenarios for task execution. +- **Pathfinding Integration**: Extendable for integration with A* pathfinding for robotics. +- **Lightweight TensorFlow Lite Integration**: For efficient model inference. + +--- + +## Installation + +```bash +pip install atem +``` +--- + +## Quick Start Guide +1. **Preparing the tasks.json File** +The tasks.json file defines the tasks and their attributes. + +Example +```json +{ + "tasks": [ + {"name": "Task 1", "points": 10, "time": 5}, + {"name": "Task 2", "points": 20, "time": 15}, + {"name": "Task 3", "points": 15, "time": 10} + ] +} +``` +2. **Training a Model** +Use the ModelTrainer class to train a TensorFlow Lite model. + +Example +```python +from atem.model_train import ModelTrainer + +trainer = ModelTrainer(tasks_file="tasks.json", output_model_path="adaptive_model.tflite") +trainer.train_and_save_model(epochs=20, batch_size=16) + +``` + +3. **Interpreting Tasks** +Use the AdaptiveModel class to interpret task sequences and predict the next task. + +Example +```python +from atem import AdaptiveModel + +model = AdaptiveModel(model_path="adaptive_model.tflite") + +task_to_index = {"Task 1": 0, "Task 2": 1, "Task 3": 2} +index_to_task = {0: "Task 1", 1: "Task 2", 2: "Task 3"} +current_task = "Task 1" +sensor_data = { + "time_elapsed": 20, + "distance_to_target": 1.2, + "gyro_angle": 45, + "battery_level": 80 +} +predicted_task, scores = model.predict_next_task( + current_task=current_task, + sensor_data=sensor_data, + task_to_index=task_to_index, + index_to_task=index_to_task, + max_length=5 +) + +print(f"Predicted Next Task: {predicted_task}") +print(f"Task Scores: {scores}") + +``` + +--- + +## API Reference + +### 1. AdaptiveModel + + +__init__(model_path: str) +- Initialize the adaptive model with a TFLite model path. 
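+
+Example (minimal sketch; assumes a trained `adaptive_model.tflite` is available on disk):
+
+```python
+from atem import AdaptiveModel
+
+model = AdaptiveModel(model_path="adaptive_model.tflite")
+```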
+ +predict_next_task(...) +- Predict the next task based on the current task and sensor data. + +### 2. ModelTrainer + + +__init__(tasks_file: str, output_model_path: str) +- Initialize the trainer with tasks and an output model path. + + +train_and_save_model(epochs: int, batch_size: int) +- Train the model and save it as a TFLite file. + +set_max_length(max_length: int) +- Set the maximum sequence length for task encoding and padding. + + + + + + + + + + + + + + diff --git a/atem/__init__.py b/atem/__init__.py new file mode 100644 index 0000000..c458bf1 --- /dev/null +++ b/atem/__init__.py @@ -0,0 +1,5 @@ +from .core import AdaptiveModel +from .utils import load_tasks, create_task_encoder +from .model_train import ModelTrainer + +__all__ = ["AdaptiveModel", "load_tasks", "create_task_encoder", "ModelTrainer"] \ No newline at end of file diff --git a/atem/core.py b/atem/core.py new file mode 100644 index 0000000..f02061b --- /dev/null +++ b/atem/core.py @@ -0,0 +1,117 @@ +import tensorflow as tf +import numpy as np +import json + + +class AdaptiveModel: + def __init__(self, model_path): + """ + Initializes the AdaptiveModel with a TensorFlow Lite interpreter. + + Args: + model_path (str): Path to the TFLite model file. + """ + self.model_path = model_path + self.interpreter = tf.lite.Interpreter(model_path=model_path) + self.interpreter.allocate_tensors() + + def _prepare_input(self, current_task, sensor_data, task_to_index, max_length): + """ + Prepares the input tensors for model inference. + + Args: + current_task (str): Name of the current task. + sensor_data (dict): Dictionary containing sensor readings. + task_to_index (dict): Mapping of task names to indices. + max_length (int): Maximum length for padding. + + Returns: + tuple: Prepared task and sensor feature arrays. + """ + encoded_task = [task_to_index[current_task]] + padded_task = np.pad(encoded_task, (0, max_length - len(encoded_task)), constant_values=0).astype(np.float32).reshape(1, -1) + + sensor_features = np.array([ + sensor_data["time_elapsed"], + sensor_data["distance_to_target"], + sensor_data["gyro_angle"], + sensor_data["battery_level"] + ], dtype=np.float32).reshape(1, -1) + + return padded_task, sensor_features + + def predict_next_task(self, current_task, sensor_data, task_to_index, index_to_task, max_length): + """ + Predicts the next task based on the current task and sensor data. + + Args: + current_task (str): Name of the current task. + sensor_data (dict): Dictionary containing sensor readings. + task_to_index (dict): Mapping of task names to indices. + index_to_task (dict): Mapping of task indices to names. + max_length (int): Maximum length for padding. + + Returns: + tuple: The predicted task name and raw output probabilities. + """ + input_details = self.interpreter.get_input_details() + output_details = self.interpreter.get_output_details() + + padded_task, sensor_features = self._prepare_input(current_task, sensor_data, task_to_index, max_length) + + self.interpreter.set_tensor(input_details[0]['index'], padded_task) + if len(input_details) > 1: + self.interpreter.set_tensor(input_details[1]['index'], sensor_features) + + self.interpreter.invoke() + output = self.interpreter.get_tensor(output_details[0]['index'])[0] + predicted_index = np.argmax(output) + return index_to_task[predicted_index], output + + def batch_predict(self, task_sensor_pairs, task_to_index, index_to_task, max_length): + """ + Predicts next tasks for a batch of inputs. 
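+
+        Example (illustrative; assumes model is an AdaptiveModel instance and the
+        mappings and sensor keys match those used by predict_next_task):
+
+            pairs = [("Task 1", {"time_elapsed": 20, "distance_to_target": 1.2,
+                                 "gyro_angle": 45, "battery_level": 80})]
+            results = model.batch_predict(pairs, task_to_index, index_to_task, max_length=5)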
+ + Args: + task_sensor_pairs (list of tuples): List of (current_task, sensor_data) pairs. + task_to_index (dict): Mapping of task names to indices. + index_to_task (dict): Mapping of task indices to names. + max_length (int): Maximum length for padding. + + Returns: + list of tuples: List of predicted tasks and their raw probabilities. + """ + predictions = [] + for current_task, sensor_data in task_sensor_pairs: + predicted_task, raw_output = self.predict_next_task( + current_task, sensor_data, task_to_index, index_to_task, max_length + ) + predictions.append((predicted_task, raw_output)) + return predictions + + def save_predictions(self, predictions, output_file="predictions.json"): + """ + Saves predictions to a JSON file. + + Args: + predictions (list of tuples): List of predictions to save. + output_file (str): Path to the JSON file to save. + """ + formatted_predictions = [ + {"predicted_task": pred[0], "raw_output": pred[1].tolist()} for pred in predictions + ] + with open(output_file, "w") as f: + json.dump(formatted_predictions, f, indent=4) + print(f"Predictions saved to {output_file}") + + def load_model(self, new_model_path): + """ + Reloads the model with a new TFLite file. + + Args: + new_model_path (str): Path to the new TFLite model file. + """ + self.model_path = new_model_path + self.interpreter = tf.lite.Interpreter(model_path=new_model_path) + self.interpreter.allocate_tensors() + print(f"Model reloaded from {new_model_path}") \ No newline at end of file diff --git a/atem/model_train.py b/atem/model_train.py new file mode 100644 index 0000000..ae0007f --- /dev/null +++ b/atem/model_train.py @@ -0,0 +1,176 @@ +import tensorflow as tf +import numpy as np +import json +from colorama import Fore, Style, init + +# Initialize colorama +init(autoreset=True) + +class ModelTrainer: + def __init__(self, tasks_file, output_model_path="trained_model.tflite"): + """ + Initialize the ModelTrainer with a tasks file and output model path. + + Args: + tasks_file (str): Path to the JSON file containing tasks. + output_model_path (str): Path to save the trained TFLite model. + """ + self.tasks_file = tasks_file + self.output_model_path = output_model_path + self.tasks = self._load_tasks() + self.task_to_index, self.index_to_task = self._create_task_encoder() + self.max_length = 5 # Default maximum sequence length + + def _load_tasks(self): + """ + Load tasks from the JSON file. + + Returns: + list: A list of tasks. + """ + print(f"{Fore.BLUE}Loading tasks from {self.tasks_file}...") + with open(self.tasks_file, "r") as file: + data = json.load(file) + print(f"{Fore.GREEN}Tasks loaded successfully!") + return data["tasks"] + + def _create_task_encoder(self): + """ + Create task encoders for mapping task names to indices and vice versa. + + Returns: + tuple: task_to_index and index_to_task mappings. + """ + print(f"{Fore.BLUE}Creating task encoder...") + task_names = sorted(set(task["name"] for task in self.tasks)) + task_to_index = {name: idx for idx, name in enumerate(task_names)} + index_to_task = {idx: name for name, idx in task_to_index.items()} + print(f"{Fore.GREEN}Task encoder created successfully!") + return task_to_index, index_to_task + + def _generate_training_data(self, num_samples=500, time_limit=30): + """ + Generate training data from tasks. + + Args: + num_samples (int): Number of task sequences to generate. + time_limit (int): Maximum time limit for a sequence. + + Returns: + tuple: Encoded task sequences, corresponding points, and sensor data. 
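+
+        Note:
+            The sensor readings here are randomly generated mock values; as
+            written, train_and_save_model does not use them as model inputs.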
+ """ + task_sequences = [] + points = [] + sensor_data = [] + + for _ in range(num_samples): + sequence = [] + total_time = 0 + total_points = 0 + while total_time < time_limit: + task = np.random.choice(self.tasks) + if total_time + task["time"] <= time_limit: + sequence.append(task["name"]) + total_time += task["time"] + total_points += task["points"] + + # Generate mock sensor data + sensor_data.append({ + "time_elapsed": np.random.randint(10, 30), + "distance_to_target": np.random.uniform(0.5, 2.0), + "gyro_angle": np.random.randint(0, 180), + "battery_level": np.random.randint(50, 100) + }) + + task_sequences.append(sequence) + points.append(total_points) + + return task_sequences, points, sensor_data + + def _encode_and_pad_sequences(self, task_sequences): + """ + Encode and pad task sequences for model training. + + Args: + task_sequences (list): List of task sequences. + + Returns: + np.array: Encoded and padded task sequences. + """ + encoded_sequences = [] + for sequence in task_sequences: + # Ensure all tasks are valid + for task in sequence: + if task not in self.task_to_index: + raise ValueError(f"Task '{task}' not found in task_to_index mapping.") + + # Encode the sequence + encoded = [self.task_to_index[task] for task in sequence] + + # Truncate if sequence length exceeds max_length + if len(encoded) > self.max_length: + encoded = encoded[:self.max_length] + + # Pad to max_length + padded = np.pad(encoded, (0, self.max_length - len(encoded)), constant_values=0) + encoded_sequences.append(padded) + + return np.array(encoded_sequences) + + def train_and_save_model(self, epochs=20, batch_size=16): + """ + Train a model and save it in TFLite format. + + Args: + epochs (int): Number of training epochs. + batch_size (int): Batch size for training. + """ + print("Generating training data...") + task_sequences, points, sensor_data = self._generate_training_data() + + print("Encoding and padding sequences...") + X = self._encode_and_pad_sequences(task_sequences) + y = np.array(points) + + print("Building the model...") + model = tf.keras.Sequential([ + tf.keras.layers.Embedding(input_dim=len(self.task_to_index), output_dim=16, input_length=self.max_length), + tf.keras.layers.Flatten(), + tf.keras.layers.Dense(32, activation="relu"), + tf.keras.layers.Dense(1, activation="linear") + ]) + + model.compile(optimizer="adam", loss="mse", metrics=["mae"]) + + print(f"{Fore.BLUE}Training the model for {epochs} epochs...") + model.fit(X, y, epochs=epochs, batch_size=batch_size, verbose=1) + + print(f"{Fore.BLUE}Converting the model to TensorFlow Lite format...") + converter = tf.lite.TFLiteConverter.from_keras_model(model) + tflite_model = converter.convert() + + print(f"{Fore.GREEN}Saving the model to {self.output_model_path}...") + with open(self.output_model_path, "wb") as file: + file.write(tflite_model) + + print(f"{Fore.GREEN}Model training and saving complete!") + + def set_max_length(self, new_max_length): + """ + Update the maximum sequence length for padding. + + Args: + new_max_length (int): New maximum sequence length. + """ + print(f"{Fore.YELLOW}Updating max_length from {self.max_length} to {new_max_length}...") + self.max_length = new_max_length + print(f"{Fore.GREEN}max_length updated successfully!") + + def get_task_mappings(self): + """ + Get task-to-index and index-to-task mappings. + + Returns: + tuple: task_to_index and index_to_task mappings. 
+ """ + return self.task_to_index, self.index_to_task \ No newline at end of file diff --git a/package/utils.py b/atem/utils.py similarity index 100% rename from package/utils.py rename to atem/utils.py diff --git a/atem_core/adaptive/test_interp.py b/atem_core/adaptive/test_interp.py index 0b5eaa6..25147d1 100644 --- a/atem_core/adaptive/test_interp.py +++ b/atem_core/adaptive/test_interp.py @@ -1,7 +1,7 @@ import json import numpy as np import tensorflow as tf -from train2 import create_task_encoder +from atem_core.core.train import create_task_encoder # Paths for the task JSON and TFLite model TASK_JSON_PATH = "tasks.json" diff --git a/atem_core/adaptive/train2.py b/atem_core/adaptive/train2.py deleted file mode 100644 index 21b1331..0000000 --- a/atem_core/adaptive/train2.py +++ /dev/null @@ -1,140 +0,0 @@ -import json -import os -import numpy as np -import tensorflow as tf -from random import sample - -# Paths for task JSON and trained model -TASK_JSON_PATH = "tasks.json" -TFLITE_MODEL_PATH = os.path.join("atem_core", "models", "auto_task_optimizer.tflite") - - -# Load tasks from JSON file -def load_tasks(file_path): - with open(file_path, "r") as file: - data = json.load(file) - return data["tasks"] - - -# Generate random task orders -def generate_task_orders(tasks, num_samples=500, time_limit=30): - task_sequences = [] - points = [] - times = [] - - for _ in range(num_samples): - sequence = [] - total_time = 0 - total_points = 0 - - for task in sample(tasks, len(tasks)): - if total_time + task["time"] <= time_limit: - sequence.append(task) - total_time += task["time"] - total_points += task["points"] - - task_sequences.append(sequence) - points.append(total_points) - times.append(total_time) - - return task_sequences, points, times - - -# Create task encoders -def create_task_encoder(tasks): - task_names = sorted(set(task["name"] for task in tasks)) - task_to_index = {name: idx for idx, name in enumerate(task_names)} - index_to_task = {idx: name for name, idx in task_to_index.items()} - return task_to_index, index_to_task - - -# Encode and pad task sequences -def encode_and_pad_sequences(task_sequences, task_to_index, max_length): - encoded_sequences = [] - for sequence in task_sequences: - encoded = [task_to_index[task["name"]] for task in sequence] - - # Truncate sequences that exceed max_length - if len(encoded) > max_length: - encoded = encoded[:max_length] - - # Pad sequences to max_length - padded = np.pad(encoded, (0, max_length - len(encoded)), constant_values=0) - encoded_sequences.append(padded) - - return np.array(encoded_sequences) - - -# Train a TensorFlow model -def train_model(X, y, max_length, num_tasks): - model = tf.keras.Sequential([ - tf.keras.layers.Embedding(input_dim=num_tasks, output_dim=16, input_length=max_length), - tf.keras.layers.Flatten(), - tf.keras.layers.Dense(32, activation='relu'), - tf.keras.layers.Dense(1, activation='linear') - ]) - - model.compile(optimizer='adam', loss='mse', metrics=['mae']) - model.fit(X, y, epochs=10, batch_size=16, verbose=1) - return model - - -# Predict the optimal sequence -def predict_optimal_sequence(tasks, task_to_index, model, max_length, time_limit=30): - task_sequences, _, _ = generate_task_orders(tasks, num_samples=1000, time_limit=time_limit) - encoded_sequences = encode_and_pad_sequences(task_sequences, task_to_index, max_length) - - predictions = model.predict(encoded_sequences) - best_sequence_idx = np.argmax(predictions) - best_sequence = task_sequences[best_sequence_idx] - best_score = 
predictions[best_sequence_idx] - - return best_sequence, best_score - - -# Save the trained model as TensorFlow Lite -def save_tflite_model(model, output_path): - try: - print("Converting the trained model to TensorFlow Lite format...") - converter = tf.lite.TFLiteConverter.from_keras_model(model) - tflite_model = converter.convert() - os.makedirs(os.path.dirname(output_path), exist_ok=True) # Ensure directory exists - with open(output_path, "wb") as f: - f.write(tflite_model) - print(f"Model successfully saved at: {output_path}") - return True - except Exception as e: - print(f"Error occurred during model conversion: {e}") - return False - - -# Main Execution -if __name__ == "__main__": - print("Loading tasks...") - tasks = load_tasks(TASK_JSON_PATH) - - print("Creating task encoders...") - task_to_index, index_to_task = create_task_encoder(tasks) - - print("Generating training data...") - max_length = 5 # Maximum number of tasks in a sequence - task_sequences, points, times = generate_task_orders(tasks, num_samples=500, time_limit=30) - X = encode_and_pad_sequences(task_sequences, task_to_index, max_length) - y = np.array(points) - - print("Training the model...") - num_tasks = len(task_to_index) - model = train_model(X, y, max_length, num_tasks) - - print("Predicting the optimal sequence...") - best_sequence, best_score = predict_optimal_sequence(tasks, task_to_index, model, max_length) - print("\nOptimal Task Sequence:") - for task in best_sequence: - print(f"- {task['name']} (Time: {task['time']}, Points: {task['points']})") - print(f"Predicted Score: {best_score}") - - # Save the model - if save_tflite_model(model, TFLITE_MODEL_PATH): - print("TFLite model saved successfully!") - else: - print("Failed to save the TFLite model.") \ No newline at end of file diff --git a/package/__init__.py b/package/__init__.py deleted file mode 100644 index 0399be3..0000000 --- a/package/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .core import AdaptiveModel -from .utils import load_tasks, create_task_encoder - -__all__ = ["AdaptiveModel", "load_tasks", "create_task_encoder"] - diff --git a/package/core.py b/package/core.py deleted file mode 100644 index 528a3d8..0000000 --- a/package/core.py +++ /dev/null @@ -1,31 +0,0 @@ -import tensorflow as tf -import numpy as np - -class AdaptiveModel: - def __init__(self, model_path): - self.model_path = model_path - self.interpreter = tf.lite.Interpreter(model_path=model_path) - self.interpreter.allocate_tensors() - - def predict_next_task(self, current_task, sensor_data, task_to_index, index_to_task, max_length): - input_details = self.interpreter.get_input_details() - output_details = self.interpreter.get_output_details() - - encoded_task = [task_to_index[current_task]] - padded_task = np.pad(encoded_task, (0, max_length - len(encoded_task)), constant_values=0).astype(np.float32).reshape(1, -1) - - sensor_features = np.array([ - sensor_data["time_elapsed"], - sensor_data["distance_to_target"], - sensor_data["gyro_angle"], - sensor_data["battery_level"] - ], dtype=np.float32).reshape(1, -1) - - self.interpreter.set_tensor(input_details[0]['index'], padded_task) - if len(input_details) > 1: - self.interpreter.set_tensor(input_details[1]['index'], sensor_features) - - self.interpreter.invoke() - output = self.interpreter.get_tensor(output_details[0]['index'])[0] - predicted_index = np.argmax(output) - return index_to_task[predicted_index], output \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 4139931..7874ebe 100644 --- 
a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ -tensorflow -numpy -scikit-learn \ No newline at end of file +tensorflow~=2.18.0 +numpy~=2.0.2 +scikit-learn +colorama~=0.4.6 +setuptools~=75.5.0 +atem>=1.2.4 diff --git a/setup.py b/setup.py index 22f449b..35395bc 100644 --- a/setup.py +++ b/setup.py @@ -1,20 +1,19 @@ from setuptools import setup, find_packages -import os -# Read the README.md file -with open("README.md", "r", encoding="utf-8") as fh: - long_description = fh.read() +# Read the long description from README.md +with open("README.md", "r", encoding="utf-8") as f: + long_description = f.read() setup( - name="atem_core", - version="1.2.1", + name="atem", + version="1.2.5", author="Torin Etheridge", author_email="torinriley220@gmail.com", - description="A Python package for adaptive task execution and machine learning integration.", + description="Adaptive task execution and machine learning package.", long_description=long_description, long_description_content_type="text/markdown", - url="https://github.com/CapitalRobotics/ATEM.git", - packages=find_packages(include=["package", "package.*"]), + url="https://github.com/CapitalRobotics/ATEM", + packages=find_packages(include=["atem", "atem.*"]), include_package_data=True, classifiers=[ "Programming Language :: Python :: 3", @@ -25,10 +24,6 @@ install_requires=[ "tensorflow>=2.0.0", "numpy>=1.18.0", + "colorama>=0.4.6" ], - entry_points={ - "console_scripts": [ - "atem-interpreter=package.interpreter:main", - ], - }, ) \ No newline at end of file
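
Suggested local smoke test for the `package` -> `atem` rename (a sketch, not part of the patch; assumes the repository root with TensorFlow and colorama installed):

```python
# After an editable install from the repository root:
#     pip install -e .
# verify that the renamed package exposes the documented entry points.
from atem import AdaptiveModel, ModelTrainer

print("atem imports OK:", AdaptiveModel.__name__, ModelTrainer.__name__)
```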