patroni.dcs package¶
Submodules¶
- patroni.dcs.consul module
Consul
Consul.adjust_ttl()
Consul.attempt_to_acquire_leader()
Consul.cancel_initialization()
Consul.create_session()
Consul.delete_cluster()
Consul.delete_sync_state()
Consul.deregister_service()
Consul.initialize()
Consul.member()
Consul.refresh_session()
Consul.register_service()
Consul.reload_config()
Consul.retry()
Consul.set_config_value()
Consul.set_failover_value()
Consul.set_history_value()
Consul.set_retry_timeout()
Consul.set_sync_state_value()
Consul.set_ttl()
Consul.take_leader()
Consul.touch_member()
Consul.ttl
Consul.update_service()
Consul.watch()
ConsulClient
ConsulError
ConsulInternalError
HTTPClient
InvalidSession
InvalidSessionTTL
Response
catch_consul_errors()
force_if_last_failed()
service_name_from_scope_name()
- patroni.dcs.etcd module
AbstractEtcd
AbstractEtcdClientWithFailover
AbstractEtcdClientWithFailover.ERROR_CLS
AbstractEtcdClientWithFailover.api_execute()
AbstractEtcdClientWithFailover.get_srv_record()
AbstractEtcdClientWithFailover.machines
AbstractEtcdClientWithFailover.machines_cache
AbstractEtcdClientWithFailover.reload_config()
AbstractEtcdClientWithFailover.set_base_uri()
AbstractEtcdClientWithFailover.set_machines_cache_ttl()
AbstractEtcdClientWithFailover.set_read_timeout()
DnsCachingResolver
Etcd
Etcd.attempt_to_acquire_leader()
Etcd.cancel_initialization()
Etcd.delete_cluster()
Etcd.delete_sync_state()
Etcd.initialize()
Etcd.member()
Etcd.set_config_value()
Etcd.set_failover_value()
Etcd.set_history_value()
Etcd.set_sync_state_value()
Etcd.set_ttl()
Etcd.take_leader()
Etcd.touch_member()
Etcd.watch()
EtcdClient
EtcdError
EtcdRaftInternal
catch_etcd_errors()
- patroni.dcs.etcd3 module
AuthFailed
AuthNotEnabled
DeadlineExceeded
Etcd3
Etcd3.attempt_to_acquire_leader()
Etcd3.cancel_initialization()
Etcd3.cluster_prefix
Etcd3.create_lease()
Etcd3.delete_cluster()
Etcd3.delete_sync_state()
Etcd3.initialize()
Etcd3.member()
Etcd3.refresh_lease()
Etcd3.set_config_value()
Etcd3.set_failover_value()
Etcd3.set_history_value()
Etcd3.set_socket_options()
Etcd3.set_sync_state_value()
Etcd3.set_ttl()
Etcd3.take_leader()
Etcd3.touch_member()
Etcd3.watch()
Etcd3Client
Etcd3Client.ERROR_CLS
Etcd3Client.authenticate()
Etcd3Client.call_rpc()
Etcd3Client.deleteprefix()
Etcd3Client.deleterange()
Etcd3Client.handle_auth_errors()
Etcd3Client.lease_grant()
Etcd3Client.lease_keepalive()
Etcd3Client.prefix()
Etcd3Client.put()
Etcd3Client.range()
Etcd3Client.txn()
Etcd3Client.watchprefix()
Etcd3Client.watchrange()
Etcd3ClientError
Etcd3Error
Etcd3Exception
FailedPrecondition
GRPCCode
GRPCCode.Aborted
GRPCCode.AlreadyExists
GRPCCode.Canceled
GRPCCode.DataLoss
GRPCCode.DeadlineExceeded
GRPCCode.FailedPrecondition
GRPCCode.Internal
GRPCCode.InvalidArgument
GRPCCode.NotFound
GRPCCode.OK
GRPCCode.OutOfRange
GRPCCode.PermissionDenied
GRPCCode.ResourceExhausted
GRPCCode.Unauthenticated
GRPCCode.Unavailable
GRPCCode.Unimplemented
GRPCCode.Unknown
InvalidArgument
InvalidAuthToken
KVCache
LeaseNotFound
NotFound
PatroniEtcd3Client
PermissionDenied
Unavailable
Unknown
UnsupportedEtcdVersion
UserEmpty
base64_decode()
base64_encode()
build_range_request()
prefix_range_end()
to_bytes()
- patroni.dcs.exhibitor module
- patroni.dcs.kubernetes module
CoreV1ApiProxy
K8sClient
K8sConfig
K8sConnectionFailed
K8sException
K8sObject
Kubernetes
Kubernetes.attempt_to_acquire_leader()
Kubernetes.cancel_initialization()
Kubernetes.client_path()
Kubernetes.compare_ports()
Kubernetes.delete_cluster()
Kubernetes.delete_leader()
Kubernetes.delete_sync_state()
Kubernetes.get_citus_coordinator()
Kubernetes.initialize()
Kubernetes.leader_path
Kubernetes.manual_failover()
Kubernetes.member()
Kubernetes.patch_or_create()
Kubernetes.patch_or_create_config()
Kubernetes.reload_config()
Kubernetes.retry()
Kubernetes.set_config_value()
Kubernetes.set_failover_value()
Kubernetes.set_history_value()
Kubernetes.set_retry_timeout()
Kubernetes.set_sync_state_value()
Kubernetes.set_ttl()
Kubernetes.subsets_changed()
Kubernetes.take_leader()
Kubernetes.touch_member()
Kubernetes.ttl
Kubernetes.update_leader()
Kubernetes.watch()
Kubernetes.write_leader_optime()
Kubernetes.write_sync_state()
KubernetesError
KubernetesRetriableException
ObjectCache
catch_kubernetes_errors()
to_camel_case()
- patroni.dcs.raft module
- patroni.dcs.zookeeper module
PatroniKazooClient
PatroniSequentialThreadingHandler
ZooKeeper
ZooKeeper.attempt_to_acquire_leader()
ZooKeeper.cancel_initialization()
ZooKeeper.cluster_watcher()
ZooKeeper.delete_cluster()
ZooKeeper.delete_sync_state()
ZooKeeper.get_children()
ZooKeeper.get_node()
ZooKeeper.get_status()
ZooKeeper.initialize()
ZooKeeper.load_members()
ZooKeeper.member()
ZooKeeper.reload_config()
ZooKeeper.session_listener()
ZooKeeper.set_config_value()
ZooKeeper.set_failover_value()
ZooKeeper.set_history_value()
ZooKeeper.set_retry_timeout()
ZooKeeper.set_sync_state_value()
ZooKeeper.set_ttl()
ZooKeeper.status_watcher()
ZooKeeper.take_leader()
ZooKeeper.touch_member()
ZooKeeper.ttl
ZooKeeper.watch()
ZooKeeperError
Module contents¶
Abstract classes for Distributed Configuration Store.
- class patroni.dcs.AbstractDCS(config: Dict[str, Any])¶
Bases:
ABC
Abstract representation of DCS modules.
Implementations of a concrete DCS class, using appropriate backend client interfaces, must include the following methods and properties.
Functional methods that are critical in their timing, required to complete within
retry_timeout
period in order to prevent the DCS considered inaccessible, each perform construction of complex data objects:_cluster_loader()
:method which processes the structure of data stored in the DCS used to build the
Cluster
object with all relevant associated data.
_citus_cluster_loader()
:Similar to above but specifically representing Citus group and workers information.
_load_cluster()
:main method for calling specific
loader
method to build theCluster
object representing the state and topology of the cluster.
Functional methods that are critical in their timing and must be written with ACID transaction properties in mind:
attempt_to_acquire_leader()
:method used in the leader race to attempt to acquire the leader lock by creating the leader key in the DCS, if it does not exist.
_update_leader()
:method to update
leader
key in DCS. Relies on Compare-And-Set to ensure the Primary lock key is updated. If this fails to update within theretry_timeout
window the Primary will be demoted.
Functional method that relies on Compare-And-Create to ensure only one member creates the relevant key:
initialize()
:method used in the race for cluster initialization which creates the
initialize
key in the DCS.
DCS backend getter and setter methods and properties:
take_leader()
: method to create a new leader key in the DCS.set_ttl()
: method for setting TTL value in DCS.ttl()
: property which returns the current TTL.set_retry_timeout()
: method for settingretry_timeout
in DCS backend._write_leader_optime()
: compatibility method to write WAL LSN to DCS._write_status()
: method to write WAL LSN for slots to the DCS._write_failsafe()
: method to write cluster topology to the DCS, used by failsafe mechanism.touch_member()
: method to update individual member key in the DCS.set_history_value()
: method to set thehistory
key in the DCS.
DCS setter methods using Compare-And-Set which although important are less critical if they fail, attempts can be retried or may result in warning log messages:
set_failover_value()
: method to create and/or update thefailover
key in the DCS.set_config_value()
: method to create and/or update thefailover
key in the DCS.set_sync_state_value()
: method to set the synchronous statesync
key in the DCS.
DCS data and key removal methods:
delete_sync_state()
:likewise, a method to remove synchronous state
sync
key from the DCS.
delete_cluster()
:method which will remove cluster information from the DCS. Used only from patronictl.
_delete_leader()
:method relies on CAS, used by a member that is the current leader, to remove the
leader
key in the DCS.
cancel_initialization()
:method to remove the
initialize
key for the cluster from the DCS.
If either of the sync_state set or delete methods fail, although not critical, this may result in
Synchronous replication key updated by someone else
messages being logged.Care should be taken to consult each abstract method for any additional information and requirements such as expected exceptions that should be raised in certain conditions and the object types for arguments and return from methods and properties.
- abstract attempt_to_acquire_leader() bool ¶
Attempt to acquire leader lock.
Note
This method should create
/leader
key with the value_name
.The key must be created atomically. In case the key already exists it should not be overwritten and
False
must be returned.If key creation fails due to DCS not being accessible or because it is not able to process requests (hopefully temporary), the
DCSError
exception should be raised.- Returns:
True
if key has been created successfully.
- abstract cancel_initialization() bool ¶
Removes the
initialize
key for a cluster.- Returns:
True
if successfully committed to DCS.
- client_path(path: str) str ¶
Construct the absolute key name from appropriate parts for the DCS type.
- Parameters:
path – The key name within the current Patroni cluster.
- Returns:
absolute key name for the current Patroni cluster.
- abstract delete_cluster() bool ¶
Delete cluster from DCS.
- Returns:
True
if successfully committed to DCS.
- delete_leader(last_lsn: Optional[int] = None) bool ¶
Update
optime/leader
and voluntarily remove leader key from DCS.This method should remove leader key if current instance is the leader.
- Parameters:
last_lsn – latest checkpoint location in bytes.
- Returns:
boolean result of called abstract
_delete_leader()
.
- abstract delete_sync_state(version: Optional[Any] = None) bool ¶
Delete the synchronous state from DCS.
- Parameters:
version – for conditional deletion of the key/object.
- Returns:
True
if delete successful.
- get_citus_coordinator() Optional[Cluster] ¶
Load the Patroni cluster for the Citus Coordinator.
Note
This method is only executed on the worker nodes (
group!=0
) to find the coordinator.- Returns:
Select
Cluster
instance associated with the Citus Coordinator group ID.
- get_cluster(force: bool = False) Cluster ¶
Retrieve an appropriate cached or fresh view of DCS.
Note
Stores copy of time, status and failsafe values for comparison in DCS update decisions. Caching is required to avoid overhead placed upon the REST API.
Returns either a Citus or Patroni implementation of
Cluster
depending on availability.- Parameters:
force – a value of
True
will override Zookeeper caching features.- Returns:
- abstract initialize(create_new: bool = True, sysid: str = '') bool ¶
Race for cluster initialization.
This method should atomically create
initialize
key and returnTrue
, otherwise it should returnFalse
.- Parameters:
create_new –
False
if the key should already exist (in the case we are setting the system_id).sysid – PostgreSQL cluster system identifier, if specified, is written to the key.
- Returns:
True
if key has been created successfully.
- is_citus_coordinator() bool ¶
Cluster
instance has a Citus Coordinator group ID.- Returns:
True
if the given node is running as Citus Coordinator (group=0
).
- property leader_optime_path: str¶
Get the client path for
optime/leader
(legacy key, superseded bystatus
).
- manual_failover(leader: Optional[str], candidate: Optional[str], scheduled_at: Optional[datetime] = None, version: Optional[Any] = None) bool ¶
Prepare dictionary with given values and set
/failover
key in DCS.- Parameters:
leader – value to set for
leader
.candidate – value to set for
member
.scheduled_at – value converted to ISO date format for
scheduled_at
.version – for conditional update of the key/object.
- Returns:
True
if successfully committed to DCS.
- reload_config(config: Union[Config, Dict[str, Any]]) None ¶
Load and set relevant values from configuration.
Sets
loop_wait
,ttl
andretry_timeout
properties.- Parameters:
config – Loaded configuration information object or dictionary of key value pairs.
- abstract set_config_value(value: str, version: Optional[Any] = None) bool ¶
Create or update
/config
key in DCS.- Parameters:
value – new value to set in the
config
key.version – for conditional update of the key/object.
- Returns:
True
if successfully committed to DCS.
- abstract set_failover_value(value: str, version: Optional[Any] = None) bool ¶
Create or update
/failover
key.- Parameters:
value – value to set.
version – for conditional update of the key/object.
- Returns:
True
if successfully committed to DCS.
- abstract set_history_value(value: str) bool ¶
Set value for
history
in DCS.- Parameters:
value – new value of
history
key/object.- Returns:
True
if successfully committed to DCS.
- abstract set_sync_state_value(value: str, version: Optional[Any] = None) Union[Any, bool] ¶
Set synchronous state in DCS.
- Parameters:
value – the new value of
/sync
key.version – for conditional update of the key/object.
- Returns:
version of the new object or
False
in case of error.
- static sync_state(leader: Optional[str], sync_standby: Optional[Collection[str]]) Dict[str, Any] ¶
Build
sync_state
dictionary.- Parameters:
leader – name of the leader node that manages
/sync
key.sync_standby – collection of currently known synchronous standby node names.
- Returns:
dictionary that later could be serialized to JSON or saved directly to DCS.
- abstract take_leader() bool ¶
Establish a new leader in DCS.
Note
This method should create leader key with value of
_name
andttl
ofttl
.Since it could be called only on initial cluster bootstrap it could create this key regardless, overwriting the key if necessary.
- Returns:
True
if successfully committed to DCS.
- abstract touch_member(data: Dict[str, Any]) bool ¶
Update member key in DCS.
Note
This method should create or update key with the name with
/members/
+_name
and the value of data in a given DCS.- Parameters:
data – information about an instance (including connection strings).
- Returns:
True
if successfully committed to DCS.
- update_leader(leader: Leader, last_lsn: Optional[int], slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) bool ¶
Update
leader
key (or session) ttl and optime/leader.- Parameters:
leader –
Leader
object with information about the leader.last_lsn – absolute WAL LSN in bytes.
slots – dictionary with permanent slots
confirmed_flush_lsn
.failsafe – if defined dictionary passed to
write_failsafe()
.
- Returns:
True
ifleader
key (or session) has been updated successfully.
- watch(leader_version: Optional[Any], timeout: float) bool ¶
Sleep if the current node is a leader, otherwise, watch for changes of leader key with a given timeout.
- Parameters:
leader_version – version of a leader key.
timeout – timeout in seconds.
- Returns:
if
True
this will reschedule the next run of the HA cycle.
- write_failsafe(value: Dict[str, str]) None ¶
Write the
/failsafe
key in DCS.- Parameters:
value – dictionary value to set, consisting of the
name
andapi_url
of members.
- write_leader_optime(last_lsn: int) None ¶
Write value for WAL LSN to
optime/leader
key in DCS.Note
This method abstracts away the required data structure of
write_status()
, so it is not needed in the caller. However, theoptime/leader
is only written inwrite_status()
when the cluster has members with a Patroni version that is old enough to require it (i.e. the old Patroni version doesn’t understand the new format).- Parameters:
last_lsn – absolute WAL LSN in bytes.
- write_status(value: Dict[str, Any]) None ¶
Write cluster status to DCS if changed.
Note
The DCS key
/status
was introduced in Patroni version 2.1.0. Previous to this the position of last known leader LSN was stored inoptime/leader
. This method has detection for backwards compatibility of members with a version older than this.- Parameters:
value – JSON serializable dictionary with current WAL LSN and
confirmed_flush_lsn
of permanent slots.
- write_sync_state(leader: Optional[str], sync_standby: Optional[Collection[str]], version: Optional[Any] = None) Optional[SyncState] ¶
Write the new synchronous state to DCS.
Calls
sync_state()
to build a dictionary and then calls DCS specificset_sync_state_value()
.- Parameters:
leader – name of the leader node that manages
/sync
key.sync_standby – collection of currently known synchronous standby node names.
version – for conditional update of the key/object.
- Returns:
the new
SyncState
object orNone
.
- class patroni.dcs.Cluster(*args: Any, **kwargs: Any)¶
Bases:
Cluster
Immutable object (namedtuple) which represents PostgreSQL or Citus cluster.
Note
We are using an old-style attribute declaration here because otherwise it is not possible to override __new__ method. Without it the workers by default gets always the same
dict
object that could be mutated.Consists of the following fields:
- Variables:
initialize – shows whether this cluster has initialization key stored in DC or not.
config – global dynamic configuration, reference to ClusterConfig object.
leader –
Leader
object which represents current leader of the cluster.last_lsn – :class:int object containing position of last known leader LSN. This value is stored in the /status key or /optime/leader (legacy) key.
members – list of:class:` Member` objects, all PostgreSQL cluster members including leader
failover – reference to
Failover
object.sync – reference to
SyncState
object, last observed synchronous replication state.history – reference to TimelineHistory object.
slots – state of permanent logical replication slots on the primary in the format: {“slot_name”: int}.
failsafe – failsafe topology. Node is allowed to become the leader only if its name is found in this list.
workers – dictionary of workers of the Citus cluster, optional. Each key is an
int
representing the group, and the corresponding value is aCluster
instance.
- get_clone_member(exclude_name: str) Optional[Union[Member, Leader]] ¶
Get member or leader object to use as clone source.
- Parameters:
exclude_name – name of a member name to exclude.
- Returns:
a randomly selected candidate member from available running members that are configured to as viable sources for cloning (has tag
clonefrom
in configuration). If no member is appropriate the current leader is used.
- get_my_slot_name_on_primary(my_name: str, replicatefrom: Optional[str]) str ¶
Canonical slot name for physical replication.
Note
P <– I <– L
In case of cascading replication we have to check not our physical slot, but slot of the replica that connects us to the primary.
- Parameters:
my_name – the member node name that is replicating.
replicatefrom – the Intermediate member name that is configured to replicate for cascading replication.
- Returns:
The slot name that is in use for physical replication on this no`de.
- get_replication_slots(my_name: str, role: str, nofailover: bool, major_version: int, *, is_standby_cluster: bool = False, show_error: bool = False) Dict[str, Dict[str, Any]] ¶
Lookup configured slot names in the DCS, report issues found and merge with permanent slots.
Will log an error if:
Conflicting slot names between members are found
Any logical slots are disabled, due to version compatibility, and show_error is
True
.
- Parameters:
my_name – name of this node.
role – role of this node.
nofailover –
True
if this node is tagged to not be a failover candidate.major_version – postgresql major version.
is_standby_cluster –
True
if it is known that this is a standby cluster. We pass the value from the outside because we want to protect from the/config
key removal.show_error – if
True
report error if any disabled logical slots or conflicting slot names are found.
- Returns:
final dictionary of slot names, after merging with permanent slots and performing sanity checks.
- has_member(member_name: str) bool ¶
Check if the given member name is present in the cluster.
- Parameters:
member_name – name to look up in the
members
.- Returns:
True
if the member name is found.
- has_permanent_logical_slots(my_name: str, nofailover: bool, major_version: int = 110000) bool ¶
Check if the given member node has permanent
logical
replication slots configured.- Parameters:
my_name – name of the member node to check.
nofailover –
True
if this node is tagged to not be a failover candidate.major_version – the PostgreSQL major version number.
- Returns:
False
if PostgreSQL is < 11,True
if any detected replications slots arelogical
.
- is_empty()¶
Validate definition of all attributes of this
Cluster
instance.- Returns:
True
if all attributes of the currentCluster
are unpopulated.
- is_unlocked() bool ¶
Check if the cluster does not have the leader.
- Returns:
True
if a leader name is not defined.
- property min_version: Optional[Tuple[int, ...]]¶
Lowest Patroni software version found in known members of the cluster.
- should_enforce_hot_standby_feedback(my_name: str, nofailover: bool, major_version: int) bool ¶
Determine whether
hot_standby_feedback
should be enabled for the given member.The
hot_standby_feedback
must be enabled if the current replica haslogical
slots, or it is working as a cascading replica for the other node that haslogical
slots.- Parameters:
my_name – name of the member node to check.
nofailover –
True
if this node is tagged to not be a failover candidate.major_version – PostgreSQL major version number.
- Returns:
True
if this node or any member replicating from this node has permanent logical slots.False
if PostgreSQL major version is < 11.
- property timeline: int¶
Get the cluster history index from the
history
.- Returns:
If the recorded history is empty assume timeline is
1
, if it is not defined or the stored history is not formatted as expected0
is returned and an error will be logged. Otherwise, the last number stored incremented by 1 is returned.- Example:
No history provided: >>> Cluster(0, 0, 0, 0, 0, 0, 0, 0, 0, None, {}).timeline 0
Empty history assume timeline is
1
: >>> Cluster(0, 0, 0, 0, 0, 0, 0, TimelineHistory.from_node(1, ‘[]’), 0, None, {}).timeline 1Invalid history format, a string of
a
, returns0
: >>> Cluster(0, 0, 0, 0, 0, 0, 0, TimelineHistory.from_node(1, ‘[[“a”]]’), 0, None, {}).timeline 0History as a list of strings: >>> Cluster(0, 0, 0, 0, 0, 0, 0, TimelineHistory.from_node(1, ‘[[“3”, “2”, “1”]]’), 0, None, {}).timeline 4
- class patroni.dcs.ClusterConfig(version: Union[int, str], data: Dict[str, Any], modify_version: Union[int, str])¶
Bases:
NamedTuple
Immutable object (namedtuple) which represents cluster configuration.
- Variables:
version – version number for the object.
data – dictionary of configuration information.
modify_version – modified version number.
- static from_node(version: Union[int, str], value: str, modify_version: Optional[Union[int, str]] = None) ClusterConfig ¶
Factory method to parse value as configuration information.
- Parameters:
version – version number for object.
value – raw JSON serialized data, if not parsable replaced with an empty dictionary.
modify_version – optional modify version number, use version if not provided.
- Returns:
constructed
ClusterConfig
instance.- Example:
>>> ClusterConfig.from_node(1, '{') is None False
- property ignore_slots_matchers: List[Dict[str, Any]]¶
The value for
ignore_slots
fromdata
if defined or an empty list.
- class patroni.dcs.Failover(version: Union[int, str], leader: Optional[str], candidate: Optional[str], scheduled_at: Optional[datetime])¶
Bases:
NamedTuple
Immutable object (namedtuple) representing configuration information required for failover/switchover capability.
- Variables:
version – version of the object.
leader – name of the leader. If value isn’t empty we treat it as a switchover from the specified node.
candidate – the name of the member node to be considered as a failover candidate.
scheduled_at – in the case of a switchover the
datetime
object to perform the scheduled switchover.
- Example:
>>> 'Failover' in str(Failover.from_node(1, '{"leader": "cluster_leader"}')) True
>>> 'Failover' in str(Failover.from_node(1, {"leader": "cluster_leader"})) True
>>> 'Failover' in str(Failover.from_node(1, '{"leader": "cluster_leader", "member": "cluster_candidate"}')) True
>>> Failover.from_node(1, 'null') is None False
>>> n = '''{"leader": "cluster_leader", "member": "cluster_candidate", ... "scheduled_at": "2016-01-14T10:09:57.1394Z"}'''
>>> 'tzinfo=' in str(Failover.from_node(1, n)) True
>>> Failover.from_node(1, None) is None False
>>> Failover.from_node(1, '{}') is None False
>>> 'abc' in Failover.from_node(1, 'abc:def') True
- static from_node(version: Union[int, str], value: Union[str, Dict[str, str]]) Failover ¶
Factory method to parse value as failover configuration.
- Parameters:
version – version number for the object.
value – JSON serialized data or a dictionary of configuration. Can also be a colon
:
delimited list of leader, followed by candidate name (legacy format). Ifscheduled_at
key is defined the value will be parsed bydateutil.parser.parse()
.
- Returns:
constructed
Failover
information object
- class patroni.dcs.Leader(version: Union[int, str], session: Optional[Union[int, float, str]], member: Member)¶
Bases:
NamedTuple
Immutable object (namedtuple) which represents leader key.
Consists of the following fields:
- Variables:
version – modification version of a leader key in a Configuration Store
session – either session id or just ttl in seconds
member – reference to a
Member
object which represents current leader (seeCluster.members
)
- property checkpoint_after_promote: Optional[bool]¶
Determine whether a checkpoint has occurred for this leader after promotion.
- Returns:
True
if the role ismaster
orprimary
andcheckpoint_after_promote
is not set,False
if not amaster
orprimary
or if the checkpoint hasn’t occurred. If the version of Patroni is older than 1.5.6, returnNone
.- Example:
>>> Leader(1, '', Member.from_node(1, '', '', '{"version":"z"}')).checkpoint_after_promote
- class patroni.dcs.Member(version: Union[int, str], name: str, session: Optional[Union[int, float, str]], data: Dict[str, Any])¶
Bases:
Member
Immutable object (namedtuple) which represents single member of PostgreSQL cluster.
Note
We are using an old-style attribute declaration here because otherwise it is not possible to override
__new__
method in theRemoteMember
class.Note
These two keys in data are always written to the DCS, but care is taken to maintain consistency and resilience from data that is read:
conn_url
: connection string containing host, user and password which could be used to access this member.api_url
: REST API url of patroni instanceConsists of the following fields:
- Variables:
version – modification version of a given member key in a Configuration Store.
name – name of PostgreSQL cluster member.
session – either session id or just ttl in seconds.
data – dictionary containing arbitrary data i.e.
conn_url
,api_url
,xlog_location
,state
,role
,tags
, etc…
- conn_kwargs(auth: Optional[Union[Any, Dict[str, Any]]] = None) Dict[str, Any] ¶
Give keyword arguments used for PostgreSQL connection settings.
- Parameters:
auth – Authentication properties - can be defined as anything supported by the
psycopg2
orpsycopg
modules. Converts a key ofusername
touser
if supplied.- Returns:
A dictionary containing a merge of default parameter keys
host
,port
anddbname
, with the contents ofdata
conn_kwargs
key. If those are not defined will parse and reform connection parameters fromconn_url
. One of these two attributes needs to have data defined to construct the output dictionary. Finally, auth parameters are merged with the dictionary before returned.
- property conn_url: Optional[str]¶
The
conn_url
value fromdata
if defined or constructed fromconn_kwargs
.
- static from_node(version: Union[int, str], name: str, session: Optional[Union[int, float, str]], value: str) Member ¶
Factory method for instantiating
Member
from a JSON serialised string or object.- Parameters:
version – modification version of a given member key in a Configuration Store.
name – name of PostgreSQL cluster member.
session – either session id or just ttl in seconds.
value – JSON encoded string containing arbitrary data i.e.
conn_url
,api_url
,xlog_location
,state
,role
,tags
, etc. OR a connection URL starting withpostgres://
.
- Returns:
an
Member
instance built with the given arguments.- Example:
>>> Member.from_node(-1, '', '', '{"conn_url": "postgres://foo@bar/postgres"}') is not None True
>>> Member.from_node(-1, '', '', '{') Member(version=-1, name='', session='', data={})
- class patroni.dcs.RemoteMember(name: str, data: Dict[str, Any])¶
Bases:
Member
Represents a remote member (typically a primary) for a standby cluster.
- Variables:
ALLOWED_KEYS – Controls access to relevant key names that could be in stored
data
.
- exception patroni.dcs.ReturnFalseException¶
Bases:
Exception
Exception to be caught by the
catch_return_false_exception()
decorator.
- class patroni.dcs.SyncState(version: Optional[Union[int, str]], leader: Optional[str], sync_standby: Optional[str])¶
Bases:
NamedTuple
Immutable object (namedtuple) which represents last observed synchronous replication state.
- Variables:
version – modification version of a synchronization key in a Configuration Store.
leader – reference to member that was leader.
sync_standby – synchronous standby list (comma delimited) which are last synchronized to leader.
- static empty(version: Optional[Union[int, str]] = None) SyncState ¶
Construct an empty
SyncState
instance.- Parameters:
version – optional version number.
- Returns:
empty synchronisation state object.
- static from_node(version: Optional[Union[int, str]], value: Optional[Union[str, Dict[str, Any]]]) SyncState ¶
Factory method to parse value as synchronisation state information.
- Parameters:
version – optional version number for the object.
value – (optionally JSON serialised) sychronisation state information
- Returns:
constructed
SyncState
object.- Example:
>>> SyncState.from_node(1, None).leader is None True
>>> SyncState.from_node(1, '{}').leader is None True
>>> SyncState.from_node(1, '{').leader is None True
>>> SyncState.from_node(1, '[]').leader is None True
>>> SyncState.from_node(1, '{"leader": "leader"}').leader == "leader" True
>>> SyncState.from_node(1, {"leader": "leader"}).leader == "leader" True
- leader_matches(name: Optional[str]) bool ¶
Compare the given name to stored leader value.
- Returns:
True
if name is matching theleader
value.
- matches(name: Optional[str], check_leader: bool = False) bool ¶
Checks if node is presented in the /sync state.
Since PostgreSQL does case-insensitive checks for synchronous_standby_name we do it also.
- Parameters:
name – name of the node.
check_leader – by default the name is searched for only in members, a value of
True
will include the leader to list.
- Returns:
True
if the/sync
key notis_empty()
and the given name is among those presented in the sync state.- Example:
>>> s = SyncState(1, 'foo', 'bar,zoo')
>>> s.matches('foo') False
>>> s.matches('fOo', True) True
>>> s.matches('Bar') True
>>> s.matches('zoO') True
>>> s.matches('baz') False
>>> s.matches(None) False
>>> SyncState.empty(1).matches('foo') False
- property members: List[str]¶
sync_standby
as list or an empty list if undefined or object consideredempty
.
- class patroni.dcs.TimelineHistory(version: Union[int, str], value: Any, lines: List[Union[Tuple[int, int, str], Tuple[int, int, str, str], Tuple[int, int, str, str, str]]])¶
Bases:
NamedTuple
Object representing timeline history file.
Note
The content held in lines deserialized from value are lines parsed from PostgreSQL timeline history files, consisting of the timeline number, the LSN where the timeline split and any other string held in the file. The files are parsed by
parse_history()
.- Variables:
version – version number of the file.
value – raw JSON serialised data consisting of parsed lines from history files.
lines –
List
ofTuple
parsed lines from history files.
- static from_node(version: Union[int, str], value: str) TimelineHistory ¶
Parse the given JSON serialized string as a list of timeline history lines.
- Parameters:
version – version number
value – JSON serialized string, consisting of parsed lines of PostgreSQL timeline history files, see
TimelineHistory
.
- Returns:
composed timeline history object using parsed lines.
- Example:
If the passed value argument is not parsed an empty list of lines is returned:
>>> h = TimelineHistory.from_node(1, 2)
>>> h.lines []
- patroni.dcs.catch_return_false_exception(func: Callable[[...], Any]) Any ¶
Decorator function for catching functions raising
ReturnFalseException
.- Parameters:
func – function to be wrapped.
- Returns:
wrapped function.
- patroni.dcs.dcs_modules() List[str] ¶
Get names of DCS modules, depending on execution environment.
Note
If being packaged with PyInstaller, modules aren’t discoverable dynamically by scanning source directory because
importlib.machinery.FrozenImporter
doesn’t implementiter_modules()
. But it is still possible to find all potential DCS modules by iterating throughtoc
, which contains list of all “frozen” resources.- Returns:
list of known module names with absolute python module path namespace, e.g.
patroni.dcs.etcd
.
- patroni.dcs.find_dcs_class_in_module(module: module) Optional[Type[AbstractDCS]] ¶
Try to find the implementation of
AbstractDCS
interface in module matching the module name.- Parameters:
module – Imported DCS module.
- Returns:
class with a name matching the name of module that implements
AbstractDCS
orNone
if not found.
- patroni.dcs.get_dcs(config: Union[Config, Dict[str, Any]]) AbstractDCS ¶
Attempt to load a Distributed Configuration Store from known available implementations.
Note
Using the list of available DCS modules returned by
iter_dcs_modules()
attempt to dynamically import and instantiate the class that implements a DCS using the abstract classAbstractDCS
.Basic top-level configuration parameters retrieved from config are propagated to the DCS specific config before being passed to the module DCS class.
If no module is found to satisfy configuration then report and log an error. This will cause Patroni to exit.
:raises
PatroniFatalException
: if a load of all available DCS modules have been tried and none succeeded.- Parameters:
config – object or dictionary with Patroni configuration. This is normally a representation of the main Patroni
- Returns:
The first successfully loaded DCS module which is an implementation of
AbstractDCS
.
- patroni.dcs.iter_dcs_classes(config: Optional[Union[Config, Dict[str, Any]]] = None) Iterator[Tuple[str, Type[AbstractDCS]]] ¶
Attempt to import DCS modules that are present in the given configuration.
Note
If a module successfully imports we can assume that all its requirements are installed.
- Parameters:
config – configuration information with possible DCS names as keys. If given, only attempt to import DCS modules defined in the configuration. Else, if
None
, attempt to import any supported DCS module.- Yields:
a tuple containing the module
name
and the imported DCS class object.
- patroni.dcs.parse_connection_string(value: str) Tuple[str, Optional[str]] ¶
Split and rejoin a URL string into a connection URL and an API URL.
Note
Original Governor stores connection strings for each cluster members in a following format:
postgres://{username}:{password}@{connect_address}/postgres
Since each of our patroni instances provides their own REST API endpoint, it’s good to store this information in DCS along with PostgreSQL connection string. In order to not introduce new keys and be compatible with original Governor we decided to extend original connection string in a following way:
postgres://{username}:{password}@{connect_address}/postgres?application_name={api_url}
This way original Governor could use such connection string as it is, because of feature of
libpq
library.- Parameters:
value – The URL string to split.
- Returns:
the connection string stored in DCS split into two parts,
conn_url
andapi_url
.
- patroni.dcs.slot_name_from_member_name(member_name: str) str ¶
Translate member name to valid PostgreSQL slot name.
Note
PostgreSQL’s replication slot names must be valid PostgreSQL names. This function maps the wider space of member names to valid PostgreSQL names. Names have their case lowered, dashes and periods common in hostnames are replaced with underscores, other characters are encoded as their unicode codepoint. Name is truncated to 64 characters. Multiple different member names may map to a single slot name.
- Parameters:
member_name – The string to convert to a slot name.
- Returns:
The string converted using the rules described above.