Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions doodad/gcp/gcp_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,32 @@

def upload_file_to_gcp_storage(
bucket_name,
file_name,
file_name=None,
file_contents=None,
remote_filename=None,
dry=False,
check_exists=True
check_exists=True,
):
from google.cloud import storage
assert file_name or file_contents, "must specify local filepath or contents"
assert not (file_name and file_contents)

storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)

if remote_filename is None:
assert file_name
remote_filename = os.path.basename(file_name)
remote_path = 'doodad/mount/' + remote_filename
blob = bucket.blob(remote_path)
if check_exists and blob.exists(storage_client):
print("{remote_path} already exists".format(remote_path=remote_path))
return remote_path
blob.upload_from_filename(file_name)

if file_name:
blob.upload_from_filename(file_name)
elif file_contents:
blob.upload_from_string(file_contents)
return remote_path

def get_machine_type(zone, instance_type):
Expand Down
49 changes: 46 additions & 3 deletions doodad/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import base64
import json
import pickle

try:
from StringIO import StringIO
Expand Down Expand Up @@ -640,8 +641,13 @@ def __init__(
gcp_log_name=None,
gcp_log_path=None,
gpu_kwargs=None,
preemption_bucket=None,
**kwargs
):
"""
preemption_bucket: optional. Used if handling preemption. Dumps preemption
and checkpoint info this bucket.
"""
super(GCPDocker, self).__init__(**kwargs)
assert 'CLOUDSDK_CORE_PROJECT' in os.environ.keys()
self.project = os.environ['CLOUDSDK_CORE_PROJECT']
Expand All @@ -665,7 +671,13 @@ def __init__(
import googleapiclient.discovery
self.compute = googleapiclient.discovery.build('compute', 'v1')

def launch_command(self, main_cmd, mount_points=None, dry=False, verbose=False):
self.preemption_bucket = preemption_bucket

def launch_command(self, main_cmd, mount_points=None, dry=False, verbose=False, max_retries=2):
"""
max_retries: (int) max number of retries before relaunching as a
non-preemptible instance by the relaunch server
"""
if self.gcp_log_name is None:
exp_name = "{}-{}".format(self.gcp_log_prefix, EC2SpotDocker.make_timekey(self))
else:
Expand Down Expand Up @@ -728,6 +740,9 @@ def launch_command(self, main_cmd, mount_points=None, dry=False, verbose=False):
'terminate': json.dumps(self.terminate),
'startup-script': open(GCP_STARTUP_SCRIPT_PATH, "r").read(),
'shutdown-script': open(GCP_SHUTDOWN_SCRIPT_PATH, "r").read(),
'retry': 0,
'max_retries': max_retries,
'preemption_bucket': self.preemption_bucket,
}
# instance name must match regex '(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)'">
unique_name= "doodad" + str(uuid.uuid4()).replace("-", "")
Expand Down Expand Up @@ -784,11 +799,39 @@ def create_instance(self, metadata, name, exp_name="", exp_prefix=""):
"acceleratorType": self.gpu_type,
"acceleratorCount": self.num_gpu,
}]
return self.compute.instances().insert(

instance_launch_dict = dict(
project=self.project,
zone=self.zone,
body=config
).execute()
)
response = self.compute.instances().insert(**instance_launch_dict).execute()
# insert_operation_name = response['name']
# print("Waiting for insert to complete")
# while True:
# status = self.compute.zoneOperations().get(
# project=self.project,
# zone=self.zone,
# operation=insert_operation_name).execute()
# if status['status'] == 'DONE':
# if 'httpErrorStatusCode' in status:
# print('Insert failed with errorcode: ', status['httpErrorStatusCode'])
# break
# time.sleep(1)

# import pdb; pdb.set_trace()

if self.preemption_bucket:
# For resuming preempted experiments
launch_config_filename = 'launch_config/{instance_name}.pkl'.format(
instance_name=name
)
upload_file_to_gcp_storage(
bucket_name=self.preemption_bucket,
file_contents=pickle.dumps(instance_launch_dict),
remote_filename=launch_config_filename,
)


class CodalabDocker(DockerMode):
def __init__(self):
Expand Down
119 changes: 119 additions & 0 deletions scripts/gcp/gcp_restart_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import copy
import pickle
import sys
import json
import time
import logging

from google.cloud import storage
from doodad.gcp.gcp_util import upload_file_to_gcp_storage

def get_metadata_value(metadata, key):
for pair in metadata['items']:
if pair['key'] == key:
return pair['value']
raise KeyError

def update_metadata_value(metadata, key, value, create=False):
for pair in metadata['items']:
if pair['key'] == key:
pair['value'] = value
return
if not create:
raise KeyError
metadata['items'].append(
{'key': key, 'value': value}
)

