about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js65
1 files changed, 49 insertions, 16 deletions
diff --git a/streaming/index.js b/streaming/index.js
index b70339c30..f1d0ed5c0 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -144,13 +144,21 @@ const startWorker = (workerId) => {
     callbacks.forEach(callback => callback(message));
   });
 
-  const subscriptionHeartbeat = (channel) => {
-    const interval = 6*60;
+  const subscriptionHeartbeat = channels => {
+    if (!Array.isArray(channels)) {
+      channels = [channels];
+    }
+
+    const interval = 6 * 60;
+
     const tellSubscribed = () => {
-      redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3);
+      channels.forEach(channel => redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3));
     };
+
     tellSubscribed();
-    const heartbeat = setInterval(tellSubscribed, interval*1000);
+
+    const heartbeat = setInterval(tellSubscribed, interval * 1000);
+
     return () => {
       clearInterval(heartbeat);
     };
@@ -203,7 +211,7 @@ const startWorker = (workerId) => {
         return;
       }
 
-      client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
+      client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
         done();
 
         if (err) {
@@ -232,6 +240,7 @@ const startWorker = (workerId) => {
         req.accountId = result.rows[0].account_id;
         req.chosenLanguages = result.rows[0].chosen_languages;
         req.allowNotifications = scopes.some(scope => ['read', 'read:notifications'].includes(scope));
+        req.deviceId = result.rows[0].device_id;
 
         next();
       });
@@ -353,11 +362,15 @@ const startWorker = (workerId) => {
     });
   };
 
-  const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
-    const accountId = req.accountId || req.remoteAddress;
-
+  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
+    const accountId  = req.accountId || req.remoteAddress;
     const streamType = notificationOnly ? ' (notification)' : '';
-    log.verbose(req.requestId, `Starting stream from ${id} for ${accountId}${streamType}`);
+
+    if (!Array.isArray(ids)) {
+      ids = [ids];
+    }
+
+    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`);
 
     const listener = message => {
       const { event, payload, queued_at } = JSON.parse(message);
@@ -436,8 +449,11 @@ const startWorker = (workerId) => {
       });
     };
 
-    subscribe(`${redisPrefix}${id}`, listener);
-    attachCloseHandler(`${redisPrefix}${id}`, listener);
+    ids.forEach(id => {
+      subscribe(`${redisPrefix}${id}`, listener);
+    });
+
+    attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
   };
 
   // Setup stream output to HTTP
@@ -464,9 +480,16 @@ const startWorker = (workerId) => {
   };
 
   // Setup stream end for HTTP
-  const streamHttpEnd = (req, closeHandler = false) => (id, listener) => {
+  const streamHttpEnd = (req, closeHandler = false) => (ids, listener) => {
+    if (!Array.isArray(ids)) {
+      ids = [ids];
+    }
+
     req.on('close', () => {
-      unsubscribe(id, listener);
+      ids.forEach(id => {
+        unsubscribe(id, listener);
+      });
+
       if (closeHandler) {
         closeHandler();
       }
@@ -522,8 +545,13 @@ const startWorker = (workerId) => {
   app.use(errorMiddleware);
 
   app.get('/api/v1/streaming/user', (req, res) => {
-    const channel = `timeline:${req.accountId}`;
-    streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
+    const channels = [`timeline:${req.accountId}`];
+
+    if (req.deviceId) {
+      channels.push(`timeline:${req.accountId}:${req.deviceId}`);
+    }
+
+    streamFrom(channels, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channels)));
   });
 
   app.get('/api/v1/streaming/user/notification', (req, res) => {
@@ -603,7 +631,12 @@ const startWorker = (workerId) => {
 
     switch(location.query.stream) {
     case 'user':
-      channel = `timeline:${req.accountId}`;
+      channel = [`timeline:${req.accountId}`];
+
+      if (req.deviceId) {
+        channel.push(`timeline:${req.accountId}:${req.deviceId}`);
+      }
+
       streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
       break;
     case 'user:notification':