diff options
-rw-r--r-- | package.json | 1 | ||||
-rw-r--r-- | streaming/index.js | 28 | ||||
-rw-r--r-- | yarn.lock | 4 |
3 files changed, 26 insertions, 7 deletions
diff --git a/package.json b/package.json index def42f596..9f2bd3df9 100644 --- a/package.json +++ b/package.json @@ -66,6 +66,7 @@ "sinon": "^1.17.6", "style-loader": "^0.13.1", "utf-8-validate": "^3.0.0", + "uuid": "^3.0.1", "webpack": "^1.14.0", "websocket.js": "^0.1.7", "ws": "^2.0.2" diff --git a/streaming/index.js b/streaming/index.js index 4f0df1ea5..49686b859 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -6,6 +6,7 @@ import pg from 'pg' import log from 'npmlog' import url from 'url' import WebSocket from 'ws' +import uuid from 'uuid' const env = process.env.NODE_ENV || 'development' @@ -43,6 +44,13 @@ const allowCrossDomain = (req, res, next) => { next() } +const setRequestId = (req, res, next) => { + req.requestId = uuid.v4() + res.header('X-Request-Id', req.requestId) + + next() +} + const accountFromToken = (token, req, next) => { pgPool.connect((err, client, done) => { if (err) { @@ -90,7 +98,7 @@ const authenticationMiddleware = (req, res, next) => { } const errorMiddleware = (err, req, res, next) => { - log.error(err) + log.error(req.requestId, err) res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' })) } @@ -98,7 +106,7 @@ const errorMiddleware = (err, req, res, next) => { const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); const streamFrom = (redisClient, id, req, output, needsFiltering = false) => { - log.verbose(`Starting stream from ${id} for ${req.accountId}`) + log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`) redisClient.on('message', (channel, message) => { const { event, payload, queued_at } = JSON.parse(message) @@ -107,7 +115,7 @@ const streamFrom = (redisClient, id, req, output, needsFiltering = false) => { const now = new Date().getTime() const delta = now - queued_at; - log.silly(`Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`) + log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`) output(event, payload) } @@ -154,7 +162,7 @@ const streamToHttp = (req, res, redisClient) => { const heartbeat = setInterval(() => res.write(':thump\n'), 15000) req.on('close', () => { - log.verbose(`Ending stream for ${req.accountId}`) + log.verbose(req.requestId, `Ending stream for ${req.accountId}`) clearInterval(heartbeat) redisClient.quit() }) @@ -168,11 +176,16 @@ const streamToHttp = (req, res, redisClient) => { // Setup stream output to WebSockets const streamToWs = (req, ws, redisClient) => { ws.on('close', () => { - log.verbose(`Ending stream for ${req.accountId}`) + log.verbose(req.requestId, `Ending stream for ${req.accountId}`) redisClient.quit() }) return (event, payload) => { + if (ws.readyState !== ws.OPEN) { + log.error(req.requestId, 'Tried writing to closed socket') + return + } + ws.send(JSON.stringify({ event, payload })) } } @@ -184,6 +197,7 @@ const getRedisClient = () => redis.createClient({ password: process.env.REDIS_PASSWORD }) +app.use(setRequestId) app.use(allowCrossDomain) app.use(authenticationMiddleware) app.use(errorMiddleware) @@ -206,11 +220,11 @@ app.get('/api/v1/streaming/hashtag', (req, res) => { wss.on('connection', ws => { const location = url.parse(ws.upgradeReq.url, true) const token = location.query.access_token - const req = {} + const req = { requestId: uuid.v4() } accountFromToken(token, req, err => { if (err) { - log.error(err) + log.error(req.requestId, err) ws.close() return } diff --git a/yarn.lock b/yarn.lock index 8038411fe..89236d45a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5644,6 +5644,10 @@ uuid@^2.0.1, uuid@^2.0.2: version "2.0.3" resolved "https://registry.yarnpkg.com/uuid/-/uuid-2.0.3.tgz#67e2e863797215530dff318e5bf9dcebfd47b21a" +uuid@^3.0.1: + version "3.0.1" + resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.0.1.tgz#6544bba2dfda8c1cf17e629a3a305e2bb1fee6c1" + v8flags@^2.0.10: version "2.0.11" resolved "https://registry.yarnpkg.com/v8flags/-/v8flags-2.0.11.tgz#bca8f30f0d6d60612cc2c00641e6962d42ae6881" |