网络知识-NAT分类和测试


NAT概述

    互联网的迅速发展给大家日常带来了无限便利,互联网依赖于tcp/ip网络,ip地址使得全球计算机得以在Internet上互通互联;ip地址(ipv4)数量有限,到了今天已十分紧缺了;这里介绍一下NAT(Network Address Translation,网络地址转换)是解决公有ip地址有限和内外网安全隔离的技术,讲解NAT特点分类,和提供一个测试程序(python)。

    在Internet世界里,区分各个主机是用32bit的ip地址(公网的),主机之间的连接是通过路由器,路由器里面有个路由表用指导数据包如何转发的规则;譬如,A主机要发送一条报文'hello'给Internet某个角落的B主机,那么A先将报文封装上带有目标ip的包头发到A的直接路由器R1上,R1用收到的数据包里获取到包头的目标ip,拿头目标ip在自己的路由表里匹配寻找是否有主机记录或从下一跳路由器发出,最终数据包到达B主机。另外ip地址只能定位到主机,至于是主机上属于哪个进程的呢,有个端口号的整数值区分,属于tcp/ip分层里传输层的一个16bit的值。所以在给网络上某个进程请求或回复数据,ip地址+端口号给定了就能正确送达。  在一个局域网里只有某一台主机N能上网,那么其他主机也想连网怎么办呢,不可能各个去电信拉条宽带吧;假设主机N有公有ip地址ipn,能在Internet上通信;Internet网络底层不外乎ip数据包的收发吧;那么局域网其他主机可以把数据包收发中介都通过主机N代理;局域网每个主机有网部的ip,对应进程有端口,要在Internet上通信,要让大家认识才行,那么在主机N上就把局域网主机ip和端口换成自己的公网ip并分配一个端口,这样就有了一个对应关系localip:localport <-->publicip:publicport。后面接收到的数据也按这个关系逆向替换并转发,这种局域网内的主机就可以和外网通信了,主机N在这个过程做的事情就是网络地址转化NAT。

NAT分类

  1. NAT1: Full Cone
    完全椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,并且能接收任何外部主机的数据包
  2. NAT2: Restricted Cone
    地址受限椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,并且能接收曾发送过数据的目标主机的数据包
  3. NAT3: Port-Restricted Cone
    地址和端口受限椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,只能接收来自曾发送过的目标地址+端口的数据包
  4. NAT4: Symmetric
    对称型,相同内部ip:port,当目标不同时自身映射出的外部ip:port是不同的。




    按本地和公网地址映射规则划分为上面4种,用1~4越来约束越多,当然对内网的隔离保护就越安全,但越到最后能实现p2p穿透通信的难度越大,有NAT4的网络环境基本上不能p2p通信的。p2p即点对点通信,在大文件大流量传输的场景可以减轻服务端网络IO,加快客户收发速度。譬如有一即可通信的服务端在北京,有一个广州的用户和深圳的用户,两个用户要发文件,传统的发送流程数据流都经过北京服务器中转,而p2p通信会在开始时借助北京服务器查询到对方用户的公网ip和端口,后面的数据流直接发送不经北京服务器。NAT1,NAT2,NAT3都是可以打通进行p2p通信的,实际的网络可能很复杂,局域网用户可能经过多层NAT设置代理,接收方亦如是,只要一个是NAT4对称型的就会失败。曾经也见过有人发表过如何通过猜测端口打通对称型NAT设备,我觉得成功率还得看运气成份。

NAT检测

    本人出于兴趣开发了个基于python2的NAT检测程序,就一个py文件(服务端/客户端共用,启动时参数不同),先修改好配置,部署两台服务器,然后在所要测试的环境上跑客户端,结果就会在客户端标准输出打印了。

   由于检测时需要借助两个公网服务,我这个现成的提供自带以下两个s1,s2,代码默认配置就是这两个,如果自己另外部署就自行修改。

  • s1: cn.cppcloud.cn
  • s2: usa.cppcloud.cn

   github地址:

 https://github.com/kawloong/NatDetect/

下面也直接贴出代码,不想去github下载就直接copy到本地保存。

 

   快速运行:

python natchk.py c



#! /usr/bin/python
# -*- coding:utf-8 -*- 
#  
# @Version : 1.0  
# @Time    : 2018/4/1
# @Author  : hejl
# @File    : natchk.py  
# @Summery : Detect client's local network on which type of NAT

