Shadowsocks (一) 主循环

Reading Time: 5 minutes

这里简单分析一下 Shadowsocks 这个软件的源码.

因为网上一直有声音认为其源码质量不错. 刚好最近在学一些服务器编程相关的知识, 所以就来简单分析一下它.

同时由于希望能获得一些阅读源码的经验. 这里会简单记录我读源码的心路历程.

目录结构

首先我们来 tree 一下, 惯例, 测试相关的文件就省略了

❯ tree
.
├── CHANGES
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── MANIFEST.in
├── README.md
├── README.rst
├── config.json.example
├── debian
│   ├── changelog
│   ├── compat
│   ├── config.json
│   ├── control
│   ├── copyright
│   ├── docs
│   ├── init.d
│   ├── install
│   ├── rules
│   ├── shadowsocks.default
│   ├── shadowsocks.manpages
│   ├── source
│   │   └── format
│   ├── sslocal.1
│   └── ssserver.1
├── setup.py
├── shadowsocks
│   ├── __init__.py
│   ├── asyncdns.py
│   ├── common.py
│   ├── crypto
│   │   ├── __init__.py
│   │   ├── aead.py
│   │   ├── hkdf.py
│   │   ├── mbedtls.py
│   │   ├── openssl.py
│   │   ├── rc4_md5.py
│   │   ├── sodium.py
│   │   ├── table.py
│   │   └── util.py
│   ├── cryptor.py
│   ├── daemon.py
│   ├── eventloop.py
│   ├── local.py
│   ├── lru_cache.py
│   ├── manager.py
│   ├── server.py
│   ├── shell.py
│   ├── tcprelay.py
│   ├── tunnel.py
│   └── udprelay.py
├── snapcraft.yaml
├── tests
│   └── ...
└── utils
    ├── README.md
    ├── autoban.py
    └── fail2ban
        └── shadowsocks.conf

11 directories, 106 files

可以看到, 主要的源码应该都在 shadowsocks 目录下, 我们可以看到用于加 / 解密的程序, 可以看到处理 TCP, UDP 连接的程序.

也可以看到根目录下有 DockerFile, 以及一个不知道用来干啥的 debain 文件夹, 我猜测是用来生成软件包的.

setup.py 看起来像是用于安装成 Python 包的.

现在对于目录结构有一定认识和猜测了之后, 我们可以开始下一步行动了.

入口

阅读源码肯定要先找入口, 至少要知道 main 在哪里,

由于实际上使用过这个软件, 我知道我们在本地启动的时候输入的命令是 sslocal, 而在服务器运行的时候输入的命令是 ssserver.

这意味着我们首先得知道这两个命令是如何启动代码的, 输入这两个命令之后实际上运行了什么函数.

setup.py 中我们可以看到:

entry_points="""
[console_scripts]
sslocal = shadowsocks.local:main
ssserver = shadowsocks.server:main
""",

不难看出, sslocal 对应的是 shadowsocks 包下的 local.py 里面的 main(),
ssserver 对应的是 shadowsocks 包下的 server.py 里面的 main().

那么我们先看客户端, 也就是 sslocal.

客户端 sslocal

main

local.py::main 这个函数不长, 我就直接复制过来了.

@shell.exception_handle(self_=False, exit_code=1)
def main():
    shell.check_python()

    # fix py2exe
    if hasattr(sys, "frozen") and sys.frozen in \
            ("windows_exe", "console_exe"):
        p = os.path.dirname(os.path.abspath(sys.executable))
        os.chdir(p)

    config = shell.get_config(True) # shell 大约是一个 "外壳", 用来交互. 所以我们使用 shell 来获得配置文件.
    daemon.daemon_exec(config) # 启动 daemon 进程

    logging.info("starting local at %s:%d" %
                 (config['local_address'], config['local_port'])) # 可以看出来监听的地址和端口是写在配置文件里的.

    dns_resolver = asyncdns.DNSResolver() # 启动 dns, 这个 dns 貌似是不可配置的, 因为 config 没有被传入.
    tcp_server = tcprelay.TCPRelay(config, dns_resolver, True) # 创建新的 tcp_server 和 udp_server
    udp_server = udprelay.UDPRelay(config, dns_resolver, True)
    loop = eventloop.EventLoop()
    dns_resolver.add_to_loop(loop)
    tcp_server.add_to_loop(loop)
    udp_server.add_to_loop(loop) # 建立一个 loop 将 dns / udp / tcp server 都添加到这个 loop 中.

    def handler(signum, _):
        logging.warn('received SIGQUIT, doing graceful shutting down..')
        tcp_server.close(next_tick=True)
        udp_server.close(next_tick=True)
    signal.signal(getattr(signal, 'SIGQUIT', signal.SIGTERM), handler) # 注册信号处理函数.

    def int_handler(signum, _):
        sys.exit(1)
    signal.signal(signal.SIGINT, int_handler) # 注册信号处理函数.

    daemon.set_user(config.get('user', None))
    loop.run() # 启动

