From 2d2ceda417c0b097b8788eb4d982b439c61eeac9 Mon Sep 17 00:00:00 2001 From: Hunter Perrin Date: Thu, 2 May 2019 15:42:31 -0700 Subject: [PATCH] Fixed broken subscriptions. --- src/MessageHandler.php | 47 +++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/src/MessageHandler.php b/src/MessageHandler.php index 24800fb..09ffdfc 100644 --- a/src/MessageHandler.php +++ b/src/MessageHandler.php @@ -68,7 +68,7 @@ public function onClose(ConnectionInterface $conn) { foreach ($this->querySubs as $curEtype => &$curSubscriptions) { foreach ($curSubscriptions as $curQuery => &$curClients) { if ($curClients->contains($conn)) { - $curClients->detach($conn); + unset($curClients[$conn]); $count = count($curClients); @@ -100,7 +100,7 @@ public function onClose(ConnectionInterface $conn) { foreach ($this->uidSubs as $curUID => $curClients) { if ($curClients->contains($conn)) { - $curClients->detach($conn); + unset($curClients[$conn]); $count = count($curClients); @@ -109,7 +109,6 @@ public function onClose(ConnectionInterface $conn) { } else { if (Server::$config['broadcast_counts']) { // Notify clients of the subscription count. - $count = count($curClients); foreach ($curClients as $key) { $curData = $curClients[$key]; if ($curData['count']) { @@ -125,6 +124,11 @@ public function onClose(ConnectionInterface $conn) { } } + if ($this->sessions->contains($conn)) { + unset($this->sessions[$conn]); + $mess++; + } + if ($mess) { $this->logger->notice( "Cleaned up client's mess. ". @@ -147,9 +151,9 @@ private function handleAuthentication(ConnectionInterface $from, $data) { // Save the user's auth token in session storage. $token = $data['token']; if (isset($token)) { - $this->sessions->attach($from, $token); - } else { - $this->sessions->detach($from); + $this->sessions[$from] = $token; + } elseif ($this->sessions->contains($from)) { + unset($this->sessions[$from]); } } @@ -179,12 +183,12 @@ private function handleSubscription(ConnectionInterface $from, $data) { */ private function handleSubscriptionQuery(ConnectionInterface $from, $data) { $args = json_decode($data['query'], true); + if (!class_exists($args[0]['class'])) { + return; + } $count = count($args); if ($count > 1) { for ($i = 1; $i < $count; $i++) { - if (!class_exists($args[0]['class'])) { - return; - } $newArg = \Nymph\REST::translateSelector($args[0]['class'], $args[$i]); if ($newArg === false) { @@ -208,10 +212,9 @@ private function handleSubscriptionQuery(ConnectionInterface $from, $data) { $guidArgs = $args; $guidArgs[0]['return'] = 'guid'; $guidArgs[0]['source'] = 'client'; + $token = null; if ($this->sessions->contains($from)) { $token = $this->sessions[$from]; - } else { - $token = null; } if (class_exists('\Tilmeld\Tilmeld') && isset($token)) { $user = \Tilmeld\Tilmeld::extractToken($token); @@ -220,14 +223,14 @@ private function handleSubscriptionQuery(ConnectionInterface $from, $data) { \Tilmeld\Tilmeld::fillSession($user); } } - $this->querySubs[$etype][$serialArgs]->attach($from, [ + $this->querySubs[$etype][$serialArgs][$from] = [ 'current' => call_user_func_array( "\Nymph\Nymph::getEntities", $guidArgs ), 'query' => $data['query'], 'count' => !!$data['count'] - ]); + ]; if (class_exists('\Tilmeld\Tilmeld') && isset($token)) { // Clear the user that was temporarily logged in. \Tilmeld\Tilmeld::clearSession(); @@ -263,7 +266,7 @@ private function handleSubscriptionQuery(ConnectionInterface $from, $data) { if (!$this->querySubs[$etype][$serialArgs]->contains($from)) { return; } - $this->querySubs[$etype][$serialArgs]->detach($from); + unset($this->querySubs[$etype][$serialArgs][$from]); $this->logger->notice( "Client unsubscribed from a query! ". "($serialArgs, {$from->resourceId})" @@ -307,9 +310,9 @@ private function handleSubscriptionUid(ConnectionInterface $from, $data) { if (!key_exists($data['uid'], $this->uidSubs)) { $this->uidSubs[$data['uid']] = new \SplObjectStorage(); } - $this->uidSubs[$data['uid']]->attach($from, [ + $this->uidSubs[$data['uid']][$from] = [ 'count' => !!$data['count'] - ]); + ]; $this->logger->notice( "Client subscribed to a UID! ". "({$data['uid']}, {$from->resourceId})" @@ -338,7 +341,7 @@ private function handleSubscriptionUid(ConnectionInterface $from, $data) { if (!$this->uidSubs[$data['uid']]->contains($from)) { return; } - $this->uidSubs[$data['uid']]->detach($from); + unset($this->uidSubs[$data['uid']][$from]); $this->logger->notice( "Client unsubscribed from a UID! ". "({$data['uid']}, {$from->resourceId})" @@ -442,10 +445,9 @@ private function handlePublishEntity(ConnectionInterface $from, $data) { $this->prepareSelectors($queryArgs); $queryArgs[0]['source'] = 'client'; $queryArgs[] = ['&', 'guid' => $data['guid']]; + $token = null; if ($this->sessions->contains($curClient)) { $token = $this->sessions[$curClient]; - } else { - $token = null; } if (class_exists('\Tilmeld\Tilmeld') && isset($token)) { $user = \Tilmeld\Tilmeld::extractToken($token); @@ -478,7 +480,7 @@ private function handlePublishEntity(ConnectionInterface $from, $data) { $curData['current'], [$data['guid']] ); - $curClients->attach($curClient, $curData); + $curClients[$curClient] = $curData; // Notify subscriber. $this->logger->notice( @@ -533,10 +535,9 @@ private function handlePublishEntity(ConnectionInterface $from, $data) { $this->prepareSelectors($queryArgs); $queryArgs[0]['source'] = 'client'; $queryArgs[] = ['&', 'guid' => $data['guid']]; + $token = null; if ($this->sessions->contains($curClient)) { $token = $this->sessions[$curClient]; - } else { - $token = null; } if (class_exists('\Tilmeld\Tilmeld') && isset($token)) { $user = \Tilmeld\Tilmeld::extractToken($token); @@ -554,7 +555,7 @@ private function handlePublishEntity(ConnectionInterface $from, $data) { // Update the currents list. $curData = $curClients[$curClient]; $curData['current'][] = $data['guid']; - $curClients->attach($curClient, $curData); + $curClients[$curClient] = $curData; // Notify client. $this->logger->notice(