'''
测试用户所在环境的NAT设备类型
NAT1: Full Cone
NAT2: Restricted Cone
NAT3: Port-Restricted Cone
NAT4: Symmetric
拓普模型:
两服务端 + 一客户端,服务端要求有有独立公网IP,至少一台,
客户端在要测试的网络环境机器上执行, 服务端的防火墙需放行相关的UDP端口
                                                         C(client)
            __________________________|_____________________________
          1|   ↑8            2|    ↑7               ↑6                                   ↑9               10|   ↑11
           ↓   |              ↓    |                |                                    |                  |   |
Server1    S1:P1              S1:P2               S1:P3                                  |                  |   |
           |----------------------------------------↑3          4                    5   |                  ↓   |
Server2    +----------------------------------------------------→ S2:P1 -------------→  S2:P2 ------------→ S2:P3
序列图: (UDP响应无顺序性,6/7/8/9先后到达不影响)
c         S1:P1        S1:P2       S1:P3                 S2:P1        S2:P2          S2:P3
|           |            |           |                     |            |              |
|---------->| 1          |           |                     |            |              |
|-----------+----------->|2          |                     |            |              |
|<----------+------------|           |                     |            |              |
|7          |            |           |                     |            |              |
|           |------------+---------->|3                    |            |              |
|           |------------+-----------+-------------------->|4           |              |
|6          |            |           |                     +----------->|5             |
|<----------+------------------------|                     |            |              |
|8          |            |           |                     |            |              |
|<----------+            |           |                     |            |              |
|           |            |           |                     |            |              |
 <----------+------------+-----------+---------------------+------------|              |
|9          |            |           |                     |            |              |
|           |            |           |                     |            |              |
|           |            |           |                     |            |              |
|-----------+------------+-----------+---------------------+------------+------------->|10
|<----------+------------+-----------+---------------------+------------+--------------|11
|           |            |           |                     |            |              |
|           |            |           |                     |            |              |
|           |            |           |                     |            |              |
响应的rStep(程序内的标识响应的身份)与上面时序标号关系:
rStep1 <---> 8
rStep2 <---> 7
rStep3 <---> 6
rStep4 <---> 9
rStep5 <---> 11
传参: 
s1: 服务端1
s2: 服务端2 [可选,没有时会降低准确度]
c:  客户端
运行环境注意:
    此工具服务端和客户端都使本同一程序文件,服务端要求有两个公网IP(相应端口不能被防火墙拦截),客户端要求能连通公网;
使用示例:
1. 获得natchk.py文件之后,修改程序开始处的服务器地址
   s1_ip='第一台具备公网IP的机器'  (本文件85行)
   s2_ip='第二台具备公网IP的机器'  (本文件90行)
2. 运行服务端程序
   第一台上运行 python natchk.py s1
   第二台上运行 python natchk.py s2
3. 在待检测的环境运行客户端程序
   python natchk.py c
   网络正常的话10内打印出检测结果
   结果输出样式Test Summery: NAT3 Port Restricted
'''

import threading
import socket
import json
import sys

s1_ip='cn.cppcloud.cn'
s1_port1=17770 # listen and recv+resp
s1_port2=17771 # listen and recv+resp
s1_port3=18770 # sendto

s2_ip='usa.cppcloud.cn'
s2_port1=27770 # listen for s1 notify; 
s2_port2=27771 # sendto
s2_port3=28770 # listen and recv+resp # check Symmetric (Last)

c_ip='0'
c_port=0
wait_timeout_ms = 5

def createUdpSock(ip, port):
    us = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    if ip > 0 and port > 0:
        us.bind( (ip,port) )
        print("socket Bind to " + str(us.getsockname()) )

    return us

def udpSendTo(us, serv, port, data):
    strdata = json.dumps(data)
    return us.sendto(strdata, (serv, port))

def udpRecvFrom(us, waitsec, count):
    try:
        datalst = []
        us.settimeout(waitsec)
        for i in range(count):
            datalst.append(us.recvfrom(1024))
    except socket.timeout, e:
        pass
    return datalst


def loadJsonStr(datastr):
    try:
        dt = json.loads(datastr)
    except ValueError,e:
        dt = {}
    return dt

# Server-1 Process
def Server1_1():
    serv1 = createUdpSock(s1_ip, s1_port1)
    serv3 = createUdpSock(s1_ip, s1_port3)
    while True:
        datastr,addr = serv1.recvfrom(1024)
        datadict = loadJsonStr(datastr)
        datadict["cli_addr"] = addr[0]
        datadict["cli_port"] = addr[1]

        s1log = []
        # notify Server2 Sendto
        if s2_ip:
            datadict["rStep"] = 4
            nret = udpSendTo(serv1, s2_ip, s2_port1, datadict)
            s1log.append('notify S2(%s) nsend=%d' % (s2_ip, nret))
        datadict["rStep"] = 3
        nret = udpSendTo(serv3, addr[0], addr[1], datadict) # other port response
        s1log.append('s1_port3 resp nsend=%d' % nret)

        datadict["rStep"] = 1
        datadict["msglog"] = s1log
        nret = udpSendTo(serv1, addr[0], addr[1], datadict) # response echo
        s1log.append('s1_port1 resp nsend=%d' % nret)
        print("Serv1-Recv| client="+str(addr)+'| detail=' + ';'.join(s1log))

