Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add leaderelection module #347

Merged
merged 6 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions examples/leaderelection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import os
import uuid

from kubernetes_asyncio import config
from kubernetes_asyncio.client import api_client
from kubernetes_asyncio.leaderelection import electionconfig, leaderelection
from kubernetes_asyncio.leaderelection.resourcelock.leaselock import LeaseLock

logging.basicConfig(level=logging.INFO)


async def main():

# Authenticate using config file
await config.load_kube_config(config_file=os.environ.get("KUBECONFIG", ""))

# Parameters required from the user

# A unique identifier for this candidate
candidate_id = uuid.uuid4()

# Name of the lock object to be created
lock_name = "examplepython"

# Kubernetes namespace
lock_namespace = "default"

# The function that a user wants to run once a candidate is elected as a
# leader. Cancellation is supported (when a held leader lock is lost).
async def example_start_func():
try:
print("I am leader")
except asyncio.CancelledError:
print(
"Start function cancelled - lost leader election after becoming leader"
)

async def example_end_func():
print("I am no longer leader")

# A user can choose not to provide any callbacks for what to do when a candidate fails to lead - onStoppedLeading()
# In that case, a default callback function will be used

async with api_client.ApiClient() as apic:
# Create config
leader_election_config = electionconfig.Config(
# A legacy ConfigMapLock is also available
LeaseLock(lock_name, lock_namespace, candidate_id, apic),
lease_duration=17,
renew_deadline=15,
retry_period=5,
# Coroutines are also accepted, to facilitate providing context
# (e.g. passing apic)
onstarted_leading=example_start_func,
onstopped_leading=example_end_func,
)

# Enter leader election
await leaderelection.LeaderElection(leader_election_config).run()
# User can choose to do another round of election or simply exit
print("Exited leader election")


if __name__ == "__main__":
asyncio.run(main())
Empty file.
68 changes: 68 additions & 0 deletions kubernetes_asyncio/leaderelection/electionconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Callable, Coroutine # noqa:F401


class Config:
# Validate config, exit if an error is detected

# onstarted_leading and onstopped_leading accept either coroutines or
# coroutine functions. Coroutines faciliate passing context, but coroutine
# functions can be simpler when passing context is not required.
#
# One example of when passing context is helpful is sharing the ApiClient
# used by the leader election, which can then be used for subsequent
# Kubernetes API operations upon onstopped_leading or onstopped_leading.
def __init__(
self,
lock,
lease_duration,
renew_deadline,
retry_period,
onstarted_leading, # type: Coroutine | Callable[[], Coroutine]
onstopped_leading=None, # type: Coroutine | Callable[[], Coroutine] | None
):
self.jitter_factor = 1.2

if lock is None:
raise ValueError("lock cannot be None")

Check warning on line 40 in kubernetes_asyncio/leaderelection/electionconfig.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/electionconfig.py#L40

Added line #L40 was not covered by tests
self.lock = lock

if lease_duration <= renew_deadline:
raise ValueError("lease_duration must be greater than renew_deadline")

Check warning on line 44 in kubernetes_asyncio/leaderelection/electionconfig.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/electionconfig.py#L44

Added line #L44 was not covered by tests

if renew_deadline <= self.jitter_factor * retry_period:
raise ValueError(

Check warning on line 47 in kubernetes_asyncio/leaderelection/electionconfig.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/electionconfig.py#L47

Added line #L47 was not covered by tests
"renewDeadline must be greater than retry_period*jitter_factor"
)

if lease_duration < 1:
raise ValueError("lease_duration must be greater than one")

Check warning on line 52 in kubernetes_asyncio/leaderelection/electionconfig.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/electionconfig.py#L52

Added line #L52 was not covered by tests

if renew_deadline < 1:
raise ValueError("renew_deadline must be greater than one")

Check warning on line 55 in kubernetes_asyncio/leaderelection/electionconfig.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/electionconfig.py#L55

