Skip to content

Commit

Permalink
fix: use separate transaction storage for each DictProxy handler
Browse files Browse the repository at this point in the history
DictProxy can have transactions with the same name
(most frequently `1`) processed in parallel.
Dovecot expects that transaction names on each connection
are independent.
  • Loading branch information
link2xt committed Jul 31, 2024
1 parent a9bdc3d commit d3f3e02
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 39 deletions.
31 changes: 17 additions & 14 deletions chatmaild/src/chatmaild/dictproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@


class DictProxy:
def __init__(self):
self.transactions = {}

def loop_forever(self, rfile, wfile):
# Transaction storage is local to each handler loop.
# Dovecot reuses transaction IDs across connections,
# starting transaction with the name `1`
# on two different connections to the same proxy sometimes.
transactions = {}

while True:
msg = rfile.readline().strip().decode()
if not msg:
break

res = self.handle_dovecot_request(msg)
res = self.handle_dovecot_request(msg, transactions)
if res:
wfile.write(res.encode("ascii"))
wfile.flush()

def handle_dovecot_request(self, msg):
def handle_dovecot_request(self, msg, transactions):
# see https://doc.dovecot.org/developer_manual/design/dict_protocol/#dovecot-dict-protocol
short_command = msg[0]
parts = msg[1:].split("\t")
Expand All @@ -37,11 +40,11 @@ def handle_dovecot_request(self, msg):
transaction_id = parts[0]

if short_command == "B":
return self.handle_begin_transaction(transaction_id, parts)
return self.handle_begin_transaction(transaction_id, parts, transactions)
elif short_command == "C":
return self.handle_commit_transaction(transaction_id, parts)
return self.handle_commit_transaction(transaction_id, parts, transactions)
elif short_command == "S":
return self.handle_set(transaction_id, parts)
return self.handle_set(transaction_id, parts, transactions)

def handle_lookup(self, parts):
logging.warning(f"lookup ignored: {parts!r}")
Expand All @@ -52,19 +55,19 @@ def handle_iterate(self, parts):
# If we don't return empty line Dovecot will timeout.
return "\n"

def handle_begin_transaction(self, transaction_id, parts):
def handle_begin_transaction(self, transaction_id, parts, transactions):
addr = parts[1]
self.transactions[transaction_id] = dict(addr=addr, res="O\n")
transactions[transaction_id] = dict(addr=addr, res="O\n")

def handle_set(self, transaction_id, parts):
def handle_set(self, transaction_id, parts, transactions):
# For documentation on key structure see
# https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h

self.transactions[transaction_id]["res"] = "F\n"
transactions[transaction_id]["res"] = "F\n"

def handle_commit_transaction(self, transaction_id, parts):
def handle_commit_transaction(self, transaction_id, parts, transactions):
# return whatever "set" command(s) set as result.
return self.transactions.pop(transaction_id)["res"]
return transactions.pop(transaction_id)["res"]

def serve_forever_from_socket(self, socket):
dictproxy = self
Expand Down
6 changes: 3 additions & 3 deletions chatmaild/src/chatmaild/lastlogin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ def __init__(self, config):
super().__init__()
self.config = config

def handle_set(self, transaction_id, parts):
def handle_set(self, transaction_id, parts, transactions):
keyname = parts[1].split("/")
value = parts[2] if len(parts) > 2 else ""
addr = self.transactions[transaction_id]["addr"]
addr = transactions[transaction_id]["addr"]
if keyname[0] == "shared" and keyname[1] == "last-login":
if addr.startswith("echo@"):
return
Expand All @@ -22,7 +22,7 @@ def handle_set(self, transaction_id, parts):
user.set_last_login_timestamp(timestamp)
else:
# Transaction failed.
self.transactions[transaction_id]["res"] = "F\n"
transactions[transaction_id]["res"] = "F\n"


def main():
Expand Down
8 changes: 4 additions & 4 deletions chatmaild/src/chatmaild/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ def handle_lookup(self, parts):
logging.warning(f"lookup ignored: {parts!r}")
return "N\n"

def handle_set(self, transaction_id, parts):
def handle_set(self, transaction_id, parts, transactions):
# For documentation on key structure see
# https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h
keyname = parts[1].split("/")
value = parts[2] if len(parts) > 2 else ""
addr = self.transactions[transaction_id]["addr"]
addr = transactions[transaction_id]["addr"]
if keyname[0] == "priv" and keyname[2] == self.metadata.DEVICETOKEN_KEY:
self.metadata.add_token_to_addr(addr, value)
elif keyname[0] == "priv" and keyname[2] == "messagenew":
self.notifier.new_message_for_addr(addr, self.metadata)
else:
# Transaction failed.
try:
self.transactions[transaction_id]["res"] = "F\n"
transactions[transaction_id]["res"] = "F\n"
except KeyError:
logging.error(
f"could not mark tx as failed: {transaction_id} {self.transactions}"
f"could not mark tx as failed: {transaction_id} {transactions}"
)


