Using the mtr.sh service with Python for simple global service monitoring

MagicQ · 6 months ago

Before getting started, let me recommend a service: https://mtr.sh. It is a platform that provides global nodes for running ping, dnstrace, mtr and similar tests, and it is an open-source project; the test PoPs can be your own or the public ones. For details see https://github.com/Fusl/intrace/.
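
To get a feel for the service before writing any tooling, its three endpoints (the same ones listed in the header comments of the script below) can be called directly. This is only a minimal sketch using requests; 'probe-id' is a placeholder to be replaced with an ID taken from probes.json.

import requests

# List all probes and their capabilities
probes = requests.get('https://mtr.sh/probes.json', timeout=30).json()
print(len(probes))

# List the supported commands (caps)
caps = requests.get('https://mtr.sh/caps.json', timeout=30).json()
print(caps)

# Run a single test: https://mtr.sh/{probeID}/{command}/{destination}
# 'probe-id' is a placeholder, replace it with a real ID from probes.json
result = requests.get('https://mtr.sh/probe-id/ping/1.1.1.1', timeout=180)
print(result.text)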


So far I have tested the probe quality and efficiency of several platforms. CatchPoint has the most probes and the most complete feature set, but its entry cost is correspondingly high. Pingdom is much cheaper but only supports simple checks; the upside is that it is inexpensive and allows a very high test frequency. Tingyun (听云) and Bonree (博睿) are fine for use inside China, but they have few probes abroad, and coverage of the more exotic networks is disheartening. Then I stumbled upon the mtr.sh service, which is convenient to use and has quite a few probes, so I wrote a Python script to dump the data into Elasticsearch for later use. Without further ado, here is the script.

PS: My scripts always end up looking overly complicated, which is a bit embarrassing. Bear with me, haha...

The mtr.sh service occasionally stops responding (the after-effect of hitting its rate limit), so you have to switch servers temporarily; the code therefore picks an API server at random. How do you make a single origin server address usable as several endpoints? There are plenty of ways: a proxy, socat, multiple clients and so on; one sketch follows below.
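
As one illustration only (my own assumption, not part of the original setup): if a local forwarder such as socat relays a spare local port to mtr.sh, the forwarded address can simply be appended to monitor.api. The script already sends a fixed Host: mtr.sh header and only verifies TLS certificates when the API URL contains 'mtr.sh', so such an endpoint should slot in without code changes; 127.0.0.1:8443 below is an arbitrary placeholder.

# Hypothetical second endpoint via a local TCP forwarder, e.g. started with:
#   socat TCP4-LISTEN:8443,fork,reuseaddr TCP4:mtr.sh:443
# The request() method picks an API server at random and skips certificate
# verification for URLs that do not contain 'mtr.sh'.
m = monitor()
m.api = ['https://mtr.sh/', 'https://127.0.0.1:8443/']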

#!/usr/bin/env python
# Monitoring service, written for Python 2
# Powered by MagicQ at 2019-11-20
# Use the mtr.sh service to monitor CNAME changes and latency from all probes
# Notes: mtr.sh rate-limits to roughly 4000 requests per hour; it supports the
# dnst (dig +trace), ping, dnsr (dig @resolver), mtr and trace commands;
# some probes do not support every command.
# Simple API with simple responses, only 3 endpoints:
#   run a test:           https://mtr.sh/{probeID}/{command}/{destination}
#   GET the probe list:   https://mtr.sh/probes.json
#   GET the command list: https://mtr.sh/caps.json
import sys,json,requests,traceback,time,datetime,random,re,subprocess
from datetime import datetime
from elasticsearch import Elasticsearch as es
from elasticsearch import helpers as es_helpers
import threading


es_client=es(['localhost:9200'],http_auth=('elastic', 'somepassword'))
delay_check={}
commands={
    'dnst':'dig @{NS} +noall +answer +trace +norecurse +timeout=1 +tries=5 A {TARGET} 2>&1; dig @{NS} +noall +answer +trace +norecurse +timeout=1 +tries=5 AAAA {TARGET} 2>&1',
    'ping':'ping -c 10 -i 0.2 -t 255 {TARGET} 2>&1'
}
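# The templates above are used only when the probe ID is 'local': execute() then runs
# the command on this host via subprocess instead of calling an mtr.sh probe.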
class logs:
    color={
        'INFO':'\033[92m',
        'ERROR':'\033[91m',
        'DEBUG':'\033[1m',
        'END':'\033[0m'
    }
    def __init__(self,msg,level='info'):
        # numeric level: 1 = ERROR, 2 = DEBUG, anything else = INFO
        if level == 1:
            level = 'ERROR'
        elif level == 2:
            level = 'DEBUG'
        else:
            level = 'INFO'
        d=time.strftime('%F %H:%M:%S')
        print d,self.color[level],msg,self.color['END']
class monitor:
    api=[
        'https://mtr.sh/'
        ]
    retry=5
    server_status={}
    headers = {
        'Host':'mtr.sh',
        'Accept':'application/json;',
        'X-Application-For':'Monitoring-Python',
        'Accept-Encoding':'gzip, deflate',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'
        }
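    # The Host header above is pinned to mtr.sh, so requests stay valid even when an
    # entry in self.api points at a proxy/forwarder address instead of mtr.sh itself.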
    probes={}
    caps={}
    probes_timestamp=0
    def __init__(self,retry_count=5):
        self.retry=retry_count
        # self.probes=self.getProbes(online=True)
        # self.caps=self.getCaps()
    def request(self,path):
        api_server_index=random.randint(0,len(self.api)-1)
        # Skip servers that are close to their rate limit (and whose limit has not reset yet)
        # or that were marked temporarily unavailable after an error.
        while self.server_status.get(self.api[api_server_index]) and ((self.server_status.get(self.api[api_server_index])['X-RateLimit-Remaining']<=100 and self.server_status.get(self.api[api_server_index])['X-RateLimit-Reset'] > time.time()) or self.server_status.get(self.api[api_server_index])['X-Available-At'] > time.time() ):
            logs(str(self.api[api_server_index])+' Waiting for another available server...'+str(self.server_status))
            api_server_index=random.randint(0,len(self.api)-1)
            time.sleep(1)
        retry_count = 0
        # logging.basicConfig() # you need to initialize logging, otherwise you will not see anything from requests
        # logging.getLogger().setLevel(logging.DEBUG)
        # requests_log = logging.getLogger("urllib3")
        # requests_log.setLevel(logging.DEBUG)
        # requests_log.propagate = True
        while retry_count < self.retry :
            try:
                # print self.params
                api=self.api[api_server_index]
                r = requests.get(api+path , headers=self.headers,timeout=180, verify=bool(re.findall('mtr.sh',api)))
                data = r.text
                self.server_status[api]={
                    'X-RateLimit-Remaining':int(r.headers.get('X-RateLimit-Remaining',0)),
                    'X-RateLimit-Reset':int(r.headers.get('X-RateLimit-Reset',time.time()+60*30)),
                    'X-Available-At':int(time.time())+60
                }
                # print data
                if r.status_code == 200 :
                    return data
                elif r.status_code == 504:
                    self.server_status[api]['X-Available-At']=int(time.time())+60*2*10
                logs( path + ' ErrorMessage : '+str(r.status_code) , 2 )
                return False
            except Exception as e:
                retry_count +=1
                logs(str(e) + ' Retrying... ',retry_count)
                traceback.print_exc()
        return False
    def execute(self,pid,command,dest,NS='198.41.0.4'):
        # Check that the pid exists in probes and that the command is valid
        if pid.upper()=='LOCAL':
            if command not in commands:
                raise Exception(command+' definition does not exist')
            res, err=subprocess.Popen(commands.get(command).format(TARGET=dest,NS=NS),stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True).communicate()
            if err:
                raise Exception(str(err))
            return res
        p=self.probes.get(pid)
        if not p or not p['status'] :
            # if the probeID doesn't exist or its status is false, return 1
            logs("ProbeID "+pid+" status offline..."+str(p),1)
            return 1
        if p.get('caps') and not p['caps'].get(command):
            # if the selected probe doesn't support that command, return 2
            logs("Command "+command+" is not allowed on this probe...\n",1)
            return 2
        return self.request(pid+'/'+command+'/'+dest)
    def getProbes(self,cap=None,region=None,country=None,provider=None,online=False):
        p={}
        if self.probes and not online:
            p=self.probes
        else:
            r=self.request('probes.json')
            if r:
                p=json.loads(r)
                # cache the probe list so later calls can reuse it without re-fetching
                self.probes=p
                self.probes_timestamp=int(time.time())
            else:
                return {}
        if not ( cap or region or country or provider ) :
            return p
        r={}
        for pid in p:
            if (cap and p[pid]['caps'][cap]) or (region and p[pid]['group']==region) or (provider and p[pid]['provider']==provider) or (country and p[pid]['country']==country):
                r[pid]=p[pid]
        return r
    def getCaps(self):
        r=self.request('caps.json')
        if r:
            return json.loads(r)
        return r
    def dnst(self,pid,dest,NS='198.41.0.4'):
        r=self.execute(pid,'dnst',dest)
        if not r or isinstance(r,int):
            # execute() returns 1/2 when the probe is offline or unsupported
            return None
        # searchResult=re.search('('+dest+'.*)\\n',r)
        searchResult=re.findall(r''+dest+'.*',r)
        if not searchResult:
            logs(dest+' didn\'t have dns record')
            # collect SOA or NS info
            SOA=re.findall(r'.*IN\tSOA.*',r)
            NS=re.findall(r'.*IN\tNS.*',r)
            return [{
                'record_type':'SOA' if SOA else 'NS',
                'value':SOA[0] if SOA else NS[len(NS)-1]
            }]
        record=[]
        for res in list(set(searchResult)):
            rList=res.split('\t')
            record.append({
                'record_type':rList[len(rList)-2],
                'value':rList[len(rList)-1]
            })
        return record
        
    def ping(self,pid,dest):
        r=self.execute(pid,'ping',dest)
        if not r or isinstance(r,int):
            # execute() returns 1/2 when the probe is offline or unsupported
            return None
        # name could not be resolved
        # if re.findall(r'Name or service not known',r) or re.findall(r'unknown host',r) or re.findall(r'No address',r) or re.findall(r'not resolve',r):
        if len(r.split('\n')) < 3:
            return {
                'alias_name': 'name.unknown',
                'real_address':'0.0.0.0',
                'comment':'Name or service not known',
                'packet_loss':1,
                'latency_min':20000,
                'latency_max':20000,
                'latency_avg':20000
            }
        if re.findall(r'100% packet loss',r):
            parseAlias=r.split(' ')
            return {
                'alias_name': parseAlias[1],
                'real_address':parseAlias[2].replace('(','').replace(')',''),
                'comment':'not reachable',
                'packet_loss':1,
                'latency_min':20000,
                'latency_max':20000,
                'latency_avg':20000
            }
        pingResult=re.findall(r'.*packets transmitted.*',r)[0].split(' ')
        latencyInfo=re.findall(r'\/avg\/.*\n$',r)[0].split(' ')[2].split('/')
        jobInfo=re.findall(r'PING.*',r)[0].split(' ')
        return {
                'alias_name': jobInfo[1],
                'real_address':jobInfo[2].replace('(','').replace(')',''),
                'comment':'ok',
                # float division, otherwise Python 2 truncates the loss ratio to 0 or 1
                'packet_loss':1-float(pingResult[3])/int(pingResult[0]),
                'latency_min':float(latencyInfo[0]),
                'latency_max':float(latencyInfo[2]),
                'latency_avg':float(latencyInfo[1])
            }
    def mtr(self,pid,dest):
        return self.execute(pid,'mtr',dest)
global_monitor=monitor()
# Randomly pick one available probe per region and run dnst/ping against the destination
class job(threading.Thread):
    lastUpdate_time=0
    dest='1.1.1.1'
    def __init__(self,threadName='thread_dnst',jobtype='both',dest='1.1.1.1'):
        threading.Thread.__init__(self)
        self.threadName=threadName
        self.jobtype=jobtype
        self.dest=dest
        #init params
    def run(self):
        d=self.dest
        if delay_check.get(d,0) > time.time():
            logs('Delay check '+d+' due to unknown name...')
            return 0
        if re.search('^[\*\.]',d):
            logs(d+' seems like wildcard domain, ignore...')
            return 1
        p={}
        m=global_monitor
        current_pid={}
        try:
            probes=m.getProbes(online=(m.probes_timestamp + 60*60 < time.time()))
            for pid in probes:
                if probes[pid]['status'] and probes[pid]['caps']['dnst'] and probes[pid]['caps']['ping']:
                    probes[pid]['pid']=pid
                    if not p.get(probes[pid]['group']):
                        p[probes[pid]['group']]=[]
                    p[probes[pid]['group']].append(probes[pid])
                        # else:
                            
                        #     p[probes[pid]['group']][0]=probes[pid]
            for g in p:
                rand=random.randint(0,len(p[g])-1)
                # work on a copy so popping keys below doesn't corrupt the cached probe list
                tpoint=dict(p[g][rand])
                current_pid=p[g][rand]
                tpoint.pop('caps')
                tpoint.pop('providerurl')
                tpoint.pop('status')
                tpoint['job_type']=self.jobtype
                tpoint['timestamp']=int(time.time()*1000)
                tpoint['host_name']=d
                dnst_res=m.dnst(p[g][rand]['pid'],d)
                if not dnst_res:
                    continue
                elif dnst_res[0]['record_type']=='NS' or dnst_res[0]['record_type']=='SOA':
                    tpoint['dnst']=dnst_res
                    tpoint['ping']={
                                'alias_name': 'name.unknown',
                                'real_address':'0.0.0.0',
                                'comment':'Name or service not known',
                                'packet_loss':1,
                                'latency_min':20000,
                                'latency_max':20000,
                                'latency_avg':20000
                            }
                    delay_check[self.dest]=time.time()+60*60*2
                    return write_data_to_es([tpoint])
                else:
                    tpoint['dnst']=dnst_res
                    # Parse ping result
                    ping_res=m.ping(p[g][rand]['pid'],d)
                    if not ping_res:
                        ping_res={
                                'alias_name': 'job.error',
                                'real_address':'0.0.0.0',
                                'comment':'ping job error',
                                'packet_loss':1,
                                'latency_min':20000,
                                'latency_max':20000,
                                'latency_avg':20000
                            }
                    tpoint['ping']=ping_res
                    write_data_to_es([tpoint])
        except Exception as e:
            logs(self.threadName +" handle "+str(self.dest) + " from "+str(current_pid)+" "+str(e))
            traceback.print_exc()
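# write_data_to_es() bulk-indexes the results into a daily monitoring-cdn-* index,
# using a hash of the JSON document as the _id.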
def write_data_to_es(data):
    tPoints=[]
    for i in range(len(data)):
        tPoints.append({
            '_index':'monitoring-cdn-'+str(datetime.utcnow().strftime("%Y.%m.%d")),
            '_type':'_doc',
            '_id':hash(json.dumps(data[i])),
            '_source':data[i]
        })
    return es_helpers.bulk(es_client,tPoints)
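# getHosts() pulls the hostnames to monitor from the 'config_hosts' index by
# aggregating on hostname.domain.keyword, skipping hwcdn.net entries.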
def getHosts():
    try:
        r = es_client.search(
            body={
                    "aggs": {
                        "2": {
                            "terms": {
                                "field": "hostname.domain.keyword",
                                "size": 80000,
                                "order": {
                                    "_count": "desc"
                                }
                            }
                        }
                    },
                    "stored_fields": ["hostname.domain.keyword"], 
                    "query": {
                      "match_all": {
                      }
                    }
                },
            index='config_hosts',
            search_type='query_then_fetch',
            filter_path=['aggregations.2.buckets']
        )['aggregations']['2']['buckets']
        hosts=[]
        for i in range(len(r)):
            if not (re.findall(r'hwcdn.net',str(r[i]['key']))):
                hosts.append(str(r[i]['key']))
        return hosts
    except Exception as e:
        logs('Get hostname from ES error '+str(e))
        traceback.print_exc()
if __name__ == '__main__':
    # Get thread limit, default limit to 10
    # threads={}
    # threads_limit=10 if len(sys.argv)<2 else int(sys.argv[1])
    # while 1:
    #     hosts=getHosts()
    #     for i in range(len(hosts)):
    #         # for i in range(10 if len(sys.argv)<2 else int(sys.argv[1])):
    #         while threading.activeCount() - 1 > threads_limit:
    #             time.sleep(1)
    #         threads[i]=job('dnst+ping-thread-'+str(i),'both',hosts[i])
    #         threads[i].daemon=False
    #         threads[i].start()
    #     while threading.activeCount() > 1:
    #         time.sleep(1)
    #     threads=[]     
    m=monitor(3)
    m.api=['https://mtr.sh/']
    print m.ping('local','www.baidu.com')
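
Once documents start landing in the daily monitoring-cdn-* indices, they can be pulled back out for dashboards or alerting. The query below is only a sketch of that idea, not part of the original script; it assumes the timestamp field is mapped as a date and that group gets Elasticsearch's default keyword sub-field.

# Average ping latency per probe group/region over the last hour (sketch)
res = es_client.search(
    index='monitoring-cdn-*',
    body={
        'size': 0,
        'query': {'range': {'timestamp': {'gte': 'now-1h'}}},
        'aggs': {
            'by_region': {
                'terms': {'field': 'group.keyword', 'size': 50},
                'aggs': {'avg_latency': {'avg': {'field': 'ping.latency_avg'}}}
            }
        }
    }
)
for bucket in res['aggregations']['by_region']['buckets']:
    print(bucket['key'] + ': ' + str(bucket['avg_latency']['value']))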


Latest replies (1)
  • 矢量bit · 6 months ago
    niubility