Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
Copyright 2010 Eugene Kirpichov, Dmitry Astapov. All rights reserved.
Copyright (c) 2009, Eugene Kirpichov

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
All rights reserved.

1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. The name of the author may not be used to endorse or promote products derived from this software without specific prior written permission.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.

* The names of contributors may not be used to endorse or promote
products derived from this software without specific prior
written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2 changes: 1 addition & 1 deletion greg-clients/haskell/LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2009, Eugene Kirpichov
Copyright (c) 2009, Eugene Kirpichov, Dmitry Astapov

All rights reserved.

Expand Down
9 changes: 9 additions & 0 deletions greg-clients/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
<name>Greg java client</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
181 changes: 171 additions & 10 deletions greg-clients/java/src/main/java/org/greg/client/Configuration.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,179 @@
package org.greg.client;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Enumeration;
import java.util.Properties;

public class Configuration {
public final String server = System.getProperty("greg.server", "localhost");
public final int port = Integer.parseInt(System.getProperty("greg.port", "5676"));
public final int calibrationPort = Integer.parseInt(System.getProperty("greg.calibrationPort", "5677"));
public final int flushPeriodMs = Integer.parseInt(System.getProperty("greg.flushPeriodMs", "1000"));
public final String clientId = System.getProperty("greg.clientId", "unknown");
public final int maxBufferedRecords = Integer.parseInt(System.getProperty("greg.maxBufferedRecords", "1000000"));
public final boolean useCompression = Boolean.parseBoolean(System.getProperty("greg.useCompression", "true"));
public final int calibrationPeriodSec = Integer.parseInt(System.getProperty("greg.calibrationPeriodSec", "10"));
public static final String SERVER = "greg.server";
public static final String HOST_NAME = "greg.hostname";
public static final String PORT = "greg.port";
public static final String CALIBRATION_PORT = "greg.port";
public static final String CALIBRATION_PERIOD_SEC = "greg.calibrationPeriodSec";
public static final String FLUSH_PERIOD_MS = "greg.flushPeriodMs";
public static final String CLIENT_ID = "greg.clientId";
public static final String MAX_BUFFERED_RECORDS = "greg.maxBufferedRecords";
public static final String USE_COMPRESSION = "greg.useCompression";

/**
* This field is not final - you can change it if you wish to use
* your own configuration mechanism.
*/
public static Configuration INSTANCE = new Configuration();
}
public static final Configuration INSTANCE = new Configuration();

private static final String defaultPropertiesPath = "/greg.properties";
private Properties properties = new Properties();
private String server;
private String hostname;
private int port;
private int calibrationPort;
private int calibrationPeriodSec;
private int flushPeriodMs;
private String clientId;
private int maxBufferedRecords;
private boolean useCompression;

public Configuration() {
this(defaultPropertiesPath);
}

public Configuration(String path) {
this(Configuration.class.getClass().getResourceAsStream(path));
}

public Configuration(InputStream is) {
Properties defaultProperties = new Properties();
if (is != null) {
try {
load(is, defaultProperties);
} catch (IOException e) {
// ignore
} finally {
close(is);
}
}
}

public String get(String key) {
return properties.getProperty(key);
}

public String get(String key, String defaultValue) {
return properties.getProperty(key, defaultValue);
}

public int getInt(String key) {
return Integer.parseInt(get(key));
}

public int getInt(String key, int defaultValue) {
return Integer.parseInt(get(key, String.valueOf(defaultValue)));
}

public boolean getBoolean(String key) {
return Boolean.parseBoolean(get(key));
}

public boolean getBoolean(String key, boolean defaultValue) {
return Boolean.parseBoolean(get(key, String.valueOf(defaultValue)));
}

public String getServer() {
return server;
}

public String getHostname() {
return hostname;
}

public int getPort() {
return port;
}

public int getCalibrationPort() {
return calibrationPort;
}

public int getCalibrationPeriodSec() {
return calibrationPeriodSec;
}

public int getFlushPeriodMs() {
return flushPeriodMs;
}

public String getClientId() {
return clientId;
}

public int getMaxBufferedRecords() {
return maxBufferedRecords;
}

public boolean isUseCompression() {
return useCompression;
}

private void load(InputStream is, Properties defaultProperties) throws IOException {
properties.load(is);
loadDefaultProperties(defaultProperties);
merge(defaultProperties, properties);
initialize();
}

private void loadDefaultProperties(Properties properties) {
properties.setProperty(SERVER, "localhost");
properties.setProperty(PORT, "5676");
properties.setProperty(CALIBRATION_PORT, "5677");
properties.setProperty(CALIBRATION_PERIOD_SEC, "10");
properties.setProperty(FLUSH_PERIOD_MS, "1000");
properties.setProperty(CLIENT_ID, "unknown");
properties.setProperty(MAX_BUFFERED_RECORDS, "1000000");
properties.setProperty(USE_COMPRESSION, "true");
properties.setProperty(HOST_NAME, getHostName());
}

private void merge(Properties defaultProperties, Properties targetProperties) {
for(Object key : defaultProperties.keySet()) {
if (!targetProperties.containsKey(key)) {
targetProperties.setProperty((String) key, defaultProperties.getProperty((String) key));
}
}
}

private void initialize() {
server = get(SERVER);
hostname = get(HOST_NAME);
port = getInt(PORT);
calibrationPort = getInt(CALIBRATION_PORT);
calibrationPeriodSec = getInt(CALIBRATION_PERIOD_SEC);
flushPeriodMs = getInt(FLUSH_PERIOD_MS);
clientId = get(CLIENT_ID);
maxBufferedRecords = getInt(MAX_BUFFERED_RECORDS);
useCompression = getBoolean(USE_COMPRESSION);
}

