about summary refs log tree commit diff
path: root/streaming/index.js
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/index.js')
-rw-r--r--streaming/index.js188
1 files changed, 154 insertions, 34 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 43d8895f1..e2e8f943e 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -1,8 +1,12 @@
 import dotenv from 'dotenv'
 import express from 'express'
+import http from 'http'
 import redis from 'redis'
 import pg from 'pg'
 import log from 'npmlog'
+import url from 'url'
+import WebSocket from 'ws'
+import uuid from 'uuid'
 
 const env = process.env.NODE_ENV || 'development'
 
@@ -27,21 +31,27 @@ const pgConfigs = {
   }
 }
 
-const app = express()
+const app    = express()
 const pgPool = new pg.Pool(pgConfigs[env])
+const server = http.createServer(app)
+const wss    = new WebSocket.Server({ server })
 
-const authenticationMiddleware = (req, res, next) => {
-  const authorization = req.get('Authorization')
+const allowCrossDomain = (req, res, next) => {
+  res.header('Access-Control-Allow-Origin', '*')
+  res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control')
+  res.header('Access-Control-Allow-Methods', 'GET, OPTIONS')
 
-  if (!authorization) {
-    err = new Error('Missing access token')
-    err.statusCode = 401
+  next()
+}
 
-    return next(err)
-  }
+const setRequestId = (req, res, next) => {
+  req.requestId = uuid.v4()
+  res.header('X-Request-Id', req.requestId)
 
-  const token = authorization.replace(/^Bearer /, '')
+  next()
+}
 
+const accountFromToken = (token, req, next) => {
   pgPool.connect((err, client, done) => {
     if (err) {
       return next(err)
@@ -68,28 +78,46 @@ const authenticationMiddleware = (req, res, next) => {
   })
 }
 
+const authenticationMiddleware = (req, res, next) => {
+  if (req.method === 'OPTIONS') {
+    return next()
+  }
+
+  const authorization = req.get('Authorization')
+
+  if (!authorization) {
+    const err = new Error('Missing access token')
+    err.statusCode = 401
+
+    return next(err)
+  }
+
+  const token = authorization.replace(/^Bearer /, '')
+
+  accountFromToken(token, req, next)
+}
+
 const errorMiddleware = (err, req, res, next) => {
-  log.error(err)
+  log.error(req.requestId, err)
   res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' })
-  res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occured' }))
+  res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' }))
 }
 
 const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
 
-const streamFrom = (id, req, res, needsFiltering = false) => {
-  log.verbose(`Starting stream from ${id} for ${req.accountId}`)
+const streamFrom = (redisClient, id, req, output, needsFiltering = false) => {
+  log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`)
 
-  res.setHeader('Content-Type', 'text/event-stream')
-  res.setHeader('Transfer-Encoding', 'chunked')
+  redisClient.on('message', (channel, message) => {
+    const { event, payload, queued_at } = JSON.parse(message)
 
-  const redisClient = redis.createClient({
-    host:     process.env.REDIS_HOST     || '127.0.0.1',
-    port:     process.env.REDIS_PORT     || 6379,
-    password: process.env.REDIS_PASSWORD
-  })
+    const transmit = () => {
+      const now   = new Date().getTime()
+      const delta = now - queued_at;
 
-  redisClient.on('message', (channel, message) => {
-    const { event, payload } = JSON.parse(message)
+      log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`)
+      output(event, payload)
+    }
 
     // Only messages that may require filtering are statuses, since notifications
     // are already personalized and deletes do not matter
@@ -115,35 +143,127 @@ const streamFrom = (id, req, res, needsFiltering = false) => {
             return
           }
 
-          res.write(`event: ${event}\n`)
-          res.write(`data: ${payload}\n\n`)
+          transmit()
         })
       })
     } else {
-      res.write(`event: ${event}\n`)
-      res.write(`data: ${payload}\n\n`)
+      transmit()
     }
   })
 
