about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--app/lib/feed_manager.rb6
-rw-r--r--streaming/index.js52
2 files changed, 49 insertions, 9 deletions
diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb
index c2d3a2e2c..86928fa36 100644
--- a/app/lib/feed_manager.rb
+++ b/app/lib/feed_manager.rb
@@ -34,7 +34,7 @@ class FeedManager
       trim(timeline_type, account.id)
     end
 
-    PushUpdateWorker.perform_async(account.id, status.id)
+    PushUpdateWorker.perform_async(account.id, status.id) if push_update_required?(timeline_type, account.id)
   end
 
   def trim(type, account_id)
@@ -43,6 +43,10 @@ class FeedManager
     redis.zremrangebyscore(key(type, account_id), '-inf', "(#{last.last}")
   end
 
+  def push_update_required?(timeline_type, account_id)
+    timeline_type != :home || redis.get("subscribed:timeline:#{account_id}").present?
+  end
+
   def merge_into_timeline(from_account, into_account)
     timeline_key = key(:home, into_account.id)
     query        = from_account.statuses.limit(FeedManager::MAX_ITEMS / 4)
diff --git a/streaming/index.js b/streaming/index.js
index 0411ae8ef..d77ca63ff 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -110,11 +110,12 @@ const startWorker = (workerId) => {
 
   const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
 
+  const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
   const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
 
   const subs = {};
 
-  redisClient.on('pmessage', (_, channel, message) => {
+  redisSubscribeClient.on('pmessage', (_, channel, message) => {
     const callbacks = subs[channel];
 
     log.silly(`New message on channel ${channel}`);
@@ -126,7 +127,19 @@ const startWorker = (workerId) => {
     callbacks.forEach(callback => callback(message));
   });
 
-  redisClient.psubscribe(`${redisPrefix}timeline:*`);
+  redisSubscribeClient.psubscribe(`${redisPrefix}timeline:*`);
+
+  const subscriptionHeartbeat = (channel) => {
+    const interval = 6*60;
+    const tellSubscribed = () => {
+      redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3);
+    };
+    tellSubscribed();
+    const heartbeat = setInterval(tellSubscribed, interval*1000);
+    return () => {
+      clearInterval(heartbeat);
+    };
+  };
 
   const subscribe = (channel, callback) => {
     log.silly(`Adding listener for ${channel}`);
@@ -231,8 +244,9 @@ const startWorker = (workerId) => {
 
   const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
 
-  const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => {
-    log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`);
+  const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
+    const streamType = notificationOnly ? ' (notification)' : '';
+    log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}${streamType}`);
 
     const listener = message => {
       const { event, payload, queued_at } = JSON.parse(message);
@@ -245,6 +259,10 @@ const startWorker = (workerId) => {
         output(event, payload);
       };
 
+      if (notificationOnly && event !== 'notification') {
+        return;
+      }
+
       // Only messages that may require filtering are statuses, since notifications
       // are already personalized and deletes do not matter
       if (needsFiltering && event === 'update') {
@@ -313,9 +331,12 @@ const startWorker = (workerId) => {
   };
 
   // Setup stream end for HTTP
-  const streamHttpEnd = req => (id, listener) => {
+  const streamHttpEnd = (req, closeHandler = false) => (id, listener) => {
     req.on('close', () => {
       unsubscribe(id, listener);
+      if (closeHandler) {
+        closeHandler();
+      }
     });
   };
 
@@ -330,15 +351,21 @@ const startWorker = (workerId) => {
   };
 
   // Setup stream end for WebSockets
-  const streamWsEnd = (req, ws) => (id, listener) => {
+  const streamWsEnd = (req, ws, closeHandler = false) => (id, listener) => {
     ws.on('close', () => {
       log.verbose(req.requestId, `Ending stream for ${req.accountId}`);
       unsubscribe(id, listener);
+      if (closeHandler) {
+        closeHandler();
+      }
     });
 
     ws.on('error', e => {
       log.verbose(req.requestId, `Ending stream for ${req.accountId}`);
       unsubscribe(id, listener);
+      if (closeHandler) {
+        closeHandler();
+      }
     });
   };
 
@@ -348,7 +375,12 @@ const startWorker = (workerId) => {
   app.use(errorMiddleware);
 
   app.get('/api/v1/streaming/user', (req, res) => {
-    streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req));
+    const channel = `timeline:${req.accountId}`;
+    streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
+  });
+
+  app.get('/api/v1/streaming/user/notification', (req, res) => {
+    streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true);
   });
 
   app.get('/api/v1/streaming/public', (req, res) => {
@@ -382,7 +414,11 @@ const startWorker = (workerId) => {
 
     switch(location.query.stream) {
     case 'user':
-      streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws));
+      const channel = `timeline:${req.accountId}`;
+      streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
+      break;
+    case 'user:notification':
+      streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
       break;
     case 'public':
       streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);