diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/index.js | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/streaming/index.js b/streaming/index.js index 4f0df1ea5..49686b859 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -6,6 +6,7 @@ import pg from 'pg' import log from 'npmlog' import url from 'url' import WebSocket from 'ws' +import uuid from 'uuid' const env = process.env.NODE_ENV || 'development' @@ -43,6 +44,13 @@ const allowCrossDomain = (req, res, next) => { next() } +const setRequestId = (req, res, next) => { + req.requestId = uuid.v4() + res.header('X-Request-Id', req.requestId) + + next() +} + const accountFromToken = (token, req, next) => { pgPool.connect((err, client, done) => { if (err) { @@ -90,7 +98,7 @@ const authenticationMiddleware = (req, res, next) => { } const errorMiddleware = (err, req, res, next) => { - log.error(err) + log.error(req.requestId, err) res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' })) } @@ -98,7 +106,7 @@ const errorMiddleware = (err, req, res, next) => { const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); const streamFrom = (redisClient, id, req, output, needsFiltering = false) => { - log.verbose(`Starting stream from ${id} for ${req.accountId}`) + log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`) redisClient.on('message', (channel, message) => { const { event, payload, queued_at } = JSON.parse(message) @@ -107,7 +115,7 @@ const streamFrom = (redisClient, id, req, output, needsFiltering = false) => { const now = new Date().getTime() const delta = now - queued_at; - log.silly(`Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`) + log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`) output(event, payload) } @@ -154,7 +162,7 @@ const streamToHttp = (req, res, redisClient) => { const heartbeat = setInterval(() => res.write(':thump\n'), 15000) req.on('close', () => { - log.verbose(`Ending stream for ${req.accountId}`) + log.verbose(req.requestId, `Ending stream for ${req.accountId}`) clearInterval(heartbeat) redisClient.quit() }) @@ -168,11 +176,16 @@ const streamToHttp = (req, res, redisClient) => { // Setup stream output to WebSockets const streamToWs = (req, ws, redisClient) => { ws.on('close', () => { - log.verbose(`Ending stream for ${req.accountId}`) + log.verbose(req.requestId, `Ending stream for ${req.accountId}`) redisClient.quit() }) return (event, payload) => { + if (ws.readyState !== ws.OPEN) { + log.error(req.requestId, 'Tried writing to closed socket') + return + } + ws.send(JSON.stringify({ event, payload })) } } @@ -184,6 +197,7 @@ const getRedisClient = () => redis.createClient({ password: process.env.REDIS_PASSWORD }) +app.use(setRequestId) app.use(allowCrossDomain) app.use(authenticationMiddleware) app.use(errorMiddleware) @@ -206,11 +220,11 @@ app.get('/api/v1/streaming/hashtag', (req, res) => { wss.on('connection', ws => { const location = url.parse(ws.upgradeReq.url, true) const token = location.query.access_token - const req = {} + const req = { requestId: uuid.v4() } accountFromToken(token, req, err => { if (err) { - log.error(err) + log.error(req.requestId, err) ws.close() return } |