这里 有关于两个信号 SIGQUITSIGINT 的区别.

这里可以记下这么几个问题:

  1. daemon 进程是怎么启动的呢? 为什么要等最后 loop.run() 之前再 set_uesr 呢?
  2. 为什么 dns 服务器创建时不读取配置文件?
  3. loop 是怎么工作的?
  4. 三个 ..._server 分别是怎么工作的呢?

可以很明显地看出, 我们的主要目标应该是 3 和 4.

那么接下来的目标就是 loop.run() 了.

loop.run

看一个类的时候我们可以先看它是如何工作的. 有什么看不懂的地方再回头看它的构造函数之类的东西.

对于一个过程其实也是一样的. 我们可以先看它是如何工作的, 然后再看它是怎么初始化的.

落实到这里, 就是说我们可以先看 run() 看不懂了再去看 add_to_loop()

看看eventloop.py:run()

def run(self):
    events = []
    while not self._stopping:
        asap = False
        try:
            events = self.poll(TIMEOUT_PRECISION) # poll 出一个 events, 从下面看 events 中有 sock, fd 和 event 这三个东西.
        except (OSError, IOError) as e:
            if errno_from_exception(e) in (errno.EPIPE, errno.EINTR):
                # EPIPE: Happens when the client closes the connection
                # EINTR: Happens when received a signal
                # handles them as soon as possible
                asap = True
                logging.debug('poll:%s', e)
            else:
                logging.error('poll:%s', e)
                traceback.print_exc()
                continue

        for sock, fd, event in events:
            handler = self._fdmap.get(fd, None) # 获得和 fd 对应的 handler
            if handler is not None:
                handler = handler[1]
                try:
                    handler.handle_event(sock, fd, event) # 调用 handler 处理 IO 事件.
                except (OSError, IOError) as e:
                    shell.print_exception(e)
        now = time.time()
        if asap or now - self._last_time >= TIMEOUT_PRECISION:
            for callback in self._periodic_callbacks:
                callback()
            self._last_time = now

大致上这个 loop 的工作流程就是:

poll 出来一个 IO 事件, 看下这个 socket 有没有对应的 handler, 有就去调这个 handler.

这里扩充一下问题列表:

  1. daemon 进程是怎么启动的呢? 为什么要等最后 loop.run() 之前再 set_uesr 呢?
  2. 为什么 dns 服务器创建时不读取配置文件?
  3. loop 是怎么工作的?
    1. handler 这个对象是如何注册到 loop 中的? [是在什么地方被放进 _fdmap 的呢?]
    2. handler[1]?
    3. 为什么还需要判断 handler 不为空, 什么情况下它会是空呢?
    4. 为什么 events 遍历的时候会出来 sock fdevent 三个变量呢? epoll 不是只会有 fdevent 两个东西告诉程序是哪个文件发生了什么样的事件吗?
    5. 最后看起来是在判断超时的几行代码的意义是什么呢?
  4. 三个 server 分别是怎么工作的呢?

那么重点应该是 3.1, 我们可以猜测这个注册发生在 add_to_loop 里面.
但是我们首先可以发现 loop 有个方法叫 add, 这里写了 _fdmap.

eventloop.py:add()

def add(self, f, mode, handler):
    fd = f.fileno()
    self._fdmap[fd] = (f, handler)
    self._impl.register(fd, mode)

这里有一个注册函数.

eventloop.py::__init__() 中可以看到, 我们的 _impl 就是 select.epoll.

那么我们现在可以看看是谁在调用这个 add, 以 TCPRelay 的 add_to_loop 为例

TCPRelay

这里 可以看到

def add_to_loop(self, loop):
    if self._eventloop:
        raise Exception('already add to loop')
    if self._closed:
        raise Exception('already closed')
    self._eventloop = loop
    self._eventloop.add(self._server_socket,
                        eventloop.POLL_IN | eventloop.POLL_ERR, self)
    self._eventloop.add_periodic(self.handle_periodic)

这里将自己的 _server_socket 注册给了 loop, 这个 socket 肯定是创建 TCPRelay 的时候建立的.

