网络知识-NAT分类和测试


NAT概述

    互联网的迅速发展给大家日常带来了无限便利,互联网依赖于tcp/ip网络,ip地址使得全球计算机得以在Internet上互通互联;ip地址(ipv4)数量有限,到了今天已十分紧缺了;这里介绍一下NAT(Network Address Translation,网络地址转换)是解决公有ip地址有限和内外网安全隔离的技术,讲解NAT特点分类,和提供一个测试程序(python)。

    在Internet世界里,区分各个主机是用32bit的ip地址(公网的),主机之间的连接是通过路由器,路由器里面有个路由表用指导数据包如何转发的规则;譬如,A主机要发送一条报文'hello'给Internet某个角落的B主机,那么A先将报文封装上带有目标ip的包头发到A的直接路由器R1上,R1用收到的数据包里获取到包头的目标ip,拿头目标ip在自己的路由表里匹配寻找是否有主机记录或从下一跳路由器发出,最终数据包到达B主机。另外ip地址只能定位到主机,至于是主机上属于哪个进程的呢,有个端口号的整数值区分,属于tcp/ip分层里传输层的一个16bit的值。所以在给网络上某个进程请求或回复数据,ip地址+端口号给定了就能正确送达。  在一个局域网里只有某一台主机N能上网,那么其他主机也想连网怎么办呢,不可能各个去电信拉条宽带吧;假设主机N有公有ip地址ipn,能在Internet上通信;Internet网络底层不外乎ip数据包的收发吧;那么局域网其他主机可以把数据包收发中介都通过主机N代理;局域网每个主机有网部的ip,对应进程有端口,要在Internet上通信,要让大家认识才行,那么在主机N上就把局域网主机ip和端口换成自己的公网ip并分配一个端口,这样就有了一个对应关系localip:localport <-->publicip:publicport。后面接收到的数据也按这个关系逆向替换并转发,这种局域网内的主机就可以和外网通信了,主机N在这个过程做的事情就是网络地址转化NAT。

NAT分类

  1. NAT1: Full Cone
    完全椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,并且能接收任何外部主机的数据包
  2. NAT2: Restricted Cone
    地址受限椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,并且能接收曾发送过数据的目标主机的数据包
  3. NAT3: Port-Restricted Cone
    地址和端口受限椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,只能接收来自曾发送过的目标地址+端口的数据包
  4. NAT4: Symmetric
    对称型,相同内部ip:port,当目标不同时自身映射出的外部ip:port是不同的。




    按本地和公网地址映射规则划分为上面4种,用1~4越来约束越多,当然对内网的隔离保护就越安全,但越到最后能实现p2p穿透通信的难度越大,有NAT4的网络环境基本上不能p2p通信的。p2p即点对点通信,在大文件大流量传输的场景可以减轻服务端网络IO,加快客户收发速度。譬如有一即可通信的服务端在北京,有一个广州的用户和深圳的用户,两个用户要发文件,传统的发送流程数据流都经过北京服务器中转,而p2p通信会在开始时借助北京服务器查询到对方用户的公网ip和端口,后面的数据流直接发送不经北京服务器。NAT1,NAT2,NAT3都是可以打通进行p2p通信的,实际的网络可能很复杂,局域网用户可能经过多层NAT设置代理,接收方亦如是,只要一个是NAT4对称型的就会失败。曾经也见过有人发表过如何通过猜测端口打通对称型NAT设备,我觉得成功率还得看运气成份。

NAT检测

    本人出于兴趣开发了个基于python2的NAT检测程序,就一个py文件(服务端/客户端共用,启动时参数不同),先修改好配置,部署两台服务器,然后在所要测试的环境上跑客户端,结果就会在客户端标准输出打印了。

   由于检测时需要借助两个公网服务,我这个现成的提供自带以下两个s1,s2,代码默认配置就是这两个,如果自己另外部署就自行修改。

  • s1: cn.cppcloud.cn
  • s2: usa.cppcloud.cn

   github地址:

 https://github.com/kawloong/NatDetect/

下面也直接贴出代码,不想去github下载就直接copy到本地保存。

 

   快速运行:

1
python natchk.py c



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#! /usr/bin/python
# -*- coding:utf-8 -*-
# @Version : 1.0 
# @Time    : 2018/4/1
# @Author  : hejl
# @File    : natchk.py 
# @Summery : Detect client's local network on which type of NAT
 
