Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import lombok.extern.slf4j.Slf4j;
import oap.http.server.nio.handlers.CompressionNioHandler;
import oap.io.Closeables;
import oap.util.Dates;
import oap.util.Lists;
import org.xnio.OptionMap;
import org.xnio.Options;
Expand All @@ -61,6 +62,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@Slf4j
Expand Down Expand Up @@ -333,6 +335,13 @@ public void preStop() {
if( undertow != null ) {
undertow.stop();
}
if( xnioWorker != null ) {
xnioWorker.shutdown();
if( !xnioWorker.awaitTermination( Dates.s( 10 ), TimeUnit.MILLISECONDS ) ) {
xnioWorker.shutdownNow();
}
xnioWorker = null;
}
} catch( Exception ex ) {
log.error( "Cannot stop server", ex );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public MessageAvailabilityReport( State state, Map<String, State> subsystemState

/**
 * State values accepted by the {@code MessageAvailabilityReport} constructor, both for the
 * overall state and for the per-subsystem state map.
 */
// NOTE(review): this span comes from a rendered diff hunk (-43,7 +43,6), so one of the lines
// below is a removed line — presumably PARTIALLY_OPERATIONAL; confirm against the real file.
public enum State {
OPERATIONAL,
PARTIALLY_OPERATIONAL,
FAILED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,24 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
@ToString
public class MessageSender implements Closeable, AutoCloseable {
private static final Pair<MessageStatus, Short> STATUS_OK = Pair.__( MessageStatus.OK, MessageProtocol.STATUS_OK );
private final Object syncDiskLock = new Object();
private final String host;
private final int port;
private final Path directory;
private final MessageNoRetryStrategy messageNoRetryStrategy;
private final long clientId = Cuid.UNIQUE.nextLong();
private final Messages messages = new Messages();
private final ConcurrentMap<Byte, Pair<MessageStatus, Short>> lastStatus = new ConcurrentHashMap<>();
private final String messageUrl;
private final long memorySyncPeriod;
public final Object syncDiskLock = new Object();
public final String host;
public final int port;
public final Path directory;
public final MessageNoRetryStrategy messageNoRetryStrategy;
public final long clientId = Cuid.UNIQUE.nextLong();
public final Messages messages = new Messages();
public final ConcurrentMap<Byte, Pair<MessageStatus, Short>> lastStatus = new ConcurrentHashMap<>();
public final String messageUrl;
public final long memorySyncPeriod;
private final ReentrantLock lock = new ReentrantLock();
public String uniqueName = Cuid.UNIQUE.next();
public long storageLockExpiration = Dates.h( 1 );
Expand All @@ -97,10 +99,12 @@ public class MessageSender implements Closeable, AutoCloseable {
public long keepAliveDuration = Dates.d( 30 );
public long timeout = Dates.s( 5 );
public long connectionTimeout = Dates.s( 30 );
private volatile boolean closed = false;
private Scheduled diskSyncScheduler;
private boolean networkAvailable = true;
private long ioExceptionStartRetryTimeout = -1;
public volatile boolean closed = false;
public Scheduled diskSyncScheduler;
public AtomicLong networkAvailable = new AtomicLong( 0 );
public int networkAvailableMaxErrors = 2;
public long ioExceptionStartRetryTimeout = -1;
public long timeBetweenLogs = Dates.m( 1 );

public MessageSender( String host, int port, String httpPrefix, Path persistenceDirectory, long memorySyncPeriod ) {
this( host, port, httpPrefix, persistenceDirectory, memorySyncPeriod, MessageNoRetryStrategy.DROP );
Expand Down Expand Up @@ -238,7 +242,7 @@ private void saveMessagesToDirectory( Path directory ) {
}

public MessageAvailabilityReport availabilityReport( byte messageType ) {
boolean operational = networkAvailable
boolean operational = networkAvailable.get() <= networkAvailableMaxErrors
&& !closed
&& lastStatus.getOrDefault( messageType, STATUS_OK )._1 != MessageStatus.ERROR;
return new MessageAvailabilityReport( operational ? State.OPERATIONAL : State.FAILED );
Expand Down Expand Up @@ -280,23 +284,27 @@ public Messages.MessageInfo send( Messages.MessageInfo messageInfo, long now ) {
return onOkRespone( messageInfo, inputStreamResponseListener, now );

} catch( UnknownHostException e ) {
processException( messageInfo, now, message, e, true );

ioExceptionStartRetryTimeout = now;
networkAvailable.incrementAndGet();

processException( messageInfo, now, message, e, true );

throw Throwables.propagate( e );
} catch( Throwable e ) {
networkAvailable.incrementAndGet();
processException( messageInfo, now, message, e, false );

throw Throwables.propagate( e );
}
}

/**
 * Handles a failed send attempt: increments the {@code oap.messages} counter, logs the
 * throwable through {@code LogConsolidated} at ERROR level, and re-queues the message for
 * retry at {@code now + retryTimeout}.
 *
 * @param messageInfo        the message whose send failed; handed to {@code messages.retry}
 * @param now                timestamp used as the base for the retry time
 * @param message            source of the {@code messageType} used for the metric "type" tag
 * @param e                  the failure; a {@code TimeoutException} selects the
 *                           "send_timeout" status, anything else "send_io_error"
 * @param globalRetryTimeout when true, the metric status gets the "_gr" suffix
 */
// NOTE(review): this span is a rendered diff with old and new lines interleaved; the two
// `"status", ...` lines and the two LogConsolidated.log lines are the before/after versions
// of the same statements, not four separate statements.
private void processException( Messages.MessageInfo messageInfo, long now, Message message, Throwable e, boolean globalRetryTimeout ) {
String status = e instanceof TimeoutException ? "send_timeout" : "send_io_error";

Metrics.counter( "oap.messages",
"type", MessageProtocol.messageTypeToString( message.messageType ),
"status", "send_io_error" + ( globalRetryTimeout ? "_gr" : "" ) ).increment();
LogConsolidated.log( log, Level.ERROR, Dates.s( 10 ), e.getMessage(), e );
"status", status + ( globalRetryTimeout ? "_gr" : "" ) ).increment();
LogConsolidated.log( log, Level.ERROR, timeBetweenLogs, e.getMessage(), e );
messages.retry( messageInfo, now + retryTimeout );
}

Expand All @@ -318,7 +326,7 @@ private Messages.MessageInfo onOkRespone( Messages.MessageInfo messageInfo, Inpu

log.trace( "[{}] sending done, server status: {}", uniqueName, MessageProtocol.messageStatusToString( status ) );

MessageSender.this.networkAvailable = true;
MessageSender.this.networkAvailable.set( 0 );

switch( status ) {
case MessageProtocol.STATUS_ALREADY_WRITTEN -> {
Expand Down Expand Up @@ -365,44 +373,57 @@ private Messages.MessageInfo onOkRespone( Messages.MessageInfo messageInfo, Inpu
return messageInfo;
}

/**
 * Drains the in-memory message queue: repeatedly polls {@code messages}, sends each polled
 * message via {@code send}, and removes it from the in-progress set.
 *
 * In the {@code boolean} version it returns {@code true} when the pass finishes or is skipped
 * because {@code isGlobalIoRetryTimeout} is true, and {@code false} when an {@code Exception}
 * is caught.
 */
// NOTE(review): this span is a rendered diff (-365,44 +373,57) with the old `void` version and
// the new `boolean` version interleaved; most statements therefore appear twice.
public void syncMemory() {
if( getReadyMessages() + getRetryMessages() + getInProgressMessages() > 0 )
log.trace( "[{}] sync ready {} retry {} inprogress {} ...",
uniqueName, getReadyMessages(), getRetryMessages(), getInProgressMessages() );
public boolean syncMemory() {
log.trace( "[{}] syncMemory...", uniqueName );
try {
if( getReadyMessages() + getRetryMessages() + getInProgressMessages() > 0 )
log.trace( "[{}] sync ready {} retry {} inprogress {} ...",
uniqueName, getReadyMessages(), getRetryMessages(), getInProgressMessages() );

long now = DateTimeUtils.currentTimeMillis();
long now = DateTimeUtils.currentTimeMillis();

// Nothing is sent while the global I/O retry timeout is in effect.
if( isGlobalIoRetryTimeout( now ) ) return;
if( isGlobalIoRetryTimeout( now ) ) {
log.trace( "skip, isGlobalIoRetryTimeout" );
return true;
}

long period = currentPeriod( now );
long period = currentPeriod( now );

Messages.MessageInfo messageInfo = null;
Messages.MessageInfo messageInfo = null;

messages.retry();
messages.retry();

// First iteration has messageInfo == null, so it only performs the checks and the first poll.
do {
now = DateTimeUtils.currentTimeMillis();
do {
now = DateTimeUtils.currentTimeMillis();

if( messageInfo != null ) {
log.trace( "[{}] message {}...", uniqueName, messageInfo.message.md5 );
Messages.MessageInfo mi = send( messageInfo, now );
if( messageInfo != null ) {
log.trace( "[{}] message {}...", uniqueName, messageInfo.message.md5 );
Messages.MessageInfo mi = send( messageInfo, now );

messages.removeInProgress( mi );
log.trace( "[{}] message {}... done", uniqueName, mi.message.md5 );
}
messages.removeInProgress( mi );
log.trace( "[{}] message {}... done", uniqueName, mi.message.md5 );
}

if( isGlobalIoRetryTimeout( now ) ) {
break;
}
if( isGlobalIoRetryTimeout( now ) ) {
log.trace( "skip, isGlobalIoRetryTimeout" );
break;
}

// messages.retry() is called again whenever currentPeriod( now ) changes during the pass.
long currentPeriod = currentPeriod( now );
if( currentPeriod != period ) {
messages.retry();
period = currentPeriod;
}
long currentPeriod = currentPeriod( now );
if( currentPeriod != period ) {
messages.retry();
period = currentPeriod;
}

messageInfo = messages.poll( true );
} while( messageInfo != null );
messageInfo = messages.poll( true );
} while( messageInfo != null );

return true;
// NOTE(review): the exception is not logged here; failures rethrown by send() appear to be
// logged in processException first, but other exceptions would be dropped silently — confirm.
} catch( Exception e ) {
return false;
} finally {
log.trace( "[{}] syncMemory... DONE", uniqueName );
}
}

private boolean isGlobalIoRetryTimeout( long now ) {
Expand Down Expand Up @@ -486,7 +507,7 @@ public MessageSender syncDisk() {

Files.delete( messagePath );
} catch( Exception e ) {
LogConsolidated.log( log, Level.ERROR, Dates.s( 5 ), "[" + uniqueName + "] " + messagePath + ": " + e.getMessage(), e );
LogConsolidated.log( log, Level.ERROR, timeBetweenLogs, "[" + uniqueName + "] " + messagePath + ": " + e.getMessage(), e );
} finally {
Files.delete( lockFile );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ public MessageInfo poll( boolean inProgress ) {
@SuppressWarnings( "checkstyle:OverloadMethodsDeclarationOrder" )
public void retry() {
if( getRetryMessages() == 0 ) return;
var now = DateTimeUtils.currentTimeMillis();
long now = DateTimeUtils.currentTimeMillis();

log.trace( "ready {} retry {} now {}",
getReadyMessages(), getRetryMessages(), Dates.formatDateWithMillis( now ) );

var it = retry.iterator();
Iterator<RetryInfo> it = retry.iterator();
while( it.hasNext() ) {
var retryInfo = it.next();
RetryInfo retryInfo = it.next();
if( retryInfo.startTime < now ) {
it.remove();
add( retryInfo.messageInfo );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,16 @@
@Slf4j
public class MessageHttpHandler implements HttpHandler, Closeable {
public final HashMap<Byte, MessageListener> map = new HashMap<>();
public int clientHashCacheSize = 1024;
private final List<MessageListener> listeners;
private final long hashTtl;
private final Path controlStatePath;
private final NioHttpServer server;
private final String context;
public int clientHashCacheSize = 1024;
public String port = null;
private MessageHashStorage hashes;
private Scheduled scheduled;

public String port = null;

public MessageHttpHandler( NioHttpServer server, String context, Path controlStatePath, List<MessageListener> listeners, long hashTtl ) {
this.server = server;
this.context = context;
Expand Down Expand Up @@ -138,10 +137,10 @@ public void preStart() {
log.warn( "Cannot load hashes", e );
}

for( var listener : listeners ) {
for( MessageListener listener : listeners ) {
log.info( "Listener type {} info {}", listener.getId(), listener.getInfo() );

var d = this.map.put( listener.getId(), listener );
MessageListener d = this.map.put( listener.getId(), listener );
if( d != null ) {
throw new IllegalArgumentException( "duplicate [" + listener.getInfo() + ", " + d.getInfo() + "]" );
}
Expand All @@ -152,35 +151,35 @@ public void preStart() {
public void handleRequest( HttpServerExchange exchange ) throws Exception {
log.trace( "new message {}", exchange );

try( var in = new DataInputStream( exchange.getInputStream() ) ) {
try( DataInputStream in = new DataInputStream( exchange.getInputStream() ) ) {
InetSocketAddress peerAddress = ( InetSocketAddress ) exchange.exchange.getConnection().getPeerAddress();
var hostName = peerAddress.getHostName();
var port = peerAddress.getPort();
String hostName = peerAddress.getHostName();
int port = peerAddress.getPort();
String clientHostPort = hostName + ":" + port;

var messageType = in.readByte();
byte messageType = in.readByte();
log.trace( "new message from {}", clientHostPort );

var messageVersion = in.readShort();
var clientId = in.readLong();
final var md5 = Hex.encodeHexString( in.readNBytes( MD5_LENGTH ) ).intern();
short messageVersion = in.readShort();
long clientId = in.readLong();
final String md5 = Hex.encodeHexString( in.readNBytes( MD5_LENGTH ) ).intern();

in.skipBytes( 8 ); // reserved
var size = in.readInt();
int size = in.readInt();

log.trace( "[{}] type {} version {} clientId {} md5 {} size '{}'",
clientHostPort, messageTypeToString( messageType ), messageVersion, clientId, md5, FileUtils.byteCountToDisplaySize( size ) );

//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized( md5 ) {
if( !hashes.contains( messageType, md5 ) ) {
var listener = map.get( messageType );
MessageListener listener = map.get( messageType );
if( listener == null ) {
log.error( "[{}] Unknown message type {}", clientHostPort, messageType );
in.skipNBytes( size );
writeResponse( exchange, STATUS_UNKNOWN_MESSAGE_TYPE, clientId, md5 );
} else {
var data = in.readNBytes( size );
byte[] data = in.readNBytes( size );
short status;
try {
log.trace( "handler {}...", listener.getId() );
Expand Down Expand Up @@ -222,7 +221,7 @@ public void writeResponse( HttpServerExchange exchange, short status, long clien
exchange.setResponseHeader( CONTENT_TYPE, APPLICATION_OCTET_STREAM );
exchange.setStatusCode( HTTP_OK );

try( var out = new DataOutputStream( exchange.getOutputStream() ) ) {
try( DataOutputStream out = new DataOutputStream( exchange.getOutputStream() ) ) {
out.writeByte( PROTOCOL_VERSION_1 );
out.writeLong( clientId );
out.write( Hex.decodeHex( md5 ) );
Expand All @@ -235,6 +234,9 @@ public void writeResponse( HttpServerExchange exchange, short status, long clien
@Override
public void close() {
try {
for( MessageListener listener : listeners ) {
this.map.remove( listener.getId() );
}
if( scheduled != null ) scheduled.close();
hashes.store( controlStatePath );
} catch( IOException e ) {
Expand Down
Loading