Skip to content

Commit

Permalink
Merge pull request #35 from mbari-org/feat/ecsshutdown
Browse files Browse the repository at this point in the history
ecsshutdown command and docs
  • Loading branch information
danellecline authored May 8, 2024
2 parents 02a879b + c50e493 commit 52fa2ab
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 2 deletions.
13 changes: 12 additions & 1 deletion deepsea_ai/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from urllib.parse import urlparse

from deepsea_ai.config.config import Config
from deepsea_ai.commands import upload_tag, process, train, bucket, monitor
from deepsea_ai.commands import upload_tag, process, train, bucket, monitor, ecsshutdown
from deepsea_ai.config import config as cfg
from deepsea_ai.config import setup
from deepsea_ai.database.job.database import Job, init_db
Expand Down Expand Up @@ -154,6 +154,17 @@ def ecs_process(config, upload, clean, cluster, job, input, exclude, dry_run, ar

info(f'==== Submitted {total_submitted} videos to {processor} for processing =====')

@cli.command(name="ecsshutdown")
@common_args.cluster_option
@cfg_option
def ecs_shutdown(config, cluster):
    """
    Shutdown the ECS cluster, stopping all running tasks, services, and instances and removing any processed tracks
    """
    # Load the (optionally user-supplied) config and route logging to a dsai_ecsshutdown-prefixed log file
    custom_config = init(log_prefix="dsai_ecsshutdown", config=config)
    # Resolve the AWS resource identifiers (queues, buckets, cluster) registered for this cluster name
    resources = custom_config.get_resources(cluster)
    ecsshutdown.ecsshutdown(resources, cluster)


@cli.command(name="process")
# this might be cleaner as an enum but click does not support that fully yet
Expand Down
179 changes: 179 additions & 0 deletions deepsea_ai/commands/ecsshutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# deepsea-ai, Apache-2.0 license
# Filename: commands/ecsshutdown.py
# Description: ECS cluster shutdown

import boto3
import time
import json
from deepsea_ai.config.config import Config
from deepsea_ai.logger import info, debug, err, create_logger_file


def ecsshutdown(resources: dict, cluster: str):
    """
    Shutdown the ECS cluster. This function stops all running services, tasks, and container instances in the ECS cluster.
    Any completed video track results are also removed from S3 and auto-scaling is disabled for the ECS service
    temporarily to prevent new tasks from starting for 5 minutes.

    :param resources: AWS resource identifiers for this cluster; expects the keys
        TRACK_QUEUE, VIDEO_QUEUE, DEAD_QUEUE and TRACK_BUCKET
    :param cluster: short cluster name (substring of the cluster ARN, prefix of the capacity provider name)
    """
    autoscaling = boto3.client('autoscaling')
    sqs = boto3.client('sqs')
    s3 = boto3.client('s3')
    ecs = boto3.client('ecs')

    # Find the cluster ARN by substring match on the short cluster name.
    # BUG FIX: cluster_arn was previously unbound when no cluster matched,
    # raising NameError instead of reaching the err() branch below.
    cluster_arn = None
    for c in ecs.list_clusters()['clusterArns']:
        debug(c)
        if cluster in c:
            cluster_arn = c
            info(f"Found cluster {cluster} ARN {cluster_arn}")
            break

    if cluster_arn is None:
        err(f"Cluster {cluster} not found")
        return

    # Stop the auto-scaling of the ECS service by zeroing out the capacity
    # provider's auto-scaling group; remember the group so it can be restored later.
    capacity_providers = ecs.describe_capacity_providers()['capacityProviders']
    matched_cp = None
    auto_scaling_group_name = None
    max_size = 6
    for cp in capacity_providers:
        # Capacity provider names are prefixed with the cluster short name
        if cp['name'].split('-')[0] == cluster:
            matched_cp = cp
            auto_scaling_group_name = cp['autoScalingGroupProvider']['autoScalingGroupArn'].split('/')[-1]
            # Capture the max size of the capacity provider to reset it later;
            # floor of 6 so the restore never shrinks the cluster below a usable size
            asg = autoscaling.describe_auto_scaling_groups(
                AutoScalingGroupNames=[auto_scaling_group_name]
            )
            max_size = max(asg['AutoScalingGroups'][0]['MaxSize'], 6)
            response = autoscaling.update_auto_scaling_group(
                AutoScalingGroupName=auto_scaling_group_name,
                MinSize=0,
                MaxSize=0,
            )
            debug(response)
            info(f"Autoscaling disabled for capacity provider {cp['name']}")
            break

    # Stop any running services in the cluster
    for service in ecs.list_services(cluster=cluster_arn)['serviceArns']:
        response = ecs.update_service(
            cluster=cluster_arn,
            service=service,
            desiredCount=0
        )
        debug(response)
        info(f"Stopped service {service}")

    # Drain any container instances in the cluster so no new tasks are placed on them
    for container_instance in ecs.list_container_instances(cluster=cluster_arn)['containerInstanceArns']:
        response = ecs.update_container_instances_state(
            cluster=cluster_arn,
            containerInstances=[container_instance],
            status='DRAINING'
        )
        debug(response)
        info(f"Stopped container instance {container_instance}")

    # Stop any running tasks in the cluster
    for task in ecs.list_tasks(cluster=cluster_arn)['taskArns']:
        response = ecs.stop_task(cluster=cluster_arn, task=task)
        debug(response)
        info(f"Stopped task {task}")

    def receive_track_messages():
        """Drain TRACK_QUEUE: remove completed track files from S3 and move every message to DEAD_QUEUE as FAILED."""
        # BUG FIX: the previous recursive version relied on receiving exactly 10 messages to
        # continue, but receive_message defaults to MaxNumberOfMessages=1, so at most one
        # message was ever drained. Request batches of 10 and loop until the queue is empty.
        while True:
            response = sqs.receive_message(QueueUrl=resources['TRACK_QUEUE'],
                                           MaxNumberOfMessages=10,
                                           AttributeNames=['All'],
                                           MessageAttributeNames=['All'])
            if 'Messages' not in response:
                info(f"No messages available in {resources['TRACK_QUEUE']}. Track clean-up and shutdown complete")
                return

            messages = response['Messages']
            info(f"Received {len(messages)} messages from {resources['TRACK_QUEUE']}")

            for message in messages:
                debug(f"Message ID: {message['MessageId']}")
                debug(f"Message Body: {message['Body']}")
                debug(f"Message Attributes: {message.get('MessageAttributes', '')}")
                debug("--------------------")
                message_dict = json.loads(message['Body'])
                if message_dict['status'] == 'SUCCESS':
                    # Get the track file location and remove the file from S3
                    track_file = message_dict['results']
                    info(f"Removing track file {track_file}")
                    s3.delete_object(Bucket=resources['TRACK_BUCKET'], Key=track_file)
                # Mark the message FAILED, send to DEAD_QUEUE and delete from TRACK_QUEUE
                message_dict['status'] = 'FAILED'
                sqs.send_message(QueueUrl=resources['DEAD_QUEUE'],
                                 MessageBody=json.dumps(message_dict, indent=4),
                                 MessageGroupId=message['Attributes']['MessageGroupId'])
                sqs.delete_message(QueueUrl=resources['TRACK_QUEUE'],
                                   ReceiptHandle=message['ReceiptHandle'])

    receive_track_messages()

    def receive_video_messages():
        """Move any in-flight video messages to DEAD_QUEUE as FAILED and remove them from VIDEO_QUEUE."""
        response = sqs.receive_message(QueueUrl=resources['VIDEO_QUEUE'],
                                       MaxNumberOfMessages=10,
                                       AttributeNames=['All'],
                                       MessageAttributeNames=['All'])

        if 'Messages' not in response:
            info(f"No messages available in {resources['VIDEO_QUEUE']}."
                 f"No videos were being processed during shutdown.")
            return

        messages = response['Messages']
        info(f"Received {len(messages)} messages")

        for message in messages:
            # On failure push this message to the dead letter queue
            message_body = json.loads(message['Body'])
            message_body["error"] = "ECS cluster shutdown"
            message_body["status"] = "FAILED"
            info(f'Sending message {message} to dead queue {resources["DEAD_QUEUE"]}')
            response = sqs.send_message(QueueUrl=resources['DEAD_QUEUE'],
                                        MessageBody=json.dumps(message_body, indent=4),
                                        MessageGroupId=message['Attributes']['MessageGroupId'])
            debug(response)
            # BUG FIX: the message was previously sent to DEAD_QUEUE a second time and never
            # removed from VIDEO_QUEUE, so it reappeared after the visibility timeout.
            # Delete it from VIDEO_QUEUE instead so it is not re-processed on restart.
            sqs.delete_message(QueueUrl=resources['VIDEO_QUEUE'],
                               ReceiptHandle=message['ReceiptHandle'])

    receive_video_messages()

    info(f'Shutdown of ECS cluster {cluster} complete. Waiting 5 minutes before restarting autoscaling to reenable ')
    time.sleep(300)

    # BUG FIX: only restore autoscaling if a capacity provider was matched above;
    # previously cp/autoScalingGroupName/max_size were unbound when no match was found.
    if matched_cp is not None:
        info(f"Restarting autoscaling for capacity provider {matched_cp['name']} to max size {max_size}")
        response = autoscaling.update_auto_scaling_group(
            AutoScalingGroupName=auto_scaling_group_name,
            MinSize=0,
            MaxSize=max_size,
        )
        debug(response)
        info(f"Autoscaling enabled. Desired count set to {max_size}.")
    # BUG FIX: f-string prefix was missing, logging the literal text "{cluster}"
    info(f"ECS cluster shutdown complete for {cluster}")


if __name__ == '__main__':
    # Ad-hoc smoke test: shut down a known development cluster using the default config.
    from pathlib import Path

    test_cluster = 'y5x315k'
    create_logger_file(log_path=Path.cwd(), prefix="ecsshutdown")
    default_config = Config()
    cluster_resources = default_config.get_resources(test_cluster)
    ecsshutdown(cluster_resources, test_cluster)
    print(cluster_resources)
2 changes: 1 addition & 1 deletion deepsea_ai/config/config.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[docker]
yolov5_container = mbari/deepsea-yolov5:1.1.2
strongsort_container = mbari/strongsort-yolov5:1.10.0
strongsort_container = mbari/strongsort-yolov5:1.10.7

[aws]
account_id = 548531997526
Expand Down
44 changes: 44 additions & 0 deletions docs/commands/ecsshutdown.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Shutting down video processing in the ECS

If you have deployed a model in an ECS and want to shut it down because you have made a mistake or want to save costs,
you can use the [ecsshutdown](/commands/ecsshutdown) command. This command will stop
all running services, tasks, and container instances in the ECS cluster. Any completed video track results are also
removed from S3 and auto-scaling is disabled for the ECS service temporarily to prevent new tasks from starting for 5 minutes.

!!! note "About the ecsshutdown command"
This is not the same as deleting the stack, which will remove all resources including the ECS cluster,
rendering it unusable. This command is intended to temporarily stop processing and save costs or correct a mistake.

## Examples

To shutdown the ECS cluster called *benthic33k*:

```
deepsea-ai ecsshutdown --cluster benthic33k
```


You should see the following response:

```
2024-05-07 17:39:18,912 INFO Logging to /Users/dcline/Dropbox/code/deepsea-ai/deepsea_ai/commands/ecsshutdown_20240508.log
2024-05-07 17:39:18,912 INFO =============== Config file /Users/dcline/Dropbox/code/deepsea-ai/deepsea_ai/config/config.ini =================
...
2024-05-07 17:39:19,729 INFO Found cluster benthic33k ARN arn:aws:ecs:us-west-2:168552337187:cluster/benthic33k-clustery5x315k4871A792-5nHEy3gwdISI
arn:aws:ecs:us-west-2:168552337187:cluster/benthic33k-clustery5x315k4871A792-5nHEy3gwdISI
2024-05-07 17:39:20,245 DEBUG {'ResponseMetadata': {'RequestId': 'e93a9198-e9ea-4928-989e-9587da64b898', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e93a9198-e9ea-4928-989e-9587da64b898', 'content-type': 'text/xml', 'content-length': '231', 'date': 'Wed, 08 May 2024 00:39:19 GMT'}, 'RetryAttempts': 0}}
2024-05-07 17:39:20,245 INFO Autoscaling disabled for capacity provider benthic33k-asgy5x315kcapacityprovider21F5C2F6-0yhqf6Ix2hIL
2024-05-07 17:39:20,525 INFO Stopped service arn:aws:ecs:us-west-2:168552337187:service/benthic33k-clustery5x315k4871A792-5nHEy3gwdISI/benthic33k-serviceobjectObjectService10D8676F-bc7xtyXkwlMH
2024-05-07 17:39:20,618 INFO Stopped container instance arn:aws:ecs:us-west-2:168552337187:container-instance/benthic33k-clustery5x315k4871A792-5nHEy3gwdISI/4d5c7a9d295c490a82a2ea22d1f9e0ae
2024-05-07 17:39:20,797 INFO No messages available in benthic33k-trackC8D9330E-wgSOCZUJzVC8.fifo. Track clean-up and shutdown complete
Received 1 messages
Sending message {'MessageId': 'a060e1ec-6f70-4c77-8c3d-fe41f725fd75', 'ReceiptHandle': 'AQEBA7BiOsLpJtuVYJmvVPP3LB1uw4PWVzEeDhVPFEccBWOXbNYQwNlpEW4GdcYMRhCVGRj05IKoyed+l/gyN8NdJ3lbcLWJLuPv16Yc5C4J7X8M4ximabObJTrPiAArDldeAAFG0A3qe9ITG+8mxQ9CniQVQcUU0XXy1tp4cE9Tt+odNn4wK8GTT+S9h4xdqCqtyAeZToobScRkyeVN2jADedD1edkINz2KbCsMsZc7+t4GV59x1QhVdHq0yvF6XZYcvhJW30pccyBNEEDrOm/KxWMppgEponAbgCAIUQ95KjgwNGX+8VXDuUz3GyACKsUN', 'MD5OfBody': 'ff91f22db46e165071cbea443f52c3bf', 'Body': '{\n "video": "dcline/Dropbox/code/deepsea-ai/tests/data/V4361_20211006T163856Z_h265_1sec.mp4",\n "clean": "True",\n "user_name": "dcline",\n "metadata_b64": "eyJtZXNzYWdlX3V1aWQiOiAiZWIxYjViYTgtNmMwOC00YTJiLWI5ZjEtMjIyYzViNTI1YzZlIn0=",\n "job_name": "Pycharm test yolov5x-mbay-benthic model from ecsprocess with args",\n "args": "--agnostic-nms --iou-thres=0.2 --conf-thres=0.001"\n}', 'Attributes': {'SenderId': 'AROAX7NYOBNLHGR6NT7GO:dcline@mbari.org', 'ApproximateFirstReceiveTimestamp': '1715128760831', 'ApproximateReceiveCount': '1', 'SentTimestamp': '1715128495537', 'SequenceNumber': '18885816968567023616', 'MessageDeduplicationId': '80132457153d857a101d4cedf5db1e710590b1f5552f2bff0e3add75a5c1a56c', 'MessageGroupId': '20240508T003455-V4361_20211006T163856Z_h265_1sec.mp4'}} to dead queue benthic33k-dead9A6F9BCE-P8fRoyrLgBjO.fifo
2024-05-07 17:39:20,870 DEBUG {'MD5OfMessageBody': '97db372cf9aee247cff4aba2c6e329a6', 'MessageId': 'a041e56c-0219-40ab-a4c0-21bba0c79348', 'SequenceNumber': '18885817036492015872', 'ResponseMetadata': {'RequestId': '3e5ae064-7c86-5804-ad57-bed0b77efd34', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '3e5ae064-7c86-5804-ad57-bed0b77efd34', 'date': 'Wed, 08 May 2024 00:39:20 GMT', 'content-type': 'text/xml', 'content-length': '431', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
2024-05-07 17:39:20,902 DEBUG {'MD5OfMessageBody': '648a41216191ffbcb6b794f70a69d921', 'MessageId': 'd41b86af-70e9-4f32-b39f-a69e91123372', 'SequenceNumber': '18885817036500463616', 'ResponseMetadata': {'RequestId': '1c8b8d46-02a5-539e-9af4-d3f2c95db6ae', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '1c8b8d46-02a5-539e-9af4-d3f2c95db6ae', 'date': 'Wed, 08 May 2024 00:39:20 GMT', 'content-type': 'text/xml', 'content-length': '431', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
2024-05-07 17:39:20,902 INFO No messages available in benthic33k-videoC740E53C-qG4Vah2lWyhV.fifo. Video shutdown complete. No videos were being processed during shutdown.
2024-05-07 17:39:20,902 INFO Shutdown of ECS cluster benthic33k complete. Waiting 5 minutes before restarting autoscaling to reenable
```
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ jupyter notebook
* [`deepsea-ai train --help` - Train a YOLOv5 model and save the model to a bucket](commands/train.md)
* [`deepsea-ai process --help` - Process one or more videos and save the results to a bucket](commands/process.md)
* [`deepsea-ai ecsprocess --help` - Process one or more videos using the Elastic Container Service and save the results to a bucket](commands/process.md)
* [`deepsea-ai ecsshutdown --help` - Shut down video processing in the Elastic Container Service](commands/ecsshutdown.md)
* [`deepsea-ai split --help` - Split your training data. This is required before the train command.](data.md)
* [`deepsea-ai monitor --help` - Monitor processing. Use this after the ecsprocess train command.](commands/monitor.md)
* `deepsea-ai -h` - Print help message and exit.
Expand Down

0 comments on commit 52fa2ab

Please sign in to comment.