Skip to content

Async IO for multiple users #1

@VanishInAir

Description

@VanishInAir

朋友,我看了你的代码,写的有点复杂。你可以添加异步IO的小模块,使得你的FTP server可以支持多人同时使用。思路是这样,
server端:
1.创建一个非阻塞式的socket监听21号端口
2.写一个异步IO的处理循环eventloop

# Timeout, in seconds, handed to each poll call by the event loop.
TIMEOUT_PRECISION = 10

# Bit flags describing which kinds of readiness a descriptor is watched for
# (on register/modify) or has reached (in poll results).
POLL_NULL = 0x00
POLL_IN = 0x01
POLL_OUT = 0x04
POLL_ERR = 0x08


class SelectLoop(object):
    """Readiness poller backed by ``select.select``.

    Three sets of descriptors are tracked, one per event kind (readable,
    writable, exceptional); ``poll`` folds the three result lists returned
    by ``select`` into a single bitmask per descriptor.
    """

    def __init__(self):
        self._r_list = set()
        self._w_list = set()
        self._x_list = set()

    def _watch_sets(self):
        """Return ``(flag, watched set)`` pairs in read/write/error order."""
        return (
            (POLL_IN, self._r_list),
            (POLL_OUT, self._w_list),
            (POLL_ERR, self._x_list),
        )

    def poll(self, timeout):
        """Wait up to *timeout* and return ``(fd, event mask)`` pairs.

        The timeout is passed straight to ``select.select``. Only
        descriptors that are ready appear in the result.
        """
        readable, writable, errored = select.select(
            self._r_list, self._w_list, self._x_list, timeout)
        flags_by_fd = {}
        ready_groups = (
            (readable, POLL_IN),
            (writable, POLL_OUT),
            (errored, POLL_ERR),
        )
        for ready, flag in ready_groups:
            for fd in ready:
                flags_by_fd[fd] = flags_by_fd.get(fd, POLL_NULL) | flag
        return flags_by_fd.items()

    def register(self, fd, mode):
        """Start watching *fd* for every event kind whose bit is set in *mode*."""
        for flag, watched in self._watch_sets():
            if mode & flag:
                watched.add(fd)

    def unregister(self, fd):
        """Stop watching *fd* entirely; unknown descriptors are ignored."""
        for _flag, watched in self._watch_sets():
            watched.discard(fd)

    def modify(self, fd, mode):
        """Replace the set of events watched for *fd* with *mode*."""
        self.unregister(fd)
        self.register(fd, mode)

class EventLoop(object):
    """Dispatch readiness events from a poller to per-descriptor handlers.

    Each registered file object is stored under its descriptor number
    together with the handler whose ``handle_event(sock, fd, event)`` is
    called when the descriptor becomes ready.
    """

    def __init__(self):
        self._impl = SelectLoop()
        # fd -> (file object, handler)
        self._fdmap = {}

    def add(self, f, mode, handler):
        """Watch file object *f* for *mode* events and route them to *handler*."""
        fd = f.fileno()
        self._fdmap[fd] = (f, handler)
        self._impl.register(fd, mode)

    def remove(self, f):
        """Stop watching *f*.

        Must be called while *f* is still open, because the descriptor is
        looked up through ``f.fileno()``. Raises ``KeyError`` if *f* was
        never added.
        """
        fd = f.fileno()
        del self._fdmap[fd]
        self._impl.unregister(fd)

    def poll(self, timeout=None):
        """Return a list of ``(file object, fd, event mask)`` for ready descriptors."""
        events = self._impl.poll(timeout)
        return [(self._fdmap[fd][0], fd, event) for fd, event in events]

    def modify(self, f, mode):
        """Change the events watched for *f* to *mode*."""
        fd = f.fileno()
        self._impl.modify(fd, mode)

    def run(self):
        """Poll forever, handing each event to the handler registered for its fd."""
        while True:
            for sock, fd, event in self.poll(TIMEOUT_PRECISION):
                # A handler that ran earlier in this batch may have removed
                # this descriptor, so look it up again and skip it if gone.
                entry = self._fdmap.get(fd)
                if entry is not None:
                    entry[1].handle_event(sock, fd, event)

3.为每一个操作写一个对应的处理函数,比如说"lsdir"操作可以按照以下方式来写:

# Maximum number of bytes read from a client socket per recv() call.
BUF_SIZE = 32 * 1024


