+ * <li>/applications/:applicationid/*
* </ul>
*
* and relies on static files that are served by the {@link
@@ -110,8 +112,18 @@ public class HistoryServer {
private final long webRefreshIntervalMillis;
private final File webDir;
+ /**
+ * The archive fetcher is responsible for fetching job archives that are not part of an
+ * application (legacy jobs created before application archiving was introduced in FLINK-38761).
+ */
private final HistoryServerArchiveFetcher archiveFetcher;
+ /**
+ * The application archive fetcher is responsible for fetching application archives and their
+ * associated job archives.
+ */
+ private final HistoryServerApplicationArchiveFetcher applicationArchiveFetcher;
+
@Nullable private final SSLHandlerFactory serverSSLFactory;
private WebFrontendBootstrap netty;
@@ -161,7 +173,7 @@ public Integer call() throws Exception {
}
public HistoryServer(Configuration config) throws IOException, FlinkException {
- this(config, (event) -> {});
+ this(config, (event) -> {}, (event) -> {});
}
/**
@@ -175,7 +187,9 @@ public HistoryServer(Configuration config) throws IOException, FlinkException {
*/
public HistoryServer(
Configuration config,
- Consumer<HistoryServerArchiveFetcher.ArchiveEvent> jobArchiveEventListener)
+ Consumer<HistoryServerArchiveFetcher.ArchiveEvent> jobArchiveEventListener,
+ Consumer<HistoryServerApplicationArchiveFetcher.ArchiveEvent>
+ applicationArchiveEventListener)
throws IOException, FlinkException {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(jobArchiveEventListener);
@@ -199,8 +213,10 @@ public HistoryServer(
webDir = clearWebDir(config);
- boolean cleanupExpiredArchives =
+ boolean cleanupExpiredJobs =
config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS);
+ boolean cleanupExpiredApplications =
+ config.get(HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS);
String refreshDirectories = config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS);
if (refreshDirectories == null) {
@@ -235,8 +251,15 @@ public HistoryServer(
refreshDirs,
webDir,
jobArchiveEventListener,
- cleanupExpiredArchives,
- CompositeJobRetainedStrategy.createFrom(config));
+ cleanupExpiredJobs,
+ CompositeArchiveRetainedStrategy.createForJobFromConfig(config));
+ applicationArchiveFetcher =
+ new HistoryServerApplicationArchiveFetcher(
+ refreshDirs,
+ webDir,
+ applicationArchiveEventListener,
+ cleanupExpiredApplications,
+ CompositeArchiveRetainedStrategy.createForApplicationFromConfig(config));
this.shutdownHook =
ShutdownHookUtil.addShutdownHook(
@@ -339,7 +362,11 @@ void start() throws IOException, InterruptedException {
private Runnable getArchiveFetchingRunnable() {
return Runnables.withUncaughtExceptionHandler(
- () -> archiveFetcher.fetchArchives(), FatalExitExceptionHandler.INSTANCE);
+ () -> {
+ archiveFetcher.fetchArchives();
+ applicationArchiveFetcher.fetchArchives();
+ },
+ FatalExitExceptionHandler.INSTANCE);
}
void stop() {
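For reviewers who want to try the new constructor: a minimal sketch, assuming only the signatures visible in the hunks above, of embedding the HistoryServer with listeners for both legacy job archives and the new application archives. The class name and archive path are placeholders, and the class sits in the same package because start() is package-private.

```java
package org.apache.flink.runtime.webmonitor.history;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;

/** Hypothetical usage sketch; not part of this PR. */
class HistoryServerWiringExample {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // placeholder archive location
        config.set(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, "file:///tmp/flink-archives");

        HistoryServer historyServer =
                new HistoryServer(
                        config,
                        // legacy job archive events (HistoryServerArchiveFetcher.ArchiveEvent)
                        jobEvent -> System.out.println("job archive event: " + jobEvent.getType()),
                        // application archive events, new in this PR
                        appEvent ->
                                System.out.println(
                                        "application archive event: " + appEvent.getType()));
        historyServer.start();
        // the shutdown hook registered by the constructor takes care of stop()
    }
}
```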
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java
new file mode 100644
index 0000000000000..04b4a768c61e4
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.ArchivePathUtils;
+import org.apache.flink.runtime.history.FsJsonArchivist;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is used by the {@link HistoryServer} to fetch the application and job archives that
+ * are located at {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are
+ * polled at regular intervals, defined by {@link
+ * HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}.
+ *
+ * <p>The archives are downloaded and expanded into a file structure analogous to the REST API.
+ *
+ * <p>Removes existing archives from these directories and the cache according to {@link
+ * ArchiveRetainedStrategy} and {@link
+ * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS}.
+ */
+public class HistoryServerApplicationArchiveFetcher {
+
+ /** Possible application archive operations in history-server. */
+ public enum ArchiveEventType {
+ /** Archive was found in one refresh location and created in history server. */
+ CREATED,
+ /** Archive was deleted from one of refresh locations and deleted from history server. */
+ DELETED
+ }
+
+ /** Representation of application archive event. */
+ public static class ArchiveEvent {
+ private final String id;
+ private final HistoryServerApplicationArchiveFetcher.ArchiveEventType operation;
+
+ ArchiveEvent(String id, HistoryServerApplicationArchiveFetcher.ArchiveEventType operation) {
+ this.id = id;
+ this.operation = operation;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public HistoryServerApplicationArchiveFetcher.ArchiveEventType getType() {
+ return operation;
+ }
+ }
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HistoryServerApplicationArchiveFetcher.class);
+
+ private static final JsonFactory jacksonFactory = new JsonFactory();
+ private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+
+ private static final String JSON_FILE_ENDING = ".json";
+ private static final String JOBS_SUBDIR = "jobs";
+ private static final String JOB_OVERVIEWS_SUBDIR = "overviews";
+ private static final String APPLICATIONS_SUBDIR = "applications";
+ private static final String APPLICATION_OVERVIEWS_SUBDIR = "application-overviews";
+
+ private final List<HistoryServer.RefreshLocation> refreshDirs;
+ private final Consumer<HistoryServerApplicationArchiveFetcher.ArchiveEvent>
+ archiveEventListener;
+
+ private final boolean processExpiredArchiveDeletion;
+ private final ArchiveRetainedStrategy retainedStrategy;
+
+ /** Cache of all available applications identified by their id. */
+ private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory = new HashMap<>();
+
+ private final Map<Path, Map<String, Set<String>>> cachedApplicationIdsToJobIds =
+ new HashMap<>();
+
+ private final File webDir;
+ private final File webJobDir;
+ private final File webJobsOverviewDir;
+ private final File webApplicationDir;
+ private final File webApplicationsOverviewDir;
+
+ HistoryServerApplicationArchiveFetcher(
+ List<HistoryServer.RefreshLocation> refreshDirs,
+ File webDir,
+ Consumer<ArchiveEvent> archiveEventListener,
+ boolean cleanupExpiredArchives,
+ ArchiveRetainedStrategy retainedStrategy)
+ throws IOException {
+ this.refreshDirs = checkNotNull(refreshDirs);
+ this.archiveEventListener = archiveEventListener;
+ this.processExpiredArchiveDeletion = cleanupExpiredArchives;
+ this.retainedStrategy = checkNotNull(retainedStrategy);
+ for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
+ cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>());
+ cachedApplicationIdsToJobIds.put(refreshDir.getPath(), new HashMap<>());
+ }
+ this.webDir = checkNotNull(webDir);
+ this.webJobDir = new File(webDir, JOBS_SUBDIR);
+ Files.createDirectories(webJobDir.toPath());
+ this.webJobsOverviewDir = new File(webDir, JOB_OVERVIEWS_SUBDIR);
+ Files.createDirectories(webJobsOverviewDir.toPath());
+ updateJobOverview();
+
+ this.webApplicationDir = new File(webDir, APPLICATIONS_SUBDIR);
+ Files.createDirectories(webApplicationDir.toPath());
+ this.webApplicationsOverviewDir = new File(webDir, APPLICATION_OVERVIEWS_SUBDIR);
+ Files.createDirectories(webApplicationsOverviewDir.toPath());
+ updateApplicationOverview();
+
+ if (LOG.isInfoEnabled()) {
+ for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
+ LOG.info(
+ "Monitoring directory {} for archived applications.", refreshDir.getPath());
+ }
+ }
+ }
+
+ void fetchArchives() {
+ try {
+ LOG.debug("Starting archive fetching.");
+ List<ArchiveEvent> events = new ArrayList<>();
+ Map<Path, Set<String>> applicationsToRemove = new HashMap<>();
+ cachedArchivesPerRefreshDirectory.forEach(
+ (path, archives) -> applicationsToRemove.put(path, new HashSet<>(archives)));
+ Map<Path, Set<Path>> archivesBeyondRetainedLimit = new HashMap<>();
+ for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
+ Path refreshDir = refreshLocation.getPath();
+ FileSystem refreshFS = refreshLocation.getFs();
+ LOG.debug("Checking archive directory {}.", refreshDir);
+
+ FileStatus[] applicationArchiveDirs;
+ try {
+ applicationArchiveDirs = listApplicationArchiveDirs(refreshFS, refreshDir);
+ } catch (IOException e) {
+ LOG.error(
+ "Failed to access application archive location for path {}.",
+ refreshDir,
+ e);
+ // something went wrong, potentially due to a concurrent deletion
+ // do not remove any applications now; we will retry later
+ applicationsToRemove.remove(refreshDir);
+ continue;
+ }
+
+ int fileOrderedIndexOnModifiedTime = 0;
+ for (FileStatus applicationArchiveDir : applicationArchiveDirs) {
+ Path applicationArchiveDirPath = applicationArchiveDir.getPath();
+ String applicationId = applicationArchiveDirPath.getName();
+ // the application is not expired, so there is no need to remove it
+ applicationsToRemove.get(refreshDir).remove(applicationId);
+
+ fileOrderedIndexOnModifiedTime++;
+ if (!retainedStrategy.shouldRetain(
+ applicationArchiveDir, fileOrderedIndexOnModifiedTime)) {
+ archivesBeyondRetainedLimit
+ .computeIfAbsent(refreshDir, ignored -> new HashSet<>())
+ .add(applicationArchiveDirPath);
+ continue;
+ }
+
+ if (cachedArchivesPerRefreshDirectory.get(refreshDir).contains(applicationId)) {
+ LOG.trace(
+ "Ignoring archive {} because it was already fetched.",
+ applicationArchiveDirPath);
+ } else {
+ LOG.info("Processing archive {}.", applicationArchiveDirPath);
+ try {
+ processArchive(
+ applicationId,
+ refreshFS,
+ refreshDir,
+ applicationArchiveDirPath,
+ events);
+ events.add(
+ new HistoryServerApplicationArchiveFetcher.ArchiveEvent(
+ applicationId,
+ HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED));
+ cachedArchivesPerRefreshDirectory.get(refreshDir).add(applicationId);
+ LOG.info("Processing archive {} finished.", applicationArchiveDirPath);
+ } catch (IOException e) {
+ LOG.error(
+ "Failure while fetching/processing archive for application {}.",
+ applicationId,
+ e);
+ deleteApplicationFiles(applicationId);
+ }
+ }
+ }
+ }
+
+ if (applicationsToRemove.values().stream().flatMap(Set::stream).findAny().isPresent()
+ && processExpiredArchiveDeletion) {
+ cleanupExpiredApplications(applicationsToRemove, events);
+ }
+ if (!archivesBeyondRetainedLimit.isEmpty()) {
+ cleanupApplicationsBeyondSizeLimit(archivesBeyondRetainedLimit, events);
+ }
+ if (!events.isEmpty()) {
+ updateApplicationOverview();
+ updateJobOverview();
+ }
+ events.forEach(archiveEventListener);
+ LOG.debug("Finished archive fetching.");
+ } catch (Exception e) {
+ LOG.error("Critical failure while fetching/processing archives.", e);
+ }
+ }
+
+ private static FileStatus[] listApplicationArchiveDirs(FileSystem refreshFS, Path refreshDir)
+ throws IOException {
+ List<FileStatus> applicationArchiveDirs = new ArrayList<>();
+ FileStatus[] clusterDirs = refreshFS.listStatus(refreshDir);
+ if (clusterDirs == null) {
+ // the entire refreshDirectory was removed
+ return new FileStatus[0];
+ }
+
+ for (FileStatus clusterDir : clusterDirs) {
+ if (clusterDir.isDir() && isValidId(clusterDir.getPath().getName(), refreshDir)) {
+ Path applicationsDir =
+ new Path(clusterDir.getPath(), ArchivePathUtils.APPLICATIONS_DIR);
+ if (refreshFS.exists(applicationsDir)) {
+ FileStatus[] applicationDirs = refreshFS.listStatus(applicationsDir);
+ for (FileStatus applicationDir : applicationDirs) {
+ if (applicationDir.isDir()
+ && isValidId(applicationDir.getPath().getName(), refreshDir)) {
+ applicationArchiveDirs.add(applicationDir);
+ }
+ }
+ }
+ }
+ }
+
+ applicationArchiveDirs.sort(
+ Comparator.comparingLong(FileStatus::getModificationTime).reversed());
+ return applicationArchiveDirs.toArray(new FileStatus[0]);
+ }
+
+ private static boolean isValidJobId(String jobId, Path refreshDir) {
+ try {
+ JobID.fromHexString(jobId);
+ return true;
+ } catch (IllegalArgumentException iae) {
+ LOG.debug(
+ "Archive directory {} contained file with unexpected name {}. Ignoring file.",
+ refreshDir,
+ jobId,
+ iae);
+ return false;
+ }
+ }
+
+ private static boolean isValidId(String id, Path refreshDir) {
+ try {
+ ApplicationID.fromHexString(id);
+ return true;
+ } catch (IllegalArgumentException iae) {
+ LOG.debug(
+ "Archive directory {} contained file with unexpected name {}. Ignoring file.",
+ refreshDir,
+ id,
+ iae);
+ return false;
+ }
+ }
+
+ private void processArchive(
+ String applicationId,
+ FileSystem fs,
+ Path refreshDir,
+ Path applicationArchiveDir,
+ List<ArchiveEvent> jobEvents)
+ throws IOException {
+ Path applicationArchive =
+ new Path(applicationArchiveDir, ArchivePathUtils.APPLICATION_ARCHIVE_NAME);
+ if (!fs.exists(applicationArchive)) {
+ throw new IOException("Application archive " + applicationArchive + " does not exist.");
+ }
+
+ processApplicationArchive(applicationId, applicationArchive);
+
+ Path jobArchivesDir = new Path(applicationArchiveDir, ArchivePathUtils.JOBS_DIR);
+ if (fs.exists(jobArchivesDir)) {
+ for (FileStatus jobArchiveStatus : fs.listStatus(jobArchivesDir)) {
+ Path jobArchive = jobArchiveStatus.getPath();
+ String jobId = jobArchive.getName();
+ if (!jobArchiveStatus.isDir() && isValidJobId(jobId, jobArchivesDir)) {
+ cachedApplicationIdsToJobIds
+ .get(refreshDir)
+ .computeIfAbsent(applicationId, k -> new HashSet<>())
+ .add(jobId);
+ processJobArchive(jobId, jobArchive);
+ jobEvents.add(
+ new HistoryServerApplicationArchiveFetcher.ArchiveEvent(
+ jobId,
+ HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED));
+ }
+ }
+ }
+ }
+
+ private void processApplicationArchive(String applicationId, Path applicationArchive)
+ throws IOException {
+ for (ArchivedJson archive : FsJsonArchivist.readArchivedJsons(applicationArchive)) {
+ String path = archive.getPath();
+ String json = archive.getJson();
+
+ File target;
+ if (path.equals(ApplicationsOverviewHeaders.URL)) {
+ target = new File(webApplicationsOverviewDir, applicationId + JSON_FILE_ENDING);
+ } else {
+ // this implicitly writes into webApplicationDir
+ target = new File(webDir, path + JSON_FILE_ENDING);
+ }
+ writeTargetFile(target, json);
+ }
+ }
+
+ private void processJobArchive(String jobId, Path jobArchive) throws IOException {
+ for (ArchivedJson archive : FsJsonArchivist.readArchivedJsons(jobArchive)) {
+ String path = archive.getPath();
+ String json = archive.getJson();
+
+ File target;
+ if (path.equals(JobsOverviewHeaders.URL)) {
+ target = new File(webJobsOverviewDir, jobId + JSON_FILE_ENDING);
+ } else {
+ // this implicitly writes into webJobDir
+ target = new File(webDir, path + JSON_FILE_ENDING);
+ }
+
+ try {
+ writeTargetFile(target, json);
+ } catch (IOException e) {
+ deleteJobFiles(jobId);
+ throw e;
+ }
+ }
+ }
+
+ private void writeTargetFile(File target, String json) throws IOException {
+ java.nio.file.Path parent = target.getParentFile().toPath();
+
+ try {
+ Files.createDirectories(parent);
+ } catch (FileAlreadyExistsException ignored) {
+ // there may be left-over directories from the previous attempt
+ }
+
+ java.nio.file.Path targetPath = target.toPath();
+
+ // We overwrite existing files since this may be another attempt
+ // at fetching this archive. Existing files may be incomplete/corrupt.
+ Files.deleteIfExists(targetPath);
+
+ Files.createFile(target.toPath());
+ try (FileWriter fw = new FileWriter(target)) {
+ fw.write(json);
+ fw.flush();
+ }
+ }
+
+ private void cleanupApplicationsBeyondSizeLimit(
+ Map<Path, Set<Path>> applicationArchivesToRemove,
+ List<ArchiveEvent> events) {
+ Map<Path, Set<String>> allApplicationIdsToRemove = new HashMap<>();
+
+ for (Map.Entry<Path, Set<Path>> pathSetEntry : applicationArchivesToRemove.entrySet()) {
+ HashSet<String> applicationIdsToRemove = new HashSet<>();
+
+ for (Path archive : pathSetEntry.getValue()) {
+ applicationIdsToRemove.add(archive.getName());
+ try {
+ archive.getFileSystem().delete(archive, true);
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete old archive " + archive, ioe);
+ }
+ }
+ allApplicationIdsToRemove.put(pathSetEntry.getKey(), applicationIdsToRemove);
+ }
+
+ cleanupExpiredApplications(allApplicationIdsToRemove, events);
+ }
+
+ private void cleanupExpiredApplications(
+ Map<Path, Set<String>> applicationsToRemove,
+ List<ArchiveEvent> events) {
+
+ LOG.info("Archive directories for applications {} were deleted.", applicationsToRemove);
+
+ for (Map.Entry<Path, Set<String>> pathSetEntry : applicationsToRemove.entrySet()) {
+ Path refreshDir = pathSetEntry.getKey();
+ cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll(pathSetEntry.getValue());
+ for (String applicationId : pathSetEntry.getValue()) {
+ deleteApplicationFiles(applicationId);
+ events.add(
+ new HistoryServerApplicationArchiveFetcher.ArchiveEvent(
+ applicationId,
+ HistoryServerApplicationArchiveFetcher.ArchiveEventType.DELETED));
+ Set<String> jobIds =
+ cachedApplicationIdsToJobIds.get(refreshDir).remove(applicationId);
+ if (jobIds != null) {
+ jobIds.forEach(
+ jobId -> {
+ deleteJobFiles(jobId);
+ events.add(
+ new HistoryServerApplicationArchiveFetcher.ArchiveEvent(
+ jobId,
+ HistoryServerApplicationArchiveFetcher
+ .ArchiveEventType.DELETED));
+ });
+ }
+ }
+ }
+ }
+
+ private void deleteJobFiles(String jobId) {
+ // Make sure we do not include this job in the overview
+ try {
+ Files.deleteIfExists(new File(webJobsOverviewDir, jobId + JSON_FILE_ENDING).toPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete file from overview directory.", ioe);
+ }
+
+ // Clean up job files we may have created
+ File jobDirectory = new File(webJobDir, jobId);
+ try {
+ FileUtils.deleteDirectory(jobDirectory);
+ } catch (IOException ioe) {
+ LOG.warn("Could not clean up job directory.", ioe);
+ }
+
+ try {
+ Files.deleteIfExists(new File(webJobDir, jobId + JSON_FILE_ENDING).toPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete file from job directory.", ioe);
+ }
+ }
+
+ private void deleteApplicationFiles(String applicationId) {
+ // Make sure we do not include this application in the overview
+ try {
+ Files.deleteIfExists(
+ new File(webApplicationsOverviewDir, applicationId + JSON_FILE_ENDING)
+ .toPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete file from overview directory.", ioe);
+ }
+
+ // Clean up application files we may have created
+ File applicationDirectory = new File(webApplicationDir, applicationId);
+ try {
+ FileUtils.deleteDirectory(applicationDirectory);
+ } catch (IOException ioe) {
+ LOG.warn("Could not clean up application directory.", ioe);
+ }
+
+ try {
+ Files.deleteIfExists(
+ new File(webApplicationDir, applicationId + JSON_FILE_ENDING).toPath());
+ } catch (IOException ioe) {
+ LOG.warn("Could not delete file from application directory.", ioe);
+ }
+ }
+
+ /**
+ * This method replicates the JSON response that would be given by the JobsOverviewHandler when
+ * listing jobs.
+ *
+ * <p>Every job archive contains an overview entry with the same structure. Since jobs are
+ * archived on their own, however, the list of jobs only contains a single job.
+ *
+ * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
+ */
+ private void updateJobOverview() {
+ try (JsonGenerator gen =
+ jacksonFactory.createGenerator(
+ HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) {
+ File[] overviews = new File(webJobsOverviewDir.getPath()).listFiles();
+ if (overviews != null) {
+ Collection<JobDetails> allJobs = new ArrayList<>(overviews.length);
+ for (File overview : overviews) {
+ MultipleJobsDetails subJobs =
+ mapper.readValue(overview, MultipleJobsDetails.class);
+ allJobs.addAll(subJobs.getJobs());
+ }
+ mapper.writeValue(gen, new MultipleJobsDetails(allJobs));
+ }
+ } catch (IOException ioe) {
+ LOG.error("Failed to update job overview.", ioe);
+ }
+ }
+
+ /**
+ * This method replicates the JSON response that would be given by the
+ * ApplicationsOverviewHandler when listing applications.
+ *
+ * <p>Every application archive contains an overview entry with the same structure. Since
+ * applications are archived on their own, however, the list of applications only contains a
+ * single application.
+ *
+ * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
+ */
+ private void updateApplicationOverview() {
+ try (JsonGenerator gen =
+ jacksonFactory.createGenerator(
+ HistoryServer.createOrGetFile(webDir, ApplicationsOverviewHeaders.URL))) {
+ File[] overviews = new File(webApplicationsOverviewDir.getPath()).listFiles();
+ if (overviews != null) {
+ Collection<ApplicationDetails> allApplications = new ArrayList<>(overviews.length);
+ for (File overview : overviews) {
+ MultipleApplicationsDetails subApplications =
+ mapper.readValue(overview, MultipleApplicationsDetails.class);
+ allApplications.addAll(subApplications.getApplications());
+ }
+ mapper.writeValue(gen, new MultipleApplicationsDetails(allApplications));
+ }
+ } catch (IOException ioe) {
+ LOG.error("Failed to update application overview.", ioe);
+ }
+ }
+}
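Since ArchiveEvent only exposes getId() and getType(), a listener can distinguish CREATED from DELETED but not applications from jobs (job archives inside an application are reported through the same callback, as processArchive shows). A small sketch of a listener built only on the API above; the class name is hypothetical:

```java
package org.apache.flink.runtime.webmonitor.history;

import java.util.function.Consumer;

/** Hypothetical listener sketch; not part of this PR. */
class ApplicationArchiveEventLogger {
    static Consumer<HistoryServerApplicationArchiveFetcher.ArchiveEvent> create() {
        return event -> {
            switch (event.getType()) {
                case CREATED:
                    // the expanded JSON files are now served from the web directory
                    System.out.println("now serving archive " + event.getId());
                    break;
                case DELETED:
                    // the archive expired or fell beyond the retained limit
                    System.out.println("removed archive " + event.getId());
                    break;
            }
        };
    }
}
```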
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 4fe8bd58d5b7f..b50b96327bfa1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -29,7 +29,7 @@
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
-import org.apache.flink.runtime.webmonitor.history.retaining.JobRetainedStrategy;
+import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.jackson.JacksonMapperFactory;
@@ -113,7 +113,7 @@ public ArchiveEventType getType() {
private final List<HistoryServer.RefreshLocation> refreshDirs;
private final Consumer<ArchiveEvent> jobArchiveEventListener;
private final boolean processExpiredArchiveDeletion;
- private final JobRetainedStrategy jobRetainedStrategy;
+ private final ArchiveRetainedStrategy jobRetainedStrategy;
/** Cache of all available jobs identified by their id. */
private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
@@ -127,7 +127,7 @@ public ArchiveEventType getType() {
File webDir,
Consumer<ArchiveEvent> jobArchiveEventListener,
boolean cleanupExpiredArchives,
- JobRetainedStrategy jobRetainedStrategy)
+ ArchiveRetainedStrategy jobRetainedStrategy)
throws IOException {
this.refreshDirs = checkNotNull(refreshDirs);
this.jobArchiveEventListener = jobArchiveEventListener;
@@ -177,6 +177,10 @@ void fetchArchives() {
int fileOrderedIndexOnModifiedTime = 0;
for (FileStatus jobArchive : jobArchives) {
+ if (jobArchive.isDir()) {
+ continue;
+ }
+
Path jobArchivePath = jobArchive.getPath();
String jobID = jobArchivePath.getName();
if (!isValidJobID(jobID, refreshDir)) {
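The new isDir() guard matters because a refresh directory can now hold both flat legacy job archive files and per-cluster directory trees of the form <refresh-dir>/<cluster-id>/applications/<application-id> (compare listApplicationArchiveDirs above); the legacy fetcher must only consume the flat files. A sketch of the resulting split, with a hypothetical helper name:

```java
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the file/directory split; not part of this PR. */
class LegacyArchiveScan {
    static List<FileStatus> flatJobArchives(FileSystem fs, Path refreshDir) throws IOException {
        List<FileStatus> flatFiles = new ArrayList<>();
        FileStatus[] entries = fs.listStatus(refreshDir);
        if (entries == null) {
            // the refresh directory was removed concurrently
            return flatFiles;
        }
        for (FileStatus entry : entries) {
            // directories are per-cluster application trees handled by
            // HistoryServerApplicationArchiveFetcher; skip them here
            if (!entry.isDir()) {
                flatFiles.add(entry);
            }
        }
        return flatFiles;
    }
}
```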
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
similarity index 96%
rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
index 2ef991698bf71..f2e50dd73c551 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobRetainedStrategy.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/ArchiveRetainedStrategy.java
@@ -21,7 +21,7 @@
import org.apache.flink.core.fs.FileStatus;
/** To define the strategy interface to judge whether the file should be retained. */
-public interface JobRetainedStrategy {
+public interface ArchiveRetainedStrategy {
/**
* Judge whether the file should be retained.
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
similarity index 63%
rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
index acd35c93f7903..2a38dfaeff773 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategy.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategy.java
@@ -31,23 +31,42 @@
import java.util.List;
import java.util.Optional;
+import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS;
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
+import static org.apache.flink.util.Preconditions.checkArgument;
/** The retained strategy. */
-public class CompositeJobRetainedStrategy implements JobRetainedStrategy {
+public class CompositeArchiveRetainedStrategy implements ArchiveRetainedStrategy {
- public static JobRetainedStrategy createFrom(ReadableConfig config) {
+ public static ArchiveRetainedStrategy createForJobFromConfig(ReadableConfig config) {
int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS);
+ if (maxHistorySizeByOldKey == 0 || maxHistorySizeByOldKey < -1) {
+ throw new IllegalConfigurationException(
+ "Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key());
+ }
Optional retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL);
- return new CompositeJobRetainedStrategy(
- new QuantityJobRetainedStrategy(maxHistorySizeByOldKey),
- new TimeToLiveJobRetainedStrategy(retainedTtlOpt.orElse(null)));
+ return new CompositeArchiveRetainedStrategy(
+ new QuantityArchiveRetainedStrategy(maxHistorySizeByOldKey),
+ new TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null)));
}
- private final List<JobRetainedStrategy> strategies;
+ public static ArchiveRetainedStrategy createForApplicationFromConfig(ReadableConfig config) {
+ int maxHistorySize = config.get(HISTORY_SERVER_RETAINED_APPLICATIONS);
+ if (maxHistorySize == 0 || maxHistorySize < -1) {
+ throw new IllegalConfigurationException(
+ "Cannot set %s to 0 or less than -1",
+ HISTORY_SERVER_RETAINED_APPLICATIONS.key());
+ }
+ Optional retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL);
+ return new CompositeArchiveRetainedStrategy(
+ new QuantityArchiveRetainedStrategy(maxHistorySize),
+ new TimeToLiveArchiveRetainedStrategy(retainedTtlOpt.orElse(null)));
+ }
+
+ private final List<ArchiveRetainedStrategy> strategies;
- CompositeJobRetainedStrategy(JobRetainedStrategy... strategies) {
+ CompositeArchiveRetainedStrategy(ArchiveRetainedStrategy... strategies) {
this.strategies =
strategies == null || strategies.length == 0
? Collections.emptyList()
@@ -64,11 +83,11 @@ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
}
/** The time to live based retained strategy. */
-class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy {
+class TimeToLiveArchiveRetainedStrategy implements ArchiveRetainedStrategy {
@Nullable private final Duration ttlThreshold;
- TimeToLiveJobRetainedStrategy(Duration ttlThreshold) {
+ TimeToLiveArchiveRetainedStrategy(@Nullable Duration ttlThreshold) {
if (ttlThreshold != null && ttlThreshold.toMillis() <= 0) {
throw new IllegalConfigurationException(
"Cannot set %s to 0 or less than 0 milliseconds",
@@ -86,16 +105,13 @@ public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
}
}
-/** The job quantity based retained strategy. */
-class QuantityJobRetainedStrategy implements JobRetainedStrategy {
+/** The quantity based retained strategy. */
+class QuantityArchiveRetainedStrategy implements ArchiveRetainedStrategy {
private final int quantityThreshold;
- QuantityJobRetainedStrategy(int quantityThreshold) {
- if (quantityThreshold == 0 || quantityThreshold < -1) {
- throw new IllegalConfigurationException(
- "Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key());
- }
+ QuantityArchiveRetainedStrategy(int quantityThreshold) {
+ checkArgument(quantityThreshold == -1 || quantityThreshold > 0);
this.quantityThreshold = quantityThreshold;
}
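To summarize the retention semantics: createForApplicationFromConfig composes a quantity threshold with an optional TTL, and the fetcher calls shouldRetain with each archive's FileStatus and its 1-based index in modification-time order (newest first, per listApplicationArchiveDirs); an archive is kept only if every sub-strategy agrees. A minimal sketch, assuming the option keys referenced in this file; the FileStatus argument is a stand-in:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.runtime.webmonitor.history.retaining.ArchiveRetainedStrategy;
import org.apache.flink.runtime.webmonitor.history.retaining.CompositeArchiveRetainedStrategy;

import java.time.Duration;

/** Hypothetical sketch; not part of this PR. */
class RetainedStrategyExample {
    static boolean keep(FileStatus archiveDir, int indexByModificationTime) {
        Configuration config = new Configuration();
        // keep at most five applications...
        config.set(HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS, 5);
        // ...and none older than seven days (optional; omit to retain by count alone)
        config.set(HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL, Duration.ofDays(7));

        ArchiveRetainedStrategy strategy =
                CompositeArchiveRetainedStrategy.createForApplicationFromConfig(config);
        return strategy.shouldRetain(archiveDir, indexByModificationTime);
    }
}
```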
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 537d58fae8e77..2cd6f1d4f8575 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -18,19 +18,32 @@
package org.apache.flink.runtime.webmonitor.history;
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.ApplicationState;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.application.ArchivedApplication;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.history.ArchivePathUtils;
import org.apache.flink.runtime.history.FsJsonArchivist;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
+import org.apache.flink.runtime.messages.webmonitor.ApplicationDetailsInfo;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
+import org.apache.flink.runtime.rest.messages.ApplicationsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.webmonitor.testutils.HttpUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,6 +64,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
@@ -58,11 +73,14 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -86,13 +104,14 @@ class HistoryServerTest {
private MiniClusterWithClientResource cluster;
private File jmDirectory;
private File hsDirectory;
+ private Configuration clusterConfig;
@BeforeEach
void setUp(@TempDir File jmDirectory, @TempDir File hsDirectory) throws Exception {
this.jmDirectory = jmDirectory;
this.hsDirectory = hsDirectory;
- Configuration clusterConfig = new Configuration();
+ clusterConfig = new Configuration();
clusterConfig.set(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
cluster =
@@ -130,6 +149,9 @@ void testHistoryServerIntegration(final boolean versionLessThan14) throws Except
== HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
numExpectedArchivedJobs.countDown();
}
+ },
+ (event) -> {
+ throw new RuntimeException("Should not be called");
});
try {
@@ -164,10 +186,10 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio
final int numArchivesToRemoveUponHsStart =
numArchivesBeforeHsStarted - numArchivesToKeepInHistory;
final long oneMinuteSinceEpoch = 1000L * 60L;
- List<String> expectedJobIdsToKeep = new LinkedList<>();
+ List<JobID> expectedJobIdsToKeep = new LinkedList<>();
for (int j = 0; j < numArchivesBeforeHsStarted; j++) {
- String jobId =
+ JobID jobId =
createLegacyArchive(
jmDirectory.toPath(), j * oneMinuteSinceEpoch, versionLessThan14);
if (j >= numArchivesToRemoveUponHsStart) {
@@ -205,6 +227,9 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio
numArchivesDeletedTotal.countDown();
break;
}
+ },
+ (event) -> {
+ throw new RuntimeException("Should not be called");
});
try {
@@ -232,10 +257,9 @@ void testRemoveOldestModifiedArchivesBeyondHistorySizeLimit(final boolean versio
}
}
- private Set<String> getIdsFromJobOverview(String baseUrl) throws Exception {
+ private Set<JobID> getIdsFromJobOverview(String baseUrl) throws Exception {
return getJobsOverview(baseUrl).getJobs().stream()
.map(JobDetails::getJobId)
- .map(JobID::toString)
.collect(Collectors.toSet());
}
@@ -288,15 +312,26 @@ void testClearWebDir() throws Exception {
new File(hsDirectory.toURI() + "/overviews/dirtyEmptySubFile.json").createNewFile();
new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubDir").mkdir();
new File(hsDirectory.toURI() + "/jobs/dirtyEmptySubFile.json").createNewFile();
+ new File(hsDirectory.toURI() + "/application-overviews/dirtyEmptySubDir").mkdir();
+ new File(hsDirectory.toURI() + "/application-overviews/dirtyEmptySubFile.json")
+ .createNewFile();
+ new File(hsDirectory.toURI() + "/applications/dirtyEmptySubDir").mkdir();
+ new File(hsDirectory.toURI() + "/applications/dirtyEmptySubFile.json").createNewFile();
hs = new HistoryServer(historyServerConfig);
assertInitializedHistoryServerWebDir(hs.getWebDir());
}
private void assertInitializedHistoryServerWebDir(File historyWebDir) {
-
- assertThat(historyWebDir.list()).containsExactlyInAnyOrder("overviews", "jobs");
+ assertThat(historyWebDir.list())
+ .containsExactlyInAnyOrder(
+ "overviews", "jobs", "application-overviews", "applications");
assertThat(new File(historyWebDir, "overviews")).exists().isDirectory().isEmptyDirectory();
assertThat(new File(historyWebDir, "jobs").list()).containsExactly("overview.json");
+ assertThat(new File(historyWebDir, "application-overviews"))
+ .exists()
+ .isDirectory()
+ .isEmptyDirectory();
+ assertThat(new File(historyWebDir, "applications").list()).containsExactly("overview.json");
}
private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Exception {
@@ -327,6 +362,9 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti
allArchivesExpiredLatch.countDown();
break;
}
+ },
+ (event) -> {
+ throw new RuntimeException("Should not be called");
});
try {
@@ -378,27 +416,33 @@ private void runArchiveExpirationTest(boolean cleanupExpiredJobs) throws Excepti
assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
- assertJobFilesCleanedUp(cleanupExpiredJobs);
+ assertFilesCleanedUp(cleanupExpiredJobs);
} finally {
hs.stop();
}
}
- private void assertJobFilesCleanedUp(boolean jobFilesShouldBeDeleted) throws IOException {
+ private void assertFilesCleanedUp(boolean filesShouldBeDeleted) throws IOException {
try (Stream<Path> paths = Files.walk(hsDirectory.toPath())) {
- final List<Path> jobFiles =
+ final List<Path> applicationOrJobFiles =
paths.filter(path -> !path.equals(hsDirectory.toPath()))
.map(path -> hsDirectory.toPath().relativize(path))
.filter(path -> !path.equals(Paths.get("config.json")))
.filter(path -> !path.equals(Paths.get("jobs")))
.filter(path -> !path.equals(Paths.get("jobs", "overview.json")))
.filter(path -> !path.equals(Paths.get("overviews")))
+ .filter(path -> !path.equals(Paths.get("applications")))
+ .filter(
+ path ->
+ !path.equals(
+ Paths.get("applications", "overview.json")))
+ .filter(path -> !path.equals(Paths.get("application-overviews")))
.collect(Collectors.toList());
- if (jobFilesShouldBeDeleted) {
- assertThat(jobFiles).isEmpty();
+ if (filesShouldBeDeleted) {
+ assertThat(applicationOrJobFiles).isEmpty();
} else {
- assertThat(jobFiles).isNotEmpty();
+ assertThat(applicationOrJobFiles).isNotEmpty();
}
}
}
@@ -413,6 +457,13 @@ private void waitForArchivesCreation(int numJobs) throws InterruptedException {
}
private Configuration createTestConfiguration(boolean cleanupExpiredJobs) {
+ return createTestConfiguration(
+ cleanupExpiredJobs,
+ HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS.defaultValue());
+ }
+
+ private Configuration createTestConfiguration(
+ boolean cleanupExpiredJobs, boolean cleanupExpiredApplications) {
Configuration historyServerConfig = new Configuration();
historyServerConfig.set(
HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
@@ -424,6 +475,9 @@ private Configuration createTestConfiguration(boolean cleanupExpiredJobs) {
historyServerConfig.set(
HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS, cleanupExpiredJobs);
+ historyServerConfig.set(
+ HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS,
+ cleanupExpiredApplications);
historyServerConfig.set(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
return historyServerConfig;
@@ -451,15 +505,15 @@ private static void runJob() throws Exception {
env.execute();
}
- private static String createLegacyArchive(
+ private static JobID createLegacyArchive(
Path directory, long fileModifiedDate, boolean versionLessThan14) throws IOException {
- String jobId = createLegacyArchive(directory, versionLessThan14);
- File jobArchive = directory.resolve(jobId).toFile();
+ JobID jobId = createLegacyArchive(directory, versionLessThan14);
+ File jobArchive = directory.resolve(jobId.toString()).toFile();
jobArchive.setLastModified(fileModifiedDate);
return jobId;
}
- private static String createLegacyArchive(Path directory, boolean versionLessThan14)
+ private static JobID createLegacyArchive(Path directory, boolean versionLessThan14)
throws IOException {
JobID jobId = JobID.generate();
@@ -504,7 +558,409 @@ private static String createLegacyArchive(Path directory, boolean versionLessTha
directory.toAbsolutePath().toString(), jobId.toString()),
Collections.singleton(archivedJson));
- return jobId.toString();
+ return jobId;
+ }
+
+ @Test
+ void testApplicationAndJobArchives() throws Exception {
+ int numApplications = 2;
+ int numJobsPerApplication = 2;
+ // jobs that are not part of an application
+ int numJobsOutsideApplication = 1;
+
+ Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+ new HashMap<>(numApplications);
+ for (int i = 0; i < numApplications; i++) {
+ ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ expectedApplicationAndJobIds.put(applicationId, new HashSet<>(jobIds));
+ }
+ Set<JobID> expectedJobIdsOutsideApplication = new HashSet<>(numJobsOutsideApplication);
+ for (int i = 0; i < numJobsOutsideApplication; i++) {
+ ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(null);
+ mockJobArchive(executionGraphInfo);
+ expectedJobIdsOutsideApplication.add(executionGraphInfo.getJobId());
+ }
+
+ int numTotalJobs = numApplications * numJobsPerApplication + numJobsOutsideApplication;
+ int numTotal = numApplications + numTotalJobs;
+ CountDownLatch numExpectedArchives = new CountDownLatch(numTotal);
+ Configuration historyServerConfig = createTestConfiguration(false);
+ HistoryServer hs =
+ new HistoryServer(
+ historyServerConfig,
+ (event) -> {
+ if (event.getType()
+ == HistoryServerArchiveFetcher.ArchiveEventType.CREATED) {
+ numExpectedArchives.countDown();
+ }
+ },
+ (event) -> {
+ if (event.getType()
+ == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED) {
+ numExpectedArchives.countDown();
+ }
+ });
+ try {
+ hs.start();
+ String baseUrl = "http://localhost:" + hs.getWebPort();
+ assertThat(numExpectedArchives.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(expectedApplicationAndJobIds);
+ Set<JobID> expectedJobIds =
+ Stream.concat(
+ expectedApplicationAndJobIds.values().stream()
+ .flatMap(Set::stream),
+ expectedJobIdsOutsideApplication.stream())
+ .collect(Collectors.toSet());
+ assertThat(getIdsFromJobOverview(baseUrl)).isEqualTo(expectedJobIds);
+ // checks whether the dashboard configuration contains all expected fields
+ getDashboardConfiguration(baseUrl);
+ } finally {
+ hs.stop();
+ }
+ }
+
+ @Test
+ void testRemoveApplicationArchivesBeyondHistorySizeLimit() throws Exception {
+ int numJobsPerApplication = 1;
+ int numApplicationsToKeepInHistory = 2;
+ int numApplicationsBeforeHsStarted = 4;
+ int numApplicationsAfterHsStarted = 2;
+ int numApplicationsToRemoveUponHsStart =
+ numApplicationsBeforeHsStarted - numApplicationsToKeepInHistory;
+ List<Tuple2<ApplicationID, Set<JobID>>> expectedApplicationAndJobIdsToKeep =
+ new LinkedList<>();
+ for (int i = 0; i < numApplicationsBeforeHsStarted; i++) {
+ ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ if (i >= numApplicationsToRemoveUponHsStart) {
+ expectedApplicationAndJobIdsToKeep.add(
+ new Tuple2<>(applicationId, new HashSet<>(jobIds)));
+ }
+ }
+
+ // one for application itself, numJobsPerApplication for jobs
+ int numArchivesRatio = 1 + numJobsPerApplication;
+ CountDownLatch numArchivesCreatedInitially =
+ new CountDownLatch(numApplicationsToKeepInHistory * numArchivesRatio);
+ // jobs in applications that exceed the size limit are not read by the fetcher at all,
+ // so there is no need to delete these jobs.
+ CountDownLatch numArchivesDeletedInitially =
+ new CountDownLatch(numApplicationsToRemoveUponHsStart);
+ CountDownLatch numArchivesCreatedTotal =
+ new CountDownLatch(
+ (numApplicationsBeforeHsStarted
+ - numApplicationsToRemoveUponHsStart
+ + numApplicationsAfterHsStarted)
+ * numArchivesRatio);
+ CountDownLatch numArchivesDeletedTotal =
+ new CountDownLatch(
+ numApplicationsToRemoveUponHsStart
+ + numApplicationsAfterHsStarted * numArchivesRatio);
+ Configuration historyServerConfig =
+ createTestConfiguration(
+ HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue());
+ historyServerConfig.set(
+ HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS,
+ numApplicationsToKeepInHistory);
+ HistoryServer hs =
+ new HistoryServer(
+ historyServerConfig,
+ (event) -> {
+ throw new RuntimeException("Should not be called");
+ },
+ (event) -> {
+ if (event.getType()
+ == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED) {
+ numArchivesCreatedInitially.countDown();
+ numArchivesCreatedTotal.countDown();
+ } else if (event.getType()
+ == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .DELETED) {
+ numArchivesDeletedInitially.countDown();
+ numArchivesDeletedTotal.countDown();
+ }
+ });
+ try {
+ hs.start();
+ String baseUrl = "http://localhost:" + hs.getWebPort();
+ assertThat(numArchivesCreatedInitially.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(numArchivesDeletedInitially.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(
+ expectedApplicationAndJobIdsToKeep.stream()
+ .collect(
+ Collectors.toMap(
+ tuple -> tuple.f0, tuple -> tuple.f1)));
+ for (int i = numApplicationsBeforeHsStarted;
+ i < numApplicationsBeforeHsStarted + numApplicationsAfterHsStarted;
+ i++) {
+ expectedApplicationAndJobIdsToKeep.remove(0);
+ ArchivedApplication archivedApplication =
+ mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ expectedApplicationAndJobIdsToKeep.add(
+ new Tuple2<>(applicationId, new HashSet<>(jobIds)));
+ // avoid executing too fast, which would give archive files the same modification time
+ Thread.sleep(50);
+ }
+
+ assertThat(numArchivesCreatedTotal.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(numArchivesDeletedTotal.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(
+ expectedApplicationAndJobIdsToKeep.stream()
+ .collect(
+ Collectors.toMap(
+ tuple -> tuple.f0, tuple -> tuple.f1)));
+ } finally {
+ hs.stop();
+ }
+ }
+
+ @Test
+ void testFailIfApplicationHistorySizeLimitIsZero() {
+ assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(0))
+ .isInstanceOf(IllegalConfigurationException.class);
+ }
+
+ @Test
+ void testFailIfApplicationHistorySizeLimitIsLessThanMinusOne() {
+ assertThatThrownBy(() -> startHistoryServerWithApplicationSizeLimit(-2))
+ .isInstanceOf(IllegalConfigurationException.class);
+ }
+
+ private void startHistoryServerWithApplicationSizeLimit(int maxHistorySize)
+ throws IOException, FlinkException, InterruptedException {
+ Configuration historyServerConfig =
+ createTestConfiguration(
+ HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS
+ .defaultValue());
+ historyServerConfig.set(
+ HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS, maxHistorySize);
+ new HistoryServer(historyServerConfig).start();
+ }
+
+ @Test
+ void testCleanExpiredApplication() throws Exception {
+ runApplicationArchiveExpirationTest(true);
+ }
+
+ @Test
+ void testRemainExpiredApplication() throws Exception {
+ runApplicationArchiveExpirationTest(false);
+ }
+
+ private void runApplicationArchiveExpirationTest(boolean cleanupExpiredApplications)
+ throws Exception {
+ int numExpiredApplications = cleanupExpiredApplications ? 1 : 0;
+ int numApplications = 3;
+ int numJobsPerApplication = 1;
+
+ Map<ApplicationID, Set<JobID>> expectedApplicationAndJobIds =
+ new HashMap<>(numApplications);
+ for (int i = 0; i < numApplications; i++) {
+ ArchivedApplication archivedApplication = mockApplicationArchive(numJobsPerApplication);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
+ List<JobID> jobIds =
+ archivedApplication.getJobs().values().stream()
+ .map(ExecutionGraphInfo::getJobId)
+ .collect(Collectors.toList());
+ expectedApplicationAndJobIds.put(applicationId, new HashSet<>(jobIds));
+ }
+
+ // one for application itself, numJobsPerApplication for jobs
+ int numArchivesRatio = 1 + numJobsPerApplication;
+ CountDownLatch numExpectedArchives = new CountDownLatch(numApplications * numArchivesRatio);
+ CountDownLatch firstArchiveExpiredLatch =
+ new CountDownLatch(numExpiredApplications * numArchivesRatio);
+ CountDownLatch allArchivesExpiredLatch =
+ new CountDownLatch(
+ cleanupExpiredApplications ? numApplications * numArchivesRatio : 0);
+
+ Configuration historyServerConfig =
+ createTestConfiguration(
+ HistoryServerOptions.HISTORY_SERVER_CLEANUP_EXPIRED_JOBS.defaultValue(),
+ cleanupExpiredApplications);
+ HistoryServer hs =
+ new HistoryServer(
+ historyServerConfig,
+ (event) -> {
+ throw new RuntimeException("Should not be called");
+ },
+ (event) -> {
+ if (event.getType()
+ == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .CREATED) {
+ numExpectedArchives.countDown();
+ } else if (event.getType()
+ == HistoryServerApplicationArchiveFetcher.ArchiveEventType
+ .DELETED) {
+ firstArchiveExpiredLatch.countDown();
+ allArchivesExpiredLatch.countDown();
+ }
+ });
+ try {
+ hs.start();
+ String baseUrl = "http://localhost:" + hs.getWebPort();
+ assertThat(numExpectedArchives.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(expectedApplicationAndJobIds);
+ ApplicationID applicationIdToDelete =
+ expectedApplicationAndJobIds.keySet().stream()
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Expected at least one application"));
+ if (cleanupExpiredApplications) {
+ expectedApplicationAndJobIds.remove(applicationIdToDelete);
+ }
+ // trigger another fetch and delete one archive from jm
+ // we fetch again to probabilistically cause a concurrent deletion
+ hs.fetchArchives();
+ deleteApplicationArchiveDir(applicationIdToDelete);
+
+ assertThat(firstArchiveExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+ // check that archive is still/no longer present in hs
+ assertThat(getApplicationAndJobIdsFromApplicationOverview(baseUrl))
+ .isEqualTo(expectedApplicationAndJobIds);
+ for (ApplicationID remainingApplicationId : expectedApplicationAndJobIds.keySet()) {
+ deleteApplicationArchiveDir(remainingApplicationId);
+ }
+ assertThat(allArchivesExpiredLatch.await(10L, TimeUnit.SECONDS)).isTrue();
+ assertFilesCleanedUp(cleanupExpiredApplications);
+ } finally {
+ hs.stop();
+ }
+ }
+
+ private Map<ApplicationID, Set<JobID>> getApplicationAndJobIdsFromApplicationOverview(
+ String baseUrl) throws Exception {
+ Set<ApplicationID> applicationIds =
+ getApplicationsOverview(baseUrl).getApplications().stream()
+ .map(ApplicationDetails::getApplicationId)
+ .collect(Collectors.toSet());
+ Map<ApplicationID, Set<JobID>> applicationAndJobIds = new HashMap<>(applicationIds.size());
+ for (ApplicationID applicationId : applicationIds) {
+ Set<JobID> jobIds =
+ getApplicationDetails(baseUrl, applicationId).getJobs().stream()
+ .map(JobDetails::getJobId)
+ .collect(Collectors.toSet());
+ applicationAndJobIds.put(applicationId, jobIds);
+ }
+ return applicationAndJobIds;
+ }
+
+ private static MultipleApplicationsDetails getApplicationsOverview(String baseUrl)
+ throws Exception {
+ Tuple2<Integer, String> response =
+ HttpUtils.getFromHTTP(baseUrl + ApplicationsOverviewHeaders.URL);
+ return OBJECT_MAPPER.readValue(response.f1, MultipleApplicationsDetails.class);
+ }
+
+ private static ApplicationDetailsInfo getApplicationDetails(
+ String baseUrl, ApplicationID applicationId) throws Exception {
+ Tuple2<Integer, String> response =
+ HttpUtils.getFromHTTP(
+ baseUrl
+ + ApplicationDetailsHeaders.URL.replace(
+ ':' + ApplicationIDPathParameter.KEY,
+ applicationId.toString()));
+ return OBJECT_MAPPER.readValue(response.f1, ApplicationDetailsInfo.class);
+ }
+
+ private ArchivedApplication mockApplicationArchive(int numJobs) throws IOException {
+ ArchivedApplication archivedApplication = createArchivedApplication(numJobs);
+ ApplicationID applicationId = archivedApplication.getApplicationId();
+ ArchivedJson archivedApplicationsOverview =
+ new ArchivedJson(
+ ApplicationsOverviewHeaders.URL,
+ new MultipleApplicationsDetails(
+ Collections.singleton(
+ ApplicationDetails.fromArchivedApplication(
+ archivedApplication))));
+ ArchivedJson archivedApplicationDetails =
+ new ArchivedJson(
+ ApplicationDetailsHeaders.URL.replace(
+ ':' + ApplicationIDPathParameter.KEY, applicationId.toString()),
+ ApplicationDetailsInfo.fromArchivedApplication(archivedApplication));
+ // set cluster id to application id to simplify the test
+ clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+ FsJsonArchivist.writeArchivedJsons(
+ ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId),
+ Arrays.asList(archivedApplicationsOverview, archivedApplicationDetails));
+
+ Map<JobID, ExecutionGraphInfo> jobs = archivedApplication.getJobs();
+ for (Map.Entry<JobID, ExecutionGraphInfo> jobEntry : jobs.entrySet()) {
+ mockJobArchive(jobEntry.getValue());
+ }
+ return archivedApplication;
+ }
+
+ private void mockJobArchive(ExecutionGraphInfo executionGraphInfo) throws IOException {
+ JobID jobId = executionGraphInfo.getJobId();
+ ApplicationID applicationId = executionGraphInfo.getApplicationId().orElse(null);
+ ArchivedJson archivedJobsOverview =
+ new ArchivedJson(
+ JobsOverviewHeaders.URL,
+ new MultipleJobsDetails(
+ Collections.singleton(
+ JobDetails.createDetailsForJob(
+ executionGraphInfo.getArchivedExecutionGraph()))));
+ FsJsonArchivist.writeArchivedJsons(
+ ArchivePathUtils.getJobArchivePath(clusterConfig, jobId, applicationId),
+ Collections.singletonList(archivedJobsOverview));
+ }
+
+ private ArchivedApplication createArchivedApplication(int numJobs) {
+ ApplicationID applicationId = ApplicationID.generate();
+ Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>(numJobs);
+ for (int i = 0; i < numJobs; i++) {
+ ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(applicationId);
+ jobs.put(executionGraphInfo.getJobId(), executionGraphInfo);
+ }
+ return new ArchivedApplication(
+ applicationId,
+ "test-application",
+ ApplicationState.FINISHED,
+ new long[ApplicationState.values().length],
+ jobs);
+ }
+
+ private ExecutionGraphInfo createExecutionGraphInfo(@Nullable ApplicationID applicationId) {
+ return createExecutionGraphInfo(JobID.generate(), applicationId);
+ }
+
+ private ExecutionGraphInfo createExecutionGraphInfo(
+ JobID jobId, @Nullable ApplicationID applicationId) {
+ return new ExecutionGraphInfo(
+ ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
+ jobId, "test-job", JobStatus.FINISHED, null, null, null, 0, applicationId));
+ }
+
+ private void deleteApplicationArchiveDir(ApplicationID applicationId) throws IOException {
+ // set cluster id to application id to simplify the test
+ clusterConfig.set(ClusterOptions.CLUSTER_ID, applicationId.toString());
+ org.apache.flink.core.fs.Path applicationArchiveDir =
+ ArchivePathUtils.getApplicationArchivePath(clusterConfig, applicationId)
+ .getParent();
+ applicationArchiveDir.getFileSystem().delete(applicationArchiveDir, true);
}
private static final class JsonObject implements AutoCloseable {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
similarity index 64%
rename from flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
rename to flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
index e8983967df537..d1d6124b3570f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeJobRetainedStrategyTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeArchiveRetainedStrategyTest.java
@@ -18,31 +18,49 @@
package org.apache.flink.runtime.webmonitor.history.retaining;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
import java.time.Instant;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_APPLICATIONS;
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Testing for {@link CompositeJobRetainedStrategy}. */
-class CompositeJobRetainedStrategyTest {
+/** Tests for {@link CompositeArchiveRetainedStrategy}. */
+class CompositeArchiveRetainedStrategyTest {
+
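+ /** Provides one parameterized case per archive flavor: legacy job archives and application archives. */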
+ private static Stream<TestCase> getTestCases() {
+ return Stream.of(
+ new TestCase(
+ "Legacy Jobs",
+ HISTORY_SERVER_RETAINED_JOBS,
+ CompositeArchiveRetainedStrategy::createForJobFromConfig),
+ new TestCase(
+ "Applications",
+ HISTORY_SERVER_RETAINED_APPLICATIONS,
+ CompositeArchiveRetainedStrategy::createForApplicationFromConfig));
+ }
- @Test
- void testTimeToLiveBasedJobRetainedStrategy() {
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("getTestCases")
+ void testTimeToLiveBasedArchiveRetainedStrategy(TestCase testCase) {
final Configuration conf = new Configuration();
// Test for invalid option value.
conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ZERO);
- assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ assertThatThrownBy(() -> testCase.createStrategy(conf))
.isInstanceOf(IllegalConfigurationException.class);
// Option values below zero milliseconds are skipped here, since they would trigger a
// java.lang.NumberFormatException from TimeUtils.
@@ -51,7 +69,7 @@ void testTimeToLiveBasedJobRetainedStrategy() {
// Test the case where no specific retention policy is configured, i.e., all archived files
// are retained.
- JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(
strategy.shouldRetain(
@@ -63,7 +81,7 @@ void testTimeToLiveBasedJobRetainedStrategy() {
// Test the case where only the TTL-based retention policy is specified.
conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1L));
- strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(
strategy.shouldRetain(
@@ -74,35 +92,37 @@ void testTimeToLiveBasedJobRetainedStrategy() {
.isFalse();
}
- @Test
- void testQuantityBasedJobRetainedStrategy() {
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("getTestCases")
+ void testQuantityBasedArchiveRetainedStrategy(TestCase testCase) {
final Configuration conf = new Configuration();
// Test for invalid option value.
- conf.set(HISTORY_SERVER_RETAINED_JOBS, 0);
- assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ conf.set(testCase.getQuantityConfigOption(), 0);
+ assertThatThrownBy(() -> testCase.createStrategy(conf))
.isInstanceOf(IllegalConfigurationException.class);
- conf.set(HISTORY_SERVER_RETAINED_JOBS, -2);
- assertThatThrownBy(() -> CompositeJobRetainedStrategy.createFrom(conf))
+ conf.set(testCase.getQuantityConfigOption(), -2);
+ assertThatThrownBy(() -> testCase.createStrategy(conf))
.isInstanceOf(IllegalConfigurationException.class);
- conf.removeConfig(HISTORY_SERVER_RETAINED_JOBS);
+ conf.removeConfig(testCase.getQuantityConfigOption());
// Test the case where no specific retention policy is configured, i.e., all archived files
// are retained.
- JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isTrue();
// Test the case where only the QUANTITY-based retention policy is specified.
- conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
- strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ conf.set(testCase.getQuantityConfigOption(), 2);
+ strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isFalse();
}
- @Test
- void testCompositeBasedJobRetainedStrategy() {
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("getTestCases")
+ void testCompositeBasedArchiveRetainedStrategy(TestCase testCase) {
final long outOfTtlMillis =
Instant.now().toEpochMilli() - Duration.ofMinutes(2L).toMillis();
@@ -110,7 +130,7 @@ void testCompositeBasedJobRetainedStrategy() {
// Test the case where no specific retention policy is configured, i.e., all archived files
// are retained.
final Configuration conf = new Configuration();
- JobRetainedStrategy strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ ArchiveRetainedStrategy strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isTrue();
assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isTrue();
@@ -118,8 +138,8 @@ void testCompositeBasedJobRetainedStrategy() {
// Test the case where both retention policies are specified.
conf.set(HISTORY_SERVER_RETAINED_TTL, Duration.ofMinutes(1));
- conf.set(HISTORY_SERVER_RETAINED_JOBS, 2);
- strategy = CompositeJobRetainedStrategy.createFrom(conf);
+ conf.set(testCase.getQuantityConfigOption(), 2);
+ strategy = testCase.createStrategy(conf);
assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isFalse();
assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isFalse();
assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isFalse();
@@ -173,4 +193,32 @@ public Path getPath() {
return null;
}
}
+
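+ /**
+ * Couples a quantity config option with a strategy factory so the legacy-job and application
+ * variants can share the same parameterized tests.
+ */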
+ private static final class TestCase {
+ private final String testName;
+ private final ConfigOption<Integer> quantityConfigOption;
+ private final Function<Configuration, ArchiveRetainedStrategy> strategyFunction;
+
+ TestCase(
+ String testName,
+ ConfigOption<Integer> quantityConfigOption,
+ Function<Configuration, ArchiveRetainedStrategy> strategyFunction) {
+ this.testName = testName;
+ this.quantityConfigOption = quantityConfigOption;
+ this.strategyFunction = strategyFunction;
+ }
+
+ ArchiveRetainedStrategy createStrategy(Configuration conf) {
+ return strategyFunction.apply(conf);
+ }
+
+ ConfigOption<Integer> getQuantityConfigOption() {
+ return quantityConfigOption;
+ }
+
+ @Override
+ public String toString() {
+ return testName;
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index ec50feaaca854..4cccb1a8ac1f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
@@ -422,7 +423,31 @@ public static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
Collections.emptyList(),
throwable,
checkpointingSettings,
- initializationTimestamp);
+ initializationTimestamp,
+ null);
+ }
+
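+ /** Sparse-archive factory variant that additionally records the job's {@link ApplicationID}; exposed for tests. */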
+ @VisibleForTesting
+ public static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
+ JobID jobId,
+ String jobName,
+ JobStatus jobStatus,
+ @Nullable JobType jobType,
+ @Nullable Throwable throwable,
+ @Nullable JobCheckpointingSettings checkpointingSettings,
+ long initializationTimestamp,
+ @Nullable ApplicationID applicationId) {
+ return createSparseArchivedExecutionGraph(
+ jobId,
+ jobName,
+ jobStatus,
+ jobType,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ throwable,
+ checkpointingSettings,
+ initializationTimestamp,
+ applicationId);
}
public static ArchivedExecutionGraph createSparseArchivedExecutionGraphWithJobVertices(
@@ -464,7 +489,8 @@ public static ArchivedExecutionGraph createSparseArchivedExecutionGraphWithJobVe
archivedVerticesInCreationOrder,
throwable,
checkpointingSettings,
- initializationTimestamp);
+ initializationTimestamp,
+ null);
}
private static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
@@ -476,7 +502,8 @@ private static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
List<ArchivedExecutionVertex> archivedVerticesInCreationOrder,
@Nullable Throwable throwable,
@Nullable JobCheckpointingSettings checkpointingSettings,
- long initializationTimestamp) {
+ long initializationTimestamp,
+ @Nullable ApplicationID applicationId) {
final Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserAccumulators =
Collections.emptyMap();
StringifiedAccumulatorResult[] archivedUserAccumulators =
@@ -522,6 +549,6 @@ private static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
checkpointingSettings == null ? null : "Unknown",
null,
0,
- null);
+ applicationId);
}
}