Skip to content

Commit

Permalink
Merge pull request #1 from Javinator9889/master
Browse files Browse the repository at this point in the history
v0.1.12
  • Loading branch information
Javinator9889 committed Jun 16, 2022
2 parents 8877879 + dc37279 commit 61716e4
Show file tree
Hide file tree
Showing 10 changed files with 371 additions and 112 deletions.
21 changes: 21 additions & 0 deletions LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2021 Javier Alonso <jalonso@teldat.com>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
49 changes: 47 additions & 2 deletions debian/changelog
@@ -1,8 +1,53 @@
python3-orcha (0.1.7) UNRELEASED; urgency=medium
python3-orcha (0.1.12) UNRELEASED; urgency=medium

* Key parameter is not mandatory anymore - some plugins may not require it for working.
* Improve authentication error messages by giving some extra information.
* `connect` (from "lib/manager") now returns a boolean indicating whether the connection
was successful or not.

-- Javier Alonso <jalonso@teldat.com> Thu, 16 Jun 2022 09:00:00 +0200

python3-orcha (0.1.11) unstable; urgency=medium

* Fix internal digest key for Orcha clients that was causing an exception
on Python versions lower or equal than 3.7.

-- Javier Alonso <jalonso@teldat.com> Wed, 15 Jun 2022 15:50:00 +0200

python3-orcha (0.1.10) unstable; urgency=medium

* Improve watchdog mechanism by leveraging the handling to the internal
processor, the same that handles all the petitions (and which could fail
due to a deadlock or being blocked or any queue mechanism not working as
expected).

-- Javier Alonso <jalonso@teldat.com> Thu, 19 May 2022 09:25:00 +0200

python3-orcha (0.1.9) unstable; urgency=medium

* Processor class is now more resilient and handles unexpected exceptions by
notifying SystemD about our status more frequently. If an unexpected error
happens (and we are a SystemD service), the watchdog will be triggered and
we expect to be restarted.
* Petition objects now implement both equality and comparison operators. We
relied on dataclasses for such purpose but we've seen some exceptions that
are happening when an EmptyPetition (for notifying that we have finished) is
enqueued. Looks like dataclass' __eq__ and __lt__ operators require both
objects to be exactly the same class, and it does not support subclasses.

-- Javier Alonso <jalonso@teldat.com> Mon, 17 May 2022 09:00:00 +0200

python3-orcha (0.1.8) unstable; urgency=medium

* Protect manager "on_start" and "on_finish" calls with a mutex region

-- Javier Alonso <jalonso@teldat.com> Mon, 18 Apr 2022 09:55:00 +0200

python3-orcha (0.1.7) unstable; urgency=medium

