From c50e4936f39cbefdc6abdc282f8ee8163417ed01 Mon Sep 17 00:00:00 2001 From: danellecline Date: Tue, 7 May 2024 18:02:04 -0700 Subject: [PATCH] feat: added shutdown command and bump strongsort to latest release 1.10.7 --- deepsea_ai/__main__.py | 13 ++- deepsea_ai/commands/ecsshutdown.py | 179 +++++++++++++++++++++++++++++ deepsea_ai/config/config.ini | 2 +- docs/commands/ecsshutdown.md | 44 +++++++ docs/index.md | 1 + 5 files changed, 237 insertions(+), 2 deletions(-) create mode 100644 deepsea_ai/commands/ecsshutdown.py create mode 100644 docs/commands/ecsshutdown.md diff --git a/deepsea_ai/__main__.py b/deepsea_ai/__main__.py index e763635..da921d1 100644 --- a/deepsea_ai/__main__.py +++ b/deepsea_ai/__main__.py @@ -11,7 +11,7 @@ from urllib.parse import urlparse from deepsea_ai.config.config import Config -from deepsea_ai.commands import upload_tag, process, train, bucket, monitor +from deepsea_ai.commands import upload_tag, process, train, bucket, monitor, ecsshutdown from deepsea_ai.config import config as cfg from deepsea_ai.config import setup from deepsea_ai.database.job.database import Job, init_db @@ -154,6 +154,17 @@ def ecs_process(config, upload, clean, cluster, job, input, exclude, dry_run, ar info(f'==== Submitted {total_submitted} videos to {processor} for processing =====') +@cli.command(name="ecsshutdown") +@common_args.cluster_option +@cfg_option +def ecs_shutdown(config, cluster): + """ + Shutdown the ECS cluster, stopping all running tasks, services, and instances and removing any processed tracks + """ + custom_config = init(log_prefix="dsai_ecsshutdown", config=config) + resources = custom_config.get_resources(cluster) + ecsshutdown.ecsshutdown(resources, cluster) + @cli.command(name="process") # this might be cleaner as an enum but click does not support that fully yet diff --git a/deepsea_ai/commands/ecsshutdown.py b/deepsea_ai/commands/ecsshutdown.py new file mode 100644 index 0000000..cae5179 --- /dev/null +++ 
b/deepsea_ai/commands/ecsshutdown.py @@ -0,0 +1,179 @@ +# deepsea-ai, Apache-2.0 license +# Filename: commands/ecsshutdown.py +# Description: ECS cluster shutdown + +import boto3 +import time +import json +from deepsea_ai.config.config import Config +from deepsea_ai.logger import info, debug, err, create_logger_file + + +def ecsshutdown(resources: dict, cluster: str): + """ + Shutdown the ECS cluster. This function stops all running services, tasks, and container instances in the ECS cluster. + Any completed video track results are also removed from S3 and auto-scaling is disabled for the ECS service + temporarily to prevent new tasks from starting for 5 minutes. + """ + autoscaling = boto3.client('autoscaling') + sqs = boto3.client('sqs') + s3 = boto3.client('s3') + ecs = boto3.client('ecs') + + # Get the cluster arn + response = ecs.list_clusters() + for c in response['clusterArns']: + print(c) + if cluster in c: + cluster_arn = c + info(f"Found cluster {cluster} ARN {cluster_arn}") + break + + if cluster_arn is None: + err(f"Cluster {cluster} not found") + return + + # Stop the auto-scaling of the ECS service + response = ecs.describe_capacity_providers() + capacity_providers = response['capacityProviders'] + + # Stop the auto-scaling of the ECS service by disabling the capacity provider + for cp in capacity_providers: + if cp['name'].split('-')[0] == cluster: + autoScalingGroupName = cp['autoScalingGroupProvider']['autoScalingGroupArn'].split('/')[-1] + # Capture the max size of the capacity provider to reset it later + asg = autoscaling.describe_auto_scaling_groups( + AutoScalingGroupNames=[autoScalingGroupName] + ) + max_size = max(asg['AutoScalingGroups'][0]['MaxSize'], 6) + response = autoscaling.update_auto_scaling_group( + AutoScalingGroupName=autoScalingGroupName, + MinSize=0, + MaxSize=0, + ) + debug(response) + info(f"Autoscaling disabled for capacity provider {cp['name']}") + break + + # Stop any running services in the cluster + response = 
ecs.list_services(cluster=cluster_arn) + services = response['serviceArns'] + for service in services: + response = ecs.update_service( + cluster=cluster_arn, + service=service, + desiredCount=0 + ) + info(f"Stopped service {service}") + + # Stop any container instances in the cluster + response = ecs.list_container_instances(cluster=cluster_arn) + container_instances = response['containerInstanceArns'] + for container_instance in container_instances: + response = ecs.update_container_instances_state( + cluster=cluster_arn, + containerInstances=[container_instance], + status='DRAINING' + ) + info(f"Stopped container instance {container_instance}") + + # Stop any running tasks in the cluster + response = ecs.list_tasks(cluster=cluster_arn) + tasks = response['taskArns'] + for task in tasks: + response = ecs.stop_task(cluster=cluster_arn, task=task) + debug(response) + info(f"Stopped task {task}") + + # Get the messages in the TRACK_QUEUE and delete them + # These are completed video track messages, so capture the locations of the completed track files and + # remove the track results from S3 + def receive_track_messages(): + response = sqs.receive_message(QueueUrl=resources['TRACK_QUEUE'], + AttributeNames=['All'], + MessageAttributeNames=['All']) + + if 'Messages' in response: + messages = response['Messages'] + print(f"Received {len(messages)} messages from {resources['TRACK_QUEUE']}") + + for message in messages: + debug(f"Message ID: {message['MessageId']}") + debug(f"Message Body: {message['Body']}") + debug(f"Message Attributes: {message.get('MessageAttributes', '')}") + debug("--------------------") + message_body = message['Body'] + message_dict = json.loads(message_body) + if message_dict['status'] == 'SUCCESS': + # Get the track file location and remove the file from S3 + track_file = message_dict['results'] + info(f"Removing track file {track_file}") + s3.delete_object(Bucket=resources['TRACK_BUCKET'], Key=track_file) + # Update the message with messageId 
as FAILED, send to DEAD_QUEUE and delete from TRACK_QUEUE + message_dict['status'] = 'FAILED' + sqs.send_message(QueueUrl=resources['DEAD_QUEUE'], + MessageBody=json.dumps(message_dict, indent=4), + MessageGroupId=message['Attributes']['MessageGroupId']) + sqs.delete_message(QueueUrl=resources['TRACK_QUEUE'], + ReceiptHandle=message['ReceiptHandle']) + else: + info(f"No messages available in {resources['TRACK_QUEUE']}. Track clean-up and shutdown complete") + + # If there are more messages, continue to receive + if 'Messages' in response and len(response['Messages']) == 10: + receive_track_messages() + + receive_track_messages() + + def receive_video_messages(): + response = sqs.receive_message(QueueUrl=resources['VIDEO_QUEUE'], + AttributeNames=['All'], + MessageAttributeNames=['All']) + + if 'Messages' in response: + messages = response['Messages'] + print(f"Received {len(messages)} messages") + + for message in messages: + # on failure push this message to the dead letter queue + message_body = message['Body'] + message_body = json.loads(message_body) + message_body["error"] = "ECS cluster shutdown" + message_body["status"] = "FAILED" + print(f'Sending message {message} to dead queue {resources["DEAD_QUEUE"]}') + response = sqs.send_message(QueueUrl=resources['DEAD_QUEUE'], + MessageBody=json.dumps(message_body, indent=4), + MessageGroupId=message['Attributes']['MessageGroupId']) + debug(response) + response = sqs.send_message(QueueUrl=resources['DEAD_QUEUE'], + MessageBody=json.dumps(message_body), + MessageGroupId=message['Attributes']['MessageGroupId']) + debug(response) + else: + info(f"No messages available in {resources['VIDEO_QUEUE']}." + f"No videos were being processed during shutdown.") + + receive_video_messages() + + info(f'Shutdown of ECS cluster {cluster} complete. 
Waiting 5 minutes before restarting autoscaling to reenable ') + time.sleep(300) + + info(f"Restarting autoscaling for capacity provider {cp['name']} to max size {max_size}") + response = autoscaling.update_auto_scaling_group( + AutoScalingGroupName=autoScalingGroupName, + MinSize=0, + MaxSize=max_size, + ) + debug(response) + info(f"Autoscaling enabled. Desired count set to {max_size}.") + info("ECS cluster shutdown complete for {cluster}") + + +if __name__ == '__main__': + from pathlib import Path + cluster = 'y5x315k' + create_logger_file(log_path=Path.cwd(), prefix="ecsshutdown") + default_config = Config() + resources = default_config.get_resources(cluster) + ecsshutdown(resources, cluster) + print(resources) diff --git a/deepsea_ai/config/config.ini b/deepsea_ai/config/config.ini index db96288..0facc9f 100644 --- a/deepsea_ai/config/config.ini +++ b/deepsea_ai/config/config.ini @@ -1,6 +1,6 @@ [docker] yolov5_container = mbari/deepsea-yolov5:1.1.2 -strongsort_container = mbari/strongsort-yolov5:1.10.0 +strongsort_container = mbari/strongsort-yolov5:1.10.7 [aws] account_id = 548531997526 diff --git a/docs/commands/ecsshutdown.md b/docs/commands/ecsshutdown.md new file mode 100644 index 0000000..3c51009 --- /dev/null +++ b/docs/commands/ecsshutdown.md @@ -0,0 +1,44 @@ +# Shutting down video processing in the ECS + +If you have deployed a model in an ECS and want to shut it down because you have made a mistake or want to save costs, +you can use the [ecsshutdown](/commands/ecsshutdown) command. This command will stop +all running services, tasks, and container instances in the ECS cluster. Any completed video track results are also +removed from S3 and auto-scaling is disabled for the ECS service temporarily to prevent new tasks from starting for 5 minutes. + +!!! note "About the ecsshutdown command" +This is not the same as deleting the stack, which will remove all resources including the ECS cluster, +rendering it unusable. 
This command is intended to temporarily stop processing and save costs or correct a mistake. + +## Examples + +To shutdown the ECS cluster called *benthic33k*: + +``` +deepsea-ai ecsshutdown --cluster benthic33k +``` + + +You should see the following response: + +``` + +2024-05-07 17:39:18,912 INFO Logging to /Users/dcline/Dropbox/code/deepsea-ai/deepsea_ai/commands/ecsshutdown_20240508.log +2024-05-07 17:39:18,912 INFO =============== Config file /Users/dcline/Dropbox/code/deepsea-ai/deepsea_ai/config/config.ini ================= + +... + +2024-05-07 17:39:19,729 INFO Found cluster benthic33k ARN arn:aws:ecs:us-west-2:168552337187:cluster/benthic33k-clustery5x315k4871A792-5nHEy3gwdISI +arn:aws:ecs:us-west-2:168552337187:cluster/benthic33k-clustery5x315k4871A792-5nHEy3gwdISI +2024-05-07 17:39:20,245 DEBUG {'ResponseMetadata': {'RequestId': 'e93a9198-e9ea-4928-989e-9587da64b898', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e93a9198-e9ea-4928-989e-9587da64b898', 'content-type': 'text/xml', 'content-length': '231', 'date': 'Wed, 08 May 2024 00:39:19 GMT'}, 'RetryAttempts': 0}} +2024-05-07 17:39:20,245 INFO Autoscaling disabled for capacity provider benthic33k-asgy5x315kcapacityprovider21F5C2F6-0yhqf6Ix2hIL +2024-05-07 17:39:20,525 INFO Stopped service arn:aws:ecs:us-west-2:168552337187:service/benthic33k-clustery5x315k4871A792-5nHEy3gwdISI/benthic33k-serviceobjectObjectService10D8676F-bc7xtyXkwlMH +2024-05-07 17:39:20,618 INFO Stopped container instance arn:aws:ecs:us-west-2:168552337187:container-instance/benthic33k-clustery5x315k4871A792-5nHEy3gwdISI/4d5c7a9d295c490a82a2ea22d1f9e0ae +2024-05-07 17:39:20,797 INFO No messages available in benthic33k-trackC8D9330E-wgSOCZUJzVC8.fifo. 
Track clean-up and shutdown complete +Received 1 messages +Sending message {'MessageId': 'a060e1ec-6f70-4c77-8c3d-fe41f725fd75', 'ReceiptHandle': 'AQEBA7BiOsLpJtuVYJmvVPP3LB1uw4PWVzEeDhVPFEccBWOXbNYQwNlpEW4GdcYMRhCVGRj05IKoyed+l/gyN8NdJ3lbcLWJLuPv16Yc5C4J7X8M4ximabObJTrPiAArDldeAAFG0A3qe9ITG+8mxQ9CniQVQcUU0XXy1tp4cE9Tt+odNn4wK8GTT+S9h4xdqCqtyAeZToobScRkyeVN2jADedD1edkINz2KbCsMsZc7+t4GV59x1QhVdHq0yvF6XZYcvhJW30pccyBNEEDrOm/KxWMppgEponAbgCAIUQ95KjgwNGX+8VXDuUz3GyACKsUN', 'MD5OfBody': 'ff91f22db46e165071cbea443f52c3bf', 'Body': '{\n "video": "dcline/Dropbox/code/deepsea-ai/tests/data/V4361_20211006T163856Z_h265_1sec.mp4",\n "clean": "True",\n "user_name": "dcline",\n "metadata_b64": "eyJtZXNzYWdlX3V1aWQiOiAiZWIxYjViYTgtNmMwOC00YTJiLWI5ZjEtMjIyYzViNTI1YzZlIn0=",\n "job_name": "Pycharm test yolov5x-mbay-benthic model from ecsprocess with args",\n "args": "--agnostic-nms --iou-thres=0.2 --conf-thres=0.001"\n}', 'Attributes': {'SenderId': 'AROAX7NYOBNLHGR6NT7GO:dcline@mbari.org', 'ApproximateFirstReceiveTimestamp': '1715128760831', 'ApproximateReceiveCount': '1', 'SentTimestamp': '1715128495537', 'SequenceNumber': '18885816968567023616', 'MessageDeduplicationId': '80132457153d857a101d4cedf5db1e710590b1f5552f2bff0e3add75a5c1a56c', 'MessageGroupId': '20240508T003455-V4361_20211006T163856Z_h265_1sec.mp4'}} to dead queue benthic33k-dead9A6F9BCE-P8fRoyrLgBjO.fifo +2024-05-07 17:39:20,870 DEBUG {'MD5OfMessageBody': '97db372cf9aee247cff4aba2c6e329a6', 'MessageId': 'a041e56c-0219-40ab-a4c0-21bba0c79348', 'SequenceNumber': '18885817036492015872', 'ResponseMetadata': {'RequestId': '3e5ae064-7c86-5804-ad57-bed0b77efd34', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '3e5ae064-7c86-5804-ad57-bed0b77efd34', 'date': 'Wed, 08 May 2024 00:39:20 GMT', 'content-type': 'text/xml', 'content-length': '431', 'connection': 'keep-alive'}, 'RetryAttempts': 0}} +2024-05-07 17:39:20,902 DEBUG {'MD5OfMessageBody': '648a41216191ffbcb6b794f70a69d921', 'MessageId': 
'd41b86af-70e9-4f32-b39f-a69e91123372', 'SequenceNumber': '18885817036500463616', 'ResponseMetadata': {'RequestId': '1c8b8d46-02a5-539e-9af4-d3f2c95db6ae', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '1c8b8d46-02a5-539e-9af4-d3f2c95db6ae', 'date': 'Wed, 08 May 2024 00:39:20 GMT', 'content-type': 'text/xml', 'content-length': '431', 'connection': 'keep-alive'}, 'RetryAttempts': 0}} +2024-05-07 17:39:20,902 INFO No messages available in benthic33k-videoC740E53C-qG4Vah2lWyhV.fifo. Video shutdown complete. No videos were being processed during shutdown. +2024-05-07 17:39:20,902 INFO Shutdown of ECS cluster benthic33k complete. Waiting 5 minutes before restarting autoscaling to reenable + +``` \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 7f53a80..441b20d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -75,6 +75,7 @@ jupyter notebook * [`deepsea-ai train --help` - Train a YOLOv5 model and save the model to a bucket](commands/train.md) * [`deepsea-ai process --help` - Process one or more videos and save the results to a bucket](commands/process.md) * [`deepsea-ai ecsprocess --help` - Process one or more videos using the Elastic Container Service and save the results to a bucket](commands/process.md) +* [`deepsea-ai ecsshutdown --help` - Shutdown video processing in the Elastic Container Service](commands/ecsshutdown.md) * [`deepsea-ai split --help` - Split your training data. This is required before the train command.](data.md) * [`deepsea-ai monitor --help` - Monitor processing. Use this after the ecsprocess train command.](commands/monitor.md) * `deepsea-ai -h` - Print help message and exit.