diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/index.js | 125 |
1 files changed, 66 insertions, 59 deletions
diff --git a/streaming/index.js b/streaming/index.js index 45ea26bd6..94568ee9a 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -7,6 +7,7 @@ const express = require('express'); const http = require('http'); const redis = require('redis'); const pg = require('pg'); +const dbUrlToConfig = require('pg-connection-string').parse; const log = require('npmlog'); const url = require('url'); const uuid = require('uuid'); @@ -15,7 +16,6 @@ const WebSocket = require('ws'); const { JSDOM } = require('jsdom'); const env = process.env.NODE_ENV || 'development'; -const alwaysRequireAuth = process.env.LIMITED_FEDERATION_MODE === 'true' || process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true'; dotenv.config({ path: env === 'production' ? '.env.production' : '.env', @@ -24,43 +24,6 @@ dotenv.config({ log.level = process.env.LOG_LEVEL || 'verbose'; /** - * @param {string} dbUrl - * @return {Object.<string, any>} - */ -const dbUrlToConfig = (dbUrl) => { - if (!dbUrl) { - return {}; - } - - const params = url.parse(dbUrl, true); - const config = {}; - - if (params.auth) { - [config.user, config.password] = params.auth.split(':'); - } - - if (params.hostname) { - config.host = params.hostname; - } - - if (params.port) { - config.port = params.port; - } - - if (params.pathname) { - config.database = params.pathname.split('/')[1]; - } - - const ssl = params.query && params.query.ssl; - - if (ssl && ssl === 'true' || ssl === '1') { - config.ssl = true; - } - - return config; -}; - -/** * @param {Object.<string, any>} defaultConfig * @param {string} redisUrl */ @@ -117,9 +80,10 @@ const startMaster = () => { log.warn(`Starting streaming API server master with ${numWorkers} workers`); }; -const startWorker = async (workerId) => { - log.warn(`Starting worker ${workerId}`); - +/** + * @return {Object.<string, any>} + */ +const pgConfigFromEnv = () => { const pgConfigs = { development: { user: process.env.DB_USER || pg.defaults.user, @@ -127,7 +91,6 @@ const startWorker = async (workerId) => { database: process.env.DB_NAME || 'mastodon_development', host: process.env.DB_HOST || pg.defaults.host, port: process.env.DB_PORT || pg.defaults.port, - max: 10, }, production: { @@ -136,20 +99,48 @@ const startWorker = async (workerId) => { database: process.env.DB_NAME || 'mastodon_production', host: process.env.DB_HOST || 'localhost', port: process.env.DB_PORT || 5432, - max: 10, }, }; - if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') { - pgConfigs.development.ssl = true; - pgConfigs.production.ssl = true; + let baseConfig; + + if (process.env.DATABASE_URL) { + baseConfig = dbUrlToConfig(process.env.DATABASE_URL); + } else { + baseConfig = pgConfigs[env]; + + if (process.env.DB_SSLMODE) { + switch(process.env.DB_SSLMODE) { + case 'disable': + case '': + baseConfig.ssl = false; + break; + case 'no-verify': + baseConfig.ssl = { rejectUnauthorized: false }; + break; + default: + baseConfig.ssl = {}; + break; + } + } } + return { + ...baseConfig, + max: process.env.DB_POOL || 10, + connectionTimeoutMillis: 15000, + application_name: '', + }; +}; + +const startWorker = async (workerId) => { + log.warn(`Starting worker ${workerId}`); + const app = express(); app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal'); - const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL))); + const pgPool = new pg.Pool(pgConfigFromEnv()); const server = http.createServer(app); const redisNamespace = process.env.REDIS_NAMESPACE || null; @@ -355,22 +346,17 @@ const startWorker = async (workerId) => { * @param {boolean=} required * @return {Promise.<void>} */ - const accountFromRequest = (req, required = true) => new Promise((resolve, reject) => { + const accountFromRequest = (req) => new Promise((resolve, reject) => { const authorization = req.headers.authorization; const location = url.parse(req.url, true); const accessToken = location.query.access_token || req.headers['sec-websocket-protocol']; if (!authorization && !accessToken) { - if (required) { - const err = new Error('Missing access token'); - err.status = 401; + const err = new Error('Missing access token'); + err.status = 401; - reject(err); - return; - } else { - resolve(); - return; - } + reject(err); + return; } const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken; @@ -474,7 +460,7 @@ const startWorker = async (workerId) => { // variables. OAuth scope checks are moved to the point of subscription // to a specific stream. - accountFromRequest(info.req, alwaysRequireAuth).then(() => { + accountFromRequest(info.req).then(() => { callback(true, undefined, undefined); }).catch(err => { log.error(info.req.requestId, err.toString()); @@ -548,7 +534,7 @@ const startWorker = async (workerId) => { return; } - accountFromRequest(req, alwaysRequireAuth).then(() => checkScopes(req, channelNameFromPath(req))).then(() => { + accountFromRequest(req).then(() => checkScopes(req, channelNameFromPath(req))).then(() => { subscribeHttpToSystemChannel(req, res); }).then(() => { next(); @@ -858,6 +844,27 @@ const startWorker = async (workerId) => { res.end('OK'); }); + app.get('/metrics', (req, res) => server.getConnections((err, count) => { + res.writeHeader(200, { 'Content-Type': 'application/openmetrics-text; version=1.0.0; charset=utf-8' }); + res.write('# TYPE connected_clients gauge\n'); + res.write('# HELP connected_clients The number of clients connected to the streaming server\n'); + res.write(`connected_clients ${count}.0\n`); + res.write('# TYPE connected_channels gauge\n'); + res.write('# HELP connected_channels The number of Redis channels the streaming server is subscribed to\n'); + res.write(`connected_channels ${Object.keys(subs).length}.0\n`); + res.write('# TYPE pg_pool_total_connections gauge\n'); + res.write('# HELP pg_pool_total_connections The total number of clients existing within the pool\n'); + res.write(`pg_pool_total_connections ${pgPool.totalCount}.0\n`); + res.write('# TYPE pg_pool_idle_connections gauge\n'); + res.write('# HELP pg_pool_idle_connections The number of clients which are not checked out but are currently idle in the pool\n'); + res.write(`pg_pool_idle_connections ${pgPool.idleCount}.0\n`); + res.write('# TYPE pg_pool_waiting_queries gauge\n'); + res.write('# HELP pg_pool_waiting_queries The number of queued requests waiting on a client when all clients are checked out\n'); + res.write(`pg_pool_waiting_queries ${pgPool.waitingCount}.0\n`); + res.write('# EOF\n'); + res.end(); + })); + app.use(authenticationMiddleware); app.use(errorMiddleware); |