def update_launch_config(launch_config, checkpoint_commands):
# Weird google config format
metadata = launch_config['body']['metadata']
max_retries = get_metadata_value(metadata, 'max_retries')
retries_so_far = get_metadata_value(metadata, 'retry')

if retries_so_far == max_retries:
launch_config['body']['scheduling']['preemptible'] = False
else:
update_metadata_value(metadata, 'retry', retries_so_far + 1)
update_metadata_value(
metadata,
'checkpoint_commands',
json.dumps(checkpoint_commands),
create=True
)
return launch_config


if __name__ == "__main__":
preemption_bucket = sys.argv[1]
storage_client = storage.Client()
bucket = storage_client.get_bucket(preemption_bucket)
import googleapiclient.discovery
compute = googleapiclient.discovery.build('compute', 'v1')
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s',
handlers=[
logging.FileHandler('gcp_restart.log'),
logging.StreamHandler(sys.stdout)
]
)

while True:
for blob in copy.copy(bucket.list_blobs(prefix='preempted/')):
failure = False
checkpoint_info = blob.download_as_string().decode("utf-8").splitlines()
instance_name = checkpoint_info[0]
checkpoint_commands = checkpoint_info[1:]
log_msg = "Restarting {instance_name}".format(
instance_name=instance_name
)
logging.info(log_msg)

launch_config_path = \
'doodad/mount/launch_config/{instance_name}.pkl'.format(
instance_name=instance_name
)
launch_config = pickle.loads(
bucket.get_blob(launch_config_path).download_as_string()
)

updated_launch_config = update_launch_config(
launch_config,
checkpoint_commands
)
try:
compute.instances().insert(**updated_launch_config).execute()
except Exception as e:
failure = True
print(e)
log_msg = "Failed to relaunch {instance_name}. Trying again later".format(
instance_name=instance_name
)
logging.warning(log_msg)


# update launch_config file with retry and preemption changes
# upload_file_to_gcp_storage automatically prepends with doodad/mount
launch_config_filename = 'launch_config/{instance_name}.pkl'.format(
instance_name=instance_name
)
upload_file_to_gcp_storage(
bucket_name=preemption_bucket,
file_contents=pickle.dumps(updated_launch_config),
remote_filename=launch_config_filename,
check_exists=False,
)
# mark the preemption as handled
if not failure:
blob.delete()
log_msg = "Successfully relaunched {instance_name}".format(
instance_name=instance_name
)
logging.info(log_msg)
upload_file_to_gcp_storage(
bucket_name=preemption_bucket,
file_name='gcp_restart.log',
check_exists=False,
)
time.sleep(60)
38 changes: 36 additions & 2 deletions scripts/gcp/gcp_shutdown_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,42 @@ query_metadata() {
curl http://metadata/computeMetadata/v1/instance/attributes/$attribute_name -H "Metadata-Flavor: Google"
}

