NAT概述
互联网的迅速发展给大家日常带来了无限便利,互联网依赖于tcp/ip网络,ip地址使得全球计算机得以在Internet上互通互联;ip地址(ipv4)数量有限,到了今天已十分紧缺了;这里介绍一下NAT(Network Address Translation,网络地址转换)是解决公有ip地址有限和内外网安全隔离的技术,讲解NAT特点分类,和提供一个测试程序(python)。
在Internet世界里,区分各个主机是用32bit的ip地址(公网的),主机之间的连接是通过路由器,路由器里面有个路由表用指导数据包如何转发的规则;譬如,A主机要发送一条报文'hello'给Internet某个角落的B主机,那么A先将报文封装上带有目标ip的包头发到A的直接路由器R1上,R1用收到的数据包里获取到包头的目标ip,拿头目标ip在自己的路由表里匹配寻找是否有主机记录或从下一跳路由器发出,最终数据包到达B主机。另外ip地址只能定位到主机,至于是主机上属于哪个进程的呢,有个端口号的整数值区分,属于tcp/ip分层里传输层的一个16bit的值。所以在给网络上某个进程请求或回复数据,ip地址+端口号给定了就能正确送达。 在一个局域网里只有某一台主机N能上网,那么其他主机也想连网怎么办呢,不可能各个去电信拉条宽带吧;假设主机N有公有ip地址ipn,能在Internet上通信;Internet网络底层不外乎ip数据包的收发吧;那么局域网其他主机可以把数据包收发中介都通过主机N代理;局域网每个主机有网部的ip,对应进程有端口,要在Internet上通信,要让大家认识才行,那么在主机N上就把局域网主机ip和端口换成自己的公网ip并分配一个端口,这样就有了一个对应关系localip:localport <-->publicip:publicport。后面接收到的数据也按这个关系逆向替换并转发,这种局域网内的主机就可以和外网通信了,主机N在这个过程做的事情就是网络地址转化NAT。
NAT分类
- NAT1: Full Cone
完全椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,并且能接收任何外部主机的数据包 - NAT2: Restricted Cone
地址受限椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,并且能接收曾发送过数据的目标主机的数据包 - NAT3: Port-Restricted Cone
地址和端口受限椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,只能接收来自曾发送过的目标地址+端口的数据包 - NAT4: Symmetric
对称型,相同内部ip:port,当目标不同时自身映射出的外部ip:port是不同的。
按本地和公网地址映射规则划分为上面4种,用1~4越来约束越多,当然对内网的隔离保护就越安全,但越到最后能实现p2p穿透通信的难度越大,有NAT4的网络环境基本上不能p2p通信的。p2p即点对点通信,在大文件大流量传输的场景可以减轻服务端网络IO,加快客户收发速度。譬如有一即可通信的服务端在北京,有一个广州的用户和深圳的用户,两个用户要发文件,传统的发送流程数据流都经过北京服务器中转,而p2p通信会在开始时借助北京服务器查询到对方用户的公网ip和端口,后面的数据流直接发送不经北京服务器。NAT1,NAT2,NAT3都是可以打通进行p2p通信的,实际的网络可能很复杂,局域网用户可能经过多层NAT设置代理,接收方亦如是,只要一个是NAT4对称型的就会失败。曾经也见过有人发表过如何通过猜测端口打通对称型NAT设备,我觉得成功率还得看运气成份。
NAT检测
本人出于兴趣开发了个基于python2的NAT检测程序,就一个py文件(服务端/客户端共用,启动时参数不同),先修改好配置,部署两台服务器,然后在所要测试的环境上跑客户端,结果就会在客户端标准输出打印了。
由于检测时需要借助两个公网服务,我这个现成的提供自带以下两个s1,s2,代码默认配置就是这两个,如果自己另外部署就自行修改。
-
s1: cn.cppcloud.cn
-
s2: usa.cppcloud.cn
github地址:
https://github.com/kawloong/NatDetect/
下面也直接贴出代码,不想去github下载就直接copy到本地保存。
快速运行:
1 | python natchk.py c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 | #! /usr/bin/python # -*- coding:utf-8 -*- # # @Version : 1.0 # @Time : 2018/4/1 # @Author : hejl # @File : natchk.py # @Summery : Detect client's local network on which type of NAT ''' 测试用户所在环境的NAT设备类型 NAT1: Full Cone NAT2: Restricted Cone NAT3: Port-Restricted Cone NAT4: Symmetric 拓普模型: 两服务端 + 一客户端,服务端要求有有独立公网IP,至少一台, 客户端在要测试的网络环境机器上执行, 服务端的防火墙需放行相关的UDP端口 C(client) __________________________|_____________________________ 1| ↑8 2| ↑7 ↑6 ↑9 10| ↑11 ↓ | ↓ | | | | | Server1 S1:P1 S1:P2 S1:P3 | | | |----------------------------------------↑3 4 5 | ↓ | Server2 +----------------------------------------------------→ S2:P1 -------------→ S2:P2 ------------→ S2:P3 序列图: (UDP响应无顺序性,6/7/8/9先后到达不影响) c S1:P1 S1:P2 S1:P3 S2:P1 S2:P2 S2:P3 | | | | | | | |---------->| 1 | | | | | |-----------+----------->|2 | | | | |<----------+------------| | | | | |7 | | | | | | | |------------+---------->|3 | | | | |------------+-----------+-------------------->|4 | | |6 | | | +----------->|5 | |<----------+------------------------| | | | |8 | | | | | | |<----------+ | | | | | | | | | | | | <----------+------------+-----------+---------------------+------------| | |9 | | | | | | | | | | | | | | | | | | | | |-----------+------------+-----------+---------------------+------------+------------->|10 |<----------+------------+-----------+---------------------+------------+--------------|11 | | | | | | | | | | | | | | | | | | | | | 响应的rStep(程序内的标识响应的身份)与上面时序标号关系: rStep1 <---> 8 rStep2 <---> 7 rStep3 <---> 6 rStep4 <---> 9 rStep5 <---> 11 传参: s1: 服务端1 s2: 服务端2 [可选,没有时会降低准确度] c: 客户端 运行环境注意: 此工具服务端和客户端都使本同一程序文件,服务端要求有两个公网IP(相应端口不能被防火墙拦截),客户端要求能连通公网; 使用示例: 1. 获得natchk.py文件之后,修改程序开始处的服务器地址 s1_ip='第一台具备公网IP的机器' (本文件85行) s2_ip='第二台具备公网IP的机器' (本文件90行) 2. 运行服务端程序 第一台上运行 python natchk.py s1 第二台上运行 python natchk.py s2 3. 在待检测的环境运行客户端程序 python natchk.py c 网络正常的话10内打印出检测结果 结果输出样式Test Summery: NAT3 Port Restricted ''' import threading import socket import json import sys s1_ip = 'cn.cppcloud.cn' s1_port1 = 17770 # listen and recv+resp s1_port2 = 17771 # listen and recv+resp s1_port3 = 18770 # sendto s2_ip = 'usa.cppcloud.cn' s2_port1 = 27770 # listen for s1 notify; s2_port2 = 27771 # sendto s2_port3 = 28770 # listen and recv+resp # check Symmetric (Last) c_ip = '0' c_port = 0 wait_timeout_ms = 5 def createUdpSock(ip, port): us = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) if ip > 0 and port > 0 : us.bind( (ip,port) ) print ( "socket Bind to " + str (us.getsockname()) ) return us def udpSendTo(us, serv, port, data): strdata = json.dumps(data) return us.sendto(strdata, (serv, port)) def udpRecvFrom(us, waitsec, count): try : datalst = [] us.settimeout(waitsec) for i in range (count): datalst.append(us.recvfrom( 1024 )) except socket.timeout, e: pass return datalst def loadJsonStr(datastr): try : dt = json.loads(datastr) except ValueError,e: dt = {} return dt # Server-1 Process def Server1_1(): serv1 = createUdpSock(s1_ip, s1_port1) serv3 = createUdpSock(s1_ip, s1_port3) while True : datastr,addr = serv1.recvfrom( 1024 ) datadict = loadJsonStr(datastr) datadict[ "cli_addr" ] = addr[ 0 ] datadict[ "cli_port" ] = addr[ 1 ] s1log = [] # notify Server2 Sendto if s2_ip: datadict[ "rStep" ] = 4 nret = udpSendTo(serv1, s2_ip, s2_port1, datadict) s1log.append( 'notify S2(%s) nsend=%d' % (s2_ip, nret)) datadict[ "rStep" ] = 3 nret = udpSendTo(serv3, addr[ 0 ], addr[ 1 ], datadict) # other port response s1log.append( 's1_port3 resp nsend=%d' % nret) datadict[ "rStep" ] = 1 datadict[ "msglog" ] = s1log nret = udpSendTo(serv1, addr[ 0 ], addr[ 1 ], datadict) # response echo s1log.append( 's1_port1 resp nsend=%d' % nret) print ( "Serv1-Recv| client=" + str (addr) + '| detail=' + ';' .join(s1log)) # Server-1 Process def Server1_2(): ServEcho(s1_ip, s1_port2, 2 ) def ServEcho(servIP, servPort, rStep): serv = createUdpSock(servIP, servPort) while True : datastr,addr = serv.recvfrom( 1024 ) datadict = loadJsonStr(datastr) datadict[ "cli_addr" ] = addr[ 0 ] datadict[ "cli_port" ] = addr[ 1 ] datadict[ "rStep" ] = rStep udpSendTo(serv, addr[ 0 ], addr[ 1 ], datadict) # Server-2 Process # listen for Server1's notify def Server2_1(): serv1 = createUdpSock(s2_ip, s2_port1) serv2 = createUdpSock(s2_ip, s2_port2) while True : datastr,addr = serv1.recvfrom( 1024 ) datadict = loadJsonStr(datastr) if not "cli_addr" in datadict: print ( 'Invalid NotifyMsg:' + datastr) continue cliaddr = datadict[ "cli_addr" ] cliport = datadict[ "cli_port" ] # datadict["rStep"] = 4 nret = udpSendTo(serv2, cliaddr, cliport, datadict) print ( 'Serv1(%s) Notify test Client(%s), nsend=%d' % (addr,cliaddr, nret) ) # Server-2 Process # Check Last def Server2_2(): ServEcho(s2_ip, s2_port3, 5 ) # check is valid bind IP addr for server def isValidServAddr(ipv4): try : addrs = socket.getaddrinfo(socket.gethostname(), None ) except : addrs = [] addrlist = [item[ 4 ][ 0 ] for item in addrs if ':' not in item[ 4 ][ 0 ]] bret = ipv4 in addrlist if not bret: try : stmp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) stmp.bind( (ipv4, 60330 ) ) bret = True except BaseException,e: bret = False print ( 'IpAddr %s isnot local addr%s' % (ipv4, addrlist)) print (e) finally : stmp.close() return bret def runS1(): if not isValidServAddr(s1_ip): return 1 t1 = threading.Thread(target = Server1_1, name = 'S1-Listen1' ) t2 = threading.Thread(target = Server1_2, name = 'S1-Echo' ) t1.start() t2.start() t1.join() t2.join() return 0 def runS2(): if not isValidServAddr(s2_ip): return 2 t1 = threading.Thread(target = Server2_1, name = 'S2-ListenNotify' ) t2 = threading.Thread(target = Server2_2, name = 'S2-Echo' ) t1.start() t2.start() t1.join() t2.join() return 0 # client process def runClient(): clisock = createUdpSock(c_ip, c_port) udpSendTo(clisock, s1_ip, s1_port1, { 'step' : 1 }) udpSendTo(clisock, s1_ip, s1_port2, { 'step' : 2 }) print ( 'client Sock is %s' % str (clisock.getsockname())) # 接收各路响应 resplst1 = udpRecvFrom(clisock, wait_timeout_ms, 4 ) resplst2 = [] if s2_ip > '': udpSendTo(clisock, s2_ip, s2_port3, { 'step' : 3 }) # server-2 echo resplst2 = udpRecvFrom(clisock, wait_timeout_ms, 1 ) clisock.close() calcSummery(resplst1, resplst2) def calcSummery(rsp1, rsp2): rspmap = {} for item in rsp1 + rsp2: nitem = json.loads(item[ 0 ]) nitem[ 'serv' ] = item[ 1 ] rspmap[nitem.get( 'rStep' )] = nitem print (nitem) if not 1 in rspmap: print ( 'Server1 not work Or Offline' ) return if s2_ip > '' and not 5 in rspmap: print ( 'Server2 not work' ) return nat_type = 'unknow' if s2_ip > '': # 完整的服务 if 4 in rspmap: nat_type = 'NAT1 Cone' elif rspmap[ 1 ][ 'cli_port' ] ! = rspmap[ 5 ][ 'cli_port' ]: nat_type = 'NAT4 Symmetric' elif 3 in rspmap: nat_type = 'NAT2 Address Restricted' else : nat_type = 'NAT3 Port Restricted' else : if rspmap[ 1 ][ 'cli_port' ] ! = rspmap[ 2 ][ 'cli_port' ]: nat_type = 'NAT4 Symmetric' elif 3 in rspmap: nat_type = 'NAT2 Address Restricted(Maybe) or NAT1' else : nat_type = 'NAT3 Port Restricted(Maybe)' print ( "Test Summery: " + nat_type) if __name__ = = "__main__" : param = '' if len (sys.argv) > 1 : param = sys.argv[ 1 ] runfunlst = { 's1' : runS1, 's2' : runS2, 'c' : runClient} while not param in runfunlst: param = raw_input ( '''Please Select Run Mode: s1: Server-1 Process s2: Server-2 Process c: Client >>''' ) param = param.strip() run = runfunlst[param] run() |