Leverage Kubernetes node affinity to increase job cache hit rate

Robin Shen 2019-07-07 22:05:29 +08:00
parent 94af4a3625
commit 99248180a4
7 changed files with 266 additions and 57 deletions

View File

@ -2,10 +2,11 @@ package io.onedev.server.ci.job;
import java.io.Serializable;
import javax.validation.constraints.Pattern;
import org.hibernate.validator.constraints.NotEmpty;
import io.onedev.server.util.validation.annotation.Path;
import io.onedev.server.util.validation.annotation.PathSegment;
import io.onedev.server.web.editable.annotation.Editable;
@Editable
@ -18,8 +19,8 @@ public class CacheSpec implements Serializable {
private String path;
@Editable(order=100, description="Specify key of the cache. Caches with same key can be reused by different builds")
@NotEmpty
@Pattern(regexp="[a-zA-Z0-9\\-_]+", message="Can only contain alphanumeric, dash and underscore")
public String getKey() {
return key;
}
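
For illustration: the @Pattern constraint above matters because cache keys are later embedded in Kubernetes node label names ("onedev-cache/<key>" in KubernetesExecutor below), where only a restricted character set is valid. A minimal standalone sketch of the same check ("maven-repo" and "maven repo" are hypothetical keys, not values from this commit):

import java.util.regex.Pattern;

public class CacheKeyCheck {

    // Same character class as the @Pattern constraint on CacheSpec.getKey()
    private static final Pattern KEY_PATTERN = Pattern.compile("[a-zA-Z0-9\\-_]+");

    public static void main(String[] args) {
        System.out.println(KEY_PATTERN.matcher("maven-repo").matches()); // true
        System.out.println(KEY_PATTERN.matcher("maven repo").matches()); // false: space is not allowed
    }
}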

View File

@ -629,57 +629,83 @@ public class DefaultJobManager implements JobManager, Runnable, CodePullAuthoriz
    }

    @Override
    public Map<CacheInstance, String> allocateJobCaches(String jobToken, Date currentTime,
            Map<CacheInstance, Date> cacheInstances) {
        synchronized (jobContexts) {
            JobContext context = getJobContext(jobToken, true);

            List<CacheInstance> sortedInstances = new ArrayList<>(cacheInstances.keySet());
            sortedInstances.sort(new Comparator<CacheInstance>() {

                @Override
                public int compare(CacheInstance o1, CacheInstance o2) {
                    return cacheInstances.get(o2).compareTo(cacheInstances.get(o1));
                }

            });

            Collection<String> allAllocated = new HashSet<>();
            for (JobContext each: jobContexts.values())
                allAllocated.addAll(each.getAllocatedCaches());

            Map<CacheInstance, String> allocations = new HashMap<>();
            for (CacheSpec cacheSpec: context.getCacheSpecs()) {
                Optional<CacheInstance> result = sortedInstances
                        .stream()
                        .filter(it->it.getCacheKey().equals(cacheSpec.getKey()))
                        .filter(it->!allAllocated.contains(it.getName()))
                        .findFirst();
                CacheInstance allocation;
                if (result.isPresent())
                    allocation = result.get();
                else
                    allocation = new CacheInstance(UUID.randomUUID().toString(), cacheSpec.getKey());
                allocations.put(allocation, cacheSpec.getPath());
                context.getAllocatedCaches().add(allocation.getName());
                allAllocated.add(allocation.getName());
            }

            Consumer<CacheInstance> deletionMarker = new Consumer<CacheInstance>() {

                @Override
                public void accept(CacheInstance instance) {
                    long elapsed = currentTime.getTime() - cacheInstances.get(instance).getTime();
                    if (elapsed > context.getCacheTTL() * 24L * 3600L * 1000L) {
                        allocations.put(instance, null);
                        context.getAllocatedCaches().add(instance.getName());
                    }
                }

            };
            cacheInstances.keySet()
                    .stream()
                    .filter(it->!allAllocated.contains(it.getName()))
                    .forEach(deletionMarker);

            return allocations;
        }
    }

    @Override
    public void reportJobCaches(String jobToken, Collection<CacheInstance> cacheInstances) {
        synchronized (jobContexts) {
            JobContext context = getJobContext(jobToken, true);
            Collection<String> allOtherAllocated = new HashSet<>();
            for (JobContext each: jobContexts.values()) {
                if (each != context)
                    allOtherAllocated.addAll(each.getAllocatedCaches());
            }
            for (CacheInstance cacheInstance: cacheInstances) {
                if (!allOtherAllocated.contains(cacheInstance.getName())) {
                    String cacheKey = cacheInstance.getCacheKey();
                    Integer cacheCount = context.getCacheCounts().get(cacheKey);
                    if (cacheCount == null)
                        cacheCount = 0;
                    cacheCount++;
                    context.getCacheCounts().put(cacheKey, cacheCount);
                }
            }
        }
    }
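
For illustration: a minimal standalone sketch of the allocation policy above, with plain strings standing in for CacheInstance (the names and dates are hypothetical). Instances are tried most-recently-used first, instances already held by other running jobs are skipped, and a fresh UUID-named instance is minted when no candidate fits:

import java.util.*;

public class AllocationSketch {

    public static void main(String[] args) {
        Map<String, Date> instances = new HashMap<>(); // instance name -> last used
        instances.put("maven_a", new Date(1000));
        instances.put("maven_b", new Date(2000));      // used more recently
        Set<String> allocatedElsewhere = Collections.singleton("maven_b");

        Optional<String> pick = instances.keySet().stream()
                .sorted((o1, o2) -> instances.get(o2).compareTo(instances.get(o1)))
                .filter(name -> !allocatedElsewhere.contains(name))
                .findFirst();

        // maven_b is newest but busy, so maven_a is reused; with no candidate at all,
        // a brand-new UUID-named instance would be allocated instead.
        System.out.println(pick.orElse(UUID.randomUUID().toString())); // maven_a
    }
}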

View File

@ -6,6 +6,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.errors.GitAPIException;
@ -45,6 +46,8 @@ public abstract class JobContext {
private final Collection<String> allocatedCaches = new HashSet<>();
private final Map<String, Integer> cacheCounts = new ConcurrentHashMap<>();
public JobContext(String projectName, File gitDir, String environment,
File workspace, Map<String, String> envVars, List<String> commands,
boolean retrieveSource, ObjectId commitId, Collection<CacheSpec> caches,
@ -132,6 +135,10 @@ public abstract class JobContext {
return allocatedCaches;
}
public Map<String, Integer> getCacheCounts() {
return cacheCounts;
}
public abstract void notifyJobRunning();
}

View File

@ -1,5 +1,6 @@
package io.onedev.server.ci.job;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -29,4 +30,6 @@ public interface JobManager {
Map<CacheInstance, String> allocateJobCaches(String jobToken, Date currentTime,
Map<CacheInstance, Date> cacheInstances);
void reportJobCaches(String jobToken, Collection<CacheInstance> cacheInstances);
}

View File

@ -6,6 +6,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -14,10 +15,10 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.validation.ConstraintValidatorContext;
import org.apache.commons.codec.Charsets;
import org.apache.commons.io.FilenameUtils;
import org.hibernate.validator.constraints.NotEmpty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.unbescape.json.JsonEscape;
@ -35,10 +36,12 @@ import io.onedev.commons.utils.FileUtils;
import io.onedev.commons.utils.Maps;
import io.onedev.commons.utils.StringUtils;
import io.onedev.commons.utils.command.Commandline;
import io.onedev.commons.utils.command.ExecuteResult;
import io.onedev.commons.utils.command.LineConsumer;
import io.onedev.k8shelper.KubernetesHelper;
import io.onedev.server.OneDev;
import io.onedev.server.OneException;
import io.onedev.server.ci.job.CacheSpec;
import io.onedev.server.ci.job.JobContext;
import io.onedev.server.entitymanager.SettingManager;
import io.onedev.server.model.support.JobExecutor;
@ -60,6 +63,12 @@ public class KubernetesExecutor extends JobExecutor implements Testable<TestData
private static final Logger logger = LoggerFactory.getLogger(KubernetesExecutor.class);
private static final int MAX_AFFINITY_WEIGHT = 10;
private static final String CACHE_LABEL_PREFIX = "onedev-cache/";
private static final int LABEL_UPDATE_BATCH = 100;
private String configFile;
private String kubeCtlPath;
@ -323,12 +332,53 @@ public class KubernetesExecutor extends JobExecutor implements Testable<TestData
}
return data;
}
@Nullable
private Map<Object, Object> getAffinity(@Nullable JobContext jobContext) {
Map<Object, Object> nodeAffinity = new LinkedHashMap<>();
List<Object> matchExpressions = new ArrayList<>();
for (NodeSelectorEntry selector: getNodeSelector()) {
matchExpressions.add(Maps.newLinkedHashMap(
"key", selector.getLabelName(),
"operator", "In",
"values", Lists.newArrayList(selector.getLabelValue())));
}
if (!matchExpressions.isEmpty()) {
List<Object> nodeSelectorTerms = Lists.<Object>newArrayList(
Maps.newLinkedHashMap("matchExpressions", matchExpressions));
nodeAffinity.put("requiredDuringSchedulingIgnoredDuringExecution",
Maps.newLinkedHashMap("nodeSelectorTerms", nodeSelectorTerms));
}
if (jobContext != null) {
List<Object> preferredDuringSchedulingIgnoredDuringExecution = new ArrayList<>();
for (CacheSpec cacheSpec: jobContext.getCacheSpecs()) {
for (int i=1; i<MAX_AFFINITY_WEIGHT; i++) {
preferredDuringSchedulingIgnoredDuringExecution.add(Maps.newLinkedHashMap(
"weight", i,
"preference", Maps.newLinkedHashMap("matchExpressions",
Lists.<Object>newArrayList(Maps.newLinkedHashMap(
"key", "onedev-cache-" + cacheSpec.getKey(),
"operator", "In",
"values", Lists.newArrayList(String.valueOf(i)))))));
}
preferredDuringSchedulingIgnoredDuringExecution.add(Maps.newLinkedHashMap(
"weight", MAX_AFFINITY_WEIGHT,
"preference", Maps.newLinkedHashMap(
"matchExpressions", Lists.<Object>newArrayList(Maps.newLinkedHashMap(
"key", CACHE_LABEL_PREFIX + cacheSpec.getKey(),
"operator", "Gt",
"values", Lists.newArrayList(String.valueOf(MAX_AFFINITY_WEIGHT-1)))))));
}
if (!preferredDuringSchedulingIgnoredDuringExecution.isEmpty())
nodeAffinity.put("preferredDuringSchedulingIgnoredDuringExecution", preferredDuringSchedulingIgnoredDuringExecution);
}
if (!nodeAffinity.isEmpty())
return Maps.newLinkedHashMap("nodeAffinity", nodeAffinity);
else
return null;
}
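
For illustration: a standalone sketch of the graded-weight preference getAffinity() builds for one cache key, using standard collections instead of OneDev's Maps/Lists helpers (the "maven" key is hypothetical). Nodes labeled "onedev-cache/<key>=N" score weight N for N up to 9, and weight 10 (MAX_AFFINITY_WEIGHT) for anything above, so the scheduler prefers nodes holding more instances of the cache:

import java.util.*;

public class AffinitySketch {

    public static void main(String[] args) {
        int maxWeight = 10;                     // mirrors MAX_AFFINITY_WEIGHT
        String labelKey = "onedev-cache/maven"; // CACHE_LABEL_PREFIX + cache key

        List<Map<String, Object>> preferred = new ArrayList<>();
        for (int i = 1; i < maxWeight; i++)     // weight i when the label value is exactly i
            preferred.add(term(i, labelKey, "In", String.valueOf(i)));
        // cap: weight 10 when the node holds more than 9 instances of this cache
        preferred.add(term(maxWeight, labelKey, "Gt", String.valueOf(maxWeight - 1)));

        System.out.println(preferred);
    }

    static Map<String, Object> term(int weight, String key, String op, String value) {
        Map<String, Object> expr = new LinkedHashMap<>();
        expr.put("key", key);
        expr.put("operator", op);
        expr.put("values", Collections.singletonList(value));
        Map<String, Object> preference = new LinkedHashMap<>();
        preference.put("matchExpressions", Collections.singletonList(expr));
        Map<String, Object> term = new LinkedHashMap<>();
        term.put("weight", weight);
        term.put("preference", preference);
        return term;
    }
}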
private String getOSName(JobLogger logger) {
@ -462,10 +512,11 @@ public class KubernetesExecutor extends JobExecutor implements Testable<TestData
podSpec.put("containers", Lists.<Object>newArrayList(mainContainerSpec, sidecarContainerSpec));
podSpec.put("initContainers", Lists.<Object>newArrayList(initContainerSpec));
Map<Object, Object> affinity = getAffinity(jobContext);
if (affinity != null)
podSpec.put("affinity", affinity);
List<Object> imagePullSecretsData = getImagePullSecretsData();
if (!imagePullSecretsData.isEmpty())
podSpec.put("imagePullSecrets", imagePullSecretsData);
@ -586,6 +637,108 @@ public class KubernetesExecutor extends JobExecutor implements Testable<TestData
}, logger);
if (jobContext != null) {
logger.log("Updating job cache labels...");
AtomicReference<String> nodeNameRef = new AtomicReference<>(null);
Commandline kubectl = newKubeCtl();
kubectl.addArgs("get", "pod", podName, "-n", getNamespace(), "-o", "jsonpath={.spec.nodeName}");
kubectl.execute(new LineConsumer() {
@Override
public void consume(String line) {
nodeNameRef.set(line);
}
}, new LineConsumer() {
@Override
public void consume(String line) {
logger.log("Kubernetes: " + line);
}
}).checkReturnCode();
String nodeName = Preconditions.checkNotNull(nodeNameRef.get());
kubectl.clearArgs();
StringBuilder nodeJson = new StringBuilder();
kubectl.addArgs("get", "node", nodeName, "-o", "json");
kubectl.execute(new LineConsumer() {
@Override
public void consume(String line) {
if (line.startsWith("{"))
nodeJson.append("{").append("\n");
else if (line.startsWith("}"))
nodeJson.append("}");
else
nodeJson.append(line).append("\n");
}
}, new LineConsumer() {
@Override
public void consume(String line) {
logger.log("Kubernetes: " + line);
}
}).checkReturnCode();
JsonNode nodeNode;
KubernetesExecutor.logger.trace("Node json:\n" + nodeJson.toString());
try {
nodeNode = new ObjectMapper().readTree(nodeJson.toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
List<String> labelUpdates = new ArrayList<>();
Iterator<Map.Entry<String, JsonNode>> it = nodeNode.get("metadata").get("labels").fields();
while (it.hasNext()) {
Map.Entry<String, JsonNode> entry = it.next();
if (entry.getKey().startsWith(CACHE_LABEL_PREFIX)) {
String cacheKey = entry.getKey().substring(CACHE_LABEL_PREFIX.length());
int labelValue = entry.getValue().asInt();
Integer count = jobContext.getCacheCounts().remove(cacheKey);
if (count == null)
labelUpdates.add(entry.getKey() + "-");
else if (count != labelValue)
labelUpdates.add(entry.getKey() + "=" + count);
}
}
for (Map.Entry<String, Integer> entry: jobContext.getCacheCounts().entrySet())
labelUpdates.add(CACHE_LABEL_PREFIX + entry.getKey() + "=" + entry.getValue());
for (List<String> partition: Lists.partition(labelUpdates, LABEL_UPDATE_BATCH)) {
kubectl.clearArgs();
kubectl.addArgs("label", "node", nodeName, "--overwrite");
for (String labelUpdate: partition)
kubectl.addArgs(labelUpdate);
AtomicBoolean labelNotFound = new AtomicBoolean(false);
ExecuteResult result = kubectl.execute(new LineConsumer() {
@Override
public void consume(String line) {
KubernetesExecutor.logger.debug(line);
}
}, new LineConsumer() {
@Override
public void consume(String line) {
if (line.startsWith("label") && line.endsWith("not found."))
labelNotFound.set(true);
logger.log("Kubernetes: " + line);
}
});
if (!labelNotFound.get())
result.checkReturnCode();
}
}
} finally {
deleteResource("pod", podName, logger);
}
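
For illustration: a standalone sketch of the label reconciliation above, with hypothetical label values. Comparing the node's current "onedev-cache/*" labels with the counts the job reported yields kubectl label arguments: "key=value" sets or overwrites a label, and a trailing "-" deletes one:

import java.util.*;

public class LabelDiffSketch {

    public static void main(String[] args) {
        Map<String, Integer> nodeLabels = new LinkedHashMap<>();  // current labels on the node
        nodeLabels.put("onedev-cache/maven", 2);
        nodeLabels.put("onedev-cache/npm", 1);
        Map<String, Integer> cacheCounts = new LinkedHashMap<>(); // counts reported by the job
        cacheCounts.put("maven", 3);

        List<String> labelUpdates = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : nodeLabels.entrySet()) {
            String cacheKey = entry.getKey().substring("onedev-cache/".length());
            Integer count = cacheCounts.remove(cacheKey);
            if (count == null)
                labelUpdates.add(entry.getKey() + "-");           // cache gone: delete the label
            else if (!count.equals(entry.getValue()))
                labelUpdates.add(entry.getKey() + "=" + count);   // count changed: overwrite
        }
        for (Map.Entry<String, Integer> entry : cacheCounts.entrySet())
            labelUpdates.add("onedev-cache/" + entry.getKey() + "=" + entry.getValue());

        // would become: kubectl label node <nodeName> --overwrite onedev-cache/maven=3 onedev-cache/npm-
        System.out.println(labelUpdates);
    }
}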
@ -690,11 +843,11 @@ public class KubernetesExecutor extends JobExecutor implements Testable<TestData
json.append("{").append("\n");
} else if (line.startsWith("}")) {
json.append("}");
KubernetesExecutor.logger.trace("Watching pod:\n" + json.toString());
KubernetesExecutor.logger.trace("Pod watching output:\n" + json.toString());
try {
process(mapper.readTree(json.toString()));
} catch (Exception e) {
KubernetesExecutor.logger.error("Error processing pod watching record", e);
KubernetesExecutor.logger.error("Error processing pod watching output", e);
}
json.setLength(0);
} else {

View File

@ -5,6 +5,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@ -28,6 +29,7 @@ import com.google.common.collect.Lists;
import io.onedev.commons.utils.TarUtils;
import io.onedev.k8shelper.CacheAllocationRequest;
import io.onedev.k8shelper.CacheInstance;
import io.onedev.server.OneException;
import io.onedev.server.ci.job.JobContext;
import io.onedev.server.ci.job.JobManager;
@ -75,6 +77,16 @@ public class KubernetesResource {
getJobToken(), allocationRequest.getCurrentTime(), allocationRequest.getInstances()));
}
@Path("/report-job-caches")
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@POST
public void reportJobCaches(byte[] cacheInstanceBytes) {
@SuppressWarnings("unchecked")
Collection<CacheInstance> cacheInstances = (Collection<CacheInstance>) SerializationUtils
.deserialize(cacheInstanceBytes);
jobManager.reportJobCaches(getJobToken(), cacheInstances);
}
@Path("/download-dependencies")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@GET
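
For illustration: a hypothetical client-side counterpart of the report-job-caches endpoint above (the real caller lives in the Kubernetes helper sidecar, which is not part of this diff). The endpoint URL is an assumption, and job-token authentication is omitted:

import java.io.OutputStream;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;

import org.apache.commons.lang3.SerializationUtils;

public class ReportJobCachesClient {

    public static void report(String endpointUrl, Collection<? extends Serializable> cacheInstances)
            throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpointUrl).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/octet-stream");
        try (OutputStream os = conn.getOutputStream()) {
            // wrap in ArrayList so the payload is Serializable, matching the server's deserialize()
            os.write(SerializationUtils.serialize(new ArrayList<>(cacheInstances)));
        }
        if (conn.getResponseCode() != 200)
            throw new IllegalStateException("Unexpected response: " + conn.getResponseCode());
    }
}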

View File

@ -170,11 +170,12 @@ public class ServerDockerExecutor extends JobExecutor implements Testable<TestDa
public Void call() {
jobContext.notifyJobRunning();
JobManager jobManager = OneDev.getInstance(JobManager.class);
File hostCacheHome = getCacheHome();
logger.log("Allocating job caches...") ;
Map<CacheInstance, Date> cacheInstances = KubernetesHelper.getCacheInstances(hostCacheHome);
Map<CacheInstance, String> cacheAllocations = jobManager.allocateJobCaches(jobToken, new Date(), cacheInstances);
KubernetesHelper.preprocess(hostCacheHome, cacheAllocations, new Consumer<File>() {
@Override
@ -225,6 +226,7 @@ public class ServerDockerExecutor extends JobExecutor implements Testable<TestDa
jobContext.retrieveSource(hostWorkspace);
}
logger.log("Retrieving job dependencies...");
try {
FileUtils.copyDirectory(jobContext.getServerWorkspace(), hostWorkspace);
} catch (IOException e) {
@ -286,6 +288,8 @@ public class ServerDockerExecutor extends JobExecutor implements Testable<TestDa
}).checkReturnCode();
} finally {
logger.log("Sending job outcomes...");
int baseLen = hostWorkspace.getAbsolutePath().length()+1;
for (File file: jobContext.getCollectFiles().listFiles(hostWorkspace)) {
try {
@ -295,6 +299,9 @@ public class ServerDockerExecutor extends JobExecutor implements Testable<TestDa
}
}
}
logger.log("Reporting job caches...");
jobManager.reportJobCaches(jobToken, KubernetesHelper.getCacheInstances(hostCacheHome).keySet());
return null;
}