Home Reference Source

lib/transport.js

const unixDgram = require('unix-dgram');
const helpers = require('./helpers');
const dgram = require('dgram');
const dns = require('dns');
const net = require('net');

/**
 * @description Creates a socket with the arguments provided
 * @param {object} args
 * @param {string} args.host - The host to use for the socket
 * @param {number} args.port - The port to use for the socket
 * @param {number} args.path - The UDS path to use for the socket
 * @param {number} args.protocol - The protocol to use
 * @ignore
 * @throws Error if it fails to create the socket
 */
function createSocket(args) {
  let socket;

  if (args.protocol === 'tcp') {
    socket = net.connect(args.port, args.host);
    socket.setKeepAlive(true);
  } else if (args.protocol === 'unix_dgram') {
    socket = unixDgram.createSocket('unix_dgram');
  } else {
    socket = dgram.createSocket('udp4');
  }

  return socket;
}

/**
 * StatsD Transport
 * @description Base class for transporting StatsD Messages
 * @throws Error if it fails to create the socket
 * @ignore
 * NOTE: Adding new parameters to the constructor is deprecated- please use the
 * constructor as one options object.
 */
class Transport {
  constructor(
    host,
    port,
    prefix,
    suffix,
    globalize,
    cacheDns,
    mock,
    globalTags,
    maxBufferSize,
    bufferFlushInterval,
    telegraf,
    sampleRate,
    protocol
  ) {
    let options = host || {};
    const self = this;

    // Adding options below is DEPRECATED.  Use the options object instead.
    if (arguments.length > 1 || typeof(host) === 'string') {
      options = {
        host        : host,
        port        : port,
        prefix      : prefix,
        suffix      : suffix,
        globalize   : globalize,
        cacheDns    : cacheDns,
        mock        : mock === true,
        globalTags  : globalTags,
        maxBufferSize : maxBufferSize,
        bufferFlushInterval: bufferFlushInterval,
        telegraf    : telegraf,
        sampleRate  : sampleRate,
        protocol    : protocol
      };
    }

    // hidden global_tags option for backwards compatibility
    options.globalTags = options.globalTags || options.global_tags;

    this.protocol    = (options.protocol && options.protocol.toLowerCase());
    this.host        = options.host || 'localhost';
    this.port        = options.port || 8125;
    this.path        = options.path || '';
    this.prefix      = options.prefix || '';
    this.suffix      = options.suffix || '';
    this.mock        = options.mock === true;
    this.globalTags  = typeof options.globalTags === 'object' ?
        helpers.formatTags(options.globalTags, options.telegraf) : [];
    this.telegraf    = options.telegraf || false;
    this.maxBufferSize = options.maxBufferSize || 0;
    this.sampleRate  = typeof options.sampleRate === 'number' ? options.sampleRate : 1;
    this.bufferFlushInterval = options.bufferFlushInterval || 1000;
    this.bufferHolder = options.isChild ? options.bufferHolder : { buffer: '' };
    this.errorHandler = options.errorHandler;

    // If we're mocking the client, create a buffer to record the outgoing calls.
    if (this.mock) {
      this.mockBuffer = [];
    } else {
      this.socket = options.isChild ? options.socket : createSocket({
        host: this.host,
        path: this.path,
        port: this.port,
        protocol: this.protocol
      });

      if (!options.isChild && options.errorHandler) {
        this.socket.on('error', options.errorHandler);
      }
    }

    // We only want a single flush event per parent and all its child clients
    if (!options.isChild && this.maxBufferSize > 0) {
      this.intervalHandle = setInterval(this.onBufferFlushInterval.bind(this), this.bufferFlushInterval);
    }

    if (options.isChild) {
      if (options.dnsError) {
        this.dnsError = options.dnsError;
      }
    } else if (options.cacheDns === true) {
      dns.lookup(options.host, (err, address) => {
        if (err === null) {
          self.host = address;
        } else {
          self.dnsError = err;
        }
      });
    }

    if (options.globalize) {
      global.statsd = this;
    }

    if (options.useDefaultRoute) {
      const defaultRoute = helpers.getDefaultRoute();
      if (defaultRoute) {
        this.host = defaultRoute;
      }
    }

    this.messagesInFlight = 0;
    this.CHECKS = {
      OK: 0,
      WARNING: 1,
      CRITICAL: 2,
      UNKNOWN: 3,
    };
  }