+  redisClient.subscribe(id)
+}
+
+// Setup stream output to HTTP
+const streamToHttp = (req, res, redisClient) => {
+  res.setHeader('Content-Type', 'text/event-stream')
+  res.setHeader('Transfer-Encoding', 'chunked')
+
   const heartbeat = setInterval(() => res.write(':thump\n'), 15000)
 
   req.on('close', () => {
-    log.verbose(`Ending stream from ${id} for ${req.accountId}`)
+    log.verbose(req.requestId, `Ending stream for ${req.accountId}`)
     clearInterval(heartbeat)
     redisClient.quit()
   })
 
-  redisClient.subscribe(id)
+  return (event, payload) => {
+    res.write(`event: ${event}\n`)
+    res.write(`data: ${payload}\n\n`)
+  }
+}
+
+// Setup stream output to WebSockets
+const streamToWs = (req, ws, redisClient) => {
+  ws.on('close', () => {
+    log.verbose(req.requestId, `Ending stream for ${req.accountId}`)
+    redisClient.quit()
+  })
+
+  return (event, payload) => {
+    if (ws.readyState !== ws.OPEN) {
+      log.error(req.requestId, 'Tried writing to closed socket')
+      return
+    }
+
+    ws.send(JSON.stringify({ event, payload }))
+  }
 }
 
+// Get new redis connection
+const getRedisClient = () => redis.createClient({
+  host:     process.env.REDIS_HOST     || '127.0.0.1',
+  port:     process.env.REDIS_PORT     || 6379,
+  password: process.env.REDIS_PASSWORD
+})
+
+app.use(setRequestId)
+app.use(allowCrossDomain)
 app.use(authenticationMiddleware)
 app.use(errorMiddleware)
 
-app.get('/api/v1/streaming/user',    (req, res) => streamFrom(`timeline:${req.accountId}`, req, res))
-app.get('/api/v1/streaming/public',  (req, res) => streamFrom('timeline:public', req, res, true))
-app.get('/api/v1/streaming/hashtag', (req, res) => streamFrom(`timeline:hashtag:${req.params.tag}`, req, res, true))
+app.get('/api/v1/streaming/user', (req, res) => {
+  const redisClient = getRedisClient()
+  streamFrom(redisClient, `timeline:${req.accountId}`, req, streamToHttp(req, res, redisClient))
+})
+
+app.get('/api/v1/streaming/public', (req, res) => {
+  const redisClient = getRedisClient()
+  streamFrom(redisClient, 'timeline:public', req, streamToHttp(req, res, redisClient), true)
+})
+
+app.get('/api/v1/streaming/public/local', (req, res) => {
+  const redisClient = getRedisClient()
+  streamFrom(redisClient, 'timeline:public:local', req, streamToHttp(req, res, redisClient), true)
+})
+
+app.get('/api/v1/streaming/hashtag', (req, res) => {
+  const redisClient = getRedisClient()
+  streamFrom(redisClient, `timeline:hashtag:${req.params.tag}`, req, streamToHttp(req, res, redisClient), true)
+})
+
+app.get('/api/v1/streaming/hashtag/local', (req, res) => {
+  const redisClient = getRedisClient()
+  streamFrom(redisClient, `timeline:hashtag:${req.params.tag}:local`, req, streamToHttp(req, res, redisClient), true)
+})
+
+wss.on('connection', ws => {
+  const location = url.parse(ws.upgradeReq.url, true)
+  const token    = location.query.access_token
+  const req      = { requestId: uuid.v4() }
+
+  accountFromToken(token, req, err => {
+    if (err) {
+      log.error(req.requestId, err)
+      ws.close()
+      return
+    }
 
-log.level = 'verbose'
-log.info(`Starting HTTP server on port ${process.env.PORT || 4000}`)
+    const redisClient = getRedisClient()
+
+    switch(location.query.stream) {
+    case 'user':
+      streamFrom(redisClient, `timeline:${req.accountId}`, req, streamToWs(req, ws, redisClient))
+      break;
+    case 'public':
+      streamFrom(redisClient, 'timeline:public', req, streamToWs(req, ws, redisClient), true)
+      break;
+    case 'public:local':
+      streamFrom(redisClient, 'timeline:public:local', req, streamToWs(req, ws, redisClient), true)
+      break;
+    case 'hashtag':
+      streamFrom(redisClient, `timeline:hashtag:${location.query.tag}`, req, streamToWs(req, ws, redisClient), true)
+      break;
+    case 'hashtag:local':
+      streamFrom(redisClient, `timeline:hashtag:${location.query.tag}:local`, req, streamToWs(req, ws, redisClient), true)
+      break;
+    default:
+      ws.close()
+    }
+  })
+})
 
-app.listen(process.env.PORT || 4000)
+server.listen(process.env.PORT || 4000, () => {
+  log.level = process.env.LOG_LEVEL || 'verbose'
+  log.info(`Starting streaming API server on port ${server.address().port}`)
+})