Skip to content

Commit

Permalink
Merge pull request #1214 from JosepSampe/lithops-dev
Browse files Browse the repository at this point in the history
[Standalone] Notify stop status when the VM is dismantled
  • Loading branch information
JosepSampe authored Dec 1, 2023
2 parents bb13906 + 041adb3 commit 873fdb2
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 27 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
- [IBM VPC] Added delete_image() method for deleting VM images through the cli
- [localhost] New localhost backend v2 to maximize resource utilization when multiple maps are executed from the same FunctionExecutor
- [Standalone] Automatically retrieve the CPU_COUNT from the VM in case worker_processes is not set in config
- [Standalone] Keep track of the worker and job status
- [Storage] Include "Config" paramater to download_file() and upload_file() methods for boto3 related backends
- [Cli] Include 'worker name' in the 'lithops runtime list' cmd
- [AWS Lambda] Created 'namespace' config key to virtualy separate worker deployments
- [AWS Lambda] Created 'namespace' config key to virtually separate worker deployments

### Changed
- [Standalone] Changed default mode of execution from 'consume' to 'reuse'
Expand Down
7 changes: 4 additions & 3 deletions lithops/serverless/backends/aws_lambda/aws_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ def __init__(self, lambda_config, internal_storage):
self.user_key = caller_id["UserId"][-4:].lower()

self.ecr_client = self.aws_session.client('ecr', region_name=self.region_name)
self.package = f'lithops_v{__version__.replace(".", "")}_{self.user_key}'
package = f'lithops_v{__version__.replace(".", "")}_{self.user_key}'
self.package = f"{package}_{self.namespace}" if self.namespace else package

msg = COMPUTE_CLI_MSG.format('AWS Lambda')
if self.namespace:
Expand All @@ -116,11 +117,11 @@ def _format_layer_name(self, runtime_name, version=__version__):

def _get_default_runtime_name(self):
py_version = utils.CURRENT_PY_VERSION.replace('.', '')
return f'default-layer-runtime-v{py_version}'
return f'default-runtime-v{py_version}'

def _is_container_runtime(self, runtime_name):
name = runtime_name.split('/', 1)[-1]
return 'default-layer-runtime-v' not in name
return 'default-runtime-v' not in name

def _format_repo_name(self, runtime_name):
if ':' in runtime_name:
Expand Down
37 changes: 21 additions & 16 deletions lithops/standalone/keeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ class BudgetKeeper(threading.Thread):
"""
BudgetKeeper class used to automatically stop the VM instance
"""
def __init__(self, config):
def __init__(self, config, stop_callback=None):
threading.Thread.__init__(self)
self.last_usage_time = time.time()

self.standalone_config = config
self.stop_callback = stop_callback
self.auto_dismantle = config['auto_dismantle']
self.soft_dismantle_timeout = config['soft_dismantle_timeout']
self.hard_dismantle_timeout = config['hard_dismantle_timeout']
self.exec_mode = config['exec_mode']

self.runing = False
self.jobs = {}

with open(SA_DATA_FILE, 'r') as ad:
Expand All @@ -37,15 +39,12 @@ def __init__(self, config):
f"instance ID: {self.instance.instance_id}")
logger.debug(f"Delete {self.instance.name} on dismantle: {self.instance.delete_on_dismantle}")

def update_config(self, config):
self.standalone_config.update(config)
self.auto_dismantle = config['auto_dismantle']
self.soft_dismantle_timeout = config['soft_dismantle_timeout']
self.hard_dismantle_timeout = config['hard_dismantle_timeout']
self.exec_mode = config['exec_mode']
def add_job(self, job_key):
self.last_usage_time = time.time()
self.jobs[job_key] = JobStatus.RUNNING.value

def run(self):
runing = True
self.runing = True
jobs_running = False

logger.debug("BudgetKeeper started")
Expand All @@ -59,7 +58,7 @@ def run(self):
# being started forever due a wrong configuration
logger.debug(f'Auto dismantle deactivated - Hard Timeout: {self.hard_dismantle_timeout}s')

while runing:
while self.runing:
time_since_last_usage = time.time() - self.last_usage_time

