forked from brandon-rhodes/fopnp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
srv_async.py
77 lines (63 loc) · 2.72 KB
/
srv_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python3
# Foundations of Python Network Programming, Third Edition
# https://github.com/brandon-rhodes/fopnp/blob/m/py3/chapter07/srv_async.py
# Asynchronous I/O driven directly by the poll() system call.
import select, zen_utils
def all_events_forever(poll_object):
    """Yield ``(fd, event)`` pairs from `poll_object` without ever finishing.

    Each time one batch returned by ``poll_object.poll()`` has been handed
    out in full, ``poll()`` is called again for the next batch.
    """
    while True:
        ready = poll_object.poll()
        for descriptor, flags in ready:
            yield descriptor, flags
def serve(listener):
    """Serve every client of `listener` from a single poll() loop, forever.

    A client socket stays registered for POLLIN until the bytes accumulated
    from it end with b'?'.  The reply from zen_utils.get_answer() is then
    queued and the socket is switched to POLLOUT until every byte has been
    sent, after which it goes back to POLLIN for the next question.

    When a client hangs up, errors out, or sends end-of-file, its state is
    discarded at once and a one-line summary is printed.
    """
    sockets = {listener.fileno(): listener}  # fd -> socket object
    addresses = {}        # client socket -> peer address
    bytes_received = {}   # client socket -> partial request so far
    bytes_to_send = {}    # client socket -> reply bytes not yet sent

    poll_object = select.poll()
    poll_object.register(listener, select.POLLIN)

    def drop_client(fd, sock):
        """Report how a client ended, then forget, unregister and close it."""
        address = addresses.pop(sock)
        rb = bytes_received.pop(sock, b'')
        sb = bytes_to_send.pop(sock, b'')
        if rb:
            print('Client {} sent {} but then closed'.format(address, rb))
        elif sb:
            print('Client {} closed before we sent {}'.format(address, sb))
        else:
            print('Client {} closed socket normally'.format(address))
        # Unregister by number: sock.fileno() is -1 once the socket is closed.
        poll_object.unregister(fd)
        del sockets[fd]
        sock.close()

    for fd, event in all_events_forever(poll_object):
        sock = sockets[fd]

        # Socket closed: remove it from our data structures.
        # NOTE: an error event on the listener itself would raise KeyError
        # in drop_client(), because the listener has no entry in addresses.
        if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            drop_client(fd, sock)

        # New socket: add it to our data structures.
        elif sock is listener:
            sock, address = sock.accept()
            print('Accepted connection from {}'.format(address))
            # Non-blocking, so a recv()/send() made at the wrong moment
            # raises an error instead of stalling the whole loop.
            sock.setblocking(False)
            sockets[sock.fileno()] = sock
            addresses[sock] = address
            poll_object.register(sock, select.POLLIN)

        # Incoming data: keep receiving until we see the suffix.
        elif event & select.POLLIN:
            more_data = sock.recv(4096)
            if not more_data:  # end-of-file
                # Clean up right away.  Merely closing and waiting for a
                # later POLLNVAL would let accept() reuse this fd number
                # first, orphaning this socket's bookkeeping entries.
                drop_client(fd, sock)
                continue
            data = bytes_received.pop(sock, b'') + more_data
            if data.endswith(b'?'):
                bytes_to_send[sock] = zen_utils.get_answer(data)
                poll_object.modify(sock, select.POLLOUT)
            else:
                bytes_received[sock] = data

        # Socket ready to send: keep sending until all bytes are delivered.
        elif event & select.POLLOUT:
            data = bytes_to_send.pop(sock)
            n = sock.send(data)
            if n < len(data):
                # Partial send: keep the remainder for the next POLLOUT.
                bytes_to_send[sock] = data[n:]
            else:
                poll_object.modify(sock, select.POLLIN)
if __name__ == '__main__':
    # Script entry point: parse the command line for the address, create
    # the listening socket for it, and hand that socket to the poll() loop.
    serve(zen_utils.create_srv_socket(
        zen_utils.parse_command_line('low-level async server')))