Added line #L55 was not covered by tests

if retry_period < 1:
raise ValueError("retry_period must be greater than one")

Check warning on line 58 in kubernetes_asyncio/leaderelection/electionconfig.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/electionconfig.py#L58

Added line #L58 was not covered by tests

self.lease_duration = lease_duration
self.renew_deadline = renew_deadline
self.retry_period = retry_period

if onstarted_leading is None:
raise ValueError("callback onstarted_leading cannot be None")

Check warning on line 65 in kubernetes_asyncio/leaderelection/electionconfig.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/electionconfig.py#L65

Added line #L65 was not covered by tests
self.onstarted_leading = onstarted_leading

self.onstopped_leading = onstopped_leading
243 changes: 243 additions & 0 deletions kubernetes_asyncio/leaderelection/leaderelection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import datetime
import inspect
import json
import logging
import sys
import time
from http import HTTPStatus

from .leaderelectionrecord import LeaderElectionRecord

"""
This package implements leader election using an annotation in a Kubernetes
object. The onstarted_leading coroutine is run as a task, which is cancelled if
the leader lock is obtained and then lost.

At first all candidates are considered followers. The one to create a lock or
update an existing lock first becomes the leader and remains so until it fails
to renew its lease.
"""


class LeaderElection:
def __init__(self, election_config):
if election_config is None:
sys.exit("argument config not passed")

Check warning on line 40 in kubernetes_asyncio/leaderelection/leaderelection.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/leaderelection.py#L40

Added line #L40 was not covered by tests

# Latest record observed in the created lock object
self.observed_record = None

# The configuration set for this candidate
self.election_config = election_config

# Latest update time of the lock
self.observed_time_milliseconds = 0

# Point of entry to Leader election
async def run(self):
# Try to create/ acquire a lock
if await self.acquire():
logging.info(
"%s successfully acquired lease", self.election_config.lock.identity
)

onstarted_leading_coroutine = (
self.election_config.onstarted_leading
if inspect.iscoroutine(self.election_config.onstarted_leading)
else self.election_config.onstarted_leading()
)

task = asyncio.create_task(onstarted_leading_coroutine)

await self.renew_loop()

# Leader lock lost - cancel the onstarted_leading coroutine if it's
# still running. This permits onstarted_leading to clean up state
# that might not be accessible to onstopped_leading.
task.cancel()

# Failed to update lease, run onstopped_leading callback. This is
# preserved in order to continue to provide an interface similar to
# the one provided by `kubernetes-client/python`.
if self.election_config.onstopped_leading is not None:
await (
self.election_config.onstopped_leading
if inspect.iscoroutine(self.election_config.onstopped_leading)
else self.election_config.onstopped_leading()
)

async def acquire(self):
# Follower
logging.debug("%s is a follower", self.election_config.lock.identity)
retry_period = self.election_config.retry_period

while True:
succeeded = await self.try_acquire_or_renew()

if succeeded:
return True

await asyncio.sleep(retry_period)

async def renew_loop(self):
# Leader
logging.debug(
"Leader has entered renew loop and will try to update lease continuously"
)

retry_period = self.election_config.retry_period
renew_deadline = self.election_config.renew_deadline * 1000

while True:
timeout = int(time.time() * 1000) + renew_deadline
succeeded = False

while int(time.time() * 1000) < timeout:
succeeded = await self.try_acquire_or_renew()

if succeeded:
break
await asyncio.sleep(retry_period)

if succeeded:
await asyncio.sleep(retry_period)
continue

# failed to renew, return
return

async def try_acquire_or_renew(self):
now_timestamp = time.time()
now = datetime.datetime.fromtimestamp(now_timestamp)

# Check if lock is created
lock_status, old_election_record = await self.election_config.lock.get(
self.election_config.lock.name, self.election_config.lock.namespace
)