'''
测试用户所在环境的NAT设备类型
NAT1: Full Cone
NAT2: Restricted Cone
NAT3: Port-Restricted Cone
NAT4: Symmetric
拓普模型:
两服务端 + 一客户端,服务端要求有有独立公网IP,至少一台,
客户端在要测试的网络环境机器上执行, 服务端的防火墙需放行相关的UDP端口
                                                         C(client)
            __________________________|_____________________________
          1|   ↑8            2|    ↑7               ↑6                                   ↑9               10|   ↑11
           ↓   |              ↓    |                |                                    |                  |   |
Server1    S1:P1              S1:P2               S1:P3                                  |                  |   |
           |----------------------------------------↑3          4                    5   |                  ↓   |
Server2    +----------------------------------------------------→ S2:P1 -------------→  S2:P2 ------------→ S2:P3
序列图: (UDP响应无顺序性,6/7/8/9先后到达不影响)
c         S1:P1        S1:P2       S1:P3                 S2:P1        S2:P2          S2:P3
|           |            |           |                     |            |              |
|---------->| 1          |           |                     |            |              |
|-----------+----------->|2          |                     |            |              |
|<----------+------------|           |                     |            |              |
|7          |            |           |                     |            |              |
|           |------------+---------->|3                    |            |              |
|           |------------+-----------+-------------------->|4           |              |
|6          |            |           |                     +----------->|5             |
|<----------+------------------------|                     |            |              |
|8          |            |           |                     |            |              |
|<----------+            |           |                     |            |              |
|           |            |           |                     |            |              |
 <----------+------------+-----------+---------------------+------------|              |
|9          |            |           |                     |            |              |
|           |            |           |                     |            |              |
|           |            |           |                     |            |              |
|-----------+------------+-----------+---------------------+------------+------------->|10
|<----------+------------+-----------+---------------------+------------+--------------|11
|           |            |           |                     |            |              |
|           |            |           |                     |            |              |
|           |            |           |                     |            |              |
响应的rStep(程序内的标识响应的身份)与上面时序标号关系:
rStep1 <---> 8
rStep2 <---> 7
rStep3 <---> 6
rStep4 <---> 9
rStep5 <---> 11
传参:
s1: 服务端1
s2: 服务端2 [可选,没有时会降低准确度]
c:  客户端
运行环境注意:
    此工具服务端和客户端都使本同一程序文件,服务端要求有两个公网IP(相应端口不能被防火墙拦截),客户端要求能连通公网;
使用示例:
1. 获得natchk.py文件之后,修改程序开始处的服务器地址
   s1_ip='第一台具备公网IP的机器'  (本文件85行)
   s2_ip='第二台具备公网IP的机器'  (本文件90行)
2. 运行服务端程序
   第一台上运行 python natchk.py s1
   第二台上运行 python natchk.py s2
3. 在待检测的环境运行客户端程序
   python natchk.py c
   网络正常的话10内打印出检测结果
   结果输出样式Test Summery: NAT3 Port Restricted
'''
 
import threading
import socket
import json
import sys
 
s1_ip='cn.cppcloud.cn'
s1_port1=17770 # listen and recv+resp
s1_port2=17771 # listen and recv+resp
s1_port3=18770 # sendto
 
s2_ip='usa.cppcloud.cn'
s2_port1=27770 # listen for s1 notify;
s2_port2=27771 # sendto
s2_port3=28770 # listen and recv+resp # check Symmetric (Last)
 
c_ip='0'
c_port=0
wait_timeout_ms = 5
 
def createUdpSock(ip, port):
    us = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    if ip > 0 and port > 0:
        us.bind( (ip,port) )
        print("socket Bind to " + str(us.getsockname()) )
 
    return us
 
def udpSendTo(us, serv, port, data):
    strdata = json.dumps(data)
    return us.sendto(strdata, (serv, port))
 
def udpRecvFrom(us, waitsec, count):
    try:
        datalst = []
        us.settimeout(waitsec)
        for i in range(count):
            datalst.append(us.recvfrom(1024))
    except socket.timeout, e:
        pass
    return datalst
 
 
def loadJsonStr(datastr):
    try:
        dt = json.loads(datastr)
    except ValueError,e:
        dt = {}
    return dt
 
# Server-1 Process
def Server1_1():
    serv1 = createUdpSock(s1_ip, s1_port1)
    serv3 = createUdpSock(s1_ip, s1_port3)
    while True:
        datastr,addr = serv1.recvfrom(1024)
        datadict = loadJsonStr(datastr)
        datadict["cli_addr"] = addr[0]
        datadict["cli_port"] = addr[1]
 
        s1log = []
        # notify Server2 Sendto
        if s2_ip:
            datadict["rStep"] = 4
            nret = udpSendTo(serv1, s2_ip, s2_port1, datadict)
            s1log.append('notify S2(%s) nsend=%d' % (s2_ip, nret))
        datadict["rStep"] = 3
        nret = udpSendTo(serv3, addr[0], addr[1], datadict) # other port response
        s1log.append('s1_port3 resp nsend=%d' % nret)
 
        datadict["rStep"] = 1
        datadict["msglog"] = s1log
        nret = udpSendTo(serv1, addr[0], addr[1], datadict) # response echo
        s1log.append('s1_port1 resp nsend=%d' % nret)
        print("Serv1-Recv| client="+str(addr)+'| detail=' + ';'.join(s1log))
 