* Add `look_ahead' parameter to the orchestrator processor

-- Jenkins <jenkins@teldat.com> Mon, 21 Feb 2022 12:00:00 +0000
-- Javier Alonso <jalonso@teldat.com> Mon, 21 Feb 2022 12:00:00 +0100

python3-orcha (0.1.6) unstable; urgency=medium

Expand Down
2 changes: 1 addition & 1 deletion debian/copyright
@@ -1,7 +1,7 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: orcha
Upstream-Contact: Javier Alonso <jalonso@teldat.com>
Source: http://repos.id.teldat.com/git/osdx/cgit.cgi/tools/orcha/
Source: https://github.com/Javinator9889/orcha
Files: *
Copyright: 2021 Teldat Inc.
License: MIT
Expand Down
16 changes: 13 additions & 3 deletions orcha/bin/main.py
Expand Up @@ -26,6 +26,7 @@
see: :class:`BasePlugin`.
"""
import argparse
import multiprocessing
import sys

import orcha.properties
Expand All @@ -49,9 +50,7 @@ def main():
--key KEY defines the authentication key used during communication.
This field is not mandatory but **it is recommended to define it**
as it will be necessary for other processes to communicate with the
service itself. By default, it is :attr:`None` which
means that it will be auto-generated by the main process when
running.
service itself.
The application automatically detects the plugins that are installed in the system. It
is important that the installed plugins follows the name convention in order to be
Expand All @@ -65,6 +64,15 @@ def main():
+ ``1`` refers to a standard error happened during execution.
+ ``127`` indicates that no plugins were found or no plugins
can handle the parsed command line options.
.. versionchanged:: 0.1.11
+ ``key`` parameter is now required, the internally generated one won't be used anymore.
+ Orcha clients in Python <= 3.7 now have their internal digest fixed, not throwing an
exception anymore.
.. versionchanged:: 0.1.12
+ ``key`` parameter is not mandatory (again) - some plugins may not require it for
their basic functionality.
"""
parser = argparse.ArgumentParser(
description="Orcha command line utility for handling services",
Expand Down Expand Up @@ -107,6 +115,8 @@ def main():
orcha.properties.port = args.port
if args.key is not None:
orcha.properties.authkey = args.key.encode()
log.debug("fixing internal digest key")
multiprocessing.current_process().authkey = args.key.encode()

for arg, value in vars(args).items():
orcha.properties.extras[arg] = value
Expand Down
10 changes: 9 additions & 1 deletion orcha/interfaces/__init__.py
Expand Up @@ -21,7 +21,14 @@
# SOFTWARE.
""":mod:`interfaces` defines the basic structure needed when communicating with Orcha"""
from .message import Message
from .petition import ActionCallbackT, EmptyPetition, P, Petition, ProcT
from .petition import (
ActionCallbackT,
EmptyPetition,
P,
Petition,
ProcT,
WatchdogPetition,
)
from .service import ServiceWrapper, register_service, start_service

__all__ = [
Expand All @@ -34,4 +41,5 @@
"register_service",
"ServiceWrapper",
"start_service",
"WatchdogPetition",
]
63 changes: 57 additions & 6 deletions orcha/interfaces/petition.py
Expand Up @@ -27,6 +27,7 @@
import subprocess
from abc import ABC
from dataclasses import dataclass, field
from functools import total_ordering
from queue import Queue
from typing import Any, Callable, NoReturn, Optional, Type, TypeVar, Union

Expand All @@ -51,7 +52,8 @@
"""


@dataclass(order=True)
@total_ordering
@dataclass
class Petition(ABC):
"""Class that represents a petition that should be executed on the server.
This class must have the ability to being created from an existing
Expand All @@ -71,11 +73,11 @@ class Petition(ABC):
This class is intended to be a stub so your implementation must inherit
from this one.
Warning:
Subclasses must declare fields with ``compare=False``
as without that the algorithm may break and items are not placed in the order
you expect.
.. versionchanged:: 0.1.9
We define our own equality and comparison operators, there is no need
in subclasses declaring fields as ``compare=False``. It is only checked
the ID (for equality/inequality tests) and the priority (for comparison
tests).
:see: :py:func:`field <dataclasses.field>`
"""
Expand Down Expand Up @@ -177,6 +179,30 @@ def finish(self, ret: Optional[int] = None):
"""
self.queue.put(ret)

def __eq__(self, __o: object) -> bool:
# ensure we are comparing against our class or a subclass of ours
if not isinstance(__o, Petition):
raise NotImplementedError()

sid = self.id
oid = __o.id

# if IDs are not strings, convert them so we can truly compare
if not isinstance(sid, str):
sid = str(sid)

if not isinstance(oid, str):
oid = str(oid)

return sid == oid

def __lt__(self, __o: object) -> bool:
if not isinstance(__o, Petition):
raise NotImplementedError()

# we check equality for the priorities
return self.priority < __o.priority


@dataclass(init=False)
class EmptyPetition(Petition):
Expand All @@ -201,10 +227,35 @@ def __init__(self):
pass


@dataclass(init=False)
class WatchdogPetition(Petition):
"""
Watchdog petition has always the greatest priority so it should be run the first
whenever it is received. It is used for indicating whether the processor should
watchdog the SystemD main process so we inform we are still running.
Warning:
The priority of this petition is always the higher (using ``float("-inf")``),
be careful whenever you place a custom petition with higher priority: do not
use ``float("-inf")`` as an expression, try keeping your priorities above ``0``
an go as high as you want.
"""

priority = float("-inf")
id = -1
queue = None
action = None
condition = None

def __init__(self):
pass


__all__ = [
"ActionCallbackT",
"EmptyPetition",
"P",
"Petition",
"ProcT",
"WatchdogPetition",
]
60 changes: 54 additions & 6 deletions orcha/lib/manager.py
Expand Up @@ -117,9 +117,18 @@ def convert_to_petition(self, *args):
will be prone to memory leaks.
.. versionadded:: 0.1.7
Processor now supports an attribute :attr:`look_ahead` which allows defining an
amount of items that will be pop-ed from the queue, modifying the default behavior
of just obtaining a single item.
Processor now supports an attribute :attr:`look_ahead <orcha.lib.Processor.look_ahead>`
which allows defining an amount of items that will be pop-ed from the queue,
modifying the default behavior of just obtaining a single item. Setting the
:class:`Manager`'s ``look_ahead`` will set :class:`Processor`'s ``look_ahead`` too.
.. versionadded:: 0.1.9
Processor supports a new attribute
:attr:`notify_watchdog <orcha.lib.Processor.notify_watchdog>`
that defines if the processor shall create a background thread that takes care of
notifying systemd about our status and, if dead, to restart us.
Setting the :class:`Manager`'s ``notify_watchdog`` will set
:class:`Processor`'s ``notify_watchdog`` too.
Args:
listen_address (str, optional): address used when declaring a
Expand Down Expand Up @@ -151,6 +160,10 @@ def convert_to_petition(self, *args):
one is (i.e.: if you define priorities based on time, allow the second item to be
executed before the first one). Take special care with this parameter as this may
cause starvation in processes.
notify_watchdog (:obj:`bool`, optional): if the service is running under systemd,
notify periodically (every 5 seconds) that we are alive and doing things. If there
is any kind of unexpected error, a watchdog trigger will be set and the service
will be restarted.
"""

def __init__(
Expand All @@ -163,6 +176,7 @@ def __init__(
finish_queue: Queue = None,
is_client: bool = False,
look_ahead: int = 1,
notify_watchdog: bool = False,
):
self.manager = SyncManager(address=(listen_address, port), authkey=auth_key)
"""
Expand All @@ -181,7 +195,7 @@ def __init__(
log.debug("creating processor for %s", self)
queue = queue or _queue
finish_queue = finish_queue or _finish_queue
self.processor = Processor(queue, finish_queue, self, look_ahead)
self.processor = Processor(queue, finish_queue, self, look_ahead, notify_watchdog)

log.debug("manager created - running setup...")
try:
Expand Down Expand Up @@ -221,14 +235,34 @@ def processor(self, processor: Processor):

self._processor = processor

def connect(self):
def connect(self) -> bool:
"""
Connects to an existing :class:`Manager` when acting as a client. This
method can be used also when the manager is a server, if you want that
server to behave like a client.
Returns:
:obj:`bool`: :obj:`True` if connection was successful, :obj:`False` otherwise.
.. versionadded:: 0.1.12
This method catches the
:obj:`AuthenticationError <multiprocessing.AuthenticationError>`
exception and produces an informative message indicating that, maybe,
authentication key is missing. In addition, this method returns a :obj:`bool`
indicating whether if connection was successful or not.
"""
log.debug("connecting to manager")
self.manager.connect() # pylint: disable=no-member
try:
self.manager.connect() # pylint: disable=no-member
return True
except multiprocessing.AuthenticationError as e:
log.fatal(
'Authentication against server [%s:%d] failed! Maybe "--key" is missing?',
self.manager.address[0], # pylint: disable=no-member
self.manager.address[1], # pylint: disable=no-member
)
log.fatal(e)
return False

def start(self):
"""
Expand Down Expand Up @@ -345,6 +379,7 @@ def temp(*args, **kwds):

setattr(self, name, temp)

# pylint: disable=no-self-use ; method is a stub, overwritten by "setup()"
def send(self, message: Message):
"""Sends a :class:`Message <orcha.interface.Message>` to the server manager.
This method is a stub until :func:`setup` is called (as that function overrides it).
Expand All @@ -360,7 +395,9 @@ def send(self, message: Message):
ManagerShutdownError: if the manager has been shutdown and a new message
has been tried to enqueue.
"""
...

# pylint: disable=no-self-use ; method is a stub, overwritten by "setup()"
def finish(self, message: Union[Message, int, str]):
"""Requests the ending of a running :class:`message <orcha.interfaces.Message>`.
This method is a stub until :func:`setup` is called (as that function overrides it).
Expand All @@ -380,6 +417,7 @@ def finish(self, message: Union[Message, int, str]):
ManagerShutdownError: if the manager has been shutdown and a new finish request
has been tried to enqueue.
"""
...

def _add_message(self, m: Message):
if not self._shutdown.value:
Expand Down Expand Up @@ -508,6 +546,11 @@ class ServerManager(Manager):
def on_start(self, *args):
super().on_start(*args)
In addition, this method is run by the :class:`Processor <orcha.lib.Processor>` in
a mutex environment, so it is **required** that no unhandled exception happens here
and that the operations done are minimal, as other processes will have to wait until
this call is done.
Args:
petition (Petition): the petition that has just started
"""
Expand Down Expand Up @@ -554,6 +597,11 @@ def on_finish(self, *args) -> bool:
+ The :class:`petition <orcha.interfaces.Petition>` was not registered (it is not
a running petition).
In addition, this method is run by the :class:`Processor <orcha.lib.Processor>` in
a mutex environment, so it is **required** that no unhandled exception happens here
and that the operations done are minimal, as other processes will have to wait until
this call is done.
Args:
petition (Petition): the petition that has just started
Expand Down

0 comments on commit 61716e4

Please sign in to comment.