about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js52
1 files changed, 37 insertions, 15 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 791a26941..de2175144 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -356,6 +356,7 @@ const startWorker = (workerId) => {
   const channelNameFromPath = req => {
     const { path, query } = req;
     const onlyMedia = isTruthy(query.only_media);
+    const allowLocalOnly = isTruthy(query.allow_local_only);
 
     switch(path) {
     case '/api/v1/streaming/user':
@@ -526,9 +527,10 @@ const startWorker = (workerId) => {
    * @param {function(string[], function(string): void): void} attachCloseHandler
    * @param {boolean=} needsFiltering
    * @param {boolean=} notificationOnly
+   * @param {boolean=} allowLocalOnly
    * @return {function(string): void}
    */
-  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
+  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false, allowLocalOnly = false) => {
     const accountId  = req.accountId || req.remoteAddress;
     const streamType = notificationOnly ? ' (notification)' : '';
 
@@ -556,6 +558,12 @@ const startWorker = (workerId) => {
         return;
       }
 
+      // Only send local-only statuses to logged-in users
+      if (event === 'update' && payload.local_only && !(req.accountId && allowLocalOnly)) {
+        log.silly(req.requestId, `Message ${payload.id} filtered because it was local-only`);
+        return;
+      }
+
       // Only messages that may require filtering are statuses, since notifications
       // are already personalized and deletes do not matter
       if (!needsFiltering || event !== 'update') {
@@ -702,7 +710,7 @@ const startWorker = (workerId) => {
       const onSend = streamToHttp(req, res);
       const onEnd  = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
 
-      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly);
+      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly, options.allowLocalOnly);
     }).catch(err => {
       log.verbose(req.requestId, 'Subscription error:', err.toString());
       httpNotFound(res);
@@ -729,63 +737,77 @@ const startWorker = (workerId) => {
     case 'user':
       resolve({
         channelIds: req.deviceId ? [`timeline:${req.accountId}`, `timeline:${req.accountId}:${req.deviceId}`] : [`timeline:${req.accountId}`],
-        options: { needsFiltering: false, notificationOnly: false },
+        options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
       });
 
       break;
     case 'user:notification':
       resolve({
         channelIds: [`timeline:${req.accountId}`],
-        options: { needsFiltering: false, notificationOnly: true },
+        options: { needsFiltering: false, notificationOnly: true, allowLocalOnly: true },
       });
 
       break;
     case 'public':
       resolve({
         channelIds: ['timeline:public'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(params.allow_local_only) },
+      });
+
+      break;
+    case 'public:allow_local_only':
+      resolve({
+        channelIds: ['timeline:public'],
+        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
       });
 
       break;
     case 'public:local':
       resolve({
         channelIds: ['timeline:public:local'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
       });
 
       break;
     case 'public:remote':
       resolve({
         channelIds: ['timeline:public:remote'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false },
       });
 
       break;
     case 'public:media':
       resolve({
         channelIds: ['timeline:public:media'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: isTruthy(query.allow_local_only) },
+      });
+
+      break;
+    case 'public:allow_local_only:media':
+      resolve({
+        channelIds: ['timeline:public:media'],
+        options: { needsFiltering: true, notificationsOnly: false, allowLocalOnly: true },
       });
 
       break;
     case 'public:local:media':
       resolve({
         channelIds: ['timeline:public:local:media'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
       });
 
       break;
     case 'public:remote:media':
       resolve({
         channelIds: ['timeline:public:remote:media'],
-        options: { needsFiltering: true, notificationOnly: false },
+        options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: false },
       });
 
       break;
     case 'direct':
       resolve({
         channelIds: [`timeline:direct:${req.accountId}`],
-        options: { needsFiltering: false, notificationOnly: false },
+        options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
       });
 
       break;
@@ -795,7 +817,7 @@ const startWorker = (workerId) => {
       } else {
         resolve({
           channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`],
-          options: { needsFiltering: true, notificationOnly: false },
+          options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
         });
       }
 
@@ -806,7 +828,7 @@ const startWorker = (workerId) => {
       } else {
         resolve({
           channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`],
-          options: { needsFiltering: true, notificationOnly: false },
+          options: { needsFiltering: true, notificationOnly: false, allowLocalOnly: true },
         });
       }
 
@@ -815,7 +837,7 @@ const startWorker = (workerId) => {
       authorizeListAccess(params.list, req).then(() => {
         resolve({
           channelIds: [`timeline:list:${params.list}`],
-          options: { needsFiltering: false, notificationOnly: false },
+          options: { needsFiltering: false, notificationOnly: false, allowLocalOnly: true },
         });
       }).catch(() => {
         reject('Not authorized to stream this list');
@@ -862,7 +884,7 @@ const startWorker = (workerId) => {
 
       const onSend        = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
       const stopHeartbeat = subscriptionHeartbeat(channelIds);
-      const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly);
+      const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly, options.allowLocalOnly);
 
       subscriptions[channelIds.join(';')] = {
         listener,