# forkbomb is a module that partitions arbitrary workloads
# among N separate forks, for a configurable N, and
# collates results upon return, as if it never forked.
#
# Copyright 2007, Red Hat, Inc
# Michael DeHaan <mdehaan@redhat.com>
#
# This software may be freely redistributed under the terms of the GNU
# General Public License.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.

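# A minimal usage sketch, assuming the module is importable as "forkbomb";
# the handler signature and result shape follow __demo() and batch_run()
# below, and `double` is a hypothetical example handler, not part of the
# module:
#
#   import forkbomb
#
#   def double(bucket_number, buckets, item):
#       return (item, item * 2)
#
#   results = forkbomb.batch_run(range(10), double, nforks=4)
#   # results collates each (key, value) pair the handler returned:
#   # {0: 0, 1: 2, 2: 4, ..., 9: 18}
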
import os
import random    # for testing only
import time      # for testing only
import shelve
import bsddb
import sys
import tempfile
import fcntl

DEFAULT_FORKS = 4
DEFAULT_CACHE_DIR = "/var/lib/func"

def __get_storage(dir):
    """
    Return a tempfile we can use for storing data.
    """
    dir = os.path.expanduser(dir)
    if not os.path.exists(dir):
        os.makedirs(dir)
    return tempfile.mktemp(suffix='', prefix='asynctmp', dir=dir)

def __access_buckets(filename,clear,new_key=None,new_value=None):
    """
    Access data in the forkbomb cache, potentially clearing or
    modifying it as required.
    """
    internal_db = bsddb.btopen(filename, 'c', 0644)
    handle = open(filename,"r")
    fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
    storage = shelve.BsdDbShelf(internal_db)

    if clear:
        # wipe the cache and return an empty result set
        storage.clear()
        storage.close()
        fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
        return {}

    if not storage.has_key("data"):
        storage["data"] = {}

    if new_key is not None:
        # bsddb is a bit weird about mutating nested values in place,
        # so copy the dictionary out, modify it, and store it back
        newish = storage["data"].copy()
        newish[new_key] = new_value
        storage["data"] = newish

    rc = storage["data"].copy()
    storage.close()
    fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
    return rc

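# Usage sketch for the helper above (the key/value pair is hypothetical;
# shelf_file is the temporary shelf created in batch_run() below): each
# worker stores one result per task, and a final read with no new key
# returns the collated dictionary:
#
#   __access_buckets(shelf_file, False, 3, 3000)   # store one result
#   rc = __access_buckets(shelf_file, False)       # rc now includes {3: 3000}
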
def __bucketize(pool, slots):
    """
    Given a pre-existing list of X number of tasks, partition
    them into a hash of Y number of slots.
    """
    buckets = {}
    count = 0
    for key in pool:
        count = count + 1
        slot = count % slots
        if not buckets.has_key(slot):
            buckets[slot] = []
        buckets[slot].append(key)
    return buckets

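# A worked example, assuming the count-then-modulo loop reconstructed
# above: the count starts at 1, so the first item lands in slot 1 and
# assignment wraps around through slot 0:
#
#   __bucketize(range(6), 3)  =>  {0: [2, 5], 1: [0, 3], 2: [1, 4]}
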
def __with_my_bucket(bucket_number,buckets,what_to_do,filename):
    """
    Process all tasks assigned to a given fork, and save
    the results in the shared cache file.
    """
    things_in_my_bucket = buckets[bucket_number]
    for thing in things_in_my_bucket:
        (nkey,nvalue) = what_to_do(bucket_number,buckets,thing)
        __access_buckets(filename,False,nkey,nvalue)

def __forkbomb(mybucket,buckets,what_to_do,filename):
    """
    Recursive function to spawn off a lot of worker forks.
    """
    nbuckets = len(buckets)
    pid = os.fork()
    if pid != 0:
        # parent: spawn the fork for the next bucket, then reap this child
        if mybucket < (nbuckets-1):
            __forkbomb(mybucket+1,buckets,what_to_do,filename)
        try:
            os.waitpid(pid,0)
        except OSError, ose:
            if ose.errno == 10:
                # ECHILD: the child was already reaped
                pass
            else:
                raise ose
    else:
        # child: work through this bucket's tasks, then exit
        __with_my_bucket(mybucket,buckets,what_to_do,filename)
        sys.exit(0)

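# Control-flow sketch for the reconstruction above: every fork happens in
# the original process (the recursive call runs in the parent branch), so
# the workers end up as siblings, reaped in reverse order once the
# recursion unwinds:
#
#   parent --fork--> worker for bucket 0
#   parent --fork--> worker for bucket 1
#   ...
#   parent waits on the last worker first, then back down to bucket 0
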
def __demo(bucket_number, buckets, my_item):
    """
    This is a demo handler for test purposes.
    It just multiplies all numbers by 1000, but slowly.
    """
    # print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item)
    # sleep a random interval just to verify forks are not sequential
    sleep = random.randrange(0,4)
    time.sleep(sleep)
    return (my_item, my_item * 1000)

def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR):
    """
    Given an array of items (pool), call callback on each one, but divide
    the workload over nforks forks. Temporary files used during the
    operation will be created in cachedir and subsequently deleted.
    """
    if nforks <= 1:
        # modulus voodoo gets crazy otherwise and bad things happen
        nforks = 2
    shelf_file = __get_storage(cachedir)
    __access_buckets(shelf_file,True,None)
    buckets = __bucketize(pool, nforks)
    # buckets are keyed 0..nforks-1, so the fork chain must start at
    # bucket 0 or the first bucket's tasks would be silently dropped
    __forkbomb(0,buckets,callback,shelf_file)
    rc = __access_buckets(shelf_file,False,None)
    os.remove(shelf_file)
    return rc

def __test(nforks=4,sample_size=20):
    pool = xrange(0,sample_size)
    print batch_run(pool,__demo,nforks=nforks)

if __name__ == "__main__":
    __test()