about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2017-02-05 23:37:25 +0100
committerEugen Rochko <eugen@zeonfederated.com>2017-02-05 23:37:25 +0100
commit4d2be9f43273d010d683a0fccda97974b7249579 (patch)
treed5bde631d933709eb8e4c4c250961a965e91aac4
parent0af3401553bfc8366779bf64ebe257c962eb82eb (diff)
Add unique request IDs to streaming API to improve logs
-rw-r--r--package.json1
-rw-r--r--streaming/index.js28
-rw-r--r--yarn.lock4
3 files changed, 26 insertions, 7 deletions
diff --git a/package.json b/package.json
index def42f596..9f2bd3df9 100644
--- a/package.json
+++ b/package.json
@@ -66,6 +66,7 @@
     "sinon": "^1.17.6",
     "style-loader": "^0.13.1",
     "utf-8-validate": "^3.0.0",
+    "uuid": "^3.0.1",
     "webpack": "^1.14.0",
     "websocket.js": "^0.1.7",
     "ws": "^2.0.2"
diff --git a/streaming/index.js b/streaming/index.js
index 4f0df1ea5..49686b859 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -6,6 +6,7 @@ import pg from 'pg'
 import log from 'npmlog'
 import url from 'url'
 import WebSocket from 'ws'
+import uuid from 'uuid'
 
 const env = process.env.NODE_ENV || 'development'
 
@@ -43,6 +44,13 @@ const allowCrossDomain = (req, res, next) => {
   next()
 }
 
+const setRequestId = (req, res, next) => {
+  req.requestId = uuid.v4()
+  res.header('X-Request-Id', req.requestId)
+
+  next()
+}
+
 const accountFromToken = (token, req, next) => {
   pgPool.connect((err, client, done) => {
     if (err) {
@@ -90,7 +98,7 @@ const authenticationMiddleware = (req, res, next) => {
 }
 
 const errorMiddleware = (err, req, res, next) => {
-  log.error(err)
+  log.error(req.requestId, err)
   res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' })
   res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' }))
 }
@@ -98,7 +106,7 @@ const errorMiddleware = (err, req, res, next) => {
 const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
 
 const streamFrom = (redisClient, id, req, output, needsFiltering = false) => {
-  log.verbose(`Starting stream from ${id} for ${req.accountId}`)
+  log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`)
 
   redisClient.on('message', (channel, message) => {
     const { event, payload, queued_at } = JSON.parse(message)
@@ -107,7 +115,7 @@ const streamFrom = (redisClient, id, req, output, needsFiltering = false) => {
       const now   = new Date().getTime()
       const delta = now - queued_at;
 
-      log.silly(`Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`)
+      log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`)
       output(event, payload)
     }
 
@@ -154,7 +162,7 @@ const streamToHttp = (req, res, redisClient) => {
   const heartbeat = setInterval(() => res.write(':thump\n'), 15000)
 
   req.on('close', () => {
-    log.verbose(`Ending stream for ${req.accountId}`)
+    log.verbose(req.requestId, `Ending stream for ${req.accountId}`)
     clearInterval(heartbeat)
     redisClient.quit()
   })
@@ -168,11 +176,16 @@ const streamToHttp = (req, res, redisClient) => {
 // Setup stream output to WebSockets
 const streamToWs = (req, ws, redisClient) => {
   ws.on('close', () => {
-    log.verbose(`Ending stream for ${req.accountId}`)
+    log.verbose(req.requestId, `Ending stream for ${req.accountId}`)
     redisClient.quit()
   })
 
   return (event, payload) => {
+    if (ws.readyState !== ws.OPEN) {
+      log.error(req.requestId, 'Tried writing to closed socket')
+      return
+    }
+
     ws.send(JSON.stringify({ event, payload }))
   }
 }
@@ -184,6 +197,7 @@ const getRedisClient = () => redis.createClient({
   password: process.env.REDIS_PASSWORD
 })
 
+app.use(setRequestId)
 app.use(allowCrossDomain)
 app.use(authenticationMiddleware)
 app.use(errorMiddleware)
@@ -206,11 +220,11 @@ app.get('/api/v1/streaming/hashtag', (req, res) => {
 wss.on('connection', ws => {
   const location = url.parse(ws.upgradeReq.url, true)
   const token    = location.query.access_token
-  const req      = {}
+  const req      = { requestId: uuid.v4() }
 
   accountFromToken(token, req, err => {
     if (err) {
-      log.error(err)
+      log.error(req.requestId, err)
       ws.close()
       return
     }
diff --git a/yarn.lock b/yarn.lock
index 8038411fe..89236d45a 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -5644,6 +5644,10 @@ uuid@^2.0.1, uuid@^2.0.2:
   version "2.0.3"
   resolved "https://registry.yarnpkg.com/uuid/-/uuid-2.0.3.tgz#67e2e863797215530dff318e5bf9dcebfd47b21a"
 
+uuid@^3.0.1:
+  version "3.0.1"
+  resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.0.1.tgz#6544bba2dfda8c1cf17e629a3a305e2bb1fee6c1"
+
 v8flags@^2.0.10:
   version "2.0.11"
   resolved "https://registry.yarnpkg.com/v8flags/-/v8flags-2.0.11.tgz#bca8f30f0d6d60612cc2c00641e6962d42ae6881"