Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support Lease-based leader election (rather than ConfigMaps) #1877 #2314

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions kubernetes/base/leaderelection/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from kubernetes import client, config
from kubernetes.leaderelection import leaderelection
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock
from kubernetes.leaderelection.resourcelock.leaselock import LeaseLock
from kubernetes.leaderelection import electionconfig


Expand All @@ -42,10 +43,13 @@ def example_func():
# A user can choose not to provide any callbacks for what to do when a candidate fails to lead - onStoppedLeading()
# In that case, a default callback function will be used

# Choose the type of lock mechanism to use
lock_object = LeaseLock(lock_name, lock_namespace, candidate_id)
#lock_object = ConfigMapLock(lock_name, lock_namespace, candidate_id)

# Create config
config = electionconfig.Config(ConfigMapLock(lock_name, lock_namespace, candidate_id), lease_duration=17,
renew_deadline=15, retry_period=5, onstarted_leading=example_func,
onstopped_leading=None)
config = electionconfig.Config(lock_object, lease_duration=17, renew_deadline=15, retry_period=5,
onstarted_leading=example_func, onstopped_leading=None)

# Enter leader election
leaderelection.LeaderElection(config).run()
Expand Down
2 changes: 1 addition & 1 deletion kubernetes/base/leaderelection/leaderelectionrecord.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


class LeaderElectionRecord:
# Annotation used in the lock object
# Lease configuration from in the lock object.
def __init__(self, holder_identity, lease_duration, acquire_time, renew_time):
self.holder_identity = holder_identity
self.lease_duration = lease_duration
Expand Down
5 changes: 2 additions & 3 deletions kubernetes/base/leaderelection/resourcelock/configmaplock.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
# limitations under the License.

from kubernetes.client.rest import ApiException
from kubernetes import client, config
from kubernetes.client.api_client import ApiClient
from kubernetes import client
from ..leaderelectionrecord import LeaderElectionRecord
import json
import logging
Expand Down Expand Up @@ -126,4 +125,4 @@ def get_lock_dict(self, leader_election_record):
self.lock_record['acquireTime'] = leader_election_record.acquire_time
self.lock_record['renewTime'] = leader_election_record.renew_time

return self.lock_record
return self.lock_record
129 changes: 129 additions & 0 deletions kubernetes/base/leaderelection/resourcelock/leaselock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kubernetes.client.rest import ApiException
from kubernetes import client
from ..leaderelectionrecord import LeaderElectionRecord
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)


class LeaseLock:
def __init__(self, name, namespace, identity):
"""
:param name: name of the lock
:param namespace: namespace
:param identity: A unique identifier that the candidate is using
"""
self.api_instance = client.CoordinationV1Api()

# lease resource identity and reference
self.name = name
self.namespace = namespace
self.lease_reference = None

# identity of this candidate
self.identity = str(identity)

# get returns the election record from a Lease Annotation
def get(self, name, namespace):
"""
:param name: Name of the lease object information to get
:param namespace: Namespace in which the lease object is to be searched
:return: 'True, election record' if object found else 'False, exception response'
"""
try:
lease = self.api_instance.read_namespaced_lease(name, namespace)

except ApiException as e:
return False, e
else:
self.lease_reference = lease
return True, self.election_record(lease)

def create(self, name, namespace, election_record):
"""
:param electionRecord: Annotation string
:param name: Name of the lease object to be created
:param namespace: Namespace in which the lease object is to be created
:return: 'True' if object is created else 'False' if failed
"""
body = client.V1Lease(metadata={"name": name},
spec=self.update_lease(election_record))

try:
_ = self.api_instance.create_namespaced_lease(namespace, body, pretty=True)
return True
except ApiException as e:
logging.info("Failed to create lock as {}".format(e))
return False

def update(self, name, namespace, updated_record):
"""
:param name: name of the lock to be updated
:param namespace: namespace the lock is in
:param updated_record: the updated election record
:return: True if update is successful False if it fails
"""
try:
# update the Lease from the updated record
self.lease_reference.spec = self.update_lease(updated_record,
self.lease_reference.spec)

_ = self.api_instance.replace_namespaced_lease(name=name, namespace=namespace,
body=self.lease_reference)
return True
except ApiException as e:
logging.info("Failed to update lock as {}".format(e))
return False

def update_lease(self, leader_election_record, current_spec=None):
# existing or new lease?
spec = current_spec if current_spec else client.V1LeaseSpec()

# lease configuration
spec.holder_identity = leader_election_record.holder_identity
spec.lease_duration_seconds = int(leader_election_record.lease_duration)
spec.acquire_time = self.time_str_to_iso(leader_election_record.acquire_time)
spec.renew_time = self.time_str_to_iso(leader_election_record.renew_time)

return spec

def election_record(self, lease):
"""
Get leader election record from Lease spec.
"""
leader_election_record = LeaderElectionRecord(None, None, None, None)

if lease.spec and lease.spec.holder_identity:
leader_election_record.holder_identity = lease.spec.holder_identity
if lease.spec and lease.spec.lease_duration_seconds:
leader_election_record.lease_duration = str(lease.spec.lease_duration_seconds)
if lease.spec and lease.spec.acquire_time:
leader_election_record.acquire_time = str(datetime.replace(lease.spec.acquire_time, tzinfo=None))
if lease.spec and lease.spec.renew_time:
leader_election_record.renew_time = str(datetime.replace(lease.spec.renew_time, tzinfo=None))

return leader_election_record

# conversion between kubernetes ISO formatted time and elector record time
def time_str_to_iso(self, str_time):
formats = ["%Y-%m-%d %H:%M:%S.%f%z", "%Y-%m-%d %H:%M:%S.%f"]
for fmt in formats:
try:
return datetime.strptime(str_time, fmt).isoformat()+'Z'
except ValueError:
pass
logging.error("Failed to parse time string: {}".format(str_time))