Expand Down
24 changes: 14 additions & 10 deletions chatmaild/src/chatmaild/tests/test_lastlogin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,30 @@ def test_handle_dovecot_request_last_login(testaddr, example_config):
authproxy = AuthDictProxy(config=example_config)
authproxy.lookup_passdb(testaddr, "1l2k3j1l2k3jl123")

dictproxy_transactions = {}

# Begin transaction
tx = "1111"
msg = f"B{tx}\t{testaddr}"
res = dictproxy.handle_dovecot_request(msg)
res = dictproxy.handle_dovecot_request(msg, dictproxy_transactions)
assert not res
assert dictproxy.transactions == {tx: dict(addr=testaddr, res="O\n")}
assert dictproxy_transactions == {tx: dict(addr=testaddr, res="O\n")}

# set last-login info for user
user = dictproxy.config.get_user(testaddr)
timestamp = int(time.time())
msg = f"S{tx}\tshared/last-login/{testaddr}\t{timestamp}"
res = dictproxy.handle_dovecot_request(msg)
res = dictproxy.handle_dovecot_request(msg, dictproxy_transactions)
assert not res
assert len(dictproxy.transactions) == 1
assert len(dictproxy_transactions) == 1
read_timestamp = user.get_last_login_timestamp()
assert read_timestamp == timestamp // 86400 * 86400

# finish transaction
msg = f"C{tx}"
res = dictproxy.handle_dovecot_request(msg)
res = dictproxy.handle_dovecot_request(msg, dictproxy_transactions)
assert res == "O\n"
assert len(dictproxy.transactions) == 0
assert len(dictproxy_transactions) == 0


def test_handle_dovecot_request_last_login_echobot(example_config):
Expand All @@ -44,17 +46,19 @@ def test_handle_dovecot_request_last_login_echobot(example_config):
authproxy.lookup_passdb(testaddr, "ignore")
user = dictproxy.config.get_user(testaddr)

transactions = {}

# set last-login info for user
tx = "1111"
msg = f"B{tx}\t{testaddr}"
res = dictproxy.handle_dovecot_request(msg)
res = dictproxy.handle_dovecot_request(msg, transactions)
assert not res
assert dictproxy.transactions == {tx: dict(addr=testaddr, res="O\n")}
assert transactions == {tx: dict(addr=testaddr, res="O\n")}

timestamp = int(time.time())
msg = f"S{tx}\tshared/last-login/{testaddr}\t{timestamp}"
res = dictproxy.handle_dovecot_request(msg)
res = dictproxy.handle_dovecot_request(msg, transactions)
assert not res
assert len(dictproxy.transactions) == 1
assert len(transactions) == 1
read_timestamp = user.get_last_login_timestamp()
assert read_timestamp is None
17 changes: 9 additions & 8 deletions chatmaild/src/chatmaild/tests/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,42 +88,43 @@ def test_notifier_remove_without_set(metadata, testaddr):


def test_handle_dovecot_request_lookup_fails(dictproxy, testaddr):
res = dictproxy.handle_dovecot_request(f"Lpriv/123/chatmail\t{testaddr}")
transactions = {}
res = dictproxy.handle_dovecot_request(f"Lpriv/123/chatmail\t{testaddr}", transactions)
assert res == "N\n"


def test_handle_dovecot_request_happy_path(dictproxy, testaddr, token):
metadata = dictproxy.metadata
transactions = dictproxy.transactions
transactions = {}
notifier = dictproxy.notifier

# set device token in a transaction
tx = "1111"
msg = f"B{tx}\t{testaddr}"
res = dictproxy.handle_dovecot_request(msg)
res = dictproxy.handle_dovecot_request(msg, transactions)
assert not res and not metadata.get_tokens_for_addr(testaddr)
assert transactions == {tx: dict(addr=testaddr, res="O\n")}

msg = f"S{tx}\tpriv/guid00/devicetoken\t{token}"
res = dictproxy.handle_dovecot_request(msg)
res = dictproxy.handle_dovecot_request(msg, transactions)
assert not res
assert len(transactions) == 1
assert metadata.get_tokens_for_addr(testaddr) == [token]

msg = f"C{tx}"
res = dictproxy.handle_dovecot_request(msg)
res = dictproxy.handle_dovecot_request(msg, transactions)
assert res == "O\n"
assert len(transactions) == 0
assert metadata.get_tokens_for_addr(testaddr) == [token]

# trigger notification for incoming message
tx2 = "2222"
assert dictproxy.handle_dovecot_request(f"B{tx2}\t{testaddr}") is None
assert dictproxy.handle_dovecot_request(f"B{tx2}\t{testaddr}", transactions) is None
msg = f"S{tx2}\tpriv/guid00/messagenew"
assert dictproxy.handle_dovecot_request(msg) is None
assert dictproxy.handle_dovecot_request(msg, transactions) is None
queue_item = notifier.retry_queues[0].get()[1]
assert queue_item.token == token
assert dictproxy.handle_dovecot_request(f"C{tx2}") == "O\n"
assert dictproxy.handle_dovecot_request(f"C{tx2}", transactions) == "O\n"
assert not transactions
assert queue_item.path.exists()

Expand Down

0 comments on commit d3f3e02

Please sign in to comment.