diff --git a/README.md b/README.md index 212d6a8..fc56e4e 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ [![version](https://img.shields.io/badge/version-0.0.1-orange.svg)]() [![Python 3.9](https://img.shields.io/badge/python-3.9-blue.svg)](https://www.python.org/downloads/release/python-360/) ![CUDA 11.7.1](https://img.shields.io/badge/CUDA-11.7.1-green.svg) +![Docker Compose Version](https://img.shields.io/badge/docker--compose-2.17.0-blue.svg) EmbeddingStudio is an innovative open-source framework designed to seamlessly convert a combined "Embedding Model + Vector DB" into a comprehensive search engine. With built-in functionalities for @@ -31,18 +32,39 @@ EmbeddingStudio is highly customizable, so you can bring your own: ## Getting Started +### Requirements + +* πŸ–₯️ Nvidia GPU with 8GB+ of VRAM +* πŸ’Ύ 10GB+ of RAM +* πŸ’½ 75GB+ of free disk space +* πŸ”₯ CUDA 11.7.1 +* 🐍 Python 3.9 +* 🐳 docker-compose 2.17.0 + +The most suitable EC2 instance is [us-west-2: g3s.xlarge](https://instances.vantage.sh/aws/ec2/g3s.xlarge?region=us-west-2&selected=c4.2xlarge%2Cp2.xlarge%2Cg5.2xlarge%2Cg5.16xlarge&os=linux&cost_duration=hourly&reserved_term=Standard.noUpfront). + ### Hello, Unstructured World! To try out EmbeddingStudio, you can launch the pre-configured demonstration project. We've prepared a dataset stored in a public S3 bucket, an emulator for user clicks, and a basic script for fine-tuning the model. By adapting it to your requirements, you can initiate fine-tuning for your model. + +Ensure that you have the `docker compose version` command working on your system: +```bash +Docker Compose version v2.23.3 +``` +You can also try the docker-compose version command. Moving forward, we will use the newer docker compose version command, +but the docker-compose version command may also work successfully on your system. + Firstly, bring up all the EmbeddingStudio services by executing the following command: ```shell docker compose up -d ``` +Warning: EmbeddingStudio is run upon docker-compose v2.17.0, installation manual you can find [here](https://docs.docker.com/compose/install/linux/). + Upon building and starting, the following services will be launched: 1. **embedding_studio**: The primary service accessible at http://localhost:5000, responsible for the core engine @@ -169,6 +191,29 @@ Epoch 2: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 13/13 [01:17<00:00, 0.17it/s, v_n **Congratulations! You've successfully improved the model!** +#### How to get best model + +To download the best model you can use EmbeddingStudio API: +```bash +curl -X GET http://localhost:5000/api/v1/fine-tuning/task/65844c019fa7cf0957d04758 +``` + +If everything is Ok, you will see following output: +```json +{ + "fine_tuning_method": "Default Fine Tuning Method", + "status": "done", + "best_model_url": "http://localhost:5001/get-artifact?path=model%2Fdata%2Fmodel.pth&run_uuid=571304f0c330448aa8cbce831944cfdd", + ... +} +``` +And `best_model_url` field contains HTTP accessible `model.pth` file. + +You can download *.pth file by executing following command: +```bash +wget http://localhost:5001/get-artifact?path=model%2Fdata%2Fmodel.pth&run_uuid=571304f0c330448aa8cbce831944cfdd +``` + ### Advanced While we've successfully run the demo project, you'll likely want to run EmbeddingStudio on your own model. For this, @@ -253,7 +298,6 @@ To do this, you need to build an image with your plugin and start the worker. Yo ```shell docker compose build --no-cache fine_tuning_worker ``` - and ```shell @@ -529,6 +573,133 @@ and ordering of items within user sessions. scientists in optimizing their models for better performance in real-world scenarios. +## Iteration emulator + +In the section [Hello, unstructured World!](#hello-unstructured-world-) there are two simple emulation steps, +just to test that EmbeddingStudio is build and running well. But you also can test EmbeddingStudio on +an emulated dataset to check algorithmic correctness. We separated these emulations, because of time limitations. + +Worth to mention: by running a full iteration emulator, initial stage of fine-tuning can take hours. + +### What's the difference with [Hello, unstructured World!](#hello-unstructured-world-) + +Section [Hello, unstructured World!](#hello-unstructured-world-) serves a purpose of a simple and quick demo, +just to check that everything is running ok. The very next step is to actually check +whether the service can really improve embedding model. Actual fine-tuning step, especially initial stage is quite long +and can take hours. + +### What does `emulation` mean here + +It's all simple: +1. We picked the easiest domain and the easiest dataset ([Remote landscapes](https://huggingface.co/datasets/EmbeddingStudio/merged_remote_landscapes_v1)), +so we definitely can show positive results in the demo; +2. We generated related text queries using GPT3.5; +3. And for each generated text query we emulated search sessions and user clicks (with some probability of a mistake); +4. All data we put into public to read AWS S3 bucket; + +More about actual emulation you can [find here](./examples/demo/iteration_emulator.py). + +### Emulated data + +#### Dataset + +As I mentioned before for the demo we use **the easiest domain and dataset as we can** +- a merged version of following datasets: *torchgeo/ucmerced*, *NWPU-RESISC45*. + +This is a union of categories from original datasets: *agricultural, airplane, airport, baseball diamond, basketball +court, +beach, bridge, buildings, chaparral, church, circular farmland, cloud, commercial area, desert, forest, freeway, golf +course, +ground track field, harbor, industrial area, intersection, island, lake, meadow, mountain, overpass, palace, parking +lot, +railway, railway station, rectangular farmland, residential, river, roundabout, runway, sea ice, ship, snowberg, +stadium, +storage tanks, tennis court, terrace, thermal power station, wetland*. + +More information available on our [HuggingFace page](https://huggingface.co/datasets/EmbeddingStudio/merged_remote_landscapes_v1). + +Warning: Synonymous and ambiguous categories were combined (see "Merge method"). + +For being easily used for the demo we put all items of this dataset into public for reading AWS S3 Bucket: +* Region name: us-west-2 +* Bucket name: embedding-studio-experiments +* Path to items: remote-lanscapes/items/ + +#### Clickstream + +We pre-generated a batch of clickstream sessions. To check the algorithm of generation, +please visit our [experiments repo](https://github.com/EulerSearch/embedding_studio_experiments/blob/main). + +Briefly, the generation method is: +1. For each category were generated up to 20 queries using GPT-3.5. +2. Using VIT-B-32 OpenAI CLIP, and Faiss.FlatIndexIP for each query were emulated search sessions. +3. And then for each search session out of each positive (related to a category of a given query) we pick random set +as clicks with some probability of a mistake. + +Params of emulation: +* A count of search results; +* A range of random picked positives; +* A probability of a mistake; + + +We put the result of generation into the public reading-available S3 repository: +* Region name: us-west-2 +* Bucket name: embedding-studio-experiments +* Path to items: remote-lanscapes/clickstream +* A result of generation with different conditions were packed into different folders +* Generation params are available by path: remote-lanscapes/clickstream/{generation-id}/conditions.json +* Generated clickstreams are available by path: remote-lanscapes/clickstream/{generation-id}/sessions.json + +### How to start + +Once you've started EmbeddingStudio locally by executing: +```shell +docker compose up -d +``` + +To run full iteration you can execute following command: +```shell +docker compose --profile demo_stage_full_iteration up -d +``` + +It's also beneficial to check the logs of the `fine_tuning_worker` to ensure everything is functioning correctly. To do +this, list all services using the command: + +```shell +docker ps +``` + +You'll see output similar to: +```shell +CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES +ad3a8321e637 embedding_studio-iteration_emulator "python demo/iterati…" 25 seconds ago Up 1 second embedding_studio-iteration_emulator-1 +665eef2e757d embedding_studio-mlflow "mlflow server --bac…" 3 hours ago Up 3 hours 0.0.0.0:5001->5001/tcp embedding_studio-mlflow-1 +65043da928d4 embedding_studio-fine_tuning_worker "dramatiq embedding_…" 3 hours ago Up 3 hours embedding_studio-fine_tuning_worker-1 +c930d9ca07c0 embedding_studio-embedding_studio "uvicorn embedding_s…" 3 hours ago Up 3 hours (healthy) 0.0.0.0:5000->5000/tcp embedding_studio-embedding_studio-1 +5e799aaaf17b redis:6.2-alpine "docker-entrypoint.s…" 3 hours ago Up 3 hours (healthy) 0.0.0.0:6379->6379/tcp embedding_studio-redis-1 +ba608b022828 bitnami/minio:2023 "/opt/bitnami/script…" 3 hours ago Up 3 hours (healthy) 0.0.0.0:9000-9001->9000-9001/tcp embedding_studio-minio-1 +914cb70ed622 mysql/mysql-server:5.7.28 "/entrypoint.sh mysq…" 3 hours ago Up 3 hours (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp embedding_studio-mlflow_db-1 +493c45f880c0 mongo:4 "docker-entrypoint.s…" 3 hours ago Up 3 hours (healthy) 0.0.0.0:27017->27017/tcp embedding_studio-mongo-1 +``` + +From here, you can access logs for the specific service using its `CONTAINER ID` or `NAME`, e.g., `65043da928d4` or +`embedding_studio-fine_tuning_worker-1`, for details check [here](#hello-unstructured-world-). + +You can check emulator log by executing: +```shell +docker logs embedding_studio-iteration_emulator-1 +``` + +If everything completes successfully, you'll see logs similar to: + +```shell +Download emulated clickstream sessions from S3 Bucket: embedding-studio-experiments by path remote-lanscapes/clickstream/f6816566-cac3-46ac-b5e4-0d5b76757c93/sessions.json +No specific AWS credentials, use Anonymous session +Downloaded 683 emulated clickstream sessions +Use 600 of 683 for emulation +100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 600/600 [00:05<00:00, 119.88it/s] +``` + ## Plugins EmbeddingStudio supports plugins for fine-tuning models. A plugin is a script that inherits from the @@ -662,7 +833,7 @@ def get_fine_tuning_builder( fine_tuning_settings=self.settings, initial_params=self.initial_params, ranking_data=ranking_dataset, - initial_max_evals=5, + initial_max_evals=2, ) return fine_tuning_builder ``` diff --git a/docker-compose.yml b/docker-compose.yml index 5ec83fc..0ba5d72 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -29,6 +29,8 @@ services: build: context: . dockerfile: worker.fine_tuning.Dockerfile + environment: + - NVIDIA_VISIBLE_DEVICES=all restart: always env_file: - .env @@ -40,6 +42,13 @@ services: networks: - internal - public + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [ gpu ] redis: image: redis:6.2-alpine @@ -183,6 +192,27 @@ services: profiles: - demo_stage_finetuning + iteration_emulator: + build: + context: . + dockerfile_inline: | + FROM python:3.9 + RUN pip install boto3 tqdm requests mlflow + COPY ./examples/ /app + WORKDIR /app + ENV PYTHONPATH="." + CMD ["python", "demo/iteration_emulator.py", "-e", "http://embedding_studio:5000", "-m", "http://mlflow:5001"] + environment: + - ES_URL=http://embedding_studio:5000 + - MLFLOW_TRACKING_URI=http://mlflow:5001 + depends_on: + embedding_studio: + condition: service_healthy + networks: + - internal + profiles: + - demo_stage_full_iteration + networks: internal: public: diff --git a/embedding_studio/workers/fine_tuning/experiments/experiments_tracker.py b/embedding_studio/workers/fine_tuning/experiments/experiments_tracker.py index 505eb6a..a9ced14 100644 --- a/embedding_studio/workers/fine_tuning/experiments/experiments_tracker.py +++ b/embedding_studio/workers/fine_tuning/experiments/experiments_tracker.py @@ -107,6 +107,8 @@ def __init__( ) mlflow.set_tracking_uri(tracking_uri) self._tracking_uri = tracking_uri + if self._tracking_uri.endswith("/"): + self._tracking_uri = self._tracking_uri[:-1] self.retry_config = ( retry_config @@ -217,7 +219,7 @@ def is_retryable_error(self, e: Exception) -> bool: return False def _get_model_exists_filter(self) -> str: - return "params.model_uploaded = '1' AND params.model_deleted != '1'" + return "metrics.model_uploaded = 1" def _get_artifact_url(self, run_id: str, artifact_path: str) -> str: return ( @@ -371,10 +373,23 @@ def _get_best_previous_run_id(self) -> Tuple[Optional[str], bool]: ) last_session_id: Optional[str] = self.get_previous_iteration_id() if initial_id == last_session_id or last_session_id is None: - return None, False + return None, True else: run_id, _ = self._get_best_quality(last_session_id) - return run_id, True + return run_id, False + + def _get_best_current_run_id(self) -> Tuple[Optional[str], bool]: + initial_id: Optional[str] = get_experiment_id_by_name( + INITIAL_EXPERIMENT_NAME + ) + if ( + initial_id == self._tuning_iteration_id + or self._tuning_iteration_id is None + ): + return None, True + else: + run_id, _ = self._get_best_quality(self._tuning_iteration_id) + return run_id, False @retry_method(name="load_model") def get_last_model_url(self) -> Optional[str]: @@ -387,13 +402,31 @@ def get_last_model_url(self) -> Optional[str]: else: if run_id is None: logger.warning( - "Can't get the best model URL, no previous iteration's " + "Can't get the best model URL, no previous iterations " "finished runs with uploaded model in history" ) return None path = MODEL_ARTIFACT_PATH return self._get_artifact_url(run_id, path) + @retry_method(name="load_model") + def get_current_model_url(self) -> Optional[str]: + run_id, is_initial = self._get_best_current_run_id() + if is_initial: + logger.warning( + "Can't get the best model URL, current run is initial" + ) + return None + + if run_id is None: + logger.warning( + "Can't get the best model URL, no iterations " + "finished runs with uploaded model in history" + ) + return None + path = MODEL_ARTIFACT_PATH + return self._get_artifact_url(run_id, path) + @retry_method(name="load_model") def get_last_model(self) -> EmbeddingsModelInterface: """Get previous iteration best embedding model. @@ -421,8 +454,38 @@ def get_last_model(self) -> EmbeddingsModelInterface: logger.info("Downloading is finished") return model + @retry_method(name="load_model") + def get_current_model(self) -> Optional[EmbeddingsModelInterface]: + """Get current iteration best embedding model. + + :return: best embedding model + """ + if self._tuning_iteration is None: + logger.error("No current iteration, can't get any model") + return + + if self._tuning_iteration == INITIAL_EXPERIMENT_NAME: + logger.info("Download initial model") + return self.download_initial_model() + + run_id, is_initial = self._get_best_current_run_id() + model_uri: str = f"runs:/{run_id}/model" + logger.info(f"Download the model from {model_uri}") + model = mlflow.pytorch.load_model(model_uri) + logger.info("Downloading is finished") + return model + @retry_method(name="search_experiments") def get_previous_iteration_id(self) -> Optional[str]: + if ( + self._tuning_iteration == INITIAL_EXPERIMENT_NAME + or self._tuning_iteration is None + ): + logger.warning( + f"Can't find previous iteration - no current iteration was setup" + ) + return None + plugin_name = f"{self._tuning_iteration.plugin_name}" experiments: List[Experiment] = [ e @@ -430,6 +493,7 @@ def get_previous_iteration_id(self) -> Optional[str]: if ( e.name.startswith(EXPERIMENT_PREFIX) and e.name.find(plugin_name) != -1 + and e.name != str(self._tuning_iteration) ) ] if len(experiments) == 0: @@ -529,6 +593,8 @@ def set_run(self, params: FineTuningParams) -> bool: for key, value in dict(self._run_params).items(): mlflow.log_param(key, convert_value(value)) + mlflow.log_metric("model_uploaded", 0) + return False else: return self._run.info.status == "FINISHED" @@ -586,7 +652,8 @@ def _set_model_as_deleted(self, run_id: str, experiment_id: str): with mlflow.start_run( run_id=run_id, experiment_id=experiment_id ) as run: - mlflow.log_param("model_deleted", "1") + mlflow.log_metric("model_deleted", 1) + mlflow.log_metric("model_uploaded", 0) @retry_method(name="delete_model") def _delete_model(self, run_id: str, experiment_id: str) -> bool: @@ -663,7 +730,7 @@ def save_model( mlflow.pytorch.log_model( model, "model", pip_requirements=self._requirements ) - mlflow.log_param("model_uploaded", 1) + mlflow.log_metric("model_uploaded", 1) logger.info("Upload is finished") else: current_quality = self.get_quality() @@ -677,7 +744,7 @@ def save_model( mlflow.pytorch.log_model( model, "model", pip_requirements=self._requirements ) - mlflow.log_param("model_uploaded", 1) + mlflow.log_metric("model_uploaded", 1) logger.info("Upload is finished") if best_run_id is not None: diff --git a/embedding_studio/workers/fine_tuning/experiments/finetuning_params.py b/embedding_studio/workers/fine_tuning/experiments/finetuning_params.py index dc26d13..80def76 100644 --- a/embedding_studio/workers/fine_tuning/experiments/finetuning_params.py +++ b/embedding_studio/workers/fine_tuning/experiments/finetuning_params.py @@ -115,7 +115,7 @@ def id(self) -> str: def __str__(self) -> str: vals: List[str] = [] - for key, value in dict(self).items(): + for key, value in sorted(dict(self).items()): value = ( ",".join(map(str, value)) if isinstance(value, list) else value ) diff --git a/embedding_studio/workers/fine_tuning/finetune_embedding.py b/embedding_studio/workers/fine_tuning/finetune_embedding.py index 7ccfe5e..cab3fd1 100644 --- a/embedding_studio/workers/fine_tuning/finetune_embedding.py +++ b/embedding_studio/workers/fine_tuning/finetune_embedding.py @@ -136,7 +136,7 @@ def finetune_embedding_model( logger.info( f"Start hyper parameters optimization process (max evals: {initial_max_evals})" ) - best = fmin( + _ = fmin( lambda params: _finetune_embedding_model_one_step_hyperopt( initial_model_path, settings, diff --git a/embedding_studio/workers/fine_tuning/worker.py b/embedding_studio/workers/fine_tuning/worker.py index 7b6673a..551f485 100644 --- a/embedding_studio/workers/fine_tuning/worker.py +++ b/embedding_studio/workers/fine_tuning/worker.py @@ -100,12 +100,13 @@ def fine_tuning_worker(task_id: str): logger.info( "Fine tuning of the embedding model was completed successfully!" ) - - best_model_url = builder.experiments_manager.get_last_model_url() + builder.experiments_manager.set_iteration(iteration) + best_model_url = builder.experiments_manager.get_current_model_url() logger.info( f"You can download best model using this url: {best_model_url}" ) task.best_model_url = best_model_url + builder.experiments_manager.finish_iteration() except Exception: try: diff --git a/examples/utils/__init__.py b/examples/__init__.py similarity index 100% rename from examples/utils/__init__.py rename to examples/__init__.py diff --git a/examples/demo/__init__.py b/examples/demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/iteration_emulator.py b/examples/demo/iteration_emulator.py similarity index 95% rename from examples/iteration_emulator.py rename to examples/demo/iteration_emulator.py index b27d27f..da9537a 100644 --- a/examples/iteration_emulator.py +++ b/examples/demo/iteration_emulator.py @@ -9,14 +9,14 @@ from botocore.client import Config from tqdm.auto import tqdm -from .utils.api_utils import ( +from demo.utils.api_utils import ( create_session_and_push_events, get_task_status, release_batch, start_fine_tuning, ) -from .utils.aws_utils import download_s3_object_to_memory -from .utils.mlflow_utils import get_mlflow_results_url +from demo.utils.aws_utils import download_s3_object_to_memory +from demo.utils.mlflow_utils import get_mlflow_results_url DESCRIPTION = """ @@ -108,7 +108,7 @@ def emulate_clickstream( connection_url: str, aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, - test_sessions_count: int = 50, + test_sessions_count: int = 600, ): """This is a function to run the order of http-requests to the clisktream storage service. @@ -194,7 +194,15 @@ def emulate_fine_tuning(connection_url: str, mlflow_url: str): print("Unable to run fine-tuning") return - experiment_id = get_mlflow_results_url(mlflow_url, batch_id) + experiment_id = None + for attempt in range(100): + experiment_id = get_mlflow_results_url(mlflow_url, batch_id) + if experiment_id is None: + time.sleep(60) + + else: + break + if experiment_id is None: print( "Something went wrong with experiments tracking system, please check logs" @@ -221,7 +229,7 @@ def parse_args(): DEFAULT_AWS_SECRET_ACCESS_KEY = None DEFAULT_EMBEDDING_STUDIO_URL = "http://localhost:5000" DEFAULT_MLFLOW_URL = "http://localhost:5005" - DEFAULT_TEST_SESSIONS_COUNT = 50 + DEFAULT_TEST_SESSIONS_COUNT = 600 parser = argparse.ArgumentParser( description="EmbeddingStudio test script: clickstream and fine-tuning emulation" diff --git a/examples/demo/utils/__init__.py b/examples/demo/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/utils/api_utils.py b/examples/demo/utils/api_utils.py similarity index 96% rename from examples/utils/api_utils.py rename to examples/demo/utils/api_utils.py index 5ab1191..12d0d52 100644 --- a/examples/utils/api_utils.py +++ b/examples/demo/utils/api_utils.py @@ -4,7 +4,7 @@ import requests -from .constants import DEFAULT_FINE_TUNING_METHOD_NAME +from demo.utils.constants import DEFAULT_FINE_TUNING_METHOD_NAME # TODO: Implement separate Python API Client @@ -75,7 +75,7 @@ def create_session_and_push_events( elif len(events) > 0: events_resp = requests.post( - f"{connection_url}/api/v1/clickstream/push_events", + f"{connection_url}/api/v1/clickstream/session/events", json={"session_id": session_id, "events": events}, ) diff --git a/examples/utils/aws_utils.py b/examples/demo/utils/aws_utils.py similarity index 100% rename from examples/utils/aws_utils.py rename to examples/demo/utils/aws_utils.py diff --git a/examples/utils/constants.py b/examples/demo/utils/constants.py similarity index 100% rename from examples/utils/constants.py rename to examples/demo/utils/constants.py diff --git a/examples/utils/mlflow_utils.py b/examples/demo/utils/mlflow_utils.py similarity index 97% rename from examples/utils/mlflow_utils.py rename to examples/demo/utils/mlflow_utils.py index 6fa44b1..ce776d1 100644 --- a/examples/utils/mlflow_utils.py +++ b/examples/demo/utils/mlflow_utils.py @@ -3,7 +3,7 @@ from typing import Optional -from .constants import ( +from demo.utils.constants import ( DEFAULT_FINE_TUNING_METHOD_NAME ) diff --git a/plugins/default_fine_tuning_method.py b/plugins/default_fine_tuning_method.py index f1c2b0b..8ed1b15 100644 --- a/plugins/default_fine_tuning_method.py +++ b/plugins/default_fine_tuning_method.py @@ -149,6 +149,6 @@ def get_fine_tuning_builder( fine_tuning_settings=self.settings, initial_params=self.initial_params, ranking_data=ranking_dataset, - initial_max_evals=5, + initial_max_evals=2, ) return fine_tuning_builder