我们来看看这个创建过程:

这里 我们可以看到:

def __init__(self, config, dns_resolver, is_local, stat_callback=None):
    self._config = config
    self._is_local = is_local
    self._dns_resolver = dns_resolver
    self._closed = False
    self._eventloop = None
    self._fd_to_handlers = {}
    self._is_tunnel = False

    self._timeout = config['timeout']
    self._timeouts = []  # a list for all the handlers
    # we trim the timeouts once a while
    self._timeout_offset = 0   # last checked position for timeout
    self._handler_to_timeouts = {}  # key: handler value: index in timeouts

    if is_local:
        listen_addr = config['local_address']
        listen_port = config['local_port']
    else:
        listen_addr = config['server']
        listen_port = config['server_port']
    self._listen_port = listen_port

    addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
                               socket.SOCK_STREAM, socket.SOL_TCP)
    if len(addrs) == 0:
        raise Exception("can't get addrinfo for %s:%d" %
                        (listen_addr, listen_port))
    af, socktype, proto, canonname, sa = addrs[0]
    server_socket = socket.socket(af, socktype, proto)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(sa)
    server_socket.setblocking(False)
    if config['fast_open']:
        try:
            server_socket.setsockopt(socket.SOL_TCP, 23, 5)
        except socket.error:
            logging.error('warning: fast open is not available')
            self._config['fast_open'] = False
    server_socket.listen(1024)
    self._server_socket = server_socket
    self._stat_callback = stat_callback

这就是创建 socket, bind, listen 的三部曲了. 这里还能看到一个 Fast Open 的设置.

使用了非阻塞的 socket.

接下来的问题就是:

self._eventloop.add(self._server_socket,
                        eventloop.POLL_IN | eventloop.POLL_ERR, self)

这个地方, 向 _eventloop 注册事件的时候, 实际上添加进去的 handle 是 self.

在上面这里的 eventloop.py:run() 中:

...
        for sock, fd, event in events:
            handler = self._fdmap.get(fd, None) # 获得和 fd 对应的 handler
            if handler is not None:
                handler = handler[1]
                try:
                    handler.handle_event(sock, fd, event) # 调用 handler 处理 IO 事件.
                except (OSError, IOError) as e:
                    shell.print_exception(e)
...

调用了 handle_event() 方法, 那我们来看一下

这里

    def handle_event(self, sock, fd, event):
        # handle events and dispatch to handlers
        if sock:
            logging.log(shell.VERBOSE_LEVEL, 'fd %d %s', fd,
                        eventloop.EVENT_NAMES.get(event, event))
        if sock == self._server_socket: # 如果是监听新连接的 socket
            if event & eventloop.POLL_ERR:
                # TODO
                raise Exception('server_socket error')
            try:
                logging.debug('accept')
                conn = self._server_socket.accept()
                TCPRelayHandler(self, self._fd_to_handlers, # 创建处理单个连接的 handler, 并将其注册到 eventloop 中.
                                self._eventloop, conn[0], self._config,
                                self._dns_resolver, self._is_local)
            except (OSError, IOError) as e:
                error_no = eventloop.errno_from_exception(e)
                if error_no in (errno.EAGAIN, errno.EINPROGRESS,
                                errno.EWOULDBLOCK):
                    return
                else:
                    shell.print_exception(e)
                    if self._config['verbose']:
                        traceback.print_exc()
        else:
            if sock: # 如果这个 socket 不是监听新连接的. 那么这意味着我们可以在一个 map 里面找到它的 handler. 调用它.
                handler = self._fd_to_handlers.get(fd, None)
                if handler:
                    handler.handle_event(sock, event)
            else:
                logging.warn('poll removed fd')

可以在 TCPRelayHandler构造函数 中看到:

 def __init__(self, server, fd_to_handlers, loop, local_sock, config,
              dns_resolver, is_local):
        self._server = server
        ...
        loop.add(local_sock, eventloop.POLL_IN | eventloop.POLL_ERR,
                 self._server)
        ...

这里产生了一个新的疑问, 为什么创建好新连接进来的 socket 对应的 handler 之后向 eventloop 中注册处理函数的时候不干脆直接注册成新生成的 handler 呢? 还将它注册成TCPRelay 呢?