# Server-1 Process
def Server1_2():
    ServEcho(s1_ip, s1_port2, 2)

def ServEcho(servIP, servPort, rStep):
    serv = createUdpSock(servIP, servPort)
    while True:
        datastr,addr = serv.recvfrom(1024)
        datadict = loadJsonStr(datastr)
        datadict["cli_addr"] = addr[0]
        datadict["cli_port"] = addr[1]
        datadict["rStep"] = rStep
        udpSendTo(serv, addr[0], addr[1], datadict)


# Server-2 Process # listen for Server1's notify
def Server2_1():
    serv1 = createUdpSock(s2_ip, s2_port1)
    serv2 = createUdpSock(s2_ip, s2_port2)
    while True:
        datastr,addr = serv1.recvfrom(1024)
        datadict = loadJsonStr(datastr)
        if not "cli_addr" in datadict:
            print('Invalid NotifyMsg:'+datastr)
            continue
        cliaddr = datadict["cli_addr"]
        cliport = datadict["cli_port"]
        # datadict["rStep"] = 4
        nret = udpSendTo(serv2, cliaddr, cliport, datadict)
        print('Serv1(%s) Notify test Client(%s), nsend=%d' % (addr,cliaddr, nret) )

# Server-2 Process # Check Last
def Server2_2():
    ServEcho(s2_ip, s2_port3, 5)

# check is valid bind IP addr for server
def isValidServAddr(ipv4):
    try:
        addrs = socket.getaddrinfo(socket.gethostname(),None)
    except:
        addrs = []
    addrlist = [item[4][0] for item in addrs if ':' not in item[4][0]]
    bret = ipv4 in addrlist
    if not bret:
        try:
            stmp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            stmp.bind( (ipv4, 60330) )
            bret = True
        except BaseException,e:
            bret = False
            print('IpAddr %s isnot local addr%s'%(ipv4, addrlist))
            print(e)
        finally:
            stmp.close()
    return bret

def runS1():
    if not isValidServAddr(s1_ip):
        return 1

    t1 = threading.Thread(target=Server1_1, name='S1-Listen1')
    t2 = threading.Thread(target=Server1_2, name='S1-Echo')

    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return 0

def runS2():
    if not isValidServAddr(s2_ip):
        return 2

    t1 = threading.Thread(target=Server2_1, name='S2-ListenNotify')
    t2 = threading.Thread(target=Server2_2, name='S2-Echo')

    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return 0

# client process
def runClient():
    clisock = createUdpSock(c_ip, c_port)

    udpSendTo(clisock, s1_ip, s1_port1, {'step': 1})
    udpSendTo(clisock, s1_ip, s1_port2, {'step': 2})
    print('client Sock is %s' % str(clisock.getsockname()))

    # 接收各路响应
    resplst1 = udpRecvFrom(clisock, wait_timeout_ms, 4)

    resplst2 = []
    if s2_ip > '':
        udpSendTo(clisock, s2_ip, s2_port3, {'step': 3}) # server-2 echo
        resplst2 = udpRecvFrom(clisock, wait_timeout_ms, 1)

    clisock.close()
    calcSummery(resplst1, resplst2)

def calcSummery(rsp1, rsp2):
    rspmap = {}
    for item in rsp1+rsp2:
        nitem = json.loads(item[0])
        nitem['serv'] = item[1]
        rspmap[nitem.get('rStep')] = nitem
        print(nitem)
    if not 1 in rspmap:
        print('Server1 not work Or Offline')
        return
    if s2_ip > '' and not 5 in rspmap:
        print('Server2 not work')
        return

    nat_type = 'unknow'
    if s2_ip > '': # 完整的服务
        if 4 in rspmap:
            nat_type = 'NAT1 Cone'
        elif rspmap[1]['cli_port'] != rspmap[5]['cli_port']:
            nat_type = 'NAT4 Symmetric'
        elif 3 in rspmap:
            nat_type = 'NAT2 Address Restricted'
        else:
            nat_type = 'NAT3 Port Restricted'
    else:
        if rspmap[1]['cli_port'] != rspmap[2]['cli_port']:
            nat_type = 'NAT4 Symmetric'
        elif 3 in rspmap:
            nat_type = 'NAT2 Address Restricted(Maybe) or NAT1'
        else:
            nat_type = 'NAT3 Port Restricted(Maybe)'

    print("Test Summery: "+nat_type)


if __name__ == "__main__":
    param = ''
    if len(sys.argv) > 1:
        param = sys.argv[1]
    runfunlst = {'s1': runS1, 's2': runS2, 'c': runClient}
    while not param in runfunlst:
        param = raw_input('''Please Select Run Mode:
        s1: Server-1 Process
        s2: Server-2 Process
        c:  Client  
        >>''')
        param = param.strip()

    run = runfunlst[param]
    run()

 

 

 

/latefirstcmt/3