NetStreamReceiver.java

1
/*
2
 * Copyright 2006 - 2013 Stefan Balev <stefan.balev@graphstream-project.org>
3
 * Julien Baudry <julien.baudry@graphstream-project.org> Antoine Dutot
4
 * <antoine.dutot@graphstream-project.org> Yoann Pigné
5
 * <yoann.pigne@graphstream-project.org> Guilhelm Savin
6
 * <guilhelm.savin@graphstream-project.org>
7
 * 
8
 * This file is part of GraphStream <http://graphstream-project.org>.
9
 * 
10
 * GraphStream is a library whose purpose is to handle static or dynamic graph,
11
 * create them from scratch, file or any source and display them.
12
 * 
13
 * This program is free software distributed under the terms of two licenses,
14
 * the CeCILL-C license that fits European law, and the GNU Lesser General
15
 * Public License. You can use, modify and/ or redistribute the software under
16
 * the terms of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the
17
 * following URL <http://www.cecill.info> or under the terms of the GNU LGPL as
18
 * published by the Free Software Foundation, either version 3 of the License,
19
 * or (at your option) any later version.
20
 * 
21
 * This program is distributed in the hope that it will be useful, but WITHOUT
22
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
23
 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
24
 * details.
25
 * 
26
 * You should have received a copy of the GNU Lesser General Public License
27
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
28
 * 
29
 * The fact that you are presently reading this means that you have had
30
 * knowledge of the CeCILL-C and LGPL licenses and that you accept their terms.
31
 */
32
package org.graphstream.stream.netstream;
33
34
import java.io.IOException;
35
import java.io.InputStream;
36
import java.net.InetAddress;
37
import java.net.InetSocketAddress;
38
import java.net.UnknownHostException;
39
import java.nio.ByteBuffer;
40
import java.nio.channels.SelectionKey;
41
import java.nio.channels.Selector;
42
import java.nio.channels.ServerSocketChannel;
43
import java.nio.channels.SocketChannel;
44
import java.nio.charset.Charset;
45
import java.util.HashMap;
46
import java.util.Iterator;
47
import java.util.Set;
48
49
import org.graphstream.stream.netstream.packing.NetStreamUnpacker;
50
import org.graphstream.stream.thread.ThreadProxyPipe;
51
import org.miv.mbox.net.PositionableByteArrayInputStream;
52
53
/**
54
 * <p>
55
 * This class implements a receiver according to the specifications of the NetStream
56
 * protocol.
57
 * </p>
58
 * 
59
 * <p>
60
 * See {@link NetStreamConstants} for a full description of the protocol, the
61
 * sender and the receiver.
62
 * </p>
63
 * 
64
 * @see NetStreamConstants
65
 * @see NetStreamSender
66
 * 
67
 * 
68
 *      Copyright (c) 2010 University of Luxembourg
69
 * 
70
 *      NetStreamReceiver.java
71
 * @since Aug 13, 2011
72
 * 
73
 * @author Yoann Pigné
74
 * 
75
 */
