Misc s/func/certmaster/ replacements
[certmaster.git] / certmaster / jobthing.py
# jobthing is a module that allows for background execution of a task, and
# for getting the status of that task. The ultimate goal is to allow
# AJAX-style GUI apps using Func, and also extremely long-running tasks that
# we don't want to block on when called by scripts using the Func API. The
# CLI should not use this.
6 #
7 # Copyright 2007, Red Hat, Inc
8 # Michael DeHaan <mdehaan@redhat.com>
9 #
10 # This software may be freely redistributed under the terms of the GNU
11 # general public license.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
16
17 import os
18 import random # for testing only
19 import time # for testing only
20 import shelve
21 import bsddb
22 import sys
23 import tempfile
24 import fcntl
25 import forkbomb
26 import utils
27 import traceback
28
# Status codes stored with each job record and handed back by job_status().
JOB_ID_RUNNING = 0         # job was started and has not stored a result yet
JOB_ID_FINISHED = 1        # minion-side job is done; its result is stored
JOB_ID_LOST_IN_SPACE = 2   # no record exists for the requested job id
JOB_ID_ASYNC_PARTIAL = 3   # overlord job: at least one minion still running
JOB_ID_ASYNC_FINISHED = 4  # overlord job: no minion is still running

# how long (in seconds) to retain old job records in the job id database
RETAIN_INTERVAL = 60 * 60

# directory in which the internal job id database is stored
CACHE_DIR = "/var/lib/func"
40
def __update_status(jobid, status, results, clear=False):
    """
    Record (status, results) for jobid in the job database.

    jobid   -- job identifier; it is stringified to form the database key
    status  -- one of the JOB_ID_* codes
    results -- payload to store next to the status
    clear   -- when true, the whole database is emptied instead of written
               to, and {} is returned

    Returns jobid after a normal write.  Writing also purges records older
    than RETAIN_INTERVAL (see __access_status).
    """
    # clear used to be accepted here but silently dropped; forward it so the
    # parameter actually does what its name says.
    return __access_status(jobid=jobid, status=status, results=results,
                           clear=clear, write=True)
43
def __get_status(jobid):
    """
    Read-only lookup of jobid in the job database.

    Returns the stored (status, results) tuple, or (JOB_ID_LOST_IN_SPACE, 0)
    when the database holds no record for that id.
    """
    record = __access_status(write=False, jobid=jobid)
    return record
46
def purge_old_jobs():
    """
    Public hook: drop every job record older than RETAIN_INTERVAL seconds.

    Returns whatever __access_status returns for a purge-only call (0).
    """
    outcome = __access_status(purge=True)
    return outcome
49
def __purge_old_jobs(storage):
    """
    Deletes jobs older than RETAIN_INTERVAL seconds.

    storage is the dict-like job database.  Its keys are job ids: the job's
    creation time (time.time()) as a string, with "-minion" appended for
    minion-side jobs.  Keys that cannot be parsed as a timestamp are left
    alone rather than aborting the purge, because an exception here would
    make every later write or purge against the database fail.

    MINOR FIXME: this probably should be a more intelligent algorithm that
    only deletes jobs if the database is too big, and then only the oldest
    jobs, but this will work just as well.
    """
    nowtime = time.time()
    # Work on a snapshot of the keys: records are deleted while we walk them.
    for key in list(storage.keys()):
        # minion job ids carry "-minion" for disambiguation; strip it so that
        # only the timestamp is left
        jobkey = key.replace("-", "").replace("minion", "")
        try:
            create_time = float(jobkey)
        except ValueError:
            # not a job id we know how to age; keep the record
            continue
        if nowtime - create_time > RETAIN_INTERVAL:
            del storage[key]
64
def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purge=False):
    """
    Single entry point for every read and write of the job database.

    The database is a shelve-wrapped bsddb btree stored in the file
    CACHE_DIR/status-<uid>.  An exclusive flock() on that same file is held
    while the shelf is used, and the shelf is closed before the lock is
    released.

    Exactly one action is performed, chosen in this order:

      clear -- empty the database; returns {}
      write -- purge expired records, store (status, results) under
               str(jobid); returns jobid
      purge -- purge expired records only; returns 0
      (none) - look up str(jobid); returns the stored (status, results)
               tuple, or (JOB_ID_LOST_IN_SPACE, 0) if there is no record

    The shelf and the lock are released even when an operation raises.
    """
    # local import: only needed for the permission bits below
    import stat

    cache_dir = os.path.expanduser(CACHE_DIR)
    if not os.path.exists(cache_dir):
        try:
            os.makedirs(cache_dir)
        except OSError:
            # The async entry points fork and both processes come through
            # here, so another process may have created the directory
            # between the check and the call.  Only re-raise real failures.
            if not os.path.isdir(cache_dir):
                raise
    filename = os.path.join(cache_dir, "status-%s" % os.getuid())

    # rw-r--r-- (octal 644), spelled with stat constants so the literal is
    # valid under every Python version
    db_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH

    # Opening with 'c' creates the file when missing, which is why the
    # database is opened before the file is opened again for locking.
    internal_db = bsddb.btopen(filename, 'c', db_mode)
    handle = open(filename, "r")
    try:
        fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
        storage = shelve.BsdDbShelf(internal_db)
        try:
            if clear:
                storage.clear()
                return {}

            if purge or write:
                __purge_old_jobs(storage)

            if write:
                storage[str(jobid)] = (status, results)
                return jobid

            if purge:
                return 0

            key = str(jobid)
            if key in storage:
                # tuple of (status, results)
                return storage[key]
            return (JOB_ID_LOST_IN_SPACE, 0)
        finally:
            # close (and thereby flush) the shelf while the lock is still held
            storage.close()
    finally:
        try:
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
        finally:
            handle.close()
