diff --git a/oap-http/oap-http/src/main/java/oap/http/server/nio/NioHttpServer.java b/oap-http/oap-http/src/main/java/oap/http/server/nio/NioHttpServer.java index 6113da933f..813b2c506d 100644 --- a/oap-http/oap-http/src/main/java/oap/http/server/nio/NioHttpServer.java +++ b/oap-http/oap-http/src/main/java/oap/http/server/nio/NioHttpServer.java @@ -38,6 +38,7 @@ import lombok.extern.slf4j.Slf4j; import oap.http.server.nio.handlers.CompressionNioHandler; import oap.io.Closeables; +import oap.util.Dates; import oap.util.Lists; import org.xnio.OptionMap; import org.xnio.Options; @@ -61,6 +62,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @Slf4j @@ -333,6 +335,13 @@ public void preStop() { if( undertow != null ) { undertow.stop(); } + if( xnioWorker != null ) { + xnioWorker.shutdown(); + if( !xnioWorker.awaitTermination( Dates.s( 10 ), TimeUnit.MILLISECONDS ) ) { + xnioWorker.shutdownNow(); + } + xnioWorker = null; + } } catch( Exception ex ) { log.error( "Cannot stop server", ex ); } diff --git a/oap-message/oap-message-client/src/main/java/oap/message/client/MessageAvailabilityReport.java b/oap-message/oap-message-client/src/main/java/oap/message/client/MessageAvailabilityReport.java index 2c139f9bcc..5e79ad9736 100644 --- a/oap-message/oap-message-client/src/main/java/oap/message/client/MessageAvailabilityReport.java +++ b/oap-message/oap-message-client/src/main/java/oap/message/client/MessageAvailabilityReport.java @@ -43,7 +43,6 @@ public MessageAvailabilityReport( State state, Map subsystemState public enum State { OPERATIONAL, - PARTIALLY_OPERATIONAL, FAILED } } diff --git a/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java b/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java index 42a9199da4..0550a9cb96 100644 --- a/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java +++ b/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java @@ -72,22 +72,24 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @Slf4j @ToString public class MessageSender implements Closeable, AutoCloseable { private static final Pair STATUS_OK = Pair.__( MessageStatus.OK, MessageProtocol.STATUS_OK ); - private final Object syncDiskLock = new Object(); - private final String host; - private final int port; - private final Path directory; - private final MessageNoRetryStrategy messageNoRetryStrategy; - private final long clientId = Cuid.UNIQUE.nextLong(); - private final Messages messages = new Messages(); - private final ConcurrentMap> lastStatus = new ConcurrentHashMap<>(); - private final String messageUrl; - private final long memorySyncPeriod; + public final Object syncDiskLock = new Object(); + public final String host; + public final int port; + public final Path directory; + public final MessageNoRetryStrategy messageNoRetryStrategy; + public final long clientId = Cuid.UNIQUE.nextLong(); + public final Messages messages = new Messages(); + public final ConcurrentMap> lastStatus = new ConcurrentHashMap<>(); + public final String messageUrl; + public final long memorySyncPeriod; private final ReentrantLock lock = new ReentrantLock(); public String uniqueName = Cuid.UNIQUE.next(); public long storageLockExpiration = Dates.h( 1 ); @@ -97,10 +99,12 @@ public class MessageSender implements Closeable, AutoCloseable { public long keepAliveDuration = Dates.d( 30 ); public long timeout = Dates.s( 5 ); public long connectionTimeout = Dates.s( 30 ); - private volatile boolean closed = false; - private Scheduled diskSyncScheduler; - private boolean networkAvailable = true; - private long ioExceptionStartRetryTimeout = -1; + public volatile boolean closed = false; + public Scheduled diskSyncScheduler; + public AtomicLong networkAvailable = new AtomicLong( 0 ); + public int networkAvailableMaxErrors = 2; + public long ioExceptionStartRetryTimeout = -1; + public long timeBetweenLogs = Dates.m( 1 ); public MessageSender( String host, int port, String httpPrefix, Path persistenceDirectory, long memorySyncPeriod ) { this( host, port, httpPrefix, persistenceDirectory, memorySyncPeriod, MessageNoRetryStrategy.DROP ); @@ -238,7 +242,7 @@ private void saveMessagesToDirectory( Path directory ) { } public MessageAvailabilityReport availabilityReport( byte messageType ) { - boolean operational = networkAvailable + boolean operational = networkAvailable.get() <= networkAvailableMaxErrors && !closed && lastStatus.getOrDefault( messageType, STATUS_OK )._1 != MessageStatus.ERROR; return new MessageAvailabilityReport( operational ? State.OPERATIONAL : State.FAILED ); @@ -280,12 +284,14 @@ public Messages.MessageInfo send( Messages.MessageInfo messageInfo, long now ) { return onOkRespone( messageInfo, inputStreamResponseListener, now ); } catch( UnknownHostException e ) { - processException( messageInfo, now, message, e, true ); - ioExceptionStartRetryTimeout = now; + networkAvailable.incrementAndGet(); + + processException( messageInfo, now, message, e, true ); throw Throwables.propagate( e ); } catch( Throwable e ) { + networkAvailable.incrementAndGet(); processException( messageInfo, now, message, e, false ); throw Throwables.propagate( e ); @@ -293,10 +299,12 @@ public Messages.MessageInfo send( Messages.MessageInfo messageInfo, long now ) { } private void processException( Messages.MessageInfo messageInfo, long now, Message message, Throwable e, boolean globalRetryTimeout ) { + String status = e instanceof TimeoutException ? "send_timeout" : "send_io_error"; + Metrics.counter( "oap.messages", "type", MessageProtocol.messageTypeToString( message.messageType ), - "status", "send_io_error" + ( globalRetryTimeout ? "_gr" : "" ) ).increment(); - LogConsolidated.log( log, Level.ERROR, Dates.s( 10 ), e.getMessage(), e ); + "status", status + ( globalRetryTimeout ? "_gr" : "" ) ).increment(); + LogConsolidated.log( log, Level.ERROR, timeBetweenLogs, e.getMessage(), e ); messages.retry( messageInfo, now + retryTimeout ); } @@ -318,7 +326,7 @@ private Messages.MessageInfo onOkRespone( Messages.MessageInfo messageInfo, Inpu log.trace( "[{}] sending done, server status: {}", uniqueName, MessageProtocol.messageStatusToString( status ) ); - MessageSender.this.networkAvailable = true; + MessageSender.this.networkAvailable.set( 0 ); switch( status ) { case MessageProtocol.STATUS_ALREADY_WRITTEN -> { @@ -365,44 +373,57 @@ private Messages.MessageInfo onOkRespone( Messages.MessageInfo messageInfo, Inpu return messageInfo; } - public void syncMemory() { - if( getReadyMessages() + getRetryMessages() + getInProgressMessages() > 0 ) - log.trace( "[{}] sync ready {} retry {} inprogress {} ...", - uniqueName, getReadyMessages(), getRetryMessages(), getInProgressMessages() ); + public boolean syncMemory() { + log.trace( "[{}] syncMemory...", uniqueName ); + try { + if( getReadyMessages() + getRetryMessages() + getInProgressMessages() > 0 ) + log.trace( "[{}] sync ready {} retry {} inprogress {} ...", + uniqueName, getReadyMessages(), getRetryMessages(), getInProgressMessages() ); - long now = DateTimeUtils.currentTimeMillis(); + long now = DateTimeUtils.currentTimeMillis(); - if( isGlobalIoRetryTimeout( now ) ) return; + if( isGlobalIoRetryTimeout( now ) ) { + log.trace( "skip, isGlobalIoRetryTimeout" ); + return true; + } - long period = currentPeriod( now ); + long period = currentPeriod( now ); - Messages.MessageInfo messageInfo = null; + Messages.MessageInfo messageInfo = null; - messages.retry(); + messages.retry(); - do { - now = DateTimeUtils.currentTimeMillis(); + do { + now = DateTimeUtils.currentTimeMillis(); - if( messageInfo != null ) { - log.trace( "[{}] message {}...", uniqueName, messageInfo.message.md5 ); - Messages.MessageInfo mi = send( messageInfo, now ); + if( messageInfo != null ) { + log.trace( "[{}] message {}...", uniqueName, messageInfo.message.md5 ); + Messages.MessageInfo mi = send( messageInfo, now ); - messages.removeInProgress( mi ); - log.trace( "[{}] message {}... done", uniqueName, mi.message.md5 ); - } + messages.removeInProgress( mi ); + log.trace( "[{}] message {}... done", uniqueName, mi.message.md5 ); + } - if( isGlobalIoRetryTimeout( now ) ) { - break; - } + if( isGlobalIoRetryTimeout( now ) ) { + log.trace( "skip, isGlobalIoRetryTimeout" ); + break; + } - long currentPeriod = currentPeriod( now ); - if( currentPeriod != period ) { - messages.retry(); - period = currentPeriod; - } + long currentPeriod = currentPeriod( now ); + if( currentPeriod != period ) { + messages.retry(); + period = currentPeriod; + } - messageInfo = messages.poll( true ); - } while( messageInfo != null ); + messageInfo = messages.poll( true ); + } while( messageInfo != null ); + + return true; + } catch( Exception e ) { + return false; + } finally { + log.trace( "[{}] syncMemory... DONE", uniqueName ); + } } private boolean isGlobalIoRetryTimeout( long now ) { @@ -486,7 +507,7 @@ public MessageSender syncDisk() { Files.delete( messagePath ); } catch( Exception e ) { - LogConsolidated.log( log, Level.ERROR, Dates.s( 5 ), "[" + uniqueName + "] " + messagePath + ": " + e.getMessage(), e ); + LogConsolidated.log( log, Level.ERROR, timeBetweenLogs, "[" + uniqueName + "] " + messagePath + ": " + e.getMessage(), e ); } finally { Files.delete( lockFile ); } diff --git a/oap-message/oap-message-client/src/main/java/oap/message/client/Messages.java b/oap-message/oap-message-client/src/main/java/oap/message/client/Messages.java index a2ef7b0a7f..f0a9b667fb 100644 --- a/oap-message/oap-message-client/src/main/java/oap/message/client/Messages.java +++ b/oap-message/oap-message-client/src/main/java/oap/message/client/Messages.java @@ -71,14 +71,14 @@ public MessageInfo poll( boolean inProgress ) { @SuppressWarnings( "checkstyle:OverloadMethodsDeclarationOrder" ) public void retry() { if( getRetryMessages() == 0 ) return; - var now = DateTimeUtils.currentTimeMillis(); + long now = DateTimeUtils.currentTimeMillis(); log.trace( "ready {} retry {} now {}", getReadyMessages(), getRetryMessages(), Dates.formatDateWithMillis( now ) ); - var it = retry.iterator(); + Iterator it = retry.iterator(); while( it.hasNext() ) { - var retryInfo = it.next(); + RetryInfo retryInfo = it.next(); if( retryInfo.startTime < now ) { it.remove(); add( retryInfo.messageInfo ); diff --git a/oap-message/oap-message-server/src/main/java/oap/message/server/MessageHttpHandler.java b/oap-message/oap-message-server/src/main/java/oap/message/server/MessageHttpHandler.java index eea7001354..0a29e34a0a 100644 --- a/oap-message/oap-message-server/src/main/java/oap/message/server/MessageHttpHandler.java +++ b/oap-message/oap-message-server/src/main/java/oap/message/server/MessageHttpHandler.java @@ -94,17 +94,16 @@ @Slf4j public class MessageHttpHandler implements HttpHandler, Closeable { public final HashMap map = new HashMap<>(); - public int clientHashCacheSize = 1024; private final List listeners; private final long hashTtl; private final Path controlStatePath; private final NioHttpServer server; private final String context; + public int clientHashCacheSize = 1024; + public String port = null; private MessageHashStorage hashes; private Scheduled scheduled; - public String port = null; - public MessageHttpHandler( NioHttpServer server, String context, Path controlStatePath, List listeners, long hashTtl ) { this.server = server; this.context = context; @@ -138,10 +137,10 @@ public void preStart() { log.warn( "Cannot load hashes", e ); } - for( var listener : listeners ) { + for( MessageListener listener : listeners ) { log.info( "Listener type {} info {}", listener.getId(), listener.getInfo() ); - var d = this.map.put( listener.getId(), listener ); + MessageListener d = this.map.put( listener.getId(), listener ); if( d != null ) { throw new IllegalArgumentException( "duplicate [" + listener.getInfo() + ", " + d.getInfo() + "]" ); } @@ -152,21 +151,21 @@ public void preStart() { public void handleRequest( HttpServerExchange exchange ) throws Exception { log.trace( "new message {}", exchange ); - try( var in = new DataInputStream( exchange.getInputStream() ) ) { + try( DataInputStream in = new DataInputStream( exchange.getInputStream() ) ) { InetSocketAddress peerAddress = ( InetSocketAddress ) exchange.exchange.getConnection().getPeerAddress(); - var hostName = peerAddress.getHostName(); - var port = peerAddress.getPort(); + String hostName = peerAddress.getHostName(); + int port = peerAddress.getPort(); String clientHostPort = hostName + ":" + port; - var messageType = in.readByte(); + byte messageType = in.readByte(); log.trace( "new message from {}", clientHostPort ); - var messageVersion = in.readShort(); - var clientId = in.readLong(); - final var md5 = Hex.encodeHexString( in.readNBytes( MD5_LENGTH ) ).intern(); + short messageVersion = in.readShort(); + long clientId = in.readLong(); + final String md5 = Hex.encodeHexString( in.readNBytes( MD5_LENGTH ) ).intern(); in.skipBytes( 8 ); // reserved - var size = in.readInt(); + int size = in.readInt(); log.trace( "[{}] type {} version {} clientId {} md5 {} size '{}'", clientHostPort, messageTypeToString( messageType ), messageVersion, clientId, md5, FileUtils.byteCountToDisplaySize( size ) ); @@ -174,13 +173,13 @@ public void handleRequest( HttpServerExchange exchange ) throws Exception { //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized( md5 ) { if( !hashes.contains( messageType, md5 ) ) { - var listener = map.get( messageType ); + MessageListener listener = map.get( messageType ); if( listener == null ) { log.error( "[{}] Unknown message type {}", clientHostPort, messageType ); in.skipNBytes( size ); writeResponse( exchange, STATUS_UNKNOWN_MESSAGE_TYPE, clientId, md5 ); } else { - var data = in.readNBytes( size ); + byte[] data = in.readNBytes( size ); short status; try { log.trace( "handler {}...", listener.getId() ); @@ -222,7 +221,7 @@ public void writeResponse( HttpServerExchange exchange, short status, long clien exchange.setResponseHeader( CONTENT_TYPE, APPLICATION_OCTET_STREAM ); exchange.setStatusCode( HTTP_OK ); - try( var out = new DataOutputStream( exchange.getOutputStream() ) ) { + try( DataOutputStream out = new DataOutputStream( exchange.getOutputStream() ) ) { out.writeByte( PROTOCOL_VERSION_1 ); out.writeLong( clientId ); out.write( Hex.decodeHex( md5 ) ); @@ -235,6 +234,9 @@ public void writeResponse( HttpServerExchange exchange, short status, long clien @Override public void close() { try { + for( MessageListener listener : listeners ) { + this.map.remove( listener.getId() ); + } if( scheduled != null ) scheduled.close(); hashes.store( controlStatePath ); } catch( IOException e ) { diff --git a/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java b/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java index 1752fdf68d..444518f1c9 100644 --- a/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java +++ b/oap-message/oap-message-test/src/test/java/oap/message/MessageServerTest.java @@ -24,11 +24,11 @@ package oap.message; +import lombok.extern.slf4j.Slf4j; import oap.application.testng.KernelFixture; import oap.http.server.nio.NioHttpServer; import oap.io.Files; import oap.message.MessageListenerMock.TestMessage; -import oap.message.client.MessageAvailabilityReport.State; import oap.message.client.MessageSender; import oap.message.server.MessageHttpHandler; import oap.testng.Fixtures; @@ -49,12 +49,14 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static oap.io.content.ContentWriter.ofJson; import static oap.io.content.ContentWriter.ofString; +import static oap.message.client.MessageAvailabilityReport.State.FAILED; +import static oap.message.client.MessageAvailabilityReport.State.OPERATIONAL; import static oap.testng.Asserts.urlOfTestResource; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.testng.Assert.assertNotNull; -@Test +@Slf4j public class MessageServerTest extends Fixtures { private final TestDirectoryFixture testDirectoryFixture; @@ -71,8 +73,8 @@ public void uniqueMessageTypeListener() throws IOException { MessageListenerMock listener2 = new MessageListenerMock( "l2-", MessageListenerMock.MESSAGE_TYPE ); try( NioHttpServer server = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ); - var messageHttpHandler = new MessageHttpHandler( server, "/messages", testDirectoryFixture.testPath( "controlStatePath.st" ), List.of( listener1, listener2 ), -1 ); - var client = new MessageSender( "localhost", port, "/messages", testDirectoryFixture.testPath( "tmp" ), -1 ) ) { + MessageHttpHandler messageHttpHandler = new MessageHttpHandler( server, "/messages", testDirectoryFixture.testPath( "controlStatePath.st" ), List.of( listener1, listener2 ), -1 ); + MessageSender client = new MessageSender( "localhost", port, "/messages", testDirectoryFixture.testPath( "tmp" ), -1 ) ) { client.start(); assertThatCode( messageHttpHandler::preStart ) @@ -565,15 +567,71 @@ public void availabilityReport() throws IOException { client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "availabilityReport", ofString() ) .syncMemory(); - assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( State.FAILED ); - assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE2 ).state ).isEqualTo( State.OPERATIONAL ); + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( FAILED ); + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE2 ).state ).isEqualTo( OPERATIONAL ); listener1.setStatus( MessageProtocol.STATUS_OK ); client.syncMemory(); - assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( State.OPERATIONAL ); - assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE2 ).state ).isEqualTo( State.OPERATIONAL ); + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( OPERATIONAL ); + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE2 ).state ).isEqualTo( OPERATIONAL ); + } + } + + @Test + public void availabilityReportNetwork() throws IOException { + int port = Ports.getFreePort( getClass() ); + Path controlStatePath = testDirectoryFixture.testPath( "controlStatePath.st" ); + + MessageListenerMock listener1 = new MessageListenerMock( MessageListenerMock.MESSAGE_TYPE ); + + try( NioHttpServer server = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ); + MessageHttpHandler messageHttpHandler = new MessageHttpHandler( server, "/messages", controlStatePath, List.of( listener1 ), -1 ); + MessageSender client = new MessageSender( "localhost", port, "/messages", testDirectoryFixture.testPath( "tmp" ), -1 ) ) { + client.retryTimeout = 100; + client.timeout = Dates.s( 10 ); + client.connectionTimeout = Dates.s( 10 ); + client.networkAvailableMaxErrors = 5; + + server.bind( "/messages", messageHttpHandler ); + listener1.setStatus( MessageProtocol.STATUS_OK ); + + messageHttpHandler.preStart(); + server.start(); + client.start(); + + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( OPERATIONAL ); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "availabilityReport", ofString() ) + .syncMemory(); + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( OPERATIONAL ); + + server.preStop(); + messageHttpHandler.close(); + + log.trace( "server -> stop" ); + + Dates.incFixed( 100 + 1 ); + client.send( MessageListenerMock.MESSAGE_TYPE, ( short ) 1, "availabilityReport2", ofString() ) + .syncMemory(); + + for( int i = 0; i < 5; i++ ) { + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).describedAs( "retry " + i ).isEqualTo( OPERATIONAL ); + + Dates.incFixed( 100 + 1 ); + client.syncMemory(); + + log.debug( "errors {}", client.networkAvailable.get() ); + } + + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( FAILED ); + + messageHttpHandler.preStart(); + server.start(); + + Dates.incFixed( 100 + 1 ); + client.syncMemory(); + assertThat( client.availabilityReport( MessageListenerMock.MESSAGE_TYPE ).state ).isEqualTo( OPERATIONAL ); } } diff --git a/oap-stdlib-test/src/test/resources/logback-test.xml b/oap-stdlib-test/src/main/resources/logback-test.xml similarity index 84% rename from oap-stdlib-test/src/test/resources/logback-test.xml rename to oap-stdlib-test/src/main/resources/logback-test.xml index a86a1fa106..9eca244470 100644 --- a/oap-stdlib-test/src/test/resources/logback-test.xml +++ b/oap-stdlib-test/src/main/resources/logback-test.xml @@ -14,8 +14,9 @@ + - + diff --git a/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java b/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java index 0685494a82..e9847644c2 100644 --- a/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java +++ b/oap-storage/oap-storage/src/main/java/oap/storage/Replicator.java @@ -28,13 +28,17 @@ import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; +import oap.LogConsolidated; import oap.concurrent.scheduler.Scheduled; import oap.concurrent.scheduler.Scheduler; import oap.io.Closeables; import oap.storage.Storage.DataListener.IdObject; import oap.util.Cuid; +import oap.util.Dates; import oap.util.Pair; +import org.slf4j.event.Level; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.UncheckedIOException; import java.net.SocketException; @@ -54,17 +58,18 @@ */ @Slf4j public class Replicator implements Closeable { - final AtomicLong replicatorCounterFullSync = new AtomicLong(); - final AtomicLong replicatorCounterPartialSync = new AtomicLong(); - final AtomicLong replicatorSizeFullSync = new AtomicLong(); - final AtomicLong replicatorSizePartialSync = new AtomicLong(); + public final AtomicLong replicatorCounterFullSync = new AtomicLong(); + public final AtomicLong replicatorCounterPartialSync = new AtomicLong(); + public final AtomicLong replicatorSizeFullSync = new AtomicLong(); + public final AtomicLong replicatorSizePartialSync = new AtomicLong(); + public final MemoryStorage slave; + public final ReplicationMaster master; + public final Scheduled scheduled; final ReentrantLock lock = new ReentrantLock(); - private final MemoryStorage slave; - private final ReplicationMaster master; - private final Scheduled scheduled; - private String uniqueName = Cuid.UNIQUE.next(); - private transient long timestamp = -1L; - private transient long hash = -1L; + public String uniqueName = Cuid.UNIQUE.next(); + public transient long timestamp = -1L; + public transient long hash = -1L; + public long timeBetweenLogs = Dates.m( 1 ); public Replicator( MemoryStorage slave, ReplicationMaster master, long interval ) { Preconditions.checkArgument( slave.transactionLog instanceof TransactionLogZero ); @@ -76,8 +81,10 @@ public Replicator( MemoryStorage slave, ReplicationMaster master, lo Pair newTimestamp = replicate( timestamp ); log.trace( "[{}] newTimestamp {}, lastModified {}", uniqueName, newTimestamp, timestamp ); - timestamp = newTimestamp._1; - hash = newTimestamp._2; + if( newTimestamp != null ) { + timestamp = newTimestamp._1; + hash = newTimestamp._2; + } } ); } @@ -99,6 +106,7 @@ public void replicateAllNow() { replicateNow(); } + @Nullable public Pair replicate( long timestamp ) { lock.lock(); try { @@ -163,14 +171,14 @@ public Pair replicate( long timestamp ) { return __( lastUpdate, updatedSince.hash ); } catch( UncheckedIOException e ) { - log.error( e.getCause().getMessage() ); + LogConsolidated.log( log, Level.ERROR, timeBetweenLogs, e.getMessage(), e ); return __( timestamp, hash ); } catch( Exception e ) { if( e.getCause() instanceof SocketException ) { - log.error( e.getCause().getMessage() ); + LogConsolidated.log( log, Level.ERROR, timeBetweenLogs, e.getMessage(), e ); return __( timestamp, hash ); } - throw e; + return null; } } finally { lock.unlock(); diff --git a/pom.xml b/pom.xml index eafd25f884..11845909ad 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.4.5 + 25.4.6 25.0.1 25.0.0