16boke - 一路博客

StatsD的使用及Java和NodeJS客户端的调用

StatsD负责收集并聚合测量值,将数据传给Graphite,StatsD是一个NodeJS的daemon程序,简单,轻巧。使用的UDP协议。可以和Graphite图片渲染应用结合。

UDP协议相比于TCP减少握手确认时间,UDP好处就是fire-and-forget。你不用去管后台的StatsD 服务器是不是崩了,崩了就崩了,不会影响前台应用。

StatsD有一个时间周期的概念,默认是10秒钟,就是说,StatsD会把收集到的数据(经过处理)每隔10秒,发送给后端。比如Counter,他就会把10秒内该Counter累加的值,发送到后端。比如Time/Timing,他会把次数、最大值、最小值、平均值发往后端。所以,所有的数据都是以10秒为一个周期的。

1、StatsD概念

Buckets

Each stat is in its own “bucket”. They are not predefined anywhere. Buckets can be named anything that will translate to Graphite (periods make folders, etc)

Values

Each stat will have a value. How it is interpreted depends on modifiers. In general values should be integer.

Flush Interval

After the flush interval timeout (default 10 seconds), stats are aggregated and sent to an upstream backend service.

2、指标类别

计数器

计数器很简单。它会给 bucket 加 value,并存储在内存中,直到 flush interval 超时。

让我们看一下生成计数器 stats 的源码,该 stats 会被推送到后端。

for (key in counters) {
  var value = counters[key];
  var valuePerSecond = value / (flushInterval / 1000); // calculate "per second" rate
  statString += 'stats.'+ key + ' ' + valuePerSecond + ' ' + ts + "\n";
  statString += 'stats_counts.' + key + ' ' + value  + ' ' + ts + "\n";
  numStats += 1;
}

首先,StatsD 会迭代它收到的所有计数器,对每个计数器它都会分配两个变量。一个变量用于存储计数器的 value,另一个存储 per-second value。之后,它会将 values 加至 statString,同时增加 numStats 变量的值。

如果你使用默认的 flush interval(10秒),并在每个间隔通过某个计数器给 StatsD 传送7个增量。则计时器的 value 为 7,而 per-second value 为 0.7。

例:统计用户登录失败

stats.login.error:1|c

计时器

计时器用于收集数字。他们不必要包含时间值。你可以收集某个存储器中的字节数、对象数或任意数字。计时器的一大好处在于,你可以得到平均值、总值、计数值和上下限值。给 StatsD 设置一个计时器,就能在数据传送给 Graphite 之前自动计算这些量。

例:用户登录响应时间

stats.login.response:1|ms

Gauges

一个 guage 代表着时间段内某点的任意 vaule,是 StatsD 中最简单的类型。你可以给它传任意值,它会传给后端。

Gauge stats 的源码只有短短四行。

for (key in gauges) {
  statString += 'stats.gauges.' + key + ' ' + gauges[key] + ' ' + ts + "\n";
  numStats += 1;
}

给 StatsD 传一个数字,它会不经处理地将该数字传到后端。值得注意的是,在一个 flush interval 内,只有 gauge 最后的值会传送到后端。因此,如果你在一个 flush interval 内,将下面的 gauge 值传给 StatsD:

643

754

583

会传到后端的值只有583而已。该 gauge 的值会一直存储在内存中,直到 flush interval 结束才传值。

例:当前用户数

stats.user:1000|g

由于statsd采用udp方式向graphite传数据,所以只要是支持udp协议的开发语言都可以使用。另外在statsd的example目录下面就提供了各种语言的客户端代码,

clipboard.png

下面以Java和nodejs为例提供这两种语言的客户代码。

