NetStreamSender.java
/*
* Copyright 2006 - 2013
* Stefan Balev <stefan.balev@graphstream-project.org>
* Julien Baudry <julien.baudry@graphstream-project.org>
* Antoine Dutot <antoine.dutot@graphstream-project.org>
* Yoann Pigné <yoann.pigne@graphstream-project.org>
* Guilhelm Savin <guilhelm.savin@graphstream-project.org>
*
* This file is part of GraphStream <http://graphstream-project.org>.
*
* GraphStream is a library whose purpose is to handle static or dynamic
* graph, create them from scratch, file or any source and display them.
*
* This program is free software distributed under the terms of two licenses, the
* CeCILL-C license that fits European law, and the GNU Lesser General Public
* License. You can use, modify and/ or redistribute the software under the terms
* of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the following
* URL <http://www.cecill.info> or under the terms of the GNU LGPL as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* The fact that you are presently reading this means that you have had
* knowledge of the CeCILL-C and LGPL licenses and that you accept their terms.
*/
package org.graphstream.stream.netstream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import org.graphstream.stream.Sink;
import org.graphstream.stream.netstream.packing.NetStreamPacker;
import com.sun.org.apache.xalan.internal.xsltc.compiler.sym;
/**
* <p>
* This class implements a sender according to specifications the NetStream
* protocol.
* </p>
*
* <p>
* See {@link NetStreamConstants} for a full description of the protocol, the
* sender and the receiver.
* </p>
*
* @see NetStreamConstants
* @see NetStreamReceiver
*
*
* Copyright (c) 2010 University of Luxembourg
*
* NetStreamSender.java
* @since Aug 10, 2011
*
* @author Yoann Pigné
*
*/
public class NetStreamSender implements Sink {
private static ByteBuffer NULL_BUFFER = ByteBuffer.allocate(0);
protected String stream;
protected ByteBuffer streamBuffer;
byte[] streamIdArray;
protected String host;
protected int port;
protected Socket socket;
protected BufferedOutputStream out;
protected String sourceId = "";
protected ByteBuffer sourceIdBuff;
class DefaultPacker extends NetStreamPacker {
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
@Override
public ByteBuffer packMessage(ByteBuffer buffer, int startIndex,
int endIndex) {
return buffer;
}
@Override
public ByteBuffer packMessageSize(int capacity) {
sizeBuffer.rewind();
sizeBuffer.putInt(capacity);
return sizeBuffer;
}
};
protected NetStreamPacker packer = new DefaultPacker();
public NetStreamSender(String host, int port) throws UnknownHostException,
IOException {
this("default", host, port);
}
public NetStreamSender(int port) throws UnknownHostException, IOException {
this("default", "localhost", port);
}
public NetStreamSender(String stream, String host, int port)
throws UnknownHostException, IOException {
this.stream = stream;
this.host = host;
this.port = port;
setStream(stream);
connect();
}
/**
* @param stream2
*/
public void setStream(String stream) {
streamIdArray = stream.getBytes(Charset.forName("UTF-8"));
streamBuffer = encodeString(stream);
}
public NetStreamSender(Socket socket) throws IOException {
this("default", socket);
}
public NetStreamSender(String stream, Socket socket) throws IOException {
this.host = socket.getInetAddress().getHostName();
this.port = socket.getPort();
this.socket = socket;
this.out = new BufferedOutputStream(socket.getOutputStream());
this.streamIdArray = stream.getBytes(Charset.forName("UTF-8"));
}
/**
* Sets an optional NetStreamPaker whose "pack" method will be called on
* each message.
*
* a Packer can do extra encoding on the all byte array message, it may also
* crypt things.
*
* @param paker
* The packer object
*/
public void setPacker(NetStreamPacker paker) {
this.packer = paker;
}
public void removePacker() {
packer = new DefaultPacker();
}
protected void connect() throws UnknownHostException, IOException {
socket = new Socket(host, port);
out = new BufferedOutputStream(socket.getOutputStream());
}
protected int getType(Object value) {
int valueType = NetStreamConstants.TYPE_UNKNOWN;
if (value == null)
return NetStreamConstants.TYPE_NULL;
Class<?> valueClass = value.getClass();
boolean isArray = valueClass.isArray();
if (isArray) {
valueClass = ((Object[]) value)[0].getClass();
}
if (valueClass.equals(Boolean.class)) {
if (isArray) {
valueType = NetStreamConstants.TYPE_BOOLEAN_ARRAY;
} else {
valueType = NetStreamConstants.TYPE_BOOLEAN;
}
} else if (valueClass.equals(Byte.class)) {
if (isArray) {
valueType = NetStreamConstants.TYPE_BYTE_ARRAY;
} else {
valueType = NetStreamConstants.TYPE_BYTE;
}
} else if (valueClass.equals(Short.class)) {
if (isArray) {
valueType = NetStreamConstants.TYPE_SHORT_ARRAY;
} else {
valueType = NetStreamConstants.TYPE_SHORT;
}
} else if (valueClass.equals(Integer.class)) {
if (isArray) {
valueType = NetStreamConstants.TYPE_INT_ARRAY;
} else {
valueType = NetStreamConstants.TYPE_INT;
}
} else if (valueClass.equals(Long.class)) {
if (isArray) {
valueType = NetStreamConstants.TYPE_LONG_ARRAY;
} else {
valueType = NetStreamConstants.TYPE_LONG;
}
} else if (valueClass.equals(Float.class)) {
if (isArray) {
valueType = NetStreamConstants.TYPE_FLOAT_ARRAY;
} else {
valueType = NetStreamConstants.TYPE_FLOAT;
}
} else if (valueClass.equals(Double.class)) {
if (isArray) {
valueType = NetStreamConstants.TYPE_DOUBLE_ARRAY;
} else {
valueType = NetStreamConstants.TYPE_DOUBLE;
}
} else if (valueClass.equals(String.class)) {
if (isArray) {
valueType = NetStreamConstants.TYPE_ARRAY;
} else {
valueType = NetStreamConstants.TYPE_STRING;
}
} else
System.err.printf("[warning] can not find type of %s\n", valueClass);
// System.out.println("ValueType="+valueType+" "+value.getClass());
return valueType;
}
protected ByteBuffer encodeValue(Object in, int valueType) {
if (NetStreamConstants.TYPE_BOOLEAN == valueType) {
return encodeBoolean(in);
} else if (NetStreamConstants.TYPE_BOOLEAN_ARRAY == valueType) {
return encodeBooleanArray(in);
} else if (NetStreamConstants.TYPE_BYTE == valueType) {
return encodeByte(in);
} else if (NetStreamConstants.TYPE_BYTE_ARRAY == valueType) {
return encodeByteArray(in);
} else if (NetStreamConstants.TYPE_SHORT == valueType) {
return encodeShort(in);
} else if (NetStreamConstants.TYPE_SHORT_ARRAY == valueType) {
return encodeShortArray(in);
} else if (NetStreamConstants.TYPE_INT == valueType) {
return encodeInt(in);
} else if (NetStreamConstants.TYPE_INT_ARRAY == valueType) {
return encodeIntArray(in);
} else if (NetStreamConstants.TYPE_LONG == valueType) {
return encodeLong(in);
} else if (NetStreamConstants.TYPE_LONG_ARRAY == valueType) {
return encodeLongArray(in);
} else if (NetStreamConstants.TYPE_FLOAT == valueType) {
return encodeFloat(in);
} else if (NetStreamConstants.TYPE_FLOAT_ARRAY == valueType) {
return encodeFloatArray(in);
} else if (NetStreamConstants.TYPE_DOUBLE == valueType) {
return encodeDouble(in);
} else if (NetStreamConstants.TYPE_DOUBLE_ARRAY == valueType) {
return encodeDoubleArray(in);
} else if (NetStreamConstants.TYPE_STRING == valueType) {
return encodeString(in);
} else if (NetStreamConstants.TYPE_ARRAY == valueType) {
return encodeArray(in);
} else if (NetStreamConstants.TYPE_NULL == valueType) {
return NULL_BUFFER;
}
System.err.printf("[warning] unknown value type %d\n", valueType);
return null;
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeArray(Object in) {
// TODO...
return null;
}
private void outBuffer(ByteBuffer buf){
System.out.println(buf.toString());
int nbytes = buf.capacity();
int at = buf.position();
for(int i=0; i< nbytes; i++){
int bt = buf.get(at+i);
if (bt < 0) bt = (bt & 127) + (bt & 128);
System.out.printf("%d ", bt);
}
System.out.println();
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeString(Object in) {
//System.out.println("They want me to encode this string: "+in);
String s = (String) in;
byte[] data = s.getBytes(Charset.forName("UTF-8"));
ByteBuffer lenBuff = encodeUnsignedVarint(data.length);
//outBuffer(lenBuff);
ByteBuffer bb = ByteBuffer.allocate(lenBuff.capacity() + data.length);
bb.put(lenBuff).put(data);
bb.rewind();
//outBuffer(bb);
return bb;
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeDoubleArray(Object in) {
Object[] data = (Object[]) in;
int ssize = varintSize(data.length);
ByteBuffer b = ByteBuffer.allocate(ssize + data.length * 8);
putVarint(b, data.length, ssize);
for (int i = 0; i < data.length; i++) {
b.putDouble((Double) data[i]);
}
b.rewind();
return b;
}
/**
* @param in The double to encode
* @return ByteBuffer with encoded double in it
*/
protected ByteBuffer encodeDouble(Object in) {
return ByteBuffer.allocate(8).putDouble((Double) in);
}
/**
* @param in The float array to encode
* @return ByteBuffer with encoded float array in it
*/
protected ByteBuffer encodeFloatArray(Object in) {
Object[] data = (Object[]) in;
int ssize = varintSize(data.length);
ByteBuffer b = ByteBuffer.allocate(ssize + data.length * 4);
putVarint(b, data.length, ssize);
for (int i = 0; i < data.length; i++) {
b.putFloat((Float) data[i]);
}
b.rewind();
return b;
}
/**
* @param in The float to encode
* @return ByteBuffer with encoded float in it
*/
protected ByteBuffer encodeFloat(Object in) {
ByteBuffer b = ByteBuffer.allocate(4);
b.putFloat(((Float) in));
b.rewind();
return b;
}
/**
* @param in The long array to encode
* @return ByteBuffer with encoded long array in it
*/
protected ByteBuffer encodeLongArray(Object in) {
return encodeVarintArray(in);
}
/**
* @param in The long to encode
* @return ByteBuffer with encoded long in it
*/
protected ByteBuffer encodeLong(Object in) {
return encodeVarint(in);
}
/**
* @param in The integer array to encode
* @return ByteBuffer with encoded integer array in it
*/
protected ByteBuffer encodeIntArray(Object in) {
return encodeVarintArray(in);
}
/**
* @param in The integer to encode
* @return ByteBuffer with encoded integer in it
*/
protected ByteBuffer encodeInt(Object in) {
return encodeVarint(in);
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeShortArray(Object in) {
return encodeVarintArray(in);
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeShort(Object in) {
return encodeVarint(in);
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeByteArray(Object in) {
Object[] data = (Object[]) in;
int ssize = varintSize(data.length);
ByteBuffer b = ByteBuffer.allocate(ssize + data.length);
putVarint(b, data.length, ssize);
for (int i = 0; i < data.length; i++) {
b.put((Byte) data[i]);
}
b.rewind();
return b;
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeByte(Object in) {
ByteBuffer b = ByteBuffer.allocate(1);
b.put(((Byte) in));
b.rewind();
return b;
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeBooleanArray(Object in) {
Object[] data = (Object[]) in;
int ssize = varintSize(data.length);
ByteBuffer b = ByteBuffer.allocate(ssize + data.length);
putVarint(b, data.length, ssize);
for (int i = 0; i < data.length; i++) {
b.put((byte) ((Boolean) data[i] == false ? 0 : 1));
}
b.rewind();
return b;
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeBoolean(Object in) {
ByteBuffer b = ByteBuffer.allocate(1);
b.put((byte) (((Boolean) in) == false ? 0 : 1));
b.rewind();
return b;
}
private int varintSize(long data){
// 7 bits -> 127
if(data < (1L << 7)){
return 1;
}
// 14 bits -> 16383
if(data < (1L << 14)){
return 2;
}
// 21 bits -> 2097151
if(data < (1L << 21)){
return 3;
}
// 28 bits -> 268435455
if(data < (1L << 28)){
return 4;
}
// 35 bits -> 34359738367
if(data < (1L << 35)){
return 5;
}
// 42 bits -> 4398046511103
if(data < (1L << 42)){
return 6;
}
// 49 bits -> 562949953421311
if(data < (1L << 49)){
return 7;
}
// 56 bits -> 72057594037927935
if(data < (1L << 56)){
return 8;
}
return 9;
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeVarint(Object in) {
long data = ((Number)in).longValue();
// signed integers encoding
// (n << 1) ^ (n >> 31)
// OK but java's negative values are two's complements...
return encodeUnsignedVarint(data>=0?(data<<1):((Math.abs(data) << 1) ^ 1));
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeVarintArray(Object in) {
Object[] data = (Object[]) in;
int[] sizes = new int[data.length];
long[] zigzags = new long[data.length];
int sumsizes=0;
for (int i = 0; i < data.length; i++) {
long datum = ((Number)data[i]).longValue();
// signed integers encoding
// (n << 1) ^ (n >> 31)
// OK but java's negative values are two's complements...
zigzags[i] = datum>0?(datum<<1):((Math.abs(datum) << 1) ^ 1);
sizes[i] = varintSize(zigzags[i]);
sumsizes+=sizes[i];
//System.out.printf("i=%d, zigzag=%d, size=%d\n",i, zigzags[i], sizes[i]);
}
// the size of the size!
int ssize = varintSize(data.length);
ByteBuffer b = ByteBuffer.allocate(ssize + sumsizes);
putVarint(b, data.length, ssize);
for (int i = 0; i < data.length; i++) {
putVarint(b, zigzags[i], sizes[i]);
}
b.rewind();
//outBuffer(b);
return b;
}
/**
* @param in
* @return
*/
protected ByteBuffer encodeUnsignedVarint(Object in) {
long data = ((Number)in).longValue();
int size = varintSize(data);
ByteBuffer buff = ByteBuffer.allocate(size);
for(int i = 0; i < size; i++){
int head=128;
if(i==size-1) head = 0;
long b = ((data >> (7*i)) & 127) ^ head;
buff.put((byte)(b & 255 ));
}
buff.rewind();
return buff;
}
/**
* @param b
* @param sumsizes
* @param ssize
*/
private void putVarint(ByteBuffer buffer, long number, int byteSize) {
for(int i = 0; i < byteSize; i++){
int head=128;
if(i==byteSize-1) head = 0;
long b = ((number >> (7*i)) & 127) ^ head;
buffer.put((byte)(b & 255 ));
}
}
/**
* @param buff
*/
private void doSend(ByteBuffer buff) {
if (socket.isClosed()) {
System.err
.println("NetStreamSender : can't send. The socket is closed.");
} else {
buff.rewind();
//outBuffer(buff);
ByteBuffer buffer = packer.packMessage(buff);
ByteBuffer sizeBuffer = packer.packMessageSize(buffer.capacity());
// real sending
try {
out.write(sizeBuffer.array(), 0, sizeBuffer.capacity());
out.write(buffer.array(), 0, buffer.capacity());
out.flush();
} catch (IOException e) {
try {
socket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
System.err.printf("socket error : %s\n", e.getMessage());
}
}
}
/*
* (non-Javadoc)
*
* @see
* org.graphstream.stream.AttributeSink#graphAttributeAdded(java.lang.String
* , long, java.lang.String, java.lang.Object)
*/
public void graphAttributeAdded(String sourceId, long timeId,
String attribute, Object value) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer attrBuff = encodeString(attribute);
int valueType = getType(value);
ByteBuffer valueBuff = encodeValue(value, valueType);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
attrBuff.capacity() + // attribute id
1 + // attr type
valueBuff.capacity()); // attr value
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_ADD_GRAPH_ATTR)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(attrBuff)
.put((byte) valueType)
.put(valueBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see
* org.graphstream.stream.AttributeSink#graphAttributeChanged(java.lang.
* String, long, java.lang.String, java.lang.Object, java.lang.Object)
*/
public void graphAttributeChanged(String sourceId, long timeId,
String attribute, Object oldValue, Object newValue) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer attrBuff = encodeString(attribute);
int oldValueType = getType(oldValue);
int newValueType = getType(newValue);
ByteBuffer oldValueBuff = encodeValue(oldValue, oldValueType);
ByteBuffer newValueBuff = encodeValue(newValue, newValueType);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
attrBuff.capacity() + // attribute id
1 + // attr type
oldValueBuff.capacity() + // attr value
1 + // attr type
newValueBuff.capacity()); // attr value
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_CHG_GRAPH_ATTR)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(attrBuff)
.put((byte) oldValueType)
.put(oldValueBuff)
.put((byte) newValueType)
.put(newValueBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see
* org.graphstream.stream.AttributeSink#graphAttributeRemoved(java.lang.
* String, long, java.lang.String)
*/
public void graphAttributeRemoved(String sourceId, long timeId,
String attribute) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer attrBuff = encodeString(attribute);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
attrBuff.capacity()
); // attribute id
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_DEL_GRAPH_ATTR)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(attrBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see
* org.graphstream.stream.AttributeSink#nodeAttributeAdded(java.lang.String,
* long, java.lang.String, java.lang.String, java.lang.Object)
*/
public void nodeAttributeAdded(String sourceId, long timeId, String nodeId,
String attribute, Object value) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer nodeBuff = encodeString(nodeId);
ByteBuffer attrBuff = encodeString(attribute);
int valueType = getType(value);
ByteBuffer valueBuff = encodeValue(value, valueType);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
nodeBuff.capacity() + // nodeId
attrBuff.capacity() + // attribute
1 + // value type
valueBuff.capacity() // value
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_ADD_NODE_ATTR)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(nodeBuff)
.put(attrBuff)
.put((byte) valueType)
.put(valueBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see
* org.graphstream.stream.AttributeSink#nodeAttributeChanged(java.lang.String
* , long, java.lang.String, java.lang.String, java.lang.Object,
* java.lang.Object)
*/
public void nodeAttributeChanged(String sourceId, long timeId,
String nodeId, String attribute, Object oldValue, Object newValue) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer nodeBuff = encodeString(nodeId);
ByteBuffer attrBuff = encodeString(attribute);
int oldValueType = getType(oldValue);
int newValueType = getType(newValue);
ByteBuffer oldValueBuff = encodeValue(oldValue, oldValueType);
ByteBuffer newValueBuff = encodeValue(newValue, newValueType);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
nodeBuff.capacity() + // nodeId
attrBuff.capacity() + // attribute
1 + // value type
oldValueBuff.capacity() + // value
1 + // value type
newValueBuff.capacity() // value
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_CHG_NODE_ATTR)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(nodeBuff)
.put(attrBuff)
.put((byte) oldValueType)
.put(oldValueBuff)
.put((byte) newValueType)
.put(newValueBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see
* org.graphstream.stream.AttributeSink#nodeAttributeRemoved(java.lang.String
* , long, java.lang.String, java.lang.String)
*/
public void nodeAttributeRemoved(String sourceId, long timeId,
String nodeId, String attribute) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer nodeBuff = encodeString(nodeId);
ByteBuffer attrBuff = encodeString(attribute);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
nodeBuff.capacity() + // nodeId
attrBuff.capacity() // attribute
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_DEL_NODE_ATTR)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(nodeBuff)
.put(attrBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see
* org.graphstream.stream.AttributeSink#edgeAttributeAdded(java.lang.String,
* long, java.lang.String, java.lang.String, java.lang.Object)
*/
public void edgeAttributeAdded(String sourceId, long timeId, String edgeId,
String attribute, Object value) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer edgeBuff = encodeString(edgeId);
ByteBuffer attrBuff = encodeString(attribute);
int valueType = getType(value);
ByteBuffer valueBuff = encodeValue(value, valueType);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
edgeBuff.capacity() + // nodeId
attrBuff.capacity() + // attribute
1 + // value type
valueBuff.capacity() // value
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_ADD_EDGE_ATTR)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(edgeBuff)
.put(attrBuff)
.put((byte) valueType) // value type
.put(valueBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see
* org.graphstream.stream.AttributeSink#edgeAttributeChanged(java.lang.String
* , long, java.lang.String, java.lang.String, java.lang.Object,
* java.lang.Object)
*/
public void edgeAttributeChanged(String sourceId, long timeId,
String edgeId, String attribute, Object oldValue, Object newValue) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer edgeBuff = encodeString(edgeId);
ByteBuffer attrBuff = encodeString(attribute);
int oldValueType = getType(oldValue);
int newValueType = getType(newValue);
ByteBuffer oldValueBuff = encodeValue(oldValue, oldValueType);
ByteBuffer newValueBuff = encodeValue(newValue, newValueType);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
edgeBuff.capacity() + // nodeId
attrBuff.capacity() + // attribute
1 + // value type
oldValueBuff.capacity() + // value
1 + // value type
newValueBuff.capacity() // value
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_CHG_EDGE_ATTR)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(edgeBuff)
.put(attrBuff)
.put((byte) oldValueType)
.put(oldValueBuff)
.put((byte) newValueType)
.put(newValueBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see
* org.graphstream.stream.AttributeSink#edgeAttributeRemoved(java.lang.String
* , long, java.lang.String, java.lang.String)
*/
public void edgeAttributeRemoved(String sourceId, long timeId,
String edgeId, String attribute) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer edgeBuff = encodeString(edgeId);
ByteBuffer attrBuff = encodeString(attribute);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
edgeBuff.capacity() + // nodeId
attrBuff.capacity() // attribute
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_DEL_EDGE_ATTR)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(edgeBuff)
.put(attrBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see org.graphstream.stream.ElementSink#nodeAdded(java.lang.String, long,
* java.lang.String)
*/
public void nodeAdded(String sourceId, long timeId, String nodeId) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer nodeBuff = encodeString(nodeId);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
nodeBuff.capacity() // nodeId
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_ADD_NODE)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(nodeBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see org.graphstream.stream.ElementSink#nodeRemoved(java.lang.String,
* long, java.lang.String)
*/
public void nodeRemoved(String sourceId, long timeId, String nodeId) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer nodeBuff = encodeString(nodeId);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
nodeBuff.capacity() // nodeId
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_DEL_NODE)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(nodeBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see org.graphstream.stream.ElementSink#edgeAdded(java.lang.String, long,
* java.lang.String, java.lang.String, java.lang.String, boolean)
*/
public void edgeAdded(String sourceId, long timeId, String edgeId,
String fromNodeId, String toNodeId, boolean directed) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer edgeBuff = encodeString(edgeId);
ByteBuffer fromNodeBuff = encodeString(fromNodeId);
ByteBuffer toNodeBuff = encodeString(toNodeId);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
edgeBuff.capacity() + // edge
fromNodeBuff.capacity() + // from nodeId
toNodeBuff.capacity() + // to nodeId
1 // direction
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_ADD_EDGE)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(edgeBuff)
.put(fromNodeBuff)
.put(toNodeBuff)
.put((byte) (!directed ? 0 : 1));
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see org.graphstream.stream.ElementSink#edgeRemoved(java.lang.String,
* long, java.lang.String)
*/
public void edgeRemoved(String sourceId, long timeId, String edgeId) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer edgeBuff = encodeString(edgeId);
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) + // timeId
edgeBuff.capacity() // edge
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_DEL_EDGE)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.put(edgeBuff);
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see org.graphstream.stream.ElementSink#graphCleared(java.lang.String,
* long)
*/
public void graphCleared(String sourceId, long timeId) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId)
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_CLEARED)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId));
doSend(buff);
}
/*
* (non-Javadoc)
*
* @see org.graphstream.stream.ElementSink#stepBegins(java.lang.String,
* long, double)
*/
public void stepBegins(String sourceId, long timeId, double step) {
if (!sourceId.equals(this.sourceId)) {
this.sourceId = sourceId;
sourceIdBuff = encodeString(sourceId);
}
ByteBuffer buff = ByteBuffer.allocate(
streamBuffer.capacity() + // stream
1 + // CMD
sourceIdBuff.capacity() + // source id
varintSize(timeId) +
8 // time
);
streamBuffer.rewind();
sourceIdBuff.rewind();
buff
.put(streamBuffer)
.put((byte) NetStreamConstants.EVENT_STEP)
.put(sourceIdBuff)
.put(encodeUnsignedVarint(timeId))
.putDouble(step);
doSend(buff);
}
/**
* Force the connection to close (properly) with the server
*
* @throws IOException
*/
public void close() throws IOException {
socket.close();
}
}