Skip to content
This repository has been archived by the owner on Dec 11, 2022. It is now read-only.

Commit

Permalink
Fixed broken subscriptions.
Browse files Browse the repository at this point in the history
  • Loading branch information
hperrin committed May 2, 2019
1 parent 0bc2208 commit 2d2ceda
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions src/MessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function onClose(ConnectionInterface $conn) {
foreach ($this->querySubs as $curEtype => &$curSubscriptions) {
foreach ($curSubscriptions as $curQuery => &$curClients) {
if ($curClients->contains($conn)) {
$curClients->detach($conn);
unset($curClients[$conn]);

$count = count($curClients);

Expand Down Expand Up @@ -100,7 +100,7 @@ public function onClose(ConnectionInterface $conn) {

foreach ($this->uidSubs as $curUID => $curClients) {
if ($curClients->contains($conn)) {
$curClients->detach($conn);
unset($curClients[$conn]);

$count = count($curClients);

Expand All @@ -109,7 +109,6 @@ public function onClose(ConnectionInterface $conn) {
} else {
if (Server::$config['broadcast_counts']) {
// Notify clients of the subscription count.
$count = count($curClients);
foreach ($curClients as $key) {
$curData = $curClients[$key];
if ($curData['count']) {
Expand All @@ -125,6 +124,11 @@ public function onClose(ConnectionInterface $conn) {
}
}

if ($this->sessions->contains($conn)) {
unset($this->sessions[$conn]);
$mess++;
}

if ($mess) {
$this->logger->notice(
"Cleaned up client's mess. ".
Expand All @@ -147,9 +151,9 @@ private function handleAuthentication(ConnectionInterface $from, $data) {
// Save the user's auth token in session storage.
$token = $data['token'];
if (isset($token)) {
$this->sessions->attach($from, $token);
} else {
$this->sessions->detach($from);
$this->sessions[$from] = $token;
} elseif ($this->sessions->contains($from)) {
unset($this->sessions[$from]);
}
}

Expand Down Expand Up @@ -179,12 +183,12 @@ private function handleSubscription(ConnectionInterface $from, $data) {
*/
private function handleSubscriptionQuery(ConnectionInterface $from, $data) {
$args = json_decode($data['query'], true);
if (!class_exists($args[0]['class'])) {
return;
}
$count = count($args);
if ($count > 1) {
for ($i = 1; $i < $count; $i++) {
if (!class_exists($args[0]['class'])) {
return;
}
$newArg =
\Nymph\REST::translateSelector($args[0]['class'], $args[$i]);
if ($newArg === false) {
Expand All @@ -208,10 +212,9 @@ private function handleSubscriptionQuery(ConnectionInterface $from, $data) {
$guidArgs = $args;
$guidArgs[0]['return'] = 'guid';
$guidArgs[0]['source'] = 'client';
$token = null;
if ($this->sessions->contains($from)) {
$token = $this->sessions[$from];
} else {
$token = null;
}
if (class_exists('\Tilmeld\Tilmeld') && isset($token)) {
$user = \Tilmeld\Tilmeld::extractToken($token);
Expand All @@ -220,14 +223,14 @@ private function handleSubscriptionQuery(ConnectionInterface $from, $data) {
\Tilmeld\Tilmeld::fillSession($user);
}
}
$this->querySubs[$etype][$serialArgs]->attach($from, [
$this->querySubs[$etype][$serialArgs][$from] = [
'current' => call_user_func_array(
"\Nymph\Nymph::getEntities",
$guidArgs
),
'query' => $data['query'],
'count' => !!$data['count']
]);
];
if (class_exists('\Tilmeld\Tilmeld') && isset($token)) {
// Clear the user that was temporarily logged in.
\Tilmeld\Tilmeld::clearSession();
Expand Down Expand Up @@ -263,7 +266,7 @@ private function handleSubscriptionQuery(ConnectionInterface $from, $data) {
if (!$this->querySubs[$etype][$serialArgs]->contains($from)) {
return;
}
$this->querySubs[$etype][$serialArgs]->detach($from);
unset($this->querySubs[$etype][$serialArgs][$from]);
$this->logger->notice(
"Client unsubscribed from a query! ".
"($serialArgs, {$from->resourceId})"
Expand Down Expand Up @@ -307,9 +310,9 @@ private function handleSubscriptionUid(ConnectionInterface $from, $data) {
if (!key_exists($data['uid'], $this->uidSubs)) {
$this->uidSubs[$data['uid']] = new \SplObjectStorage();
}
$this->uidSubs[$data['uid']]->attach($from, [
$this->uidSubs[$data['uid']][$from] = [
'count' => !!$data['count']
]);
];
$this->logger->notice(
"Client subscribed to a UID! ".
"({$data['uid']}, {$from->resourceId})"
Expand Down Expand Up @@ -338,7 +341,7 @@ private function handleSubscriptionUid(ConnectionInterface $from, $data) {
if (!$this->uidSubs[$data['uid']]->contains($from)) {
return;
}
$this->uidSubs[$data['uid']]->detach($from);
unset($this->uidSubs[$data['uid']][$from]);
$this->logger->notice(
"Client unsubscribed from a UID! ".
"({$data['uid']}, {$from->resourceId})"
Expand Down Expand Up @@ -442,10 +445,9 @@ private function handlePublishEntity(ConnectionInterface $from, $data) {
$this->prepareSelectors($queryArgs);
$queryArgs[0]['source'] = 'client';
$queryArgs[] = ['&', 'guid' => $data['guid']];
$token = null;
if ($this->sessions->contains($curClient)) {
$token = $this->sessions[$curClient];
} else {
$token = null;
}
if (class_exists('\Tilmeld\Tilmeld') && isset($token)) {
$user = \Tilmeld\Tilmeld::extractToken($token);
Expand Down Expand Up @@ -478,7 +480,7 @@ private function handlePublishEntity(ConnectionInterface $from, $data) {
$curData['current'],
[$data['guid']]
);
$curClients->attach($curClient, $curData);
$curClients[$curClient] = $curData;

// Notify subscriber.
$this->logger->notice(
Expand Down Expand Up @@ -533,10 +535,9 @@ private function handlePublishEntity(ConnectionInterface $from, $data) {
$this->prepareSelectors($queryArgs);
$queryArgs[0]['source'] = 'client';
$queryArgs[] = ['&', 'guid' => $data['guid']];
$token = null;
if ($this->sessions->contains($curClient)) {
$token = $this->sessions[$curClient];
} else {
$token = null;
}
if (class_exists('\Tilmeld\Tilmeld') && isset($token)) {
$user = \Tilmeld\Tilmeld::extractToken($token);
Expand All @@ -554,7 +555,7 @@ private function handlePublishEntity(ConnectionInterface $from, $data) {
// Update the currents list.
$curData = $curClients[$curClient];
$curData['current'][] = $data['guid'];
$curClients->attach($curClient, $curData);
$curClients[$curClient] = $curData;

// Notify client.
$this->logger->notice(
Expand Down

0 comments on commit 2d2ceda

Please sign in to comment.