From 99248180a42a55a550b0a9d280aa30f6f16d9ef4 Mon Sep 17 00:00:00 2001
From: Robin Shen
Date: Sun, 7 Jul 2019 22:05:29 +0800
Subject: [PATCH] Leverage Kubernetes node affinity to increase job cache hit rate

---
 .../io/onedev/server/ci/job/CacheSpec.java    |   5 +-
 .../server/ci/job/DefaultJobManager.java      | 108 +++++++----
 .../io/onedev/server/ci/job/JobContext.java   |   7 +
 .../io/onedev/server/ci/job/JobManager.java   |   3 +
 .../plugin/kubernetes/KubernetesExecutor.java | 177 ++++++++++++++++--
 .../plugin/kubernetes/KubernetesResource.java |  12 ++
 .../serverdocker/ServerDockerExecutor.java    |  11 +-
 7 files changed, 266 insertions(+), 57 deletions(-)

diff --git a/server-core/src/main/java/io/onedev/server/ci/job/CacheSpec.java b/server-core/src/main/java/io/onedev/server/ci/job/CacheSpec.java
index cccdc49a0c..87aaed3616 100644
--- a/server-core/src/main/java/io/onedev/server/ci/job/CacheSpec.java
+++ b/server-core/src/main/java/io/onedev/server/ci/job/CacheSpec.java
@@ -2,10 +2,11 @@ package io.onedev.server.ci.job;
 
 import java.io.Serializable;
 
+import javax.validation.constraints.Pattern;
+
 import org.hibernate.validator.constraints.NotEmpty;
 
 import io.onedev.server.util.validation.annotation.Path;
-import io.onedev.server.util.validation.annotation.PathSegment;
 import io.onedev.server.web.editable.annotation.Editable;
 
 @Editable
@@ -18,8 +19,8 @@ public class CacheSpec implements Serializable {
     private String path;
 
     @Editable(order=100, description="Specify key of the cache. Caches with same key can be reused by different builds")
-    @PathSegment
     @NotEmpty
+    @Pattern(regexp="[a-zA-Z0-9\\-_]+", message="Can only contain alphanumeric, dash and underscore")
     public String getKey() {
         return key;
     }
diff --git a/server-core/src/main/java/io/onedev/server/ci/job/DefaultJobManager.java b/server-core/src/main/java/io/onedev/server/ci/job/DefaultJobManager.java
index 8c4de6da41..5ee916f533 100644
--- a/server-core/src/main/java/io/onedev/server/ci/job/DefaultJobManager.java
+++ b/server-core/src/main/java/io/onedev/server/ci/job/DefaultJobManager.java
@@ -629,57 +629,83 @@ public class DefaultJobManager implements JobManager, Runnable, CodePullAuthoriz
     }
 
     @Override
-    public synchronized Map<CacheInstance, String> allocateJobCaches(String jobToken, Date currentTime,
-            Map<CacheInstance, Date> cacheInstances) {
-        JobContext context = getJobContext(jobToken, true);
-
-        List<CacheInstance> sortedInstances = new ArrayList<>(cacheInstances.keySet());
-        sortedInstances.sort(new Comparator<CacheInstance>() {
-
-            @Override
-            public int compare(CacheInstance o1, CacheInstance o2) {
-                return cacheInstances.get(o2).compareTo(cacheInstances.get(o1));
-            }
-
-        });
-
-        Collection<String> allAllocated = new HashSet<>();
-        for (JobContext each: jobContexts.values())
-            allAllocated.addAll(each.getAllocatedCaches());
-        Map<CacheInstance, String> allocations = new HashMap<>();
-        for (CacheSpec cacheSpec: context.getCacheSpecs()) {
-            Optional<CacheInstance> result = sortedInstances
-                    .stream()
-                    .filter(it->it.getCacheKey().equals(cacheSpec.getKey()))
-                    .filter(it->!allAllocated.contains(it.getName()))
-                    .findFirst();
-            CacheInstance allocation;
-            if (result.isPresent())
-                allocation = result.get();
-            else
-                allocation = new CacheInstance(UUID.randomUUID().toString(), cacheSpec.getKey());
-            allocations.put(allocation, cacheSpec.getPath());
-            context.getAllocatedCaches().add(allocation.getName());
-            allAllocated.add(allocation.getName());
-        }
-
-        Consumer<CacheInstance> cacheCleaner = new Consumer<CacheInstance>() {
-
-            @Override
-            public void accept(CacheInstance instance) {
-                long ellapsed = currentTime.getTime() - cacheInstances.get(instance).getTime();
-                if (ellapsed > context.getCacheTTL() * 24L * 3600L * 1000L) {
-                    allocations.put(instance, null);
-                    context.getAllocatedCaches().add(instance.getName());
-                }
-            }
-
-        };
-        cacheInstances.keySet()
-                .stream()
-                .filter(it->!allAllocated.contains(it.getName()))
-                .forEach(cacheCleaner);
-
-        return allocations;
-    }
+    public Map<CacheInstance, String> allocateJobCaches(String jobToken, Date currentTime,
+            Map<CacheInstance, Date> cacheInstances) {
+        synchronized (jobContexts) {
+            JobContext context = getJobContext(jobToken, true);
+
+            List<CacheInstance> sortedInstances = new ArrayList<>(cacheInstances.keySet());
+            sortedInstances.sort(new Comparator<CacheInstance>() {
+
+                @Override
+                public int compare(CacheInstance o1, CacheInstance o2) {
+                    return cacheInstances.get(o2).compareTo(cacheInstances.get(o1));
+                }
+
+            });
+
+            Collection<String> allAllocated = new HashSet<>();
+            for (JobContext each: jobContexts.values())
+                allAllocated.addAll(each.getAllocatedCaches());
+            Map<CacheInstance, String> allocations = new HashMap<>();
+            for (CacheSpec cacheSpec: context.getCacheSpecs()) {
+                Optional<CacheInstance> result = sortedInstances
+                        .stream()
+                        .filter(it->it.getCacheKey().equals(cacheSpec.getKey()))
+                        .filter(it->!allAllocated.contains(it.getName()))
+                        .findFirst();
+                CacheInstance allocation;
+                if (result.isPresent())
+                    allocation = result.get();
+                else
+                    allocation = new CacheInstance(UUID.randomUUID().toString(), cacheSpec.getKey());
+                allocations.put(allocation, cacheSpec.getPath());
+                context.getAllocatedCaches().add(allocation.getName());
+                allAllocated.add(allocation.getName());
+            }
+
+            Consumer<CacheInstance> deletionMarker = new Consumer<CacheInstance>() {
+
+                @Override
+                public void accept(CacheInstance instance) {
+                    long ellapsed = currentTime.getTime() - cacheInstances.get(instance).getTime();
+                    if (ellapsed > context.getCacheTTL() * 24L * 3600L * 1000L) {
+                        allocations.put(instance, null);
+                        context.getAllocatedCaches().add(instance.getName());
+                    }
+                }
+
+            };
+
+            cacheInstances.keySet()
+                    .stream()
+                    .filter(it->!allAllocated.contains(it.getName()))
+                    .forEach(deletionMarker);
+            return allocations;
+        }
+    }
+
+    @Override
+    public void reportJobCaches(String jobToken, Collection<CacheInstance> cacheInstances) {
+        synchronized (jobContexts) {
+            JobContext context = getJobContext(jobToken, true);
+
+            Collection<String> allOtherAllocated = new HashSet<>();
+            for (JobContext each: jobContexts.values()) {
+                if (each != context)
+                    allOtherAllocated.addAll(each.getAllocatedCaches());
+            }
+            for (CacheInstance cacheInstance: cacheInstances) {
+                if (!allOtherAllocated.contains(cacheInstance.getName())) {
+                    String cacheKey = cacheInstance.getCacheKey();
+                    Integer cacheCount = context.getCacheCounts().get(cacheKey);
+                    if (cacheCount == null)
+                        cacheCount = 0;
+                    cacheCount++;
+                    context.getCacheCounts().put(cacheKey, cacheCount);
+                }
+            }
+        }
+    }
 
 }
diff --git a/server-core/src/main/java/io/onedev/server/ci/job/JobContext.java b/server-core/src/main/java/io/onedev/server/ci/job/JobContext.java
index 9ede1de790..92b9018a74 100644
--- a/server-core/src/main/java/io/onedev/server/ci/job/JobContext.java
+++ b/server-core/src/main/java/io/onedev/server/ci/job/JobContext.java
@@ -6,6 +6,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.eclipse.jgit.api.Git;
 import org.eclipse.jgit.api.errors.GitAPIException;
@@ -45,6 +46,8 @@ public abstract class JobContext {
 
     private final Collection<String> allocatedCaches = new HashSet<>();
 
+    private final Map<String, Integer> cacheCounts = new ConcurrentHashMap<>();
+
     public JobContext(String projectName, File gitDir, String environment, File workspace,
             Map<String, String> envVars, List<String> commands, boolean retrieveSource,
             ObjectId commitId, Collection<CacheSpec> caches,
@@ -132,6 +135,10 @@ public abstract class JobContext {
         return allocatedCaches;
     }
 
+    public Map<String, Integer> getCacheCounts() {
+        return cacheCounts;
+    }
+
     public abstract void notifyJobRunning();
 
 }
diff --git a/server-core/src/main/java/io/onedev/server/ci/job/JobManager.java b/server-core/src/main/java/io/onedev/server/ci/job/JobManager.java
index 6645447b5f..4dff4280d2 100644
--- a/server-core/src/main/java/io/onedev/server/ci/job/JobManager.java
+++ b/server-core/src/main/java/io/onedev/server/ci/job/JobManager.java
@@ -1,5 +1,6 @@
 package io.onedev.server.ci.job;
 
+import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -29,4 +30,6 @@ public interface JobManager {
     Map<CacheInstance, String> allocateJobCaches(String jobToken, Date currentTime,
             Map<CacheInstance, Date> cacheInstances);
 
+    void reportJobCaches(String jobToken, Collection<CacheInstance> cacheInstances);
+
 }
diff --git a/server-plugin/server-plugin-kubernetes/src/main/java/io/onedev/server/plugin/kubernetes/KubernetesExecutor.java b/server-plugin/server-plugin-kubernetes/src/main/java/io/onedev/server/plugin/kubernetes/KubernetesExecutor.java
index 0fc655909b..ebb7a29830 100644
--- a/server-plugin/server-plugin-kubernetes/src/main/java/io/onedev/server/plugin/kubernetes/KubernetesExecutor.java
+++ b/server-plugin/server-plugin-kubernetes/src/main/java/io/onedev/server/plugin/kubernetes/KubernetesExecutor.java
@@ -6,6 +6,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -14,10 +15,10 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
 import javax.validation.ConstraintValidatorContext;
 
-import org.hibernate.validator.constraints.NotEmpty;
 import org.apache.commons.codec.Charsets;
 import org.apache.commons.io.FilenameUtils;
+import org.hibernate.validator.constraints.NotEmpty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.unbescape.json.JsonEscape;
@@ -35,10 +36,12 @@ import io.onedev.commons.utils.FileUtils;
 import io.onedev.commons.utils.Maps;
 import io.onedev.commons.utils.StringUtils;
 import io.onedev.commons.utils.command.Commandline;
+import io.onedev.commons.utils.command.ExecuteResult;
 import io.onedev.commons.utils.command.LineConsumer;
 import io.onedev.k8shelper.KubernetesHelper;
 import io.onedev.server.OneDev;
 import io.onedev.server.OneException;
+import io.onedev.server.ci.job.CacheSpec;
 import io.onedev.server.ci.job.JobContext;
 import io.onedev.server.entitymanager.SettingManager;
 import io.onedev.server.model.support.JobExecutor;
@@ -60,6 +63,12 @@ public class KubernetesExecutor extends JobExecutor implements Testable
-    private Map<String, String> getNodeSelectorData() {
-        Map<String, String> data = new LinkedHashMap<>();
-        for (NodeSelectorEntry selector: getNodeSelector())
-            data.put(selector.getLabelName(), selector.getLabelValue());
-        return data;
+
+    @Nullable
+    private Map<String, Object> getAffinity(@Nullable JobContext jobContext) {
+        Map<String, Object> nodeAffinity = new LinkedHashMap<>();
+
+        List<Object> matchExpressions = new ArrayList<>();
+        for (NodeSelectorEntry selector: getNodeSelector()) {
+            matchExpressions.add(Maps.newLinkedHashMap(
+                    "key", selector.getLabelName(),
+                    "operator", "In",
+                    "values", Lists.newArrayList(selector.getLabelValue())));
+        }
+        if (!matchExpressions.isEmpty()) {
+            List<Object> nodeSelectorTerms = Lists.newArrayList(
+                    Maps.newLinkedHashMap("matchExpressions", matchExpressions));
+            nodeAffinity.put("requiredDuringSchedulingIgnoredDuringExecution",
+                    Maps.newLinkedHashMap("nodeSelectorTerms", nodeSelectorTerms));
+        }
+
+        if (jobContext != null) {
+            List<Object> preferredDuringSchedulingIgnoredDuringExecution = new ArrayList<>();
+            for (CacheSpec cacheSpec: jobContext.getCacheSpecs()) {
+                for (int i=1; i<MAX_AFFINITY_WEIGHT; i++) {
+                    preferredDuringSchedulingIgnoredDuringExecution.add(Maps.newLinkedHashMap(
+                            "weight", i,
+                            "preference", Maps.newLinkedHashMap(
+                                    "matchExpressions", Lists.newArrayList(Maps.newLinkedHashMap(
+                                            "key", "onedev-cache-" + cacheSpec.getKey(),
+                                            "operator", "In",
+                                            "values", Lists.newArrayList(String.valueOf(i)))))));
+                }
+                preferredDuringSchedulingIgnoredDuringExecution.add(Maps.newLinkedHashMap(
+                        "weight", MAX_AFFINITY_WEIGHT,
+                        "preference", Maps.newLinkedHashMap(
+                                "matchExpressions", Lists.newArrayList(Maps.newLinkedHashMap(
+                                        "key", CACHE_LABEL_PREFIX + cacheSpec.getKey(),
+                                        "operator", "Gt",
+                                        "values", Lists.newArrayList(String.valueOf(MAX_AFFINITY_WEIGHT-1)))))));
+            }
+            if (!preferredDuringSchedulingIgnoredDuringExecution.isEmpty())
+                nodeAffinity.put("preferredDuringSchedulingIgnoredDuringExecution", preferredDuringSchedulingIgnoredDuringExecution);
+        }
+
+        if (!nodeAffinity.isEmpty())
+            return Maps.newLinkedHashMap("nodeAffinity", nodeAffinity);
+        else
+            return null;
     }
 
     private String getOSName(JobLogger logger) {
@@ -462,10 +512,11 @@ public class KubernetesExecutor extends JobExecutor implements Testable
         podSpec.put("containers", Lists.newArrayList(mainContainerSpec, sidecarContainerSpec));
         podSpec.put("initContainers", Lists.newArrayList(initContainerSpec));
+
+        Map<String, Object> affinity = getAffinity(jobContext);
+        if (affinity != null)
+            podSpec.put("affinity", affinity);
 
-        Map<String, String> nodeSelectorData = getNodeSelectorData();
-        if (!nodeSelectorData.isEmpty())
-            podSpec.put("nodeSelector", nodeSelectorData);
         List<Map<String, String>> imagePullSecretsData = getImagePullSecretsData();
         if (!imagePullSecretsData.isEmpty())
             podSpec.put("imagePullSecrets", imagePullSecretsData);
@@ -586,6 +637,108 @@ public class KubernetesExecutor extends JobExecutor implements Testable
+                AtomicReference<String> nodeNameRef = new AtomicReference<>(null);
+                Commandline kubectl = newKubeCtl();
+                kubectl.addArgs("get", "pod", podName, "-n", getNamespace(), "-o", "jsonpath={.spec.nodeName}");
+                kubectl.execute(new LineConsumer() {
+
+                    @Override
+                    public void consume(String line) {
+                        nodeNameRef.set(line);
+                    }
+
+                }, new LineConsumer() {
+
+                    @Override
+                    public void consume(String line) {
+                        logger.log("Kubernetes: " + line);
+                    }
+
+                }).checkReturnCode();
+
+                String nodeName = Preconditions.checkNotNull(nodeNameRef.get());
+
+                kubectl.clearArgs();
+                StringBuilder nodeJson = new StringBuilder();
+                kubectl.addArgs("get", "node", nodeName, "-o", "json");
+                kubectl.execute(new LineConsumer() {
+
+                    @Override
+                    public void consume(String line) {
+                        if (line.startsWith("{"))
+                            nodeJson.append("{").append("\n");
+                        else if (line.startsWith("}"))
+                            nodeJson.append("}");
+                        else
+                            nodeJson.append(line).append("\n");
+                    }
+
+                }, new LineConsumer() {
+
+                    @Override
+                    public void consume(String line) {
+                        logger.log("Kubernetes: " + line);
+                    }
+
+                }).checkReturnCode();
+
+                JsonNode nodeNode;
+                KubernetesExecutor.logger.trace("Node json:\n" + nodeJson.toString());
+                try {
+                    nodeNode = new ObjectMapper().readTree(nodeJson.toString());
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+
+                List<String> labelUpdates = new ArrayList<>();
+
+                Iterator<Map.Entry<String, JsonNode>> it = nodeNode.get("metadata").get("labels").fields();
+                while (it.hasNext()) {
+                    Map.Entry<String, JsonNode> entry = it.next();
+                    if (entry.getKey().startsWith(CACHE_LABEL_PREFIX)) {
+                        String cacheKey = entry.getKey().substring(CACHE_LABEL_PREFIX.length());
+                        int labelValue = entry.getValue().asInt();
+                        Integer count = jobContext.getCacheCounts().remove(cacheKey);
+                        if (count == null)
+                            labelUpdates.add(entry.getKey() + "-");
+                        else if (count != labelValue)
+                            labelUpdates.add(entry.getKey() + "=" + count);
+                    }
+                }
+
+                for (Map.Entry<String, Integer> entry: jobContext.getCacheCounts().entrySet())
+                    labelUpdates.add(CACHE_LABEL_PREFIX + entry.getKey() + "=" + entry.getValue());
+
+                for (List<String> partition: Lists.partition(labelUpdates, LABEL_UPDATE_BATCH)) {
+                    kubectl.clearArgs();
+                    kubectl.addArgs("label", "node", nodeName, "--overwrite");
+                    for (String labelUpdate: partition)
+                        kubectl.addArgs(labelUpdate);
+                    AtomicBoolean labelNotFound = new AtomicBoolean(false);
+                    ExecuteResult result = kubectl.execute(new LineConsumer() {
+
+                        @Override
+                        public void consume(String line) {
+                            KubernetesExecutor.logger.debug(line);
+                        }
+
+                    }, new LineConsumer() {
+
+                        @Override
+                        public void consume(String line) {
+                            if (line.startsWith("label") && line.endsWith("not found."))
+                                labelNotFound.set(true);
+                            logger.log("Kubernetes: " + line);
+                        }
+
+                    });
+                    if (!labelNotFound.get())
+                        result.checkReturnCode();
+                }
+            }
         } finally {
             deleteResource("pod", podName, logger);
         }
@@ -690,11 +843,11 @@ public class KubernetesExecutor extends JobExecutor implements Testable
diff --git a/server-plugin/server-plugin-kubernetes/src/main/java/io/onedev/server/plugin/kubernetes/KubernetesResource.java b/server-plugin/server-plugin-kubernetes/src/main/java/io/onedev/server/plugin/kubernetes/KubernetesResource.java
+        Collection<CacheInstance> cacheInstances = (Collection<CacheInstance>) SerializationUtils
+                .deserialize(cacheInstanceBytes);
+        jobManager.reportJobCaches(getJobToken(), cacheInstances);
+    }
+
     @Path("/download-dependencies")
     @Produces(MediaType.APPLICATION_OCTET_STREAM)
     @GET
diff --git a/server-plugin/server-plugin-serverdocker/src/main/java/io/onedev/server/plugin/serverdocker/ServerDockerExecutor.java b/server-plugin/server-plugin-serverdocker/src/main/java/io/onedev/server/plugin/serverdocker/ServerDockerExecutor.java
index 04fcc5e70b..7289be4981 100644
--- a/server-plugin/server-plugin-serverdocker/src/main/java/io/onedev/server/plugin/serverdocker/ServerDockerExecutor.java
+++ b/server-plugin/server-plugin-serverdocker/src/main/java/io/onedev/server/plugin/serverdocker/ServerDockerExecutor.java
@@ -170,11 +170,12 @@ public class ServerDockerExecutor extends JobExecutor implements Testable
             Map<CacheInstance, Date> cacheInstances = KubernetesHelper.getCacheInstances(hostCacheHome);
-            Map<CacheInstance, String> cacheAllocations = OneDev.getInstance(JobManager.class)
-                    .allocateJobCaches(jobToken, new Date(), cacheInstances);
+            Map<CacheInstance, String> cacheAllocations = jobManager.allocateJobCaches(jobToken, new Date(), cacheInstances);
             KubernetesHelper.preprocess(hostCacheHome, cacheAllocations, new Consumer<File>() {
 
                 @Override
@@ -225,6 +226,7 @@ public class ServerDockerExecutor extends JobExecutor implements Testable
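
Note (illustration, not part of the patch): the affinity map built by getAffinity() above corresponds to a pod spec fragment roughly like the sketch below. The label key prefix and weight values shown are placeholders; the actual CACHE_LABEL_PREFIX and MAX_AFFINITY_WEIGHT constants are defined in a hunk not fully visible here. The per-cache-key node labels referenced by the preferred terms are the counts that the executor writes back with "kubectl label node <node> --overwrite" after the job runs, so nodes that already hold more instances of a cache key score higher for later builds of the same job.

    affinity:
      nodeAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
          - matchExpressions:
            - key: <node selector label name>
              operator: In
              values: ["<node selector label value>"]
        preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 1
          preference:
            matchExpressions:
            - key: onedev-cache-<cache key>      # illustrative; actual prefix comes from CACHE_LABEL_PREFIX
              operator: In
              values: ["1"]
        - weight: <MAX_AFFINITY_WEIGHT>
          preference:
            matchExpressions:
            - key: onedev-cache-<cache key>
              operator: Gt
              values: ["<MAX_AFFINITY_WEIGHT - 1>"]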