From 01f9386ea9ea45e1e3eaf69f55c378f433f4b185 Mon Sep 17 00:00:00 2001 From: George Janak Date: Wed, 21 Dec 2016 14:55:18 +0000 Subject: [PATCH 1/5] moved the oauth keys into its own file so we define it only once --- .gitignore | 1 + example/filter-oauth.php | 14 +++----------- example/filter-reconfigure.php | 12 ++---------- example/filter-track-geo.php | 12 ++---------- example/filter-track.php | 14 +++----------- example/ghetto-queue-collect.php | 10 +--------- example/sample-oauth-es.php | 17 +++++------------ example/sample.php | 10 +--------- example/sitestream.php | 17 +++-------------- example/twitter-auth-config.php | 13 +++++++++++++ example/userstream-alternative.php | 17 +++-------------- example/userstream-simple.php | 17 +++-------------- 12 files changed, 40 insertions(+), 114 deletions(-) create mode 100644 example/twitter-auth-config.php diff --git a/.gitignore b/.gitignore index aefac54..2121da7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ *.un~ .cache .cache/* +.idea diff --git a/example/filter-oauth.php b/example/filter-oauth.php index 9adac3d..f6aa70a 100644 --- a/example/filter-oauth.php +++ b/example/filter-oauth.php @@ -1,9 +1,10 @@ setTrack(array('morning', 'goodnight', 'hello', 'the')); diff --git a/example/filter-reconfigure.php b/example/filter-reconfigure.php index 284c396..8a041e0 100644 --- a/example/filter-reconfigure.php +++ b/example/filter-reconfigure.php @@ -1,6 +1,7 @@ consume(); \ No newline at end of file +$sc->consume(); diff --git a/example/filter-track-geo.php b/example/filter-track-geo.php index a72b3bf..83d42e1 100644 --- a/example/filter-track-geo.php +++ b/example/filter-track-geo.php @@ -1,6 +1,7 @@ setLocations(array( array(-122.75, 36.8, -121.75, 37.8), // San Francisco array(-74, 40, -73, 41), // New York )); -$sc->consume(); \ No newline at end of file +$sc->consume(); diff --git a/example/filter-track.php b/example/filter-track.php index 080cdf0..91a9db2 100644 --- a/example/filter-track.php +++ b/example/filter-track.php @@ -1,6 +1,7 @@ setTrack(array('morning', 'goodnight', 'hello', 'the')); -$sc->consume(); \ No newline at end of file +$sc->setTrack(['berlin', 'attack']); +$sc->consume(); diff --git a/example/ghetto-queue-collect.php b/example/ghetto-queue-collect.php index da5b875..8e64a78 100644 --- a/example/ghetto-queue-collect.php +++ b/example/ghetto-queue-collect.php @@ -1,6 +1,7 @@ setTrack(array('morning', 'goodnight', 'hello', 'the', 'and')); diff --git a/example/sample-oauth-es.php b/example/sample-oauth-es.php index 89f1483..819b7db 100644 --- a/example/sample-oauth-es.php +++ b/example/sample-oauth-es.php @@ -1,8 +1,10 @@ setLang('es'); -$sc->consume(); \ No newline at end of file +$sc->consume(); diff --git a/example/sample.php b/example/sample.php index 1618bc5..4fe12de 100644 --- a/example/sample.php +++ b/example/sample.php @@ -1,6 +1,7 @@ consume(); diff --git a/example/sitestream.php b/example/sitestream.php index 8fa3574..a6df606 100644 --- a/example/sitestream.php +++ b/example/sitestream.php @@ -1,12 +1,13 @@ setFollow(array( diff --git a/example/twitter-auth-config.php b/example/twitter-auth-config.php new file mode 100644 index 0000000..f52f41f --- /dev/null +++ b/example/twitter-auth-config.php @@ -0,0 +1,13 @@ +consume(); diff --git a/example/userstream-simple.php b/example/userstream-simple.php index 36ac2ad..ec8e5eb 100644 --- a/example/userstream-simple.php +++ b/example/userstream-simple.php @@ -1,10 +1,11 @@ consume(); From 498390a43563cf89e87b126c0915b96fb82f1d44 Mon Sep 17 00:00:00 2001 From: George Janak Date: Wed, 21 Dec 2016 15:08:11 +0000 Subject: [PATCH 2/5] back to default values and php5.2 --- example/filter-track.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/filter-track.php b/example/filter-track.php index 91a9db2..f6aa70a 100644 --- a/example/filter-track.php +++ b/example/filter-track.php @@ -29,5 +29,5 @@ public function enqueueStatus($status) // Start streaming $sc = new FilterTrackConsumer(OAUTH_TOKEN, OAUTH_SECRET, Phirehose::METHOD_FILTER); -$sc->setTrack(['berlin', 'attack']); +$sc->setTrack(array('morning', 'goodnight', 'hello', 'the')); $sc->consume(); From 10bc782e4a9d72c85acca5ba0057a0c10f0bcbb6 Mon Sep 17 00:00:00 2001 From: George Janak Date: Wed, 21 Dec 2016 15:37:44 +0000 Subject: [PATCH 3/5] - psr1/psr2 refactoring - phpdoc fixes - fixed error log and small typos - bumped up the version to php5.4 --- composer.json | 4 +- example/filter-oauth.php | 36 +- example/filter-reconfigure.php | 56 +- example/filter-track-geo.php | 44 +- example/filter-track.php | 36 +- example/ghetto-queue-collect.php | 228 ++-- example/ghetto-queue-consume.php | 226 ++-- example/sample-en.php | 37 +- example/sample-oauth-es.php | 34 +- example/sample.php | 34 +- example/sitestream.php | 69 +- example/userstream-alternative.php | 59 +- example/userstream-simple.php | 56 +- lib/OauthPhirehose.php | 277 ++--- lib/Phirehose.php | 1749 +++++++++++++++------------- lib/UserstreamPhirehose.php | 41 +- 16 files changed, 1527 insertions(+), 1459 deletions(-) diff --git a/composer.json b/composer.json index b0f51c5..8be99ff 100644 --- a/composer.json +++ b/composer.json @@ -12,9 +12,9 @@ } ], "require": { - "php": ">=5.2.0" + "php": ">=5.4.0" }, "autoload": { "classmap": ["lib"] } -} \ No newline at end of file +} diff --git a/example/filter-oauth.php b/example/filter-oauth.php index f6aa70a..30a6a69 100644 --- a/example/filter-oauth.php +++ b/example/filter-oauth.php @@ -1,6 +1,6 @@ setTrack(array('morning', 'goodnight', 'hello', 'the')); +$sc->setTrack(['morning', 'goodnight', 'hello', 'the']); $sc->consume(); diff --git a/example/filter-reconfigure.php b/example/filter-reconfigure.php index 8a041e0..26c4d07 100644 --- a/example/filter-reconfigure.php +++ b/example/filter-reconfigure.php @@ -1,6 +1,6 @@ myTrackWords[rand(0, 3)]; - $randWord2 = $this->myTrackWords[rand(0, 3)]; - $this->setTrack(array($randWord1, $randWord2)); - } + /** + * Enqueue each status + * + * @param string $status + */ + public function enqueueStatus($status) + { + // We won't actually do anything with statuses in this example, see updateFilterPredicates() for important stuff + } + /** + * In this example, we just set the track words to a random 2 words. In a real example, you'd want to check some sort + * of shared medium (ie: memcache, DB, filesystem) to determine if the filter has changed and set appropriately. The + * speed of this method will affect how quickly you can update filters. + */ + public function checkFilterPredicates() + { + // This is all that's required, Phirehose will detect the change and reconnect as soon as possible + $randWord1 = $this->myTrackWords[rand(0, 3)]; + $randWord2 = $this->myTrackWords[rand(0, 3)]; + $this->setTrack([$randWord1, $randWord2]); + } } // Start streaming diff --git a/example/filter-track-geo.php b/example/filter-track-geo.php index 83d42e1..2b720d4 100644 --- a/example/filter-track-geo.php +++ b/example/filter-track-geo.php @@ -1,6 +1,6 @@ setLocations(array( - array(-122.75, 36.8, -121.75, 37.8), // San Francisco - array(-74, 40, -73, 41), // New York - )); +$sc->setLocations( + [ + [-122.75, 36.8, -121.75, 37.8], // San Francisco + [-74, 40, -73, 41], // New York + ] +); $sc->consume(); diff --git a/example/filter-track.php b/example/filter-track.php index f6aa70a..30a6a69 100644 --- a/example/filter-track.php +++ b/example/filter-track.php @@ -1,6 +1,6 @@ setTrack(array('morning', 'goodnight', 'hello', 'the')); +$sc->setTrack(['morning', 'goodnight', 'hello', 'the']); $sc->consume(); diff --git a/example/ghetto-queue-collect.php b/example/ghetto-queue-collect.php index 8e64a78..26ea3a9 100644 --- a/example/ghetto-queue-collect.php +++ b/example/ghetto-queue-collect.php @@ -1,149 +1,147 @@ = 5 seconds'); - } - - // Set subclass parameters - $this->queueDir = $queueDir; - $this->rotateInterval = $rotateInterval; - - // Call parent constructor - return parent::__construct($username, $password, Phirehose::METHOD_FILTER); - } - - /** - * Enqueue each status - * - * @param string $status - */ - public function enqueueStatus($status) - { - - // Write the status to the stream (must be via getStream()) - fputs($this->getStream(), $status .PHP_EOL); - - /* Are we due for a file rotate? Note this won't be called if there are no statuses coming through - - * This is (probably) a good thing as it means the collector won't needlessly rotate empty files. That said, if - * you have a very sparse/quiet stream that you need highly regular analytics on, this may not work for you. + /** + * Member attributes specific to this subclass */ - $now = time(); - if (($now - $this->lastRotated) > $this->rotateInterval) { - // Mark last rotation time as now - $this->lastRotated = $now; + protected $queueDir; + protected $rotateInterval; + protected $streamFile; + protected $statusStream; + protected $lastRotated; + + /** + * Overidden constructor to take class-specific parameters + * + * @param string $username + * @param string $password + * @param string $queueDir + * @param integer $rotateInterval + * + * @throws Exception + */ + public function __construct($username, $password, $queueDir = '/tmp', $rotateInterval = 10) + { - // Rotate it - $this->rotateStreamFile(); - } + // Sanity check + if ($rotateInterval < 5) { + throw new Exception('Rotate interval set too low - Must be >= 5 seconds'); + } - } - - /** - * Returns a stream resource for the current file being written/enqueued to - * - * @return resource - */ - private function getStream() - { - // If we have a valid stream, return it - if (is_resource($this->statusStream)) { - return $this->statusStream; - } + // Set subclass parameters + $this->queueDir = $queueDir; + $this->rotateInterval = $rotateInterval; - // If it's not a valid resource, we need to create one - if (!is_dir($this->queueDir) || !is_writable($this->queueDir)) { - throw new Exception('Unable to write to queueDir: ' . $this->queueDir); + // Call parent constructor + return parent::__construct($username, $password, Phirehose::METHOD_FILTER); } - // Construct stream file name, log and open - $this->streamFile = $this->queueDir . '/' . self::QUEUE_FILE_ACTIVE; - $this->log('Opening new active status stream: ' . $this->streamFile); - $this->statusStream = fopen($this->streamFile, 'a'); // Append if present (crash recovery) - - // Ok? - if (!is_resource($this->statusStream)) { - throw new Exception('Unable to open stream file for writing: ' . $this->streamFile); + /** + * Enqueue each status + * + * @param string $status + */ + public function enqueueStatus($status) + { + + // Write the status to the stream (must be via getStream()) + fputs($this->getStream(), $status . PHP_EOL); + + /* Are we due for a file rotate? Note this won't be called if there are no statuses coming through - + * This is (probably) a good thing as it means the collector won't needlessly rotate empty files. That said, if + * you have a very sparse/quiet stream that you need highly regular analytics on, this may not work for you. + */ + $now = time(); + if (($now - $this->lastRotated) > $this->rotateInterval) { + // Mark last rotation time as now + $this->lastRotated = $now; + + // Rotate it + $this->rotateStreamFile(); + } } - // If we don't have a last rotated time, it's effectively now - if ($this->lastRotated == NULL) { - $this->lastRotated = time(); + /** + * Returns a stream resource for the current file being written/enqueued to + * + * @return resource + * @throws Exception + */ + private function getStream() + { + // If we have a valid stream, return it + if (is_resource($this->statusStream)) { + return $this->statusStream; + } + + // If it's not a valid resource, we need to create one + if (!is_dir($this->queueDir) || !is_writable($this->queueDir)) { + throw new Exception('Unable to write to queueDir: ' . $this->queueDir); + } + + // Construct stream file name, log and open + $this->streamFile = $this->queueDir . '/' . self::QUEUE_FILE_ACTIVE; + $this->log('Opening new active status stream: ' . $this->streamFile); + $this->statusStream = fopen($this->streamFile, 'a'); // Append if present (crash recovery) + + // Ok? + if (!is_resource($this->statusStream)) { + throw new Exception('Unable to open stream file for writing: ' . $this->streamFile); + } + + // If we don't have a last rotated time, it's effectively now + if ($this->lastRotated == null) { + $this->lastRotated = time(); + } + + // Looking good, return the resource + return $this->statusStream; } - // Looking good, return the resource - return $this->statusStream; - - } + /** + * Rotates the stream file if due + * @throws Exception + */ + private function rotateStreamFile() + { + // Close the stream + fclose($this->statusStream); - /** - * Rotates the stream file if due - */ - private function rotateStreamFile() - { - // Close the stream - fclose($this->statusStream); + // Create queue file with timestamp so they're both unique and naturally ordered + $queueFile = $this->queueDir . '/' . self::QUEUE_FILE_PREFIX . '.' . date('Ymd-His') . '.queue'; - // Create queue file with timestamp so they're both unique and naturally ordered - $queueFile = $this->queueDir . '/' . self::QUEUE_FILE_PREFIX . '.' . date('Ymd-His') . '.queue'; + // Do the rotate + rename($this->streamFile, $queueFile); - // Do the rotate - rename($this->streamFile, $queueFile); + // Did it work? + if (!file_exists($queueFile)) { + throw new Exception('Failed to rotate queue file to: ' . $queueFile); + } - // Did it work? - if (!file_exists($queueFile)) { - throw new Exception('Failed to rotate queue file to: ' . $queueFile); + // At this point, all looking good - the next call to getStream() will create a new active file + $this->log('Successfully rotated active stream to queue file: ' . $queueFile); } - - // At this point, all looking good - the next call to getStream() will create a new active file - $this->log('Successfully rotated active stream to queue file: ' . $queueFile); - } - } // End of class // Start streaming/collecting $sc = new GhettoQueueCollector(OAUTH_TOKEN, OAUTH_SECRET); -$sc->setTrack(array('morning', 'goodnight', 'hello', 'the', 'and')); +$sc->setTrack(['morning', 'goodnight', 'hello', 'the', 'and']); $sc->consume(); diff --git a/example/ghetto-queue-consume.php b/example/ghetto-queue-consume.php index e5145fd..2b793d2 100644 --- a/example/ghetto-queue-consume.php +++ b/example/ghetto-queue-consume.php @@ -1,131 +1,125 @@ queueDir = $queueDir; - $this->filePattern = $filePattern; - $this->checkInterval = $checkInterval; - - // Sanity checks - if (!is_dir($queueDir)) { - throw new ErrorException('Invalid directory: ' . $queueDir); + /** + * Member attribs + */ + protected $queueDir; + protected $filePattern; + protected $checkInterval; + + /** + * Construct the consumer and start processing + */ + public function __construct($queueDir = '/tmp', $filePattern = 'phirehose-ghettoqueue*.queue', $checkInterval = 10) + { + $this->queueDir = $queueDir; + $this->filePattern = $filePattern; + $this->checkInterval = $checkInterval; + + // Sanity checks + if (!is_dir($queueDir)) { + throw new ErrorException('Invalid directory: ' . $queueDir); + } + } + + /** + * Method that actually starts the processing task (never returns). + */ + public function process() + { + // Loop infinitely + while (true) { + + // Get a list of queue files + $queueFiles = glob($this->queueDir . '/' . $this->filePattern); + $lastCheck = time(); + + $this->log('Found ' . count($queueFiles) . ' queue files to process...'); + + // Iterate over each file (if any) + foreach ($queueFiles as $queueFile) { + $this->processQueueFile($queueFile); + } + + // Wait until ready for next check + $this->log('Sleeping...'); + while (time() - $lastCheck < $this->checkInterval) { + sleep(1); + } + } // Infinite loop + } // End process() + + /** + * Basic log function. + * + * @see error_log() + * + * @param string $message + */ + protected function log($message) + { + @error_log($message, 0); } - - } - - /** - * Method that actually starts the processing task (never returns). - */ - public function process() { - - // Init some things - $lastCheck = 0; - - // Loop infinitely - while (TRUE) { - - // Get a list of queue files - $queueFiles = glob($this->queueDir . '/' . $this->filePattern); - $lastCheck = time(); - - $this->log('Found ' . count($queueFiles) . ' queue files to process...'); - - // Iterate over each file (if any) - foreach ($queueFiles as $queueFile) { - $this->processQueueFile($queueFile); - } - - // Wait until ready for next check - $this->log('Sleeping...'); - while (time() - $lastCheck < $this->checkInterval) { - sleep(1); - } - - } // Infinite loop - - } // End process() - - /** - * Processes a queue file and does something with it (example only) - * @param string $queueFile The queue file - */ - protected function processQueueFile($queueFile) { - $this->log('Processing file: ' . $queueFile); - - // Open file - $fp = fopen($queueFile, 'r'); - - // Check if something has gone wrong, or perhaps the file is just locked by another process - if (!is_resource($fp)) { - $this->log('WARN: Unable to open file or file already open: ' . $queueFile . ' - Skipping.'); - return FALSE; + + /** + * Processes a queue file and does something with it (example only) + * + * @param string $queueFile The queue file + * + * @return bool + */ + protected function processQueueFile($queueFile) + { + $this->log('Processing file: ' . $queueFile); + + // Open file + $fp = fopen($queueFile, 'r'); + + // Check if something has gone wrong, or perhaps the file is just locked by another process + if (!is_resource($fp)) { + $this->log('WARN: Unable to open file or file already open: ' . $queueFile . ' - Skipping.'); + + return false; + } + + // Lock file + flock($fp, LOCK_EX); + + // Loop over each line (1 line per status) + $statusCounter = 0; + while ($rawStatus = fgets($fp, 8192)) { + $statusCounter++; + + /** **************** NOTE ******************** + * This is the part where you would normally do your processing. If you're extracting/trending information + * about the tweets it should happen here, where it doesn't matter so much if things are slow (you will + * catch up on the next loop). + */ + $data = json_decode($rawStatus, true); + if (is_array($data) && isset($data['user']['screen_name'])) { + $this->log('Decoded tweet: ' . $data['user']['screen_name'] . ': ' . urldecode($data['text'])); + } + } // End while + + // Release lock and close + flock($fp, LOCK_UN); + fclose($fp); + + // All done with this file + $this->log('Successfully processed ' . $statusCounter . ' tweets from ' . $queueFile . ' - deleting.'); + unlink($queueFile); } - - // Lock file - flock($fp, LOCK_EX); - - // Loop over each line (1 line per status) - $statusCounter = 0; - while ($rawStatus = fgets($fp, 8192)) { - $statusCounter ++; - - /** **************** NOTE ******************** - * This is the part where you would normally do your processing. If you're extracting/trending information - * about the tweets it should happen here, where it doesn't matter so much if things are slow (you will - * catch up on the next loop). - */ - $data = json_decode($rawStatus, true); - if (is_array($data) && isset($data['user']['screen_name'])) { - $this->log('Decoded tweet: ' . $data['user']['screen_name'] . ': ' . urldecode($data['text'])); - } - - } // End while - - // Release lock and close - flock($fp, LOCK_UN); - fclose($fp); - - // All done with this file - $this->log('Successfully processed ' . $statusCounter . ' tweets from ' . $queueFile . ' - deleting.'); - unlink($queueFile); - - } - - /** - * Basic log function. - * - * @see error_log() - * @param string $messages - */ - protected function log($message) - { - @error_log($message, 0); - } - } // Construct consumer and start processing $gqc = new GhettoQueueConsumer(); -$gqc->process(); \ No newline at end of file +$gqc->process(); diff --git a/example/sample-en.php b/example/sample-en.php index 6c86ea7..5e6d644 100644 --- a/example/sample-en.php +++ b/example/sample-en.php @@ -1,31 +1,32 @@ setLang('es'); -$sc->consume(); \ No newline at end of file +$sc->consume(); diff --git a/example/sample-oauth-es.php b/example/sample-oauth-es.php index 819b7db..bf6a46f 100644 --- a/example/sample-oauth-es.php +++ b/example/sample-oauth-es.php @@ -1,6 +1,6 @@ array(123,2334,9876)); - * - * Each tweet of your friends looks like: - * [id] => 1011234124121 - * [text] => (the tweet) - * [user] => array( the user who tweeted ) - * [entities] => array ( urls, etc. ) - * - * Every 30 seconds we get the keep-alive message, where $status is empty. - * - * When the user adds a friend we get one of these: - * [event] => follow - * [source] => Array( my user ) - * [created_at] => Tue May 24 13:02:25 +0000 2011 - * [target] => Array (the user now being followed) - * - * @param string $status - */ - public function enqueueStatus($status) - { - /* - * In this simple example, we will just display to STDOUT rather than enqueue. - * NOTE: You should NOT be processing tweets at this point in a real application, instead they - * should be being enqueued and processed asyncronously from the collection process. + /** + * First response looks like this: + * $data=array('friends'=>array(123,2334,9876)); + * Each tweet of your friends looks like: + * [id] => 1011234124121 + * [text] => (the tweet) + * [user] => array( the user who tweeted ) + * [entities] => array ( urls, etc. ) + * Every 30 seconds we get the keep-alive message, where $status is empty. + * When the user adds a friend we get one of these: + * [event] => follow + * [source] => Array( my user ) + * [created_at] => Tue May 24 13:02:25 +0000 2011 + * [target] => Array (the user now being followed) + * + * @param string $status */ - $data = json_decode($status, true); - echo date("Y-m-d H:i:s (").strlen($status)."):".print_r($data,true)."\n"; - } - + public function enqueueStatus($status) + { + /* + * In this simple example, we will just display to STDOUT rather than enqueue. + * NOTE: You should NOT be processing tweets at this point in a real application, instead they + * should be being enqueued and processed asyncronously from the collection process. + */ + $data = json_decode($status, true); + echo date("Y-m-d H:i:s (") . strlen($status) . "):" . print_r($data, true) . "\n"; + } } // Start streaming $sc = new MyUserConsumer(OAUTH_TOKEN, OAUTH_SECRET, Phirehose::METHOD_SITE); -$sc->setFollow(array( - 1234, 5678, 901234573 //The user IDs of the twitter accounts to follow. All of +$sc->setFollow( + [ + 1234, + 5678, + 901234573 //The user IDs of the twitter accounts to follow. All of //these users must have given your app permission. - )); + ] +); $sc->consume(); diff --git a/example/userstream-alternative.php b/example/userstream-alternative.php index 6bb581e..1217bfe 100644 --- a/example/userstream-alternative.php +++ b/example/userstream-alternative.php @@ -1,47 +1,42 @@ array(123,2334,9876)); - * - * Each tweet of your friends looks like: - * [id] => 1011234124121 - * [text] => (the tweet) - * [user] => array( the user who tweeted ) - * [entities] => array ( urls, etc. ) - * - * Every 30 seconds we get the keep-alive message, where $status is empty. - * - * When the user adds a friend we get one of these: - * [event] => follow - * [source] => Array( my user ) - * [created_at] => Tue May 24 13:02:25 +0000 2011 - * [target] => Array (the user now being followed) - * - * @param string $status - */ - public function enqueueStatus($status) - { - /* - * In this simple example, we will just display to STDOUT rather than enqueue. - * NOTE: You should NOT be processing tweets at this point in a real application, instead they - * should be being enqueued and processed asyncronously from the collection process. + /** + * First response looks like this: + * $data=array('friends'=>array(123,2334,9876)); + * Each tweet of your friends looks like: + * [id] => 1011234124121 + * [text] => (the tweet) + * [user] => array( the user who tweeted ) + * [entities] => array ( urls, etc. ) + * Every 30 seconds we get the keep-alive message, where $status is empty. + * When the user adds a friend we get one of these: + * [event] => follow + * [source] => Array( my user ) + * [created_at] => Tue May 24 13:02:25 +0000 2011 + * [target] => Array (the user now being followed) + * + * @param string $status */ - $data = json_decode($status, true); - echo date("Y-m-d H:i:s (").strlen($status)."):".print_r($data,true)."\n"; - } - + public function enqueueStatus($status) + { + /* + * In this simple example, we will just display to STDOUT rather than enqueue. + * NOTE: You should NOT be processing tweets at this point in a real application, instead they + * should be being enqueued and processed asyncronously from the collection process. + */ + $data = json_decode($status, true); + echo date("Y-m-d H:i:s (") . strlen($status) . "):" . print_r($data, true) . "\n"; + } } // Start streaming diff --git a/example/userstream-simple.php b/example/userstream-simple.php index ec8e5eb..4bdcd23 100644 --- a/example/userstream-simple.php +++ b/example/userstream-simple.php @@ -7,37 +7,33 @@ */ class MyUserConsumer extends UserstreamPhirehose { - /** - * First response looks like this: - * $data=array('friends'=>array(123,2334,9876)); - * - * Each tweet of your friends looks like: - * [id] => 1011234124121 - * [text] => (the tweet) - * [user] => array( the user who tweeted ) - * [entities] => array ( urls, etc. ) - * - * Every 30 seconds we get the keep-alive message, where $status is empty. - * - * When the user adds a friend we get one of these: - * [event] => follow - * [source] => Array( my user ) - * [created_at] => Tue May 24 13:02:25 +0000 2011 - * [target] => Array (the user now being followed) - * - * @param string $status - */ - public function enqueueStatus($status) - { - /* - * In this simple example, we will just display to STDOUT rather than enqueue. - * NOTE: You should NOT be processing tweets at this point in a real application, instead they - * should be being enqueued and processed asyncronously from the collection process. + /** + * First response looks like this: + * $data=array('friends'=>array(123,2334,9876)); + * Each tweet of your friends looks like: + * [id] => 1011234124121 + * [text] => (the tweet) + * [user] => array( the user who tweeted ) + * [entities] => array ( urls, etc. ) + * Every 30 seconds we get the keep-alive message, where $status is empty. + * When the user adds a friend we get one of these: + * [event] => follow + * [source] => Array( my user ) + * [created_at] => Tue May 24 13:02:25 +0000 2011 + * [target] => Array (the user now being followed) + * + * @param string $status */ - $data = json_decode($status, true); - echo date("Y-m-d H:i:s (").strlen($status)."):".print_r($data,true)."\n"; - } - + public function enqueueStatus($status) + { + /* + * In this simple example, we will just display to STDOUT rather than enqueue. + * NOTE: You should NOT be processing tweets at this point in a real application, instead they + * should be being enqueued and processed asyncronously from the collection process. + */ + $data = json_decode($status, true); + echo date("Y-m-d H:i:s (") . strlen($status) . "):" . print_r($data, true) . "\n"; + } } // Start streaming diff --git a/lib/OauthPhirehose.php b/lib/OauthPhirehose.php index 5b024df..44dfc00 100644 --- a/lib/OauthPhirehose.php +++ b/lib/OauthPhirehose.php @@ -2,148 +2,155 @@ require_once('Phirehose.php'); /** -* -* -* @internal At time of writing thise overrides getAuthorizationHeader() from the parent class; -* all other functions are helper functions for that. -*/ + * @internal At time of writing thise overrides getAuthorizationHeader() from the parent class; + * all other functions are helper functions for that. + */ abstract class OauthPhirehose extends Phirehose { + /** + * The Twitter consumer key. Get it from the application's page on Twitter. + * If not set then the global define TWITTER_CONSUMER_KEY is used instead. + */ + public $consumerKey = null; + /** + * The Twitter consumer secret. Get it from the application's page on Twitter. + * If not set then the global define TWITTER_CONSUMER_SECRET is used instead. + */ + public $consumerSecret = null; + protected $auth_method; - protected $auth_method; + /** Overrides base class function */ + protected function getAuthorizationHeader($url, $requestParams) + { + return $this->getOAuthHeader('POST', $url, $requestParams); + } - /** - * The Twitter consumer key. Get it from the application's page on Twitter. - * If not set then the global define TWITTER_CONSUMER_KEY is used instead. - */ - public $consumerKey=null; + protected function getOAuthHeader($method, $url, $params = []) + { + $params = $this->prepareParameters($method, $url, $params); + $oauthHeaders = $params['oauth']; - /** - * The Twitter consumer secret. Get it from the application's page on Twitter. - * If not set then the global define TWITTER_CONSUMER_SECRET is used instead. - */ - public $consumerSecret=null; + $oauth = 'OAuth realm="",'; + foreach ($oauthHeaders as $name => $value) { + $oauth .= "{$name}=\"{$value}\","; + } + $oauth = substr($oauth, 0, -1); + return $oauth; + } /** - */ - protected function prepareParameters($method = null, $url = null, - array $params) - { - if (empty($method) || empty($url)) - return false; - - $oauth['oauth_consumer_key'] = $this->consumerKey?$this->consumerKey:TWITTER_CONSUMER_KEY; - $oauth['oauth_nonce'] = md5(uniqid(rand(), true)); - $oauth['oauth_signature_method'] = 'HMAC-SHA1'; - $oauth['oauth_timestamp'] = time(); - $oauth['oauth_version'] = '1.0A'; - $oauth['oauth_token'] = $this->username; - if (isset($params['oauth_verifier'])) - { - $oauth['oauth_verifier'] = $params['oauth_verifier']; - unset($params['oauth_verifier']); - } - // encode all oauth values - foreach ($oauth as $k => $v) - $oauth[$k] = $this->encode_rfc3986($v); - - // encode all non '@' params - // keep sigParams for signature generation (exclude '@' params) - // rename '@key' to 'key' - $sigParams = array(); - $hasFile = false; - if (is_array($params)) - { - foreach ($params as $k => $v) - { - if (strncmp('@', $k, 1) !== 0) - { - $sigParams[$k] = $this->encode_rfc3986($v); - $params[$k] = $this->encode_rfc3986($v); - } - else - { - $params[substr($k, 1)] = $v; - unset($params[$k]); - $hasFile = true; - } - } - - if ($hasFile === true) - $sigParams = array(); - } - - $sigParams = array_merge($oauth, (array) $sigParams); - - // sorting - ksort($sigParams); - - // signing - $oauth['oauth_signature'] = $this->encode_rfc3986($this->generateSignature($method, $url, $sigParams)); - return array('request' => $params, 'oauth' => $oauth); - } - - protected function encode_rfc3986($string) - { - return str_replace('+', ' ', str_replace('%7E', '~', rawurlencode(($string)))); - } - - protected function generateSignature($method = null, $url = null, - $params = null) - { - if (empty($method) || empty($url)) - return false; - - // concatenating and encode - $concat = ''; - foreach ((array) $params as $key => $value) - $concat .= "{$key}={$value}&"; - $concat = substr($concat, 0, -1); - $concatenatedParams = $this->encode_rfc3986($concat); - - // normalize url - $urlParts = parse_url($url); - $scheme = strtolower($urlParts['scheme']); - $host = strtolower($urlParts['host']); - $port = isset($urlParts['port']) ? intval($urlParts['port']) : 0; - $retval = strtolower($scheme) . '://' . strtolower($host); - if (!empty($port) && (($scheme === 'http' && $port != 80) || ($scheme === 'https' && $port != 443))) - $retval .= ":{$port}"; - - $retval .= $urlParts['path']; - if (!empty($urlParts['query'])) - $retval .= "?{$urlParts['query']}"; - - $normalizedUrl = $this->encode_rfc3986($retval); - $method = $this->encode_rfc3986($method); // don't need this but why not? - - $signatureBaseString = "{$method}&{$normalizedUrl}&{$concatenatedParams}"; - - # sign the signature string - $key = $this->encode_rfc3986($this->consumerSecret?$this->consumerSecret:TWITTER_CONSUMER_SECRET) . '&' . $this->encode_rfc3986($this->password); - return base64_encode(hash_hmac('sha1', $signatureBaseString, $key, true)); - } - - protected function getOAuthHeader($method, $url, $params = array()) - { - $params = $this->prepareParameters($method, $url, $params); - $oauthHeaders = $params['oauth']; - - $urlParts = parse_url($url); - $oauth = 'OAuth realm="",'; - foreach ($oauthHeaders as $name => $value) - { - $oauth .= "{$name}=\"{$value}\","; - } - $oauth = substr($oauth, 0, -1); - - return $oauth; - } + * @param null $method + * @param null $url + * @param array $params + * + * @return array|bool + */ + protected function prepareParameters($method = null, $url = null, array $params) + { + if (empty($method) || empty($url)) { + return false; + } + + $oauth['oauth_consumer_key'] = $this->consumerKey ? $this->consumerKey : TWITTER_CONSUMER_KEY; + $oauth['oauth_nonce'] = md5(uniqid(rand(), true)); + $oauth['oauth_signature_method'] = 'HMAC-SHA1'; + $oauth['oauth_timestamp'] = time(); + $oauth['oauth_version'] = '1.0A'; + $oauth['oauth_token'] = $this->username; + if (isset($params['oauth_verifier'])) { + $oauth['oauth_verifier'] = $params['oauth_verifier']; + unset($params['oauth_verifier']); + } + // encode all oauth values + foreach ($oauth as $k => $v) { + $oauth[$k] = $this->encode_rfc3986($v); + } + + // encode all non '@' params + // keep sigParams for signature generation (exclude '@' params) + // rename '@key' to 'key' + $sigParams = []; + $hasFile = false; + if (is_array($params)) { + foreach ($params as $k => $v) { + if (strncmp('@', $k, 1) !== 0) { + $sigParams[$k] = $this->encode_rfc3986($v); + $params[$k] = $this->encode_rfc3986($v); + } else { + $params[substr($k, 1)] = $v; + unset($params[$k]); + $hasFile = true; + } + } + + if ($hasFile === true) { + $sigParams = []; + } + } + + $sigParams = array_merge($oauth, (array)$sigParams); + + // sorting + ksort($sigParams); + + // signing + $oauth['oauth_signature'] = $this->encode_rfc3986($this->generateSignature($method, $url, $sigParams)); + + return ['request' => $params, 'oauth' => $oauth]; + } + + protected function encode_rfc3986($string) + { + return str_replace('+', ' ', str_replace('%7E', '~', rawurlencode(($string)))); + } - /** Overrides base class function */ - protected function getAuthorizationHeader($url,$requestParams) - { - return $this->getOAuthHeader('POST', $url, $requestParams); - } + /** + * @param null $method + * @param null $url + * @param null $params + * + * @return bool|string + */ + protected function generateSignature($method = null, $url = null, $params = null) + { + if (empty($method) || empty($url)) { + return false; + } + + // concatenating and encode + $concat = ''; + foreach ((array)$params as $key => $value) { + $concat .= "{$key}={$value}&"; + } + $concat = substr($concat, 0, -1); + $concatenatedParams = $this->encode_rfc3986($concat); + + // normalize url + $urlParts = parse_url($url); + $scheme = strtolower($urlParts['scheme']); + $host = strtolower($urlParts['host']); + $port = isset($urlParts['port']) ? intval($urlParts['port']) : 0; + $retval = strtolower($scheme) . '://' . strtolower($host); + if (!empty($port) && (($scheme === 'http' && $port != 80) || ($scheme === 'https' && $port != 443))) { + $retval .= ":{$port}"; + } + + $retval .= $urlParts['path']; + if (!empty($urlParts['query'])) { + $retval .= "?{$urlParts['query']}"; + } + + $normalizedUrl = $this->encode_rfc3986($retval); + $method = $this->encode_rfc3986($method); // don't need this but why not? + + $signatureBaseString = "{$method}&{$normalizedUrl}&{$concatenatedParams}"; + + # sign the signature string + $key = $this->encode_rfc3986( + $this->consumerSecret ? $this->consumerSecret : TWITTER_CONSUMER_SECRET + ) . '&' . $this->encode_rfc3986($this->password); + + return base64_encode(hash_hmac('sha1', $signatureBaseString, $key, true)); + } } diff --git a/lib/Phirehose.php b/lib/Phirehose.php index 79908e8..f5f5445 100644 --- a/lib/Phirehose.php +++ b/lib/Phirehose.php @@ -1,868 +1,941 @@ * @version 1.0RC */ abstract class Phirehose { - - /** - * Class constants - */ - const FORMAT_JSON = 'json'; - const FORMAT_XML = 'xml'; - const METHOD_FILTER = 'filter'; - const METHOD_SAMPLE = 'sample'; - const METHOD_RETWEET = 'retweet'; - const METHOD_FIREHOSE = 'firehose'; - const METHOD_LINKS = 'links'; - const METHOD_USER = 'user'; //See UserstreamPhirehose.php - const METHOD_SITE = 'site'; //See UserstreamPhirehose.php - - const EARTH_RADIUS_KM = 6371; - - /** - * @internal Moved from being a const to a variable, because some methods (user and site) need to change it. - */ - protected $URL_BASE = 'https://stream.twitter.com/1.1/statuses/'; - - - /** - * Member Attribs - */ - protected $username; - protected $password; - protected $method; - protected $format; - protected $count; //Can be -150,000 to 150,000. @see http://dev.twitter.com/pages/streaming_api_methods#count - protected $followIds; - protected $trackWords; - protected $locationBoxes; - protected $conn; - protected $fdrPool; - protected $buff; - // State vars - protected $filterChanged; - protected $reconnect; - - /** - * The number of tweets received per second in previous minute; calculated fresh - * just before each call to statusUpdate() - * I.e. if fewer than 30 tweets in last minute then this will be zero; if 30 to 90 then it - * will be 1, if 90 to 150 then 2, etc. - * - * @var integer - */ - protected $statusRate; - - protected $lastErrorNo; - protected $lastErrorMsg; - - /** - * Number of tweets received. - * - * Note: by default this is the sum for last 60 seconds, and is therefore - * reset every 60 seconds. - * To change this behaviour write a custom statusUpdate() function. - * - * @var integer - */ - protected $statusCount=0; - - /** - * The number of calls to $this->checkFilterPredicates(). - * - * By default it is called every 5 seconds, so if doing statusUpdates every - * 60 seconds and then resetting it, this will usually be 12. - * - * @var integer - */ - protected $filterCheckCount=0; - - /** - * Total number of seconds (fractional) spent in the enqueueStatus() calls (i.e. the customized - * function that handles each received tweet). - * - * @var float - */ - protected $enqueueSpent=0; - - /** - * Total number of seconds (fractional) spent in the checkFilterPredicates() calls - * - * @var float - */ - protected $filterCheckSpent=0; - - /** - * Number of seconds since the last tweet arrived (or the keep-alive newline) - * - * @var integer - */ - protected $idlePeriod=0; - - /** - * The maximum value $this->idlePeriod has reached. - * - * @var integer - */ - protected $maxIdlePeriod=0; - - /** - * Time spent on each call to enqueueStatus() (i.e. average time spent, in milliseconds, - * spent processing received tweet). - * - * Simply: enqueueSpent divided by statusCount - * Note: by default, calculated fresh for past 60 seconds, every 60 seconds. - * - * @var float - */ - protected $enqueueTimeMS=0; - - /** - * Like $enqueueTimeMS but for the checkFilterPredicates() function. - * @var float - */ - protected $filterCheckTimeMS=0; - - /** - * Seconds since the last call to statusUpdate() - * - * Reset to zero after each call to statusUpdate() - * Highest value it should ever reach is $this->avgPeriod - * - * @var integer - */ - protected $avgElapsed=0; - - // Config type vars - override in subclass if desired - protected $connectFailuresMax = 20; - protected $connectTimeout = 5; - protected $readTimeout = 5; - protected $idleReconnectTimeout = 90; - protected $avgPeriod = 60; - protected $status_length_base = 10; - protected $userAgent = 'Phirehose/1.0RC +https://github.com/fennb/phirehose'; - protected $filterCheckMin = 5; - protected $filterUpdMin = 120; - protected $tcpBackoff = 1; - protected $tcpBackoffMax = 16; - protected $httpBackoff = 10; - protected $httpBackoffMax = 240; - protected $hostPort = 80; - protected $secureHostPort = 443; - - /** - * Create a new Phirehose object attached to the appropriate twitter stream method. - * Methods are: METHOD_FIREHOSE, METHOD_RETWEET, METHOD_SAMPLE, METHOD_FILTER, METHOD_LINKS, METHOD_USER, METHOD_SITE. Note: the method might cause the use of a different endpoint URL. - * Formats are: FORMAT_JSON, FORMAT_XML - * @see Phirehose::METHOD_SAMPLE - * @see Phirehose::FORMAT_JSON - * - * @param string $username Any twitter username. When using oAuth, this is the 'oauth_token'. - * @param string $password Any twitter password. When using oAuth this is you oAuth secret. - * @param string $method - * @param string $format - * - * @todo I've kept the "/2/" at the end of the URL for user streams, as that is what - * was there before AND it works for me! But the official docs say to use /1.1/ - * so that is what I have used for site. - * https://dev.twitter.com/docs/api/1.1/get/user - * - * @todo Shouldn't really hard-code URL strings in this function. - */ - public function __construct($username, $password, $method = Phirehose::METHOD_SAMPLE, $format = self::FORMAT_JSON, $lang = FALSE) - { - $this->username = $username; - $this->password = $password; - $this->method = $method; - $this->format = $format; - $this->lang = $lang; - switch($method){ - case self::METHOD_USER:$this->URL_BASE = 'https://userstream.twitter.com/1.1/';break; - case self::METHOD_SITE:$this->URL_BASE = 'https://sitestream.twitter.com/1.1/';break; - default:break; //Stick to the default + /** + * Class constants + */ + const FORMAT_JSON = 'json'; + const FORMAT_XML = 'xml'; + const METHOD_FILTER = 'filter'; + const METHOD_SAMPLE = 'sample'; + const METHOD_RETWEET = 'retweet'; + const METHOD_FIREHOSE = 'firehose'; + const METHOD_LINKS = 'links'; + const METHOD_USER = 'user'; //See UserstreamPhirehose.php + const METHOD_SITE = 'site'; //See UserstreamPhirehose.php + + const EARTH_RADIUS_KM = 6371; + + /** + * @internal Moved from being a const to a variable, because some methods (user and site) need to change it. + */ + protected $URL_BASE = 'https://stream.twitter.com/1.1/statuses/'; + + /** + * Member Attribs + */ + protected $username; + protected $password; + protected $method; + protected $format; + protected $count; //Can be -150,000 to 150,000. @see http://dev.twitter.com/pages/streaming_api_methods#count + protected $followIds; + protected $trackWords; + protected $locationBoxes; + protected $conn; + protected $fdrPool; + protected $buff; + // State vars + protected $filterChanged; + protected $reconnect; + + /** + * The number of tweets received per second in previous minute; calculated fresh + * just before each call to statusUpdate() + * I.e. if fewer than 30 tweets in last minute then this will be zero; if 30 to 90 then it + * will be 1, if 90 to 150 then 2, etc. + * + * @var integer + */ + protected $statusRate; + + protected $lastErrorNo; + protected $lastErrorMsg; + + /** + * Number of tweets received. + * Note: by default this is the sum for last 60 seconds, and is therefore + * reset every 60 seconds. + * To change this behaviour write a custom statusUpdate() function. + * + * @var integer + */ + protected $statusCount = 0; + + /** + * The number of calls to $this->checkFilterPredicates(). + * By default it is called every 5 seconds, so if doing statusUpdates every + * 60 seconds and then resetting it, this will usually be 12. + * + * @var integer + */ + protected $filterCheckCount = 0; + + /** + * Total number of seconds (fractional) spent in the enqueueStatus() calls (i.e. the customized + * function that handles each received tweet). + * + * @var float + */ + protected $enqueueSpent = 0; + + /** + * Total number of seconds (fractional) spent in the checkFilterPredicates() calls + * + * @var float + */ + protected $filterCheckSpent = 0; + + /** + * Number of seconds since the last tweet arrived (or the keep-alive newline) + * + * @var integer + */ + protected $idlePeriod = 0; + + /** + * The maximum value $this->idlePeriod has reached. + * + * @var integer + */ + protected $maxIdlePeriod = 0; + + /** + * Time spent on each call to enqueueStatus() (i.e. average time spent, in milliseconds, + * spent processing received tweet). + * Simply: enqueueSpent divided by statusCount + * Note: by default, calculated fresh for past 60 seconds, every 60 seconds. + * + * @var float + */ + protected $enqueueTimeMS = 0; + + /** + * Like $enqueueTimeMS but for the checkFilterPredicates() function. + * + * @var float + */ + protected $filterCheckTimeMS = 0; + + /** + * Seconds since the last call to statusUpdate() + * Reset to zero after each call to statusUpdate() + * Highest value it should ever reach is $this->avgPeriod + * + * @var integer + */ + protected $avgElapsed = 0; + + // Config type vars - override in subclass if desired + protected $connectFailuresMax = 20; + protected $connectTimeout = 5; + protected $readTimeout = 5; + protected $idleReconnectTimeout = 90; + protected $avgPeriod = 60; + protected $status_length_base = 10; + protected $userAgent = 'Phirehose/1.0RC +https://github.com/fennb/phirehose'; + protected $filterCheckMin = 5; + protected $filterUpdMin = 120; + protected $tcpBackoff = 1; + protected $tcpBackoffMax = 16; + protected $httpBackoff = 10; + protected $httpBackoffMax = 240; + protected $hostPort = 80; + protected $secureHostPort = 443; + + protected $errorTypes = [ + 'error' => E_USER_ERROR, + 'info' => E_USER_WARNING, + 'notice' => E_USER_NOTICE, + ]; + + /** + * Create a new Phirehose object attached to the appropriate twitter stream method. + * Methods are: METHOD_FIREHOSE, METHOD_RETWEET, METHOD_SAMPLE, METHOD_FILTER, METHOD_LINKS, METHOD_USER, METHOD_SITE. Note: the method might cause the use of a different endpoint URL. + * Formats are: FORMAT_JSON, FORMAT_XML + * + * @see Phirehose::METHOD_SAMPLE + * @see Phirehose::FORMAT_JSON + * + * @param string $username Any twitter username. When using oAuth, this is the 'oauth_token'. + * @param string $password Any twitter password. When using oAuth this is you oAuth secret. + * @param string $method + * @param string $format + * @param string $lang + * + * @todo I've kept the "/2/" at the end of the URL for user streams, as that is what + * was there before AND it works for me! But the official docs say to use /1.1/ + * so that is what I have used for site. + * https://dev.twitter.com/docs/api/1.1/get/user + * @todo Shouldn't really hard-code URL strings in this function. + */ + public function __construct( + $username, + $password, + $method = Phirehose::METHOD_SAMPLE, + $format = self::FORMAT_JSON, + $lang = '' + ) { + $this->username = $username; + $this->password = $password; + $this->method = $method; + $this->format = $format; + $this->lang = $lang; + switch ($method) { + case self::METHOD_USER: + $this->URL_BASE = 'https://userstream.twitter.com/1.1/'; + break; + case self::METHOD_SITE: + $this->URL_BASE = 'https://sitestream.twitter.com/1.1/'; + break; + default: + break; //Stick to the default } - } - - /** - * Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies - * ("@user Hello!" created without pressing the reply button) are not matched. It is up to you to find the integer - * IDs of each twitter user. - * Applies to: METHOD_FILTER - * - * @param array $userIds Array of Twitter integer userIDs - */ - public function setFollow($userIds) - { - $userIds = ($userIds === NULL) ? array() : $userIds; - sort($userIds); // Non-optimal but necessary - if ($this->followIds != $userIds) { - $this->filterChanged = TRUE; - } - $this->followIds = $userIds; - } - - /** - * Returns an array of followed Twitter userIds (integers) - * - * @return array - */ - public function getFollow() - { - return $this->followIds; - } - - /** - * Specifies keywords to track. Track keywords are case-insensitive logical ORs. Terms are exact-matched, ignoring - * punctuation. Phrases, keywords with spaces, are not supported. Queries are subject to Track Limitations. - * Applies to: METHOD_FILTER - * - * See: http://apiwiki.twitter.com/Streaming-API-Documentation#TrackLimiting - * - * @param array $trackWords - */ - public function setTrack(array $trackWords) - { - $trackWords = ($trackWords === NULL) ? array() : $trackWords; - sort($trackWords); // Non-optimal, but necessary - if ($this->trackWords != $trackWords) { - $this->filterChanged = TRUE; } - $this->trackWords = $trackWords; - } - - /** - * Returns an array of keywords being tracked - * - * @return array - */ - public function getTrack() - { - return $this->trackWords; - } - - /** - * Specifies a set of bounding boxes to track as an array of 4 element lon/lat pairs denoting , - * . Only tweets that are both created using the Geotagging API and are placed from within a tracked - * bounding box will be included in the stream. The user's location field is not used to filter tweets. Bounding boxes - * are logical ORs and must be less than or equal to 1 degree per side. A locations parameter may be combined with - * track parameters, but note that all terms are logically ORd. - * - * NOTE: The argument order is Longitude/Latitude (to match the Twitter API and GeoJSON specifications). - * - * Applies to: METHOD_FILTER - * - * See: http://apiwiki.twitter.com/Streaming-API-Documentation#locations - * - * Eg: - * setLocations(array( - * array(-122.75, 36.8, -121.75, 37.8), // San Francisco - * array(-74, 40, -73, 41), // New York - * )); - * - * @param array $boundingBoxes - */ - public function setLocations($boundingBoxes) - { - $boundingBoxes = ($boundingBoxes === NULL) ? array() : $boundingBoxes; - sort($boundingBoxes); // Non-optimal, but necessary - // Flatten to single dimensional array - $locationBoxes = array(); - foreach ($boundingBoxes as $boundingBox) { - // Sanity check - if (count($boundingBox) != 4) { - // Invalid - Not much we can do here but log error - $this->log('Invalid location bounding box: [' . implode(', ', $boundingBox) . ']','error'); - return FALSE; - } - // Append this lat/lon pairs to flattened array - $locationBoxes = array_merge($locationBoxes, $boundingBox); - } - // If it's changed, make note - if ($this->locationBoxes != $locationBoxes) { - $this->filterChanged = TRUE; + + /** + * Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies + * ("@user Hello!" created without pressing the reply button) are not matched. It is up to you to find the integer + * IDs of each twitter user. + * Applies to: METHOD_FILTER + * + * @param array $userIds Array of Twitter integer userIDs + */ + public function setFollow($userIds) + { + $userIds = ($userIds === null) ? [] : $userIds; + sort($userIds); // Non-optimal but necessary + if ($this->followIds != $userIds) { + $this->filterChanged = true; + } + $this->followIds = $userIds; } - // Set flattened value - $this->locationBoxes = $locationBoxes; - } - - /** - * Returns an array of 4 element arrays that denote the monitored location bounding boxes for tweets using the - * Geotagging API. - * - * @see setLocations() - * @return array - */ - public function getLocations() { - if ($this->locationBoxes == NULL) { - return NULL; + + /** + * Returns an array of followed Twitter userIds (integers) + * + * @return array + */ + public function getFollow() + { + return $this->followIds; } - $locationBoxes = $this->locationBoxes; // Copy array - $ret = array(); - while (count($locationBoxes) >= 4) { - $ret[] = array_splice($locationBoxes, 0, 4); // Append to ret array in blocks of 4 + + /** + * Specifies keywords to track. Track keywords are case-insensitive logical ORs. Terms are exact-matched, ignoring + * punctuation. Phrases, keywords with spaces, are not supported. Queries are subject to Track Limitations. + * Applies to: METHOD_FILTER + * See: http://apiwiki.twitter.com/Streaming-API-Documentation#TrackLimiting + * + * @param array $trackWords + */ + public function setTrack(array $trackWords) + { + $trackWords = ($trackWords === null) ? [] : $trackWords; + sort($trackWords); // Non-optimal, but necessary + if ($this->trackWords != $trackWords) { + $this->filterChanged = true; + } + $this->trackWords = $trackWords; } - return $ret; - } - - /** - * Convenience method that sets location bounding boxes by an array of lon/lat/radius sets, rather than manually - * specified bounding boxes. Each array element should contain 3 element subarray containing a latitude, longitude and - * radius. Radius is specified in kilometers and is approximate (as boxes are square). - * - * NOTE: The argument order is Longitude/Latitude (to match the Twitter API and GeoJSON specifications). - * - * Eg: - * setLocationsByCircle(array( - * array(144.9631, -37.8142, 30), // Melbourne, 3km radius - * array(-0.1262, 51.5001, 25), // London 10km radius - * )); - * - * - * @see setLocations() - * @param array - */ - public function setLocationsByCircle($locations) { - $boundingBoxes = array(); - foreach ($locations as $locTriplet) { - // Sanity check - if (count($locTriplet) != 3) { - // Invalid - Not much we can do here but log error - $this->log('Invalid location triplet for ' . __METHOD__ . ': [' . implode(', ', $locTriplet) . ']','error'); - return FALSE; - } - list($lon, $lat, $radius) = $locTriplet; - - // Calc bounding boxes - $maxLat = round($lat + rad2deg($radius / self::EARTH_RADIUS_KM), 2); - $minLat = round($lat - rad2deg($radius / self::EARTH_RADIUS_KM), 2); - // Compensate for degrees longitude getting smaller with increasing latitude - $maxLon = round($lon + rad2deg($radius / self::EARTH_RADIUS_KM / cos(deg2rad($lat))), 2); - $minLon = round($lon - rad2deg($radius / self::EARTH_RADIUS_KM / cos(deg2rad($lat))), 2); - // Add to bounding box array - $boundingBoxes[] = array($minLon, $minLat, $maxLon, $maxLat); - // Debugging is handy - $this->log('Resolved location circle [' . $lon . ', ' . $lat . ', r: ' . $radius . '] -> bbox: [' . $minLon . - ', ' . $minLat . ', ' . $maxLon . ', ' . $maxLat . ']'); + + /** + * Returns an array of keywords being tracked + * + * @return array + */ + public function getTrack() + { + return $this->trackWords; } - // Set by bounding boxes - $this->setLocations($boundingBoxes); - } - - /** - * Sets the number of previous statuses to stream before transitioning to the live stream. Applies only to firehose - * and filter + track methods. This is generally used internally and should not be needed by client applications. - * Applies to: METHOD_FILTER, METHOD_FIREHOSE, METHOD_LINKS - * - * @param integer $count - */ - public function setCount($count) - { - $this->count = $count; - } - - /** - * Restricts tweets to the given language, given by an ISO 639-1 code (http://en.wikipedia.org/wiki/List_of_ISO_639-1_codes). - * - * @param string $lang - */ - public function setLang($lang) - { - $this->lang = $lang; - } - - /** - * Returns the ISO 639-1 code formatted language string of the current setting. (http://en.wikipedia.org/wiki/List_of_ISO_639-1_codes). - * - * @param string $lang - */ - public function getLang() - { - return $this->lang; - } - - /** - * Connects to the stream API and consumes the stream. Each status update in the stream will cause a call to the - * handleStatus() method. - * - * Note: in normal use this function does not return. - * If you pass $reconnect as false, it will still not return in normal use: it will only return - * if the remote side (Twitter) close the socket. (Or the socket dies for some other external reason.) - * - * @see handleStatus() - * @param boolean $reconnect Reconnects as per recommended - * @throws ErrorException - */ - public function consume($reconnect = TRUE) - { - // Persist connection? - $this->reconnect = $reconnect; - - // Loop indefinitely based on reconnect - do { - - // (Re)connect - $this->reconnect(); - - // Init state - $lastAverage = $lastFilterCheck = $lastFilterUpd = $lastStreamActivity = time(); - $fdw = $fde = NULL; // Placeholder write/error file descriptors for stream_select - - // We use a blocking-select with timeout, to allow us to continue processing on idle streams - //TODO: there is a bug lurking here. If $this->conn is fine, but $numChanged returns zero, because readTimeout was - // reached, then we should consider we still need to call statusUpdate() every 60 seconds, etc. - // ($this->readTimeout is 5 seconds.) This can be quite annoying. E.g. Been getting data regularly for 55 seconds, - // then it goes quiet for just 10 or so seconds. It is now 65 seconds since last call to statusUpdate() has been - // called, which might mean a monitoring system kills the script assuming it has died. - while ($this->conn !== NULL && !feof($this->conn) && - ($numChanged = stream_select($this->fdrPool, $fdw, $fde, $this->readTimeout)) !== FALSE) { - /* Unfortunately, we need to do a safety check for dead twitter streams - This seems to be able to happen where - * you end up with a valid connection, but NO tweets coming along the wire (or keep alives). The below guards - * against this. - */ - if ((time() - $lastStreamActivity) > $this->idleReconnectTimeout) { - $this->log('Idle timeout: No stream activity for > ' . $this->idleReconnectTimeout . ' seconds. ' . - ' Reconnecting.','info'); - $this->reconnect(); - $lastStreamActivity = time(); - continue; + + /** + * Returns an array of 4 element arrays that denote the monitored location bounding boxes for tweets using the + * Geotagging API. + * + * @see setLocations() + * @return array + */ + public function getLocations() + { + if ($this->locationBoxes == null) { + return; } - // Process stream/buffer - $this->fdrPool = array($this->conn); // Must reassign for stream_select() - - //Get a full HTTP chunk. - //NB. This is a tight loop, not using stream_select. - //NB. If that causes problems, then perhaps put something to give up after say trying for 10 seconds? (but - // the stream will be all messed up, so will need to do a reconnect). - $chunk_info=trim(fgets($this->conn)); //First line is hex digits giving us the length - if($chunk_info=='')continue; //Usually indicates a time-out. If we wanted to be sure, - //then stream_get_meta_data($this->conn)['timed_out']==1. (We could instead - // look at the 'eof' member, which appears to be boolean false if just a time-out.) - //TODO: need to consider calling statusUpdate() every 60 seconds, etc. - - // Track maximum idle period - // (We got start of an HTTP chunk, this is stream activity) - $this->idlePeriod = (time() - $lastStreamActivity); - $this->maxIdlePeriod = ($this->idlePeriod > $this->maxIdlePeriod) ? $this->idlePeriod : $this->maxIdlePeriod; - $lastStreamActivity = time(); - - //Append one HTTP chunk to $this->buff - $len=hexdec($chunk_info); //$len includes the \r\n at the end of the chunk (despite what wikipedia says) - //TODO: could do a check for data corruption here. E.g. if($len>100000){...} - $s=''; - $len+=2; //For the \r\n at the end of the chunk - while(!feof($this->conn)){ - $s.=fread($this->conn,$len-strlen($s)); - if(strlen($s)>=$len)break; //TODO: Can never be >$len, only ==$len?? - } - $this->buff.=substr($s,0,-2); //This is our HTTP chunk - - //Process each full tweet inside $this->buff - while(1){ - $eol = strpos($this->buff,"\r\n"); //Find next line ending - if($eol===0) { // if 0, then buffer starts with "\r\n", so trim it and loop again - $this->buff = substr($this->buff,$eol+2); // remove the "\r\n" from line start - continue; // loop again - } - if($eol===false)break; //Time to get more data - $enqueueStart = microtime(TRUE); - $this->enqueueStatus(substr($this->buff,0,$eol)); - $this->enqueueSpent += (microtime(TRUE) - $enqueueStart); - $this->statusCount++; - $this->buff = substr($this->buff,$eol+2); //+2 to allow for the \r\n + $locationBoxes = $this->locationBoxes; // Copy array + $ret = []; + while (count($locationBoxes) >= 4) { + $ret[] = array_splice($locationBoxes, 0, 4); // Append to ret array in blocks of 4 } - //NOTE: if $this->buff is not empty, it is tempting to go round and get the next HTTP chunk, as - // we know there is data on the incoming stream. However, this could mean the below functions (heartbeat - // and statusUpdate) *never* get called, which would be bad. - - // Calc counter averages - $this->avgElapsed = time() - $lastAverage; - if ($this->avgElapsed >= $this->avgPeriod) { - $this->statusRate = round($this->statusCount / $this->avgElapsed, 0); // Calc tweets-per-second - // Calc time spent per enqueue in ms - $this->enqueueTimeMS = ($this->statusCount > 0) ? - round($this->enqueueSpent / $this->statusCount * 1000, 2) : 0; - // Calc time spent total in filter predicate checking - $this->filterCheckTimeMS = ($this->filterCheckCount > 0) ? - round($this->filterCheckSpent / $this->filterCheckCount * 1000, 2) : 0; - - $this->heartbeat(); - $this->statusUpdate(); - $lastAverage = time(); + return $ret; + } + + /** + * Convenience method that sets location bounding boxes by an array of lon/lat/radius sets, rather than manually + * specified bounding boxes. Each array element should contain 3 element subarray containing a latitude, longitude and + * radius. Radius is specified in kilometers and is approximate (as boxes are square). + * NOTE: The argument order is Longitude/Latitude (to match the Twitter API and GeoJSON specifications). + * Eg: + * setLocationsByCircle(array( + * array(144.9631, -37.8142, 30), // Melbourne, 3km radius + * array(-0.1262, 51.5001, 25), // London 10km radius + * )); + * + * @see setLocations() + * + * @param array + * + * @return bool + */ + public function setLocationsByCircle($locations) + { + $boundingBoxes = []; + foreach ($locations as $locTriplet) { + // Sanity check + if (count($locTriplet) != 3) { + // Invalid - Not much we can do here but log error + $this->log( + 'Invalid location triplet for ' . __METHOD__ . ': [' . implode(', ', $locTriplet) . ']', + 'error' + ); + + return false; + } + list($lon, $lat, $radius) = $locTriplet; + + // Calc bounding boxes + $maxLat = round($lat + rad2deg($radius / self::EARTH_RADIUS_KM), 2); + $minLat = round($lat - rad2deg($radius / self::EARTH_RADIUS_KM), 2); + // Compensate for degrees longitude getting smaller with increasing latitude + $maxLon = round($lon + rad2deg($radius / self::EARTH_RADIUS_KM / cos(deg2rad($lat))), 2); + $minLon = round($lon - rad2deg($radius / self::EARTH_RADIUS_KM / cos(deg2rad($lat))), 2); + // Add to bounding box array + $boundingBoxes[] = [$minLon, $minLat, $maxLon, $maxLat]; + // Debugging is handy + $this->log( + 'Resolved location circle [' . $lon . ', ' . $lat . ', r: ' . $radius . '] -> bbox: [' . $minLon . + ', ' . $minLat . ', ' . $maxLon . ', ' . $maxLat . ']' + ); } - // Check if we're ready to check filter predicates - if ($this->method == self::METHOD_FILTER && (time() - $lastFilterCheck) >= $this->filterCheckMin) { - $this->filterCheckCount++; - $lastFilterCheck = time(); - $filterCheckStart = microtime(TRUE); - $this->checkFilterPredicates(); // This should be implemented in subclass if required - $this->filterCheckSpent += (microtime(TRUE) - $filterCheckStart); + // Set by bounding boxes + $this->setLocations($boundingBoxes); + } + + /** + * Basic log function that outputs logging to the standard error_log() handler. This should generally be overridden + * to suit the application environment. + * + * @see error_log() + * + * @param string $message + * @param string $level 'error', 'info', 'notice'. Defaults to 'notice', so you should set this + * parameter on the more important error messages. + * 'info' is used for problems that the class should be able to recover from automatically. + * 'error' is for exceptional conditions that may need human intervention. (For instance, emailing + * them to a system administrator may make sense.) + */ + protected function log($message, $level = 'notice') + { + @trigger_error('Phirehose: ' . $message, $this->errorTypes[$level]); + } + + /** + * Specifies a set of bounding boxes to track as an array of 4 element lon/lat pairs denoting , + * . Only tweets that are both created using the Geotagging API and are placed from within a tracked + * bounding box will be included in the stream. The user's location field is not used to filter tweets. Bounding boxes + * are logical ORs and must be less than or equal to 1 degree per side. A locations parameter may be combined with + * track parameters, but note that all terms are logically ORd. + * NOTE: The argument order is Longitude/Latitude (to match the Twitter API and GeoJSON specifications). + * Applies to: METHOD_FILTER + * See: http://apiwiki.twitter.com/Streaming-API-Documentation#locations + * Eg: + * setLocations(array( + * array(-122.75, 36.8, -121.75, 37.8), // San Francisco + * array(-74, 40, -73, 41), // New York + * )); + * + * @param array $boundingBoxes + * + * @return bool + */ + public function setLocations($boundingBoxes) + { + $boundingBoxes = ($boundingBoxes === null) ? [] : $boundingBoxes; + sort($boundingBoxes); // Non-optimal, but necessary + // Flatten to single dimensional array + $locationBoxes = []; + foreach ($boundingBoxes as $boundingBox) { + // Sanity check + if (count($boundingBox) != 4) { + // Invalid - Not much we can do here but log error + $this->log('Invalid location bounding box: [' . implode(', ', $boundingBox) . ']', 'error'); + + return false; + } + // Append this lat/lon pairs to flattened array + $locationBoxes = array_merge($locationBoxes, $boundingBox); } - // Check if filter is ready + allowed to be updated (reconnect) - if ($this->filterChanged == TRUE && (time() - $lastFilterUpd) >= $this->filterUpdMin) { - $this->log('Reconnecting due to changed filter predicates.','info'); - $this->reconnect(); - $lastFilterUpd = time(); + // If it's changed, make note + if ($this->locationBoxes != $locationBoxes) { + $this->filterChanged = true; } - - } // End while-stream-activity - - if (function_exists('pcntl_signal_dispatch')) { - pcntl_signal_dispatch(); - } - - // Some sort of socket error has occured - $this->lastErrorNo = is_resource($this->conn) ? @socket_last_error($this->conn) : NULL; - $this->lastErrorMsg = ($this->lastErrorNo > 0) ? @socket_strerror($this->lastErrorNo) : 'Socket disconnected'; - $this->log('Phirehose connection error occured: ' . $this->lastErrorMsg,'error'); - - // Reconnect - } while ($this->reconnect); - - // Exit - $this->log('Exiting.'); - - } - - - /** - * Called every $this->avgPeriod (default=60) seconds, and this default implementation - * calculates some rates, logs them, and resets the counters. - */ - protected function statusUpdate() - { - $this->log('Consume rate: ' . $this->statusRate . ' status/sec (' . $this->statusCount . ' total), avg ' . - 'enqueueStatus(): ' . $this->enqueueTimeMS . 'ms, avg checkFilterPredicates(): ' . $this->filterCheckTimeMS . 'ms (' . - $this->filterCheckCount . ' total) over ' . $this->avgElapsed . ' seconds, max stream idle period: ' . - $this->maxIdlePeriod . ' seconds.'); - // Reset - $this->statusCount = $this->filterCheckCount = $this->enqueueSpent = 0; - $this->filterCheckSpent = $this->idlePeriod = $this->maxIdlePeriod = 0; - } - - /** - * Returns the last error message (TCP or HTTP) that occured with the streaming API or client. State is cleared upon - * successful reconnect - * @return string - */ - public function getLastErrorMsg() - { - return $this->lastErrorMsg; - } - - /** - * Returns the last error number that occured with the streaming API or client. Numbers correspond to either the - * fsockopen() error states (in the case of TCP errors) or HTTP error codes from Twitter (in the case of HTTP errors). - * - * State is cleared upon successful reconnect. - * - * @return string - */ - public function getLastErrorNo() - { - return $this->lastErrorNo; - } - - - /** - * Connects to the stream URL using the configured method. - * @throws ErrorException - */ - protected function connect() - { - - // Init state - $connectFailures = 0; - $tcpRetry = $this->tcpBackoff / 2; - $httpRetry = $this->httpBackoff / 2; - - // Keep trying until connected (or max connect failures exceeded) - do { - - // Check filter predicates for every connect (for filter method) - if ($this->method == self::METHOD_FILTER) { - $this->checkFilterPredicates(); - } - - // Construct URL/HTTP bits - $url = $this->URL_BASE . $this->method . '.' . $this->format; - $urlParts = parse_url($url); - - // Setup params appropriately - $requestParams=array(); - - //$requestParams['delimited'] = 'length'; //No, we don't want this any more - - // Setup the language of the stream - if($this->lang) { - $requestParams['language'] = $this->lang; - } - - // Filter takes additional parameters - if (($this->method == self::METHOD_FILTER || $this->method == self::METHOD_USER) && count($this->trackWords) > 0) { - $requestParams['track'] = implode(',', $this->trackWords); - } - if ( ($this->method == self::METHOD_FILTER || $this->method == self::METHOD_SITE) - && count($this->followIds) > 0) { - $requestParams['follow'] = implode(',', $this->followIds); - } - if ($this->method == self::METHOD_FILTER && count($this->locationBoxes) > 0) { - $requestParams['locations'] = implode(',', $this->locationBoxes); - } - if ($this->count <> 0) { - $requestParams['count'] = $this->count; - } - - // Debugging is useful - $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '', - var_export($requestParams, TRUE))); - - /** - * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native - * HTTP transport but it hides/abstracts too many required bits (like HTTP error responses). - */ - $errNo = $errStr = NULL; - $scheme = ($urlParts['scheme'] == 'https') ? 'ssl://' : 'tcp://'; - $port = ($urlParts['scheme'] == 'https') ? $this->secureHostPort : $this->hostPort; - - $this->log("Connecting to {$scheme}{$urlParts['host']}, port={$port}, connectTimeout={$this->connectTimeout}"); - - @$this->conn = fsockopen($scheme . $urlParts['host'], $port, $errNo, $errStr, $this->connectTimeout); - - // No go - handle errors/backoff - if (!$this->conn || !is_resource($this->conn)) { - $this->lastErrorMsg = $errStr; - $this->lastErrorNo = $errNo; - $connectFailures++; - if ($connectFailures > $this->connectFailuresMax) { - $msg = 'TCP failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; - $this->log($msg,'error'); - throw new PhirehoseConnectLimitExceeded($msg, $errNo); // Throw an exception for other code to handle + // Set flattened value + $this->locationBoxes = $locationBoxes; + } + + /** + * Sets the number of previous statuses to stream before transitioning to the live stream. Applies only to firehose + * and filter + track methods. This is generally used internally and should not be needed by client applications. + * Applies to: METHOD_FILTER, METHOD_FIREHOSE, METHOD_LINKS + * + * @param integer $count + */ + public function setCount($count) + { + $this->count = $count; + } + + /** + * Restricts tweets to the given language, given by an ISO 639-1 code (http://en.wikipedia.org/wiki/List_of_ISO_639-1_codes). + * + * @param string $lang + */ + public function setLang($lang) + { + $this->lang = $lang; + } + + /** + * Returns the ISO 639-1 code formatted language string of the current setting. + * + * @see http://en.wikipedia.org/wiki/List_of_ISO_639-1_codes + */ + public function getLang() + { + return $this->lang; + } + + /** + * Connects to the stream API and consumes the stream. Each status update in the stream will cause a call to the + * handleStatus() method. + * Note: in normal use this function does not return. + * If you pass $reconnect as false, it will still not return in normal use: it will only return + * if the remote side (Twitter) close the socket. (Or the socket dies for some other external reason.) + * + * @see handleStatus() + * + * @param boolean $reconnect Reconnects as per recommended + * + * @throws ErrorException + */ + public function consume($reconnect = true) + { + // Persist connection? + $this->reconnect = $reconnect; + + // Loop indefinitely based on reconnect + do { + + // (Re)connect + $this->reconnect(); + + // Init state + $lastAverage = $lastFilterCheck = $lastFilterUpd = $lastStreamActivity = time(); + $fdw = $fde = null; // Placeholder write/error file descriptors for stream_select + + // We use a blocking-select with timeout, to allow us to continue processing on idle streams + //TODO: there is a bug lurking here. If $this->conn is fine, but $numChanged returns zero, because readTimeout was + // reached, then we should consider we still need to call statusUpdate() every 60 seconds, etc. + // ($this->readTimeout is 5 seconds.) This can be quite annoying. E.g. Been getting data regularly for 55 seconds, + // then it goes quiet for just 10 or so seconds. It is now 65 seconds since last call to statusUpdate() has been + // called, which might mean a monitoring system kills the script assuming it has died. + while ($this->conn !== null && !feof($this->conn) && + ($numChanged = stream_select($this->fdrPool, $fdw, $fde, $this->readTimeout)) !== false) { + /* Unfortunately, we need to do a safety check for dead twitter streams - This seems to be able to happen where + * you end up with a valid connection, but NO tweets coming along the wire (or keep alives). The below guards + * against this. + */ + if ((time() - $lastStreamActivity) > $this->idleReconnectTimeout) { + $this->log( + 'Idle timeout: No stream activity for > ' . $this->idleReconnectTimeout . ' seconds. ' . + ' Reconnecting.', + 'info' + ); + $this->reconnect(); + $lastStreamActivity = time(); + continue; + } + // Process stream/buffer + $this->fdrPool = [$this->conn]; // Must reassign for stream_select() + + //Get a full HTTP chunk. + //NB. This is a tight loop, not using stream_select. + //NB. If that causes problems, then perhaps put something to give up after say trying for 10 seconds? (but + // the stream will be all messed up, so will need to do a reconnect). + $chunk_info = trim(fgets($this->conn)); //First line is hex digits giving us the length + if ($chunk_info == '') { + continue; + } //Usually indicates a time-out. If we wanted to be sure, + //then stream_get_meta_data($this->conn)['timed_out']==1. (We could instead + // look at the 'eof' member, which appears to be boolean false if just a time-out.) + //TODO: need to consider calling statusUpdate() every 60 seconds, etc. + + // Track maximum idle period + // (We got start of an HTTP chunk, this is stream activity) + $this->idlePeriod = (time() - $lastStreamActivity); + $this->maxIdlePeriod = ($this->idlePeriod > $this->maxIdlePeriod) ? $this->idlePeriod : $this->maxIdlePeriod; + $lastStreamActivity = time(); + + //Append one HTTP chunk to $this->buff + $len = hexdec( + $chunk_info + ); //$len includes the \r\n at the end of the chunk (despite what wikipedia says) + //TODO: could do a check for data corruption here. E.g. if($len>100000){...} + $s = ''; + $len += 2; //For the \r\n at the end of the chunk + while (!feof($this->conn)) { + $s .= fread($this->conn, $len - strlen($s)); + if (strlen($s) >= $len) { + break; + } //TODO: Can never be >$len, only ==$len?? + } + $this->buff .= substr($s, 0, -2); //This is our HTTP chunk + + //Process each full tweet inside $this->buff + while (1) { + $eol = strpos($this->buff, "\r\n"); //Find next line ending + if ($eol === 0) { // if 0, then buffer starts with "\r\n", so trim it and loop again + $this->buff = substr($this->buff, $eol + 2); // remove the "\r\n" from line start + continue; // loop again + } + if ($eol === false) { + break; + } //Time to get more data + $enqueueStart = microtime(true); + $this->enqueueStatus(substr($this->buff, 0, $eol)); + $this->enqueueSpent += (microtime(true) - $enqueueStart); + $this->statusCount++; + $this->buff = substr($this->buff, $eol + 2); //+2 to allow for the \r\n + } + + //NOTE: if $this->buff is not empty, it is tempting to go round and get the next HTTP chunk, as + // we know there is data on the incoming stream. However, this could mean the below functions (heartbeat + // and statusUpdate) *never* get called, which would be bad. + + // Calc counter averages + $this->avgElapsed = time() - $lastAverage; + if ($this->avgElapsed >= $this->avgPeriod) { + $this->statusRate = round( + $this->statusCount / $this->avgElapsed, + 0 + ); // Calc tweets-per-second + // Calc time spent per enqueue in ms + $this->enqueueTimeMS = ($this->statusCount > 0) ? + round($this->enqueueSpent / $this->statusCount * 1000, 2) : 0; + // Calc time spent total in filter predicate checking + $this->filterCheckTimeMS = ($this->filterCheckCount > 0) ? + round($this->filterCheckSpent / $this->filterCheckCount * 1000, 2) : 0; + + $this->heartbeat(); + $this->statusUpdate(); + $lastAverage = time(); + } + // Check if we're ready to check filter predicates + if ($this->method == self::METHOD_FILTER && (time() - $lastFilterCheck) >= $this->filterCheckMin) { + $this->filterCheckCount++; + $lastFilterCheck = time(); + $filterCheckStart = microtime(true); + $this->checkFilterPredicates(); // This should be implemented in subclass if required + $this->filterCheckSpent += (microtime(true) - $filterCheckStart); + } + // Check if filter is ready + allowed to be updated (reconnect) + if ($this->filterChanged == true && (time() - $lastFilterUpd) >= $this->filterUpdMin) { + $this->log('Reconnecting due to changed filter predicates.', 'info'); + $this->reconnect(); + $lastFilterUpd = time(); + } + } // End while-stream-activity + + if (function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + + // Some sort of socket error has occured + $this->lastErrorNo = is_resource($this->conn) ? @socket_last_error($this->conn) : null; + $this->lastErrorMsg = ($this->lastErrorNo > 0) ? @socket_strerror( + $this->lastErrorNo + ) : 'Socket disconnected'; + $this->log('Phirehose connection error occured: ' . $this->lastErrorMsg, 'error'); + + // Reconnect + } while ($this->reconnect); + + // Exit + $this->log('Exiting.'); + } + + /** + * Reconnects as quickly as possible. Should be called whenever a reconnect is required rather that connect/disconnect + * to preserve streams reconnect state + */ + private function reconnect() + { + $reconnect = $this->reconnect; + $this->disconnect(); // Implicitly sets reconnect to FALSE + $this->reconnect = $reconnect; // Restore state to prev + $this->connect(); + } + + /** + * Performs forcible disconnect from stream (if connected) and cleanup. + */ + protected function disconnect() + { + if (is_resource($this->conn)) { + $this->log('Closing Phirehose connection.'); + fclose($this->conn); } - // Increase retry/backoff up to max - $tcpRetry = ($tcpRetry < $this->tcpBackoffMax) ? $tcpRetry * 2 : $this->tcpBackoffMax; - $this->log('TCP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . - $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.','info'); - sleep($tcpRetry); - continue; - } - - // TCP connect OK, clear last error (if present) - $this->log('Connection established to ' . $urlParts['host']); - $this->lastErrorMsg = NULL; - $this->lastErrorNo = NULL; - - // If we have a socket connection, we can attempt a HTTP request - Ensure blocking read for the moment - stream_set_blocking($this->conn, 1); - - // Encode request data - $postData = http_build_query($requestParams, NULL, '&'); - $postData = str_replace('+','%20',$postData); //Change it from RFC1738 to RFC3986 (see + $this->conn = null; + $this->reconnect = false; + } + + /** + * Connects to the stream URL using the configured method. + * + * @throws ErrorException + */ + protected function connect() + { + + // Init state + $connectFailures = 0; + $tcpRetry = $this->tcpBackoff / 2; + $httpRetry = $this->httpBackoff / 2; + + // Keep trying until connected (or max connect failures exceeded) + do { + + // Check filter predicates for every connect (for filter method) + if ($this->method == self::METHOD_FILTER) { + $this->checkFilterPredicates(); + } + + // Construct URL/HTTP bits + $url = $this->URL_BASE . $this->method . '.' . $this->format; + $urlParts = parse_url($url); + + // Setup params appropriately + $requestParams = []; + + //$requestParams['delimited'] = 'length'; //No, we don't want this any more + + // Setup the language of the stream + if ($this->lang) { + $requestParams['language'] = $this->lang; + } + + // Filter takes additional parameters + if (($this->method == self::METHOD_FILTER || $this->method == self::METHOD_USER) && count( + $this->trackWords + ) > 0 + ) { + $requestParams['track'] = implode(',', $this->trackWords); + } + if (($this->method == self::METHOD_FILTER || $this->method == self::METHOD_SITE) + && count($this->followIds) > 0 + ) { + $requestParams['follow'] = implode(',', $this->followIds); + } + if ($this->method == self::METHOD_FILTER && count($this->locationBoxes) > 0) { + $requestParams['locations'] = implode(',', $this->locationBoxes); + } + if ($this->count <> 0) { + $requestParams['count'] = $this->count; + } + + // Debugging is useful + $this->log( + 'Connecting to twitter stream: ' . $url . ' with params: ' . str_replace( + "\n", + '', + var_export($requestParams, true) + ) + ); + + /** + * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native + * HTTP transport but it hides/abstracts too many required bits (like HTTP error responses). + */ + $errNo = $errStr = null; + $scheme = ($urlParts['scheme'] == 'https') ? 'ssl://' : 'tcp://'; + $port = ($urlParts['scheme'] == 'https') ? $this->secureHostPort : $this->hostPort; + + $this->log( + "Connecting to {$scheme}{$urlParts['host']}, port={$port}, connectTimeout={$this->connectTimeout}" + ); + + @$this->conn = fsockopen($scheme . $urlParts['host'], $port, $errNo, $errStr, $this->connectTimeout); + + // No go - handle errors/backoff + if (!$this->conn || !is_resource($this->conn)) { + $this->lastErrorMsg = $errStr; + $this->lastErrorNo = $errNo; + $connectFailures++; + if ($connectFailures > $this->connectFailuresMax) { + $msg = 'TCP failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; + $this->log($msg, 'error'); + throw new PhirehoseConnectLimitExceeded( + $msg, $errNo + ); // Throw an exception for other code to handle + } + // Increase retry/backoff up to max + $tcpRetry = ($tcpRetry < $this->tcpBackoffMax) ? $tcpRetry * 2 : $this->tcpBackoffMax; + $this->log( + 'TCP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . + $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.', + 'info' + ); + sleep($tcpRetry); + continue; + } + + // TCP connect OK, clear last error (if present) + $this->log('Connection established to ' . $urlParts['host']); + $this->lastErrorMsg = null; + $this->lastErrorNo = null; + + // If we have a socket connection, we can attempt a HTTP request - Ensure blocking read for the moment + stream_set_blocking($this->conn, 1); + + // Encode request data + $postData = http_build_query($requestParams, null, '&'); + $postData = str_replace('+', '%20', $postData); //Change it from RFC1738 to RFC3986 (see //enc_type parameter in http://php.net/http_build_query and note that enc_type is //not available as of php 5.3) - $authCredentials = $this->getAuthorizationHeader($url,$requestParams); - - // Do it - $s = "POST " . $urlParts['path'] . " HTTP/1.1\r\n"; - $s.= "Host: " . $urlParts['host'] . ':' . $port . "\r\n"; - $s .= "Connection: Close\r\n"; - $s.= "Content-type: application/x-www-form-urlencoded\r\n"; - $s.= "Content-length: " . strlen($postData) . "\r\n"; - $s.= "Accept: */*\r\n"; - $s.= 'Authorization: ' . $authCredentials . "\r\n"; - $s.= 'User-Agent: ' . $this->userAgent . "\r\n"; - $s.= "\r\n"; - $s.= $postData . "\r\n"; - $s.= "\r\n"; - - fwrite($this->conn, $s); - $this->log($s); - - // First line is response - list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/', trim(fgets($this->conn, 1024)), 3); - - // Response buffers - $respHeaders = $respBody = ''; - $isChunking = false; - - // Consume each header response line until we get to body - while ($hLine = trim(fgets($this->conn, 4096))) { - $respHeaders .= $hLine."\n"; - if(strtolower($hLine) == 'transfer-encoding: chunked') $isChunking = true; - } - - // If we got a non-200 response, we need to backoff and retry - if ($httpCode != 200) { - $connectFailures++; - - // Twitter will disconnect on error, but we want to consume the rest of the response body (which is useful) - //TODO: this might be chunked too? In which case this contains some bad characters?? - while ($bLine = trim(fgets($this->conn, 4096))) { - $respBody .= $bLine; - } - - // Construct error - $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')'; - - // Set last error state - $this->lastErrorMsg = $errStr; - $this->lastErrorNo = $httpCode; - - // Have we exceeded maximum failures? - if ($connectFailures > $this->connectFailuresMax) { - $msg = 'Connection failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; - $this->log($msg,'error'); - throw new PhirehoseConnectLimitExceeded($msg, $httpCode); // We eventually throw an exception for other code to handle - } - // Increase retry/backoff up to max - $httpRetry = ($httpRetry < $this->httpBackoffMax) ? $httpRetry * 2 : $this->httpBackoffMax; - $this->log('HTTP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . - $errStr . '. Sleeping for ' . $httpRetry . ' seconds.','info'); - sleep($httpRetry); - continue; - - } // End if not http 200 - else{ - if(!$isChunking)throw new Exception("Twitter did not send a chunking header. Is this really HTTP/1.1? Here are headers:\n$respHeaders"); //TODO: rather crude! - } - - // Loop until connected OK - } while (!is_resource($this->conn) || $httpCode != 200); - - // Connected OK, reset connect failures - $connectFailures = 0; - $this->lastErrorMsg = NULL; - $this->lastErrorNo = NULL; - - // Switch to non-blocking to consume the stream (important) - stream_set_blocking($this->conn, 0); - - // Connect always causes the filterChanged status to be cleared - $this->filterChanged = FALSE; - - // Flush stream buffer & (re)assign fdrPool (for reconnect) - $this->fdrPool = array($this->conn); - $this->buff = ''; - - } - - protected function getAuthorizationHeader($url,$requestParams) - { - throw new Exception("Basic auth no longer works with Twitter. You must derive from OauthPhirehose, not directly from the Phirehose class."); - $authCredentials = base64_encode($this->username . ':' . $this->password); - return "Basic: ".$authCredentials; - } - - /** - * Method called as frequently as practical (every 5+ seconds) that is responsible for checking if filter predicates - * (ie: track words or follow IDs) have changed. If they have, they should be set using the setTrack() and setFollow() - * methods respectively within the overridden implementation. - * - * Note that even if predicates are changed every 5 seconds, an actual reconnect will not happen more frequently than - * every 2 minutes (as per Twitter Streaming API documentation). - * - * Note also that this method is called upon every connect attempt, so if your predicates are causing connection - * errors, they should be checked here and corrected. - * - * This should be implemented/overridden in any subclass implementing the FILTER method. - * - * @see setTrack() - * @see setFollow() - * @see Phirehose::METHOD_FILTER - */ - protected function checkFilterPredicates() - { - // Override in subclass - } - - /** - * Basic log function that outputs logging to the standard error_log() handler. This should generally be overridden - * to suit the application environment. - * - * @see error_log() - * @param string $messages - * @param String $level 'error', 'info', 'notice'. Defaults to 'notice', so you should set this - * parameter on the more important error messages. - * 'info' is used for problems that the class should be able to recover from automatically. - * 'error' is for exceptional conditions that may need human intervention. (For instance, emailing - * them to a system administrator may make sense.) - */ - protected function log($message,$level='notice') - { - @error_log('Phirehose: ' . $message, 0); - } - - /** - * Performs forcible disconnect from stream (if connected) and cleanup. - */ - protected function disconnect() - { - if (is_resource($this->conn)) { - $this->log('Closing Phirehose connection.'); - fclose($this->conn); + $authCredentials = $this->getAuthorizationHeader($url, $requestParams); + + // Do it + $s = "POST " . $urlParts['path'] . " HTTP/1.1\r\n"; + $s .= "Host: " . $urlParts['host'] . ':' . $port . "\r\n"; + $s .= "Connection: Close\r\n"; + $s .= "Content-type: application/x-www-form-urlencoded\r\n"; + $s .= "Content-length: " . strlen($postData) . "\r\n"; + $s .= "Accept: */*\r\n"; + $s .= 'Authorization: ' . $authCredentials . "\r\n"; + $s .= 'User-Agent: ' . $this->userAgent . "\r\n"; + $s .= "\r\n"; + $s .= $postData . "\r\n"; + $s .= "\r\n"; + + fwrite($this->conn, $s); + $this->log($s); + + // First line is response + list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/', trim(fgets($this->conn, 1024)), 3); + + // Response buffers + $respHeaders = $respBody = ''; + $isChunking = false; + + // Consume each header response line until we get to body + while ($hLine = trim(fgets($this->conn, 4096))) { + $respHeaders .= $hLine . "\n"; + if (strtolower($hLine) == 'transfer-encoding: chunked') { + $isChunking = true; + } + } + + // If we got a non-200 response, we need to backoff and retry + if ($httpCode != 200) { + $connectFailures++; + + // Twitter will disconnect on error, but we want to consume the rest of the response body (which is useful) + //TODO: this might be chunked too? In which case this contains some bad characters?? + while ($bLine = trim(fgets($this->conn, 4096))) { + $respBody .= $bLine; + } + + // Construct error + $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')'; + + // Set last error state + $this->lastErrorMsg = $errStr; + $this->lastErrorNo = $httpCode; + + // Have we exceeded maximum failures? + if ($connectFailures > $this->connectFailuresMax) { + $msg = 'Connection failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr; + $this->log($msg, 'error'); + throw new PhirehoseConnectLimitExceeded( + $msg, $httpCode + ); // We eventually throw an exception for other code to handle + } + // Increase retry/backoff up to max + $httpRetry = ($httpRetry < $this->httpBackoffMax) ? $httpRetry * 2 : $this->httpBackoffMax; + $this->log( + 'HTTP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' . + $errStr . '. Sleeping for ' . $httpRetry . ' seconds.', + 'info' + ); + sleep($httpRetry); + continue; + } // End if not http 200 + else { + if (!$isChunking) { + throw new Exception( + "Twitter did not send a chunking header. Is this really HTTP/1.1? Here are headers:\n$respHeaders" + ); + } //TODO: rather crude! + } + + // Loop until connected OK + } while (!is_resource($this->conn) || $httpCode != 200); + + // Connected OK, reset connect failures + $connectFailures = 0; + $this->lastErrorMsg = null; + $this->lastErrorNo = null; + + // Switch to non-blocking to consume the stream (important) + stream_set_blocking($this->conn, 0); + + // Connect always causes the filterChanged status to be cleared + $this->filterChanged = false; + + // Flush stream buffer & (re)assign fdrPool (for reconnect) + $this->fdrPool = [$this->conn]; + $this->buff = ''; + } + + /** + * Method called as frequently as practical (every 5+ seconds) that is responsible for checking if filter predicates + * (ie: track words or follow IDs) have changed. If they have, they should be set using the setTrack() and setFollow() + * methods respectively within the overridden implementation. + * Note that even if predicates are changed every 5 seconds, an actual reconnect will not happen more frequently than + * every 2 minutes (as per Twitter Streaming API documentation). + * Note also that this method is called upon every connect attempt, so if your predicates are causing connection + * errors, they should be checked here and corrected. + * This should be implemented/overridden in any subclass implementing the FILTER method. + * + * @see setTrack() + * @see setFollow() + * @see Phirehose::METHOD_FILTER + */ + protected function checkFilterPredicates() + { + // Override in subclass } - $this->conn = NULL; - $this->reconnect = FALSE; - } - - /** - * Reconnects as quickly as possible. Should be called whenever a reconnect is required rather that connect/disconnect - * to preserve streams reconnect state - */ - private function reconnect() - { - $reconnect = $this->reconnect; - $this->disconnect(); // Implicitly sets reconnect to FALSE - $this->reconnect = $reconnect; // Restore state to prev - $this->connect(); - } - - /** - * This is the one and only method that must be implemented additionally. As per the streaming API documentation, - * statuses should NOT be processed within the same process that is performing collection - * - * @param string $status - */ - abstract public function enqueueStatus($status); - - /** - * Reports a periodic heartbeat. Keep execution time minimal. - * - * @return NULL - */ - public function heartbeat() {} - - /** - * Set host port - * - * @param string $host - * @return void - */ - public function setHostPort($port) - { - $this->hostPort = $port; - } - - /** - * Set secure host port - * - * @param int $port - * @return void - */ - public function setSecureHostPort($port) - { - $this->secureHostPort = $port; - } + protected function getAuthorizationHeader($url, $requestParams) + { + throw new Exception( + "Basic auth no longer works with Twitter. You must derive from OauthPhirehose, not directly from the Phirehose class." + ); + $authCredentials = base64_encode($this->username . ':' . $this->password); + + return "Basic: " . $authCredentials; + } + + /** + * This is the one and only method that must be implemented additionally. As per the streaming API documentation, + * statuses should NOT be processed within the same process that is performing collection + * + * @param string $status + */ + abstract public function enqueueStatus($status); + + /** + * Reports a periodic heartbeat. Keep execution time minimal. + * + * @return NULL + */ + public function heartbeat() + { + } + + /** + * Called every $this->avgPeriod (default=60) seconds, and this default implementation + * calculates some rates, logs them, and resets the counters. + */ + protected function statusUpdate() + { + $this->log( + 'Consume rate: ' . $this->statusRate . ' status/sec (' . $this->statusCount . ' total), avg ' . + 'enqueueStatus(): ' . $this->enqueueTimeMS . 'ms, avg checkFilterPredicates(): ' . $this->filterCheckTimeMS . 'ms (' . + $this->filterCheckCount . ' total) over ' . $this->avgElapsed . ' seconds, max stream idle period: ' . + $this->maxIdlePeriod . ' seconds.' + ); + // Reset + $this->statusCount = $this->filterCheckCount = $this->enqueueSpent = 0; + $this->filterCheckSpent = $this->idlePeriod = $this->maxIdlePeriod = 0; + } + + /** + * Returns the last error message (TCP or HTTP) that occured with the streaming API or client. State is cleared upon + * successful reconnect + * + * @return string + */ + public function getLastErrorMsg() + { + return $this->lastErrorMsg; + } + + /** + * Returns the last error number that occured with the streaming API or client. Numbers correspond to either the + * fsockopen() error states (in the case of TCP errors) or HTTP error codes from Twitter (in the case of HTTP errors). + * State is cleared upon successful reconnect. + * + * @return string + */ + public function getLastErrorNo() + { + return $this->lastErrorNo; + } + + /** + * Set host port + * + * @param string $port + * + * @return void + */ + public function setHostPort($port) + { + $this->hostPort = $port; + } + + /** + * Set secure host port + * + * @param int $port + * + * @return void + */ + public function setSecureHostPort($port) + { + $this->secureHostPort = $port; + } } // End of class -class PhirehoseException extends Exception {} -class PhirehoseNetworkException extends PhirehoseException {} -class PhirehoseConnectLimitExceeded extends PhirehoseException {} +class PhirehoseException extends Exception +{ +} + +class PhirehoseNetworkException extends PhirehoseException +{ +} + +class PhirehoseConnectLimitExceeded extends PhirehoseException +{ +} diff --git a/lib/UserstreamPhirehose.php b/lib/UserstreamPhirehose.php index 372af43..de727d8 100644 --- a/lib/UserstreamPhirehose.php +++ b/lib/UserstreamPhirehose.php @@ -1,21 +1,26 @@ - Date: Wed, 21 Dec 2016 17:42:13 +0000 Subject: [PATCH 4/5] - psr1/psr2 refactoring - phpdoc fixes - fixed error log and small typos --- example/filter-track.php | 44 ++++++++++++++++++++-- lib/Phirehose.php | 23 +++++++++--- lib/SimilarityAlg.php | 79 ++++++++++++++++++++++++++++++++++++++++ lib/SimilarityAlg2.php | 66 +++++++++++++++++++++++++++++++++ 4 files changed, 204 insertions(+), 8 deletions(-) create mode 100644 lib/SimilarityAlg.php create mode 100644 lib/SimilarityAlg2.php diff --git a/example/filter-track.php b/example/filter-track.php index 30a6a69..bf45e1e 100644 --- a/example/filter-track.php +++ b/example/filter-track.php @@ -1,6 +1,8 @@ similarityIndex = $class; + } + /** * Enqueue each status * @@ -21,13 +30,42 @@ public function enqueueStatus($status) * enqueued and processed asyncronously from the collection process. */ $data = json_decode($status, true); - if (is_array($data) && isset($data['user']['screen_name'])) { - print $data['user']['screen_name'] . ': ' . urldecode($data['text']) . "\n"; + if (is_array($data) && isset($data['user']['screen_name']) && substr($data['text'], 0, 4) != 'RT @') { + if ($this->testSim($data['text'])) { + print $data['user']['screen_name'] . ': ' . urldecode($data['text']) . "\n"; + } } } + + public function testSim($text) + { + foreach ($this->list as &$line) { + $index = $this->similarityIndex->compareStrings($text, $line); + if ($index >= 0.75) { + return false; + } + } + + if (sizeof($this->list) > 100) { + array_shift($this->list); + } + + $this->list[] = $text; + + return true; + } } +$similarityIndex = new SimilarityAlg2(); +//$index = $similarityIndex->compareStrings( +// 'Berlin market attack: Police searching for Tunisian man after finding ID in truck – ', +// 'Berlin attack: Police hunt Tunisian suspect after finding ID papers in truck - ' +//); +//var_dump($index); +//exit; + // Start streaming $sc = new FilterTrackConsumer(OAUTH_TOKEN, OAUTH_SECRET, Phirehose::METHOD_FILTER); -$sc->setTrack(['morning', 'goodnight', 'hello', 'the']); +$sc->setSimilarityClass($similarityIndex); +$sc->setTrack(['berlin', 'attack'], FilterTrackConsumer::TRACK_OP_AND); $sc->consume(); diff --git a/lib/Phirehose.php b/lib/Phirehose.php index f5f5445..17d740d 100644 --- a/lib/Phirehose.php +++ b/lib/Phirehose.php @@ -25,12 +25,12 @@ abstract class Phirehose const METHOD_SITE = 'site'; //See UserstreamPhirehose.php const EARTH_RADIUS_KM = 6371; - + const TRACK_OP_AND = 'AND'; + const TRACK_OP_OR = 'OR'; /** * @internal Moved from being a const to a variable, because some methods (user and site) need to change it. */ protected $URL_BASE = 'https://stream.twitter.com/1.1/statuses/'; - /** * Member Attribs */ @@ -38,9 +38,17 @@ abstract class Phirehose protected $password; protected $method; protected $format; - protected $count; //Can be -150,000 to 150,000. @see http://dev.twitter.com/pages/streaming_api_methods#count + protected $count; // Can be -150,000 to 150,000. @see http://dev.twitter.com/pages/streaming_api_methods#count protected $followIds; + protected $trackWords; + // logical operators for building up a phrase + protected $trackOperators = [ + self::TRACK_OP_AND => ' ', + self::TRACK_OP_OR => ',', + ]; + protected $trackOperator = self::TRACK_OP_OR; + protected $locationBoxes; protected $conn; protected $fdrPool; @@ -238,8 +246,9 @@ public function getFollow() * See: http://apiwiki.twitter.com/Streaming-API-Documentation#TrackLimiting * * @param array $trackWords + * @param string $operator OR|AND - how words will be joined */ - public function setTrack(array $trackWords) + public function setTrack(array $trackWords, $operator = self::TRACK_OP_OR) { $trackWords = ($trackWords === null) ? [] : $trackWords; sort($trackWords); // Non-optimal, but necessary @@ -247,6 +256,10 @@ public function setTrack(array $trackWords) $this->filterChanged = true; } $this->trackWords = $trackWords; + + if (isset($this->trackOperators[$operator])) { + $this->trackOperator = $this->trackOperators[$operator]; + } } /** @@ -649,7 +662,7 @@ protected function connect() $this->trackWords ) > 0 ) { - $requestParams['track'] = implode(',', $this->trackWords); + $requestParams['track'] = implode($this->trackOperator, $this->trackWords); } if (($this->method == self::METHOD_FILTER || $this->method == self::METHOD_SITE) && count($this->followIds) > 0 diff --git a/lib/SimilarityAlg.php b/lib/SimilarityAlg.php new file mode 100644 index 0000000..09ccb93 --- /dev/null +++ b/lib/SimilarityAlg.php @@ -0,0 +1,79 @@ +wordLetterPairs(strtoupper($str1)); + $pairs2 = $this->wordLetterPairs(strtoupper($str2)); + + $intersection = 0; + + $union = count($pairs1) + count($pairs2); + + for ($i = 0; $i < count($pairs1); $i++) { + $pair1 = $pairs1[$i]; + + $pairs2 = array_values($pairs2); + for ($j = 0; $j < count($pairs2); $j++) { + $pair2 = $pairs2[$j]; + if ($pair1 === $pair2) { + $intersection++; + unset($pairs2[$j]); + break; + } + } + } + + return (2.0 * $intersection) / $union; + } + + /** + * @param $str + * + * @return mixed + */ + private function wordLetterPairs($str) + { + $allPairs = array(); + + // Tokenize the string and put the tokens/words into an array + + $words = explode(' ', $str); + + // For each word + for ($w = 0; $w < count($words); $w++) { + // Find the pairs of characters + $pairsInWord = $this->letterPairs($words[$w]); + + for ($p = 0; $p < count($pairsInWord); $p++) { + $allPairs[] = $pairsInWord[$p]; + } + } + + return $allPairs; + } + + /** + * @param $str + * + * @return array + */ + private function letterPairs($str) + { + $numPairs = mb_strlen($str) - 1; + $pairs = array(); + + for ($i = 0; $i < $numPairs; $i++) { + $pairs[$i] = mb_substr($str, $i, 2); + } + + return $pairs; + } +} diff --git a/lib/SimilarityAlg2.php b/lib/SimilarityAlg2.php new file mode 100644 index 0000000..40d8707 --- /dev/null +++ b/lib/SimilarityAlg2.php @@ -0,0 +1,66 @@ + Date: Fri, 23 Dec 2016 12:13:14 +0000 Subject: [PATCH 5/5] news reader --- .gitignore | 0 Readme.md | 0 composer.json | 40 ++++--- example/filter-oauth.php | 0 example/filter-reconfigure.php | 0 example/filter-track-geo.php | 0 example/filter-track.php | 181 ++++++++++++++++++++++++++--- example/ghetto-queue-collect.php | 0 example/ghetto-queue-consume.php | 0 example/sample-en.php | 0 example/sample-oauth-es.php | 0 example/sample.php | 0 example/server.php | 0 example/sitestream.php | 0 example/test-sim-speed.php | 38 ++++++ example/userstream-alternative.php | 0 example/userstream-simple.php | 0 gpl.txt | 0 lib/OauthPhirehose.php | 0 lib/Phirehose.php | 0 lib/SimilarityAlg.php | 0 lib/SimilarityAlg2.php | 0 lib/UserstreamPhirehose.php | 0 23 files changed, 225 insertions(+), 34 deletions(-) mode change 100644 => 100755 .gitignore mode change 100644 => 100755 Readme.md mode change 100644 => 100755 composer.json mode change 100644 => 100755 example/filter-oauth.php mode change 100644 => 100755 example/filter-reconfigure.php mode change 100644 => 100755 example/filter-track-geo.php mode change 100644 => 100755 example/filter-track.php mode change 100644 => 100755 example/ghetto-queue-collect.php mode change 100644 => 100755 example/ghetto-queue-consume.php mode change 100644 => 100755 example/sample-en.php mode change 100644 => 100755 example/sample-oauth-es.php mode change 100644 => 100755 example/sample.php create mode 100755 example/server.php mode change 100644 => 100755 example/sitestream.php create mode 100755 example/test-sim-speed.php mode change 100644 => 100755 example/userstream-alternative.php mode change 100644 => 100755 example/userstream-simple.php mode change 100644 => 100755 gpl.txt mode change 100644 => 100755 lib/OauthPhirehose.php mode change 100644 => 100755 lib/Phirehose.php mode change 100644 => 100755 lib/SimilarityAlg.php mode change 100644 => 100755 lib/SimilarityAlg2.php mode change 100644 => 100755 lib/UserstreamPhirehose.php diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/Readme.md b/Readme.md old mode 100644 new mode 100755 diff --git a/composer.json b/composer.json old mode 100644 new mode 100755 index 8be99ff..005241a --- a/composer.json +++ b/composer.json @@ -1,20 +1,26 @@ { - "name": "fennb/phirehose", - "description": "A PHP interface to the Twitter Streaming API.", - "keywords": ["phirehose", "php", "twitter", "streaming", "api"], - "type": "library", - "homepage": "https://github.com/fennb/phirehose", - "license": "GPL", - "authors": [ - { - "name": "Fenn Bailey", - "homepage": "http://fennb.com/" - } - ], - "require": { - "php": ">=5.4.0" + "name": "fennb/phirehose", + "description": "A PHP interface to the Twitter Streaming API.", + "keywords": [ + "phirehose", + "php", + "twitter", + "streaming", + "api" + ], + "type": "library", + "homepage": "https://github.com/fennb/phirehose", + "license": "GPL", + "require": { + "php": ">=5.4.0", + "kreait/firebase-php": "^2.0" + }, + "autoload": { + "psr-0": { + "MyApp": "src" }, - "autoload": { - "classmap": ["lib"] - } + "classmap": [ + "lib" + ] + } } diff --git a/example/filter-oauth.php b/example/filter-oauth.php old mode 100644 new mode 100755 diff --git a/example/filter-reconfigure.php b/example/filter-reconfigure.php old mode 100644 new mode 100755 diff --git a/example/filter-track-geo.php b/example/filter-track-geo.php old mode 100644 new mode 100755 diff --git a/example/filter-track.php b/example/filter-track.php old mode 100644 new mode 100755 index bf45e1e..c29a864 --- a/example/filter-track.php +++ b/example/filter-track.php @@ -4,6 +4,7 @@ require_once '../lib/SimilarityAlg.php'; require_once '../lib/SimilarityAlg2.php'; require_once 'twitter-auth-config.php'; +require '../vendor/autoload.php'; /** * Example of using Phirehose to display a live filtered stream using track words @@ -12,11 +13,33 @@ class FilterTrackConsumer extends OauthPhirehose { public $list = []; + public $pushInterval = 1; + + private $similarityThreshold = 0.75; + + private $db; + + private $queueSize = 100; + + private $queueStartTime; + + private $resetQueueSeconds = 800; + public function setSimilarityClass(SimilarityAlg2 $class) { $this->similarityIndex = $class; } + public function fopen() + { + $this->fh = fopen('./data', 'a'); + } + + public function setFirebaseDb($db) + { + $this->db = $db; + } + /** * Enqueue each status * @@ -25,34 +48,138 @@ public function setSimilarityClass(SimilarityAlg2 $class) public function enqueueStatus($status) { /* - * In this simple example, we will just display to STDOUT rather than enqueue. - * NOTE: You should NOT be processing tweets at this point in a real application, instead they should be being - * enqueued and processed asyncronously from the collection process. - */ + * You should NOT be processing tweets at this point in a real application, instead they should be being + * enqueued and processed asyncronously from the collection process. + */ $data = json_decode($status, true); - if (is_array($data) && isset($data['user']['screen_name']) && substr($data['text'], 0, 4) != 'RT @') { - if ($this->testSim($data['text'])) { - print $data['user']['screen_name'] . ': ' . urldecode($data['text']) . "\n"; + if (is_array($data) && + isset($data['user']['screen_name']) && + $data['lang'] == 'en' && + substr( + $data['text'], + 0, + 4 + ) != 'RT @' + ) { + $text = $this->cleanupText($data['text']); + if (strlen($text) > 30 && $this->testSim($text)) { + $this->push($data['user']['screen_name'], $text); +// print $data['user']['screen_name'] . ': ' . $text . "\n"; +// fwrite($this->fh, str_replace("\n", ',', $data['text']) . PHP_EOL); } } } - public function testSim($text) + /** + * @param $text + * + * @return mixed + */ + private function &cleanupText(&$text) + { + $text = str_replace("\n", ',', $text); + $text = trim($text); + + return $text; + } + + private function testSim($text) { - foreach ($this->list as &$line) { - $index = $this->similarityIndex->compareStrings($text, $line); - if ($index >= 0.75) { - return false; + $size = sizeof($this->list); + + $text = preg_replace('~(?:https?:\/\/t.co\/[a-zA-Z0-9]+\s?)~', '', urldecode($text)); + $text = preg_replace('~(?:@[a-zA-Z_]+\s?:?)|(?:#[a-zA-Z\s]+)~', '', $text); + + // do not print, store for checking + if ($size < $this->queueSize) { + + print "::QSize:$size::$text\n"; + + // add only unique messages, continue otherwise + for ($i = $size; $i > 0; --$i) { + if (!isset($this->list[$i])) { + continue; + } + $index = $this->similarityIndex->compareStrings($text, $this->list[$i]); + if ($index >= $this->similarityThreshold) { + print "::In queue - skip:$i::$text\n"; + + return true; + } + } + + $this->list[] = $text; + + return true; + } + + $this->resetQueue(); + + // start filtering something sensible + for ($i = $size; $i >= 0; --$i) { + if (!isset($this->list[$i])) { + continue; + } + $index = $this->similarityIndex->compareStrings($text, $this->list[$i]); + // we want to keep the most common messages, as ranting is quite unique this will keep only valid sources + if ($index >= $this->similarityThreshold) { + // move to the last position + $this->list[$i] = $this->list[$size]; + $this->list[$size] = $text; + + print "::Queue:$i::$text\n"; + + return true; } } - if (sizeof($this->list) > 100) { - array_shift($this->list); + print ':::::DROP:::::::' . $text . "\n"; + + return false; + } + + protected function push($user, $data) + { + $doc = implode('-', $this->getTrack()); + $this->db->getReference($doc) + ->push( + [ + 'username' => $user, + 'tweet' => $data, + ] + ); + } + + public function resetQueue() + { + // shuffle and reset 50% every 5 min + if ($this->getQueueStartTime() < time() - $this->resetQueueSeconds) { + $this->initStart(); + shuffle($this->list); + $this->list = array_slice($this->list, floor(sizeof($this->list) / 2)); + print ':::Resetting queue...' . PHP_EOL; } + } + + public function getQueueStartTime() + { + return $this->queueStartTime; + } - $this->list[] = $text; + public function initStart() + { + $this->queueStartTime = time(); + } + + public function __destruct() + { + print 'GG&BB' . PHP_EOL; - return true; + $this->push('Admin', 'Restarting........'); + $this->push('Admin', 'Restarting................'); + $this->push('Admin', 'Restarting......................'); + $this->push('Admin', 'Restarting..............................'); + $this->disconnect(); } } @@ -64,8 +191,28 @@ public function testSim($text) //var_dump($index); //exit; +//class test +//{ +// public $list = ['a', 'b', 'c', 'd']; +// +// function tests() +// { +// shuffle($this->list); +// $this->list = array_slice($this->list, floor(sizeof($this->list)/2)); +// var_dump($this->list); +// } +//} +// +//(new test)->tests(); + +$firebase = Firebase::fromServiceAccount(__DIR__ . '/../news-test-01-8e00fcff2669.json'); +$database = $firebase->getDatabase(); + // Start streaming $sc = new FilterTrackConsumer(OAUTH_TOKEN, OAUTH_SECRET, Phirehose::METHOD_FILTER); $sc->setSimilarityClass($similarityIndex); -$sc->setTrack(['berlin', 'attack'], FilterTrackConsumer::TRACK_OP_AND); +$sc->setFirebaseDb($database); +$sc->initStart(); +$sc->fopen(); +$sc->setTrack(['terror', 'attack'], FilterTrackConsumer::TRACK_OP_AND); $sc->consume(); diff --git a/example/ghetto-queue-collect.php b/example/ghetto-queue-collect.php old mode 100644 new mode 100755 diff --git a/example/ghetto-queue-consume.php b/example/ghetto-queue-consume.php old mode 100644 new mode 100755 diff --git a/example/sample-en.php b/example/sample-en.php old mode 100644 new mode 100755 diff --git a/example/sample-oauth-es.php b/example/sample-oauth-es.php old mode 100644 new mode 100755 diff --git a/example/sample.php b/example/sample.php old mode 100644 new mode 100755 diff --git a/example/server.php b/example/server.php new file mode 100755 index 0000000..e69de29 diff --git a/example/sitestream.php b/example/sitestream.php old mode 100644 new mode 100755 diff --git a/example/test-sim-speed.php b/example/test-sim-speed.php new file mode 100755 index 0000000..fdcac1b --- /dev/null +++ b/example/test-sim-speed.php @@ -0,0 +1,38 @@ +alg = $alg; + $this->list = $list; + } + + public function testSim($text) + { + $size = sizeof($this->list) - 1; + for ($i = $size; $i >= 0; --$i) { + $this->alg->compareStrings($text, $this->list[$i]); + } + } +} + +$similarityIndex = new LetterPairSimilarity(); +$similarityIndex2 = new SimilarityAlg2(); +$data = file('./data'); +$bench = new TestSpeed($similarityIndex2, $data); +$start_time = microtime(TRUE); +$test = 'RT @AlbertoNardelli: Even before #Berlin facts are known, one thing is clear: far-right, trolls & propaganda are targeting Merkel & 2017 ht'; +$bench->testSim($test); +$end_time = microtime(TRUE); + +print $end_time - $start_time; + + diff --git a/example/userstream-alternative.php b/example/userstream-alternative.php old mode 100644 new mode 100755 diff --git a/example/userstream-simple.php b/example/userstream-simple.php old mode 100644 new mode 100755 diff --git a/gpl.txt b/gpl.txt old mode 100644 new mode 100755 diff --git a/lib/OauthPhirehose.php b/lib/OauthPhirehose.php old mode 100644 new mode 100755 diff --git a/lib/Phirehose.php b/lib/Phirehose.php old mode 100644 new mode 100755 diff --git a/lib/SimilarityAlg.php b/lib/SimilarityAlg.php old mode 100644 new mode 100755 diff --git a/lib/SimilarityAlg2.php b/lib/SimilarityAlg2.php old mode 100644 new mode 100755 diff --git a/lib/UserstreamPhirehose.php b/lib/UserstreamPhirehose.php old mode 100644 new mode 100755