diff options
Diffstat (limited to 'streaming/index.js')
-rw-r--r-- | streaming/index.js | 188 |
1 files changed, 154 insertions, 34 deletions
diff --git a/streaming/index.js b/streaming/index.js index 43d8895f1..e2e8f943e 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -1,8 +1,12 @@ import dotenv from 'dotenv' import express from 'express' +import http from 'http' import redis from 'redis' import pg from 'pg' import log from 'npmlog' +import url from 'url' +import WebSocket from 'ws' +import uuid from 'uuid' const env = process.env.NODE_ENV || 'development' @@ -27,21 +31,27 @@ const pgConfigs = { } } -const app = express() +const app = express() const pgPool = new pg.Pool(pgConfigs[env]) +const server = http.createServer(app) +const wss = new WebSocket.Server({ server }) -const authenticationMiddleware = (req, res, next) => { - const authorization = req.get('Authorization') +const allowCrossDomain = (req, res, next) => { + res.header('Access-Control-Allow-Origin', '*') + res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control') + res.header('Access-Control-Allow-Methods', 'GET, OPTIONS') - if (!authorization) { - err = new Error('Missing access token') - err.statusCode = 401 + next() +} - return next(err) - } +const setRequestId = (req, res, next) => { + req.requestId = uuid.v4() + res.header('X-Request-Id', req.requestId) - const token = authorization.replace(/^Bearer /, '') + next() +} +const accountFromToken = (token, req, next) => { pgPool.connect((err, client, done) => { if (err) { return next(err) @@ -68,28 +78,46 @@ const authenticationMiddleware = (req, res, next) => { }) } +const authenticationMiddleware = (req, res, next) => { + if (req.method === 'OPTIONS') { + return next() + } + + const authorization = req.get('Authorization') + + if (!authorization) { + const err = new Error('Missing access token') + err.statusCode = 401 + + return next(err) + } + + const token = authorization.replace(/^Bearer /, '') + + accountFromToken(token, req, next) +} + const errorMiddleware = (err, req, res, next) => { - log.error(err) + log.error(req.requestId, err) res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occured' })) + res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' })) } const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); -const streamFrom = (id, req, res, needsFiltering = false) => { - log.verbose(`Starting stream from ${id} for ${req.accountId}`) +const streamFrom = (redisClient, id, req, output, needsFiltering = false) => { + log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`) - res.setHeader('Content-Type', 'text/event-stream') - res.setHeader('Transfer-Encoding', 'chunked') + redisClient.on('message', (channel, message) => { + const { event, payload, queued_at } = JSON.parse(message) - const redisClient = redis.createClient({ - host: process.env.REDIS_HOST || '127.0.0.1', - port: process.env.REDIS_PORT || 6379, - password: process.env.REDIS_PASSWORD - }) + const transmit = () => { + const now = new Date().getTime() + const delta = now - queued_at; - redisClient.on('message', (channel, message) => { - const { event, payload } = JSON.parse(message) + log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`) + output(event, payload) + } // Only messages that may require filtering are statuses, since notifications // are already personalized and deletes do not matter @@ -115,35 +143,127 @@ const streamFrom = (id, req, res, needsFiltering = false) => { return } - res.write(`event: ${event}\n`) - res.write(`data: ${payload}\n\n`) + transmit() }) }) } else { - res.write(`event: ${event}\n`) - res.write(`data: ${payload}\n\n`) + transmit() } }) + redisClient.subscribe(id) +} + +// Setup stream output to HTTP +const streamToHttp = (req, res, redisClient) => { + res.setHeader('Content-Type', 'text/event-stream') + res.setHeader('Transfer-Encoding', 'chunked') + const heartbeat = setInterval(() => res.write(':thump\n'), 15000) req.on('close', () => { - log.verbose(`Ending stream from ${id} for ${req.accountId}`) + log.verbose(req.requestId, `Ending stream for ${req.accountId}`) clearInterval(heartbeat) redisClient.quit() }) - redisClient.subscribe(id) + return (event, payload) => { + res.write(`event: ${event}\n`) + res.write(`data: ${payload}\n\n`) + } +} + +// Setup stream output to WebSockets +const streamToWs = (req, ws, redisClient) => { + ws.on('close', () => { + log.verbose(req.requestId, `Ending stream for ${req.accountId}`) + redisClient.quit() + }) + + return (event, payload) => { + if (ws.readyState !== ws.OPEN) { + log.error(req.requestId, 'Tried writing to closed socket') + return + } + + ws.send(JSON.stringify({ event, payload })) + } } +// Get new redis connection +const getRedisClient = () => redis.createClient({ + host: process.env.REDIS_HOST || '127.0.0.1', + port: process.env.REDIS_PORT || 6379, + password: process.env.REDIS_PASSWORD +}) + +app.use(setRequestId) +app.use(allowCrossDomain) app.use(authenticationMiddleware) app.use(errorMiddleware) -app.get('/api/v1/streaming/user', (req, res) => streamFrom(`timeline:${req.accountId}`, req, res)) -app.get('/api/v1/streaming/public', (req, res) => streamFrom('timeline:public', req, res, true)) -app.get('/api/v1/streaming/hashtag', (req, res) => streamFrom(`timeline:hashtag:${req.params.tag}`, req, res, true)) +app.get('/api/v1/streaming/user', (req, res) => { + const redisClient = getRedisClient() + streamFrom(redisClient, `timeline:${req.accountId}`, req, streamToHttp(req, res, redisClient)) +}) + +app.get('/api/v1/streaming/public', (req, res) => { + const redisClient = getRedisClient() + streamFrom(redisClient, 'timeline:public', req, streamToHttp(req, res, redisClient), true) +}) + +app.get('/api/v1/streaming/public/local', (req, res) => { + const redisClient = getRedisClient() + streamFrom(redisClient, 'timeline:public:local', req, streamToHttp(req, res, redisClient), true) +}) + +app.get('/api/v1/streaming/hashtag', (req, res) => { + const redisClient = getRedisClient() + streamFrom(redisClient, `timeline:hashtag:${req.params.tag}`, req, streamToHttp(req, res, redisClient), true) +}) + +app.get('/api/v1/streaming/hashtag/local', (req, res) => { + const redisClient = getRedisClient() + streamFrom(redisClient, `timeline:hashtag:${req.params.tag}:local`, req, streamToHttp(req, res, redisClient), true) +}) + +wss.on('connection', ws => { + const location = url.parse(ws.upgradeReq.url, true) + const token = location.query.access_token + const req = { requestId: uuid.v4() } + + accountFromToken(token, req, err => { + if (err) { + log.error(req.requestId, err) + ws.close() + return + } -log.level = 'verbose' -log.info(`Starting HTTP server on port ${process.env.PORT || 4000}`) + const redisClient = getRedisClient() + + switch(location.query.stream) { + case 'user': + streamFrom(redisClient, `timeline:${req.accountId}`, req, streamToWs(req, ws, redisClient)) + break; + case 'public': + streamFrom(redisClient, 'timeline:public', req, streamToWs(req, ws, redisClient), true) + break; + case 'public:local': + streamFrom(redisClient, 'timeline:public:local', req, streamToWs(req, ws, redisClient), true) + break; + case 'hashtag': + streamFrom(redisClient, `timeline:hashtag:${location.query.tag}`, req, streamToWs(req, ws, redisClient), true) + break; + case 'hashtag:local': + streamFrom(redisClient, `timeline:hashtag:${location.query.tag}:local`, req, streamToWs(req, ws, redisClient), true) + break; + default: + ws.close() + } + }) +}) -app.listen(process.env.PORT || 4000) +server.listen(process.env.PORT || 4000, () => { + log.level = process.env.LOG_LEVEL || 'verbose' + log.info(`Starting streaming API server on port ${server.address().port}`) +}) |