1 # jobthing is a module that allows for background execution of a task, and
2 # getting status of that task. The ultimate goal is to allow ajaxyness
3 # of GUI apps using Func, and also for extremely long running tasks that
4 # we don't want to block on as called by scripts using the FunC API. The
5 # CLI should not use this.
7 # Copyright 2007, Red Hat, Inc
8 # Michael DeHaan <mdehaan@redhat.com>
10 # This software may be freely redistributed under the terms of the GNU
11 # general public license.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 import random
# for testing only
19 import time
# for testing only
31 JOB_ID_LOST_IN_SPACE
= 2
32 JOB_ID_ASYNC_PARTIAL
= 3
33 JOB_ID_ASYNC_FINISHED
= 4
35 # how long to retain old job records in the job id database
36 RETAIN_INTERVAL
= 60 * 60
38 # where to store the internal job id database
39 CACHE_DIR
= "/var/lib/func"
41 def __update_status(jobid
, status
, results
, clear
=False):
42 return __access_status(jobid
=jobid
, status
=status
, results
=results
, write
=True)
44 def __get_status(jobid
):
45 return __access_status(jobid
=jobid
, write
=False)
48 return __access_status(purge
=True)
50 def __purge_old_jobs(storage
):
52 Deletes jobs older than RETAIN_INTERVAL seconds.
53 MINOR FIXME: this probably should be a more intelligent algorithm that only
54 deletes jobs if the database is too big and then only the oldest jobs
55 but this will work just as well.
58 for x
in storage
.keys():
59 # minion jobs have "-minion" in the job id so disambiguation so we need to remove that
60 jobkey
= x
.replace("-","").replace("minion","")
61 create_time
= float(jobkey
)
62 if nowtime
- create_time
> RETAIN_INTERVAL
:
65 def __access_status(jobid
=0, status
=0, results
=0, clear
=False, write
=False, purge
=False):
67 dir = os
.path
.expanduser(CACHE_DIR
)
68 if not os
.path
.exists(dir):
70 filename
= os
.path
.join(dir,"status-%s" % os
.getuid())
72 internal_db
= bsddb
.btopen(filename
, 'c', 0644 )
73 handle
= open(filename
,"r")
74 fcntl
.flock(handle
.fileno(), fcntl
.LOCK_EX
)
75 storage
= shelve
.BsdDbShelf(internal_db
)
81 fcntl
.flock(handle
.fileno(), fcntl
.LOCK_UN
)
85 __purge_old_jobs(storage
)
88 storage
[str(jobid
)] = (status
, results
)
91 if storage
.has_key(str(jobid
)):
92 # tuple of (status, results)
94 rc
= storage
[str(jobid
)]
96 rc
= (JOB_ID_LOST_IN_SPACE
, 0)
101 fcntl
.flock(handle
.fileno(), fcntl
.LOCK_UN
)
105 def batch_run(server
, process_server
, nforks
):
107 This is the method used by the overlord side usage of jobthing.
108 Minion side usage will use minion_async_run instead.
110 Given an array of items (pool), call callback in each one, but divide
111 the workload over nfork forks. Temporary files used during the
112 operation will be created in cachedir and subsequently deleted.
118 __update_status(job_id
, JOB_ID_RUNNING
, -1)
122 __update_status(job_id
, JOB_ID_RUNNING
, -1)
123 results
= forkbomb
.batch_run(server
, process_server
, nforks
)
125 # we now have a list of job id's for each minion, kill the task
126 __update_status(job_id
, JOB_ID_ASYNC_PARTIAL
, results
)
129 def minion_async_run(retriever
, method
, args
):
131 This is a simpler invocation for minion side async usage.
133 # to avoid confusion of job id's (we use the same job database)
134 # minion jobs contain the string "minion".
137 job_id
= "%s-minion" % time
.time()
140 __update_status(job_id
, JOB_ID_RUNNING
, -1)
143 __update_status(job_id
, JOB_ID_RUNNING
, -1)
145 function_ref
= retriever(method
)
146 rc
= function_ref(*args
)
148 (t
, v
, tb
) = sys
.exc_info()
149 rc
= utils
.nice_exception(t
,v
,tb
)
151 __update_status(job_id
, JOB_ID_FINISHED
, rc
)
154 def job_status(jobid
, client_class
=None):
156 # NOTE: client_class is here to get around some evil circular reference
157 # type stuff. This is intended to be called by minions (who can leave it None)
158 # or by the Client module code (which does not need to be worried about it). API
159 # users should not be calling jobthing.py methods directly.
161 got_status
= __get_status(jobid
)
163 # if the status comes back as JOB_ID_ASYNC_PARTIAL what we have is actually a hash
164 # of hostname/minion-jobid pairs. Instantiate a client handle for each and poll them
165 # for their actual status, filling in only the ones that are actually done.
167 (interim_rc
, interim_results
) = got_status
169 if interim_rc
== JOB_ID_ASYNC_PARTIAL
:
175 for host
in interim_results
.keys():
177 minion_job
= interim_results
[host
]
178 client
= client_class(host
, noglobs
=True, async=False)
179 minion_result
= client
.jobs
.job_status(minion_job
)
181 (minion_interim_rc
, minion_interim_result
) = minion_result
183 if minion_interim_rc
not in [ JOB_ID_RUNNING
]:
184 if minion_interim_rc
in [ JOB_ID_LOST_IN_SPACE
]:
185 partial_results
[host
] = [ utils
.REMOTE_ERROR
, "lost job" ]
187 partial_results
[host
] = minion_interim_result
192 return (JOB_ID_ASYNC_PARTIAL
, partial_results
)
194 return (JOB_ID_ASYNC_FINISHED
, partial_results
)
199 # of job id's on the minion in results.
201 if __name__
== "__main__":