about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2017-02-02 13:56:14 +0100
committerEugen Rochko <eugen@zeonfederated.com>2017-02-02 13:56:14 +0100
commit1ee4a17f3792669d3f03ddcb060aa48b622eca61 (patch)
treecd0107cd7b884cbb305080defeb2034952536f1a /streaming
parent0cfd3188b443beee43bb86739c614ddb755a62e4 (diff)
Add logging and filtering to the node.js streaming API
Diffstat (limited to 'streaming')
-rw-r--r--streaming/index.js55
1 files changed, 46 insertions, 9 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 70067d2f6..945e287f5 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -2,6 +2,7 @@ import dotenv from 'dotenv'
 import express from 'express'
 import redis from 'redis'
 import pg from 'pg'
+import log from 'npmlog'
 
 dotenv.config()
 
@@ -40,6 +41,7 @@ const authenticationMiddleware = (req, res, next) => {
 
   pgPool.connect((err, client, done) => {
     if (err) {
+      log.error(err)
       return next(err)
     }
 
@@ -47,6 +49,7 @@ const authenticationMiddleware = (req, res, next) => {
       done()
 
       if (err) {
+        log.error(err)
         return next(err)
       }
 
@@ -66,10 +69,12 @@ const authenticationMiddleware = (req, res, next) => {
 
 const errorMiddleware = (err, req, res, next) => {
   res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' })
-  res.end(JSON.stringify({ error: `${err}` }))
+  res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occured' }))
 }
 
-const streamFrom = (id, res) => {
+const streamFrom = (id, req, res, needsFiltering = false) => {
+  log.verbose(`Starting stream from ${id} for ${req.accountId}`)
+
   res.setHeader('Content-Type', 'text/event-stream')
   res.setHeader('Transfer-Encoding', 'chunked')
 
@@ -78,11 +83,40 @@ const streamFrom = (id, res) => {
   redisClient.on('message', (channel, message) => {
     const { event, payload } = JSON.parse(message)
 
-    res.write(`event: ${event}\n`)
-    res.write(`data: ${payload}\n\n`)
+    if (needsFiltering) {
+      pgPool.connect((err, client, done) => {
+        if (err) {
+          log.error(err)
+          return
+        }
+
+        const unpackedPayload  = JSON.parse(payload)
+        const targetAccountIds = [unpackedPayload.account.id] + unpackedPayload.mentions.map(item => item.id) + (unpackedPayload.reblog ? unpackedPayload.reblog.account.id : [])
+
+        client.query('SELECT target_account_id FROM blocks WHERE account_id = $1 AND target_account_id IN ($2)', [req.accountId, targetAccountIds], (err, result) => {
+          done()
+
+          if (err) {
+            log.error(err)
+            return
+          }
+
+          if (result.rows.length > 0) {
+            return
+          }
+
+          res.write(`event: ${event}\n`)
+          res.write(`data: ${payload}\n\n`)
+        })
+      })
+    } else {
+      res.write(`event: ${event}\n`)
+      res.write(`data: ${payload}\n\n`)
+    }
   })
 
-  setInterval(() => res.write('\n'), 15000)
+  // Heartbeat to keep connection alive
+  setInterval(() => res.write(':thump\n'), 15000)
 
   redisClient.subscribe(id)
 }
@@ -90,8 +124,11 @@ const streamFrom = (id, res) => {
 app.use(authenticationMiddleware)
 app.use(errorMiddleware)
 
-app.get('/api/v1/streaming/user',    (req, res) => streamFrom(`timeline:${req.accountId}`, res))
-app.get('/api/v1/streaming/public',  (_, res)   => streamFrom('timeline:public', res))
-app.get('/api/v1/streaming/hashtag', (req, res) => streamFrom(`timeline:hashtag:${req.params.tag}`, res))
+app.get('/api/v1/streaming/user',    (req, res) => streamFrom(`timeline:${req.accountId}`, req, res))
+app.get('/api/v1/streaming/public',  (req, res) => streamFrom('timeline:public', req, res, true))
+app.get('/api/v1/streaming/hashtag', (req, res) => streamFrom(`timeline:hashtag:${req.params.tag}`, req, res, true))
+
+log.level = 'verbose'
+log.info(`Starting HTTP server on port ${process.env.PORT || 4000}`)
 
-app.listen(4000)
+app.listen(process.env.PORT || 4000)