package com;
/**
 * StatsdClient.java
 *
 * (C) 2011 Meetup, Inc.
 * Author: Andrew Gwozdziewycz , @apgwoz
 *
 *
 *
 * Example usage:
 *
 *    StatsdClient client = new StatsdClient("statsd.example.com", 8125);
 *    // increment by 1
 *    client.increment("foo.bar.baz");
 *    // increment by 10
 *    client.increment("foo.bar.baz", 10);
 *    // sample rate
 *    client.increment("foo.bar.baz", 10, .1);
 *    // increment multiple keys by 1
 *    client.increment("foo.bar.baz", "foo.bar.boo", "foo.baz.bar");
 *    // increment multiple keys by 10 -- yeah, it's "backwards"
 *    client.increment(10, "foo.bar.baz", "foo.bar.boo", "foo.baz.bar");
 *    // multiple keys with a sample rate
 *    client.increment(10, .1, "foo.bar.baz", "foo.bar.boo", "foo.baz.bar");
 *
 *    // To enable multi metrics (aka more than 1 metric in a UDP packet) (disabled by default)
 *    client.enableMultiMetrics(true);  //disable by passing in false
 *    // To fine-tune udp packet buffer size (default=1500)
 *    client.setBufferSize((short) 1500);
 *    // To force flush the buffer out (good idea to add to your shutdown path)
 *    client.flush();
 *
 *
 * Note: For best results, and greater availability, you'll probably want to
 * create a wrapper class which creates a static client and proxies to it.
 *
 * You know... the "Java way."
 */
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Locale;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.log4j.Logger;
public class StatsdClient extends TimerTask {
    private ByteBuffer sendBuffer;
    private Timer flushTimer;
    private boolean multi_metrics = false;
    private static final Random RNG = new Random();
    private static final Logger log = Logger.getLogger(StatsdClient.class.getName());
    private final InetSocketAddress _address;
    private final DatagramChannel _channel;
    public StatsdClient(String host, int port) throws UnknownHostException, IOException {
        this(InetAddress.getByName(host), port);
    }
    public StatsdClient(InetAddress host, int port) throws IOException {
        _address = new InetSocketAddress(host, port);
        _channel = DatagramChannel.open();
        /* Put this in non-blocking mode so send does not block forever. */
        _channel.configureBlocking(false);
        /* Increase the size of the output buffer so that the size is larger than our buffer size. */
        //StandardSocketOptions只在jdk1.7以上才有
        //_channel.setOption(StandardSocketOptions.SO_SNDBUF, 4096);
        setBufferSize((short) 1500);
    }
    protected void finalize() {
        flush();
    }
    public synchronized void setBufferSize(short packetBufferSize) {
        if (sendBuffer != null) {
            flush();
        }
        sendBuffer = ByteBuffer.allocate(packetBufferSize);
    }
    public synchronized void enableMultiMetrics(boolean enable) {
        multi_metrics = enable;
    }
    public synchronized boolean startFlushTimer(long period) {
        if (flushTimer == null) {
            // period is in msecs
            if (period <= 0) {
                period = 2000;
            }
            flushTimer = new Timer();
            // We pass this object in as the TimerTask (which calls run())
            flushTimer.schedule((TimerTask) this, period, period);
            return true;
        }
        return false;
    }
    public synchronized void stopFlushTimer() {
        if (flushTimer != null) {
            flushTimer.cancel();
            flushTimer = null;
        }
    }
    public void run() { // used by Timer, we're a Runnable TimerTask
        flush();
    }
    public boolean timing(String key, int value) {
        return timing(key, value, 1.0);
    }
    public boolean timing(String key, int value, double sampleRate) {
        return send(sampleRate, String.format(Locale.ENGLISH, "%s:%d|ms", key, value));
    }
    public boolean decrement(String key) {
        return increment(key, -1, 1.0);
    }
    public boolean decrement(String key, int magnitude) {
        return decrement(key, magnitude, 1.0);
    }
    public boolean decrement(String key, int magnitude, double sampleRate) {
        magnitude = magnitude < 0 ? magnitude : -magnitude;
        return increment(key, magnitude, sampleRate);
    }
    public boolean decrement(String... keys) {
        return increment(-1, 1.0, keys);
    }
    public boolean decrement(int magnitude, String... keys) {
        magnitude = magnitude < 0 ? magnitude : -magnitude;
        return increment(magnitude, 1.0, keys);
    }
    public boolean decrement(int magnitude, double sampleRate, String... keys) {
        magnitude = magnitude < 0 ? magnitude : -magnitude;
        return increment(magnitude, sampleRate, keys);
    }
    public boolean increment(String key) {
        return increment(key, 1, 1.0);
    }
    public boolean increment(String key, int magnitude) {
        return increment(key, magnitude, 1.0);
    }
    public boolean increment(String key, int magnitude, double sampleRate) {
        String stat = String.format(Locale.ENGLISH, "%s:%s|c", key, magnitude);
        return send(sampleRate, stat);
    }
    public boolean increment(int magnitude, double sampleRate, String... keys) {
        String[] stats = new String[keys.length];
        for (int i = 0; i < keys.length; i++) {
            stats[i] = String.format(Locale.ENGLISH, "%s:%s|c", keys[i], magnitude);
        }
        return send(sampleRate, stats);
    }
    public boolean gauge(String key, double magnitude) {
        return gauge(key, magnitude, 1.0);
    }
    public boolean gauge(String key, double magnitude, double sampleRate) {
        final String stat = String.format(Locale.ENGLISH, "%s:%s|g", key, magnitude);
        return send(sampleRate, stat);
    }
    private boolean send(double sampleRate, String... stats) {
        boolean retval = false; // didn't send anything
        if (sampleRate < 1.0) {
            for (String stat : stats) {
                if (RNG.nextDouble() <= sampleRate) {
                    stat = String.format(Locale.ENGLISH, "%s|@%f", stat, sampleRate);
                    if (doSend(stat)) {
                        retval = true;
                    }
                }
            }
        } else {
            for (String stat : stats) {
                if (doSend(stat)) {
                    retval = true;
                }
            }
        }
        return retval;
    }
    private synchronized boolean doSend(String stat) {
        try {
            final byte[] data = stat.getBytes("utf-8");
            // If we're going to go past the threshold of the buffer then flush.
            // the +1 is for the potential '\n' in multi_metrics below
            if (sendBuffer.remaining() < (data.length + 1)) {
                flush();
            }
            if (sendBuffer.position() > 0) { // multiple metrics are separated by '\n'
                sendBuffer.put((byte) '\n');
            }
            sendBuffer.put(data); // append the data
            if (!multi_metrics) {
                flush();
            }
            return true;
        } catch (IOException e) {
            log.error(
                    String.format("Could not send stat %s to host %s:%d", sendBuffer.toString(), _address.getHostName(),
                            _address.getPort()), e);
            return false;
        }
    }
    public synchronized boolean flush() {
        try {
            final int sizeOfBuffer = sendBuffer.position();
            if (sizeOfBuffer <= 0) {
                return false;
            } // empty buffer
            // send and reset the buffer
            sendBuffer.flip();
            final int nbSentBytes = _channel.send(sendBuffer, _address);
            sendBuffer.limit(sendBuffer.capacity());
            sendBuffer.rewind();
            if (sizeOfBuffer == nbSentBytes) {
                return true;
            } else {
                log.error(String.format("Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes",
                        sendBuffer.toString(), _address.getHostName(), _address.getPort(), nbSentBytes, sizeOfBuffer));
                return false;
            }
        } catch (IOException e) {
            /* This would be a good place to close the channel down and recreate it. */
            log.error(
                    String.format("Could not send stat %s to host %s:%d", sendBuffer.toString(), _address.getHostName(),
                            _address.getPort()), e);
            return false;
        }
    }
}
具体使用:
try {
            StatsdClient client = new StatsdClient("127.0.0.1", 8125);
            client.increment("statsd.login.error");
            client.gauge("statsd.user", 100);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

java端也有其它statsd-client.jar可以在maven上下载。

Nodejs版

dgram = require("dgram")
os = require('os')

exports.getLocalIP=()->
	ip = ''
	hostName=os.hostname()
	networkInterfaces = os.networkInterfaces()
	for network in networkInterfaces.em1
		if network.family is 'IPv4'
			ip = @formatIp(network.address)
	return ip

exports.formatIp = (ip)->
	ip.replace(/\./g, "_")

exports.increment = (metric)->
	if @getLocalIP()
		metric = metric+"."+@getLocalIP()
	data = metric+":1|c"
	@sendToGraphite(data)

exports.gauge = (metric,value)->
	if @getLocalIP()
		metric = metric+"."+@getLocalIP()
	data = metric+":"+value+"|g"
	@sendToGraphite(data)

exports.timing = (metric,value)->
	if @getLocalIP()
		metric = metric+"."+@getLocalIP()
	data = metric+":"+value+"|ms"
	@sendToGraphite(data)

exports.counter = (metric,value)->
	if @getLocalIP()
		metric = metric+"."+@getLocalIP()
	data = metric+":"+value+"|c"
	@sendToGraphite(data)

exports.sendToGraphite = (data)->
	socket = dgram.createSocket("udp4")
	socket.bind(->
		socket.setBroadcast(true)
	)
	message = new Buffer(data);
	socket.send(message, 0, message.length, 8125, '127.0.0.1', (err, bytes)->
		if err
			console.log("sendToGraphite error:"+err.stack)
		socket.close()
	)