NAT概述
互联网的迅速发展给大家日常带来了无限便利,互联网依赖于tcp/ip网络,ip地址使得全球计算机得以在Internet上互通互联;ip地址(ipv4)数量有限,到了今天已十分紧缺了;这里介绍一下NAT(Network Address Translation,网络地址转换)是解决公有ip地址有限和内外网安全隔离的技术,讲解NAT特点分类,和提供一个测试程序(python)。
在Internet世界里,区分各个主机是用32bit的ip地址(公网的),主机之间的连接是通过路由器,路由器里面有个路由表用指导数据包如何转发的规则;譬如,A主机要发送一条报文'hello'给Internet某个角落的B主机,那么A先将报文封装上带有目标ip的包头发到A的直接路由器R1上,R1用收到的数据包里获取到包头的目标ip,拿头目标ip在自己的路由表里匹配寻找是否有主机记录或从下一跳路由器发出,最终数据包到达B主机。另外ip地址只能定位到主机,至于是主机上属于哪个进程的呢,有个端口号的整数值区分,属于tcp/ip分层里传输层的一个16bit的值。所以在给网络上某个进程请求或回复数据,ip地址+端口号给定了就能正确送达。 在一个局域网里只有某一台主机N能上网,那么其他主机也想连网怎么办呢,不可能各个去电信拉条宽带吧;假设主机N有公有ip地址ipn,能在Internet上通信;Internet网络底层不外乎ip数据包的收发吧;那么局域网其他主机可以把数据包收发中介都通过主机N代理;局域网每个主机有网部的ip,对应进程有端口,要在Internet上通信,要让大家认识才行,那么在主机N上就把局域网主机ip和端口换成自己的公网ip并分配一个端口,这样就有了一个对应关系localip:localport <-->publicip:publicport。后面接收到的数据也按这个关系逆向替换并转发,这种局域网内的主机就可以和外网通信了,主机N在这个过程做的事情就是网络地址转化NAT。
NAT分类
- NAT1: Full Cone
完全椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,并且能接收任何外部主机的数据包 - NAT2: Restricted Cone
地址受限椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,并且能接收曾发送过数据的目标主机的数据包 - NAT3: Port-Restricted Cone
地址和端口受限椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,只能接收来自曾发送过的目标地址+端口的数据包 - NAT4: Symmetric
对称型,相同内部ip:port,当目标不同时自身映射出的外部ip:port是不同的。
按本地和公网地址映射规则划分为上面4种,用1~4越来约束越多,当然对内网的隔离保护就越安全,但越到最后能实现p2p穿透通信的难度越大,有NAT4的网络环境基本上不能p2p通信的。p2p即点对点通信,在大文件大流量传输的场景可以减轻服务端网络IO,加快客户收发速度。譬如有一即可通信的服务端在北京,有一个广州的用户和深圳的用户,两个用户要发文件,传统的发送流程数据流都经过北京服务器中转,而p2p通信会在开始时借助北京服务器查询到对方用户的公网ip和端口,后面的数据流直接发送不经北京服务器。NAT1,NAT2,NAT3都是可以打通进行p2p通信的,实际的网络可能很复杂,局域网用户可能经过多层NAT设置代理,接收方亦如是,只要一个是NAT4对称型的就会失败。曾经也见过有人发表过如何通过猜测端口打通对称型NAT设备,我觉得成功率还得看运气成份。
NAT检测
本人出于兴趣开发了个基于python2的NAT检测程序,就一个py文件(服务端/客户端共用,启动时参数不同),先修改好配置,部署两台服务器,然后在所要测试的环境上跑客户端,结果就会在客户端标准输出打印了。
由于检测时需要借助两个公网服务,我这个现成的提供自带以下两个s1,s2,代码默认配置就是这两个,如果自己另外部署就自行修改。
-
s1: cn.cppcloud.cn
-
s2: usa.cppcloud.cn
github地址:
https://github.com/kawloong/NatDetect/
下面也直接贴出代码,不想去github下载就直接copy到本地保存。
快速运行:
python natchk.py c
#! /usr/bin/python # -*- coding:utf-8 -*- # # @Version : 1.0 # @Time : 2018/4/1 # @Author : hejl # @File : natchk.py # @Summery : Detect client's local network on which type of NAT ''' 测试用户所在环境的NAT设备类型 NAT1: Full Cone NAT2: Restricted Cone NAT3: Port-Restricted Cone NAT4: Symmetric 拓普模型: 两服务端 + 一客户端,服务端要求有有独立公网IP,至少一台, 客户端在要测试的网络环境机器上执行, 服务端的防火墙需放行相关的UDP端口 C(client) __________________________|_____________________________ 1| ↑8 2| ↑7 ↑6 ↑9 10| ↑11 ↓ | ↓ | | | | | Server1 S1:P1 S1:P2 S1:P3 | | | |----------------------------------------↑3 4 5 | ↓ | Server2 +----------------------------------------------------→ S2:P1 -------------→ S2:P2 ------------→ S2:P3 序列图: (UDP响应无顺序性,6/7/8/9先后到达不影响) c S1:P1 S1:P2 S1:P3 S2:P1 S2:P2 S2:P3 | | | | | | | |---------->| 1 | | | | | |-----------+----------->|2 | | | | |<----------+------------| | | | | |7 | | | | | | | |------------+---------->|3 | | | | |------------+-----------+-------------------->|4 | | |6 | | | +----------->|5 | |<----------+------------------------| | | | |8 | | | | | | |<----------+ | | | | | | | | | | | | <----------+------------+-----------+---------------------+------------| | |9 | | | | | | | | | | | | | | | | | | | | |-----------+------------+-----------+---------------------+------------+------------->|10 |<----------+------------+-----------+---------------------+------------+--------------|11 | | | | | | | | | | | | | | | | | | | | | 响应的rStep(程序内的标识响应的身份)与上面时序标号关系: rStep1 <---> 8 rStep2 <---> 7 rStep3 <---> 6 rStep4 <---> 9 rStep5 <---> 11 传参: s1: 服务端1 s2: 服务端2 [可选,没有时会降低准确度] c: 客户端 运行环境注意: 此工具服务端和客户端都使本同一程序文件,服务端要求有两个公网IP(相应端口不能被防火墙拦截),客户端要求能连通公网; 使用示例: 1. 获得natchk.py文件之后,修改程序开始处的服务器地址 s1_ip='第一台具备公网IP的机器' (本文件85行) s2_ip='第二台具备公网IP的机器' (本文件90行) 2. 运行服务端程序 第一台上运行 python natchk.py s1 第二台上运行 python natchk.py s2 3. 在待检测的环境运行客户端程序 python natchk.py c 网络正常的话10内打印出检测结果 结果输出样式Test Summery: NAT3 Port Restricted ''' import threading import socket import json import sys s1_ip='cn.cppcloud.cn' s1_port1=17770 # listen and recv+resp s1_port2=17771 # listen and recv+resp s1_port3=18770 # sendto s2_ip='usa.cppcloud.cn' s2_port1=27770 # listen for s1 notify; s2_port2=27771 # sendto s2_port3=28770 # listen and recv+resp # check Symmetric (Last) c_ip='0' c_port=0 wait_timeout_ms = 5 def createUdpSock(ip, port): us = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) if ip > 0 and port > 0: us.bind( (ip,port) ) print("socket Bind to " + str(us.getsockname()) ) return us def udpSendTo(us, serv, port, data): strdata = json.dumps(data) return us.sendto(strdata, (serv, port)) def udpRecvFrom(us, waitsec, count): try: datalst = [] us.settimeout(waitsec) for i in range(count): datalst.append(us.recvfrom(1024)) except socket.timeout, e: pass return datalst def loadJsonStr(datastr): try: dt = json.loads(datastr) except ValueError,e: dt = {} return dt # Server-1 Process def Server1_1(): serv1 = createUdpSock(s1_ip, s1_port1) serv3 = createUdpSock(s1_ip, s1_port3) while True: datastr,addr = serv1.recvfrom(1024) datadict = loadJsonStr(datastr) datadict["cli_addr"] = addr[0] datadict["cli_port"] = addr[1] s1log = [] # notify Server2 Sendto if s2_ip: datadict["rStep"] = 4 nret = udpSendTo(serv1, s2_ip, s2_port1, datadict) s1log.append('notify S2(%s) nsend=%d' % (s2_ip, nret)) datadict["rStep"] = 3 nret = udpSendTo(serv3, addr[0], addr[1], datadict) # other port response s1log.append('s1_port3 resp nsend=%d' % nret) datadict["rStep"] = 1 datadict["msglog"] = s1log nret = udpSendTo(serv1, addr[0], addr[1], datadict) # response echo s1log.append('s1_port1 resp nsend=%d' % nret) print("Serv1-Recv| client="+str(addr)+'| detail=' + ';'.join(s1log)) # Server-1 Process def Server1_2(): ServEcho(s1_ip, s1_port2, 2) def ServEcho(servIP, servPort, rStep): serv = createUdpSock(servIP, servPort) while True: datastr,addr = serv.recvfrom(1024) datadict = loadJsonStr(datastr) datadict["cli_addr"] = addr[0] datadict["cli_port"] = addr[1] datadict["rStep"] = rStep udpSendTo(serv, addr[0], addr[1], datadict) # Server-2 Process # listen for Server1's notify def Server2_1(): serv1 = createUdpSock(s2_ip, s2_port1) serv2 = createUdpSock(s2_ip, s2_port2) while True: datastr,addr = serv1.recvfrom(1024) datadict = loadJsonStr(datastr) if not "cli_addr" in datadict: print('Invalid NotifyMsg:'+datastr) continue cliaddr = datadict["cli_addr"] cliport = datadict["cli_port"] # datadict["rStep"] = 4 nret = udpSendTo(serv2, cliaddr, cliport, datadict) print('Serv1(%s) Notify test Client(%s), nsend=%d' % (addr,cliaddr, nret) ) # Server-2 Process # Check Last def Server2_2(): ServEcho(s2_ip, s2_port3, 5) # check is valid bind IP addr for server def isValidServAddr(ipv4): try: addrs = socket.getaddrinfo(socket.gethostname(),None) except: addrs = [] addrlist = [item[4][0] for item in addrs if ':' not in item[4][0]] bret = ipv4 in addrlist if not bret: try: stmp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) stmp.bind( (ipv4, 60330) ) bret = True except BaseException,e: bret = False print('IpAddr %s isnot local addr%s'%(ipv4, addrlist)) print(e) finally: stmp.close() return bret def runS1(): if not isValidServAddr(s1_ip): return 1 t1 = threading.Thread(target=Server1_1, name='S1-Listen1') t2 = threading.Thread(target=Server1_2, name='S1-Echo') t1.start() t2.start() t1.join() t2.join() return 0 def runS2(): if not isValidServAddr(s2_ip): return 2 t1 = threading.Thread(target=Server2_1, name='S2-ListenNotify') t2 = threading.Thread(target=Server2_2, name='S2-Echo') t1.start() t2.start() t1.join() t2.join() return 0 # client process def runClient(): clisock = createUdpSock(c_ip, c_port) udpSendTo(clisock, s1_ip, s1_port1, {'step': 1}) udpSendTo(clisock, s1_ip, s1_port2, {'step': 2}) print('client Sock is %s' % str(clisock.getsockname())) # 接收各路响应 resplst1 = udpRecvFrom(clisock, wait_timeout_ms, 4) resplst2 = [] if s2_ip > '': udpSendTo(clisock, s2_ip, s2_port3, {'step': 3}) # server-2 echo resplst2 = udpRecvFrom(clisock, wait_timeout_ms, 1) clisock.close() calcSummery(resplst1, resplst2) def calcSummery(rsp1, rsp2): rspmap = {} for item in rsp1+rsp2: nitem = json.loads(item[0]) nitem['serv'] = item[1] rspmap[nitem.get('rStep')] = nitem print(nitem) if not 1 in rspmap: print('Server1 not work Or Offline') return if s2_ip > '' and not 5 in rspmap: print('Server2 not work') return nat_type = 'unknow' if s2_ip > '': # 完整的服务 if 4 in rspmap: nat_type = 'NAT1 Cone' elif rspmap[1]['cli_port'] != rspmap[5]['cli_port']: nat_type = 'NAT4 Symmetric' elif 3 in rspmap: nat_type = 'NAT2 Address Restricted' else: nat_type = 'NAT3 Port Restricted' else: if rspmap[1]['cli_port'] != rspmap[2]['cli_port']: nat_type = 'NAT4 Symmetric' elif 3 in rspmap: nat_type = 'NAT2 Address Restricted(Maybe) or NAT1' else: nat_type = 'NAT3 Port Restricted(Maybe)' print("Test Summery: "+nat_type) if __name__ == "__main__": param = '' if len(sys.argv) > 1: param = sys.argv[1] runfunlst = {'s1': runS1, 's2': runS2, 'c': runClient} while not param in runfunlst: param = raw_input('''Please Select Run Mode: s1: Server-1 Process s2: Server-2 Process c: Client >>''') param = param.strip() run = runfunlst[param] run()