diff options
author | Thibaut Girka <thib@sitedethib.com> | 2020-06-09 10:39:20 +0200 |
---|---|---|
committer | Thibaut Girka <thib@sitedethib.com> | 2020-06-09 10:39:20 +0200 |
commit | 12c8ac9e1443d352eca3538ed1558de8ccdd9434 (patch) | |
tree | ed480d77b29f0d571ad219190288bde3b0c09b32 /streaming/index.js | |
parent | f328f2faa3fbdb182921366c6a20e745c069b840 (diff) | |
parent | 89f40b6c3ec525b09d02f21e9b45276084167d8d (diff) |
Merge branch 'master' into glitch-soc/merge-upstream
Conflicts: - `app/controllers/activitypub/collections_controller.rb`: Conflict due to glitch-soc having to take care of local-only pinned toots in that controller. Took upstream's changes and restored the local-only special handling. - `app/controllers/auth/sessions_controller.rb`: Minor conflicts due to the theming system, applied upstream changes, adapted the following two files for glitch-soc's theming system: - `app/controllers/concerns/sign_in_token_authentication_concern.rb` - `app/controllers/concerns/two_factor_authentication_concern.rb` - `app/services/backup_service.rb`: Minor conflict due to glitch-soc having to handle local-only toots specially. Applied upstream changes and restored the local-only special handling. - `app/views/admin/custom_emojis/index.html.haml`: Minor conflict due to the theming system. - `package.json`: Upstream dependency updated, too close to a glitch-soc-only dependency in the file. - `yarn.lock`: Upstream dependency updated, too close to a glitch-soc-only dependency in the file.
Diffstat (limited to 'streaming/index.js')
-rw-r--r-- | streaming/index.js | 65 |
1 files changed, 49 insertions, 16 deletions
diff --git a/streaming/index.js b/streaming/index.js index b70339c30..f1d0ed5c0 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -144,13 +144,21 @@ const startWorker = (workerId) => { callbacks.forEach(callback => callback(message)); }); - const subscriptionHeartbeat = (channel) => { - const interval = 6*60; + const subscriptionHeartbeat = channels => { + if (!Array.isArray(channels)) { + channels = [channels]; + } + + const interval = 6 * 60; + const tellSubscribed = () => { - redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3); + channels.forEach(channel => redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3)); }; + tellSubscribed(); - const heartbeat = setInterval(tellSubscribed, interval*1000); + + const heartbeat = setInterval(tellSubscribed, interval * 1000); + return () => { clearInterval(heartbeat); }; @@ -203,7 +211,7 @@ const startWorker = (workerId) => { return; } - client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => { + client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => { done(); if (err) { @@ -232,6 +240,7 @@ const startWorker = (workerId) => { req.accountId = result.rows[0].account_id; req.chosenLanguages = result.rows[0].chosen_languages; req.allowNotifications = scopes.some(scope => ['read', 'read:notifications'].includes(scope)); + req.deviceId = result.rows[0].device_id; next(); }); @@ -353,11 +362,15 @@ const startWorker = (workerId) => { }); }; - const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => { - const accountId = req.accountId || req.remoteAddress; - + const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => { + const accountId = req.accountId || req.remoteAddress; const streamType = notificationOnly ? ' (notification)' : ''; - log.verbose(req.requestId, `Starting stream from ${id} for ${accountId}${streamType}`); + + if (!Array.isArray(ids)) { + ids = [ids]; + } + + log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`); const listener = message => { const { event, payload, queued_at } = JSON.parse(message); @@ -436,8 +449,11 @@ const startWorker = (workerId) => { }); }; - subscribe(`${redisPrefix}${id}`, listener); - attachCloseHandler(`${redisPrefix}${id}`, listener); + ids.forEach(id => { + subscribe(`${redisPrefix}${id}`, listener); + }); + + attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener); }; // Setup stream output to HTTP @@ -464,9 +480,16 @@ const startWorker = (workerId) => { }; // Setup stream end for HTTP - const streamHttpEnd = (req, closeHandler = false) => (id, listener) => { + const streamHttpEnd = (req, closeHandler = false) => (ids, listener) => { + if (!Array.isArray(ids)) { + ids = [ids]; + } + req.on('close', () => { - unsubscribe(id, listener); + ids.forEach(id => { + unsubscribe(id, listener); + }); + if (closeHandler) { closeHandler(); } @@ -522,8 +545,13 @@ const startWorker = (workerId) => { app.use(errorMiddleware); app.get('/api/v1/streaming/user', (req, res) => { - const channel = `timeline:${req.accountId}`; - streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel))); + const channels = [`timeline:${req.accountId}`]; + + if (req.deviceId) { + channels.push(`timeline:${req.accountId}:${req.deviceId}`); + } + + streamFrom(channels, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channels))); }); app.get('/api/v1/streaming/user/notification', (req, res) => { @@ -603,7 +631,12 @@ const startWorker = (workerId) => { switch(location.query.stream) { case 'user': - channel = `timeline:${req.accountId}`; + channel = [`timeline:${req.accountId}`]; + + if (req.deviceId) { + channel.push(`timeline:${req.accountId}:${req.deviceId}`); + } + streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel))); break; case 'user:notification': |