104
def batch_run(server, process_server, nforks):
    """
    This is the method used by the overlord side usage of jobthing.
    Minion side usage will use minion_async_run instead.

    Forks a background process that calls
    forkbomb.batch_run(server, process_server, nforks) and stores what it
    returns in the job database with status JOB_ID_ASYNC_PARTIAL.
    job_status() treats that stored value as a mapping of host to minion
    job id.

    Returns the new job id (a time.time() float) to the caller immediately.
    The forked child never returns; it ends with sys.exit(0).

    If forkbomb.batch_run raises, the child stores the formatted exception
    (utils.nice_exception) with status JOB_ID_FINISHED, the same way
    minion_async_run reports failures, and still exits.
    """

    job_id = time.time()

    # Record RUNNING once, before forking.  Writing it from both processes
    # after the fork lets a late parent write overwrite the child's final
    # status, leaving the job "running" forever.
    __update_status(job_id, JOB_ID_RUNNING, -1)

    pid = os.fork()
    if pid != 0:
        return job_id

    # child: kick off the job
    try:
        results = forkbomb.batch_run(server, process_server, nforks)
        # we now have the job ids for each minion
        final_status = JOB_ID_ASYNC_PARTIAL
    except Exception:
        # never let the child fall back into the caller's stack
        (t, v, tb) = sys.exc_info()
        results = utils.nice_exception(t, v, tb)
        final_status = JOB_ID_FINISHED

    __update_status(job_id, final_status, results)
    sys.exit(0)
128
def minion_async_run(retriever, method, args):
    """
    This is a simpler invocation for minion side async usage.

    retriever -- callable that maps method to the function to run
    method    -- value handed to retriever to look that function up
    args      -- sequence of positional arguments for the function

    Forks a background process that runs retriever(method)(*args) and stores
    its return value with status JOB_ID_FINISHED.  If the call raises, the
    formatted exception (utils.nice_exception) is stored instead.

    Returns the new job id string to the caller immediately.  The forked
    child never returns; it ends with sys.exit(0).
    """
    # to avoid confusion of job id's (we use the same job database)
    # minion jobs contain the string "minion".
    job_id = "%s-minion" % time.time()

    # Record RUNNING once, before forking.  Writing it from both processes
    # after the fork lets a late parent write overwrite the child's FINISHED
    # record when the job completes quickly.
    __update_status(job_id, JOB_ID_RUNNING, -1)

    pid = os.fork()
    if pid != 0:
        return job_id

    # child: run the requested function and store its outcome
    try:
        function_ref = retriever(method)
        rc = function_ref(*args)
    except Exception:
        (t, v, tb) = sys.exc_info()
        rc = utils.nice_exception(t, v, tb)

    __update_status(job_id, JOB_ID_FINISHED, rc)
    sys.exit(0)
153
def job_status(jobid, client_class=None):
    """
    Return the (status, results) tuple for jobid.

    For anything other than JOB_ID_ASYNC_PARTIAL the stored record is
    returned unchanged.  A JOB_ID_ASYNC_PARTIAL record holds a hash of
    hostname/minion-jobid pairs; each host is polled through client_class
    and the answer is (JOB_ID_ASYNC_PARTIAL, results) while any minion is
    still running, otherwise (JOB_ID_ASYNC_FINISHED, results).  Only hosts
    that are no longer running appear in results.

    NOTE: client_class is here to get around some evil circular reference
    type stuff.  This is intended to be called by minions (who can leave it
    None) or by the Client module code (which does not need to be worried
    about it).  API users should not be calling jobthing.py methods directly.
    """

    stored = __get_status(jobid)
    (state, payload) = stored

    if state != JOB_ID_ASYNC_PARTIAL:
        return stored

    # payload maps host -> minion job id; ask every minion where it stands
    # and fill in only the ones that are actually done
    collected = {}
    still_running = False

    for host in payload.keys():
        minion_job = payload[host]
        # "async" goes through ** because it is a reserved word in newer
        # Python versions; the call receives the same keyword argument
        client = client_class(host, noglobs=True, **{"async": False})
        (minion_state, minion_payload) = client.jobs.job_status(minion_job)

        if minion_state == JOB_ID_RUNNING:
            still_running = True
        elif minion_state == JOB_ID_LOST_IN_SPACE:
            collected[host] = [ utils.REMOTE_ERROR, "lost job" ]
        else:
            collected[host] = minion_payload

    if still_running:
        return (JOB_ID_ASYNC_PARTIAL, collected)
    return (JOB_ID_ASYNC_FINISHED, collected)
198
199 # of job id's on the minion in results.
200
if __name__ == "__main__":
    # NOTE(review): no __test() function is defined in the visible part of
    # this module; unless it is defined elsewhere, running the file as a
    # script raises NameError here -- confirm.
    __test()
203
204