From 98c47df8297543da021af9c57945b62b28fa10d7 Mon Sep 17 00:00:00 2001 From: Yi Zhang Date: Fri, 16 Jan 2026 16:34:56 +0800 Subject: [PATCH] [FLINK-38777][history] HistoryServer supports application archives --- .../history_server_configuration.html | 18 +- .../configuration/HistoryServerOptions.java | 93 ++- .../webmonitor/history/HistoryServer.java | 41 +- ...istoryServerApplicationArchiveFetcher.java | 577 ++++++++++++++++++ .../history/HistoryServerArchiveFetcher.java | 10 +- ...tegy.java => ArchiveRetainedStrategy.java} | 2 +- ... => CompositeArchiveRetainedStrategy.java} | 48 +- .../webmonitor/history/HistoryServerTest.java | 492 ++++++++++++++- ...CompositeArchiveRetainedStrategyTest.java} | 94 ++- .../ArchivedExecutionGraph.java | 35 +- 10 files changed, 1313 insertions(+), 97 deletions(-) create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java rename flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/{JobRetainedStrategy.java => ArchiveRetainedStrategy.java} (96%) rename flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/{CompositeJobRetainedStrategy.java => CompositeArchiveRetainedStrategy.java} (63%) rename flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/{CompositeJobRetainedStrategyTest.java => CompositeArchiveRetainedStrategyTest.java} (64%) diff --git a/docs/layouts/shortcodes/generated/history_server_configuration.html b/docs/layouts/shortcodes/generated/history_server_configuration.html index 8b171ba627c41..5e6696cf9db54 100644 --- a/docs/layouts/shortcodes/generated/history_server_configuration.html +++ b/docs/layouts/shortcodes/generated/history_server_configuration.html @@ -8,11 +8,17 @@ + +
historyserver.archive.clean-expired-applications
+ false + Boolean + Whether HistoryServer should cleanup applications that are no longer present in the archive directory defined by historyserver.archive.fs.dir. +
historyserver.archive.clean-expired-jobs
false Boolean - Whether HistoryServer should cleanup jobs that are no longer present `historyserver.archive.fs.dir`. + Whether HistoryServer should cleanup jobs that are no longer present in the archive directory defined by historyserver.archive.fs.dir.
Note: This option applies only to legacy job archives created before the introduction of application archiving (FLINK-38761).
historyserver.archive.fs.dir
@@ -26,17 +32,23 @@ Duration Interval for refreshing the archived job directories. + +
historyserver.archive.retained-applications
+ -1 + Integer + The maximum number of applications to retain in each archive directory defined by historyserver.archive.fs.dir. This option works together with the TTL (see historyserver.archive.retained-ttl). Archived entities will be removed if their TTL has expired or the retention count limit has been reached.
If set to -1 (default), there is no limit to the number of archives. If set to 0 or less than -1, HistoryServer will throw an IllegalConfigurationException.
Note, when there are multiple history server instances, two recommended approaches when using this option are: +
historyserver.archive.retained-jobs
-1 Integer - The maximum number of jobs to retain in each archive directory defined by historyserver.archive.fs.dir. If set to 0 or less than -1, HistoryServer will throw an IllegalConfigurationException.
Note, when there are multiple history server instances, two recommended approaches when using this option are: + The maximum number of jobs to retain in each archive directory defined by historyserver.archive.fs.dir. If set to 0 or less than -1, HistoryServer will throw an IllegalConfigurationException.
Note, when there are multiple history server instances, two recommended approaches when using this option are:
Note: This option applies only to legacy job archives created before the introduction of application archiving (FLINK-38761).
historyserver.archive.retained-ttl
(none) Duration - The time-to-live duration to retain the jobs archived in each archive directory defined by historyserver.archive.fs.dir. If set to equal to or less than 0 milliseconds, HistoryServer will throw an IllegalConfigurationException.
Note, when there are multiple history server instances, two recommended approaches when using this option are: + The time-to-live duration to retain the archived entities (jobs and applications) in each archive directory defined by historyserver.archive.fs.dir. This option works together with the retention count limits (see historyserver.archive.retained-applications and historyserver.archive.retained-jobs). Archived entities will be removed if their TTL has expired or the retention count limit has been reached.
If set to equal to or less than 0 milliseconds, HistoryServer will throw an IllegalConfigurationException.
Note, when there are multiple history server instances, two recommended approaches when using this option are:
historyserver.log.jobmanager.url-pattern
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java index 1628d266f3516..0190b095b7d81 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java @@ -51,17 +51,6 @@ public class HistoryServerOptions { + " monitor these directories for archived jobs. You can configure the JobManager to archive jobs to a" + " directory via `jobmanager.archive.fs.dir`."); - /** If this option is enabled then deleted job archives are also deleted from HistoryServer. */ - public static final ConfigOption HISTORY_SERVER_CLEANUP_EXPIRED_JOBS = - key("historyserver.archive.clean-expired-jobs") - .booleanType() - .defaultValue(false) - .withDescription( - String.format( - "Whether HistoryServer should cleanup jobs" - + " that are no longer present `%s`.", - HISTORY_SERVER_ARCHIVE_DIRS.key())); - /** * Pattern of the log URL of TaskManager. The HistoryServer will generate actual URLs from it. */ @@ -137,6 +126,24 @@ public class HistoryServerOptions { "Specify the option in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, "; private static final String CONFIGURE_CONSISTENT = "Or you can keep the value of this configuration consistent across them. "; + private static final String LEGACY_NOTE_MESSAGE = + "Note: This option applies only to legacy job archives created before the introduction of application archiving (FLINK-38761)."; + private static final String RETAINED_STRATEGY_MESSAGE = + "Archived entities will be removed if their TTL has expired or the retention count limit has been reached. "; + + /** If this option is enabled then deleted job archives are also deleted from HistoryServer. */ + public static final ConfigOption HISTORY_SERVER_CLEANUP_EXPIRED_JOBS = + key("historyserver.archive.clean-expired-jobs") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "Whether HistoryServer should cleanup jobs that are no longer present in the archive directory defined by %s. ", + code(HISTORY_SERVER_ARCHIVE_DIRS.key())) + .linebreak() + .text(LEGACY_NOTE_MESSAGE) + .build()); public static final ConfigOption HISTORY_SERVER_RETAINED_JOBS = key(HISTORY_SERVER_RETAINED_JOBS_KEY) @@ -164,6 +171,52 @@ public class HistoryServerOptions { code("IllegalConfigurationException")) .linebreak() .text(NOTE_MESSAGE) + .list( + text(CONFIGURE_SINGLE_INSTANCE), + text(CONFIGURE_CONSISTENT)) + .linebreak() + .text(LEGACY_NOTE_MESSAGE) + .build()); + + /** + * If this option is enabled then deleted application archives are also deleted from + * HistoryServer. + */ + public static final ConfigOption HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS = + key("historyserver.archive.clean-expired-applications") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "Whether HistoryServer should cleanup jobs that are no longer present in the archive directory defined by %s. ", + code(HISTORY_SERVER_ARCHIVE_DIRS.key())) + .build()); + + public static final ConfigOption HISTORY_SERVER_RETAINED_APPLICATIONS = + key("historyserver.archive.retained-applications") + .intType() + .defaultValue(-1) + .withDescription( + Description.builder() + .text( + String.format( + "The maximum number of applications to retain in each archive directory defined by %s. 
+                                            code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
+                                    .text(
+                                            "This option works together with the TTL (see %s). ",
+                                            code(HISTORY_SERVER_RETAINED_TTL_KEY))
+                                    .text(RETAINED_STRATEGY_MESSAGE)
+                                    .linebreak()
+                                    .text(
+                                            "If set to -1 (default), there is no limit to the number of archives. ")
+                                    .text(
+                                            "If set to %s or less than %s, HistoryServer will throw an %s. ",
+                                            code("0"),
+                                            code("-1"),
+                                            code("IllegalConfigurationException"))
+                                    .linebreak()
+                                    .text(NOTE_MESSAGE)
                             .list(
                                     text(CONFIGURE_SINGLE_INSTANCE),
                                     text(CONFIGURE_CONSISTENT))
@@ -176,18 +228,14 @@ public class HistoryServerOptions {
                 .withDescription(
                         Description.builder()
                                 .text(
-                                        "The time-to-live duration to retain the jobs archived in each archive directory defined by %s. ",
+                                        "The time-to-live duration to retain the archived entities (jobs and applications) in each archive directory defined by %s. ",
                                         code(HISTORY_SERVER_ARCHIVE_DIRS.key()))
-                                .list(
-                                        text(
-                                                "If the option is not specified without specifying %s, all of the jobs archives will be retained. ",
-                                                code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
-                                        text(
-                                                "If the option is specified without specifying %s, the jobs archive whose modification time in the time-to-live duration will be retained. ",
-                                                code(HISTORY_SERVER_RETAINED_JOBS_KEY)),
-                                        text(
-                                                "If this option is specified as a positive time duration together with the %s option, the job archive will be removed if its TTL has expired or the retained job count has been reached. ",
-                                                code(HISTORY_SERVER_RETAINED_JOBS_KEY)))
+                                .text(
+                                        "This option works together with the retention count limits (see %s and %s). ",
+                                        code(HISTORY_SERVER_RETAINED_APPLICATIONS.key()),
+                                        code(HISTORY_SERVER_RETAINED_JOBS_KEY))
+                                .text(RETAINED_STRATEGY_MESSAGE)
+                                .linebreak()
                                 .text(
                                         "If set to equal to or less than %s milliseconds, HistoryServer will throw an %s. ",
                                         code("0"), code("IllegalConfigurationException"))
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 69d14e7fecbdf..9564815609359 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.Runnables;
-import org.apache.flink.runtime.webmonitor.history.retaining.CompositeJobRetainedStrategy;
+import org.apache.flink.runtime.webmonitor.history.retaining.CompositeArchiveRetainedStrategy;
 import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.ExceptionUtils;
@@ -93,6 +93,8 @@
 *
  • /config *
  • /joboverview *
  • /jobs/:jobid/* + *
  • /applications/overview + *
  • /applications/:applicationid/* * * *

    and relies on static files that are served by the {@link @@ -110,8 +112,18 @@ public class HistoryServer { private final long webRefreshIntervalMillis; private final File webDir; + /** + * The archive fetcher is responsible for fetching job archives that are not part of an + * application (legacy jobs created before application archiving was introduced in FLINK-38761). + */ private final HistoryServerArchiveFetcher archiveFetcher; + /** + * The archive fetcher is responsible for fetching application archives and their associated job + * archives. + */ + private final HistoryServerApplicationArchiveFetcher applicationArchiveFetcher; + @Nullable private final SSLHandlerFactory serverSSLFactory; private WebFrontendBootstrap netty; @@ -161,7 +173,7 @@ public Integer call() throws Exception { } public HistoryServer(Configuration config) throws IOException, FlinkException { - this(config, (event) -> {}); + this(config, (event) -> {}, (event) -> {}); } /** @@ -175,7 +187,9 @@ public HistoryServer(Configuration config) throws IOException, FlinkException { */ public HistoryServer( Configuration config, - Consumer jobArchiveEventListener) + Consumer jobArchiveEventListener, + Consumer + applicationArchiveEventListener) throws IOException, FlinkException { Preconditions.checkNotNull(config); Preconditions.checkNotNull(jobArchiveEventListener); @@ -199,8 +213,10 @@ public HistoryServer( webDir = clearWebDir(config); - boolean cleanupExpiredArchives = + boolean cleanupExpiredJobs = config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS); + boolean cleanupExpiredApplications = + config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS); String refreshDirectories = config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS); if (refreshDirectories == null) { @@ -235,8 +251,15 @@ public HistoryServer( refreshDirs, webDir, jobArchiveEventListener, - cleanupExpiredArchives, - CompositeJobRetainedStrategy.createFrom(config)); + cleanupExpiredJobs, + CompositeArchiveRetainedStrategy.createForJobFromConfig(config)); + applicationArchiveFetcher = + new HistoryServerApplicationArchiveFetcher( + refreshDirs, + webDir, + applicationArchiveEventListener, + cleanupExpiredApplications, + CompositeArchiveRetainedStrategy.createForApplicationFromConfig(config)); this.shutdownHook = ShutdownHookUtil.addShutdownHook( @@ -339,7 +362,11 @@ void start() throws IOException, InterruptedException { private Runnable getArchiveFetchingRunnable() { return Runnables.withUncaughtExceptionHandler( - () -> archiveFetcher.fetchArchives(), FatalExitExceptionHandler.INSTANCE); + () -> { + archiveFetcher.fetchArchives(); + applicationArchiveFetcher.fetchArchives(); + }, + FatalExitExceptionHandler.INSTANCE); } void stop() { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java new file mode 100644 index 0000000000000..04b4a768c61e4 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java @@ -0,0 +1,577 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.ArchivePathUtils; +import org.apache.flink.runtime.history.FsJsonArchivist; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders; +import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; +import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class is used by the {@link HistoryServer} to fetch the application and job archives that + * are located at {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are + * polled in regular intervals, defined by {@link + * HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}. + * + *

    The archives are downloaded and expanded into a file structure analogous to the REST API. + * + *

    Removes existing archives from these directories and the cache according to {@link + * ArchiveRetainedStrategy} and {@link + * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS}. + */ +public class HistoryServerApplicationArchiveFetcher { + + /** Possible application archive operations in history-server. */ + public enum ArchiveEventType { + /** Archive was found in one refresh location and created in history server. */ + CREATED, + /** Archive was deleted from one of refresh locations and deleted from history server. */ + DELETED + } + + /** Representation of application archive event. */ + public static class ArchiveEvent { + private final String id; + private final HistoryServerApplicationArchiveFetcher.ArchiveEventType operation; + + ArchiveEvent(String id, HistoryServerApplicationArchiveFetcher.ArchiveEventType operation) { + this.id = id; + this.operation = operation; + } + + public String getId() { + return id; + } + + public HistoryServerApplicationArchiveFetcher.ArchiveEventType getType() { + return operation; + } + } + + private static final Logger LOG = + LoggerFactory.getLogger(HistoryServerApplicationArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); + + private static final String JSON_FILE_ENDING = ".json"; + private static final String JOBS_SUBDIR = "jobs"; + private static final String JOB_OVERVIEWS_SUBDIR = "overviews"; + private static final String APPLICATIONS_SUBDIR = "applications"; + private static final String APPLICATION_OVERVIEWS_SUBDIR = "application-overviews"; + + private final List refreshDirs; + private final Consumer + archiveEventListener; + + private final boolean processExpiredArchiveDeletion; + private final ArchiveRetainedStrategy retainedStrategy; + + /** Cache of all available applications identified by their id. 
*/ + private final Map> cachedArchivesPerRefreshDirectory = new HashMap<>(); + + private final Map>> cachedApplicationIdsToJobIds = + new HashMap<>(); + + private final File webDir; + private final File webJobDir; + private final File webJobsOverviewDir; + private final File webApplicationDir; + private final File webApplicationsOverviewDir; + + HistoryServerApplicationArchiveFetcher( + List refreshDirs, + File webDir, + Consumer archiveEventListener, + boolean cleanupExpiredArchives, + ArchiveRetainedStrategy retainedStrategy) + throws IOException { + this.refreshDirs = checkNotNull(refreshDirs); + this.archiveEventListener = archiveEventListener; + this.processExpiredArchiveDeletion = cleanupExpiredArchives; + this.retainedStrategy = checkNotNull(retainedStrategy); + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>()); + cachedApplicationIdsToJobIds.put(refreshDir.getPath(), new HashMap<>()); + } + this.webDir = checkNotNull(webDir); + this.webJobDir = new File(webDir, JOBS_SUBDIR); + Files.createDirectories(webJobDir.toPath()); + this.webJobsOverviewDir = new File(webDir, JOB_OVERVIEWS_SUBDIR); + Files.createDirectories(webJobsOverviewDir.toPath()); + updateJobOverview(); + + this.webApplicationDir = new File(webDir, APPLICATIONS_SUBDIR); + Files.createDirectories(webApplicationDir.toPath()); + this.webApplicationsOverviewDir = new File(webDir, APPLICATION_OVERVIEWS_SUBDIR); + Files.createDirectories(webApplicationsOverviewDir.toPath()); + updateApplicationOverview(); + + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info( + "Monitoring directory {} for archived applications.", refreshDir.getPath()); + } + } + } + + void fetchArchives() { + try { + LOG.debug("Starting archive fetching."); + List events = new ArrayList<>(); + Map> applicationsToRemove = new HashMap<>(); + cachedArchivesPerRefreshDirectory.forEach( + (path, archives) -> applicationsToRemove.put(path, new HashSet<>(archives))); + Map> archivesBeyondRetainedLimit = new HashMap<>(); + for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) { + Path refreshDir = refreshLocation.getPath(); + FileSystem refreshFS = refreshLocation.getFs(); + LOG.debug("Checking archive directory {}.", refreshDir); + + FileStatus[] applicationArchiveDirs; + try { + applicationArchiveDirs = listApplicationArchiveDirs(refreshFS, refreshDir); + } catch (IOException e) { + LOG.error( + "Failed to access application archive location for path {}.", + refreshDir, + e); + // something went wrong, potentially due to a concurrent deletion + // do not remove any applications now; we will retry later + applicationsToRemove.remove(refreshDir); + continue; + } + + int fileOrderedIndexOnModifiedTime = 0; + for (FileStatus applicationArchiveDir : applicationArchiveDirs) { + Path applicationArchiveDirPath = applicationArchiveDir.getPath(); + String applicationId = applicationArchiveDirPath.getName(); + // the application is not expired and no need to remove it + applicationsToRemove.get(refreshDir).remove(applicationId); + + fileOrderedIndexOnModifiedTime++; + if (!retainedStrategy.shouldRetain( + applicationArchiveDir, fileOrderedIndexOnModifiedTime)) { + archivesBeyondRetainedLimit + .computeIfAbsent(refreshDir, ignored -> new HashSet<>()) + .add(applicationArchiveDirPath); + continue; + } + + if (cachedArchivesPerRefreshDirectory.get(refreshDir).contains(applicationId)) { + LOG.trace( + "Ignoring archive {} 
because it was already fetched.", + applicationArchiveDirPath); + } else { + LOG.info("Processing archive {}.", applicationArchiveDirPath); + try { + processArchive( + applicationId, + refreshFS, + refreshDir, + applicationArchiveDirPath, + events); + events.add( + new HistoryServerApplicationArchiveFetcher.ArchiveEvent( + applicationId, + HistoryServerApplicationArchiveFetcher.ArchiveEventType + .CREATED)); + cachedArchivesPerRefreshDirectory.get(refreshDir).add(applicationId); + LOG.info("Processing archive {} finished.", applicationArchiveDirPath); + } catch (IOException e) { + LOG.error( + "Failure while fetching/processing archive for application {}.", + applicationId, + e); + deleteApplicationFiles(applicationId); + } + } + } + } + + if (applicationsToRemove.values().stream().flatMap(Set::stream).findAny().isPresent() + && processExpiredArchiveDeletion) { + cleanupExpiredApplications(applicationsToRemove, events); + } + if (!archivesBeyondRetainedLimit.isEmpty()) { + cleanupApplicationsBeyondSizeLimit(archivesBeyondRetainedLimit, events); + } + if (!events.isEmpty()) { + updateApplicationOverview(); + updateJobOverview(); + } + events.forEach(archiveEventListener); + LOG.debug("Finished archive fetching."); + } catch (Exception e) { + LOG.error("Critical failure while fetching/processing archives.", e); + } + } + + private static FileStatus[] listApplicationArchiveDirs(FileSystem refreshFS, Path refreshDir) + throws IOException { + List applicationArchiveDirs = new ArrayList<>(); + FileStatus[] clusterDirs = refreshFS.listStatus(refreshDir); + if (clusterDirs == null) { + // the entire refreshDirectory was removed + return new FileStatus[0]; + } + + for (FileStatus clusterDir : clusterDirs) { + if (clusterDir.isDir() && isValidId(clusterDir.getPath().getName(), refreshDir)) { + Path applicationsDir = + new Path(clusterDir.getPath(), ArchivePathUtils.APPLICATIONS_DIR); + if (refreshFS.exists(applicationsDir)) { + FileStatus[] applicationDirs = refreshFS.listStatus(applicationsDir); + for (FileStatus applicationDir : applicationDirs) { + if (applicationDir.isDir() + && isValidId(applicationDir.getPath().getName(), refreshDir)) { + applicationArchiveDirs.add(applicationDir); + } + } + } + } + } + + applicationArchiveDirs.sort( + Comparator.comparingLong(FileStatus::getModificationTime).reversed()); + return applicationArchiveDirs.toArray(new FileStatus[0]); + } + + private static boolean isValidJobId(String jobId, Path refreshDir) { + try { + JobID.fromHexString(jobId); + return true; + } catch (IllegalArgumentException iae) { + LOG.debug( + "Archive directory {} contained file with unexpected name {}. Ignoring file.", + refreshDir, + jobId, + iae); + return false; + } + } + + private static boolean isValidId(String id, Path refreshDir) { + try { + ApplicationID.fromHexString(id); + return true; + } catch (IllegalArgumentException iae) { + LOG.debug( + "Archive directory {} contained file with unexpected name {}. 
Ignoring file.", + refreshDir, + id, + iae); + return false; + } + } + + private void processArchive( + String applicationId, + FileSystem fs, + Path refreshDir, + Path applicationArchiveDir, + List jobEvents) + throws IOException { + Path applicationArchive = + new Path(applicationArchiveDir, ArchivePathUtils.APPLICATION_ARCHIVE_NAME); + if (!fs.exists(applicationArchive)) { + throw new IOException("Application archive " + applicationArchive + " does not exist."); + } + + processApplicationArchive(applicationId, applicationArchive); + + Path jobArchivesDir = new Path(applicationArchiveDir, ArchivePathUtils.JOBS_DIR); + if (fs.exists(jobArchivesDir)) { + for (FileStatus jobArchiveStatus : fs.listStatus(jobArchivesDir)) { + Path jobArchive = jobArchiveStatus.getPath(); + String jobId = jobArchive.getName(); + if (!jobArchiveStatus.isDir() && isValidJobId(jobId, jobArchivesDir)) { + cachedApplicationIdsToJobIds + .get(refreshDir) + .computeIfAbsent(applicationId, k -> new HashSet<>()) + .add(jobId); + processJobArchive(jobId, jobArchive); + jobEvents.add( + new HistoryServerApplicationArchiveFetcher.ArchiveEvent( + jobId, + HistoryServerApplicationArchiveFetcher.ArchiveEventType + .CREATED)); + } + } + } + } + + private void processApplicationArchive(String applicationId, Path applicationArchive) + throws IOException { + for (ArchivedJson archive : FsJsonArchivist.readArchivedJsons(applicationArchive)) { + String path = archive.getPath(); + String json = archive.getJson(); + + File target; + if (path.equals(ApplicationsOverviewHeaders.URL)) { + target = new File(webApplicationsOverviewDir, applicationId + JSON_FILE_ENDING); + } else { + // this implicitly writes into webApplicationDir + target = new File(webDir, path + JSON_FILE_ENDING); + } + writeTargetFile(target, json); + } + } + + private void processJobArchive(String jobId, Path jobArchive) throws IOException { + for (ArchivedJson archive : FsJsonArchivist.readArchivedJsons(jobArchive)) { + String path = archive.getPath(); + String json = archive.getJson(); + + File target; + if (path.equals(JobsOverviewHeaders.URL)) { + target = new File(webJobsOverviewDir, jobId + JSON_FILE_ENDING); + } else { + // this implicitly writes into webJobDir + target = new File(webDir, path + JSON_FILE_ENDING); + } + + try { + writeTargetFile(target, json); + } catch (IOException e) { + deleteJobFiles(jobId); + throw e; + } + } + } + + private void writeTargetFile(File target, String json) throws IOException { + java.nio.file.Path parent = target.getParentFile().toPath(); + + try { + Files.createDirectories(parent); + } catch (FileAlreadyExistsException ignored) { + // there may be left-over directories from the previous attempt + } + + java.nio.file.Path targetPath = target.toPath(); + + // We overwrite existing files since this may be another attempt + // at fetching this archive. Existing files may be incomplete/corrupt. 
+ Files.deleteIfExists(targetPath); + + Files.createFile(target.toPath()); + try (FileWriter fw = new FileWriter(target)) { + fw.write(json); + fw.flush(); + } + } + + private void cleanupApplicationsBeyondSizeLimit( + Map> applicationArchivesToRemove, + List events) { + Map> allApplicationIdsToRemove = new HashMap<>(); + + for (Map.Entry> pathSetEntry : applicationArchivesToRemove.entrySet()) { + HashSet applicationIdsToRemove = new HashSet<>(); + + for (Path archive : pathSetEntry.getValue()) { + applicationIdsToRemove.add(archive.getName()); + try { + archive.getFileSystem().delete(archive, true); + } catch (IOException ioe) { + LOG.warn("Could not delete old archive " + archive, ioe); + } + } + allApplicationIdsToRemove.put(pathSetEntry.getKey(), applicationIdsToRemove); + } + + cleanupExpiredApplications(allApplicationIdsToRemove, events); + } + + private void cleanupExpiredApplications( + Map> applicationsToRemove, + List events) { + + LOG.info("Archive directories for applications {} were deleted.", applicationsToRemove); + + for (Map.Entry> pathSetEntry : applicationsToRemove.entrySet()) { + Path refreshDir = pathSetEntry.getKey(); + cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(pathSetEntry.getValue()); + for (String applicationId : pathSetEntry.getValue()) { + deleteApplicationFiles(applicationId); + events.add( + new HistoryServerApplicationArchiveFetcher.ArchiveEvent( + applicationId, + HistoryServerApplicationArchiveFetcher.ArchiveEventType.DELETED)); + Set jobIds = + cachedApplicationIdsToJobIds.get(refreshDir).remove(applicationId); + if (jobIds != null) { + jobIds.forEach( + jobId -> { + deleteJobFiles(jobId); + events.add( + new HistoryServerApplicationArchiveFetcher.ArchiveEvent( + jobId, + HistoryServerApplicationArchiveFetcher + .ArchiveEventType.DELETED)); + }); + } + } + } + } + + private void deleteJobFiles(String jobId) { + // Make sure we do not include this job in the overview + try { + Files.deleteIfExists(new File(webJobsOverviewDir, jobId + JSON_FILE_ENDING).toPath()); + } catch (IOException ioe) { + LOG.warn("Could not delete file from overview directory.", ioe); + } + + // Clean up job files we may have created + File jobDirectory = new File(webJobDir, jobId); + try { + FileUtils.deleteDirectory(jobDirectory); + } catch (IOException ioe) { + LOG.warn("Could not clean up job directory.", ioe); + } + + try { + Files.deleteIfExists(new File(webJobDir, jobId + JSON_FILE_ENDING).toPath()); + } catch (IOException ioe) { + LOG.warn("Could not delete file from job directory.", ioe); + } + } + + private void deleteApplicationFiles(String applicationId) { + // Make sure we do not include this application in the overview + try { + Files.deleteIfExists( + new File(webApplicationsOverviewDir, applicationId + JSON_FILE_ENDING) + .toPath()); + } catch (IOException ioe) { + LOG.warn("Could not delete file from overview directory.", ioe); + } + + // Clean up application files we may have created + File applicationDirectory = new File(webApplicationDir, applicationId); + try { + FileUtils.deleteDirectory(applicationDirectory); + } catch (IOException ioe) { + LOG.warn("Could not clean up application directory.", ioe); + } + + try { + Files.deleteIfExists( + new File(webApplicationDir, applicationId + JSON_FILE_ENDING).toPath()); + } catch (IOException ioe) { + LOG.warn("Could not delete file from application directory.", ioe); + } + } + + /** + * This method replicates the JSON response that would be given by the JobsOverviewHandler when + * listing jobs. + * + *

    Every job archive contains an overview entry with the same structure. However, since jobs + * are archived individually, each such list contains only a single job. + * + *

    For the display in the HistoryServer WebFrontend we have to combine these overviews. + */ + private void updateJobOverview() { + try (JsonGenerator gen = + jacksonFactory.createGenerator( + HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) { + File[] overviews = new File(webJobsOverviewDir.getPath()).listFiles(); + if (overviews != null) { + Collection allJobs = new ArrayList<>(overviews.length); + for (File overview : overviews) { + MultipleJobsDetails subJobs = + mapper.readValue(overview, MultipleJobsDetails.class); + allJobs.addAll(subJobs.getJobs()); + } + mapper.writeValue(gen, new MultipleJobsDetails(allJobs)); + } + } catch (IOException ioe) { + LOG.error("Failed to update job overview.", ioe); + } + } + + /** + * This method replicates the JSON response that would be given by the + * ApplicationsOverviewHandler when listing applications. + * + *

    Every application archive contains an overview entry with the same structure. However, + * since applications are archived individually, each such list contains only a single + * application. + * + *

    For the display in the HistoryServer WebFrontend we have to combine these overviews. + */ + private void updateApplicationOverview() { + try (JsonGenerator gen = + jacksonFactory.createGenerator( + HistoryServer.createOrGetFile(webDir, ApplicationsOverviewHeaders.URL))) { + File[] overviews = new File(webApplicationsOverviewDir.getPath()).listFiles(); + if (overviews != null) { + Collection allApplications = new ArrayList<>(overviews.length); + for (File overview : overviews) { + MultipleApplicationsDetails subApplications = + mapper.readValue(overview, MultipleApplicationsDetails.class); + allApplications.addAll(subApplications.getApplications()); + } + mapper.writeValue(gen, new MultipleApplicationsDetails(allApplications)); + } + } catch (IOException ioe) { + LOG.error("Failed to update application overview.", ioe); + } + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index 4fe8bd58d5b7f..b50b96327bfa1 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; -import org.apache.flink.runtime.webmonitor.history.retaining.JobRetainedStrategy; +import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy; import org.apache.flink.util.FileUtils; import org.apache.flink.util.jackson.JacksonMapperFactory; @@ -113,7 +113,7 @@ public ArchiveEventType getType() { private final List refreshDirs; private final Consumer jobArchiveEventListener; private final boolean processExpiredArchiveDeletion; - private final JobRetainedStrategy jobRetainedStrategy; + private final ArchiveRetainedStrategy jobRetainedStrategy; /** Cache of all available jobs identified by their id. 
*/ private final Map> cachedArchivesPerRefreshDirectory; @@ -127,7 +127,7 @@ public ArchiveEventType getType() { File webDir, Consumer jobArchiveEventListener, boolean cleanupExpiredArchives, - JobRetainedStrategy jobRetainedStrategy) + ArchiveRetainedStrategy jobRetainedStrategy) throws IOException { this.refreshDirs = checkNotNull(refreshDirs); this.jobArchiveEventListener = jobArchiveEventListener; @@ -177,6 +177,10 @@ void fetchArchives() { int fileOrderedIndexOnModifiedTime = 0; for (FileStatus jobArchive : jobArchives) { + if (jobArchive.isDir()) { + continue; + } + Path jobArchivePath = jobArchive.getPath(); String jobID = jobArchivePath.getName(); if (!isValidJobID(jobID, refreshDir)) { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java similarity index 96% rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java index 2ef991698bf71..f2e50dd73c551 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java @@ -21,7 +21,7 @@ import org.apache.flink.core.fs.FileStatus; /** To define the strategy interface to judge whether the file should be retained. */ -public interface JobRetainedStrategy { +public interface ArchiveRetainedStrategy { /** * Judge whether the file should be retained. diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java similarity index 63% rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java index acd35c93f7903..2a38dfaeff773 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java @@ -31,23 +31,42 @@ import java.util.List; import java.util.Optional; +import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS; import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS; import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL; +import static org.apache.flink.util.Preconditions.checkArgument; /** The retained strategy. 
*/ -public class CompositeJobRetainedStrategy implements JobRetainedStrategy { +public class CompositeArchiveRetainedStrategy implements ArchiveRetainedStrategy { - public static JobRetainedStrategy createFrom(ReadableConfig config) { + public static ArchiveRetainedStrategy createForJobFromConfig(ReadableConfig config) { int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS); + if (maxHistorySizeByOldKey == 0 || maxHistorySizeByOldKey < -1) { + throw new IllegalConfigurationException( + "Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key()); + } Optional retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL); - return new CompositeJobRetainedStrategy( - new QuantityJobRetainedStrategy(maxHistorySizeByOldKey), - new TimeToLiveJobRetainedStrategy(retainedTtlOpt.orElse(null))); + return new CompositeArchiveRetainedStrategy( + new QuantityArchiveRetainedStrategy(maxHistorySizeByOldKey), + new TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null))); } - private final List strategies; + public static ArchiveRetainedStrategy createForApplicationFromConfig(ReadableConfig config) { + int maxHistorySize = config.get(HISTORY_SERVER_RETAINED_APPLICATIONS); + if (maxHistorySize == 0 || maxHistorySize < -1) { + throw new IllegalConfigurationException( + "Cannot set %s to 0 or less than -1", + HISTORY_SERVER_RETAINED_APPLICATIONS.key()); + } + Optional retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL); + return new CompositeArchiveRetainedStrategy( + new QuantityArchiveRetainedStrategy(maxHistorySize), + new TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null))); + } + + private final List strategies; - CompositeJobRetainedStrategy(JobRetainedStrategy... strategies) { + CompositeArchiveRetainedStrategy(ArchiveRetainedStrategy... strategies) { this.strategies = strategies == null || strategies.length == 0 ? Collections.emptyList() @@ -64,11 +83,11 @@ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) { } /** The time to live based retained strategy. */ -class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy { +class TimeToLiveArchiveRetainedStrategy implements ArchiveRetainedStrategy { @Nullable private final Duration ttlThreshold; - TimeToLiveJobRetainedStrategy(Duration ttlThreshold) { + TimeToLiveArchiveRetainedStrategy(@Nullable Duration ttlThreshold) { if (ttlThreshold != null && ttlThreshold.toMillis() <= 0) { throw new IllegalConfigurationException( "Cannot set %s to 0 or less than 0 milliseconds", @@ -86,16 +105,13 @@ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) { } } -/** The job quantity based retained strategy. */ -class QuantityJobRetainedStrategy implements JobRetainedStrategy { +/** The quantity based retained strategy. 
*/ +class QuantityArchiveRetainedStrategy implements ArchiveRetainedStrategy { private final int quantityThreshold; - QuantityJobRetainedStrategy(int quantityThreshold) { - if (quantityThreshold == 0 || quantityThreshold < -1) { - throw new IllegalConfigurationException( - "Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key()); - } + QuantityArchiveRetainedStrategy(int quantityThreshold) { + checkArgument(quantityThreshold == -1 || quantityThreshold > 0); this.quantityThreshold = quantityThreshold; } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index 537d58fae8e77..2cd6f1d4f8575 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -18,19 +18,32 @@ package org.apache.flink.runtime.webmonitor.history; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.ApplicationState; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HistoryServerOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.application.ArchivedApplication; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.history.ArchivePathUtils; import org.apache.flink.runtime.history.FsJsonArchivist; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails; +import org.apache.flink.runtime.messages.webmonitor.ApplicationDetailsInfo; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter; +import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders; import org.apache.flink.runtime.rest.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; +import org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.runtime.webmonitor.testutils.HttpUtils; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -51,6 +64,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.io.StringWriter; @@ -58,11 +73,14 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import 
java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -86,13 +104,14 @@ class HistoryServerTest { private MiniClusterWithClientResource cluster; private File jmDirectory; private File hsDirectory; + private Configuration clusterConfig; @BeforeEach void setUp(@TempDir File jmDirectory, @TempDir File hsDirectory) throws Exception { this.jmDirectory = jmDirectory; this.hsDirectory = hsDirectory; - Configuration clusterConfig = new Configuration(); + clusterConfig = new Configuration(); clusterConfig.set(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString()); cluster = @@ -130,6 +149,9 @@ void testHistoryServerIntegration(final boolean versionLessThan14) throws Except == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) { numExpectedArchivedJobs.countDown(); } + }, + (event) -> { + throw new RuntimeException("Should not called"); }); try { @@ -164,10 +186,10 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio final int numArchivesToRemoveUponHsStart = numArchivesBeforeHsStarted - numArchivesToKeepInHistory; final long oneMinuteSinceEpoch = 1000L * 60L; - List expectedJobIdsToKeep = new LinkedList<>(); + List expectedJobIdsToKeep = new LinkedList<>(); for (int j = 0; j < numArchivesBeforeHsStarted; j++) { - String jobId = + JobID jobId = createLegacyArchive( jmDirectory.toPath(), j * oneMinuteSinceEpoch, versionLessThan14); if (j >= numArchivesToRemoveUponHsStart) { @@ -205,6 +227,9 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio numArchivesDeletedTotal.countDown(); break; } + }, + (event) -> { + throw new RuntimeException("Should not called"); }); try { @@ -232,10 +257,9 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio } } - private Set getIdsFromJobOverview(String baseUrl) throws Exception { + private Set getIdsFromJobOverview(String baseUrl) throws Exception { return getJobsOverview(baseUrl).getJobs().stream() .map(JobDetails::getJobId) - .map(JobID::toString) .collect(Collectors.toSet()); } @@ -288,15 +312,26 @@ void testClearWebDir() throws Exception { new File(hsDirectory.toURI() + "/overviews/dirtyEmptySubFile.json").createNewFile(); new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubDir").mkdir(); new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubFile.json").createNewFile(); + new File(hsDirectory.toURI() + "/application-overviews/dirtyEmptySubDir").mkdir(); + new File(hsDirectory.toURI() + "/application-overviews/dirtyEmptySubFile.json") + .createNewFile(); + new File(hsDirectory.toURI() + "/applications/dirtyEmptySubDir").mkdir(); + new File(hsDirectory.toURI() + "/applications/dirtyEmptySubFile.json").createNewFile(); hs = new HistoryServer(historyServerConfig); assertInitializedHistoryServerWebDir(hs.getWebDir()); } private void assertInitializedHistoryServerWebDir(File historyWebDir) { - - assertThat(historyWebDir.list()).containsExactlyInAnyOrder("overviews", "jobs"); + assertThat(historyWebDir.list()) + .containsExactlyInAnyOrder( + "overviews", "jobs", "application-overviews", "applications"); assertThat(new File(historyWebDir, "overviews")).exists().isDirectory().isEmptyDirectory(); assertThat(new File(historyWebDir, "jobs").list()).containsExactly("overview.json"); + assertThat(new File(historyWebDir, "application-overviews")) + .exists() + .isDirectory() + .isEmptyDirectory(); + assertThat(new File(historyWebDir, "applications").list()).containsExactly("overview.json"); } private void runArchiveExpirationTest(boolean cleanupExpiredJobs) 
throws Exception { @@ -327,6 +362,9 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti allArchivesExpiredLatch.countDown(); break; } + }, + (event) -> { + throw new RuntimeException("Should not called"); }); try { @@ -378,27 +416,33 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue(); - assertJobFilesCleanedUp(cleanupExpiredJobs); + assertFilesCleanedUp(cleanupExpiredJobs); } finally { hs.stop(); } } - private void assertJobFilesCleanedUp(boolean jobFilesShouldBeDeleted) throws IOException { + private void assertFilesCleanedUp(boolean filesShouldBeDeleted) throws IOException { try (Stream paths = Files.walk(hsDirectory.toPath())) { - final List jobFiles = + final List applicationOrJobFiles = paths.filter(path -> !path.equals(hsDirectory.toPath())) .map(path -> hsDirectory.toPath().relativize(path)) .filter(path -> !path.equals(Paths.get("config.json"))) .filter(path -> !path.equals(Paths.get("jobs"))) .filter(path -> !path.equals(Paths.get("jobs", "overview.json"))) .filter(path -> !path.equals(Paths.get("overviews"))) + .filter(path -> !path.equals(Paths.get("applications"))) + .filter( + path -> + !path.equals( + Paths.get("applications", "overview.json"))) + .filter(path -> !path.equals(Paths.get("application-overviews"))) .collect(Collectors.toList()); - if (jobFilesShouldBeDeleted) { - assertThat(jobFiles).isEmpty(); + if (filesShouldBeDeleted) { + assertThat(applicationOrJobFiles).isEmpty(); } else { - assertThat(jobFiles).isNotEmpty(); + assertThat(applicationOrJobFiles).isNotEmpty(); } } } @@ -413,6 +457,13 @@ private void waitForArchivesCreation(int numJobs) throws InterruptedException { } private Configuration createTestConfiguration(boolean cleanupExpiredJobs) { + return createTestConfiguration( + cleanupExpiredJobs, + HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS.defaultValue()); + } + + private Configuration createTestConfiguration( + boolean cleanupExpiredJobs, boolean cleanupExpiredApplications) { Configuration historyServerConfig = new Configuration(); historyServerConfig.set( HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString()); @@ -424,6 +475,9 @@ private Configuration createTestConfiguration(boolean cleanupExpiredJobs) { historyServerConfig.set( HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS, cleanupExpiredJobs); + historyServerConfig.set( + HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS, + cleanupExpiredApplications); historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0); return historyServerConfig; @@ -451,15 +505,15 @@ private static void runJob() throws Exception { env.execute(); } - private static String createLegacyArchive( + private static JobID createLegacyArchive( Path directory, long fileModifiedDate, boolean versionLessThan14) throws IOException { - String jobId = createLegacyArchive(directory, versionLessThan14); - File jobArchive = directory.resolve(jobId).toFile(); + JobID jobId = createLegacyArchive(directory, versionLessThan14); + File jobArchive = directory.resolve(jobId.toString()).toFile(); jobArchive.setLastModified(fileModifiedDate); return jobId; } - private static String createLegacyArchive(Path directory, boolean versionLessThan14) + private static JobID createLegacyArchive(Path directory, boolean versionLessThan14) throws IOException { JobID jobId = JobID.generate(); @@ -504,7 +558,409 @@ private static String 
createLegacyArchive(Path directory, boolean versionLessTha directory.toAbsolutePath().toString(), jobId.toString()), Collections.singleton(archivedJson)); - return jobId.toString(); + return jobId; + } + + @Test + void testApplicationAndJobArchives() throws Exception { + int numApplications = 2; + int numJobsPerApplication = 2; + // jobs that are not part of an application + int numJobsOutsideApplication = 1; + + Map> expectedApplicationAndJobIds = + new HashMap<>(numApplications); + for (int i = 0; i < numApplications; i++) { + ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication); + ApplicationID applicationId = archivedApplication.getApplicationId(); + List jobIds = + archivedApplication.getJobs().values().stream() + .map(ExecutionGraphInfo::getJobId) + .collect(Collectors.toList()); + expectedApplicationAndJobIds.put(applicationId, new HashSet<>(jobIds)); + } + Set expectedJobIdsOutsideApplication = new HashSet<>(numJobsOutsideApplication); + for (int i = 0; i < numJobsOutsideApplication; i++) { + ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(null); + mockJobArchive(executionGraphInfo); + expectedJobIdsOutsideApplication.add(executionGraphInfo.getJobId()); + } + + int numTotalJobs = numApplications * numJobsPerApplication + numJobsOutsideApplication; + int numTotal = numApplications + numTotalJobs; + CountDownLatch numExpectedArchives = new CountDownLatch(numTotal); + Configuration historyServerConfig = createTestConfiguration(false); + HistoryServer hs = + new HistoryServer( + historyServerConfig, + (event) -> { + if (event.getType() + == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) { + numExpectedArchives.countDown(); + } + }, + (event) -> { + if (event.getType() + == HistoryServerApplicationArchiveFetcher.ArchiveEventType + .CREATED) { + numExpectedArchives.countDown(); + } + }); + try { + hs.start(); + String baseUrl = "http://localhost:" + hs.getWebPort(); + assertThat(numExpectedArchives.await(10L, TimeUnit.SECONDS)).isTrue(); + assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl)) + .isEqualTo(expectedApplicationAndJobIds); + Set expectedJobIds = + Stream.concat( + expectedApplicationAndJobIds.values().stream() + .flatMap(Set::stream), + expectedJobIdsOutsideApplication.stream()) + .collect(Collectors.toSet()); + assertThat(getIdsFromJobOverview(baseUrl)).isEqualTo(expectedJobIds); + // checks whether the dashboard configuration contains all expected fields + getDashboardConfiguration(baseUrl); + } finally { + hs.stop(); + } + } + + @Test + void testRemoveApplicationArchivesBeyondHistorySizeLimit() throws Exception { + int numJobsPerApplication = 1; + int numApplicationsToKeepInHistory = 2; + int numApplicationsBeforeHsStarted = 4; + int numApplicationsAfterHsStarted = 2; + int numApplicationsToRemoveUponHsStart = + numApplicationsBeforeHsStarted - numApplicationsToKeepInHistory; + List>> expectedApplicationAndJobIdsToKeep = + new LinkedList<>(); + for (int i = 0; i < numApplicationsBeforeHsStarted; i++) { + ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication); + ApplicationID applicationId = archivedApplication.getApplicationId(); + List jobIds = + archivedApplication.getJobs().values().stream() + .map(ExecutionGraphInfo::getJobId) + .collect(Collectors.toList()); + if (i >= numApplicationsToRemoveUponHsStart) { + expectedApplicationAndJobIdsToKeep.add( + new Tuple2<>(applicationId, new HashSet<>(jobIds))); + } + } + + // one for application itself, 
numJobsPerApplication for jobs + int numArchivesRatio = 1 + numJobsPerApplication; + CountDownLatch numArchivesCreatedInitially = + new CountDownLatch(numApplicationsToKeepInHistory * numArchivesRatio); + // jobs in applications that exceed the size limit are not read by the fetcher at all, + // so there is no need to delete these jobs. + CountDownLatch numArchivesDeletedInitially = + new CountDownLatch(numApplicationsToRemoveUponHsStart); + CountDownLatch numArchivesCreatedTotal = + new CountDownLatch( + (numApplicationsBeforeHsStarted + - numApplicationsToRemoveUponHsStart + + numApplicationsAfterHsStarted) + * numArchivesRatio); + CountDownLatch numArchivesDeletedTotal = + new CountDownLatch( + numApplicationsToRemoveUponHsStart + + numApplicationsAfterHsStarted * numArchivesRatio); + Configuration historyServerConfig = + createTestConfiguration( + HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue()); + historyServerConfig.set( + HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS, + numApplicationsToKeepInHistory); + HistoryServer hs = + new HistoryServer( + historyServerConfig, + (event) -> { + throw new RuntimeException("Should not called"); + }, + (event) -> { + if (event.getType() + == HistoryServerApplicationArchiveFetcher.ArchiveEventType + .CREATED) { + numArchivesCreatedInitially.countDown(); + numArchivesCreatedTotal.countDown(); + } else if (event.getType() + == HistoryServerApplicationArchiveFetcher.ArchiveEventType + .DELETED) { + numArchivesDeletedInitially.countDown(); + numArchivesDeletedTotal.countDown(); + } + }); + try { + hs.start(); + String baseUrl = "http://localhost:" + hs.getWebPort(); + assertThat(numArchivesCreatedInitially.await(10L, TimeUnit.SECONDS)).isTrue(); + assertThat(numArchivesDeletedInitially.await(10L, TimeUnit.SECONDS)).isTrue(); + assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl)) + .isEqualTo( + expectedApplicationAndJobIdsToKeep.stream() + .collect( + Collectors.toMap( + tuple -> tuple.f0, tuple -> tuple.f1))); + for (int i = numApplicationsBeforeHsStarted; + i < numApplicationsBeforeHsStarted + numApplicationsAfterHsStarted; + i++) { + expectedApplicationAndJobIdsToKeep.remove(0); + ArchivedApplication archivedApplication = + mockApplicationArchive(numJobsPerApplication); + ApplicationID applicationId = archivedApplication.getApplicationId(); + List jobIds = + archivedApplication.getJobs().values().stream() + .map(ExecutionGraphInfo::getJobId) + .collect(Collectors.toList()); + expectedApplicationAndJobIdsToKeep.add( + new Tuple2<>(applicationId, new HashSet<>(jobIds))); + // avoid executing too fast, resulting in the same creation time of archive files + Thread.sleep(50); + } + + assertThat(numArchivesCreatedTotal.await(10L, TimeUnit.SECONDS)).isTrue(); + assertThat(numArchivesDeletedTotal.await(10L, TimeUnit.SECONDS)).isTrue(); + assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl)) + .isEqualTo( + expectedApplicationAndJobIdsToKeep.stream() + .collect( + Collectors.toMap( + tuple -> tuple.f0, tuple -> tuple.f1))); + } finally { + hs.stop(); + } + } + + @Test + void testFailIfApplicationHistorySizeLimitIsZero() { + assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(0)) + .isInstanceOf(IllegalConfigurationException.class); + } + + @Test + void testFailIfApplicationHistorySizeLimitIsLessThanMinusOne() { + assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(-2)) + .isInstanceOf(IllegalConfigurationException.class); + } + + private void 
+    private void startHistoryServerWithApplicationSizeLimit(int maxHistorySize)
+            throws IOException, FlinkException, InterruptedException {
+        Configuration historyServerConfig =
+                createTestConfiguration(
+                        HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS
+                                .defaultValue());
+        historyServerConfig.set(
+                HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS, maxHistorySize);
+        new HistoryServer(historyServerConfig).start();
+    }
+
+    @Test
+    void testCleanExpiredApplication() throws Exception {
+        runApplicationArchiveExpirationTest(true);
+    }
+
+    @Test
+    void testRemainExpiredApplication() throws Exception {
+        runApplicationArchiveExpirationTest(false);
+    }
+
+    private void runApplicationArchiveExpirationTest(boolean cleanupExpiredApplications)
+            throws Exception {
+        int numExpiredApplications = cleanupExpiredApplications ? 1 : 0;
+        int numApplications = 3;
+        int numJobsPerApplication = 1;
+
+        Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+                new HashMap<>(numApplications);
+        for (int i = 0; i < numApplications; i++) {
+            ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+            ApplicationID applicationId = archivedApplication.getApplicationId();
+            List<JobID> jobIds =
+                    archivedApplication.getJobs().values().stream()
+                            .map(ExecutionGraphInfo::getJobId)
+                            .collect(Collectors.toList());
+            expectedApplicationAndJobIds.put(applicationId, new HashSet<>(jobIds));
+        }
+
+        // one for application itself, numJobsPerApplication for jobs
+        int numArchivesRatio = 1 + numJobsPerApplication;
+        CountDownLatch numExpectedArchives = new CountDownLatch(numApplications * numArchivesRatio);
+        CountDownLatch firstArchiveExpiredLatch =
+                new CountDownLatch(numExpiredApplications * numArchivesRatio);
+        CountDownLatch allArchivesExpiredLatch =
+                new CountDownLatch(
+                        cleanupExpiredApplications ? numApplications * numArchivesRatio : 0);
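+        // When cleanup is disabled, both expiration latches start at zero, so
+        // the corresponding awaits below are expected to succeed immediately
+        // even though no archive is ever deleted.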
+
+        Configuration historyServerConfig =
+                createTestConfiguration(
+                        HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue(),
+                        cleanupExpiredApplications);
+        HistoryServer hs =
+                new HistoryServer(
+                        historyServerConfig,
+                        (event) -> {
+                            throw new RuntimeException("Should not be called");
+                        },
+                        (event) -> {
+                            if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .CREATED) {
+                                numExpectedArchives.countDown();
+                            } else if (event.getType()
+                                    == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+                                            .DELETED) {
+                                firstArchiveExpiredLatch.countDown();
+                                allArchivesExpiredLatch.countDown();
+                            }
+                        });
+        try {
+            hs.start();
+            String baseUrl = "http://localhost:" + hs.getWebPort();
+            assertThat(numExpectedArchives.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(expectedApplicationAndJobIds);
+            ApplicationID applicationIdToDelete =
+                    expectedApplicationAndJobIds.keySet().stream()
+                            .findFirst()
+                            .orElseThrow(
+                                    () ->
+                                            new IllegalStateException(
+                                                    "Expected at least one application"));
+            if (cleanupExpiredApplications) {
+                expectedApplicationAndJobIds.remove(applicationIdToDelete);
+            }
+            // trigger another fetch and delete one archive on the JobManager side;
+            // we fetch again to make a concurrent deletion likely
+            hs.fetchArchives();
+            deleteApplicationArchiveDir(applicationIdToDelete);
+
+            assertThat(firstArchiveExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+            // check that the archive is still/no longer present in the HistoryServer
+            assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+                    .isEqualTo(expectedApplicationAndJobIds);
+            for (ApplicationID remainingApplicationId : expectedApplicationAndJobIds.keySet()) {
+                deleteApplicationArchiveDir(remainingApplicationId);
+            }
+            assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+            assertFilesCleanedUp(cleanupExpiredApplications);
+        } finally {
+            hs.stop();
+        }
+    }
+
+    private Map<ApplicationID, Set<JobID>> getApplicationAndJobIdsFromApplicationOverview(
+            String baseUrl) throws Exception {
+        Set<ApplicationID> applicationIds =
+                getApplicationsOverview(baseUrl).getApplications().stream()
+                        .map(ApplicationDetails::getApplicationId)
+                        .collect(Collectors.toSet());
+        Map<ApplicationID, Set<JobID>> applicationAndJobIds = new HashMap<>(applicationIds.size());
+        for (ApplicationID applicationId : applicationIds) {
+            Set<JobID> jobIds =
+                    getApplicationDetails(baseUrl, applicationId).getJobs().stream()
+                            .map(JobDetails::getJobId)
+                            .collect(Collectors.toSet());
+            applicationAndJobIds.put(applicationId, jobIds);
+        }
+        return applicationAndJobIds;
+    }
+
+    private static MultipleApplicationsDetails getApplicationsOverview(String baseUrl)
+            throws Exception {
+        Tuple2<Integer, String> response =
+                HttpUtils.getFromHTTP(baseUrl + ApplicationsOverviewHeaders.URL);
+        return OBJECT_MAPPER.readValue(response.f1, MultipleApplicationsDetails.class);
+    }
+
+    private static ApplicationDetailsInfo getApplicationDetails(
+            String baseUrl, ApplicationID applicationId) throws Exception {
+        Tuple2<Integer, String> response =
+                HttpUtils.getFromHTTP(
+                        baseUrl
+                                + ApplicationDetailsHeaders.URL.replace(
+                                        ':' + ApplicationIDPathParameter.KEY,
+                                        applicationId.toString()));
+        return OBJECT_MAPPER.readValue(response.f1, ApplicationDetailsInfo.class);
+    }
+
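+    // Writes a fake application archive in the layout the JobManager is
+    // assumed to produce: an applications-overview JSON, a per-application
+    // details JSON, and one job archive per contained job.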
+    private ArchivedApplication mockApplicationArchive(int numJobs) throws IOException {
+        ArchivedApplication archivedApplication = createArchivedApplication(numJobs);
+        ApplicationID applicationId = archivedApplication.getApplicationId();
+        ArchivedJson archivedApplicationsOverview =
+                new ArchivedJson(
+                        ApplicationsOverviewHeaders.URL,
+                        new MultipleApplicationsDetails(
+                                Collections.singleton(
+                                        ApplicationDetails.fromArchivedApplication(
+                                                archivedApplication))));
+        ArchivedJson archivedApplicationDetails =
+                new ArchivedJson(
+                        ApplicationDetailsHeaders.URL.replace(
+                                ':' + ApplicationIDPathParameter.KEY, applicationId.toString()),
+                        ApplicationDetailsInfo.fromArchivedApplication(archivedApplication));
+        // set cluster id to application id to simplify the test
+        clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+        FsJsonArchivist.writeArchivedJsons(
+                ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId),
+                Arrays.asList(archivedApplicationsOverview, archivedApplicationDetails));
+
+        Map<JobID, ExecutionGraphInfo> jobs = archivedApplication.getJobs();
+        for (Map.Entry<JobID, ExecutionGraphInfo> jobEntry : jobs.entrySet()) {
+            mockJobArchive(jobEntry.getValue());
+        }
+        return archivedApplication;
+    }
+
+    private void mockJobArchive(ExecutionGraphInfo executionGraphInfo) throws IOException {
+        JobID jobId = executionGraphInfo.getJobId();
+        ApplicationID applicationId = executionGraphInfo.getApplicationId().orElse(null);
+        ArchivedJson archivedJobsOverview =
+                new ArchivedJson(
+                        JobsOverviewHeaders.URL,
+                        new MultipleJobsDetails(
+                                Collections.singleton(
+                                        JobDetails.createDetailsForJob(
+                                                executionGraphInfo.getArchivedExecutionGraph()))));
+        FsJsonArchivist.writeArchivedJsons(
+                ArchivePathUtils.getJobArchivePath(clusterConfig, jobId, applicationId),
+                Collections.singletonList(archivedJobsOverview));
+    }
+
+    private ArchivedApplication createArchivedApplication(int numJobs) {
+        ApplicationID applicationId = ApplicationID.generate();
+        Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>(numJobs);
+        for (int i = 0; i < numJobs; i++) {
+            ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(applicationId);
+            jobs.put(executionGraphInfo.getJobId(), executionGraphInfo);
+        }
+        return new ArchivedApplication(
+                applicationId,
+                "test-application",
+                ApplicationState.FINISHED,
+                new long[ApplicationState.values().length],
+                jobs);
+    }
+
+    private ExecutionGraphInfo createExecutionGraphInfo(@Nullable ApplicationID applicationId) {
+        return createExecutionGraphInfo(JobID.generate(), applicationId);
+    }
+
+    private ExecutionGraphInfo createExecutionGraphInfo(
+            JobID jobId, @Nullable ApplicationID applicationId) {
+        return new ExecutionGraphInfo(
+                ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
+                        jobId, "test-job", JobStatus.FINISHED, null, null, null, 0, applicationId));
+    }
+
+    private void deleteApplicationArchiveDir(ApplicationID applicationId) throws IOException {
+        // set cluster id to application id to simplify the test
+        clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+        org.apache.flink.core.fs.Path applicationArchiveDir =
+                ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId)
+                        .getParent();
+        applicationArchiveDir.getFileSystem().delete(applicationArchiveDir, true);
     }
 
     private static final class JsonObject implements AutoCloseable {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
similarity index 64%
rename from flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
rename to flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
index e8983967df537..d1d6124b3570f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
@@ -18,31 +18,49 @@
 package org.apache.flink.runtime.webmonitor.history.retaining;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.function.Function;
+import java.util.stream.Stream;
 
+import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS;
 import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
 import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Testing for {@link CompositeJobRetainedStrategy}. */
-class CompositeJobRetainedStrategyTest {
+/** Testing for {@link CompositeArchiveRetainedStrategy}. */
+class CompositeArchiveRetainedStrategyTest {
+
+    private static Stream<TestCase> getTestCases() {
+        return Stream.of(
+                new TestCase(
+                        "Legacy Jobs",
+                        HISTORY_SERVER_RETAINED_JOBS,
+                        CompositeArchiveRetainedStrategy::createForJobFromConfig),
+                new TestCase(
+                        "Applications",
+                        HISTORY_SERVER_RETAINED_APPLICATIONS,
+                        CompositeArchiveRetainedStrategy::createForApplicationFromConfig));
+    }
 
-    @Test
-    void testTimeToLiveBasedJobRetainedStrategy() {
+    @ParameterizedTest(name = "{index}: {0}")
+    @MethodSource("getTestCases")
+    void testTimeToLiveBasedArchiveRetainedStrategy(TestCase testCase) {
         final Configuration conf = new Configuration();
 
         // Test for invalid option value.
         conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ZERO);
-        assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+        assertThatThrownBy(() -> testCase.createStrategy(conf))
                 .isInstanceOf(IllegalConfigurationException.class);
         // Skipped for option value that is less than 0 milliseconds, which will throw a
         // java.lang.NumberFormatException caused by TimeUtils.
@@ -51,7 +69,7 @@ void testTimeToLiveBasedJobRetainedStrategy() {
 
         // Test the case where no specific retention policy is configured, i.e., all archived files
         // are retained.
-        JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
         assertThat(
                         strategy.shouldRetain(
@@ -63,7 +81,7 @@ void testTimeToLiveBasedJobRetainedStrategy() {
 
         // Test the case where TTL-based retention policies is specified only.
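+        // shouldRetain(fileStatus, index) is assumed to compare the file's
+        // modification time against the configured TTL when deciding whether
+        // to keep an archive.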
conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1L)); - strategy = CompositeJobRetainedStrategy.createFrom(conf); + strategy = testCase.createStrategy(conf); assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue(); assertThat( strategy.shouldRetain( @@ -74,35 +92,37 @@ void testTimeToLiveBasedJobRetainedStrategy() { .isFalse(); } - @Test - void testQuantityBasedJobRetainedStrategy() { + @ParameterizedTest(name = "{index}: {0}") + @MethodSource("getTestCases") + void testQuantityBasedArchiveRetainedStrategy(TestCase testCase) { final Configuration conf = new Configuration(); // Test for invalid option value. - conf.set(HISTORY_SERVER_RETAINED_JOBS, 0); - assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf)) + conf.set(testCase.getQuantityConfigOption(), 0); + assertThatThrownBy(() -> testCase.createStrategy(conf)) .isInstanceOf(IllegalConfigurationException.class); - conf.set(HISTORY_SERVER_RETAINED_JOBS, -2); - assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf)) + conf.set(testCase.getQuantityConfigOption(), -2); + assertThatThrownBy(() -> testCase.createStrategy(conf)) .isInstanceOf(IllegalConfigurationException.class); - conf.removeConfig(HISTORY_SERVER_RETAINED_JOBS); + conf.removeConfig(testCase.getQuantityConfigOption()); // Test the case where no specific retention policy is configured, i.e., all archived files // are retained. - JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf); + ArchiveRetainedStrategy strategy = testCase.createStrategy(conf); assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue(); assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isTrue(); // Test the case where QUANTITY-based retention policies is specified only. - conf.set(HISTORY_SERVER_RETAINED_JOBS, 2); - strategy = CompositeJobRetainedStrategy.createFrom(conf); + conf.set(testCase.getQuantityConfigOption(), 2); + strategy = testCase.createStrategy(conf); assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue(); assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isFalse(); } - @Test - void testCompositeBasedJobRetainedStrategy() { + @ParameterizedTest(name = "{index}: {0}") + @MethodSource("getTestCases") + void testCompositeBasedArchiveRetainedStrategy(TestCase testCase) { final long outOfTtlMillis = Instant.now().toEpochMilli() - Duration.ofMinutes(2L).toMillis(); @@ -110,7 +130,7 @@ void testCompositeBasedJobRetainedStrategy() { // Test the case where no specific retention policy is configured, i.e., all archived files // are retained. final Configuration conf = new Configuration(); - JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf); + ArchiveRetainedStrategy strategy = testCase.createStrategy(conf); assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isTrue(); assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isTrue(); assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isTrue(); @@ -118,8 +138,8 @@ void testCompositeBasedJobRetainedStrategy() { // Test the case where both retention policies are specified. 
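+        // The composite strategy is assumed to retain an archive only if every
+        // configured policy retains it, so even a fresh file is dropped once
+        // the retained-count limit is exceeded.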
         conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1));
-        conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
-        strategy = CompositeJobRetainedStrategy.createFrom(conf);
+        conf.set(testCase.getQuantityConfigOption(), 2);
+        strategy = testCase.createStrategy(conf);
         assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isFalse();
         assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isFalse();
         assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isFalse();
@@ -173,4 +193,32 @@ public Path getPath() {
             return null;
         }
     }
+
+    private static final class TestCase {
+        private final String testName;
+        private final ConfigOption<Integer> quantityConfigOption;
+        private final Function<Configuration, ArchiveRetainedStrategy> strategyFunction;
+
+        TestCase(
+                String testName,
+                ConfigOption<Integer> quantityConfigOption,
+                Function<Configuration, ArchiveRetainedStrategy> strategyFunction) {
+            this.testName = testName;
+            this.quantityConfigOption = quantityConfigOption;
+            this.strategyFunction = strategyFunction;
+        }
+
+        ArchiveRetainedStrategy createStrategy(Configuration conf) {
+            return strategyFunction.apply(conf);
+        }
+
+        ConfigOption<Integer> getQuantityConfigOption() {
+            return quantityConfigOption;
+        }
+
+        @Override
+        public String toString() {
+            return testName;
+        }
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index ec50feaaca854..4cccb1a8ac1f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -422,7 +423,31 @@ public static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
                 Collections.emptyList(),
                 throwable,
                 checkpointingSettings,
-                initializationTimestamp);
+                initializationTimestamp,
+                null);
+    }
+
+    @VisibleForTesting
+    public static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
+            JobID jobId,
+            String jobName,
+            JobStatus jobStatus,
+            @Nullable JobType jobType,
+            @Nullable Throwable throwable,
+            @Nullable JobCheckpointingSettings checkpointingSettings,
+            long initializationTimestamp,
+            @Nullable ApplicationID applicationId) {
+        return createSparseArchivedExecutionGraph(
+                jobId,
+                jobName,
+                jobStatus,
+                jobType,
+                Collections.emptyMap(),
+                Collections.emptyList(),
+                throwable,
+                checkpointingSettings,
+                initializationTimestamp,
+                applicationId);
+    }
 
     public static ArchivedExecutionGraph createSparseArchivedExecutionGraphWithJobVertices(
@@ -464,7 +489,8 @@ public static ArchivedExecutionGraph createSparseArchivedExecutionGraphWithJobVe
                 archivedVerticesInCreationOrder,
                 throwable,
                 checkpointingSettings,
-                initializationTimestamp);
+                initializationTimestamp,
+                null);
     }
 
     private static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
@@ -476,7 +502,8 @@ private static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
             List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder,
             @Nullable Throwable throwable,
             @Nullable JobCheckpointingSettings checkpointingSettings,
-            long initializationTimestamp) {
+            long initializationTimestamp,
+            @Nullable ApplicationID applicationId) {
         final Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators =
Collections.emptyMap(); StringifiedAccumulatorResult[] archivedUserAccumulators = @@ -522,6 +549,6 @@ private static ArchivedExecutionGraph createSparseArchivedExecutionGraph( checkpointingSettings == null ? null : "Unknown", null, 0, - null); + applicationId); } }