76
public class NetStreamReceiver extends Thread implements NetStreamDecoder {
77
78
	/**
79
	 * the hostname this receiver is listening at.
80
	 */
81
	private String hostname;
82
83
	/**
84
	 * the port listened to.
85
	 */
86
	private int port;
87
88
	/**
89
	 * Receiver socket.
90
	 */
91
	protected ServerSocketChannel server;
92
93
	/**
94
	 * Multiplexor.
95
	 */
96
	protected Selector selector;
97
98
	/**
99
	 * Key for the selector.
100
	 */
101
	protected SelectionKey key;
102
103
	/**
104
	 * While true, the receiver is running.
105
	 */
106
	protected boolean loop = true;
107
108
	/**
109
	 * Show debugging messages.
110
	 */
111
	protected boolean debug = true;
112
113
	/**
114
	 * Last encountered error.
115
	 */
116
	protected String lastError = null;
117
118
	/**
119
	 * The current pipe commands are being written to.
120
	 */
121
	protected ThreadProxyPipe currentStream;
122
123
	/**
124
	 * Utility class that decodes messages according to the NetStream Protocol
125
	 */
126
	protected NetStreamDecoder decoder;
127
	
128
129
	/**
130
	 * Current active incoming connections.
131
	 */
132
	protected HashMap<SelectionKey, IncomingBuffer> incoming = new HashMap<SelectionKey, IncomingBuffer>();
133
134
	class DefaultUnpacker extends NetStreamUnpacker {
135
136
		@Override
137
		public ByteBuffer unpackMessage(ByteBuffer buffer, int startIndex,
138
				int endIndex) {
139 1 1. unpackMessage : mutated return of Object value for org/graphstream/stream/netstream/NetStreamReceiver$DefaultUnpacker::unpackMessage to ( if (x != null) null else throw new RuntimeException ) → NO_COVERAGE
			return buffer;
140
		}
141
142
		@Override
143
		public int unpackMessageSize(ByteBuffer buffer) {
144 1 1. unpackMessageSize : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE
			return buffer.getInt();
145
		}
146
147
		/*
148
		 * (non-Javadoc)
149
		 * 
150
		 * @see
151
		 * org.graphstream.stream.netstream.packing.NetStreamUnpacker#sizeOfInt
152
		 * ()
153
		 */
154
		@Override
155
		public int sizeOfInt() {
156 1 1. sizeOfInt : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE
			return 4;
157
		}
158
	};
159
	private NetStreamUnpacker unpacker;
160
161
	// Constructors
162
163
	/**
	 * Creates a receiver listening at the given host name and port, with
	 * debugging output disabled. Delegates to
	 * {@link #NetStreamReceiver(String, int, boolean)}.
	 * 
	 * @param hostname
	 *            The host name to listen at for messages.
	 * @param port
	 *            The port to listen at for messages.
	 * @throws IOException
	 *             propagated from the delegate constructor.
	 * @throws UnknownHostException
	 *             propagated from the delegate constructor.
	 */
	public NetStreamReceiver(String hostname, int port)
			throws IOException, UnknownHostException {
		this(hostname, port, false);
	}
176
177
	/**
	 * Creates a receiver listening at "localhost" on the given port, with
	 * debugging output disabled. Delegates to
	 * {@link #NetStreamReceiver(String, int, boolean)}.
	 * 
	 * @param port
	 *            The port to listen at for messages.
	 * @throws IOException
	 *             propagated from the delegate constructor.
	 * @throws UnknownHostException
	 *             propagated from the delegate constructor.
	 */
	public NetStreamReceiver(int port)
			throws IOException, UnknownHostException {
		this("localhost", port, false);
	}
187
188
	/**
	 * Creates a receiver for the given host name and port, installs the
	 * default unpacker and decoder, initialises the server socket through
	 * {@link #init()} and immediately starts the receiver thread.
	 * 
	 * @param hostname
	 *            The host name to listen at for messages.
	 * @param port
	 *            The port to listen at for messages.
	 * @param debug
	 *            If true, information is output for each message received.
	 * @throws IOException
	 *             if {@link #init()} fails.
	 * @throws UnknownHostException
	 *             if {@link #init()} cannot resolve the host name.
	 */
	public NetStreamReceiver(String hostname, int port, boolean debug)
			throws IOException, UnknownHostException {
		this.decoder = new DefaultNetStreamDecoder();
		this.unpacker = new DefaultUnpacker();
		this.port = port;
		this.hostname = hostname;

		// The decoder must exist before this call: the flag is forwarded to it.
		setDebugOn(debug);
		init();
		start();
	}
209
210
	// Access
211
212
	/**
213
	 * False as soon as the receiver terminates.
214
	 */
215
	public synchronized boolean isRunning() {
216 1 1. isRunning : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE
		return loop;
217
	}
218
219
220
	// Commands
221
222
	/**
223
	 * Initialize the server socket.
224
	 */
225
	protected void init() throws IOException, UnknownHostException {
226
		selector = Selector.open();
227
		server = ServerSocketChannel.open();
228
229
		server.configureBlocking(false);
230
231
		InetAddress ia = InetAddress.getByName(hostname);
232
		InetSocketAddress isa = new InetSocketAddress(ia, port);
233
234 1 1. init : removed call to java/net/ServerSocket::bind → NO_COVERAGE
		server.socket().bind(isa);
235
236 1 1. init : negated conditional → NO_COVERAGE
		if (debug)
237 1 1. init : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE
			debug("bound to socket %s:%d", server.socket().getInetAddress(),
238
					server.socket().getLocalPort());
239
240
		// Register a first server socket inside the multiplexer.
241
242
		key = server.register(selector, SelectionKey.OP_ACCEPT);
243
	}
244
245
	/**
246
	 * Enable or disable debugging.
247
	 */
248
	public void setDebugOn(boolean on) {
249
		debug = on;
250 1 1. setDebugOn : removed call to org/graphstream/stream/netstream/NetStreamDecoder::setDebugOn → NO_COVERAGE
		decoder.setDebugOn(on);
251
	}
252
253
254
	/**
255
	 * Stop the receiver.
256
	 */
257
	public synchronized void quit() {
258
		loop = false;
259
		key.selector().wakeup();
260
261 1 1. quit : negated conditional → NO_COVERAGE
		if (debug)
262 1 1. quit : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE
			debug("stopped");
263
	}
264
265
	/**
266
	 * Ask the receiver about its active connections
267
	 */
268
	public synchronized boolean hasActiveConnections() {
269 2 1. hasActiveConnections : negated conditional → NO_COVERAGE
2. hasActiveConnections : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE
		return !incoming.isEmpty();
270
	}
271
272
	/**
273
	 * Sets an optional NetStreamUnpaker whose "unpack" method will be called on
274
	 * each message.
275
	 * 
276
	 * It allows to do extra decoding on the all byte array message. You can
277
	 * also decrypt things.
278
	 * 
279
	 * @param unpaker
280
	 */
281
	public void setUnpacker(NetStreamUnpacker unpaker) {
282
		this.unpacker = unpaker;
283
	}
284
	public void removeUnpacker() {
285
		unpacker = new DefaultUnpacker();
286
	}
287
288
	/**
289
	 * Wait for connections, accept them, demultiplexes them and dispatch
290
	 * messages to registered message boxes.
291
	 */
292
	@Override
293
	public void run() {
294
		boolean l;
295
296
		synchronized (this) {
297
			l = loop;
298
		}
299
300 1 1. run : negated conditional → NO_COVERAGE
		while (l) {
301 1 1. run : removed call to org/graphstream/stream/netstream/NetStreamReceiver::poll → NO_COVERAGE
			poll();
302
303
			synchronized (this) {
304
				l = loop;
305
			}
306
		}
307
308
		try {
309 1 1. run : removed call to java/nio/channels/ServerSocketChannel::close → NO_COVERAGE
			server.close();
310
		} catch (IOException e) {
311 1 1. run : removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE
			error("cannot close the server socket: " + e.getMessage(), e);
312
		}
313
314 1 1. run : negated conditional → NO_COVERAGE
		if (debug) {
315 1 1. run : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE
			debug("receiver //" + hostname + ":" + port + " finished");
316
		}
317
	}
318
319
	/**
320
	 * Wait until one or several chunks of message are acceptable. This method
321
	 * should be called in a loop. It can be used to block a program until some
322
	 * data is available.
323
	 */
324
	public void poll() {
325
		try {
326
			// Wait for incoming messages in a loop.
327
328 2 1. poll : changed conditional boundary → NO_COVERAGE
2. poll : negated conditional → NO_COVERAGE
			if (key.selector().select() > 0) {
329
				Set<?> readyKeys = selector.selectedKeys();
330
				Iterator<?> i = readyKeys.iterator();
331
332 1 1. poll : negated conditional → NO_COVERAGE
				while (i.hasNext()) {
333
					SelectionKey akey = (SelectionKey) i.next();
334
335 1 1. poll : removed call to java/util/Iterator::remove → NO_COVERAGE
					i.remove();
336
337 1 1. poll : negated conditional → NO_COVERAGE
					if (akey.isAcceptable()) {
338
						// If a new connection occurs, register the new socket
339
						// in the multiplexer.
340
341
						ServerSocketChannel ssocket = (ServerSocketChannel) akey
342
								.channel();
343
						SocketChannel socket = ssocket.accept();
344
345 1 1. poll : negated conditional → NO_COVERAGE
						if (debug)
346 1 1. poll : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE
							debug("accepting socket %s:%d", socket.socket()
347
									.getInetAddress(), socket.socket()
348
									.getPort());
349
350
						socket.configureBlocking(false);
351
						socket.finishConnect();
352
353
						// SelectionKey otherKey = socket.register( selector,
354
						// SelectionKey.OP_READ );
355
						socket.register(selector, SelectionKey.OP_READ);
356 1 1. poll : negated conditional → NO_COVERAGE
					} else if (akey.isReadable()) {
357
						// If a message arrives, read it.
358
359 1 1. poll : removed call to org/graphstream/stream/netstream/NetStreamReceiver::readDataChunk → NO_COVERAGE
						readDataChunk(akey);
360 1 1. poll : negated conditional → NO_COVERAGE
					} else if (akey.isWritable()) {
361
						throw new RuntimeException("should not happen");
362
					}
363
				}
364
			}
365
		} catch (IOException e) {
366 1 1. poll : removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE
			error(e, "I/O error in receiver //%s:%d thread: aborting: %s",
367
					hostname, port, e.getMessage());
368
369
			loop = false;
370
		} catch (Throwable e) {
371 1 1. poll : removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE
			error(e, "Unknown error: %s", e.getMessage());
372
373
			loop = false;
374
		}
375
	}
376
377
	/**
378
	 * When data is readable on a socket, send it to the appropriate buffer
379
	 * (creating it if needed).
380
	 */
381
	protected void readDataChunk(SelectionKey key) throws IOException {
382
		IncomingBuffer buf = incoming.get(key);
383
384 1 1. readDataChunk : negated conditional → NO_COVERAGE
		if (buf == null) {
385
			buf = new IncomingBuffer();
386
			incoming.put(key, buf);
387
			SocketChannel socket = (SocketChannel) key.channel();
388
389 1 1. readDataChunk : negated conditional → NO_COVERAGE
			if (debug)
390 1 1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE
				debug("creating buffer for new connection from %s:%d", socket
391
						.socket().getInetAddress(), socket.socket().getPort());
392
		}
393
394
		try {
395 1 1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver$IncomingBuffer::readDataChunk → NO_COVERAGE
			buf.readDataChunk(key);
396
397
		} catch (IOException e) {
398
			incoming.remove(key);
399 1 1. readDataChunk : removed call to java/io/IOException::printStackTrace → NO_COVERAGE
			e.printStackTrace();
400 1 1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE
			error(e,
401
					"receiver //%s:%d cannot read object socket channel (I/O error): %s",
402
					hostname, port, e.getMessage());
403
			loop = false;
404
		}
405
406 1 1. readDataChunk : negated conditional → NO_COVERAGE
		if (!buf.active) {
407
			incoming.remove(key);
408 1 1. readDataChunk : negated conditional → NO_COVERAGE
			if (debug)
409 1 1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE
				debug("removing buffer %s from incoming for geting inactive. %d left",
410
						key.toString(), incoming.size());
411
412
		}
413
414
	}
415
416
	// Utilities
417
418
	protected void error(String message, Object... data) {
419 1 1. error : removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE
		error(null, message, data);
420
	}
421
422
	protected static final String LIGHT_YELLOW = "";
423
	protected static final String RESET = "";
424
425
	protected void error(Throwable e, String message, Object... data) {
426
		// System.err.print( LIGHT_YELLOW );
427 1 1. error : removed call to java/io/PrintStream::print → NO_COVERAGE
		System.err.print("[");
428
		// System.err.print( RESET );
429
		System.err.printf(message, data);
430
		// System.err.print( LIGHT_YELLOW );
431
		System.err.printf("]%n");
432
		// System.err.println( RESET );
433
434 1 1. error : negated conditional → NO_COVERAGE
		if (e != null)
435 1 1. error : removed call to java/lang/Throwable::printStackTrace → NO_COVERAGE
			e.printStackTrace();
436
	}
437
438
	protected void debug(String message, Object... data) {
439
		// System.err.print( LIGHT_YELLOW );
440
		System.err.printf("[//%s:%d | ", hostname, port);
441
		// System.err.print( RESET );
442
		System.err.printf(message, data);
443
		// System.err.print( LIGHT_YELLOW );
444
		System.err.printf("]%n");
445
		// System.err.println( RESET );
446
	}
447
448
	// Nested classes
449
450
	/**
451
	 * The connection to a sender.
452
	 * 
453
	 * The receiver maintains several incoming connections and demultiplexes
454
	 * them.
455
	 */
456
	protected class IncomingBuffer {
457
		// Attributes
458
459
		protected static final int BUFFER_INITIAL_SIZE = 8192; // 65535, 4096
460
461
		/**
462
		 * Buffer for reading.
463
		 */
464
		protected ByteBuffer buf = ByteBuffer.allocate(BUFFER_INITIAL_SIZE);
465
466
		/**
467
		 * Index in the buffer past the last byte that forms the current
468
		 * message. End can be out of the buffer or out of the data read
469
		 * actually.
470
		 */
471
		protected int end = -1;
472
473
		/**
474
		 * Index in the buffer of the first byte that forms the currents
475
		 * message. Beg does not count the 4 bytes that give the size of the
476
		 * message. While the header is being read, beg is the first byte of the
477
		 * header.
478
		 */
479
		protected int beg = 0;
480
481
		/**
482
		 * Position inside beg and end past the last byte read. All bytes at and
483
		 * after pos have unspecified contents. Pos always verifies pos&gt;=beg
484
		 * and pos&lt;end. While the header is being read, pos is past the last
485
		 * byte of the header that has been read.
486
		 */
487
		protected int pos = 0;
488
489
		/**
490
		 * Object input stream for reading the buffer. This input stream reads
491
		 * data from the "bin" positionable byte array input stream, itself
492
		 * mapped on the current message to decode.
493
		 */
494
		PositionableByteArrayInputStream in;
495
496
		/**
497
		 * Input stream filter on the buffer. This descendant of
498
		 * ByteArrayInputStream is able to change its offset and length so that
499
		 * we can map exactly the message to decode inside the buffer.
500
		 */
501
		PositionableByteArrayInputStream bin;
502
503
		/**
504
		 * When false the socket is closed and this buffer must be removed from
505
		 * the active connections.
506
		 */
507
		protected boolean active = true;
508
509
		// Constructors
510
511
		/**
		 * Creates a buffer in its initial state; all the attributes keep the
		 * values given by their initialisers.
		 */
		public IncomingBuffer() {
			super();
		}
513
514
		// Commands
515
516
		/**
517
		 * Read the available bytes and buffers them. If one or more complete
518
		 * serialised objects are available, send them to their respective
519
		 * MBoxes.
520
		 * 
521
		 * Here is the junk...
522
		 */
523
		public void readDataChunk(SelectionKey key) throws IOException {
524
			int limit = 0; // Index past the last byte read during the current
525
							// invocation.
526
			int nbytes = 0; // Number of bytes read.
527
			SocketChannel socket = (SocketChannel) key.channel();
528
529
			int sizeOfInt = unpacker.sizeOfInt();
530
			// Buffers the data.
531
532
			nbytes = bufferize(pos, socket);
533 1 1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE
			limit = pos + nbytes;
534
535 2 1. readDataChunk : changed conditional boundary → NO_COVERAGE
2. readDataChunk : negated conditional → NO_COVERAGE
			if (nbytes <= 0)
536
				return;
537
538 1 1. readDataChunk : negated conditional → NO_COVERAGE
			if (debug) {
539 1 1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE
				debug("<chunk (%d bytes) from "
540
						+ socket.socket().getInetAddress() + ":"
541
						+ socket.socket().getPort() + ">", nbytes);
542
				int at = buf.position();
543 3 1. readDataChunk : changed conditional boundary → NO_COVERAGE
2. readDataChunk : Changed increment from 1 to -1 → NO_COVERAGE
3. readDataChunk : negated conditional → NO_COVERAGE
				for (int i = 0; i < nbytes; i++) {
544 1 1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE
					System.err.printf("%d ", buf.get(at + i));
545
				}
546 1 1. readDataChunk : removed call to java/io/PrintStream::println → NO_COVERAGE
				System.err.println();
547
				buf.position(at);
548
			}
549
			// Read the first header.
550
551 2 1. readDataChunk : changed conditional boundary → NO_COVERAGE
2. readDataChunk : negated conditional → NO_COVERAGE
			if (end < 0) {
552 3 1. readDataChunk : changed conditional boundary → NO_COVERAGE
2. readDataChunk : Replaced integer subtraction with addition → NO_COVERAGE
3. readDataChunk : negated conditional → NO_COVERAGE
				if ((limit - beg) >= sizeOfInt) {
553
					// If no data has been read yet in the buffer or if the
554
					// buffer
555
					// was emptied completely at previous call: prepare to read
556
					// a
557
					// new message by decoding its header.
558
559
					buf.position(0);
560
					int size = unpacker.unpackMessageSize(buf);
561 1 1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE
					end = size + sizeOfInt;
562
					beg = sizeOfInt;
563 1 1. readDataChunk : negated conditional → NO_COVERAGE
					if (debug)
564 1 1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE
						debug("start to bufferize a %d byte long messsage",
565
								size);
566
				} else {
567
					// The header is incomplete, wait next call to complete it.
568
569
					pos = limit;
570
				}
571
			}
572
573
			// Read one or more messages or wait next call to buffers more.
574
575 2 1. readDataChunk : changed conditional boundary → NO_COVERAGE
2. readDataChunk : negated conditional → NO_COVERAGE
			if (end > 0) {
576 2 1. readDataChunk : changed conditional boundary → NO_COVERAGE
2. readDataChunk : negated conditional → NO_COVERAGE
				while (end < limit) {
577
					// While the end of the message is in the limit of what was
578
					// read, there are one or more complete messages. Decode
579
					// them
580
					// and read the header of the next message, until a message
581
					// is
582
					// incomplete or there are no more messages or a header is
583
					// incomplete.
584
585
					ByteBuffer unpackedBuffer = unpacker.unpackMessage(buf, beg, end);
586 1 1. readDataChunk : negated conditional → NO_COVERAGE
					if (unpackedBuffer == buf) {
587 1 1. readDataChunk : Replaced integer subtraction with addition → NO_COVERAGE
						in = new PositionableByteArrayInputStream(buf.array(), beg, end - beg);
588
					} else {
589
						in = new PositionableByteArrayInputStream(
590
								unpackedBuffer.array(), 0, unpackedBuffer.capacity());
591
					}
592
					
593 1 1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamDecoder::decodeMessage → NO_COVERAGE
					decoder.decodeMessage(in);
594
					buf.position(end);
595
596 3 1. readDataChunk : changed conditional boundary → NO_COVERAGE
2. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE
3. readDataChunk : negated conditional → NO_COVERAGE
					if (end + sizeOfInt <= limit) {
597
						// There is a following message.
598
599 1 1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE
						beg = end + sizeOfInt;
600 2 1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE
2. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE
						end = end + unpacker.unpackMessageSize(buf) + sizeOfInt;
601
					} else {
602
						// There is the beginning of a following message
603
						// but the header is incomplete. Compact the buffer
604
						// and stop here.
605
						assert (beg >= sizeOfInt);
606
607
						beg = end;
608 3 1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE
2. readDataChunk : Replaced integer subtraction with addition → NO_COVERAGE
3. readDataChunk : Replaced integer subtraction with addition → NO_COVERAGE
						int p = sizeOfInt - ((end + sizeOfInt) - limit);
609
						compactBuffer();
610
						pos = p;
611
						beg = 0;
612
						end = -1;
613
						break;
614
					}
615
				}
616
617 1 1. readDataChunk : negated conditional → NO_COVERAGE
				if (end == limit) {
618
					// If the end of the message coincides with the limit of
619
					// what
620
					// was read we have one last complete message. We decode it
621
					// and
622
					// clear the buffer for the next call.
623
624
					ByteBuffer unpackedBuffer = unpacker.unpackMessage(buf, beg, end);
625 1 1. readDataChunk : negated conditional → NO_COVERAGE
					if (unpackedBuffer == buf) {
626 1 1. readDataChunk : Replaced integer subtraction with addition → NO_COVERAGE
						in = new PositionableByteArrayInputStream(buf.array(), beg, end - beg);
627
					} else {
628
						in = new PositionableByteArrayInputStream(
629
								unpackedBuffer.array(), 0, unpackedBuffer.capacity());
630
					}
631
					
632 1 1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamDecoder::decodeMessage → NO_COVERAGE
					decoder.decodeMessage(in);
633
					
634
					buf.clear();
635
					pos = 0;
636
					beg = 0;
637
					end = -1;
638 2 1. readDataChunk : changed conditional boundary → NO_COVERAGE
2. readDataChunk : negated conditional → NO_COVERAGE
				} else if (end > limit) {
639
					// If the end of the message if after what was read, prepare
640
					// to
641
					// read more at next call when we will have buffered more
642
					// data. If we are at the end of the buffer compact it (else
643
					// no
644
					// more space will be available for buffering).
645
646
					pos = limit;
647
648 2 1. readDataChunk : changed conditional boundary → NO_COVERAGE
2. readDataChunk : negated conditional → NO_COVERAGE
					if (end > buf.capacity())
649
						compactBuffer();
650
				}
651
			}
652
		}
653
654
		/**
655
		 * Read more data from the <code>socket</code> and put it in the buffer
656
		 * at <code>at</code>. If the read returns -1 bytes (meaning the
657
		 * connection ended), the socket is closed and this buffer will be made
658
		 * inactive (and therefore removed from the active connections by the
659
		 * Receiver that called it).
660
		 * 
661
		 * @return the number of bytes read.
662
		 * @throws IOException
663
		 *             if an I/O error occurs, in between the socket is closed
664
		 *             and the connection is made inactive, then the exception
665
		 *             is thrown.
666
		 */
667
		protected int bufferize(int at, SocketChannel socket)
668
				throws IOException {
669
			int nbytes = 0;
670
			// int limit = 0;
671
672
			try {
673
				buf.position(at);
674
675
				nbytes = socket.read(buf);
676
677 2 1. bufferize : changed conditional boundary → NO_COVERAGE
2. bufferize : negated conditional → NO_COVERAGE
				if (nbytes < 0) {
678
					active = false;
679 1 1. bufferize : negated conditional → NO_COVERAGE
					if (in != null)
680 1 1. bufferize : removed call to org/miv/mbox/net/PositionableByteArrayInputStream::close → NO_COVERAGE
						in.close();
681 1 1. bufferize : removed call to java/nio/channels/SocketChannel::close → NO_COVERAGE
					socket.close();
682 1 1. bufferize : negated conditional → NO_COVERAGE
					if (debug)
683 1 1. bufferize : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE
						debug("socket from %s:%d closed", socket.socket()
684
								.getInetAddress(), socket.socket().getPort());
685 1 1. bufferize : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE
					return nbytes;
686 1 1. bufferize : negated conditional → NO_COVERAGE
				} else if (nbytes == 0) {
687
					throw new RuntimeException(
688
							"should not happen: buffer to small, 0 bytes read: compact does not function? messages is larger than "
689
									+ buf.capacity() + "?");
690
					// This means that there are no bytes remaining in the
691
					// buffer... it is full.
692
					// compactBuffer();
693
					// return nbytes;
694
				}
695
696
				buf.position(at);
697
698 1 1. bufferize : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE
				return nbytes;
699
			} catch (IOException e) {
700 1 1. bufferize : negated conditional → NO_COVERAGE
				if (debug)
701 1 1. bufferize : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE
					debug("socket from %s:%d I/O error: %s", socket.socket()
702
							.getInetAddress(), socket.socket().getPort(),
703
							e.getMessage());
704
				active = false;
705 1 1. bufferize : negated conditional → NO_COVERAGE
				if (in != null)
706 1 1. bufferize : removed call to org/miv/mbox/net/PositionableByteArrayInputStream::close → NO_COVERAGE
					in.close();
707 1 1. bufferize : removed call to java/nio/channels/SocketChannel::close → NO_COVERAGE
				socket.close();
708
				throw e;
709
			}
710
		}
711
		
712
713
		/**
714
		 * Compact the buffer by removing all read data before <code>beg</code>.
715
		 * The <code>beg</code>, <code>end</code> and <code>pos</code> markers
716
		 * are updated accordingly. Compact works only if beg is larger than
717
		 * four (the size of a header).
718
		 * 
719
		 * @return the offset.
720
		 */
721
		protected int compactBuffer() {
722 2 1. compactBuffer : changed conditional boundary → NO_COVERAGE
2. compactBuffer : negated conditional → NO_COVERAGE
			if (beg > unpacker.sizeOfInt()) {
723
				int off = beg;
724
725
				buf.position(beg);
726
				buf.limit(buf.capacity());
727
				buf.compact();
728
729 1 1. compactBuffer : Replaced integer subtraction with addition → NO_COVERAGE
				pos -= beg;
730 1 1. compactBuffer : Replaced integer subtraction with addition → NO_COVERAGE
				end -= beg;
731
				beg = 0;
732
733 1 1. compactBuffer : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE
				return off;
734
			}
735
736 1 1. compactBuffer : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE
			return 0;
737
		}
738
739
		/**
740
		 * Not used in the current implementation, we assumes that no message
741
		 * will be larger than the size of the buffer.
742
		 */
743
		protected void enlargeBuffer() {
744 1 1. enlargeBuffer : Replaced integer multiplication with division → NO_COVERAGE
			ByteBuffer tmp = ByteBuffer.allocate(buf.capacity() * 2);
745
746
			buf.position(0);
747
			buf.limit(buf.capacity());
748
			tmp.put(buf);
749
			tmp.position(pos);
750
751
			buf = tmp;
752
753 1 1. enlargeBuffer : negated conditional → NO_COVERAGE
			if (bin != null)
754 1 1. enlargeBuffer : removed call to org/miv/mbox/net/PositionableByteArrayInputStream::changeBuffer → NO_COVERAGE
				bin.changeBuffer(buf.array());
755
		}
756
	}
757
758
	/* (non-Javadoc)
759
	 * @see org.graphstream.stream.netstream.NetStreamDecoder#getStream(java.lang.String)
760
	 */
761
	public ThreadProxyPipe getStream(String name) {
762 1 1. getStream : mutated return of Object value for org/graphstream/stream/netstream/NetStreamReceiver::getStream to ( if (x != null) null else throw new RuntimeException ) → NO_COVERAGE
		return decoder.getStream(name);
763
	}
764
765
	/* (non-Javadoc)
766
	 * @see org.graphstream.stream.netstream.NetStreamDecoder#getDefaultStream()
767
	 */
768
	public ThreadProxyPipe getDefaultStream() {
769 1 1. getDefaultStream : mutated return of Object value for org/graphstream/stream/netstream/NetStreamReceiver::getDefaultStream to ( if (x != null) null else throw new RuntimeException ) → NO_COVERAGE
		return decoder.getDefaultStream();
770
	}
771
772
	/* (non-Javadoc)
773
	 * @see org.graphstream.stream.netstream.NetStreamDecoder#register(java.lang.String, org.graphstream.stream.thread.ThreadProxyPipe)
774
	 */
775
	public void register(String name, ThreadProxyPipe stream) throws Exception {
776 1 1. register : removed call to org/graphstream/stream/netstream/NetStreamDecoder::register → NO_COVERAGE
		decoder.register(name, stream); // pure delegation: any exception thrown by the decoder propagates to the caller
777
	}
778
779
	/* (non-Javadoc)
780
	 * @see org.graphstream.stream.netstream.NetStreamDecoder#decodeMessage(java.io.InputStream)
781
	 */
782
	public void decodeMessage(InputStream in) throws IOException {
783 1 1. decodeMessage : removed call to org/graphstream/stream/netstream/NetStreamDecoder::decodeMessage → NO_COVERAGE
		decoder.decodeMessage(in); // pure delegation: the wrapped decoder reads and decodes the message from 'in'
784
		
785
	}
786
787
788
}

