Misc s/func/certmaster/ replacements
[certmaster.git] / certmaster / forkbomb.py
1 # forkbomb is a module that partitions arbitrary workloads
2 # among N seperate forks, for a configurable N, and
3 # collates results upon return, as if it never forked.
4 #
5 # Copyright 2007, Red Hat, Inc
6 # Michael DeHaan <mdehaan@redhat.com>
7 #
8 # This software may be freely redistributed under the terms of the GNU
9 # general public license.
10 #
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
14
15 import os
16 import random # for testing only
17 import time # for testing only
18 import shelve
19 import bsddb
20 import sys
21 import tempfile
22 import fcntl
23 import utils
24 import xmlrpclib
25
# Default number of worker forks batch_run uses when the caller
# does not pass nforks.
DEFAULT_FORKS = 4
# Default directory for the temporary shelf files (see __get_storage).
# NOTE(review): still the old "func" path after the certmaster rename
# (see file header) -- confirm whether this should be /var/lib/certmaster.
DEFAULT_CACHE_DIR = "/var/lib/func"
28
29 def __get_storage(dir):
30 """
31 Return a tempfile we can use for storing data.
32 """
33 dir = os.path.expanduser(dir)
34 if not os.path.exists(dir):
35 os.makedirs(dir)
36 return tempfile.mktemp(suffix='', prefix='asynctmp', dir=dir)
37
def __access_buckets(filename, clear, new_key=None, new_value=None):
    """
    Access data in the forkbomb cache, optionally clearing it or
    recording a new key/value pair, and return a copy of the stored
    "data" dict (an empty dict when clearing).

    All access is serialized across forks by taking an exclusive flock
    on the db file for the duration of the call.
    """
    internal_db = bsddb.btopen(filename, 'c', 0o644)
    handle = open(filename, "r")
    fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
    try:
        storage = shelve.BsdDbShelf(internal_db)
        try:
            if clear:
                storage.clear()
                return {}

            if "data" not in storage:
                storage["data"] = {}

            if new_key is not None:
                # bsddb only persists on __setitem__, so mutate a copy
                # and store it back rather than updating in place
                newish = storage["data"].copy()
                newish[new_key] = new_value
                storage["data"] = newish

            return storage["data"].copy()
        finally:
            storage.close()
    finally:
        # release the lock and close the lock handle even on error --
        # the original leaked the handle fd on every call
        fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
        handle.close()
71
72 def __bucketize(pool, slots):
73 """
74 Given a pre-existing list of X number of tasks, partition
75 them into a hash of Y number of slots.
76 """
77 buckets = {}
78 count = 0
79 for key in pool:
80 count = count + 1
81 slot = count % slots
82 if not buckets.has_key(slot):
83 buckets[slot] = []
84 buckets[slot].append(key)
85 return buckets
86
def __with_my_bucket(bucket_number, buckets, what_to_do, filename):
    """
    Process every task assigned to this fork's bucket, saving each
    (key, value) result into the shared shelf file.

    A bucket number with no entry in buckets is treated as empty rather
    than raising KeyError.  (The original also built an unused `results`
    dict; results only ever live in the shelf.)
    """
    for thing in buckets.get(bucket_number, []):
        (nkey, nvalue) = what_to_do(bucket_number, buckets, thing)
        __access_buckets(filename, False, nkey, nvalue)
97
def __forkbomb(mybucket, buckets, what_to_do, filename):
    """
    Fork one worker per bucket and block until every worker has exited.

    mybucket is retained for interface compatibility with the old
    recursive implementation, but every key actually present in buckets
    is processed.  The recursive version only covered buckets
    mybucket..len(buckets)-1, silently dropping bucket 0 (and trailing
    buckets whenever the pool was smaller than the fork count).
    """
    pids = []
    for bucket_number in sorted(buckets.keys()):
        pid = os.fork()
        if pid == 0:
            # worker: handle exactly one bucket, then leave via _exit so
            # we never unwind the parent's stack frames in the child
            __with_my_bucket(bucket_number, buckets, what_to_do, filename)
            os._exit(0)
        pids.append(pid)
    for pid in pids:
        try:
            os.waitpid(pid, 0)
        except OSError as ose:
            # errno 10 (ECHILD): child already reaped -- not an error here
            if ose.errno != 10:
                raise
117
118 def __demo(bucket_number, buckets, my_item):
119 """
120 This is a demo handler for test purposes.
121 It just multiplies all numbers by 1000, but slowly.
122 """
123 # print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item)
124 # just to verify forks are not sequential
125 sleep = random.randrange(0,4)
126 time.sleep(sleep)
127 return (my_item, my_item * 1000)
128
def batch_run(pool, callback, nforks=DEFAULT_FORKS, cachedir=DEFAULT_CACHE_DIR):
    """
    Call callback on each item in pool, dividing the workload across
    nforks forked workers, and return the collated results as a dict.

    A temporary shelf file is created in cachedir and is removed again
    even if bucketizing or a worker raises (the original leaked it on
    any exception).
    """
    if nforks <= 1:
        # modulus voodoo gets crazy otherwise and bad things happen
        nforks = 2
    creator = os.getpid()
    shelf_file = __get_storage(cachedir)
    try:
        __access_buckets(shelf_file, True, None)   # reset the cache
        buckets = __bucketize(pool, nforks)
        __forkbomb(1, buckets, callback, shelf_file)
        return __access_buckets(shelf_file, False, None)
    finally:
        # forked children may unwind through this frame when they exit;
        # only the process that created the shared file may delete it
        if os.getpid() == creator and os.path.exists(shelf_file):
            os.remove(shelf_file)
145
def __test(nforks=4, sample_size=20):
    """
    Smoke test: run the demo handler over the integers 0..sample_size-1
    and print the collated result dict.

    range() and the function-call form of print behave identically here
    under Python 2 while remaining valid Python 3.
    """
    pool = range(0, sample_size)
    print(batch_run(pool, __demo, nforks=nforks))
149
if __name__ == "__main__":
    # Manual smoke test: fork workers over the demo workload and print
    # the collated results.
    __test()
152
153