ll.sisyphus – Writing jobs with Python
sisyphus simplifies running Python code as jobs.
Jobs can be run under the direction of a cron daemon or a similar
process runner; in that case sisyphus makes sure that no more
than one job of a given name is running at any given time.
Alternatively, sisyphus can act as its own minimal cron daemon and
execute the job repeatedly.
A job has a maximum allowed runtime. If this maximum is exceeded, the job will kill itself. In addition to that, job execution can be logged and in case of job failure an email can be sent, a message can be posted to a Mattermost chat channel or an event can be emitted to a Sentry server.
To use this module, you must derive your own class from Job,
implement the execute() method and then call the module level
function execute() or executewithargs() with your job object
(preferably in an if __name__ == "__main__" block).
Logs will (by default) be created in the ~/ll.sisyphus directory.
This can be changed by overwriting the appropriate methods in the subclass.
To execute a job, use the module level function execute() (or
executewithargs() when you want to support command line arguments).
Example
The following example illustrates the use of this module:
import os
import urllib.request

from ll import sisyphus


class Fetch(sisyphus.Job):
    projectname = "ACME.FooBar"
    jobname = "Fetch"
    argdescription = "fetch http://www.python.org/ and save it to a local file"
    maxtime = 3 * 60  # maximum allowed runtime in seconds

    def __init__(self):
        self.url = "http://www.python.org/"
        self.tmpname = f"Fetch_Tmp_{os.getpid()}.html"
        self.officialname = "Python.html"

    def execute(self):
        self.log(f"fetching data from {self.url!r}")
        data = urllib.request.urlopen(self.url).read()
        datasize = len(data)
        self.log(f"writing file {self.tmpname!r} ({datasize:,} bytes)")
        # Write to a temporary file first, then rename it into place
        with open(self.tmpname, "wb") as f:
            f.write(data)
        self.log(f"renaming file {self.tmpname!r} to {self.officialname!r}")
        os.rename(self.tmpname, self.officialname)
        # The returned one-line summary marks this run as successful
        return f"cached {self.url!r} as {self.officialname!r} ({datasize:,} bytes)"


if __name__ == "__main__":
    sisyphus.executewithargs(Fetch())
You will find the log files for this job in
~/ll.sisyphus/ACME.FooBar/Fetch/.
Result status of a job run
The method Job.execute() (which must be overwritten to implement the job’s
main functionality) should return a one-line summary of what the job did
(this is called a “successful run”). It can also return None to report
that the job had nothing to do (this is called an “uneventful run”).
Apart from “uneventful” and “successful” runs, the following results are possible:
- “interrupted”
The job failed with a KeyboardInterrupt.
- “failed”
The job failed with an exception (other than KeyboardInterrupt).
- “timeout”
The job ran longer than the allowed maximum runtime.
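The mapping from the outcome of execute() to a result status can be sketched roughly as follows. This is a simplified illustration and not the sisyphus implementation: classify_run is a hypothetical helper, the Status member names are taken from the module documentation (the numeric values here are arbitrary), and the “timeout” case is left out because it needs a supervising process.

```python
import enum


class Status(enum.IntEnum):
    # Member names follow ll.sisyphus.Status; the numeric values are arbitrary.
    UNEVENTFUL = 0
    SUCCESSFUL = 1
    FAILED = 2
    INTERRUPTED = 3
    TIMEOUT = 4


def classify_run(execute):
    """Call ``execute`` and return ``(status, summary)`` (hypothetical helper)."""
    try:
        result = execute()
    except KeyboardInterrupt:
        return (Status.INTERRUPTED, None)
    except Exception as exc:
        return (Status.FAILED, repr(exc))
    if result is None:
        # Nothing to do: an "uneventful run"
        return (Status.UNEVENTFUL, None)
    # A one-line summary was returned: a "successful run"
    return (Status.SUCCESSFUL, result)
```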
Repeat mode
Normally sisyphus jobs run under the control of a cron daemon or similar process
runner. In this mode the method Job.execute() is executed once and after
that, execution of the Python script ends.
However it is possible to activate repeat mode with the class/instance attribute
repeat (or the command line option --repeat).
If repeat is true, execution of the job will be repeated indefinitely.
By default the next job run starts immediately after the end of the previous
run, but it is possible to delay the next run. For this the class/instance
attribute nextrun (or the command line option --nextrun) can be
used. In its simplest form this is the number of seconds to wait until the next
job run is started. It can also be a datetime.timedelta object that
specifies the delay, or it can be a datetime.datetime object specifying
the next job run. Furthermore nextrun can be callable (so it can be
implemented as a method) and can return any of the types int,
float, datetime.timedelta or datetime.datetime.
And, if Job.nextrun is None, the job run will be repeated
immediately.
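The different forms that nextrun can take could be normalized into a single start time along these lines. This is only a sketch under the rules stated above; resolve_nextrun is a hypothetical helper and not part of ll.sisyphus.

```python
import datetime


def resolve_nextrun(nextrun, now):
    """Return the datetime at which the next run should start (hypothetical helper)."""
    if callable(nextrun):
        # A method or function may return any of the supported types
        nextrun = nextrun()
    if nextrun is None:
        return now  # repeat immediately
    if isinstance(nextrun, (int, float)):
        # A plain number is a delay in seconds
        nextrun = datetime.timedelta(seconds=nextrun)
    if isinstance(nextrun, datetime.timedelta):
        return now + nextrun
    if isinstance(nextrun, datetime.datetime):
        return nextrun
    raise TypeError(f"unsupported nextrun value: {nextrun!r}")
```

For example, a job could implement nextrun as a method that returns datetime.timedelta(minutes=5) during the day and a longer delay at night.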
Exceptions
When an exception object is passed to self.log the tag exc will be added
to the log call automatically.
Delayed logs
If a log message has the tag delay it is considered a delayed message.
Delayed messages will be buffered up until the first log message that isn’t
delayed is encountered (all of sisyphus’s own messages are delayed).
Then all buffered messages will be output. If only delayed messages are output
during the complete job run, only the result of the job run will be output.
If this output is None nothing will be output. This means that you will get
no log entries until something “interesting” happens.
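The buffering behaviour described above can be modelled with a small class. This is a sketch and not the sisyphus code: DelayedLog and its emit callback are hypothetical names, and it assumes that once the buffer has been flushed, later delayed messages are output directly.

```python
class DelayedLog:
    """Buffer messages tagged ``delay`` until a non-delayed one arrives (hypothetical)."""

    def __init__(self, emit):
        self.emit = emit      # callable that actually outputs a message
        self.buffer = []      # delayed messages that have not been output yet
        self.flushed = False  # True once a non-delayed message has been seen

    def log(self, text, *tags):
        if "delay" in tags and not self.flushed:
            self.buffer.append(text)
            return
        if not self.flushed:
            # First "interesting" message: output everything buffered so far
            self.flushed = True
            for buffered in self.buffer:
                self.emit(buffered)
            self.buffer.clear()
        self.emit(text)

    def close(self, result):
        # If only delayed messages were logged, they are dropped and only
        # the result (if it is not None) is output.
        if not self.flushed:
            self.buffer.clear()
        if result is not None:
            self.emit(result)
```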
Log files
By default logging is done to the log file (whose name changes from run to run as it includes the start time of the job).
However logging to stdout and stderr can also be activated.
Logfiles for uneventful runs will be deleted after the run.
Multiple links will be created that automatically point to the last log file.
The “current” link (by default named current.sisyphuslog) will always
point to the log file of the currently running job. If no job is running,
but the last run was eventful, it will point to the newest log file. If the last
run was uneventful the link will point to a nonexistent log file (whose name can
be used to determine the date of the last run).
The following links will be created at the end of the job run and will only start to point to non-existent files when the log files they point to get cleaned up:
The “last successful” link (by default named
last_successful.sisyphuslog) will always point to the last successful job run,
last_failed.sisyphuslog points to the last failed run,
last_interrupted.sisyphuslog points to the last interrupted run and
last_timeout.sisyphuslog points to the last run that timed out.
Email
It is possible to send an email when a job fails. For this, the options
--fromemail, --toemail and --smtphost (or the
appropriate class attributes) have to be set. If the job terminates because of
an exception or exceeds its maximum runtime (and the option
--noisykills is set) or any of the calls to log() include
the tag email, an email will be sent. This email includes the last 10
logging calls and the final exception (if there is any) in plain text and HTML
format as well as a JSON attachment.
Mattermost
It is possible to send log entries to a Mattermost chat channel. For this the
options --mattermost_url, --mattermost_channel and
--mattermost_token (or the appropriate class attributes) must be
specified. All log entries including the tag mattermost, as well as
all exceptions that abort the job will be sent to the Mattermost channel.
Sentry
It is possible to send log entries to a Sentry server. For this the
option --sentry_dsn (or the appropriate class attribute) must be
specified. All log entries including the tag sentry, as well as
all exceptions that abort the job will be sent to the Sentry server.
If the logging call includes any of the tags fatal, error, warning,
info, debug this will be used as the event level. If the log argument
is an exception the event level will be fatal. Otherwise it will default to
info.
All tags will be converted to Sentry tags like this: A sisyphus tag foo
will be converted into a Sentry tag sisypus.tag.foo with a value of true.
Active tasks will be converted into Sentry breadcrumbs (see the methods
task() and tasks() for more info).
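The choice of the Sentry event level described above could look roughly like this. It is a sketch with a hypothetical helper name (sentry_level); it assumes that a level tag takes precedence over the exception rule and that the first level tag in the call wins, neither of which is spelled out above.

```python
SENTRY_LEVELS = ("fatal", "error", "warning", "info", "debug")


def sentry_level(tags, obj):
    """Pick the Sentry event level for one logging call (hypothetical helper)."""
    for tag in tags:
        if tag in SENTRY_LEVELS:
            return tag  # assumption: the first level tag wins
    if isinstance(obj, BaseException):
        return "fatal"
    return "info"
```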
Health checks
When a job is started with the option --healthcheck, instead of
running the job normally a health check is done. This bypasses the normal
mechanism that prevents multiple instances of the job from running (i.e. you can
have a normal job execution and a health check running in parallel).
If the job is healthy this will exit with an exit status of 0, otherwise it will
exit with an exit status of 1 and an error message on stdout stating the
reason why the job is considered unhealthy. There are three possible scenarios
for this:
1. The job has never been run.
2. The last run has ended with an error.
3. The last run was too long ago.
To configure how scenario 3 is handled the class/instance attribute
maxhealthcheckage (or the command line option
--maxhealthcheckage) can be used. In its simplest form this is a
number of seconds or a datetime.timedelta object. A job run that is
older than this value triggers scenario 3. maxhealthcheckage can also be
a datetime.datetime object specifying the cut-off date.
Furthermore maxhealthcheckage can be callable (so it can be implemented
as a method) and can return any of the types int, float,
datetime.timedelta or datetime.datetime.
And if Job.maxhealthcheckage is None, scenario 3 will never trigger.
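The three scenarios and the handling of maxhealthcheckage can be summarized in a short sketch. check_health and its parameters are hypothetical and only model the rules stated above; how sisyphus stores the time and outcome of the last run is not shown here.

```python
import datetime


def check_health(lastrun, lasterror, maxage, now):
    """Return None if the job is healthy, else an error message (hypothetical).

    ``lastrun`` is the datetime of the last run (or None), ``lasterror`` is
    the error message of the last run (or None if it ended without error).
    """
    if lastrun is None:
        return "job has never been run"  # scenario 1
    if lasterror is not None:
        return f"last run ended with an error: {lasterror}"  # scenario 2
    if callable(maxage):
        maxage = maxage()
    if maxage is None:
        return None  # scenario 3 never triggers
    if isinstance(maxage, (int, float)):
        maxage = datetime.timedelta(seconds=maxage)
    if isinstance(maxage, datetime.timedelta):
        cutoff = now - maxage
    else:
        cutoff = maxage  # a datetime.datetime cut-off date
    if lastrun < cutoff:
        return f"last run was too long ago ({lastrun} is before {cutoff})"  # scenario 3
    return None
```

A caller would then print the message to stdout and exit with status 1 if the result is not None, and exit with status 0 otherwise.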
Requirements
To reliably stop the job after the allowed maximum runtime, sisyphus
forks the process and kills the child process after the maximum runtime is
expired (via os.fork() and signal.signal()). This won’t work on
Windows. So on Windows the job will always run to completion without being
killed after the maximum runtime.
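The general idea of forking a worker and killing it after a maximum runtime can be illustrated as follows. This is a minimal Unix-only sketch, not the sisyphus implementation: run_with_timeout is a hypothetical helper, and it polls os.waitpid() instead of using signal handlers.

```python
import os
import signal
import time


def run_with_timeout(work, maxtime):
    """Run ``work()`` in a forked child; kill it after ``maxtime`` seconds."""
    pid = os.fork()
    if pid == 0:
        # Child process: do the work, then exit without returning to the caller
        try:
            work()
        finally:
            os._exit(0)
    # Parent process: monitor the runtime of the child
    deadline = time.monotonic() + maxtime
    while time.monotonic() < deadline:
        (done, _) = os.waitpid(pid, os.WNOHANG)
        if done:
            return "finished"
        time.sleep(0.01)
    os.kill(pid, signal.SIGKILL)
    os.waitpid(pid, 0)  # reap the killed child
    return "timeout"
```

Note that this only kills the direct child; any processes the child has spawned itself would keep running.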
To make sure that only one job instance runs concurrently, sisyphus
uses fcntl to create an exclusive lock on the file of the running script.
This won’t work on Windows either. So on Windows you might have multiple
running instances of the job.
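Such a lock could be taken as in the following sketch. It assumes a non-blocking fcntl.flock() call, which is one of several locking calls the fcntl module offers; try_lock is a hypothetical helper and the exact call used by sisyphus is not stated above.

```python
import fcntl
import os


def try_lock(path):
    """Try to take an exclusive, non-blocking lock on the file ``path``.

    Returns the open file descriptor on success (keep it open for as long as
    the lock is needed) or None if the file is already locked elsewhere.
    """
    fd = os.open(path, os.O_RDONLY)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        os.close(fd)
        return None
    return fd
```

A script could call try_lock(__file__) at startup and exit early if the result is None.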
sisyphus uses the module setproctitle to change the process
title during various phases of running the job. If setproctitle is not
available the process title will not be changed.
If the module psutil is available it will be used to kill the child
process and any of its own child processes after the maximum runtime of the job
is exceeded. If psutil isn’t available just the child process will be
killed (which is no problem as long as the child process doesn’t spawn any
other processes).
If logging to Mattermost is used, requests has to be installed.
If logging to Sentry is used, sentry_sdk has to be installed.
For compressing the log files one of the modules gzip, bz2 or
lzma is required (which might not be part of your Python installation).
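Because any of these modules may be missing, a compression helper can import the module only when it is needed. The sketch below is an illustration with assumed details: compress_file is a hypothetical helper, the mode names are those listed for the --compressmode option, and the file suffixes are assumptions.

```python
import importlib

# Mode name -> (module name, file suffix); the suffixes are assumptions.
MODULES = {
    "gzip": ("gzip", ".gz"),
    "bzip2": ("bz2", ".bz2"),
    "lzma": ("lzma", ".xz"),
}


def compress_file(path, mode="bzip2"):
    """Write a compressed copy of ``path`` and return the new file name."""
    (modulename, suffix) = MODULES[mode]
    # Raises ImportError if the module is not part of this Python installation
    module = importlib.import_module(modulename)
    target = f"{path}{suffix}"
    with open(path, "rb") as src, module.open(target, "wb") as dst:
        dst.write(src.read())
    return target
```

This sketch leaves the uncompressed file in place; removing it afterwards would be a separate step.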
Module documentation
- class ll.sisyphus.Status
Bases: enum.IntEnum
The result status of a job run.
Possible values are: UNEVENTFUL, SUCCESSFUL, FAILED, INTERRUPTED, TIMEOUT.
- class ll.sisyphus.Process
Bases: enum.Enum
The type of a running sisyphus process.
Possible values are: SOLO (when in non-forking mode), PARENT (the parent process in forking mode), CHILD (the child process in forking mode).
- class ll.sisyphus.Job
Bases: object
A Job object executes a task (either once or repeatedly).
To use this class, derive your own class from it and overwrite the execute() method.
The job can be configured in three ways: By class attributes in the Job subclass, by attributes of the Job instance (e.g. set in __init__()) and by command line arguments (if executewithargs() is used). The following command line arguments are supported (the name of the attribute is the same as the long command line argument name):
- -p <projectname>, --projectname <projectname>
The name of the project this job belongs to. This might be a dot-separated hierarchical project name (e.g. including customer names or similar stuff).
- -j <jobname>, --jobname <jobname>
The name of the job itself (defaulting to the name of the class if none is given).
- --fromemail <emailadress>
The sender email address for the failure report email.
This email will only be sent if the options --fromemail, --toemail and --smtphost are set (and any error or output to the email log occurred).
- --smtpuser <username>
The user name used to log into the SMTP server. (Login will only be done if both --smtpuser and --smtppassword are given.)
- --mattermost_url <url>
The URL where log entries can be posted to a Mattermost chat. For example:
https://mattermost.example.org/api/v4/posts
A log entry will only be posted to the Mattermost chat channel if the options --mattermost_url, --mattermost_channel and --mattermost_token are set (and the log entry has the tag mattermost).
Note that using this feature requires requests.
- --mattermost_channel <id>
The channel id of the Mattermost chat channel where log entries should be posted. For example:
4cnszmopr3ntjexi4qmx499inc
- --mattermost_token <auth>
The “Personal Access Token” used for authorizing the post with the Mattermost server. For example:
9xuqwrwgstrb3mzrxb83nb357a
- --sentry_dsn <dsn>
Sentry DSN for logging to a Sentry server. Something like:
https://examplePublicKey@o0.ingest.sentry.io/0
- --sentry_debug <flag>
Activates/deactivates Sentry debug mode.
(Allowed <flag> values are false, no, 0, true, yes or 1)
- -m <seconds>, --maxtime <seconds>
Maximum allowed runtime for the job (as the number of seconds). If the job runs longer than that it will kill itself.
(The instance attribute will always be converted to the type datetime.timedelta)
- --fork <flag>
Forks the process and does the work in the child process. The parent process is responsible for monitoring the maximum runtime (this is the default). In non-forking mode the single process does both the work and the runtime monitoring.
(Allowed <flag> values are false, no, 0, true, yes or 1)
- --noisykills <flag>
Should a message be printed/a failure email be sent when the maximum runtime is exceeded?
(Allowed <flag> values are false, no, 0, true, yes or 1)
- --exit_on_error <flag>
End job execution even in repeat mode when an exception is thrown?
(Allowed <flag> values are false, no, 0, true, yes or 1)
- -n <flag>, --notify <flag>
Should a notification be issued to the OS X Notification center? (done via terminal-notifier).
(Allowed <flag> values are false, no, 0, true, yes or 1)
- -r <flag>, --repeat <flag>
Should job execution be repeated indefinitely?
(This means that the job basically functions as its own cron daemon).
(Allowed <flag> values are false, no, 0, true, yes or 1)
- --nextrun <seconds>
How many seconds should we wait after a job run before the next run gets started (only when --repeat is set)?
The class/instance attribute can also be a callable (i.e. it’s possible to implement this as a method). Also datetime.datetime is supported and specifies the start date for the next job run.
- --healthcheck <flag>
Instead of executing the job normally, run a health check.
(Allowed <flag> values are false, no, 0, true, yes or 1)
- --maxhealthcheckage <seconds>
If the last uneventful or successful job run is older than this number of seconds, consider the job to be unhealthy.
- -f <flag>, --log2file <flag>
Should a logfile be written at all?
(Allowed <flag> values are false, no, 0, true, yes or 1)
- --formatlogline <format>
A UL4 template for formatting each line in the logfile. Available variables are time (current time), starttime (start time of the job), tags (list of tags for the line) and line (the log line itself).
- --keepfilelogs <days>
The number of days the logfiles are kept. Old logfiles (i.e. all files in the same directory as the current logfile that are more than keepfilelogs days old) will be removed at the end of the job.
(The instance attribute will always be converted to the type datetime.timedelta)
- --compressfilelogs <days>
The number of days after which log files are compressed (if they aren’t deleted via --keepfilelogs).
(The instance attribute will always be converted to the type datetime.timedelta)
- --compressmode <mode>
How to compress the logfiles. Possible values are: "gzip", "bzip2" and "lzma". The default is "bzip2".
- --errors <errorhandlingname>
Encoding error handler name (goes with --encoding). The default is "strict".
- --maxemailerrors <integer>
This option limits the number of exceptions and error messages that will get attached to the failure email. The default is 10.
- --proctitle <flag>
When this option is specified, the process title will be modified during execution of the job, so that the ps command shows what the processes are doing. The default is True. (This requires setproctitle.)
(Allowed <flag> values are false, no, 0, true, yes or 1)
Command line arguments take precedence over instance attributes (if executewithargs() is used) and those take precedence over class attributes.
Furthermore the following class attribute can be set to customize the help message:
argdescription
Description for the help message of the command line argument parser.
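The precedence rule and the <flag> values can be illustrated with a small stand-alone sketch built on argparse. DemoJob and flag are hypothetical names that only mimic the behaviour described above; this is not the real Job class or its argument parser.

```python
import argparse


def flag(value):
    """Convert a command line <flag> value to a bool."""
    if value in ("true", "yes", "1"):
        return True
    if value in ("false", "no", "0"):
        return False
    raise argparse.ArgumentTypeError(f"illegal flag value {value!r}")


class DemoJob:
    # Hypothetical stand-in for a Job subclass
    repeat = False  # class attribute (lowest precedence)
    jobname = "Demo"

    def __init__(self):
        self.repeat = True  # instance attribute overrides the class attribute

    def parseargs(self, args):
        parser = argparse.ArgumentParser()
        # Defaults are the current attribute values, so options that are not
        # given on the command line leave the attributes unchanged.
        parser.add_argument("-r", "--repeat", type=flag, default=self.repeat)
        parser.add_argument("-j", "--jobname", default=self.jobname)
        namespace = parser.parse_args(args)
        self.repeat = namespace.repeat
        self.jobname = namespace.jobname
        return namespace
```

With no arguments the instance value repeat=True is kept; passing --repeat no on the command line overrides it.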
- basedir() -> pathlib.Path
Return the base directory where all log files will be kept.
The path must be absolute.
- logfilename() -> Optional[pathlib.Path]
Return the filename of the logfile for this job.
The value must be an absolute pathlib.Path object (or None to disable creating the logfile).
- currentloglinkname() -> Optional[pathlib.Path]
Return the filename of the link to the currently active logfile.
The value must be an absolute pathlib.Path object (or None to disable creating the link).
- lastsuccessfulloglinkname() -> Optional[pathlib.Path]
Return the filename of the link that points to the logfile of the last successful run of the job.
The value must be an absolute pathlib.Path object (or None to disable creating the link).
- lastfailedloglinkname() -> Optional[pathlib.Path]
Return the filename of the link that points to the logfile of the last failed run of the job.
The value must be an absolute pathlib.Path object (or None to disable creating the link).
- lastinterruptedloglinkname() -> Optional[pathlib.Path]
Return the filename of the link that points to the logfile of the last interrupted run of the job.
The value must be an absolute pathlib.Path object (or None to disable creating the link).
- lasttimeoutloglinkname() -> Optional[pathlib.Path]
Return the filename of the link that points to the logfile of the last run of the job with a timeout.
The value must be an absolute pathlib.Path object (or None to disable creating the link).
- healthfilename() -> pathlib.Path
Return the filename where the health of the last job run is stored.
The value must be an absolute pathlib.Path object and may not be None.
- emailfilename(process: Optional[ll.sisyphus.Process] = None) -> pathlib.Path
Return the filename where the parent and child process can log messages that should be part of the email report.
The value must be an absolute pathlib.Path object and may not be None.
- execute() -> Optional[str]
Execute the job once.
Overwrite in subclasses to implement your job functionality.
The return value is a one line summary of what the job did.
When this method returns None instead, this tells the job machinery that the run of the job was uneventful and that the logfile can be deleted.
- healthcheck() -> Optional[str]
Called in parallel to a running job to check whether the job is healthy.
Returns None if everything is ok, or an error message otherwise.
- argparser() -> argparse.ArgumentParser
Return an argparse parser for parsing the command line arguments. This can be overwritten in subclasses to add more arguments.
- parseargs(args: Optional[List[str]]) -> argparse.Namespace
Use the parser returned by argparser() to parse the argument sequence args, modify self accordingly and return the result of the parser’s parse_args() call.
- task(type: Optional[str] = None, name: Optional[str] = None, index: Optional[int] = None, count: Optional[int] = None, **data) -> ll.sisyphus.Task
task() is a context manager and can be used to specify subtasks.
Arguments have the following meaning:
type : str or None
The type of the task.
name : str or None
The name of the task.
index : int or None
If this task is one in a sequence of similar tasks, index should be the index of this task, i.e. the first task of this type has index==0, the second one index==1 etc.
count : int or None
If this task is one in a sequence of similar tasks and the total number of tasks is known, count should be the total number of tasks.
**data
Additional information about the task. This will be added to the Sentry breadcrumbs when logging to Sentry. Otherwise this is ignored.
- tasks(iterable: Iterable[ll.sisyphus.T], type: Union[str, None, Callable[[...], Optional[str]]] = None, name: Union[str, None, Callable[[...], Optional[str]]] = None, data: Union[dict, None, Callable[[...], Optional[dict]]] = None) -> Generator[ll.sisyphus.T, None, None]
tasks() iterates through iterable and calls task() for each item. index and count will be passed to task() automatically. type, name and data will be used for the type, name and additional data of the task. They can either be constants (in which case they will be passed as is) or callables (in which case they will be called with the item to get the type/name/data).
Example:
import sys, operator

items = list(sys.modules.items())
for (name, module) in self.tasks(items, "module", operator.itemgetter(0)):
    self.log(f"module is {module}")
The log output will look something like the following:
[2019-05-06 18:52:31.366810]=[t+0:00:00.263849] :: parent 19448 :: {sisyphus}{init} >> /Users/walter/x/gurk.py (max time 0:01:40)
[2019-05-06 18:52:31.367831]=[t+0:00:00.264870] :: parent 19448 :: {sisyphus}{init} >> logging to <stdout>, /Users/walter/ll.sisyphus/Test/Job/2019-05-06-18-52-31-102961.sisyphuslog
[2019-05-06 18:52:31.371690]=[t+0:00:00.268729] :: [1] child 19451 :: {sisyphus}{init} >> forked worker child
[2019-05-06 18:52:31.376598]=[t+0:00:00.273637] :: [1] child 19451 :: [1/226] module sys >> module is <module 'sys' (built-in)>
[2019-05-06 18:52:31.378561]=[t+0:00:00.275600] :: [1] child 19451 :: [2/226] module builtins >> module is <module 'builtins' (built-in)>
[2019-05-06 18:52:31.380381]=[t+0:00:00.277420] :: [1] child 19451 :: [3/226] module _frozen_importlib >> module is <module 'importlib._bootstrap' (frozen)>
[2019-05-06 18:52:31.382248]=[t+0:00:00.279287] :: [1] child 19451 :: [4/226] module _imp >> module is <module '_imp' (built-in)>
[2019-05-06 18:52:31.384064]=[t+0:00:00.281103] :: [1] child 19451 :: [5/226] module _thread >> module is <module '_thread' (built-in)>
[2019-05-06 18:52:31.386047]=[t+0:00:00.283086] :: [1] child 19451 :: [6/226] module _warnings >> module is <module '_warnings' (built-in)>
[2019-05-06 18:52:31.388009]=[t+0:00:00.285048] :: [1] child 19451 :: [7/226] module _weakref >> module is <module '_weakref' (built-in)>
[...]
[2019-05-06 18:52:31.847315]=[t+0:00:00.744354] :: [1] child 19451 :: {sisyphus}{result}{ok} >> done
- class ll.sisyphus.Task
Bases: object
A subtask of a Job.
- __init__(job: ll.sisyphus.Job, type: Optional[str] = None, name: Optional[str] = None, index: Optional[int] = None, count: Optional[int] = None, **data)
Create a Task object. For the meaning of the parameters see Job.task().
- class ll.sisyphus.Tag
Bases: object
A Tag object can be used to call a function with an additional list of tags. Tags can be added via __getattr__() or __getitem__() calls.
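The idea of collecting tags through attribute and item access can be sketched in a few lines. This is an illustrative re-implementation under assumptions, not the code of ll.sisyphus.Tag: the constructor arguments and the way the tags are passed to the wrapped function are hypothetical.

```python
class Tag:
    """Callable wrapper that collects tags via attribute or item access (sketch)."""

    def __init__(self, func, *tags):
        self.func = func  # function that finally receives tags and arguments
        self.tags = tags  # tuple of tags collected so far

    def __getattr__(self, tag):
        # Only called for names that are not regular attributes
        if tag.startswith("__"):
            raise AttributeError(tag)
        return Tag(self.func, *self.tags, tag)

    def __getitem__(self, tag):
        return Tag(self.func, *self.tags, tag)

    def __call__(self, *texts):
        return self.func(self.tags, *texts)
```

With log = Tag(handler), the call log.xml.warning("Skipping foobar") invokes handler(("xml", "warning"), "Skipping foobar").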
- class ll.sisyphus.Logger
Bases: object
A Logger is called by the Job for each logging event.
- log(timestamp: datetime.datetime, tags: Tuple[str, ...], tasks: List[ll.sisyphus.Task], text: str) -> None
Called by the Job when a log entry has to be made.
Arguments have the following meaning:
timestamp : datetime.datetime
The moment when the logging call was made.
tags : List of strings
The tags that were part of the logging call. For example for the logging call:
self.log.xml.warning("Skipping foobar")
the list of tags is:
["xml", "warning"]
tasks : List of Task objects
The currently active stack of Task objects.
text : Any object
The log text. This can be any object. If it’s not a string it will be converted to a string via pprint.pformat() (or traceback.format_exception() if it’s an exception).
- taskstart(tasks: List[ll.sisyphus.Task]) -> None
Called by the Job when a new subtask has been started.
tasks is the stack of currently active tasks (so tasks[-1] is the task that has been started).
- taskend(tasks: List[ll.sisyphus.Task]) -> None
Called by the Job when a subtask is about to end.
tasks is the stack of currently active tasks (so tasks[-1] is the task that’s about to end).
- close(status: ll.sisyphus.Status) -> bool
Called by the Job when job execution has finished.
status (a Status) is the result status of the job run.
Return whether the logfile has been closed. (All normal loggers will close except stdout and stderr loggers).
- class ll.sisyphus.StreamLogger
Bases: ll.sisyphus.Logger
Logger that writes logging events into an open file-like object. It is used for logging to stdout and stderr.
- class ll.sisyphus.FileLogger
Bases: ll.sisyphus.StreamLogger
Logger that writes logging events into a file specified via a URL object. This is used for logging to the standard log file.
- class ll.sisyphus.LinkLogger
Bases: ll.sisyphus.Logger
Base class of all loggers that handle links to the log file.
- class ll.sisyphus.CurrentLinkLogger
Bases: ll.sisyphus.LinkLogger
Logger that handles the link to the current log file.
- class ll.sisyphus.LastStatusLinkLogger
Bases: ll.sisyphus.LinkLogger
Logger that handles the link to the log file for a specific job status.
- class ll.sisyphus.EmailLogger
Bases: ll.sisyphus.Logger
Logger that handles sending an email report of the job run.
- class ll.sisyphus.MattermostLogger
Bases: ll.sisyphus.Logger
Logger that logs messages to a Mattermost chat channel.
- class ll.sisyphus.SentryLogger
Bases: ll.sisyphus.Logger
Logger that logs messages and exceptions to Sentry.
- ll.sisyphus.execute(job: ll.sisyphus.Job) -> None
Execute the job job once or repeatedly.
- ll.sisyphus.executewithargs(job: ll.sisyphus.Job, args: Optional[List[str]] = None) -> None
Execute the job job once or repeatedly with command line arguments.
args are the command line arguments (None results in sys.argv being used).