|
18 | 18 |
|
19 | 19 | package org.apache.cassandra.db.compression; |
20 | 20 |
|
| 21 | +import java.time.Instant; |
| 22 | +import java.time.temporal.ChronoUnit; |
21 | 23 | import java.util.List; |
22 | 24 | import java.util.Map; |
23 | 25 | import java.util.Set; |
|
33 | 35 | import org.slf4j.LoggerFactory; |
34 | 36 |
|
35 | 37 | import org.apache.cassandra.config.DataStorageSpec; |
| 38 | +import org.apache.cassandra.config.DurationSpec; |
36 | 39 | import org.apache.cassandra.db.ColumnFamilyStore; |
37 | 40 | import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary; |
38 | 41 | import org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject; |
39 | 42 | import org.apache.cassandra.io.sstable.format.SSTableReader; |
40 | 43 | import org.apache.cassandra.schema.CompressionParams; |
41 | 44 | import org.apache.cassandra.schema.SystemDistributedKeyspace; |
| 45 | +import org.apache.cassandra.utils.FBUtilities; |
42 | 46 | import org.apache.cassandra.utils.MBeanWrapper; |
43 | 47 | import org.apache.cassandra.utils.MBeanWrapper.OnException; |
44 | 48 |
|
45 | 49 | import static java.lang.String.format; |
46 | 50 | import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE; |
47 | 51 | import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE; |
| 52 | +import static org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MIN_FREQUENCY; |
48 | 53 | import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME; |
49 | 54 | import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME; |
| 55 | +import static org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MIN_FREQUENCY_PARAMETER_NAME; |
50 | 56 |
|
51 | 57 | public class CompressionDictionaryManager implements CompressionDictionaryManagerMBean, |
52 | 58 | ICompressionDictionaryCache, |
@@ -219,6 +225,12 @@ public synchronized void train(boolean force, Map<String, String> parameters) |
219 | 225 | // resolve training config and fail fast when invalid, so we do not reach logic which would e.g. flush unnecessarily. |
220 | 226 | CompressionDictionaryTrainingConfig trainingConfig = createTrainingConfig(parameters); |
221 | 227 |
|
| 228 | + LightweightCompressionDictionary dictionary = SystemDistributedKeyspace.retrieveLightweightLatestCompressionDictionary(columnFamilyStore.getKeyspaceName(), |
| 229 | + columnFamilyStore.getTableName(), |
| 230 | + columnFamilyStore.metadata.id.toLongString()); |
| 231 | + |
| 232 | + checkTrainingFrequency(dictionary); |
| 233 | + |
222 | 234 | // SSTable-based training: sample from existing SSTables |
223 | 235 | Set<SSTableReader> sstables = columnFamilyStore.getLiveSSTables(); |
224 | 236 | if (sstables.isEmpty()) |
@@ -319,10 +331,15 @@ public synchronized void importCompressionDictionary(CompositeData compositeData |
319 | 331 | CompressionDictionary.DictId dictId = new CompressionDictionary.DictId(kind, dataObject.dictId); |
320 | 332 |
|
321 | 333 | LightweightCompressionDictionary latestCompressionDictionary = SystemDistributedKeyspace.retrieveLightweightLatestCompressionDictionary(keyspaceName, tableName, tableId); |
322 | | - if (latestCompressionDictionary != null && latestCompressionDictionary.dictId.id > dictId.id) |
| 334 | + if (latestCompressionDictionary != null) |
323 | 335 | { |
324 | | - throw new IllegalArgumentException(format("Dictionary to import has older dictionary id (%s) than the latest compression dictionary (%s) for table %s.%s", |
325 | | - dictId.id, latestCompressionDictionary.dictId.id, keyspaceName, tableName)); |
| 336 | + if (latestCompressionDictionary.dictId.id > dictId.id) |
| 337 | + { |
| 338 | + throw new IllegalArgumentException(format("Dictionary to import has older dictionary id (%s) than the latest compression dictionary (%s) for table %s.%s", |
| 339 | + dictId.id, latestCompressionDictionary.dictId.id, keyspaceName, tableName)); |
| 340 | + } |
| 341 | + |
| 342 | + checkTrainingFrequency(latestCompressionDictionary); |
326 | 343 | } |
327 | 344 |
|
328 | 345 | handleNewDictionary(kind.createDictionary(dictId, dataObject.dict, dataObject.dictChecksum)); |
@@ -394,6 +411,46 @@ private int getCompressionDictionaryTrainingMaxTotalSampleSize(CompressionParams |
394 | 411 | DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE); |
395 | 412 | } |
396 | 413 |
|
| 414 | + private DurationSpec.IntMinutesBound getCompressionDictionaryMinTrainingFrequency(CompressionParams compressionParams) |
| 415 | + { |
| 416 | + String resolvedValue = compressionParams.getOtherOptions().getOrDefault(TRAINING_MIN_FREQUENCY_PARAMETER_NAME, DEFAULT_TRAINING_MIN_FREQUENCY); |
| 417 | + |
| 418 | + try |
| 419 | + { |
| 420 | + return new DurationSpec.IntMinutesBound(resolvedValue); |
| 421 | + } |
| 422 | + catch (Throwable t) |
| 423 | + { |
| 424 | + throw new IllegalArgumentException(String.format("Invalid value for %s: %s. Reason: %s", |
| 425 | + TRAINING_MIN_FREQUENCY_PARAMETER_NAME, |
| 426 | + resolvedValue, |
| 427 | + t.getMessage())); |
| 428 | + } |
| 429 | + } |
| 430 | + |
| 431 | + private void checkTrainingFrequency(LightweightCompressionDictionary lastDictionary) |
| 432 | + { |
| 433 | + Instant lastTraining = lastDictionary == null ? null : lastDictionary.createdAt; |
| 434 | + DurationSpec.IntMinutesBound minTrainingFrequency = getCompressionDictionaryMinTrainingFrequency(columnFamilyStore.metadata().params.compression); |
| 435 | + |
| 436 | + // if there is no dictionary trained so far or min frequency is 0 - that is we can train as often as we want - |
| 437 | + // then do not check if we can |
| 438 | + if (lastTraining != null && minTrainingFrequency.toMinutes() != 0) |
| 439 | + { |
| 440 | + Instant now = FBUtilities.now(); |
| 441 | + int minTrainingFrequencyMinutes = minTrainingFrequency.toMinutes(); |
| 442 | + if (lastTraining.isAfter(now.minus(minTrainingFrequencyMinutes, ChronoUnit.MINUTES))) |
| 443 | + { |
| 444 | + Instant nextEarliestTraining = lastTraining.plus(minTrainingFrequencyMinutes, ChronoUnit.MINUTES); |
| 445 | + throw new IllegalArgumentException(format("The next training or importing can occur only at least after %s from the last training which happened at %s. " + |
| 446 | + "You can train again no earlier than at %s.", |
| 447 | + minTrainingFrequency, |
| 448 | + lastTraining, |
| 449 | + nextEarliestTraining)); |
| 450 | + } |
| 451 | + } |
| 452 | + } |
| 453 | + |
397 | 454 | private int internalTrainingParameterResolution(CompressionParams compressionParams, |
398 | 455 | String userSuppliedValue, |
399 | 456 | String parameterName, |
|
0 commit comments