更新一下疑问列表:

  1. daemon 进程是怎么启动的呢? 为什么要等最后 loop.run() 之前再 set_uesr 呢?
  2. 为什么 dns 服务器创建时不读取配置文件?
  3. loop 是怎么工作的?
    1. handler 这个对象是如何注册到 loop 中的? [是在什么地方被放进 _fdmap 的呢?]
    2. handler[1]?
    3. 为什么还需要判断 handler 不为空, 什么情况下它会是空呢?
    4. 为什么 events 遍历的时候会出来 sock fdevent 三个变量呢? epoll 不是只会有 fdevent 两个东西告诉程序是哪个文件发生了什么样的事件吗?
    5. 最后看起来是在判断超时的几行代码的意义是什么呢?
  4. 三个 server 分别是怎么工作的呢?
    1. 为什么 TCPRelay 生成完新 handler 不直接将对应的 socket 处理函数注册成新生成的 handler, 反而要把它注册成生成了这个 handler 的TCPRelay 呢?

不过总体来说整个流程就十分清楚了.

主要流程

我们有一个主循环, 这个主事件循环从 epoll 中拉出发生了 IO 事件的 fd, 然后从一个 map 中找到这个 fd 对应的 handler, 调用这个 handler 的 handle_event 函数来处理这个 fd 发生的 IO 事件.

对于 TCP 连接的处理而言, 我们把所有 TCP 连接的 handle 注册成了 TCPRelay. 也就是说所有 TCP 连接对应的 socket 发生 IO 事件之后, 都会调用 TCPRelay 下面的 handle_event. 如果是老连接, 我们就从 TCPRelay 下的另一个 fd 到 handle 的 map 里面找到这个 socket 对应的 handle, 调用 handle 来处理 IO 事件; 如果是新连接, 就创建一个新的 TCPRelayHandler, 将它写进 TCPRelay 下的 map.

大致就是这样了.

至于 TCPRelaHandler 是怎么处理 IO 事件的. 等我想看了, 再写一篇文章就是了.

问题

那么我们现在把之前遗留的几个问题一一处理一下:

  1. daemon 进程是怎么启动的呢? 为什么要等最后 loop.run() 之前再 set_uesr 呢?

  2. 为什么 dns 服务器创建时不读取配置文件?

    这里只是在本地端创建 dns 不需要读取配置文件. sever 端的代码中可以看到:

   dns_resolver = asyncdns.DNSResolver(config['dns_server'],
                                       config['prefer_ipv6'])
  1. loop 是怎么工作的?

    1. handler 这个对象是如何注册到 loop 中的? [是在什么地方被放进 _fdmap 的呢?]

      回答过了
      
    2. handler[1]?

      在[这里](https://github.com/shadowsocks/shadowsocks/blob/master/shadowsocks/eventloop.py#L160)可以看到
      
      ```python
      self._fdmap = {}  # (f, handler)
      ```
      
      这里的 `f` 是那个 file 对象. 所以这里需要去第二个, 第二个才是真正的 handler.
      
    3. 为什么还需要判断 handler 不为空, 什么情况下它会是空呢?

    4. 为什么 events 遍历的时候会出来 sock fdevent 三个变量呢? epoll 不是只会有 fdevent 两个东西告诉程序是哪个文件发生了什么样的事件吗?

      [这里](https://github.com/shadowsocks/shadowsocks/blob/master/shadowsocks/eventloop.py#L168)可以看到, 我们通过这个 `_fdmap` 取到了 `fd` 对应的 file 对象.
      
      ```python
      return [(self._fdmap[fd][0], fd, event) for fd, event in events]
      ```
      
    5. 最后看起来是在判断超时的几行代码的意义是什么呢?

  2. 三个 server 分别是怎么工作的呢?

    1. 为什么 TCPRelay 生成完新 handler 不直接将对应的 socket 处理函数注册成新生成的 handler, 反而要把它注册成生成了这个 handler 的TCPRelay 呢?

遗留问题

那么现在留下的问题是这几个:

  1. daemon 进程是怎么启动的呢? 为什么要等最后 loop.run() 之前再 set_uesr 呢?
  2. loop 是怎么工作的?
    1. 为什么还需要判断 handler 不为空, 什么情况下它会是空呢?
    2. 最后看起来是在判断超时的几行代码的意义是什么呢?
  3. 三个 server 分别是怎么工作的呢?
    1. 为什么 TCPRelay 生成完新 handler 不直接将对应的 socket 处理函数注册成新生成的 handler, 反而要把它注册成生成了这个 handler 的TCPRelay 呢?

以及我们接下来还有一些希望了解到的内容, 比如 TCPRelayHandler 到底需要处理哪些 IO 事件, 分别又都是怎么处理的呢?

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注