服务器hang的检查

做运维的都知道,最怕的不是机器直接挂掉,而是机器hang在那里:能ping通,但是又登录不上去。周末加班写了个检测脚本:先发送ICMP包做ping检查,如果有返回,再继续做ssh端口的检查或者ssh登录的检查。Python不像Perl那样直接有个很好用的Net::Ping,于是自己在网上找了个python-ping,修改了一下,放到脚本里面直接用。

#!/usr/bin/env python2.7  
import socket  
import sys  
import paramiko  
import os  
import select  
import struct  
import time  
import threading  
import Queue  
import copy  
import string  
import hashlib  
from collections import deque  
ICMP_ECHO_REQUEST = 8 # ICMP message type of an echo request ("ping"); the original author notes it seems to be the same on Solaris.  
class CheckHang:
    """Probe a single server to tell a "down" host from a "hung" one.

    A host that answers ICMP echo but refuses TCP/SSH is considered hung;
    a host that does not answer ICMP at all is considered down.  The ICMP
    code is derived from the public "python-ping" script and needs a raw
    socket, which normally requires root privileges.
    """

    # ICMP message type of an echo reply (RFC 792).
    _ICMP_ECHO_REPLY = 0

    def __init__(self, server):
        """Remember the host name or IP address to probe."""
        self.server = server

    def check_ssh(self, key_file="/home/pm/keys/id_rsa", username="root",
                  timeout=1):
        """Try a full SSH login with a private key.

        Returns 1 when the login succeeds and 0 when it fails for any
        reason.  Loading the key file happens outside the guarded section,
        so a missing or unreadable key still raises to the caller.
        """
        key = paramiko.RSAKey.from_private_key_file(key_file)
        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys.  Acceptable
        # for a liveness probe, not for anything that sends secrets.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.server, username=username, pkey=key,
                        timeout=timeout)
            return 1
        except Exception:
            return 0
        finally:
            # Always release the transport, also after a failed connect.
            ssh.close()

    def check_ssh_port(self, port, timeout=1):
        """Check whether a TCP connection to `port` can be established.

        Returns 1 when the port accepts the connection within `timeout`
        seconds, 0 otherwise (refused, timed out, unresolvable host, ...).
        """
        port_test = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            port_test.settimeout(timeout)
            port_test.connect((self.server, port))
            return 1
        except Exception:
            return 0
        finally:
            # Close on every path so failed probes do not leak descriptors.
            port_test.close()

    def checksum(self, source_string):
        """Compute the 16-bit ICMP checksum of a byte string.

        The bytes are summed as little-endian 16-bit words (a trailing odd
        byte is added as-is), the carries are folded back in, the result is
        complemented and finally byte-swapped.  `send_one_ping` passes the
        value through `socket.htons` before packing it.
        """
        data = bytearray(source_string)
        total = 0
        even_length = (len(data) // 2) * 2
        for index in range(0, even_length, 2):
            word = data[index + 1] * 256 + data[index]
            total = (total + word) & 0xffffffff

        if even_length < len(data):
            total = (total + data[-1]) & 0xffffffff

        # Fold the carries above bit 15 back into the low 16 bits.
        total = (total >> 16) + (total & 0xffff)
        total = total + (total >> 16)
        answer = ~total & 0xffff

        # Swap the two bytes of the result.
        return (answer >> 8) | ((answer << 8) & 0xff00)

    def receive_one_ping(self, my_socket, id, timeout):
        """Wait for the echo reply that carries identifier `id`.

        Returns the round-trip delay in seconds, or None when no matching
        reply arrives within `timeout` seconds.  Packets that are too short,
        are not echo replies, or carry another identifier are ignored; this
        matters because a raw ICMP socket also sees traffic meant for other
        sockets and other message types.
        """
        stamp_size = struct.calcsize("d")
        time_left = timeout
        while True:
            started_select = time.time()
            ready = select.select([my_socket], [], [], time_left)
            how_long_in_select = time.time() - started_select
            if not ready[0]:  # Timeout
                return None

            time_received = time.time()
            received_packet, _addr = my_socket.recvfrom(1024)
            # The ICMP header is read at offset 20, i.e. right after an IP
            # header without options.
            if len(received_packet) >= 28 + stamp_size:
                icmp_type, _code, _cksum, packet_id, _sequence = struct.unpack(
                    "bbHHh", received_packet[20:28]
                )
                if icmp_type == self._ICMP_ECHO_REPLY and packet_id == id:
                    time_sent = struct.unpack(
                        "d", received_packet[28:28 + stamp_size]
                    )[0]
                    return time_received - time_sent

            time_left = time_left - how_long_in_select
            if time_left <= 0:
                return None

    def send_one_ping(self, my_socket, dest_addr, id, psize):
        """Send one ICMP echo request of `psize` bytes to >dest_addr<.

        The payload starts with the send time packed as a double (read back
        by `receive_one_ping`) and is padded with "Q" bytes.
        """
        dest_addr = socket.gethostbyname(dest_addr)

        # Remove header size from packet size
        payload_size = psize - 8

        # Header is type (8), code (8), checksum (16), id (16), sequence (16).
        # Build it once with a zero checksum so the checksum can be computed.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, 0, id, 1)
        stamp = struct.pack("d", time.time())
        data = stamp + (payload_size - len(stamp)) * b"Q"

        my_checksum = self.checksum(header + data)

        # Rebuild the header with the real checksum in place.
        header = struct.pack(
            "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), id, 1
        )
        # The port number is irrelevant for ICMP; 1 is a placeholder.
        my_socket.sendto(header + data, (dest_addr, 1))

    def _packet_id(self):
        """Derive a 16-bit ICMP identifier from the server name.

        Using a per-server value (rather than the process id) lets several
        threads in one process tell their replies apart.
        """
        name = self.server
        if not isinstance(name, bytes):
            name = name.encode("utf-8")
        return int(hashlib.md5(name).hexdigest(), 16) & 0xFFFF

    def do_one(self, timeout, psize):
        """Send one echo request and wait for its reply.

        Returns either the delay (in seconds) or None on timeout.  Raises
        socket.error when the raw socket cannot be created (with a hint
        about root privileges for "operation not permitted") and lets name
        resolution errors from sending propagate.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as exc:
            if exc.errno == 1:
                # Operation not permitted
                msg = exc.strerror + (
                    " - Note that ICMP messages can only be sent from processes"
                    " running as root."
                )
                raise socket.error(msg)
            raise  # raise the original error

        try:
            my_id = self._packet_id()
            self.send_one_ping(my_socket, self.server, my_id, psize)
            return self.receive_one_ping(my_socket, my_id, timeout)
        finally:
            # Close the raw socket even when sending or receiving raises.
            my_socket.close()

    def check_ping(self, timeout=2, maxcount=4, psize=64):
        """Ping up to `maxcount` times; return 1 on the first reply, else 0.

        Any error while pinging (unresolvable name, missing privileges for
        the raw socket, ...) counts as a failed attempt.
        """
        for _ in range(maxcount):
            try:
                delay = self.do_one(timeout, psize)
            except Exception:
                continue
            # A measured delay of 0.0 is still a reply; only None is a miss.
            if delay is not None:
                return 1
        return 0

    def verbose_ping(self, timeout=2, count=4, psize=64):
        """Send `count' pings of `psize' bytes and print each result.

        Stops early when the host name cannot be resolved.
        """
        for _ in range(count):
            sys.stdout.write("ping %s with ... " % self.server)
            try:
                delay = self.do_one(timeout, psize)
            except socket.gaierror as exc:
                print("failed. (socket error: '%s')" % exc.args[1])
                break

            if delay is None:
                print("failed. (timeout within %ssec.)" % timeout)
            else:
                print("get ping in %0.4fms" % (delay * 1000))
        print("")
  
class Muti_Check:
    """Check many servers concurrently and collect the unhealthy ones.

    After `multi_check` returns, `hanglist` holds the servers that answer
    ping but do not accept a connection on port 22, and `downlist` holds
    the servers that do not answer ping.  Both are deques, whose appends
    are safe to call from several threads.
    """

    def __init__(self, servers):
        """Store the iterable of server names (surrounding whitespace ok)."""
        self.servers = servers
        self.downlist = deque()
        self.hanglist = deque()

    def server_check(self, ser):
        """Probe one server and record it if it is hung or down."""
        ser = ser.strip()
        test = CheckHang(ser)
        if test.check_ping(timeout=1) == 1:
            # Reachable by ICMP: a closed SSH port means the host is hung.
            # test.check_ssh() can be used here for a full login check.
            if test.check_ssh_port(22) != 1:
                self.hanglist.append(ser)
        else:
            self.downlist.append(ser)

    def get_result(self):
        """Print one "Hang: ..." or "Down: ..." line per recorded server."""
        for ser in self.hanglist:
            print("Hang: %s" % ser)
        for ser in self.downlist:
            print("Down: %s" % ser)

    def _worker(self, pending):
        """Check servers taken from `pending` until it is empty."""
        while True:
            try:
                ser = pending.popleft()
            except IndexError:
                return
            try:
                self.server_check(ser)
            except Exception as exc:
                # Keep the worker alive so the remaining servers still get
                # checked; report the failure instead of dropping it.
                sys.stderr.write("check of %s failed: %s\n" % (ser, exc))

    def multi_check(self, concurrent_max):
        """Check all servers using at most `concurrent_max` threads.

        Blank entries are skipped and `self.servers` is left unmodified.
        A `concurrent_max` below 1 is treated as 1.  Returns once every
        server has been checked.
        """
        stripped = (entry.strip() for entry in self.servers)
        pending = deque(name for name in stripped if name)
        worker_count = min(max(1, concurrent_max), len(pending))
        workers = [
            threading.Thread(target=self._worker, args=(pending,))
            for _ in range(worker_count)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
  
if __name__ == "__main__":
    # Multi-threaded entry point.  The only argument is a text file with one
    # server name per line; blank lines are ignored.
    if len(sys.argv) < 2:
        sys.exit("usage: %s SERVER_LIST_FILE" % sys.argv[0])
    with open(sys.argv[1]) as fd:
        servers = [line for line in fd if line.strip()]
    cluster = Muti_Check(servers)
    cluster.multi_check(20)  # upper bound on concurrent checker threads
    cluster.get_result()
if __name__ == "__main2__":
    # Single-threaded variant of the check.  It never runs as written,
    # because __name__ is never "__main2__"; change the guard to "__main__"
    # to use it in place of the multi-threaded entry point.
    with open(sys.argv[1]) as fd:
        servers = [line.strip() for line in fd if line.strip()]
    hang_list = set()
    down_list = set()
    for ser in servers:
        test = CheckHang(ser)
        if test.check_ping(timeout=1) == 1:
            # Answers ping: a closed SSH port means the host is hung.
            # test.check_ssh() can be used here for a full login check.
            if test.check_ssh_port(22) != 1:
                hang_list.add(ser)
        else:
            down_list.add(ser)
    for ser in hang_list:
        print("Hang: %s" % ser)
    for ser in down_list:
        print("Down: %s" % ser)
    if not (hang_list | down_list):
        print("all %d server ok" % (len(servers)))

输入的列表就是机器名(每行一个),使用多线程进行检测,线程数可以通过multi_check的参数传入。如果不想用多线程的版本,那就用后面那个简单的顺序轮询版本(把判断条件里的 `__main2__` 改成 `__main__` 即可启用)。

另外说一下对hang的模拟,iptables放行icmp但是把22端口封掉,对于宕机的场景就是把这个检查服务器的IP直接封掉就OK。