private String getHostName() {
String hostname;
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new AssertionError("Can't get localhost?");
}
return hostname;
}

private void close(Closeable is) {
try {
if (is != null) {
is.close();
}
} catch (IOException ioe) {
// ignore
}
}
}
30 changes: 11 additions & 19 deletions greg-clients/java/src/main/java/org/greg/client/Greg.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -19,10 +18,9 @@ public class Greg {
private static final AtomicInteger numDropped = new AtomicInteger(0);
// Don't use ConcurrentLinkedQueue.size() because it's O(n)
private static final AtomicInteger numRecords = new AtomicInteger(0);
private static final Configuration conf = Configuration.INSTANCE;
private static final Configuration config = Configuration.INSTANCE;

private static final UUID OUR_UUID = UUID.randomUUID();
private static final String hostname;

static {
Thread pushMessages = new Thread("GregPushMessages") {
Expand All @@ -39,16 +37,10 @@ public void run() {
};
initCalibration.setDaemon(true);
initCalibration.start();

try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new AssertionError("Can't get localhost?");
}
}

public static void log(String message) {
if (numRecords.get() < conf.maxBufferedRecords) {
if (numRecords.get() < config.getMaxBufferedRecords()) {
numRecords.incrementAndGet();

Record r = new Record();
Expand Down Expand Up @@ -84,7 +76,7 @@ private static void pushCurrentMessages() {
OutputStream stream = null;

try {
client = new Socket(conf.server, conf.port);
client = new Socket(config.getServer(), config.getPort());
Trace.writeLine(
"Client connected to " + client.getRemoteSocketAddress() +
" from " + client.getLocalSocketAddress());
Expand All @@ -93,9 +85,9 @@ private static void pushCurrentMessages() {
DataOutput w = new LittleEndianDataOutputStream(bStream);
w.writeLong(OUR_UUID.getLeastSignificantBits());
w.writeLong(OUR_UUID.getMostSignificantBits());
w.writeBoolean(conf.useCompression);
w.writeBoolean(config.isUseCompression());

stream = new BufferedOutputStream(conf.useCompression ? new GZIPOutputStream(bStream) : bStream, 65536);
stream = new BufferedOutputStream(config.isUseCompression() ? new GZIPOutputStream(bStream) : bStream, 65536);
exhausted = writeRecordsBatchTo(stream);
} catch (Exception e) {
Trace.writeLine("Failed to push messages: " + e);
Expand All @@ -110,7 +102,7 @@ private static void pushCurrentMessages() {
// Only sleep when waiting for new records.
if (exhausted) {
try {
Thread.sleep(conf.flushPeriodMs);
Thread.sleep(config.getFlushPeriodMs());
} catch (InterruptedException e) {
continue;
}
Expand Down Expand Up @@ -141,12 +133,12 @@ private static void close(Socket sock) {
private static boolean writeRecordsBatchTo(OutputStream stream) throws IOException {
int maxBatchSize = 10000;
DataOutput w = new LittleEndianDataOutputStream(stream);
byte[] cidBytes = conf.clientId.getBytes("utf-8");
byte[] cidBytes = config.getClientId().getBytes("utf-8");
w.writeInt(cidBytes.length);
w.write(cidBytes);
int recordsWritten = 0;

byte[] machineBytes = hostname.getBytes("utf-8");
byte[] machineBytes = config.getHostname().getBytes("utf-8");

CharsetEncoder enc = Charset.forName("utf-8").newEncoder();

Expand Down Expand Up @@ -189,7 +181,7 @@ private static void initiateCalibration() {
while (true) {
Socket client = null;
try {
client = new Socket(conf.server, conf.calibrationPort);
client = new Socket(config.getServer(), config.getCalibrationPort());
client.setTcpNoDelay(true);
exchangeTicksOver(client.getInputStream(), client.getOutputStream());
} catch (Exception e) {
Expand All @@ -198,7 +190,7 @@ private static void initiateCalibration() {
close(client);
}
try {
Thread.sleep(conf.calibrationPeriodSec * 1000L);
Thread.sleep(config.getCalibrationPeriodSec() * 1000L);
} catch (InterruptedException e) {
continue;
}
Expand All @@ -221,4 +213,4 @@ private static void exchangeTicksOver(InputStream in, OutputStream out) throws I
// Our sample arrives to them after network latency.
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.greg.client;

import junit.framework.TestCase;

import java.beans.IntrospectionException;
import java.io.IOException;
import java.util.Properties;

import static org.greg.client.Configuration.*;

/**
* @author Anton Panasenko
* Date: 25.12.10
*/
public class ConfigurationTest extends TestCase {
private String[] keys = {SERVER, HOST_NAME, PORT, CALIBRATION_PORT, CALIBRATION_PERIOD_SEC, FLUSH_PERIOD_MS,
CLIENT_ID, MAX_BUFFERED_RECORDS, USE_COMPRESSION};

public void testPropertiesFiles() throws IOException, IntrospectionException {
Properties properties = new Properties();
properties.load(getClass().getResourceAsStream("/greg.properties"));

Configuration config = Configuration.INSTANCE;

for(String key: keys) {
String value = properties.getProperty(key);
if (value != null) {
assertEquals(value, config.get(key));
}
}
}
}
Loading