Skip to content
This repository has been archived by the owner on Nov 25, 2020. It is now read-only.

Avoid binding DatagramSocket #2

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 16 additions & 51 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,17 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>studyblue</groupId>
<groupId>com.bealetech</groupId>
<artifactId>metrics-statsd</artifactId>
<name>Metrics Statsd Support</name>
<version>2.1.3.0</version>
<version>2.3.0.1</version>
<packaging>jar</packaging>

<properties>
<metrics.version>2.1.3</metrics.version>
<slf4j.version>1.6.4</slf4j.version>
<metrics.version>2.2.0</metrics.version>
<slf4j.version>1.7.10</slf4j.version>
</properties>

<developers>
<developer>
<name>Sean Laurent</name>
<email>organicveggie@gmail.com</email>
<timezone>-6</timezone>
</developer>
</developers>

<licenses>
<license>
<name>Apache License 2.0</name>
Expand All @@ -29,34 +21,6 @@
</license>
</licenses>

<scm>
<connection>scm:git:git://github.com/organicveggie/metrics-statsd.git</connection>
<developerConnection>scm:git:git@github.com:organicveggie/metrics-statsd.git</developerConnection>
<url>http://github.com/organicveggie/metrics-statsd/</url>
</scm>

<issueManagement>
<system>github</system>
<url>http://github.com/organicveggie/metrics-statsd/issues#issue/</url>
</issueManagement>

<repositories>
<repository>
<id>central</id>
<url>http://oss.sonatype.org/content/repositories/releases</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>snapshots</id>
<url>http://oss.sonatype.org/content/repositories/snapshots</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>com.yammer.metrics</groupId>
Expand Down Expand Up @@ -86,13 +50,13 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit-dep</artifactId>
<version>4.10</version>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.0</version>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand All @@ -102,30 +66,31 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<version>3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>2.3.7</version>
<version>2.5.3</version>
<extensions>true</extensions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.8.1</version>
<version>2.18.1</version>
<configuration>
<parallel>classes</parallel>
<forkCount>3</forkCount>
<reuseForks>true</reuseForks>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<version>2.4</version>
<executions>
<execution>
<id>attach-sources</id>
Expand All @@ -138,7 +103,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.8.1</version>
<version>2.10.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
Expand All @@ -151,7 +116,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>2.2.1</version>
<version>2.5.1</version>
<configuration>
<autoVersionSubmodules>true</autoVersionSubmodules>
<mavenExecutorId>forked-path</mavenExecutorId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.studyblue.metrics.reporting;
package com.bealetech.metrics.reporting;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.*;
Expand All @@ -25,14 +25,23 @@
import java.io.*;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.InetAddress;
import java.util.Locale;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;

public class StatsdReporter extends AbstractPollingReporter implements MetricProcessor<Long> {

/*
MTU size of the host handling the PDU (most of the case it will be 1500) -
size of the IP header (20 bytes) -
size of UDP header (8 bytes)

1500 MTU - 20 IP hdr - 8 UDP hdr = 1472 bytes
*/
private static final int MAX_UDPDATAGRAM_LENGTH = 1472;

public static enum StatType { COUNTER, TIMER, GAUGE }

private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class);
Expand All @@ -41,11 +50,16 @@ public static enum StatType { COUNTER, TIMER, GAUGE }
protected final MetricPredicate predicate;
protected final Locale locale = Locale.US;
protected final Clock clock;

protected final UDPSocketProvider socketProvider;
protected DatagramSocket currentSocket = null;

protected final VirtualMachineMetrics vm;

protected Writer writer;
protected ByteArrayOutputStream outputData;

private boolean prependNewline = false;
private boolean printVMMetrics = true;