  /**
   * Checks if stats is an array and sends all stats calling back once all have sent
   * @param {string|string[]} stat {String|Array} The stat(s) to send
   * @param {any} value The value to send
   * @param {string} type The type of the metric
   * @param {number} sampleRate The Number of times to sample (0 to 1). Optional.
   * @param {string[]|object} tags The Array of tags to add to metrics. Optional.
   * @param {function} callback Callback when message is done being delivered. Optional.
   */
  sendAll(stat, value, type, sampleRate, tags, callback) {
    let completed = 0;
    let calledback = false;
    let sentBytes = 0;
    const self = this;

    if (sampleRate && typeof sampleRate !== 'number') {
      callback = tags;
      tags = sampleRate;
      sampleRate = undefined;
    }

    if (tags && typeof tags !== 'object') {
      callback = tags;
      tags = undefined;
    }

    /**
     * Gets called once for each callback, when all callbacks return we will
     * call back from the function
     * @private
     */
    function onSend(error, bytes) {
      completed += 1;
      if (calledback) {
        return;
      }

      if (error) {
        if (typeof callback === 'function') {
          calledback = true;
          callback(error);
        } else if (self.errorHandler) {
          calledback = true;
          self.errorHandler(error);
        }
        return;
      }

      if (bytes) {
        sentBytes += bytes;
      }

      if (completed === stat.length && typeof callback === 'function') {
        callback(null, sentBytes);
      }
    }

    if (Array.isArray(stat)) {
      stat.forEach(item => {
        self.sendStat(item, value, type, sampleRate, tags, onSend);
      });
    } else {
      this.sendStat(stat, value, type, sampleRate, tags, callback);
    }
  }

  /**
   * Sends a stat across the wire
   * @param {string|string[]} stat {String|Array} The stat(s) to send
   * @param {any} value The value to send
   * @param {string} type The type of the metric
   * @param {number} sampleRate The Number of times to sample (0 to 1). Optional.
   * @param {string[]|object} tags The Array of tags to add to metrics. Optional.
   * @param {function} callback Callback when message is done being delivered. Optional.
   */
  sendStat(stat, value, type, sampleRate, tags, callback) {
    let message = `${this.prefix + stat + this.suffix}:${value}|${type}`;
    sampleRate = typeof sampleRate === 'number' ? sampleRate : this.sampleRate;
    if (sampleRate < 1) {
      if (Math.random() < sampleRate) {
        message += `|@${sampleRate}`;
      } else {
        // don't want to send if we don't meet the sample ratio
        return callback ? callback() : undefined;
      }
    }
    this.send(message, tags, callback);
  }

  /**
   * Send a stat or event across the wire
   * @param {string} message The constructed message without tags
   * @param {string[]|object} tags {Array} The tags to include (along with global tags). Optional.
   * @param callback {Function=} Callback when message is done being delivered (only if maxBufferSize == 0). Optional.
   */
  send(message, tags, callback) {
    let mergedTags = this.globalTags;
    if (tags && typeof tags === 'object') {
      mergedTags = helpers.overrideTags(mergedTags, tags, this.telegraf);
    }
    if (mergedTags.length > 0) {
      if (this.telegraf) {
        message = message.split(':');
        message = `${message[0]},${mergedTags.join(',').replace(/:/g, '=')}:${message.slice(1).join(':')}`;
      } else {
        message += `|#${mergedTags.join(',')}`;
      }
    }

    this._send(message, callback);
  }

  /**
   * Send a stat or event across the wire
   * @param {string} message The constructed message without tags
   * @param {function} callback Callback when message is done being delivered (only if maxBufferSize == 0). Optional.
   */
  _send(message, callback) {
    // we may have a cached error rather than a cached lookup, so
    // throw it on
    if (this.dnsError) {
      if (callback) {
        return callback(this.dnsError);
      } else if (this.errorHandler) {
        return this.errorHandler(this.dnsError);
      }
      throw this.dnsError;
    }

    // Only send this stat if we're not a mock Client.
    if (!this.mock) {
      if (this.maxBufferSize === 0) {
        this.sendMessage(message, callback);
      } else {
        this.enqueue(message, callback);
      }
    } else {
      this.mockBuffer.push(message);
      if (typeof callback === 'function') {
        callback(null, 0);
      }
    }
  }

