about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js53
1 files changed, 38 insertions, 15 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 67cd48b43..9f8ab107e 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -364,6 +364,7 @@ const startWorker = (workerId) => {
   const channelNameFromPath = req => {
     const { path, query } = req;
     const onlyMedia = isTruthy(query.only_media);
+    const allowLocalOnly = isTruthy(query.allow_local_only);
 
     switch(path) {
     case '/api/v1/streaming/user':
@@ -587,9 +588,10 @@ const startWorker = (workerId) => {
    * @param {function(string, string): void} output
    * @param {function(string[], function(string): void): void} attachCloseHandler
    * @param {boolean=} needsFiltering
+   * @param {boolean=} allowLocalOnly
    * @return {function(string): void}
    */
-  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
+  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, allowLocalOnly = false) => {
     const accountId  = req.accountId || req.remoteAddress;
 
     log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
@@ -610,6 +612,12 @@ const startWorker = (workerId) => {
         output(event, encodedPayload);
       };
 
+      // Only send local-only statuses to logged-in users
+      if (event === 'update' && payload.local_only && !(req.accountId && allowLocalOnly)) {
+        log.silly(req.requestId, `Message ${payload.id} filtered because it was local-only`);
+        return;
+      }
+
       // Only messages that may require filtering are statuses, since notifications
       // are already personalized and deletes do not matter
       if (!needsFiltering || event !== 'update') {
@@ -756,7 +764,7 @@ const startWorker = (workerId) => {
       const onSend = streamToHttp(req, res);
       const onEnd  = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
 
-      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
+      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.allowLocalOnly);
     }).catch(err => {
       log.verbose(req.requestId, 'Subscription error:', err.toString());
       httpNotFound(res);
@@ -801,63 +809,77 @@ const startWorker = (workerId) => {
     case 'user':
       resolve({
         channelIds: channelsForUserStream(req),
-        options: { needsFiltering: false },
+        options: { needsFiltering: false, allowLocalOnly: true },
       });
 
       break;
     case 'user:notification':
       resolve({
         channelIds: [`timeline:${req.accountId}:notifications`],
-        options: { needsFiltering: false },
+        options: { needsFiltering: false, allowLocalOnly: true },
       });
 
       break;
     case 'public':
       resolve({
         channelIds: ['timeline:public'],
-        options: { needsFiltering: true },
+        options: { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) },
+      });
+
+      break;
+    case 'public:allow_local_only':
+      resolve({
+        channelIds: ['timeline:public'],
+        options: { needsFiltering: true, allowLocalOnly: true },
       });
 
       break;
     case 'public:local':
       resolve({
         channelIds: ['timeline:public:local'],
-        options: { needsFiltering: true },
+        options: { needsFiltering: true, allowLocalOnly: true },
       });
 
       break;
     case 'public:remote':
       resolve({
         channelIds: ['timeline:public:remote'],
-        options: { needsFiltering: true },
+        options: { needsFiltering: true, allowLocalOnly: false },
       });
 
       break;
     case 'public:media':
       resolve({
         channelIds: ['timeline:public:media'],
-        options: { needsFiltering: true },
+        options: { needsFiltering: true, allowLocalOnly: isTruthy(query.allow_local_only) },
+      });
+
+      break;
+    case 'public:allow_local_only:media':
+      resolve({
+        channelIds: ['timeline:public:media'],
+        options: { needsFiltering: true, allowLocalOnly: true },
       });
 
       break;
     case 'public:local:media':
       resolve({
         channelIds: ['timeline:public:local:media'],
-        options: { needsFiltering: true },
+        options: { needsFiltering: true, allowLocalOnly: true },
       });
 
       break;
     case 'public:remote:media':
       resolve({
         channelIds: ['timeline:public:remote:media'],
-        options: { needsFiltering: true },
+        options: { needsFiltering: true, allowLocalOnly: false },
       });
 
       break;
     case 'direct':
       resolve({
         channelIds: [`timeline:direct:${req.accountId}`],
-        options: { needsFiltering: false },
+        options: { needsFiltering: false, allowLocalOnly: true },
       });
 
       break;
@@ -867,7 +889,7 @@ const startWorker = (workerId) => {
       } else {
         resolve({
           channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`],
-          options: { needsFiltering: true },
+          options: { needsFiltering: true, allowLocalOnly: true },
         });
       }
 
@@ -878,7 +900,7 @@ const startWorker = (workerId) => {
       } else {
         resolve({
           channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`],
-          options: { needsFiltering: true },
+          options: { needsFiltering: true, allowLocalOnly: true },
         });
       }
 
@@ -887,7 +909,7 @@ const startWorker = (workerId) => {
       authorizeListAccess(params.list, req).then(() => {
         resolve({
           channelIds: [`timeline:list:${params.list}`],
-          options: { needsFiltering: false },
+          options: { needsFiltering: false, allowLocalOnly: true },
         });
       }).catch(() => {
         reject('Not authorized to stream this list');
@@ -934,7 +956,8 @@ const startWorker = (workerId) => {
 
       const onSend        = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
       const stopHeartbeat = subscriptionHeartbeat(channelIds);
-      const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);
+
+      const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.allowLocalOnly);
 
       subscriptions[channelIds.join(';')] = {
         listener,