# create a default Election record for this candidate
leader_election_record = LeaderElectionRecord(
self.election_config.lock.identity,
str(self.election_config.lease_duration),
str(now),
str(now),
)

# A lock is not created with that name, try to create one
if not lock_status:
if json.loads(old_election_record.body)["code"] != HTTPStatus.NOT_FOUND:
logging.error(

Check warning on line 144 in kubernetes_asyncio/leaderelection/leaderelection.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/leaderelection.py#L144

Added line #L144 was not covered by tests
"Error retrieving resource lock %s as %s",
self.election_config.lock.name,
old_election_record.reason,
)
return False

Check warning on line 149 in kubernetes_asyncio/leaderelection/leaderelection.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/leaderelection.py#L149

Added line #L149 was not covered by tests

logging.debug(
"%s is trying to create a lock",
leader_election_record.holder_identity,
)
create_status = await self.election_config.lock.create(
name=self.election_config.lock.name,
namespace=self.election_config.lock.namespace,
election_record=leader_election_record,
)

if not create_status:
logging.error(

Check warning on line 162 in kubernetes_asyncio/leaderelection/leaderelection.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/leaderelection.py#L162

Added line #L162 was not covered by tests
"%s failed to create lock", leader_election_record.holder_identity
)
return False

Check warning on line 165 in kubernetes_asyncio/leaderelection/leaderelection.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/leaderelection.py#L165

Added line #L165 was not covered by tests

self.observed_record = leader_election_record
self.observed_time_milliseconds = int(time.time() * 1000)
return True

# A lock exists with that name
# Validate old_election_record
if old_election_record is None:
# try to update lock with proper election record
return await self.update_lock(leader_election_record)

Check warning on line 175 in kubernetes_asyncio/leaderelection/leaderelection.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/leaderelection.py#L175

Added line #L175 was not covered by tests

if (
old_election_record.holder_identity is None
or old_election_record.lease_duration is None
or old_election_record.acquire_time is None
or old_election_record.renew_time is None
):
# try to update lock with proper election record
return await self.update_lock(leader_election_record)

Check warning on line 184 in kubernetes_asyncio/leaderelection/leaderelection.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/leaderelection.py#L184

Added line #L184 was not covered by tests

# Report transitions
if (
self.observed_record
and self.observed_record.holder_identity
!= old_election_record.holder_identity
):
logging.debug(

Check warning on line 192 in kubernetes_asyncio/leaderelection/leaderelection.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/leaderelection/leaderelection.py#L192

Added line #L192 was not covered by tests
"Leader has switched to %s", old_election_record.holder_identity
)

if (
self.observed_record is None
or old_election_record.__dict__ != self.observed_record.__dict__
):
self.observed_record = old_election_record
self.observed_time_milliseconds = int(time.time() * 1000)

# If This candidate is not the leader and lease duration is yet to finish
if (
self.election_config.lock.identity != self.observed_record.holder_identity
and self.observed_time_milliseconds
+ self.election_config.lease_duration * 1000
> int(now_timestamp * 1000)
):
logging.debug(
"Yet to finish lease_duration, lease held by %s and has not expired",
old_election_record.holder_identity,
)
return False

# If this candidate is the Leader
if self.election_config.lock.identity == self.observed_record.holder_identity:
# Leader updates renewTime, but keeps acquire_time unchanged
leader_election_record.acquire_time = self.observed_record.acquire_time

return await self.update_lock(leader_election_record)

async def update_lock(self, leader_election_record):
# Update object with latest election record
update_status = await self.election_config.lock.update(
self.election_config.lock.name,
self.election_config.lock.namespace,
leader_election_record,
)

if not update_status:
logging.warning(
"%s failed to acquire lease", leader_election_record.holder_identity
)
return False

self.observed_record = leader_election_record
self.observed_time_milliseconds = int(time.time() * 1000)
logging.debug(
"Leader %s has successfully updated lease",
leader_election_record.holder_identity,
)
return True
Loading
Loading