about summary refs log tree commit diff
path: root/streaming/index.js
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/index.js')
-rw-r--r--streaming/index.js125
1 files changed, 66 insertions, 59 deletions
diff --git a/streaming/index.js b/streaming/index.js
index 45ea26bd6..94568ee9a 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -7,6 +7,7 @@ const express = require('express');
 const http = require('http');
 const redis = require('redis');
 const pg = require('pg');
+const dbUrlToConfig = require('pg-connection-string').parse;
 const log = require('npmlog');
 const url = require('url');
 const uuid = require('uuid');
@@ -15,7 +16,6 @@ const WebSocket = require('ws');
 const { JSDOM } = require('jsdom');
 
 const env = process.env.NODE_ENV || 'development';
-const alwaysRequireAuth = process.env.LIMITED_FEDERATION_MODE === 'true' || process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';
 
 dotenv.config({
   path: env === 'production' ? '.env.production' : '.env',
@@ -24,43 +24,6 @@ dotenv.config({
 log.level = process.env.LOG_LEVEL || 'verbose';
 
 /**
- * @param {string} dbUrl
- * @return {Object.<string, any>}
- */
-const dbUrlToConfig = (dbUrl) => {
-  if (!dbUrl) {
-    return {};
-  }
-
-  const params = url.parse(dbUrl, true);
-  const config = {};
-
-  if (params.auth) {
-    [config.user, config.password] = params.auth.split(':');
-  }
-
-  if (params.hostname) {
-    config.host = params.hostname;
-  }
-
-  if (params.port) {
-    config.port = params.port;
-  }
-
-  if (params.pathname) {
-    config.database = params.pathname.split('/')[1];
-  }
-
-  const ssl = params.query && params.query.ssl;
-
-  if (ssl && ssl === 'true' || ssl === '1') {
-    config.ssl = true;
-  }
-
-  return config;
-};
-
-/**
  * @param {Object.<string, any>} defaultConfig
  * @param {string} redisUrl
  */
@@ -117,9 +80,10 @@ const startMaster = () => {
   log.warn(`Starting streaming API server master with ${numWorkers} workers`);
 };
 
-const startWorker = async (workerId) => {
-  log.warn(`Starting worker ${workerId}`);
-
+/**
+ * @return {Object.<string, any>}
+ */
+const pgConfigFromEnv = () => {
   const pgConfigs = {
     development: {
       user:     process.env.DB_USER || pg.defaults.user,
@@ -127,7 +91,6 @@ const startWorker = async (workerId) => {
       database: process.env.DB_NAME || 'mastodon_development',
       host:     process.env.DB_HOST || pg.defaults.host,
       port:     process.env.DB_PORT || pg.defaults.port,
-      max:      10,
     },
 
     production: {
@@ -136,20 +99,48 @@ const startWorker = async (workerId) => {
       database: process.env.DB_NAME || 'mastodon_production',
       host:     process.env.DB_HOST || 'localhost',
       port:     process.env.DB_PORT || 5432,
-      max:      10,
     },
   };
 
-  if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
-    pgConfigs.development.ssl = true;
-    pgConfigs.production.ssl = true;
+  let baseConfig;
+
+  if (process.env.DATABASE_URL) {
+    baseConfig = dbUrlToConfig(process.env.DATABASE_URL);
+  } else {
+    baseConfig = pgConfigs[env];
+
+    if (process.env.DB_SSLMODE) {
+      switch(process.env.DB_SSLMODE) {
+      case 'disable':
+      case '':
+        baseConfig.ssl = false;
+        break;
+      case 'no-verify':
+        baseConfig.ssl = { rejectUnauthorized: false };
+        break;
+      default:
+        baseConfig.ssl = {};
+        break;
+      }
+    }
   }
 
+  return {
+    ...baseConfig,
+    max: process.env.DB_POOL || 10,
+    connectionTimeoutMillis: 15000,
+    application_name: '',
+  };
+};
+
+const startWorker = async (workerId) => {
+  log.warn(`Starting worker ${workerId}`);
+
   const app = express();
 
   app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal');
 
-  const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL)));
+  const pgPool = new pg.Pool(pgConfigFromEnv());
   const server = http.createServer(app);
   const redisNamespace = process.env.REDIS_NAMESPACE || null;
 
@@ -355,22 +346,17 @@ const startWorker = async (workerId) => {
    * @param {boolean=} required
    * @return {Promise.<void>}
    */
-  const accountFromRequest = (req, required = true) => new Promise((resolve, reject) => {
+  const accountFromRequest = (req) => new Promise((resolve, reject) => {
     const authorization = req.headers.authorization;
     const location      = url.parse(req.url, true);
     const accessToken   = location.query.access_token || req.headers['sec-websocket-protocol'];
 
     if (!authorization && !accessToken) {
-      if (required) {
-        const err = new Error('Missing access token');
-        err.status = 401;
+      const err = new Error('Missing access token');
+      err.status = 401;
 
-        reject(err);
-        return;
-      } else {
-        resolve();
-        return;
-      }
+      reject(err);
+      return;
     }
 
     const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;
@@ -474,7 +460,7 @@ const startWorker = async (workerId) => {
     // variables. OAuth scope checks are moved to the point of subscription
     // to a specific stream.
 
-    accountFromRequest(info.req, alwaysRequireAuth).then(() => {
+    accountFromRequest(info.req).then(() => {
       callback(true, undefined, undefined);
     }).catch(err => {
       log.error(info.req.requestId, err.toString());
@@ -548,7 +534,7 @@ const startWorker = async (workerId) => {
       return;
     }
 
-    accountFromRequest(req, alwaysRequireAuth).then(() => checkScopes(req, channelNameFromPath(req))).then(() => {
+    accountFromRequest(req).then(() => checkScopes(req, channelNameFromPath(req))).then(() => {
       subscribeHttpToSystemChannel(req, res);
     }).then(() => {
       next();
@@ -858,6 +844,27 @@ const startWorker = async (workerId) => {
     res.end('OK');
   });
 
+  app.get('/metrics', (req, res) => server.getConnections((err, count) => {
+    res.writeHeader(200, { 'Content-Type': 'application/openmetrics-text; version=1.0.0; charset=utf-8' });
+    res.write('# TYPE connected_clients gauge\n');
+    res.write('# HELP connected_clients The number of clients connected to the streaming server\n');
+    res.write(`connected_clients ${count}.0\n`);
+    res.write('# TYPE connected_channels gauge\n');
+    res.write('# HELP connected_channels The number of Redis channels the streaming server is subscribed to\n');
+    res.write(`connected_channels ${Object.keys(subs).length}.0\n`);
+    res.write('# TYPE pg_pool_total_connections gauge\n');
+    res.write('# HELP pg_pool_total_connections The total number of clients existing within the pool\n');
+    res.write(`pg_pool_total_connections ${pgPool.totalCount}.0\n`);
+    res.write('# TYPE pg_pool_idle_connections gauge\n');
+    res.write('# HELP pg_pool_idle_connections The number of clients which are not checked out but are currently idle in the pool\n');
+    res.write(`pg_pool_idle_connections ${pgPool.idleCount}.0\n`);
+    res.write('# TYPE pg_pool_waiting_queries gauge\n');
+    res.write('# HELP pg_pool_waiting_queries The number of queued requests waiting on a client when all clients are checked out\n');
+    res.write(`pg_pool_waiting_queries ${pgPool.waitingCount}.0\n`);
+    res.write('# EOF\n');
+    res.end();
+  }));
+
   app.use(authenticationMiddleware);
   app.use(errorMiddleware);