python socket 编程 - pystun
基础
The
AF_*
andSOCK_*
constants are now AddressFamily and SocketKind IntEnum collections.https://docs.python.org/3/library/socket.html
What is Address Family?
Name Purpose
AF_UNIX, AF_LOCAL Local communication
AF_INET IPv4 Internet protocols
AF_INET6 IPv6 Internet protocols
AF_IPX IPX - Novell protocols
AF_NETLINK Kernel user interface device
AF_X25 ITU-T X.25 / ISO-8208 protocol
AF_AX25 Amateur radio AX.25 protocol
AF_ATMPVC Access to raw ATM PVCs
AF_APPLETALK Appletalk
AF_PACKET Low level packet interface
What is SocketKind?
Name Purpose
socket.SOCK_STREAM 流式 socket , for TCP
socket.SOCK_DGRAM 数据报式 socket , for UDP
socket.SOCK_RAW 原始套接字,普通的套接字无法处理 ICMP、IGMP 等网络报文,而 SOCK_RAW 可以;其次,SOCK_RAW 也可以处理特殊的 IPv4 报文;此外,利用原始套接字,可以通过 IP_HDRINCL 套接字选项由用户构造 IP 头。
socket.SOCK_SEQPACKET 可靠的连续数据包服务
简介
STUN,首先在 RFC3489 中定义,作为一个完整的 NAT 穿透解决方案,英文全称是 Simple Traversal of UDP Through NATs,即简单的用 UDP 穿透 NAT。 在新的 RFC5389 修订中把 STUN 协议定位于为穿透 NAT 提供工具,而不是一个完整的解决方案,英文全称是 Session Traversal Utilities for NAT,即 NAT 会话穿透效用。RFC5389 与 RFC3489 除了名称变化外,最大的区别是支持 TCP 穿透。
PyStun follows RFC3489: http://www.ietf.org/rfc/rfc3489.txt
PyStun stun/init.py 代码注释
def get_ip_info(source_ip="0.0.0.0", source_port=54320, stun_host=None,
stun_port=3478):
# 创建一个 IPv4 协议的 UDP socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Set a timeout on blocking socket operations.
s.settimeout(2)
# 设置额外的 socket 行为:关闭 socket 之后想继续重用该 socket
# setsockopt 设置 socket 详细用法(http://blog.chinaunix.net/uid-25324849-id-207869.html)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind the socket to address.
s.bind((source_ip, source_port))
# 通过 stun 服务器获取 nat_type 和 nat 设备信息
nat_type, nat = get_nat_type(s, source_ip, source_port,
stun_host=stun_host, stun_port=stun_port)
external_ip = nat['ExternalIP']
external_port = nat['ExternalPort']
s.close()
return (nat_type, external_ip, external_port)
def stun_test(sock, host, port, source_ip, source_port, send_data=""):
# 设置期望的返回数据结构
retVal = {'Resp': False, 'ExternalIP': None, 'ExternalPort': None,
'SourceIP': None, 'SourcePort': None, 'ChangedIP': None,
'ChangedPort': None}
# 构造要发送的数据
# "%#04x" % 17 = 0x11 && "%04x" % 17 = 0011
# "%#04d" 看上去和 "%04d" 没什么不同
# 消息长度
str_len = "%#04d" % (len(send_data) / 2)
# 生成事务 ID
tranid = gen_tran_id()
# 生成要发出的数据 str_data
# BindRequestMsg 0001 => 绑定消息类型
# str_len 0000 => 消息长度
# tranid 32位 => 事务 ID
# send_data '' => 要发出的数据(空字符串)
str_data = ''.join([BindRequestMsg, str_len, tranid, send_data])
# binascii => Conversion between binary data and ASCII
data = binascii.a2b_hex(str_data)
# 标志返回消息是否正确
recvCorr = False
while not recvCorr:
recieved = False
# UDP 数据包重发 3 次
count = 3
while not recieved:
log.debug("sendto: %s", (host, port))
try:
sock.sendto(data, (host, port))
except socket.gaierror:
retVal['Resp'] = False
return retVal
try:
# 返回数据读取至 buf
buf, addr = sock.recvfrom(2048)
log.debug("recvfrom: %s", addr)
recieved = True
except Exception:
recieved = False
if count > 0:
count -= 1
else:
retVal['Resp'] = False
return retVal
# 解析返回消息的类型原始数据
msgtype = binascii.b2a_hex(buf[0:2])
# 判断消息类型等于 BindResponseMsg
bind_resp_msg = dictValToMsgType[msgtype] == "BindResponseMsg"
# 判断事务 ID 匹配
tranid_match = tranid.upper() == binascii.b2a_hex(buf[4:20]).upper()
if bind_resp_msg and tranid_match:
recvCorr = True
retVal['Resp'] = True
# 解析消息长度
len_message = int(binascii.b2a_hex(buf[2:4]), 16)
len_remain = len_message
base = 20
# 解析所有返回值到 retVal
..............
def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478):
"""
RFC3489 文档:
客户首先从测试 I 开始。如果该测试没有响应,客户就知道他不能使用UDP连接。如果测试产生响应,则客户检查MAPPED-ADDRESS属性。
如果其中的地址和端口与用来发送请求的套接口的本地IP地址和端口相同,则客户知道他没有通过NAT。然后执行测试II。
如果收到响应,则客户知道他与互联网之间可互访(或者,至少,他在表现如同完全锥型NAT且没有进行转换的防火墙后)。
如果没有响应,则客户知道他在对称型UDP防火墙之后。
在套接口的 IP 地址和端口与测试 I 响应的 MAPPED-ADDRESS 属性中的不同的情况下,客户就知道他在 NAT 后。他执行测试 II。
如果收到响应,则客户知道他在完全锥型NAT后。如果没有响应,他再次执行测试I,但这次,使测试I响应的 CHANGED-ADDRESS 属性中的地址和端口。
如果MAPPED-ADDRESS中返回的IP地址和端口与第一次测试I中的不同,客户就知道他在对称型NAT后。如果地址和端口相同,则客户要么在限制 NAT 后,要么在端口限制NAT后。
要判断出到底是哪种,客户进行测试 III。如果收到响应,则他在限制NAT后,如果没有响应,则他在端口限制 NAT 后。
该过程产生关于客户应用程序的操作条件的实际信息。在客户与互联网间存在多级NAT的情况下,发现的类型将是客户与互联网间限制最多的NAT的类型。
NAT的类型接照限制排序从高到低是对称型,端口限制锥型,限制锥型和完全锥型。
"""
# 初始化 RFC3489/STUN 定义的数据结构,详情 https://tools.ietf.org/html/rfc3489
_initialize()
port = stun_port
# 开始 测试-1
log.debug("Do Test1")
# 标志 stun_test() 返回是否成功
resp = False
if stun_host:
ret = stun_test(s, stun_host, port, source_ip, source_port)
resp = ret['Resp']
else:
for stun_host in stun_servers_list:
log.debug('Trying STUN host: %s', stun_host)
ret = stun_test(s, stun_host, port, source_ip, source_port)
resp = ret['Resp']
if resp:
break
if not resp:
return Blocked, ret
log.debug("Result: %s", ret)
# 获取第一次测试的
# ExternalIP
# ExternalPort
# ChangedIP
# ChangedPort
exIP = ret['ExternalIP']
exPort = ret['ExternalPort']
changedIP = ret['ChangedIP']
changedPort = ret['ChangedPort']
if ret['ExternalIP'] == source_ip:
# 构建第二次测试的消息体
.......