diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java index 035c5371cc9b5..04e5dae887247 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java @@ -25,6 +25,8 @@ import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Properties; @@ -84,7 +86,7 @@ public final void updateProperties(@Nullable String filePath) throws IOException return; } try (InputStream confInput = Files.newInputStream(Paths.get(filePath))) { - properties.load(confInput); + properties.load(new InputStreamReader(confInput, StandardCharsets.UTF_8)); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index e59f8d2234331..1bad7b48c5c76 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -39,9 +39,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Properties; @@ -105,7 +107,7 @@ private void loadProps() { try (InputStream inputStream = url.openStream()) { LOGGER.info("Start to read config file {}", url); - commonProperties.load(inputStream); + commonProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); } catch (FileNotFoundException e) { LOGGER.error("Fail to find config file {}, reject ConfigNode startup.", url, e); @@ -128,7 +130,7 @@ private void loadProps() { try (InputStream inputStream = url.openStream()) { LOGGER.info("start reading ConfigNode conf file: {}", url); Properties properties = new Properties(); - properties.load(inputStream); + properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); commonProperties.putAll(properties); loadProperties(commonProperties); } catch (IOException | BadNodeUrlException e) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java index 62c94bee1f16c..331a2a69542f4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java @@ -35,6 +35,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.util.Comparator; import java.util.List; import java.util.Properties; @@ -64,7 +66,7 @@ public TConfigNodeLocation removeCheck(String args) { return nodeLocation; } try (FileInputStream inputStream = new FileInputStream(systemPropertiesFile)) { - systemProperties.load(inputStream); + systemProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); if (isNumeric(args)) { int id = Integer.parseInt(args); nodeLocation = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index b9839a37f075f..ca7bcb3e9e54f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -33,6 +33,9 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -414,7 +417,7 @@ private static synchronized Properties getSystemProperties() throws IOException Properties systemProperties = new Properties(); try (FileInputStream inputStream = new FileInputStream(systemPropertiesFile)) { - systemProperties.load(inputStream); + systemProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); } return systemProperties; } @@ -423,7 +426,7 @@ private static synchronized void storeSystemProperties(Properties systemProperti throws IOException { try (FileOutputStream fileOutputStream = new FileOutputStream(systemPropertiesFile)) { systemProperties.store( - fileOutputStream, + new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8), " THIS FILE IS AUTOMATICALLY GENERATED. PLEASE DO NOT MODIFY THIS FILE !!!"); fileOutputStream.getFD().sync(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 75ebb34540bb1..048afadc585cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -68,8 +68,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -163,7 +165,7 @@ private void loadProps() { if (url != null) { try (InputStream inputStream = url.openStream()) { LOGGER.info("Start to read config file {}", url); - commonProperties.load(inputStream); + commonProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); } catch (FileNotFoundException e) { LOGGER.error("Fail to find config file {}, reject DataNode startup.", url, e); System.exit(-1); @@ -184,7 +186,7 @@ private void loadProps() { try (InputStream inputStream = url.openStream()) { LOGGER.info("Start to read config file {}", url); Properties properties = new Properties(); - properties.load(inputStream); + properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); commonProperties.putAll(properties); loadProperties(commonProperties); } catch (FileNotFoundException e) { @@ -1640,7 +1642,7 @@ public void loadHotModifiedProps() throws QueryProcessException { Properties commonProperties = new Properties(); try (InputStream inputStream = url.openStream()) { LOGGER.info("Start to reload config file {}", url); - commonProperties.load(inputStream); + commonProperties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); } catch (Exception e) { LOGGER.warn("Fail to reload config file {}", url, e); throw new QueryProcessException( @@ -1655,7 +1657,7 @@ public void loadHotModifiedProps() throws QueryProcessException { try (InputStream inputStream = url.openStream()) { LOGGER.info("Start to reload config file {}", url); Properties properties = new Properties(); - properties.load(inputStream); + properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); commonProperties.putAll(properties); loadHotModifiedProps(commonProperties); } catch (Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java index e37df2a3d663b..e366ae8c98758 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java @@ -38,6 +38,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.HashMap; import java.util.Map; @@ -176,7 +178,8 @@ public boolean checkIsFirstStart() throws IOException { // write properties to system.properties try (FileOutputStream outputStream = new FileOutputStream(propertiesFile)) { systemProperties.forEach((k, v) -> properties.setProperty(k, v.get())); - properties.store(outputStream, SYSTEM_PROPERTIES_STRING); + properties.store( + new OutputStreamWriter(outputStream, StandardCharsets.UTF_8), SYSTEM_PROPERTIES_STRING); } isFirstStart = true; return true; @@ -266,7 +269,8 @@ public void checkSystemConfig() throws ConfigurationException, IOException { // overwrite system.properties when first start try (FileOutputStream outputStream = new FileOutputStream(propertiesFile)) { systemProperties.forEach((k, v) -> properties.setProperty(k, v.get())); - properties.store(outputStream, SYSTEM_PROPERTIES_STRING); + properties.store( + new OutputStreamWriter(outputStream, StandardCharsets.UTF_8), SYSTEM_PROPERTIES_STRING); } if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) && config.getWalMode().equals(WALMode.DISABLE)) { @@ -309,7 +313,8 @@ private void upgradePropertiesFileFromBrokenFile() throws IOException { }); properties.setProperty(IOTDB_VERSION_STRING, IoTDBConstant.VERSION); properties.setProperty(COMMIT_ID_STRING, IoTDBConstant.BUILD_INFO); - properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING); + properties.store( + new OutputStreamWriter(tmpFOS, StandardCharsets.UTF_8), SYSTEM_PROPERTIES_STRING); // upgrade finished, delete old system.properties file if (propertiesFile.exists()) { Files.delete(propertiesFile.toPath()); @@ -380,7 +385,8 @@ public void serializeClusterNameAndDataNodeId(String clusterName, int dataNodeId try { properties.setProperty(IoTDBConstant.CLUSTER_NAME, clusterName); properties.setProperty(DATA_NODE_ID, String.valueOf(dataNodeId)); - properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING); + properties.store( + new OutputStreamWriter(tmpFOS, StandardCharsets.UTF_8), SYSTEM_PROPERTIES_STRING); // serialize finished, delete old system.properties file if (propertiesFile.exists()) { Files.delete(propertiesFile.toPath()); @@ -417,7 +423,8 @@ public void serializeMutableSystemPropertiesIfNecessary() throws IOException { if (needsSerialize) { try (FileOutputStream outputStream = new FileOutputStream(propertiesFile)) { systemProperties.forEach((k, v) -> properties.setProperty(k, v.get())); - properties.store(outputStream, SYSTEM_PROPERTIES_STRING); + properties.store( + new OutputStreamWriter(outputStream, StandardCharsets.UTF_8), SYSTEM_PROPERTIES_STRING); } } long endTime = System.currentTimeMillis(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java index aada981303526..f84f6809f746f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/rest/IoTDBRestServiceDescriptor.java @@ -28,8 +28,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.Properties; public class IoTDBRestServiceDescriptor { @@ -64,7 +66,7 @@ private Properties loadProps(String configName) { try (InputStream inputStream = url.openStream()) { logger.info("Start to read config file {}", url); Properties properties = new Properties(); - properties.load(inputStream); + properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); return properties; } catch (FileNotFoundException e) { logger.warn("REST service fail to find config file {}", url, e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java index f003219c7818e..f2c529bc4a076 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.iotdb.tsfile.utils.BytesUtils; @@ -135,7 +136,9 @@ private void receive() throws IOException { req); fail(); } - } catch (Exception e) { + } catch (final PipeConnectionException e) { + LOGGER.info("Socket closed when listening to data. Because: {}", e.getMessage()); + } catch (final Exception e) { LOGGER.warn("Exception during handling receiving, receiverId: {}", receiverId, e); fail(); } @@ -167,7 +170,7 @@ private boolean checkSum(byte[] bytes) { private byte[] readData(InputStream inputStream) throws IOException { final int length = readLength(inputStream); - if (length == 0) { + if (length <= 0) { // Will fail() after checkSum() return new byte[0]; } @@ -207,18 +210,48 @@ private int readLength(InputStream inputStream) throws IOException { : 0; } - private void readTillFull(InputStream inputStream, byte[] readBuffer) throws IOException { + /** + * Read to the buffer until it is full. + * + * @param inputStream the input socket stream + * @param readBuffer the buffer to read into + * @throws IOException if any IOException occurs + * @throws PipeConnectionException if the socket is closed during listening + */ + private void readTillFull(final InputStream inputStream, final byte[] readBuffer) + throws IOException, PipeConnectionException { int alreadyReadBytes = 0; while (alreadyReadBytes < readBuffer.length) { - alreadyReadBytes += + final int readBytes = inputStream.read(readBuffer, alreadyReadBytes, readBuffer.length - alreadyReadBytes); + // In socket input stream readBytes == -1 indicates EOF, namely the + // socket is closed + if (readBytes == -1) { + throw new PipeConnectionException("Socket closed when executing readTillFull."); + } + alreadyReadBytes += readBytes; } } - private void skipTillEnough(InputStream inputStream, long length) throws IOException { - int currentSkippedBytes = 0; + /** + * Skip given number of bytes of the buffer until enough bytes is skipped. + * + * @param inputStream the input socket stream + * @param length the length to skip + * @throws IOException if any IOException occurs + * @throws PipeConnectionException if the socket is closed during skipping + */ + private void skipTillEnough(final InputStream inputStream, final long length) + throws IOException, PipeConnectionException { + long currentSkippedBytes = 0; while (currentSkippedBytes < length) { - currentSkippedBytes += (int) inputStream.skip(length - currentSkippedBytes); + final long skippedBytes = inputStream.skip(length - currentSkippedBytes); + // In socket input stream skippedBytes == 0 indicates EOF, namely the + // socket is closed + if (skippedBytes == 0) { + throw new PipeConnectionException("Socket closed when executing skipTillEnough."); + } + currentSkippedBytes += skippedBytes; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java index 91050bc73b8be..4726366fd27ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java @@ -34,6 +34,9 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -124,12 +127,12 @@ public boolean updateConfigNodeList(List latestConfigNodes) { private void storeConfigNode() throws IOException { Properties properties = new Properties(); try (FileInputStream inputStream = new FileInputStream(propertiesFile)) { - properties.load(inputStream); + properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); } properties.setProperty( CONFIG_NODE_LIST, NodeUrlUtils.convertTEndPointUrls(new ArrayList<>(onlineConfigNodes))); try (FileOutputStream fileOutputStream = new FileOutputStream(propertiesFileTmp)) { - properties.store(fileOutputStream, ""); + properties.store(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8), ""); } updatePropertiesFile(); } @@ -151,7 +154,7 @@ public void loadConfigNodeList() { configNodeInfoReadWriteLock.writeLock().lock(); Properties properties = new Properties(); try (FileInputStream inputStream = new FileInputStream(propertiesFile)) { - properties.load(inputStream); + properties.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 0c9e28ae500f8..22a8616ee95ef 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -226,6 +226,7 @@ public class CommonConfig { private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 * 1024 * 1024; // 2MB private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min private float pipeLeaderCacheMemoryUsagePercentage = 0.1F; + private long pipeListeningQueueTransferSnapshotThreshold = 1000; private int subscriptionSubtaskExecutorMaxThreadNum = Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); @@ -951,6 +952,15 @@ public void setPipeLeaderCacheMemoryUsagePercentage(float pipeLeaderCacheMemoryU this.pipeLeaderCacheMemoryUsagePercentage = pipeLeaderCacheMemoryUsagePercentage; } + public long getPipeListeningQueueTransferSnapshotThreshold() { + return pipeListeningQueueTransferSnapshotThreshold; + } + + public void setPipeListeningQueueTransferSnapshotThreshold( + long pipeListeningQueueTransferSnapshotThreshold) { + this.pipeListeningQueueTransferSnapshotThreshold = pipeListeningQueueTransferSnapshotThreshold; + } + public int getSubscriptionSubtaskExecutorMaxThreadNum() { return subscriptionSubtaskExecutorMaxThreadNum; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 3a9e20179028a..4e347ccd41a13 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -511,6 +511,11 @@ private void loadPipeProps(Properties properties) { properties.getProperty( "pipe_leader_cache_memory_usage_percentage", String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage())))); + config.setPipeListeningQueueTransferSnapshotThreshold( + Long.parseLong( + properties.getProperty( + "pipe_listening_queue_transfer_snapshot_threshold", + String.valueOf(config.getPipeListeningQueueTransferSnapshotThreshold())))); } private void loadSubscriptionProps(Properties properties) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index b6697483f9fa3..88df1615a5804 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -20,8 +20,11 @@ package org.apache.iotdb.commons.conf; import java.io.File; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashSet; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; @@ -34,7 +37,10 @@ private IoTDBConstant() {} Properties prop = new Properties(); String finalBuildInfo = "UNKNOWN"; try { - prop.load(IoTDBConstant.class.getResourceAsStream("/git.properties")); + prop.load( + new InputStreamReader( + Objects.requireNonNull(IoTDBConstant.class.getResourceAsStream("/git.properties")), + StandardCharsets.UTF_8)); finalBuildInfo = prop.getProperty("git.commit.id.abbrev", "UNKNOWN"); String isDirty = prop.getProperty("git.dirty", "false"); if (isDirty.equalsIgnoreCase("true")) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 80223a4439c3e..88405d64c9346 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -131,6 +131,10 @@ public float getPipeLeaderCacheMemoryUsagePercentage() { return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage(); } + public long getPipeListeningQueueTransferSnapshotThreshold() { + return COMMON_CONFIG.getPipeListeningQueueTransferSnapshotThreshold(); + } + /////////////////////////////// Meta Consistency /////////////////////////////// public boolean isSeperatedPipeHeartbeatEnabled() { @@ -284,6 +288,9 @@ public void printAllConfigs() { isPipeConnectorRPCThriftCompressionEnabled()); LOGGER.info( "PipeLeaderCacheMemoryUsagePercentage: {}", getPipeLeaderCacheMemoryUsagePercentage()); + LOGGER.info( + "PipeListeningQueueTransferSnapshotThreshold: {}", + getPipeListeningQueueTransferSnapshotThreshold()); LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", getPipeAsyncConnectorSelectorNumber()); LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", getPipeAsyncConnectorMaxClientNumber()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java index 8d98f02460404..d5d1edb345c0e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.pipe.datastructure.queue.listening; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.queue.serializer.QueueSerializerType; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent; @@ -77,8 +78,9 @@ protected synchronized void tryListen(List events) { } public synchronized Pair> findAvailableSnapshots() { - // TODO: configure maximum number of events from snapshot to queue tail - if (queueTailIndex2SnapshotsCache.getLeft() < queue.getTailIndex() - 1000) { + if (queueTailIndex2SnapshotsCache.getLeft() + < queue.getTailIndex() + - PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) { clearSnapshots(); } return queueTailIndex2SnapshotsCache; diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java index 3b060507d070f..09db06c0fe01a 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java @@ -28,9 +28,11 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.Optional; import java.util.Properties; @@ -134,7 +136,7 @@ private Optional loadProperties() { private Optional loadPropertiesFromFile(String filePath) { try (FileInputStream fileInputStream = new FileInputStream(filePath)) { Properties properties = new Properties(); - properties.load(fileInputStream); + properties.load(new InputStreamReader(fileInputStream, StandardCharsets.UTF_8)); return Optional.of(properties); } catch (FileNotFoundException e) { logger.warn("Fail to find config file {}", filePath);