public interface UDPSocketProvider {
Expand Down Expand Up @@ -78,7 +92,7 @@ public StatsdReporter(MetricsRegistry metricsRegistry, String prefix, MetricPred
}

public StatsdReporter(MetricsRegistry metricsRegistry, String prefix, MetricPredicate predicate, UDPSocketProvider socketProvider, Clock clock, VirtualMachineMetrics vm) throws IOException {
this(metricsRegistry, prefix, predicate, socketProvider, clock, vm, "graphite-reporter");
this(metricsRegistry, prefix, predicate, socketProvider, clock, vm, "statsd-reporter");
}

public StatsdReporter(MetricsRegistry metricsRegistry, String prefix, MetricPredicate predicate, UDPSocketProvider socketProvider, Clock clock, VirtualMachineMetrics vm, String name) throws IOException {
Expand Down Expand Up @@ -109,28 +123,24 @@ public void setPrintVMMetrics(boolean printVMMetrics) {

@Override
public void run() {
DatagramSocket socket = null;

try {
socket = this.socketProvider.get();
outputData.reset();
writer = new BufferedWriter(new OutputStreamWriter(this.outputData));
currentSocket = this.socketProvider.get();
resetWriterState();

final long epoch = clock.time() / 1000;
if (this.printVMMetrics) {
if (printVMMetrics) {
printVmMetrics(epoch);
}
printRegularMetrics(epoch);

// Send UDP data
writer.flush();
DatagramPacket packet = this.socketProvider.newPacket(outputData);
packet.setData(outputData.toByteArray());
socket.send(packet);
sendDatagram();
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error writing to Graphite", e);
LOG.debug("Error writing to Statsd", e);
} else {
LOG.warn("Error writing to Graphite: {}", e.getMessage());
LOG.warn("Error writing to Statsd: {}", e.getMessage());
}
if (writer != null) {
try {
Expand All @@ -140,13 +150,28 @@ public void run() {
}
}
} finally {
if (socket != null) {
socket.close();
if (currentSocket != null) {
currentSocket.close();
}
writer = null;
}
}

private void resetWriterState() {
outputData.reset();
prependNewline = false;
writer = new BufferedWriter(new OutputStreamWriter(this.outputData));
}

private void sendDatagram() throws IOException {
writer.flush();
if (outputData.size() > 0) { // Don't send an empty datagram
DatagramPacket packet = this.socketProvider.newPacket(outputData);
packet.setData(outputData.toByteArray());
currentSocket.send(packet);
}
}

protected void printVmMetrics(long epoch) {
// Memory
sendFloat("jvm.memory.totalInit", StatType.GAUGE, vm.totalInit());
Expand Down Expand Up @@ -304,8 +329,11 @@ protected void sendData(String name, String value, StatType statType) {
statTypeStr = "ms";
break;
}

try {
if (prependNewline) {
writer.write("\n");
}
if (!prefix.isEmpty()) {
writer.write(prefix);
}
Expand All @@ -314,10 +342,16 @@ protected void sendData(String name, String value, StatType statType) {
writer.write(value);
writer.write("|");
writer.write(statTypeStr);
writer.write('\n');
prependNewline = true;
writer.flush();

if (outputData.size() > MAX_UDPDATAGRAM_LENGTH) {
// Need to send our UDP packet now before it gets too big.
sendDatagram();
resetWriterState();
}
} catch (IOException e) {
LOG.error("Error sending to Graphite:", e);
LOG.error("Error sending to Statsd:", e);
}
}

Expand All @@ -333,19 +367,25 @@ public DefaultSocketProvider(String host, int port) {

@Override
public DatagramSocket get() throws Exception {
return new DatagramSocket(new InetSocketAddress(this.host, this.port));
return new DatagramSocket();
}

@Override
public DatagramPacket newPacket(ByteArrayOutputStream out) {
byte[] dataBuffer;

if (out != null) {
dataBuffer = out.toByteArray();
}
else {
dataBuffer = new byte[8192];
}
return new DatagramPacket(dataBuffer, dataBuffer.length);

try {
return new DatagramPacket(dataBuffer, dataBuffer.length, InetAddress.getByName(this.host), this.port);
} catch (Exception e) {
return null;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.studyblue.metrics.reporting;
package com.bealetech.metrics.reporting;

import com.yammer.metrics.core.*;
import com.yammer.metrics.reporting.AbstractPollingReporter;
Expand All @@ -33,7 +33,6 @@
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;

public class StatsdReporterTest {
Expand Down