Mutations

139

1.1
Location : unpackMessage
Killed by : none
mutated return of Object value for org/graphstream/stream/netstream/NetStreamReceiver$DefaultUnpacker::unpackMessage to ( if (x != null) null else throw new RuntimeException ) → NO_COVERAGE

144

1.1
Location : unpackMessageSize
Killed by : none
replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE

156

1.1
Location : sizeOfInt
Killed by : none
replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE

205

1.1
Location :
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::setDebugOn → NO_COVERAGE

206

1.1
Location :
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::init → NO_COVERAGE

207

1.1
Location :
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::start → NO_COVERAGE

216

1.1
Location : isRunning
Killed by : none
replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE

234

1.1
Location : init
Killed by : none
removed call to java/net/ServerSocket::bind → NO_COVERAGE

236

1.1
Location : init
Killed by : none
negated conditional → NO_COVERAGE

237

1.1
Location : init
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE

250

1.1
Location : setDebugOn
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamDecoder::setDebugOn → NO_COVERAGE

261

1.1
Location : quit
Killed by : none
negated conditional → NO_COVERAGE

262

1.1
Location : quit
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE

269

1.1
Location : hasActiveConnections
Killed by : none
negated conditional → NO_COVERAGE

2.2
Location : hasActiveConnections
Killed by : none
replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE

300

1.1
Location : run
Killed by : none
negated conditional → NO_COVERAGE

301

1.1
Location : run
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::poll → NO_COVERAGE

309

1.1
Location : run
Killed by : none
removed call to java/nio/channels/ServerSocketChannel::close → NO_COVERAGE

311

1.1
Location : run
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE

314

1.1
Location : run
Killed by : none
negated conditional → NO_COVERAGE

315

1.1
Location : run
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE

328

1.1
Location : poll
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : poll
Killed by : none
negated conditional → NO_COVERAGE

332

1.1
Location : poll
Killed by : none
negated conditional → NO_COVERAGE

335

1.1
Location : poll
Killed by : none
removed call to java/util/Iterator::remove → NO_COVERAGE

337

1.1
Location : poll
Killed by : none
negated conditional → NO_COVERAGE

345

1.1
Location : poll
Killed by : none
negated conditional → NO_COVERAGE

346

1.1
Location : poll
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE

356

1.1
Location : poll
Killed by : none
negated conditional → NO_COVERAGE

359

1.1
Location : poll
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::readDataChunk → NO_COVERAGE

360

1.1
Location : poll
Killed by : none
negated conditional → NO_COVERAGE

366

1.1
Location : poll
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE

371

1.1
Location : poll
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE

384

1.1
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

389

1.1
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

390

1.1
Location : readDataChunk
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE

395

1.1
Location : readDataChunk
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver$IncomingBuffer::readDataChunk → NO_COVERAGE

399

1.1
Location : readDataChunk
Killed by : none
removed call to java/io/IOException::printStackTrace → NO_COVERAGE

400

1.1
Location : readDataChunk
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE

406

1.1
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

408

1.1
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

409

1.1
Location : readDataChunk
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE

419

1.1
Location : error
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE

427

1.1
Location : error
Killed by : none
removed call to java/io/PrintStream::print → NO_COVERAGE

434

1.1
Location : error
Killed by : none
negated conditional → NO_COVERAGE

435

1.1
Location : error
Killed by : none
removed call to java/lang/Throwable::printStackTrace → NO_COVERAGE

533

1.1
Location : readDataChunk
Killed by : none
Replaced integer addition with subtraction → NO_COVERAGE

535

1.1
Location : readDataChunk
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

538

1.1
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

539

1.1
Location : readDataChunk
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE

543

1.1
Location : readDataChunk
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
Changed increment from 1 to -1 → NO_COVERAGE

3.3
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

544

1.1
Location : readDataChunk
Killed by : none
Replaced integer addition with subtraction → NO_COVERAGE

546

1.1
Location : readDataChunk
Killed by : none
removed call to java/io/PrintStream::println → NO_COVERAGE

551

1.1
Location : readDataChunk
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

552

1.1
Location : readDataChunk
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
Replaced integer subtraction with addition → NO_COVERAGE

3.3
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

561

1.1
Location : readDataChunk
Killed by : none
Replaced integer addition with subtraction → NO_COVERAGE

563

1.1
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

564

1.1
Location : readDataChunk
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE

575

1.1
Location : readDataChunk
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

576

1.1
Location : readDataChunk
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

586

1.1
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

587

1.1
Location : readDataChunk
Killed by : none
Replaced integer subtraction with addition → NO_COVERAGE

593

1.1
Location : readDataChunk
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamDecoder::decodeMessage → NO_COVERAGE

596

1.1
Location : readDataChunk
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
Replaced integer addition with subtraction → NO_COVERAGE

3.3
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

599

1.1
Location : readDataChunk
Killed by : none
Replaced integer addition with subtraction → NO_COVERAGE

600

1.1
Location : readDataChunk
Killed by : none
Replaced integer addition with subtraction → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
Replaced integer addition with subtraction → NO_COVERAGE

608

1.1
Location : readDataChunk
Killed by : none
Replaced integer addition with subtraction → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
Replaced integer subtraction with addition → NO_COVERAGE

3.3
Location : readDataChunk
Killed by : none
Replaced integer subtraction with addition → NO_COVERAGE

617

1.1
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

625

1.1
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

626

1.1
Location : readDataChunk
Killed by : none
Replaced integer subtraction with addition → NO_COVERAGE

632

1.1
Location : readDataChunk
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamDecoder::decodeMessage → NO_COVERAGE

638

1.1
Location : readDataChunk
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

648

1.1
Location : readDataChunk
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : readDataChunk
Killed by : none
negated conditional → NO_COVERAGE

677

1.1
Location : bufferize
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : bufferize
Killed by : none
negated conditional → NO_COVERAGE

679

1.1
Location : bufferize
Killed by : none
negated conditional → NO_COVERAGE

680

1.1
Location : bufferize
Killed by : none
removed call to org/miv/mbox/net/PositionableByteArrayInputStream::close → NO_COVERAGE

681

1.1
Location : bufferize
Killed by : none
removed call to java/nio/channels/SocketChannel::close → NO_COVERAGE

682

1.1
Location : bufferize
Killed by : none
negated conditional → NO_COVERAGE

683

1.1
Location : bufferize
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE

685

1.1
Location : bufferize
Killed by : none
replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE

686

1.1
Location : bufferize
Killed by : none
negated conditional → NO_COVERAGE

698

1.1
Location : bufferize
Killed by : none
replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE

700

1.1
Location : bufferize
Killed by : none
negated conditional → NO_COVERAGE

701

1.1
Location : bufferize
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE

705

1.1
Location : bufferize
Killed by : none
negated conditional → NO_COVERAGE

706

1.1
Location : bufferize
Killed by : none
removed call to org/miv/mbox/net/PositionableByteArrayInputStream::close → NO_COVERAGE

707

1.1
Location : bufferize
Killed by : none
removed call to java/nio/channels/SocketChannel::close → NO_COVERAGE

722

1.1
Location : compactBuffer
Killed by : none
changed conditional boundary → NO_COVERAGE

2.2
Location : compactBuffer
Killed by : none
negated conditional → NO_COVERAGE

729

1.1
Location : compactBuffer
Killed by : none
Replaced integer subtraction with addition → NO_COVERAGE

730

1.1
Location : compactBuffer
Killed by : none
Replaced integer subtraction with addition → NO_COVERAGE

733

1.1
Location : compactBuffer
Killed by : none
replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE

736

1.1
Location : compactBuffer
Killed by : none
replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE

744

1.1
Location : enlargeBuffer
Killed by : none
Replaced integer multiplication with division → NO_COVERAGE

753

1.1
Location : enlargeBuffer
Killed by : none
negated conditional → NO_COVERAGE

754

1.1
Location : enlargeBuffer
Killed by : none
removed call to org/miv/mbox/net/PositionableByteArrayInputStream::changeBuffer → NO_COVERAGE

762

1.1
Location : getStream
Killed by : none
mutated return of Object value for org/graphstream/stream/netstream/NetStreamReceiver::getStream to ( if (x != null) null else throw new RuntimeException ) → NO_COVERAGE

769

1.1
Location : getDefaultStream
Killed by : none
mutated return of Object value for org/graphstream/stream/netstream/NetStreamReceiver::getDefaultStream to ( if (x != null) null else throw new RuntimeException ) → NO_COVERAGE

776

1.1
Location : register
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamDecoder::register → NO_COVERAGE

783

1.1
Location : decodeMessage
Killed by : none
removed call to org/graphstream/stream/netstream/NetStreamDecoder::decodeMessage → NO_COVERAGE

Active mutators

Tests examined


Report generated by PIT 0.33