# Server-1 Process
def Server1_2():
    ServEcho(s1_ip, s1_port2, 2)
 
def ServEcho(servIP, servPort, rStep):
    serv = createUdpSock(servIP, servPort)
    while True:
        datastr,addr = serv.recvfrom(1024)
        datadict = loadJsonStr(datastr)
        datadict["cli_addr"] = addr[0]
        datadict["cli_port"] = addr[1]
        datadict["rStep"] = rStep
        udpSendTo(serv, addr[0], addr[1], datadict)
 
 
# Server-2 Process # listen for Server1's notify
def Server2_1():
    serv1 = createUdpSock(s2_ip, s2_port1)
    serv2 = createUdpSock(s2_ip, s2_port2)
    while True:
        datastr,addr = serv1.recvfrom(1024)
        datadict = loadJsonStr(datastr)
        if not "cli_addr" in datadict:
            print('Invalid NotifyMsg:'+datastr)
            continue
        cliaddr = datadict["cli_addr"]
        cliport = datadict["cli_port"]
        # datadict["rStep"] = 4
        nret = udpSendTo(serv2, cliaddr, cliport, datadict)
        print('Serv1(%s) Notify test Client(%s), nsend=%d' % (addr,cliaddr, nret) )
 
# Server-2 Process # Check Last
def Server2_2():
    ServEcho(s2_ip, s2_port3, 5)
 
# check is valid bind IP addr for server
def isValidServAddr(ipv4):
    try:
        addrs = socket.getaddrinfo(socket.gethostname(),None)
    except:
        addrs = []
    addrlist = [item[4][0] for item in addrs if ':' not in item[4][0]]
    bret = ipv4 in addrlist
    if not bret:
        try:
            stmp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            stmp.bind( (ipv4, 60330) )
            bret = True
        except BaseException,e:
            bret = False
            print('IpAddr %s isnot local addr%s'%(ipv4, addrlist))
            print(e)
        finally:
            stmp.close()
    return bret
 
def runS1():
    if not isValidServAddr(s1_ip):
        return 1
 
    t1 = threading.Thread(target=Server1_1, name='S1-Listen1')
    t2 = threading.Thread(target=Server1_2, name='S1-Echo')
 
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return 0
 
def runS2():
    if not isValidServAddr(s2_ip):
        return 2
 
    t1 = threading.Thread(target=Server2_1, name='S2-ListenNotify')
    t2 = threading.Thread(target=Server2_2, name='S2-Echo')
 
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return 0
 
# client process
def runClient():
    clisock = createUdpSock(c_ip, c_port)
 
    udpSendTo(clisock, s1_ip, s1_port1, {'step': 1})
    udpSendTo(clisock, s1_ip, s1_port2, {'step': 2})
    print('client Sock is %s' % str(clisock.getsockname()))
 
    # 接收各路响应
    resplst1 = udpRecvFrom(clisock, wait_timeout_ms, 4)
 
    resplst2 = []
    if s2_ip > '':
        udpSendTo(clisock, s2_ip, s2_port3, {'step': 3}) # server-2 echo
        resplst2 = udpRecvFrom(clisock, wait_timeout_ms, 1)
 
    clisock.close()
    calcSummery(resplst1, resplst2)
 
def calcSummery(rsp1, rsp2):
    rspmap = {}
    for item in rsp1+rsp2:
        nitem = json.loads(item[0])
        nitem['serv'] = item[1]
        rspmap[nitem.get('rStep')] = nitem
        print(nitem)
    if not 1 in rspmap:
        print('Server1 not work Or Offline')
        return
    if s2_ip > '' and not 5 in rspmap:
        print('Server2 not work')
        return
 
    nat_type = 'unknow'
    if s2_ip > '': # 完整的服务
        if 4 in rspmap:
            nat_type = 'NAT1 Cone'
        elif rspmap[1]['cli_port'] != rspmap[5]['cli_port']:
            nat_type = 'NAT4 Symmetric'
        elif 3 in rspmap:
            nat_type = 'NAT2 Address Restricted'
        else:
            nat_type = 'NAT3 Port Restricted'
    else:
        if rspmap[1]['cli_port'] != rspmap[2]['cli_port']:
            nat_type = 'NAT4 Symmetric'
        elif 3 in rspmap:
            nat_type = 'NAT2 Address Restricted(Maybe) or NAT1'
        else:
            nat_type = 'NAT3 Port Restricted(Maybe)'
 
    print("Test Summery: "+nat_type)
 
 
if __name__ == "__main__":
    param = ''
    if len(sys.argv) > 1:
        param = sys.argv[1]
    runfunlst = {'s1': runS1, 's2': runS2, 'c': runClient}
    while not param in runfunlst:
        param = raw_input('''Please Select Run Mode:
        s1: Server-1 Process
        s2: Server-2 Process
        c:  Client 
        >>''')
        param = param.strip()
 
    run = runfunlst[param]
    run()

 

 

 

/latefirstcmt/3