ll.sisyphus simplifies running Python stuff as cron jobs.
There will be no more than one sisyphus job with a given name running at any time. A job has a maximum allowed runtime; if this maximum is exceeded, the job will kill itself. In addition, job execution can be logged, and in case of job failure an email report can be sent.
To use this module, you must derive your own class from Job and
implement the execute method.
Logs will (by default) be created in the ~/ll.sisyphus directory.
This can be changed by deriving a subclass and overwriting the
logfilename class attribute (see the attribute list further down).
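For example, a hypothetical override might look like this (the path layout and the template shown here are an assumption, not the library default; logfilename is a UL4 template, see the attribute list further down):

from ll import sisyphus

class MyJob(sisyphus.Job):
    # Hypothetical override: write logfiles below ~/logs instead of
    # the default ~/ll.sisyphus directory.
    logfilename = "~/logs/<?print projectname?>/<?print jobname?>/<?print format(starttime, '%Y-%m-%d-%H-%M-%S-%f')?>.sisyphuslog"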
To execute a job, use the module level function execute (or
executewithargs when you want to support command line arguments).
Example
The following example illustrates the use of this module:
#!/usr/bin/env python

import os
import urllib.request

from ll import sisyphus


class Fetch(sisyphus.Job):
    projectname = "ACME.FooBar"
    jobname = "Fetch"
    argdescription = "fetch http://www.python.org/ and save it to a local file"
    maxtime = 3 * 60

    def __init__(self):
        self.url = "http://www.python.org/"
        self.tmpname = "Fetch_Tmp_{}.html".format(os.getpid())
        self.officialname = "Python.html"

    def execute(self):
        self.log("fetching data from {!r}".format(self.url))
        data = urllib.request.urlopen(self.url).read()
        datasize = len(data)
        self.log("writing file {!r} ({:,} bytes)".format(self.tmpname, datasize))
        open(self.tmpname, "wb").write(data)
        self.log("renaming file {!r} to {!r}".format(self.tmpname, self.officialname))
        os.rename(self.tmpname, self.officialname)
        return "cached {!r} as {!r} ({:,} bytes)".format(self.url, self.officialname, datasize)


if __name__ == "__main__":
    sisyphus.executewithargs(Fetch())

You will find the log files for this job in the ~/ll.sisyphus/ACME.FooBar/Fetch/ directory.
Logging and tags
Logging itself is done by calling self.log:
self.log("can't parse XML file {}".format(filename))This logs the argument without tagging the line.
It is possible to add tags to the logging call. This is done by accessing
attributes of the log pseudo method. I.e. to add the tags xml and
warning to a log call you can do the following:
self.log.xml.warning("can't parse XML file {}".format(filename))

It's also possible to do this via __getitem__ calls, i.e. the above can be
written like this:

self.log['xml']['warning']("can't parse XML file {}".format(filename))

ll.sisyphus itself uses the following tags:
sisyphus: This tag will be added to all log lines produced by ll.sisyphus itself.
init: This tag is used for the log lines output at the start of the job.
report: This tag will be added to all log messages related to sending the failure report email.
result: This tag is used for the final line written to the log files that shows a summary of what the job did (or why it failed).
fail: This tag is used in the result line if the job failed with an exception.
errors: This tag is used in the result line if the job ran to completion, but some exceptions were logged.
ok: This tag is used in the result line if the job ran to completion without any exceptions.
kill: This tag is used in the result line if the job was killed because it exceeded the maximum allowed runtime.
Exceptions
When an exception object is passed to self.log the tag exc will be added
to the log call automatically.
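For example, the execute method of the Fetch job above could log and re-raise a failed download like this (a minimal sketch; the try/except block is an illustration, not part of the original example):

def execute(self):
    self.log("fetching data from {!r}".format(self.url))
    try:
        data = urllib.request.urlopen(self.url).read()
    except Exception as exc:
        self.log(exc)  # logged with the exc tag added automatically
        raise
    ...  # continue as in the original example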
Email
It is possible to send an email when a job fails. For this the options
--fromemail, --toemail and --smtphost have to be set. The email will be
sent if the job terminates because of an exception, if it exceeds its
maximum runtime (and the option --noisykills is set), or if any of the
calls to self.log include the tag email. This email includes all logging
calls and the final exception (if there is any) in plain text and HTML
format, as well as a JSON attachment.
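For example, to trigger the report from within the job even though no exception occurred, a log call can carry the email tag via the tag mechanism shown above (the message text is only an illustration):

self.log.email("fetch finished, but the server returned an unexpected content type")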
def _formattraceback(exc):
def _formatlines(obj):
class Job(object):
A Job object executes a task once.
To use this class, derive your own class from it and overwrite the
execute method.
The job can be configured in three ways: By class attributes in the
Job subclass, by attributes of the Job instance (e.g. set
in __init__) and by command line arguments (if executewithargs
is used). The following attributes/arguments are supported:
projectname (-p or --projectname): The name of the project this job belongs to. This might be a dot-separated hierarchical project name (e.g. including customer names or similar).
jobname (-j or --jobname): The name of the job itself (defaulting to the name of the class if none is given).
identifier (--identifier): An additional identifier that will be added to the failure report email.
argdescription (no command line equivalent): Description for the help message of the command line argument parser.
fromemail (--fromemail): The sender email address for the failure report email. This email will only be sent if the options --fromemail, --toemail and --smtphost are set (and any error or output to the email log occurred).
toemail (--toemail): An email address where an email will be sent in case of a failure.
smtphost (--smtphost): The SMTP server to be used for sending the failure report email.
smtpport (--smtpport): The port number used for the connection to the SMTP server.
smtpuser (--smtpuser): The user name used to log into the SMTP server. (Login will only be done if both --smtpuser and --smtppassword are given.)
smtppassword (--smtppassword): The password used to log into the SMTP server.
maxtime (-m or --maxtime): Maximum allowed runtime for the job (as a number of seconds). If the job runs longer than that, it will kill itself.
fork (--fork): Forks the process and does the work in the child process. The parent process is responsible for monitoring the maximum runtime (this is the default). In non-forking mode the single process does both the work and the runtime monitoring.
noisykills (--noisykills): Should a message be printed/a failure email be sent when the maximum runtime is exceeded?
notify (-n or --notify): Should a notification be issued to the OS X Notification Center? (Done via terminal-notifier.)
logfilename (--logfilename): Path/name of the logfile for this job as a UL4 template. Variables available in the template include user_name, projectname, jobname and starttime.
loglinkname (--loglinkname): The filename of a link that points to the currently active logfile (as a UL4 template). If this is None, no link will be created.
log2file (-f or --log2file): Should a logfile be written at all?
formatlogline (--formatlogline): A UL4 template for formatting each line in the logfile. Available variables are time (current time), starttime (start time of the job), tags (list of tags for the line) and line (the log line itself).
keepfilelogs (--keepfilelogs): The number of days the logfiles are kept. Old logfiles (i.e. all files in the same directory as the current logfile that are more than keepfilelogs days old) will be removed at the end of the job.
compressfilelogs (--compressfilelogs): The number of days after which logfiles are compressed (if they aren't deleted via keepfilelogs).
compressmode (--compressmode): How to compress the logfiles. Possible values are "gzip", "bzip2" and "lzma". The default is "bzip2".
encoding (--encoding): The encoding to be used for the logfile. The default is "utf-8".
errors (--errors): Encoding error handler name (goes with encoding). The default is "strict".
maxemailerrors (--maxemailerrors): This option limits the number of exceptions and error messages that will get attached to the failure email. The default is 10.
proctitle (--proctitle): When this option is specified, the process title will be modified during execution of the job, so that the ps command shows what the processes are doing. (This requires setproctitle.)
Command line arguments take precedence over instance attributes (if
executewithargs is used) and those take precedence over class
attributes.
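For example, assuming the Fetch script from the example above has been saved as fetch.py (a hypothetical filename, as are the addresses and host below), options could be passed on the command line like this:

python fetch.py --maxtime 120 --fromemail cron@example.com --toemail admin@example.com --smtphost mail.example.com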
def execute(self):
Execute the job once. The return value is a one line summary of what the job did. Overwrite in subclasses.
def failed(self):
Called when running the job generated an exception. Overwrite in subclasses to e.g. roll back your database transactions.
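A minimal sketch of such an override (self.db is a hypothetical database connection attribute, not something provided by sisyphus.Job):

def failed(self):
    # undo any partial work done by execute() before the exception
    self.db.rollback()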
def argparser(self):
Return an argparse parser for parsing the command line arguments.
This can be overwritten in subclasses to add more arguments.
def parseargs(self, args=None):
Use the parser returned by argparser to parse the argument
sequence args, modify self accordingly and return
the result of the parser's parse_args call.
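For example, a subclass could add its own command line option roughly like this (a sketch; the --url option and the way its value is transferred to the instance are assumptions, adapt them to your job):

from ll import sisyphus

class Fetch(sisyphus.Job):
    # ... attributes and execute() as in the example above ...

    def argparser(self):
        p = sisyphus.Job.argparser(self)  # parser with the standard options
        p.add_argument("--url", dest="url", metavar="URL", help="URL to fetch", default=self.url)
        return p

    def parseargs(self, args=None):
        args = sisyphus.Job.parseargs(self, args)
        self.url = args.url  # pick up the value of the additional option
        return args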
def getmaxtime(self):
def getmaxtime_seconds(self):
def _alarm_fork(self, signum, frame):
def _alarm_nofork(self, signum, frame):
def _handleexecution(self):
Handle executing the job including handling of duplicate or hanging jobs.
def notifystart(self):
def notifyfinish(self, result):
def task(self, type=None, name=None, index=None, count=None):
task is a context manager and can be used to specify subtasks.
Arguments have the following meaning:
type (string or None): The type of the task.
name (string or None): The name of the task.
index (integer or None): If this task is one in a sequence of similar tasks, index should be the index of this task, i.e. the first task of this type has index==0, the second one index==1, etc.
count (integer or None): If this task is one in a sequence of similar tasks and the total number of tasks is known, count should be the total number of tasks.
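A minimal sketch of how a subtask might be wrapped inside execute (the task type and name here are illustrative):

def execute(self):
    with self.task("fetch", self.url):
        self.log("fetching data from {!r}".format(self.url))
        data = urllib.request.urlopen(self.url).read()
    ...  # continue with the rest of the job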
def tasks(self, iterable, type=None, name=None):
tasks iterates through iterable and calls task for
each item. index and count will be passed to task
automatically. type and name will be used for the type and
name of the task. They can either be constants (in which case they will
be passed as is) or callables (in which case they will be called with the
item to get the type/name).
Example:
import sys, operator

items = sys.modules.items()
for (name, module) in self.tasks(items, "module", operator.itemgetter(0)):
    self.log("module is {}".format(module))

The log output will look something like the following:
[2014-11-14 11:17:01.319291]=[t+0:00:00.342013] :: {sisyphus}{init} >> /Users/walter/test.py (max time 0:05:00; pid 33482)
[2014-11-14 11:17:01.321860]=[t+0:00:00.344582] :: {sisyphus}{init} >> forked worker child (child pid 33485)
[2014-11-14 11:17:01.324067]=[t+0:00:00.346789] :: module tokenize (1/212) :: {email} >> module is <module 'tokenize' from '/Users/walter/.local/lib/python3.4/tokenize.py'>
[2014-11-14 11:17:01.327711]=[t+0:00:03.350433] :: module heapq (2/212) :: {email} >> module is <module 'heapq' from '/Users/walter/.local/lib/python3.4/heapq.py'>
[2014-11-14 11:17:01.333471]=[t+0:00:09.356193] :: module marshal (3/212) :: {email} >> module is <module 'marshal' (built-in)>
[2014-11-14 11:17:01.340733]=[t+0:00:15.363455] :: module math (4/212) :: {email} >> module is <module 'math' from '/Users/walter/.local/lib/python3.4/lib-dynload/math.so'>
[2014-11-14 11:17:01.354177]=[t+0:00:18.366899] :: module urllib.parse (5/212) :: {email} >> module is <module 'urllib.parse' from '/Users/walter/.local/lib/python3.4/urllib/parse.py'>
[2014-11-14 11:17:01.368187]=[t+0:00:21.370909] :: module _posixsubprocess (6/212) :: {email} >> module is <module '_posixsubprocess' from '/Users/walter/.local/lib/python3.4/lib-dynload/_posixsubprocess.so'>
[2014-11-14 11:17:01.372633]=[t+0:00:33.385355] :: module pickle (7/212) :: {email} >> module is <module 'pickle' from '/Users/walter/.local/lib/python3.4/pickle.py'>
[...]
[2014-11-14 11:17:03.768065]=[t+0:00:39.790787] :: {sisyphus}{info} >> Compressing logfiles older than 7 days, 0:00:00 via bzip2
[2014-11-14 11:17:03.768588]=[t+0:00:39.791310] :: {sisyphus}{info} >> Compressing logfile /Users/walter/ll.sisyphus/ACME.FooBar/Test/2014-11-06-16-44-22-416878.sisyphuslog
[2014-11-14 11:17:03.772348]=[t+0:00:39.795070] :: {sisyphus}{info} >> Compressing logfile /Users/walter/ll.sisyphus/ACME.FooBar/Test/2014-11-06-16-44-37-839632.sisyphuslog
[2014-11-14 11:17:03.774178]=[t+0:00:39.796900] :: {sisyphus}{info} >> Cleanup done

def makeproctitle(self, process, detail=None):
def setproctitle(self, process, detail=None):
def _log(self, tags, obj):
Log obj to the log file using tags as the list of tags.
def _getscriptsource(self):
Reads the source code of the script into self.source.
def _getcrontab(self):
Reads the current crontab into self.crontab.
def _createlog(self):
Create the logfile and the link to the logfile (if requested).
class Task(object):
A subtask of a Job.
def __init__(self, job, type=None, name=None, index=None, count=None):
Create a Task object. For the meaning of the parameters see
Job.task.
def __enter__(self):
def __exit__(self, type, value, traceback):
def __str__(self):
def asjson(self):
def __repr__(self):
class Tag(object):
A Tag object can be used to call a function with an additional list
of tags. Tags can be added via __getattr__ or __getitem__ calls.
def __init__(self, func, *tags):
def __getattr__(self, tag):
def __getitem__(self, tag):
def __call__(self, *args, **kwargs):
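The following standalone sketch illustrates the idea. It assumes (based on the signatures above and on how self.log behaves) that the wrapped function receives the collected tags as its first argument; check the implementation before relying on this:

from ll import sisyphus

def emit(tags, text):
    # print the collected tags in front of the text, e.g. "{xml}{warning} >> ..."
    print("".join("{{{}}}".format(t) for t in tags) + " >> " + text)

log = sisyphus.Tag(emit)
log.xml.warning("can't parse XML file {}".format("data.xml"))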
class Logger(object):
def log(self, timestamp, tags, tasks, text):
def taskstart(self, tasks):
def taskend(self, tasks):
def close(self):
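A hypothetical subclass showing the shape of this interface (how, or whether, such a logger gets attached to a running Job is not covered here; this is purely illustrative):

from ll import sisyphus

class ListLogger(sisyphus.Logger):
    # Collect every log call in memory instead of writing it anywhere.
    def __init__(self):
        self.entries = []

    def log(self, timestamp, tags, tasks, text):
        self.entries.append((timestamp, tuple(tags), list(tasks), text))

    def taskstart(self, tasks):
        pass  # a new subtask has been entered

    def taskend(self, tasks):
        pass  # a subtask has ended

    def close(self):
        pass  # the job is done; release any resources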
class StreamLogger(Logger):
def __init__(self, job, stream, linetemplate):
def log(self, timestamp, tags, tasks, text):
def __repr__(self):
class URLResourceLogger(StreamLogger):
def __init__(self, job, resource, skipurls, linetemplate):
def close(self):
def remove(self, fileurl):
def compress(self, fileurl, bufsize=65536):
class EmailLogger(Logger):
def __init__(self, job):
def log(self, timestamp, tags, tasks, text):
def close(self):
def execute(job):
Execute the job job once.
def executewithargs(job, args=None):
Execute the job job once with command line arguments.
args are the command line arguments (None results in
sys.argv being used).