  /**
   * Add the message to the buffer and flush the buffer if needed
   *
   * @param {string} message The constructed message without tags
   * @param {function} callback
   */
  enqueue(message, callback) {
    message += '\n';

    if (this.bufferHolder.buffer.length + message.length > this.maxBufferSize) {
      this.flushQueue(callback);
      this.bufferHolder.buffer += message;
    }
    else {
      this.bufferHolder.buffer += message;
      if (callback) {
        callback(null);
      }
    }
  }

  /**
   * Flush the buffer, sending on the messages
   */
  flushQueue(callback) {
    this.sendMessage(this.bufferHolder.buffer, callback);
    this.bufferHolder.buffer = '';
  }

  /**
   * Send on the message through the socket
   *
   * @param {string} message The constructed message without tags
   * @param {function} callback Callback when message is done being delivered. Optional.
   */
  sendMessage(message, callback) {
    // don't waste the time if we aren't sending anything
    if (message === '' || this.mock) {
      if (callback) {
        callback(null);
      }
      return;
    }

    if (this.protocol === 'tcp' && message.lastIndexOf('\n') !== message.length - 1) {
      message += '\n';
    }

    const handleCallback = (err) => {
      this.messagesInFlight--;
      const errFormatted = err ? new Error(`Error sending hot-shots message: ${err}`) : null;
      if (errFormatted) {
        errFormatted.code = err.code;
      }
      if (callback) {
        callback(errFormatted);
      } else if (errFormatted) {
        if (this.errorHandler) {
          this.errorHandler(errFormatted);
        } else {
          // emit error ourselves on the socket for backwards compatibility
          this.socket.emit('error', errFormatted);
        }
      }
    };

    const buf = Buffer.from(message);
    try {
      this.messagesInFlight++;
      if (this.protocol === 'tcp') {
        this.socket.write(buf, 'ascii', handleCallback);
      } else if (this.protocol === 'unix_dgram') {
        this.socket.send(buf, 0, buf.length, this.path, handleCallback);
      } else {
        this.socket.send(buf, 0, buf.length, this.port, this.host, handleCallback);
      }
    } catch (err) {
      handleCallback(err);
    }
  }

  /**
   * Called every bufferFlushInterval to flush any buffer that is around
   */
  onBufferFlushInterval() {
    this.flushQueue();
  }

  /**
   * Close the underlying socket and stop listening for data on it.
   */
  close(callback) {
    // stop trying to flush the queue on an interval
    if (this.intervalHandle) {
      clearInterval(this.intervalHandle);
    }

    // flush the queue one last time, if needed
    this.flushQueue((err) => {
      if (err) {
        if (callback) {
          callback(err);
        }
        return;
      }

      // FIXME: we have entered callback hell, and this whole file is in need of an async rework

      // wait until there are no more messages in flight before really closing the socket
      let intervalAttempts = 0;
      const waitForMessages = setInterval(() => {
        intervalAttempts++;
        if (intervalAttempts > 10) {
          this.messagesInFlight = 0;
        }
        if (this.messagesInFlight <= 0) {
          clearInterval(waitForMessages);
          this._close(callback);
        }
      }, 50);
    });
  }

  /**
   * Really close the socket and handle any errors related to it
   */
  _close(callback) {
    // error function to use in callback and catch below
    let handledError = false;
    const handleErr = (err) => {
      const errMessage = `Error closing hot-shots socket: ${err}`;
      if (!handledError) {
        // The combination of catch and error can lead to some errors
        // showing up twice.  So we just show one of the errors that occur
        // on close.
        handledError = true;

        if (callback) {
          callback(new Error(errMessage));
        } else if (this.errorHandler) {
          this.errorHandler(new Error(errMessage));
        }
      }
    };

    if (!this.mock) {
      if (this.errorHandler) {
        this.socket.removeListener('error', this.errorHandler);
      }

      // handle error and close events
      this.socket.on('error', handleErr);
      if (callback) {
        this.socket.on('close', err => {
          if (! handledError && callback) {
            callback(err);
          }
        });
      }

      try {
        if (this.protocol === 'tcp') {
          this.socket.destroy();
        } else {
          this.socket.close();
        }
      } catch (err) {
        handleErr(err);
      }
    } else if (callback) {
      return callback(null);
    }
  }
}

module.exports = Transport;