-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmt.pingsweep.py
executable file
·163 lines (124 loc) · 4.82 KB
/
mt.pingsweep.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import time # let's time our script
import ipaddress # https://docs.python.org/3/library/ipaddress.html
# convert ip/mask to list of hosts
# also allows proper sort of ip
import subprocess # https://docs.python.org/3/library/subprocess.html
# to make a popup window quiet
from colorama import init # colors https://en.wikipedia.org/wiki/ANSI_escape_code
init() # https://pypi.org/project/colorama/
import threading # for threading functions, lock, queue
from queue import Queue # https://docs.python.org/3/library/queue.html
import sys # is this win/nix
# if you want to see each ping result, as it happens, set to 1
debug = 0

# lock used by the worker threads so that one host's output is printed
# as a complete chunk and is not interleaved with another thread's output
print_lock = threading.Lock()

# Prompt the user to input a network address
net_addr = input("Enter Network (192.168.1.0/24): ")

# actual code start time (taken after the prompt so typing time is excluded)
start = time.time()

# Create the network
ip_net = ipaddress.ip_network(net_addr)

# Get all hosts on that network
all_hosts = list(ip_net.hosts())

# Configure subprocess to hide the console window.
# STARTUPINFO, STARTF_USESHOWWINDOW and SW_HIDE only exist in the Windows
# build of the subprocess module, so they must not be touched elsewhere;
# on Linux there is no console window to hide and None is passed instead.
if sys.platform.startswith('win32'):
    info = subprocess.STARTUPINFO()
    info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    info.wShowWindow = subprocess.SW_HIDE
elif sys.platform.startswith('linux'):
    info = None
else:
    # Fail here, in the main thread, where sys.exit() really ends the
    # program; the same check inside a worker thread would only end
    # that thread and leave the sweep waiting forever.
    print("Cannot determine OS type")
    sys.exit(1)

# quick message/update
print('Sweeping Network with ICMP: ', net_addr)
# the actual ping definition and logic.
# it's called from a pool, repeatedly threaded, not serial
def pingsweep(ip):
    """
    Ping a single host once and record it if it answers.

    ip is an index into the module-level all_hosts list (not an address).
    The host is pinged with one echo request and a one second wait.
    A reply (any output containing "ttl") appends the address to the
    module-level iplist.  Per-host status lines are only printed when
    debug is set, except for output that matches none of the known
    patterns, which is always reported as "is Unknown".

    Calls sys.exit() when the platform is neither Windows nor Linux.
    """
    target = str(all_hosts[ip])

    # in practice, some computers on the local network took up to 300ms to respond.
    # If you set the timeout too low, you might get an incomplete response list
    if sys.platform.startswith('win32'):
        # Windows: -n is the count, -w the reply timeout in milliseconds.
        # startupinfo keeps a console window from popping up per ping.
        command = ['ping', '-n', '1', '-w', '1000', target]
        popen_options = {'startupinfo': info}
    elif sys.platform.startswith('linux'):
        # Linux: -c is the count, -W the reply timeout in SECONDS.
        # (-w is an overall deadline in seconds, so "-w 1000" would wait
        # over 16 minutes on a silent host.)  startupinfo is Windows-only
        # and must not be passed here.
        command = ['ping', '-c', '1', '-W', '1', target]
        popen_options = {}
    else:
        print("Cannot determine OS type")
        sys.exit()

    raw_output = subprocess.Popen(command, stdout=subprocess.PIPE,
                                  **popen_options).communicate()[0]

    # Decode once.  The console code page is not guaranteed to be UTF-8,
    # so undecodable bytes are replaced instead of raising; the keywords
    # searched for below are plain ASCII and are unaffected.
    text = raw_output.decode('utf-8', errors='replace')

    # lock this section, until we get a complete chunk
    # then free it (so it doesn't write all over itself)
    with print_lock:
        # switch to the base color used for debug output
        if debug:
            print('\033[93m', end='')

        # code logic if we have/don't have good response
        # used casefold for case insensitive search
        # Win: Reply from 8.8.8.8: bytes=32 time=19ms TTL=53
        # Nix: 64 bytes from 8.8.8.8: icmp_seq=4 ttl=53 time=22.003 ms
        if 'ttl' in text.casefold():
            if debug:
                print(target, '\033[32m'+"is Online")
            # it's good, put into our list
            iplist.append(all_hosts[ip])
        elif "reachable" in text:
            if debug:
                print(target, '\033[90m'+"is Offline (Unreachable)")
        elif "timed" in text:
            if debug:
                print(target, '\033[90m'+"is Offline (Timeout)")
        elif "failed" in text:
            if debug:
                print(target, '\033[90m'+"is Offline (Transmit Failed)")
        else:
            # unrecognized ping output: always reported, even without debug
            print(target, '\033[90m'+"is Unknown")
def threader():
    """
    Worker loop run by every thread in the pool.

    Takes one host index at a time from the queue q, pings it with
    pingsweep and marks the queue item as done.  Never returns; the
    threads are expected to be daemon threads that end with the program.
    """
    while True:
        worker = q.get()
        try:
            pingsweep(worker)
        except Exception as exc:
            # One failing ping (missing ping binary, OS error, ...) must
            # not kill this worker: a dead pool would leave queued hosts
            # unprocessed and q.join() waiting forever.
            with print_lock:
                print("ping worker failed for host index", worker, "-", exc)
        finally:
            # Always account for the item, even on failure, so that
            # q.join() in the main thread can return.
            q.task_done()
# global list variable to store wanted IP (filled by pingsweep)
iplist = []

# new queue object holding the host indexes still to be pinged
q = Queue()

# up to 100 threads, daemon for cleaner shutdown
# just spawns the threads and makes them daemon mode
for x in range(100):
    threading.Thread(target=threader, daemon=True).start()

# loops over list created by ipaddress module
# passing each index to q.put (entering it into queue)
for worker in range(len(all_hosts)):
    q.put(worker)

# block until every queued host has been processed
q.join()

# print out our sorted list
# Address objects order numerically on their own, so no key function is
# needed; this also works for IPv6 networks, where forcing every entry
# through ipaddress.IPv4Address would raise.
online_hosts = sorted(iplist)
count = len(online_hosts)
for ip in online_hosts:
    # switch back to the base color
    print('\033[93m', end='')
    # colorize desired output
    print(ip, '\033[32m'+"is Online")

# switch back to the base color
print('\033[93m', end='')

# ok, give us a final time report rounded to 2 decimal places
runtime = float("%0.2f" % (time.time() - start))
print(count, "hosts responded to ping in", runtime, "seconds")