class Action(object):
    """Handle the commands of one connected client.

    The client socket is registered with the event loop under *parent*,
    which is expected to forward that socket's events to
    ``handle_event(sock, event)`` of this object.

    Replies are appended to a pending-bytes buffer and written out when the
    socket is reported writable, so a reply is sent in as many POLL_OUT
    events as it needs.
    """

    # Directory listed by the 'ls' command; None means the process's current
    # working directory.
    # NOTE(review): the original read self.workdir / self.tmpdir without ever
    # setting them; confirm where the per-session directories should come from.
    workdir = None
    tmpdir = None

    def __init__(self, loop, local_sock, parent):
        self._local_sock = local_sock
        self._parent = parent
        self._loop = loop
        # Encoded reply bytes not yet written to the client.
        self._send_buf = bytearray()
        self._loop.add(local_sock, eventloop.POLL_IN, parent)

    def handle_event(self, sock, event):
        """React to a readiness *event* bitmask for the client socket."""
        if event & eventloop.POLL_IN:
            data = self._local_sock.recv(BUF_SIZE)
            if not data:
                # An empty read means the peer closed the connection. Remove
                # the socket before closing it: removal needs a valid fileno.
                self._loop.remove(self._local_sock)
                self._local_sock.close()
                return
            # NOTE(review): assumes one UTF-8 text command per read - confirm
            # against the client's wire format.
            cmd = data.decode("utf-8", errors="replace").strip()
            if cmd == 'ls':
                reply = self._lsdir(self.workdir or os.getcwd(), self.tmpdir)
                self._send_buf += reply.encode("utf-8")
                self._loop.modify(
                    self._local_sock,
                    eventloop.POLL_IN | eventloop.POLL_OUT)
        if event & eventloop.POLL_OUT:
            if self._send_buf:
                # send() may accept only part of the buffer; keep the rest.
                sent = self._local_sock.send(self._send_buf)
                del self._send_buf[:sent]
            if not self._send_buf:
                # Nothing left to write: stop watching for writability so the
                # loop does not wake up continuously.
                self._loop.modify(self._local_sock, eventloop.POLL_IN)

    def _lsdir(self, workdir, tempdir):
        """Build the reply for listing *workdir*.

        Returns ``"400||<length>||<listing>"`` where the listing holds one
        entry per line (sorted, each followed by a newline) and ``<length>``
        is the size of the listing in bytes once UTF-8 encoded. 400 is this
        protocol's status code for a successful ls. *tempdir* is unused.
        """
        entries = sorted(os.listdir(workdir))
        for entry in entries:
            print(entry)
        dirlist = "".join(entry + "\n" for entry in entries)
        # The length must describe the bytes sent on the wire, not the size of
        # the Python str object in memory.
        con_len = len(dirlist.encode("utf-8"))
        print("dirlist = {}".format(dirlist))
        status_code = "400||"
        return status_code + str(con_len) + "||" + dirlist

4.调用过程如下:

class Control(object):
    """Listening socket plus dispatcher for all client connections.

    The server socket and every accepted client socket are registered with
    the event loop using this object as handler; events on client sockets
    are forwarded to the ``Action`` created for that connection.
    """

    def __init__(self, loop):
        """Create the listening socket on 0.0.0.0:21 and register it with *loop*."""
        listen_addr = '0.0.0.0'
        listen_port = 21
        addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
                                   socket.SOCK_STREAM, socket.SOL_TCP)
        af, socktype, proto, canonname, sa = addrs[0]
        server_socket = socket.socket(af, socktype, proto)
        try:
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind(sa)
            # Without listen() the socket never accepts connections.
            server_socket.listen(1024)
            server_socket.setblocking(False)
        except Exception:
            # Do not leak the descriptor when setup fails (e.g. port in use).
            server_socket.close()
            raise
        self._server_socket = server_socket
        # fd of an accepted client socket -> its Action
        self._fd_to_handlers = {}
        self._loop = loop
        self._loop.add(self._server_socket, eventloop.POLL_IN, self)

    def handle_event(self, sock, fd, event):
        """Accept a new client or forward *event* to the client's Action."""
        if sock == self._server_socket:
            try:
                conn, _addr = self._server_socket.accept()
            except BlockingIOError:
                # The listener is non-blocking: the pending connection can
                # disappear between the readiness report and accept().
                return
            self._fd_to_handlers[conn.fileno()] = Action(self._loop, conn, self)
            return
        if not sock:
            return
        handler = self._fd_to_handlers.get(fd)
        if handler is not None:
            handler.handle_event(sock, event)

    def run(self):
        """Run the event loop."""
        self._loop.run()

# Server main entry: create the event loop, attach the control listener to it,
# then start dispatching events. The call to run() blocks.
loop = eventloop.EventLoop()
my_control = Control(loop)
my_control.run()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions