1 | /* | |
2 | * Copyright 2006 - 2013 Stefan Balev <stefan.balev@graphstream-project.org> | |
3 | * Julien Baudry <julien.baudry@graphstream-project.org> Antoine Dutot | |
4 | * <antoine.dutot@graphstream-project.org> Yoann Pign�� | |
5 | * <yoann.pigne@graphstream-project.org> Guilhelm Savin | |
6 | * <guilhelm.savin@graphstream-project.org> | |
7 | * | |
8 | * This file is part of GraphStream <http://graphstream-project.org>. | |
9 | * | |
10 | * GraphStream is a library whose purpose is to handle static or dynamic graph, | |
11 | * create them from scratch, file or any source and display them. | |
12 | * | |
13 | * This program is free software distributed under the terms of two licenses, | |
14 | * the CeCILL-C license that fits European law, and the GNU Lesser General | |
15 | * Public License. You can use, modify and/ or redistribute the software under | |
16 | * the terms of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the | |
17 | * following URL <http://www.cecill.info> or under the terms of the GNU LGPL as | |
18 | * published by the Free Software Foundation, either version 3 of the License, | |
19 | * or (at your option) any later version. | |
20 | * | |
21 | * This program is distributed in the hope that it will be useful, but WITHOUT | |
22 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
23 | * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more | |
24 | * details. | |
25 | * | |
26 | * You should have received a copy of the GNU Lesser General Public License | |
27 | * along with this program. If not, see <http://www.gnu.org/licenses/>. | |
28 | * | |
29 | * The fact that you are presently reading this means that you have had | |
30 | * knowledge of the CeCILL-C and LGPL licenses and that you accept their terms. | |
31 | */ | |
32 | package org.graphstream.stream.netstream; | |
33 | ||
34 | import java.io.IOException; | |
35 | import java.io.InputStream; | |
36 | import java.net.InetAddress; | |
37 | import java.net.InetSocketAddress; | |
38 | import java.net.UnknownHostException; | |
39 | import java.nio.ByteBuffer; | |
40 | import java.nio.channels.SelectionKey; | |
41 | import java.nio.channels.Selector; | |
42 | import java.nio.channels.ServerSocketChannel; | |
43 | import java.nio.channels.SocketChannel; | |
44 | import java.nio.charset.Charset; | |
45 | import java.util.HashMap; | |
46 | import java.util.Iterator; | |
47 | import java.util.Set; | |
48 | ||
49 | import org.graphstream.stream.netstream.packing.NetStreamUnpacker; | |
50 | import org.graphstream.stream.thread.ThreadProxyPipe; | |
51 | import org.miv.mbox.net.PositionableByteArrayInputStream; | |
52 | ||
53 | /** | |
54 | * <p> | |
55 | * This class implements a receiver according to specifications the NetStream | |
56 | * protocol. | |
57 | * </p> | |
58 | * | |
59 | * <p> | |
60 | * See {@link NetStreamConstants} for a full description of the protocol, the | |
61 | * sender and the receiver. | |
62 | * </p> | |
63 | * | |
64 | * @see NetStreamConstants | |
65 | * @see NetStreamSender | |
66 | * | |
67 | * | |
68 | * Copyright (c) 2010 University of Luxembourg | |
69 | * | |
70 | * NetStreamReceiver.java | |
71 | * @since Aug 13, 2011 | |
72 | * | |
73 | * @author Yoann Pign�� | |
74 | * | |
75 | */ | |
76 | public class NetStreamReceiver extends Thread implements NetStreamDecoder { | |
77 | ||
78 | /** | |
79 | * the hostname this receiver is listening at. | |
80 | */ | |
81 | private String hostname; | |
82 | ||
83 | /** | |
84 | * the port listened to. | |
85 | */ | |
86 | private int port; | |
87 | ||
88 | /** | |
89 | * Receiver socket. | |
90 | */ | |
91 | protected ServerSocketChannel server; | |
92 | ||
93 | /** | |
94 | * Multiplexor. | |
95 | */ | |
96 | protected Selector selector; | |
97 | ||
98 | /** | |
99 | * Key for the selector. | |
100 | */ | |
101 | protected SelectionKey key; | |
102 | ||
103 | /** | |
104 | * While true, the received is running. | |
105 | */ | |
106 | protected boolean loop = true; | |
107 | ||
108 | /** | |
109 | * Show debugging messages. | |
110 | */ | |
111 | protected boolean debug = true; | |
112 | ||
113 | /** | |
114 | * Last encountered error. | |
115 | */ | |
116 | protected String lastError = null; | |
117 | ||
118 | /** | |
119 | * The current pipe commands are being written to. | |
120 | */ | |
121 | protected ThreadProxyPipe currentStream; | |
122 | ||
123 | /** | |
124 | * Utility class that decodes messages according to the NetStream Protocol | |
125 | */ | |
126 | protected NetStreamDecoder decoder; | |
127 | | |
128 | ||
129 | /** | |
130 | * Current active incoming connections. | |
131 | */ | |
132 | protected HashMap<SelectionKey, IncomingBuffer> incoming = new HashMap<SelectionKey, IncomingBuffer>(); | |
133 | ||
134 | class DefaultUnpacker extends NetStreamUnpacker { | |
135 | ||
136 | @Override | |
137 | public ByteBuffer unpackMessage(ByteBuffer buffer, int startIndex, | |
138 | int endIndex) { | |
139 |
1
1. unpackMessage : mutated return of Object value for org/graphstream/stream/netstream/NetStreamReceiver$DefaultUnpacker::unpackMessage to ( if (x != null) null else throw new RuntimeException ) → NO_COVERAGE |
return buffer; |
140 | } | |
141 | ||
142 | @Override | |
143 | public int unpackMessageSize(ByteBuffer buffer) { | |
144 |
1
1. unpackMessageSize : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE |
return buffer.getInt(); |
145 | } | |
146 | ||
147 | /* | |
148 | * (non-Javadoc) | |
149 | * | |
150 | * @see | |
151 | * org.graphstream.stream.netstream.packing.NetStreamUnpacker#sizeOfInt | |
152 | * () | |
153 | */ | |
154 | @Override | |
155 | public int sizeOfInt() { | |
156 |
1
1. sizeOfInt : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE |
return 4; |
157 | } | |
158 | }; | |
159 | private NetStreamUnpacker unpacker; | |
160 | ||
161 | // Constructors | |
162 | ||
163 | /** | |
164 | * New NetStream Receiver, awaiting in its own thread at the given host name | |
165 | * and port, for new graph events. | |
166 | * | |
167 | * @param hostname | |
168 | * The host name to listen at messages. | |
169 | * @param port | |
170 | * The port to listen at messages. | |
171 | */ | |
172 | public NetStreamReceiver(String hostname, int port) throws IOException, | |
173 | UnknownHostException { | |
174 | this(hostname, port, false); | |
175 | } | |
176 | ||
177 | /** | |
178 | * New NetStream Receiver, awaiting in its own thread at "localhost" on the | |
179 | * given port, for new graph events. | |
180 | * | |
181 | * @param port | |
182 | * The port to listen at messages. | |
183 | */ | |
184 | public NetStreamReceiver(int port) throws IOException, UnknownHostException { | |
185 | this("localhost", port, false); | |
186 | } | |
187 | ||
188 | /** | |
189 | * New NetStream Receiver, awaiting in its own thread at the given host name | |
190 | * and port, for new graph events. | |
191 | * | |
192 | * @param hostname | |
193 | * The host name to listen at messages. | |
194 | * @param port | |
195 | * The port to listen at messages. | |
196 | * @param debug | |
197 | * If true informations are output for each message received. | |
198 | */ | |
199 | public NetStreamReceiver(String hostname, int port, boolean debug) | |
200 | throws IOException, UnknownHostException { | |
201 | this.hostname = hostname; | |
202 | this.port = port; | |
203 | this.unpacker = new DefaultUnpacker(); | |
204 | this.decoder = new DefaultNetStreamDecoder(); | |
205 |
1
1. |
setDebugOn(debug); |
206 |
1
1. |
init(); |
207 |
1
1. |
start(); |
208 | } | |
209 | ||
210 | // Access | |
211 | ||
212 | /** | |
213 | * False as soon as the receiver terminates. | |
214 | */ | |
215 | public synchronized boolean isRunning() { | |
216 |
1
1. isRunning : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE |
return loop; |
217 | } | |
218 | ||
219 | ||
220 | // Commands | |
221 | ||
222 | /** | |
223 | * Initialize the server socket. | |
224 | */ | |
225 | protected void init() throws IOException, UnknownHostException { | |
226 | selector = Selector.open(); | |
227 | server = ServerSocketChannel.open(); | |
228 | ||
229 | server.configureBlocking(false); | |
230 | ||
231 | InetAddress ia = InetAddress.getByName(hostname); | |
232 | InetSocketAddress isa = new InetSocketAddress(ia, port); | |
233 | ||
234 |
1
1. init : removed call to java/net/ServerSocket::bind → NO_COVERAGE |
server.socket().bind(isa); |
235 | ||
236 |
1
1. init : negated conditional → NO_COVERAGE |
if (debug) |
237 |
1
1. init : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE |
debug("bound to socket %s:%d", server.socket().getInetAddress(), |
238 | server.socket().getLocalPort()); | |
239 | ||
240 | // Register a first server socket inside the multiplexer. | |
241 | ||
242 | key = server.register(selector, SelectionKey.OP_ACCEPT); | |
243 | } | |
244 | ||
245 | /** | |
246 | * Enable or disable debugging. | |
247 | */ | |
248 | public void setDebugOn(boolean on) { | |
249 | debug = on; | |
250 |
1
1. setDebugOn : removed call to org/graphstream/stream/netstream/NetStreamDecoder::setDebugOn → NO_COVERAGE |
decoder.setDebugOn(on); |
251 | } | |
252 | ||
253 | ||
254 | /** | |
255 | * Stop the receiver. | |
256 | */ | |
257 | public synchronized void quit() { | |
258 | loop = false; | |
259 | key.selector().wakeup(); | |
260 | ||
261 |
1
1. quit : negated conditional → NO_COVERAGE |
if (debug) |
262 |
1
1. quit : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE |
debug("stopped"); |
263 | } | |
264 | ||
265 | /** | |
266 | * Ask the receiver about its active connections | |
267 | */ | |
268 | public synchronized boolean hasActiveConnections() { | |
269 |
2
1. hasActiveConnections : negated conditional → NO_COVERAGE 2. hasActiveConnections : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE |
return !incoming.isEmpty(); |
270 | } | |
271 | ||
272 | /** | |
273 | * Sets an optional NetStreamUnpaker whose "unpack" method will be called on | |
274 | * each message. | |
275 | * | |
276 | * It allows to do extra decoding on the all byte array message. You can | |
277 | * also decrypt things. | |
278 | * | |
279 | * @param unpaker | |
280 | */ | |
281 | public void setUnpacker(NetStreamUnpacker unpaker) { | |
282 | this.unpacker = unpaker; | |
283 | } | |
284 | public void removeUnpacker() { | |
285 | unpacker = new DefaultUnpacker(); | |
286 | } | |
287 | ||
288 | /** | |
289 | * Wait for connections, accept them, demultiplexes them and dispatch | |
290 | * messages to registered message boxes. | |
291 | */ | |
292 | @Override | |
293 | public void run() { | |
294 | boolean l; | |
295 | ||
296 | synchronized (this) { | |
297 | l = loop; | |
298 | } | |
299 | ||
300 |
1
1. run : negated conditional → NO_COVERAGE |
while (l) { |
301 |
1
1. run : removed call to org/graphstream/stream/netstream/NetStreamReceiver::poll → NO_COVERAGE |
poll(); |
302 | ||
303 | synchronized (this) { | |
304 | l = loop; | |
305 | } | |
306 | } | |
307 | ||
308 | try { | |
309 |
1
1. run : removed call to java/nio/channels/ServerSocketChannel::close → NO_COVERAGE |
server.close(); |
310 | } catch (IOException e) { | |
311 |
1
1. run : removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE |
error("cannot close the server socket: " + e.getMessage(), e); |
312 | } | |
313 | ||
314 |
1
1. run : negated conditional → NO_COVERAGE |
if (debug) { |
315 |
1
1. run : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE |
debug("receiver //" + hostname + ":" + port + " finished"); |
316 | } | |
317 | } | |
318 | ||
319 | /** | |
320 | * Wait until one or several chunks of message are acceptable. This method | |
321 | * should be called in a loop. It can be used to block a program until some | |
322 | * data is available. | |
323 | */ | |
324 | public void poll() { | |
325 | try { | |
326 | // Wait for incoming messages in a loop. | |
327 | ||
328 |
2
1. poll : changed conditional boundary → NO_COVERAGE 2. poll : negated conditional → NO_COVERAGE |
if (key.selector().select() > 0) { |
329 | Set<?> readyKeys = selector.selectedKeys(); | |
330 | Iterator<?> i = readyKeys.iterator(); | |
331 | ||
332 |
1
1. poll : negated conditional → NO_COVERAGE |
while (i.hasNext()) { |
333 | SelectionKey akey = (SelectionKey) i.next(); | |
334 | ||
335 |
1
1. poll : removed call to java/util/Iterator::remove → NO_COVERAGE |
i.remove(); |
336 | ||
337 |
1
1. poll : negated conditional → NO_COVERAGE |
if (akey.isAcceptable()) { |
338 | // If a new connection occurs, register the new socket | |
339 | // in the multiplexer. | |
340 | ||
341 | ServerSocketChannel ssocket = (ServerSocketChannel) akey | |
342 | .channel(); | |
343 | SocketChannel socket = ssocket.accept(); | |
344 | ||
345 |
1
1. poll : negated conditional → NO_COVERAGE |
if (debug) |
346 |
1
1. poll : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE |
debug("accepting socket %s:%d", socket.socket() |
347 | .getInetAddress(), socket.socket() | |
348 | .getPort()); | |
349 | ||
350 | socket.configureBlocking(false); | |
351 | socket.finishConnect(); | |
352 | ||
353 | // SelectionKey otherKey = socket.register( selector, | |
354 | // SelectionKey.OP_READ ); | |
355 | socket.register(selector, SelectionKey.OP_READ); | |
356 |
1
1. poll : negated conditional → NO_COVERAGE |
} else if (akey.isReadable()) { |
357 | // If a message arrives, read it. | |
358 | ||
359 |
1
1. poll : removed call to org/graphstream/stream/netstream/NetStreamReceiver::readDataChunk → NO_COVERAGE |
readDataChunk(akey); |
360 |
1
1. poll : negated conditional → NO_COVERAGE |
} else if (akey.isWritable()) { |
361 | throw new RuntimeException("should not happen"); | |
362 | } | |
363 | } | |
364 | } | |
365 | } catch (IOException e) { | |
366 |
1
1. poll : removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE |
error(e, "I/O error in receiver //%s:%d thread: aborting: %s", |
367 | hostname, port, e.getMessage()); | |
368 | ||
369 | loop = false; | |
370 | } catch (Throwable e) { | |
371 |
1
1. poll : removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE |
error(e, "Unknown error: %s", e.getMessage()); |
372 | ||
373 | loop = false; | |
374 | } | |
375 | } | |
376 | ||
377 | /** | |
378 | * When data is readable on a socket, send it to the appropriate buffer | |
379 | * (creating it if needed). | |
380 | */ | |
381 | protected void readDataChunk(SelectionKey key) throws IOException { | |
382 | IncomingBuffer buf = incoming.get(key); | |
383 | ||
384 |
1
1. readDataChunk : negated conditional → NO_COVERAGE |
if (buf == null) { |
385 | buf = new IncomingBuffer(); | |
386 | incoming.put(key, buf); | |
387 | SocketChannel socket = (SocketChannel) key.channel(); | |
388 | ||
389 |
1
1. readDataChunk : negated conditional → NO_COVERAGE |
if (debug) |
390 |
1
1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE |
debug("creating buffer for new connection from %s:%d", socket |
391 | .socket().getInetAddress(), socket.socket().getPort()); | |
392 | } | |
393 | ||
394 | try { | |
395 |
1
1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver$IncomingBuffer::readDataChunk → NO_COVERAGE |
buf.readDataChunk(key); |
396 | ||
397 | } catch (IOException e) { | |
398 | incoming.remove(key); | |
399 |
1
1. readDataChunk : removed call to java/io/IOException::printStackTrace → NO_COVERAGE |
e.printStackTrace(); |
400 |
1
1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE |
error(e, |
401 | "receiver //%s:%d cannot read object socket channel (I/O error): %s", | |
402 | hostname, port, e.getMessage()); | |
403 | loop = false; | |
404 | } | |
405 | ||
406 |
1
1. readDataChunk : negated conditional → NO_COVERAGE |
if (!buf.active) { |
407 | incoming.remove(key); | |
408 |
1
1. readDataChunk : negated conditional → NO_COVERAGE |
if (debug) |
409 |
1
1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE |
debug("removing buffer %s from incoming for geting inactive. %d left", |
410 | key.toString(), incoming.size()); | |
411 | ||
412 | } | |
413 | ||
414 | } | |
415 | ||
416 | // Utilities | |
417 | ||
418 | protected void error(String message, Object... data) { | |
419 |
1
1. error : removed call to org/graphstream/stream/netstream/NetStreamReceiver::error → NO_COVERAGE |
error(null, message, data); |
420 | } | |
421 | ||
422 | protected static final String LIGHT_YELLOW = "[33;1m"; | |
423 | protected static final String RESET = "[0m"; | |
424 | ||
425 | protected void error(Throwable e, String message, Object... data) { | |
426 | // System.err.print( LIGHT_YELLOW ); | |
427 |
1
1. error : removed call to java/io/PrintStream::print → NO_COVERAGE |
System.err.print("["); |
428 | // System.err.print( RESET ); | |
429 | System.err.printf(message, data); | |
430 | // System.err.print( LIGHT_YELLOW ); | |
431 | System.err.printf("]%n"); | |
432 | // System.err.println( RESET ); | |
433 | ||
434 |
1
1. error : negated conditional → NO_COVERAGE |
if (e != null) |
435 |
1
1. error : removed call to java/lang/Throwable::printStackTrace → NO_COVERAGE |
e.printStackTrace(); |
436 | } | |
437 | ||
438 | protected void debug(String message, Object... data) { | |
439 | // System.err.print( LIGHT_YELLOW ); | |
440 | System.err.printf("[//%s:%d | ", hostname, port); | |
441 | // System.err.print( RESET ); | |
442 | System.err.printf(message, data); | |
443 | // System.err.print( LIGHT_YELLOW ); | |
444 | System.err.printf("]%n"); | |
445 | // System.err.println( RESET ); | |
446 | } | |
447 | ||
448 | // Nested classes | |
449 | ||
450 | /** | |
451 | * The connection to a sender. | |
452 | * | |
453 | * The receiver maintains several incoming connections and demultiplexes | |
454 | * them. | |
455 | */ | |
456 | protected class IncomingBuffer { | |
457 | // Attributes | |
458 | ||
459 | protected static final int BUFFER_INITIAL_SIZE = 8192; // 65535, 4096 | |
460 | ||
461 | /** | |
462 | * Buffer for reading. | |
463 | */ | |
464 | protected ByteBuffer buf = ByteBuffer.allocate(BUFFER_INITIAL_SIZE); | |
465 | ||
466 | /** | |
467 | * Index in the buffer past the last byte that forms the current | |
468 | * message. End can be out of the buffer or out of the data read | |
469 | * actually. | |
470 | */ | |
471 | protected int end = -1; | |
472 | ||
473 | /** | |
474 | * Index in the buffer of the first byte that forms the currents | |
475 | * message. Beg does not count the 4 bytes that give the size of the | |
476 | * message. While the header is being read, beg is the first byte of the | |
477 | * header. | |
478 | */ | |
479 | protected int beg = 0; | |
480 | ||
481 | /** | |
482 | * Position inside beg and end past the last byte read. All bytes at and | |
483 | * after pos have unspecified contents. Pos always verifies pos>=beg | |
484 | * and pos<end. While the header is being read, pos is past the last | |
485 | * byte of the header that has been read. | |
486 | */ | |
487 | protected int pos = 0; | |
488 | ||
489 | /** | |
490 | * Object input stream for reading the buffer. This input stream reads | |
491 | * data from the "bin" positionable byte array input stream, itself | |
492 | * mapped on the current message to decode. | |
493 | */ | |
494 | PositionableByteArrayInputStream in; | |
495 | ||
496 | /** | |
497 | * Input stream filter on the buffer. This descendant of | |
498 | * ByteArrayInputStream is able to change its offset and length so that | |
499 | * we can map exactly the message to decode inside the buffer. | |
500 | */ | |
501 | PositionableByteArrayInputStream bin; | |
502 | ||
503 | /** | |
504 | * When false the socket is closed and this buffer must be removed from | |
505 | * the active connections. | |
506 | */ | |
507 | protected boolean active = true; | |
508 | ||
509 | // Constructors | |
510 | ||
511 | public IncomingBuffer() { | |
512 | } | |
513 | ||
514 | // Commands | |
515 | ||
516 | /** | |
517 | * Read the available bytes and buffers them. If one or more complete | |
518 | * serialised objects are available, send them to their respective | |
519 | * MBoxes. | |
520 | * | |
521 | * Here is the junk... | |
522 | */ | |
523 | public void readDataChunk(SelectionKey key) throws IOException { | |
524 | int limit = 0; // Index past the last byte read during the current | |
525 | // invocation. | |
526 | int nbytes = 0; // Number of bytes read. | |
527 | SocketChannel socket = (SocketChannel) key.channel(); | |
528 | ||
529 | int sizeOfInt = unpacker.sizeOfInt(); | |
530 | // Buffers the data. | |
531 | ||
532 | nbytes = bufferize(pos, socket); | |
533 |
1
1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE |
limit = pos + nbytes; |
534 | ||
535 |
2
1. readDataChunk : changed conditional boundary → NO_COVERAGE 2. readDataChunk : negated conditional → NO_COVERAGE |
if (nbytes <= 0) |
536 | return; | |
537 | ||
538 |
1
1. readDataChunk : negated conditional → NO_COVERAGE |
if (debug) { |
539 |
1
1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE |
debug("<chunk (%d bytes) from " |
540 | + socket.socket().getInetAddress() + ":" | |
541 | + socket.socket().getPort() + ">", nbytes); | |
542 | int at = buf.position(); | |
543 |
3
1. readDataChunk : changed conditional boundary → NO_COVERAGE 2. readDataChunk : Changed increment from 1 to -1 → NO_COVERAGE 3. readDataChunk : negated conditional → NO_COVERAGE |
for (int i = 0; i < nbytes; i++) { |
544 |
1
1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE |
System.err.printf("%d ", buf.get(at + i)); |
545 | } | |
546 |
1
1. readDataChunk : removed call to java/io/PrintStream::println → NO_COVERAGE |
System.err.println(); |
547 | buf.position(at); | |
548 | } | |
549 | // Read the first header. | |
550 | ||
551 |
2
1. readDataChunk : changed conditional boundary → NO_COVERAGE 2. readDataChunk : negated conditional → NO_COVERAGE |
if (end < 0) { |
552 |
3
1. readDataChunk : changed conditional boundary → NO_COVERAGE 2. readDataChunk : Replaced integer subtraction with addition → NO_COVERAGE 3. readDataChunk : negated conditional → NO_COVERAGE |
if ((limit - beg) >= sizeOfInt) { |
553 | // If no data has been read yet in the buffer or if the | |
554 | // buffer | |
555 | // was emptied completely at previous call: prepare to read | |
556 | // a | |
557 | // new message by decoding its header. | |
558 | ||
559 | buf.position(0); | |
560 | int size = unpacker.unpackMessageSize(buf); | |
561 |
1
1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE |
end = size + sizeOfInt; |
562 | beg = sizeOfInt; | |
563 |
1
1. readDataChunk : negated conditional → NO_COVERAGE |
if (debug) |
564 |
1
1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE |
debug("start to bufferize a %d byte long messsage", |
565 | size); | |
566 | } else { | |
567 | // The header is incomplete, wait next call to complete it. | |
568 | ||
569 | pos = limit; | |
570 | } | |
571 | } | |
572 | ||
573 | // Read one or more messages or wait next call to buffers more. | |
574 | ||
575 |
2
1. readDataChunk : changed conditional boundary → NO_COVERAGE 2. readDataChunk : negated conditional → NO_COVERAGE |
if (end > 0) { |
576 |
2
1. readDataChunk : changed conditional boundary → NO_COVERAGE 2. readDataChunk : negated conditional → NO_COVERAGE |
while (end < limit) { |
577 | // While the end of the message is in the limit of what was | |
578 | // read, there are one or more complete messages. Decode | |
579 | // them | |
580 | // and read the header of the next message, until a message | |
581 | // is | |
582 | // incomplete or there are no more messages or a header is | |
583 | // incomplete. | |
584 | ||
585 | ByteBuffer unpackedBuffer = unpacker.unpackMessage(buf, beg, end); | |
586 |
1
1. readDataChunk : negated conditional → NO_COVERAGE |
if (unpackedBuffer == buf) { |
587 |
1
1. readDataChunk : Replaced integer subtraction with addition → NO_COVERAGE |
in = new PositionableByteArrayInputStream(buf.array(), beg, end - beg); |
588 | } else { | |
589 | in = new PositionableByteArrayInputStream( | |
590 | unpackedBuffer.array(), 0, unpackedBuffer.capacity()); | |
591 | } | |
592 | | |
593 |
1
1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamDecoder::decodeMessage → NO_COVERAGE |
decoder.decodeMessage(in); |
594 | buf.position(end); | |
595 | ||
596 |
3
1. readDataChunk : changed conditional boundary → NO_COVERAGE 2. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE 3. readDataChunk : negated conditional → NO_COVERAGE |
if (end + sizeOfInt <= limit) { |
597 | // There is a following message. | |
598 | ||
599 |
1
1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE |
beg = end + sizeOfInt; |
600 |
2
1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE 2. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE |
end = end + unpacker.unpackMessageSize(buf) + sizeOfInt; |
601 | } else { | |
602 | // There is the beginning of a following message | |
603 | // but the header is incomplete. Compact the buffer | |
604 | // and stop here. | |
605 | assert (beg >= sizeOfInt); | |
606 | ||
607 | beg = end; | |
608 |
3
1. readDataChunk : Replaced integer addition with subtraction → NO_COVERAGE 2. readDataChunk : Replaced integer subtraction with addition → NO_COVERAGE 3. readDataChunk : Replaced integer subtraction with addition → NO_COVERAGE |
int p = sizeOfInt - ((end + sizeOfInt) - limit); |
609 | compactBuffer(); | |
610 | pos = p; | |
611 | beg = 0; | |
612 | end = -1; | |
613 | break; | |
614 | } | |
615 | } | |
616 | ||
617 |
1
1. readDataChunk : negated conditional → NO_COVERAGE |
if (end == limit) { |
618 | // If the end of the message coincides with the limit of | |
619 | // what | |
620 | // was read we have one last complete message. We decode it | |
621 | // and | |
622 | // clear the buffer for the next call. | |
623 | ||
624 | ByteBuffer unpackedBuffer = unpacker.unpackMessage(buf, beg, end); | |
625 |
1
1. readDataChunk : negated conditional → NO_COVERAGE |
if (unpackedBuffer == buf) { |
626 |
1
1. readDataChunk : Replaced integer subtraction with addition → NO_COVERAGE |
in = new PositionableByteArrayInputStream(buf.array(), beg, end - beg); |
627 | } else { | |
628 | in = new PositionableByteArrayInputStream( | |
629 | unpackedBuffer.array(), 0, unpackedBuffer.capacity()); | |
630 | } | |
631 | | |
632 |
1
1. readDataChunk : removed call to org/graphstream/stream/netstream/NetStreamDecoder::decodeMessage → NO_COVERAGE |
decoder.decodeMessage(in); |
633 | | |
634 | buf.clear(); | |
635 | pos = 0; | |
636 | beg = 0; | |
637 | end = -1; | |
638 |
2
1. readDataChunk : changed conditional boundary → NO_COVERAGE 2. readDataChunk : negated conditional → NO_COVERAGE |
} else if (end > limit) { |
639 | // If the end of the message if after what was read, prepare | |
640 | // to | |
641 | // read more at next call when we will have buffered more | |
642 | // data. If we are at the end of the buffer compact it (else | |
643 | // no | |
644 | // more space will be available for buffering). | |
645 | ||
646 | pos = limit; | |
647 | ||
648 |
2
1. readDataChunk : changed conditional boundary → NO_COVERAGE 2. readDataChunk : negated conditional → NO_COVERAGE |
if (end > buf.capacity()) |
649 | compactBuffer(); | |
650 | } | |
651 | } | |
652 | } | |
653 | ||
654 | /** | |
655 | * Read more data from the <code>socket</code> and put it in the buffer | |
656 | * at <code>at</code>. If the read returns -1 bytes (meaning the | |
657 | * connection ended), the socket is closed and this buffer will be made | |
658 | * inactive (and therefore removed from the active connections by the | |
659 | * Receiver that called it). | |
660 | * | |
661 | * @return the number of bytes read. | |
662 | * @throws IOException | |
663 | * if an I/O error occurs, in between the socket is closed | |
664 | * and the connection is made inactive, then the exception | |
665 | * is thrown. | |
666 | */ | |
667 | protected int bufferize(int at, SocketChannel socket) | |
668 | throws IOException { | |
669 | int nbytes = 0; | |
670 | // int limit = 0; | |
671 | ||
672 | try { | |
673 | buf.position(at); | |
674 | ||
675 | nbytes = socket.read(buf); | |
676 | ||
677 |
2
1. bufferize : changed conditional boundary → NO_COVERAGE 2. bufferize : negated conditional → NO_COVERAGE |
if (nbytes < 0) { |
678 | active = false; | |
679 |
1
1. bufferize : negated conditional → NO_COVERAGE |
if (in != null) |
680 |
1
1. bufferize : removed call to org/miv/mbox/net/PositionableByteArrayInputStream::close → NO_COVERAGE |
in.close(); |
681 |
1
1. bufferize : removed call to java/nio/channels/SocketChannel::close → NO_COVERAGE |
socket.close(); |
682 |
1
1. bufferize : negated conditional → NO_COVERAGE |
if (debug) |
683 |
1
1. bufferize : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE |
debug("socket from %s:%d closed", socket.socket() |
684 | .getInetAddress(), socket.socket().getPort()); | |
685 |
1
1. bufferize : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE |
return nbytes; |
686 |
1
1. bufferize : negated conditional → NO_COVERAGE |
} else if (nbytes == 0) { |
687 | throw new RuntimeException( | |
688 | "should not happen: buffer to small, 0 bytes read: compact does not function? messages is larger than " | |
689 | + buf.capacity() + "?"); | |
690 | // This means that there are no bytes remaining in the | |
691 | // buffer... it is full. | |
692 | // compactBuffer(); | |
693 | // return nbytes; | |
694 | } | |
695 | ||
696 | buf.position(at); | |
697 | ||
698 |
1
1. bufferize : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE |
return nbytes; |
699 | } catch (IOException e) { | |
700 |
1
1. bufferize : negated conditional → NO_COVERAGE |
if (debug) |
701 |
1
1. bufferize : removed call to org/graphstream/stream/netstream/NetStreamReceiver::debug → NO_COVERAGE |
debug("socket from %s:%d I/O error: %s", socket.socket() |
702 | .getInetAddress(), socket.socket().getPort(), | |
703 | e.getMessage()); | |
704 | active = false; | |
705 |
1
1. bufferize : negated conditional → NO_COVERAGE |
if (in != null) |
706 |
1
1. bufferize : removed call to org/miv/mbox/net/PositionableByteArrayInputStream::close → NO_COVERAGE |
in.close(); |
707 |
1
1. bufferize : removed call to java/nio/channels/SocketChannel::close → NO_COVERAGE |
socket.close(); |
708 | throw e; | |
709 | } | |
710 | } | |
711 | | |
712 | ||
713 | /** | |
714 | * Compact the buffer by removing all read data before <code>beg</code>. | |
715 | * The <code>beg</code>, <code>end</code> and <code>pos</code> markers | |
716 | * are updated accordingly. Compact works only if beg is larger than | |
717 | * four (the size of a header). | |
718 | * | |
719 | * @return the offset. | |
720 | */ | |
721 | protected int compactBuffer() { | |
722 |
2
1. compactBuffer : changed conditional boundary → NO_COVERAGE 2. compactBuffer : negated conditional → NO_COVERAGE |
if (beg > unpacker.sizeOfInt()) { |
723 | int off = beg; | |
724 | ||
725 | buf.position(beg); | |
726 | buf.limit(buf.capacity()); | |
727 | buf.compact(); | |
728 | ||
729 |
1
1. compactBuffer : Replaced integer subtraction with addition → NO_COVERAGE |
pos -= beg; |
730 |
1
1. compactBuffer : Replaced integer subtraction with addition → NO_COVERAGE |
end -= beg; |
731 | beg = 0; | |
732 | ||
733 |
1
1. compactBuffer : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE |
return off; |
734 | } | |
735 | ||
736 |
1
1. compactBuffer : replaced return of integer sized value with (x == 0 ? 1 : 0) → NO_COVERAGE |
return 0; |
737 | } | |
738 | ||
739 | /** | |
740 | * Not used in the current implementation, we assumes that no message | |
741 | * will be larger than the size of the buffer. | |
742 | */ | |
743 | protected void enlargeBuffer() { | |
744 |
1
1. enlargeBuffer : Replaced integer multiplication with division → NO_COVERAGE |
ByteBuffer tmp = ByteBuffer.allocate(buf.capacity() * 2); |
745 | ||
746 | buf.position(0); | |
747 | buf.limit(buf.capacity()); | |
748 | tmp.put(buf); | |
749 | tmp.position(pos); | |
750 | ||
751 | buf = tmp; | |
752 | ||
753 |
1
1. enlargeBuffer : negated conditional → NO_COVERAGE |
if (bin != null) |
754 |
1
1. enlargeBuffer : removed call to org/miv/mbox/net/PositionableByteArrayInputStream::changeBuffer → NO_COVERAGE |
bin.changeBuffer(buf.array()); |
755 | } | |
756 | } | |
757 | ||
758 | /* (non-Javadoc) | |
759 | * @see org.graphstream.stream.netstream.NetStreamDecoder#getStream(java.lang.String) | |
760 | */ | |
761 | public ThreadProxyPipe getStream(String name) { | |
762 |
1
1. getStream : mutated return of Object value for org/graphstream/stream/netstream/NetStreamReceiver::getStream to ( if (x != null) null else throw new RuntimeException ) → NO_COVERAGE |
return decoder.getStream(name); |
763 | } | |
764 | ||
765 | /* (non-Javadoc) | |
766 | * @see org.graphstream.stream.netstream.NetStreamDecoder#getDefaultStream() | |
767 | */ | |
768 | public ThreadProxyPipe getDefaultStream() { | |
769 |
1
1. getDefaultStream : mutated return of Object value for org/graphstream/stream/netstream/NetStreamReceiver::getDefaultStream to ( if (x != null) null else throw new RuntimeException ) → NO_COVERAGE |
return decoder.getDefaultStream(); |
770 | } | |
771 | ||
772 | /* (non-Javadoc) | |
773 | * @see org.graphstream.stream.netstream.NetStreamDecoder#register(java.lang.String, org.graphstream.stream.thread.ThreadProxyPipe) | |
774 | */ | |
775 | public void register(String name, ThreadProxyPipe stream) throws Exception { | |
776 |
1
1. register : removed call to org/graphstream/stream/netstream/NetStreamDecoder::register → NO_COVERAGE |
decoder.register(name, stream); |
777 | } | |
778 | ||
779 | /* (non-Javadoc) | |
780 | * @see org.graphstream.stream.netstream.NetStreamDecoder#decodeMessage(java.io.InputStream) | |
781 | */ | |
782 | public void decodeMessage(InputStream in) throws IOException { | |
783 |
1
1. decodeMessage : removed call to org/graphstream/stream/netstream/NetStreamDecoder::decodeMessage → NO_COVERAGE |
decoder.decodeMessage(in); |
784 | | |
785 | } | |
786 | ||
787 | ||
788 | } | |
Mutations | ||
139 |
1.1 |
|
144 |
1.1 |
|
156 |
1.1 |
|
205 |
1.1 |
|
206 |
1.1 |
|
207 |
1.1 |
|
216 |
1.1 |
|
234 |
1.1 |
|
236 |
1.1 |
|
237 |
1.1 |
|
250 |
1.1 |
|
261 |
1.1 |
|
262 |
1.1 |
|
269 |
1.1 2.2 |
|
300 |
1.1 |
|
301 |
1.1 |
|
309 |
1.1 |
|
311 |
1.1 |
|
314 |
1.1 |
|
315 |
1.1 |
|
328 |
1.1 2.2 |
|
332 |
1.1 |
|
335 |
1.1 |
|
337 |
1.1 |
|
345 |
1.1 |
|
346 |
1.1 |
|
356 |
1.1 |
|
359 |
1.1 |
|
360 |
1.1 |
|
366 |
1.1 |
|
371 |
1.1 |
|
384 |
1.1 |
|
389 |
1.1 |
|
390 |
1.1 |
|
395 |
1.1 |
|
399 |
1.1 |
|
400 |
1.1 |
|
406 |
1.1 |
|
408 |
1.1 |
|
409 |
1.1 |
|
419 |
1.1 |
|
427 |
1.1 |
|
434 |
1.1 |
|
435 |
1.1 |
|
533 |
1.1 |
|
535 |
1.1 2.2 |
|
538 |
1.1 |
|
539 |
1.1 |
|
543 |
1.1 2.2 3.3 |
|
544 |
1.1 |
|
546 |
1.1 |
|
551 |
1.1 2.2 |
|
552 |
1.1 2.2 3.3 |
|
561 |
1.1 |
|
563 |
1.1 |
|
564 |
1.1 |
|
575 |
1.1 2.2 |
|
576 |
1.1 2.2 |
|
586 |
1.1 |
|
587 |
1.1 |
|
593 |
1.1 |
|
596 |
1.1 2.2 3.3 |
|
599 |
1.1 |
|
600 |
1.1 2.2 |
|
608 |
1.1 2.2 3.3 |
|
617 |
1.1 |
|
625 |
1.1 |
|
626 |
1.1 |
|
632 |
1.1 |
|
638 |
1.1 2.2 |
|
648 |
1.1 2.2 |
|
677 |
1.1 2.2 |
|
679 |
1.1 |
|
680 |
1.1 |
|
681 |
1.1 |
|
682 |
1.1 |
|
683 |
1.1 |
|
685 |
1.1 |
|
686 |
1.1 |
|
698 |
1.1 |
|
700 |
1.1 |
|
701 |
1.1 |
|
705 |
1.1 |
|
706 |
1.1 |
|
707 |
1.1 |
|
722 |
1.1 2.2 |
|
729 |
1.1 |
|
730 |
1.1 |
|
733 |
1.1 |
|
736 |
1.1 |
|
744 |
1.1 |
|
753 |
1.1 |
|
754 |
1.1 |
|
762 |
1.1 |
|
769 |
1.1 |
|
776 |
1.1 |
|
783 |
1.1 |