Skip to content

Commit

Permalink
New StreamLoader
Browse files Browse the repository at this point in the history
The v2 Twitter stream API delivers all results as part of a single
stream. Each tweet that comes down the pipe has a tag, which identifies
the searches it belongs to. This simplifies the StreamLoader
quite a bit since it just needs to listen to the single stream, and then
the stream is controlled by adding and removing stream rules using the
Twitter API. The Database.startStream() and Database.stopStream() methods
are used to start and stop the stream for a search. The StreamLoader
class is started up and it basically connects and waits for tweets to
process.

Note: this new functionality means the DocNow app uses the v2 API for
both searching and streaming Twitter data. More work will need to be
done to allow the DocNow app to use the /tweets/search/all endpoint for
historical tweets. That's next!
  • Loading branch information
edsu committed Aug 31, 2021
1 parent 02988d3 commit 0778003
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 254 deletions.
15 changes: 6 additions & 9 deletions src/server/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@ import wayback from './wayback'
import { Database } from './db'
import { Archive } from './archive'
import { activateKeys } from './auth'
import { StreamLoaderController } from './stream-loader'

const app = express()

const db = new Database()

const streamLoader = new StreamLoaderController()

db.startTrendsWatcher({interval: 60 * 1000})

function notAuthorized(res) {
Expand Down Expand Up @@ -267,8 +264,8 @@ app.put('/search/:searchId', async (req, res) => {
if (req.user) {
const search = await db.getSearch(req.body.id)

// get any tweet text that was sent and remove it from the body
// since it's not a property of the search
// get any tweet text that was POSTed and remove it from the body
// since it's not really a property of the search object
const tweetText = req.body.tweetText
delete req.body.tweetText

Expand All @@ -278,7 +275,7 @@ app.put('/search/:searchId', async (req, res) => {
if (req.query.refreshTweets) {
db.importFromSearch(search)
} else if (search.active && ! newSearch.active) {
streamLoader.stopStream(search.id)
await db.stopStream(search)
// stop search too?
} else if (! search.active && newSearch.active) {
// make the search public
Expand All @@ -289,9 +286,9 @@ app.put('/search/:searchId', async (req, res) => {
const twtr = await db.getTwitterClientForUser(req.user)
tweetId = await twtr.sendTweet(tweetText)
}
// start the collection
streamLoader.startStream(search.id, tweetId)
// start search too?
// start the streaming
await db.startStream(search, tweetId)
// start a search too?
} else if (! search.archiveStarted && newSearch.archiveStarted) {
const archive = new Archive()
archive.createArchive(search)
Expand Down
92 changes: 76 additions & 16 deletions src/server/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ export class Database {
for (const row of rows) {
settings[row.name] = row.value
}

if (! settings.instanceTweetText) {
settings.instanceTweetText = "I'm creating a collection of tweets that match {query}. You can learn more about why I'm creating it and specify your terms of your consent here {collection-url}"
}

return settings
}

Expand Down Expand Up @@ -280,21 +285,28 @@ export class Database {
return Place.query().select()
}

getTwitterClientForUser(user) {
return new Promise((resolve) => {
this.getSettings().then((settings) => {
resolve(
new Twitter({
consumerKey: settings.appKey,
consumerSecret: settings.appSecret,
accessToken: user.twitterAccessToken,
accessTokenSecret: user.twitterAccessTokenSecret
})
)
})
async getTwitterClientForUser(user) {
const settings = await this.getSettings()
return new Twitter({
consumerKey: settings.appKey,
consumerSecret: settings.appSecret,
accessToken: user.twitterAccessToken,
accessTokenSecret: user.twitterAccessTokenSecret
})
}

async getTwitterClientForApp() {
const settings = await this.getSettings()
if (settings.appKey && settings.appSecret) {
return new Twitter({
consumerKey: settings.appKey,
consumerSecret: settings.appSecret,
})
} else {
return null
}
}

async createSearch(search) {
search.updated = new Date()
const s1 = await Search.query()
Expand Down Expand Up @@ -495,35 +507,83 @@ export class Database {

// determine the query to run
const lastQuery = search.queries[search.queries.length - 1]
const q = lastQuery.searchQuery()
const q = lastQuery.twitterQuery()

// run the search!
let maxTweetId = null
let count = 0
return new Promise((resolve, reject) => {
twtr.search({q: q, sinceId: search.maxTweetId, count: maxTweets}, async (err, results) => {
if (err) {
log.error(`caught error during search: ${err}`)
reject(err)
} else if (results.length === 0) {
await this.updateSearch({
id: search.id,
maxTweetId: maxTweetId,
active: false
})
log.info(`no more search results, returning ${count}`)
resolve(count)
} else {
if (maxTweetId === null) {
maxTweetId = results[0].id
}
await this.loadTweets(search, results)
count += results.length
log.info(`bulk loaded ${results.length} tweets`)
log.info(`bulk loaded ${results.length} tweets, with total=${count}`)
}
})
})
}

async startStream(search, tweetId) {
log.info(`starting stream for ${search.id}`)
const lastQuery = search.queries[search.queries.length - 1]
const q = lastQuery.twitterQuery()
const job = await this.createSearchJob({
queryId: lastQuery.id,
tweetId: tweetId,
started: new Date()
})
log.info(`created job ${job.id}`)

const twtr = await this.getTwitterClientForApp()
return twtr.addFilterRule(q, `search-${search.id}`)
}

async stopStream(search) {
log.info(`stopping stream for search ${search.id}`)

// remove all filter rules for this search
const twtr = await this.getTwitterClientForApp()
for (const rule of await twtr.getFilterRules()) {
if (rule.tag == `search-${search.id}`) {
await twtr.deleteFilterRule(rule.id)
log.info(`removing filter rule ${rule.id} for ${search.id}`)
}
}

// need a better way to identify the search job that needs to
// be ended but for now just mark any search job that has no
// ended time. once we can do historical collection it will be
// important to only end the filter stream job

const query = search.queries[search.queries.length - 1]
for (const job of query.searchJobs) {
if (! job.ended) {
await this.updateSearchJob({
id: job.id,
ended: new Date()
})
}
}

return this.updateSearch({...search, active: false, archived: false})
}

async loadTweets(search, tweets) {
log.info(`loading ${tweets.length} tweets for searchId=${search.id}`)

const tweetRows = []
for (const tweet of tweets) {
Expand Down Expand Up @@ -931,7 +991,7 @@ export class Database {
.findById(queryId)
.withGraphJoined('search')
.withGraphJoined('searchJobs')
.withGraphJoined('search.user')
.withGraphJoined('search.creator')
.orderBy('searchJobs.created', 'ASC')
return query
}
Expand All @@ -954,4 +1014,4 @@ export class Database {
.where('id', job.id)
}

}
}
22 changes: 2 additions & 20 deletions src/server/models/Query.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const { Model } = require('objection')
const Search = require('./Search')
const SearchJob = require('./SearchJob')

class Query extends Model {
Expand All @@ -9,6 +8,7 @@ class Query extends Model {
}

static get relationMappings() {
const Search = require('./Search')
return {
search: {
relation: Model.HasOneRelation,
Expand All @@ -29,7 +29,7 @@ class Query extends Model {
}
}

searchQuery() {
twitterQuery() {
const queryParts = []
for (const term of this.value.or) {
if (term.type === 'keyword') {
Expand All @@ -47,24 +47,6 @@ class Query extends Model {
return queryParts.join(' OR ')
}

trackQuery() {
const queryParts = []
for (const term of this.value.or) {
if (term.type === 'keyword') {
queryParts.push(term.value)
} else if (term.type === 'user') {
queryParts.push('@' + term.value)
} else if (term.type === 'phrase') {
queryParts.push(term.value.replace(/,/g, ' '))
} else if (term.type === 'hashtag') {
queryParts.push(term.value)
} else {
queryParts.push(term.value)
}
}
return queryParts.join(',')
}

}

module.exports = Query
Loading

0 comments on commit 0778003

Please sign in to comment.