1 # Copyright 2008, Red Hat, Inc
2 # Steve 'Ashcrow' Milner <smilner@redhat.com>
4 # This software may be freely redistributed under the terms of the GNU
5 # general public license.
7 # You should have received a copy of the GNU General Public License
8 # along with this program; if not, write to the Free Software
9 # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
13 from codes
import FuncException
17 class NetworkTest(func_module
.FuncModule
):
21 description
= "Defines various network testing tools."
23 def ping(self
, *args
):
25 raise(FuncException("You must define a count with -c!"))
26 return self
.__run
_command
('/bin/ping', self
.__args
_to
_list
(args
))
28 def netstat(self
, *args
):
29 return self
.__run
_command
('/bin/netstat',
30 self
.__args
_to
_list
(args
))
32 def traceroute(self
, *args
):
33 return self
.__run
_command
('/bin/traceroute',
34 self
.__args
_to
_list
(args
))
37 return self
.__run
_command
('/usr/bin/dig',
38 self
.__args
_to
_list
(args
))
40 def isportopen(self
, host
, port
):
41 # FIXME: the return api here needs some work... -akl
43 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
45 sock
.settimeout(timeout
)
47 sock
.connect((host
, int(port
)))
48 except socket
.error
, e
:
50 return [1, ("connection to %s:%s failed" % (host
, port
), "%s" % e
)]
51 except socket
.timeout
:
53 return [2, ("connection to %s:%s timed out after %s seconds" % (host
, port
, timeout
))]
56 return [0, "connection to %s:%s succeeded" % (host
, port
)]
58 def __args_to_list(self
, args
):
59 return [arg
for arg
in args
]
61 def __run_command(self
, command
, opts
=[]):
62 full_cmd
= [command
] + opts
63 cmd
= sub_process
.Popen(full_cmd
, stdout
=sub_process
.PIPE
)
64 return [line
for line
in cmd
.communicate()[0].split('\n')]