{
bucket_name=$(query_metadata bucket_name)
gcp_mounts=$(query_metadata gcp_mounts)
instance_name=$(curl http://metadata/computeMetadata/v1/instance/name -H "Metadata-Flavor: Google")

num_gcp_mounts=$(jq length <<< $gcp_mounts)

preempted=$(curl "http://metadata.google.internal/computeMetadata/v1/instance/preempted" -H "Metadata-Flavor: Google")
echo "preempted:" $preempted
if [ "$preempted" = "TRUE" ]; then

echo $instance_name > checkpoint_info
for ((i=0;i<$num_gcp_mounts;i++)); do
gcp_mount_info=$(jq .[$i] <<< $gcp_mounts)
local_path=$(jq .[0] <<< $gcp_mount_info | tr -d '"')
gcp_bucket_path=$(jq .[1] <<< $gcp_mount_info | tr -d '"')
# checkpoint dirs
ls $local_path > /tmp/checkpoint_ls
while read p; do
local_checkpoint_path=$(echo $local_path/$p | sed s#//*#/#g)
remote_checkpoint_path=gs://"$(echo $bucket_name/$gcp_bucket_path/"$p" | sed s#//*#/#g)"
echo "mkdir -p $local_checkpoint_path && gsutil -m rsync -r $remote_checkpoint_path $local_checkpoint_path" >> checkpoint_info
done < /tmp/checkpoint_ls
done
cat checkpoint_info

preemption_bucket=$(query_metadata preemption_bucket)
gsutil cp checkpoint_info gs://$preemption_bucket/preempted/$instance_name
fi
# ensure that $gcp_bucket_path is defined for the stdout log sync
for ((i=0;i<$num_gcp_mounts;i++)); do
gcp_mount_info=$(jq .[$i] <<< $gcp_mounts)
gcp_bucket_path=$(jq .[1] <<< $gcp_mount_info | tr -d '"')
done

gsutil cp /home/ubuntu/user_data.log gs://$bucket_name/$gcp_bucket_path/${instance_name}_stdout.log

for ((i=0;i<$num_gcp_mounts;i++)); do
gcp_mount_info=$(jq .[$i] <<< $gcp_mounts)
# assume gcp_mount_info is a (local_path, bucket_path, include_string, periodic_sync_interval) tuple
Expand All @@ -17,4 +48,7 @@ for ((i=0;i<$num_gcp_mounts;i++)); do
gsutil -m rsync -r $local_path gs://$bucket_name/$gcp_bucket_path
done

gsutil cp /home/ubuntu/user_data.log gs://$bucket_name/$gcp_bucket_path/${instance_name}_stdout.log
} >> /home/ubuntu/terminate.log 2>&1
zone=$(curl http://metadata/computeMetadata/v1/instance/zone -H "Metadata-Flavor: Google")
zone="${zone##*/}"
gcloud compute instances delete $instance_name --zone $zone --quiet
40 changes: 26 additions & 14 deletions scripts/gcp/gcp_startup_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ query_metadata() {
gcp_mounts=$(query_metadata gcp_mounts)
use_gpu=$(query_metadata use_gpu)
terminate=$(query_metadata terminate)
retry=$(query_metadata retry)
instance_name=$(curl http://metadata/computeMetadata/v1/instance/name -H "Metadata-Flavor: Google")
echo "bucket_name:" $bucket_name
echo "docker_cmd:" $docker_cmd
Expand Down Expand Up @@ -56,23 +57,33 @@ query_metadata() {
tar -xvf /tmp/$local_mount.tar -C /tmp/$local_mount
done

if (($retry > 0)); then
echo "resuming experiment"
checkpoint_commands=$(query_metadata checkpoint_commands)
num_checkpoint_commands=$(jq length <<< $checkpoint_commands)
for ((i=0;i<$num_checkpoint_commands;i++)); do
sync_command=$(jq .[$i] <<< $checkpoint_commands | tr -d '"')
echo $sync_command
bash -c "$sync_command"
done
fi

num_gcp_mounts=$(jq length <<< $gcp_mounts)
for ((i=0;i<$num_gcp_mounts;i++)); do
gcp_mount_info=$(jq .[$i] <<< $gcp_mounts)
# assume _mount_info is a (local_path, bucket_path, include_string, periodic_sync_interval) tuple
local_path=$(jq .[0] <<< $gcp_mount_info | tr -d '"')
gcp_bucket_path=$(jq .[1] <<< $gcp_mount_info | tr -d '"')
include_string=$(jq .[2] <<< $gcp_mount_info | tr -d '"')
periodic_sync_interval=$(jq .[3] <<< $gcp_mount_info | tr -d '"')
while /bin/true; do
gsutil -m rsync -r $local_path gs://$bucket_name/$gcp_bucket_path
sleep $periodic_sync_interval
done & echo sync from $local_path to gs://$bucket_name/$gcp_bucket_path initiated
done
gcp_bucket_path=${gcp_bucket_path%/} # remove trailing slash if present
while /bin/true; do
for ((i=0;i<$num_gcp_mounts;i++)); do
gcp_mount_info=$(jq .[$i] <<< $gcp_mounts)
# assume _mount_info is a (local_path, bucket_path, include_string, periodic_sync_interval) tuple
local_path=$(jq .[0] <<< $gcp_mount_info | tr -d '"')
gcp_bucket_path=$(jq .[1] <<< $gcp_mount_info | tr -d '"')
include_string=$(jq .[2] <<< $gcp_mount_info | tr -d '"')
periodic_sync_interval=$(jq .[3] <<< $gcp_mount_info | tr -d '"')

gsutil -m rsync -r $local_path gs://$bucket_name/$gcp_bucket_path
echo syncing from $local_path to gs://$bucket_name/$gcp_bucket_path
done
gcp_bucket_path=${gcp_bucket_path%/} # remove trailing slash if present
gsutil cp /home/ubuntu/user_data.log gs://$bucket_name/$gcp_bucket_path/${instance_name}_stdout.log
sleep 300
sleep $periodic_sync_interval
done &

if [ "$use_gpu" = "true" ]; then
Expand All @@ -89,6 +100,7 @@ query_metadata() {

if [ "$terminate" = "true" ]; then
echo "Finished experiment. Terminating"
gsutil cp /home/ubuntu/user_data.log gs://$bucket_name/$gcp_bucket_path/${instance_name}_stdout.log
zone=$(curl http://metadata/computeMetadata/v1/instance/zone -H "Metadata-Flavor: Google")
zone="${zone##*/}"
gcloud compute instances delete $instance_name --zone $zone --quiet
Expand Down