for job_key in self.jobs.keys():
Expand Down Expand Up @@ -88,10 +87,16 @@ def run(self):
check_interval = min(60, max(time_to_dismantle / 10, 1))
time.sleep(check_interval)
else:
logger.debug("Dismantling setup")
try:
self.instance.stop()
runing = False
except Exception as e:
logger.debug(f"Dismantle error {e}")
time.sleep(5)
self.stop_instance()

def stop_instance(self):
logger.debug("Dismantling setup")

self.stop_callback() if self.stop_callback is not None else None

try:
self.instance.stop()
self.runing = False
except Exception as e:
logger.debug(f"Dismantle error {e}")
time.sleep(5)
24 changes: 21 additions & 3 deletions lithops/standalone/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,26 @@ def error(msg):
return response


@app.route('/worker/status/stop', methods=['POST'])
def stop_worker():
"""
Returns the current workers list
"""
worker_ip = flask.request.remote_addr
workers[worker_ip].status = WorkerStatus.STOPPED.value
return ('', 204)


@app.route('/worker/status/idle', methods=['POST'])
def idle_worker():
"""
Returns the current workers list
"""
worker_ip = flask.request.remote_addr
workers[worker_ip].status = WorkerStatus.IDLE.value
return ('', 204)


@app.route('/worker/list', methods=['GET'])
def list_workers():
"""
Expand Down Expand Up @@ -358,7 +378,6 @@ def get_task(work_queue_name):
global work_queues

worker_ip = flask.request.remote_addr
workers[worker_ip].status = WorkerStatus.IDLE.value

try:
task_payload = work_queues.setdefault(work_queue_name, queue.Queue()).get(False)
Expand Down Expand Up @@ -485,8 +504,7 @@ def run():
job_key = job_payload['job_key']
logger.debug(f'Received job {job_key}')

budget_keeper.last_usage_time = time.time()
budget_keeper.jobs[job_key] = JobStatus.RUNNING.value
budget_keeper.add_job(job_key)

exec_mode = job_payload['config']['standalone']['exec_mode']

Expand Down
1 change: 1 addition & 0 deletions lithops/standalone/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class WorkerStatus(Enum):
INSTALLING = "installing"
BUSSY = "active/bussy"
IDLE = "active/idle"
STOPPED = "stopped"


class JobStatus(Enum):
Expand Down
27 changes: 23 additions & 4 deletions lithops/standalone/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import requests
from pathlib import Path
from threading import Thread
from functools import partial
from gevent.pywsgi import WSGIServer

from lithops.constants import LITHOPS_TEMP_DIR, SA_LOG_FILE, JOBS_DIR, \
Expand Down Expand Up @@ -61,6 +62,24 @@ def stop(job_key):
return response


def notify_stop(master_ip):
try:
url = f'http://{master_ip}:{SA_SERVICE_PORT}/worker/status/stop'
resp = requests.post(url)
logger.debug("Stop worker: " + str(resp.status_code))
except Exception as e:
logger.error(e)


def notify_idle(master_ip):
try:
url = f'http://{master_ip}:{SA_SERVICE_PORT}/worker/status/idle'
resp = requests.post(url)
logger.debug("Free worker: " + str(resp.status_code))
except Exception as e:
logger.error(e)


def wait_job_completed(job_key):
"""
Waits until the current job is completed
Expand Down Expand Up @@ -128,15 +147,15 @@ def run_worker(

running_job_key = job_payload['job_key']

budget_keeper.last_usage_time = time.time()
budget_keeper.jobs[running_job_key] = 'running'
budget_keeper.add_job(running_job_key)

try:
localhos_handler.invoke(job_payload)
except Exception as e:
logger.error(e)

wait_job_completed(running_job_key)
notify_idle(master_ip)


def main():
Expand Down Expand Up @@ -166,7 +185,7 @@ def main():

# Start the budget keeper. It is responsible to automatically terminate the
# worker after X seconds
budget_keeper = BudgetKeeper(standalone_config)
budget_keeper = BudgetKeeper(standalone_config, partial(notify_stop, master_ip))
budget_keeper.start()

# Start the http server. This will be used by the master VM to pìng this
Expand All @@ -187,7 +206,7 @@ def run_wsgi():
try:
# Try to stop the current worker VM once no more pending tasks to run
# in case of create mode
budget_keeper.vm.stop()
budget_keeper.stop_instance()
except Exception:
pass

Expand Down

0 comments on commit 873fdb2

Please sign in to comment.