Update node cache labels when a job is started, for accuracy while the job is running
This commit is contained in:
Robin Shen 2019-07-10 18:17:48 +08:00
parent 863607360b
commit 3d13e64d16
3 changed files with 150 additions and 127 deletions

View File

@@ -388,7 +388,7 @@
<dependency>
<groupId>io.onedev</groupId>
<artifactId>k8s-helper</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>
</dependency>
</dependencies>
<properties>

View File

@@ -634,7 +634,7 @@ public class DefaultJobManager implements JobManager, Runnable, CodePullAuthoriz
public Map<CacheInstance, String> allocateJobCaches(String jobToken, Date currentTime,
Map<CacheInstance, Date> cacheInstances) {
synchronized (jobContexts) {
JobContext context = getJobContext(jobToken, true);
JobContext jobContext = getJobContext(jobToken, true);
List<CacheInstance> sortedInstances = new ArrayList<>(cacheInstances.keySet());
sortedInstances.sort(new Comparator<CacheInstance>() {
@@ -650,7 +650,7 @@ public class DefaultJobManager implements JobManager, Runnable, CodePullAuthoriz
for (JobContext each: jobContexts.values())
allAllocated.addAll(each.getAllocatedCaches());
Map<CacheInstance, String> allocations = new HashMap<>();
for (CacheSpec cacheSpec: context.getCacheSpecs()) {
for (CacheSpec cacheSpec: jobContext.getCacheSpecs()) {
Optional<CacheInstance> result = sortedInstances
.stream()
.filter(it->it.getCacheKey().equals(cacheSpec.getKey()))
@@ -662,7 +662,7 @@ public class DefaultJobManager implements JobManager, Runnable, CodePullAuthoriz
else
allocation = new CacheInstance(UUID.randomUUID().toString(), cacheSpec.getKey());
allocations.put(allocation, cacheSpec.getPath());
context.getAllocatedCaches().add(allocation.getName());
jobContext.getAllocatedCaches().add(allocation.getName());
allAllocated.add(allocation.getName());
}
@@ -671,9 +671,10 @@ public class DefaultJobManager implements JobManager, Runnable, CodePullAuthoriz
@Override
public void accept(CacheInstance instance) {
long ellapsed = currentTime.getTime() - cacheInstances.get(instance).getTime();
if (ellapsed > context.getCacheTTL() * 24L * 3600L * 1000L) {
if (ellapsed > jobContext.getCacheTTL() * 24L * 3600L * 1000L) {
allocations.put(instance, null);
context.getAllocatedCaches().add(instance.getName());
jobContext.getAllocatedCaches().add(instance.getName());
allAllocated.add(instance.getName());
}
}
@@ -683,30 +684,37 @@ public class DefaultJobManager implements JobManager, Runnable, CodePullAuthoriz
.stream()
.filter(it->!allAllocated.contains(it.getName()))
.forEach(deletionMarker);
updateCacheCounts(jobContext, cacheInstances.keySet(), allAllocated);
return allocations;
}
}
/**
 * Records, per cache key, how many cache instances on the reporting node are
 * free (i.e. not present in {@code allAllocated}), accumulating the counts
 * into the job context's cache-count map.
 *
 * @param jobContext     context whose cache-count map is updated in place
 * @param cacheInstances cache instances discovered on the node
 * @param allAllocated   names of instances currently allocated to some job
 */
private void updateCacheCounts(JobContext jobContext, Collection<CacheInstance> cacheInstances,
		Collection<String> allAllocated) {
	for (CacheInstance instance: cacheInstances) {
		// Allocated instances are in use by a job and do not count as available
		if (allAllocated.contains(instance.getName()))
			continue;
		// Absent key starts at 1; existing count is incremented
		jobContext.getCacheCounts().merge(instance.getCacheKey(), 1, Integer::sum);
	}
}
@Override
public void reportJobCaches(String jobToken, Collection<CacheInstance> cacheInstances) {
synchronized (jobContexts) {
JobContext context = getJobContext(jobToken, true);
Collection<String> allOtherAllocated = new HashSet<>();
JobContext jobContext = getJobContext(jobToken, true);
Collection<String> allAllocated = new HashSet<>();
for (JobContext each: jobContexts.values()) {
if (each != context)
allOtherAllocated.addAll(each.getAllocatedCaches());
}
for (CacheInstance cacheInstance: cacheInstances) {
if (!allOtherAllocated.contains(cacheInstance.getName())) {
String cacheKey = cacheInstance.getCacheKey();
Integer cacheCount = context.getCacheCounts().get(cacheKey);
if (cacheCount == null)
cacheCount = 0;
cacheCount++;
context.getCacheCounts().put(cacheKey, cacheCount);
}
if (each != jobContext)
allAllocated.addAll(each.getAllocatedCaches());
}
updateCacheCounts(jobContext, cacheInstances, allAllocated);
}
}

View File

@@ -148,7 +148,7 @@ public class KubernetesExecutor extends JobExecutor implements Testable&lt;TestData
this.serviceAccount = serviceAccount;
}
@Editable(order=24000, group="More Settings", description="Specify cpu requirement of jobs using this executor. "
@Editable(order=24000, name="CPU Request", group="More Settings", description="Specify cpu requirement of jobs using this executor. "
+ "Refer to <a href='https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu'>"
+ "kubernetes documentation</a> for details")
@NotEmpty
@@ -353,7 +353,7 @@ public class KubernetesExecutor extends JobExecutor implements Testable&lt;TestData
}
private String getOSName(JobLogger logger) {
logger.log("Checking working node OS...");
logger.log("Checking working node operating system...");
Commandline kubectl = newKubeCtl();
kubectl.addArgs("get", "nodes", "-o", "jsonpath={range .items[*]}{.status.nodeInfo.operatingSystem}{'\\n'}{end}");
for (NodeSelectorEntry entry: getNodeSelector())
@@ -378,7 +378,7 @@ public class KubernetesExecutor extends JobExecutor implements Testable&lt;TestData
String osName = osNameRef.get();
if (osName != null) {
logger.log(String.format("OS of working node is '%s'", osName));
logger.log(String.format("Working node is running on %s", osName));
return osName;
} else {
throw new OneException("No applicable working nodes found");
@@ -548,9 +548,34 @@ public class KubernetesExecutor extends JobExecutor implements Testable&lt;TestData
if (jobContext != null)
jobContext.notifyJobRunning();
AtomicReference<String> nodeNameRef = new AtomicReference<>(null);
Commandline kubectl = newKubeCtl();
kubectl.addArgs("get", "pod", podName, "-n", getNamespace(), "-o", "jsonpath={.spec.nodeName}");
kubectl.execute(new LineConsumer() {
@Override
public void consume(String line) {
nodeNameRef.set(line);
}
}, new LineConsumer() {
@Override
public void consume(String line) {
logger.log("Kubernetes: " + line);
}
}).checkReturnCode();
String nodeName = Preconditions.checkNotNull(nodeNameRef.get());
logger.log("Running job on node " + nodeName + "...");
KubernetesExecutor.logger.debug("Collecting init container log (pod: {})...", podName);
collectContainerLog(podName, "init", logger);
if (jobContext != null)
updateCacheLabels(nodeName, jobContext, logger);
KubernetesExecutor.logger.debug("Waiting for main container to start (pod: {})...", podName);
watchPod(podName, new StatusChecker() {
@@ -611,108 +636,8 @@ public class KubernetesExecutor extends JobExecutor implements Testable&lt;TestData
}, logger);
if (jobContext != null) {
logger.log("Updating job cache labels...");
AtomicReference<String> nodeNameRef = new AtomicReference<>(null);
Commandline kubectl = newKubeCtl();
kubectl.addArgs("get", "pod", podName, "-n", getNamespace(), "-o", "jsonpath={.spec.nodeName}");
kubectl.execute(new LineConsumer() {
@Override
public void consume(String line) {
nodeNameRef.set(line);
}
}, new LineConsumer() {
@Override
public void consume(String line) {
logger.log("Kubernetes: " + line);
}
}).checkReturnCode();
String nodeName = Preconditions.checkNotNull(nodeNameRef.get());
kubectl.clearArgs();
StringBuilder nodeJson = new StringBuilder();
kubectl.addArgs("get", "node", nodeName, "-o", "json");
kubectl.execute(new LineConsumer() {
@Override
public void consume(String line) {
if (line.startsWith("{"))
nodeJson.append("{").append("\n");
else if (line.startsWith("}"))
nodeJson.append("}");
else
nodeJson.append(line).append("\n");
}
}, new LineConsumer() {
@Override
public void consume(String line) {
logger.log("Kubernetes: " + line);
}
}).checkReturnCode();
JsonNode nodeNode;
KubernetesExecutor.logger.trace("Node json:\n" + nodeJson.toString());
try {
nodeNode = new ObjectMapper().readTree(nodeJson.toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
List<String> labelUpdates = new ArrayList<>();
Iterator<Map.Entry<String, JsonNode>> it = nodeNode.get("metadata").get("labels").fields();
while (it.hasNext()) {
Map.Entry<String, JsonNode> entry = it.next();
if (entry.getKey().startsWith(CACHE_LABEL_PREFIX)) {
String cacheKey = entry.getKey().substring(CACHE_LABEL_PREFIX.length());
int labelValue = entry.getValue().asInt();
Integer count = jobContext.getCacheCounts().remove(cacheKey);
if (count == null)
labelUpdates.add(entry.getKey() + "-");
else if (count != labelValue)
labelUpdates.add(entry.getKey() + "=" + count);
}
}
for (Map.Entry<String, Integer> entry: jobContext.getCacheCounts().entrySet())
labelUpdates.add(CACHE_LABEL_PREFIX + entry.getKey() + "=" + entry.getValue());
for (List<String> partition: Lists.partition(labelUpdates, LABEL_UPDATE_BATCH)) {
kubectl.clearArgs();
kubectl.addArgs("label", "node", nodeName, "--overwrite");
for (String labelUpdate: partition)
kubectl.addArgs(labelUpdate);
AtomicBoolean labelNotFound = new AtomicBoolean(false);
ExecuteResult result = kubectl.execute(new LineConsumer() {
@Override
public void consume(String line) {
KubernetesExecutor.logger.debug(line);
}
}, new LineConsumer() {
@Override
public void consume(String line) {
if (line.startsWith("label") && line.endsWith("not found."))
labelNotFound.set(true);
logger.log("Kubernetes: " + line);
}
});
if (!labelNotFound.get())
result.checkReturnCode();
}
}
if (jobContext != null)
updateCacheLabels(nodeName, jobContext, logger);
} finally {
deleteResource("pod", podName, logger);
}
@@ -784,6 +709,91 @@ public class KubernetesExecutor extends JobExecutor implements Testable&lt;TestData
return false;
}
/**
 * Reconciles the cache-count labels on the given Kubernetes node with the
 * counts held in the job context: reads the node's current labels via
 * kubectl, computes the delta against {@code jobContext.getCacheCounts()},
 * and applies the changes with batched "kubectl label" invocations.
 * Presumably these labels let cache allocation prefer nodes that already
 * hold matching caches — TODO confirm against the scheduler side.
 *
 * @param nodeName   name of the node whose labels are updated
 * @param jobContext source of the desired per-cache-key counts; its
 *                   cache-count map is consumed (drained) by this method
 * @param logger     job logger receiving progress and kubectl stderr output
 */
private void updateCacheLabels(String nodeName, JobContext jobContext, JobLogger logger) {
logger.log("Updating cache labels on node...");
Commandline kubectl = newKubeCtl();
kubectl.clearArgs();
// Fetch the node object as JSON so its current labels can be inspected
StringBuilder nodeJson = new StringBuilder();
kubectl.addArgs("get", "node", nodeName, "-o", "json");
kubectl.execute(new LineConsumer() {
@Override
public void consume(String line) {
// Lines starting with "{"/"}" are reduced to the bare brace, dropping
// anything after it — presumably to strip trailing non-JSON output on
// those lines; TODO confirm what kubectl appends here
if (line.startsWith("{"))
nodeJson.append("{").append("\n");
else if (line.startsWith("}"))
nodeJson.append("}");
else
nodeJson.append(line).append("\n");
}
}, new LineConsumer() {
@Override
public void consume(String line) {
// kubectl stderr is surfaced in the job log
logger.log("Kubernetes: " + line);
}
}).checkReturnCode();
JsonNode nodeNode;
KubernetesExecutor.logger.trace("Node json:\n" + nodeJson.toString());
try {
nodeNode = new ObjectMapper().readTree(nodeJson.toString());
} catch (IOException e) {
// JSON from kubectl should always parse; treat failure as fatal
throw new RuntimeException(e);
}
// Each entry is a kubectl label argument: "key=value" to set, "key-" to remove
List<String> labelUpdates = new ArrayList<>();
Iterator<Map.Entry<String, JsonNode>> it = nodeNode.get("metadata").get("labels").fields();
while (it.hasNext()) {
Map.Entry<String, JsonNode> entry = it.next();
if (entry.getKey().startsWith(CACHE_LABEL_PREFIX)) {
String cacheKey = entry.getKey().substring(CACHE_LABEL_PREFIX.length());
int labelValue = entry.getValue().asInt();
// remove() so that after this loop the map holds only cache keys
// that have no label on the node yet
Integer count = jobContext.getCacheCounts().remove(cacheKey);
if (count == null)
// No count recorded for this key: delete the stale label ("key-")
labelUpdates.add(entry.getKey() + "-");
else if (count != labelValue)
// Count differs from the labeled value: overwrite it
labelUpdates.add(entry.getKey() + "=" + count);
}
}
// Remaining entries are cache keys with no existing label: create them
for (Map.Entry<String, Integer> entry: jobContext.getCacheCounts().entrySet())
labelUpdates.add(CACHE_LABEL_PREFIX + entry.getKey() + "=" + entry.getValue());
// Counts have been fully translated into label updates; drain the map
jobContext.getCacheCounts().clear();
// Apply updates in batches to keep each kubectl command line bounded
for (List<String> partition: Lists.partition(labelUpdates, LABEL_UPDATE_BATCH)) {
kubectl.clearArgs();
kubectl.addArgs("label", "node", nodeName, "--overwrite");
for (String labelUpdate: partition)
kubectl.addArgs(labelUpdate);
AtomicBoolean labelNotFound = new AtomicBoolean(false);
ExecuteResult result = kubectl.execute(new LineConsumer() {
@Override
public void consume(String line) {
KubernetesExecutor.logger.debug(line);
}
}, new LineConsumer() {
@Override
public void consume(String line) {
// "label ... not found." presumably means the label vanished
// concurrently; tolerated below rather than failing the job —
// TODO confirm the exact kubectl message format
if (line.startsWith("label") && line.endsWith("not found."))
labelNotFound.set(true);
logger.log("Kubernetes: " + line);
}
});
// Only enforce the exit code when the failure was not a missing label
if (!labelNotFound.get())
result.checkReturnCode();
}
}
private String createSecret(Map<String, String> secrets, JobLogger logger) {
Map<String, String> encodedSecrets = new LinkedHashMap<>();
for (Map.Entry<String, String> entry: secrets.entrySet())
@@ -975,7 +985,12 @@ public class KubernetesExecutor extends JobExecutor implements Testable&lt;TestData
@Override
public void consume(String line) {
logger.log(line);
if (line.contains("rpc error:") && line.contains("No such container:")
|| line.contains("Unable to retrieve container logs for")) {
KubernetesExecutor.logger.debug(line);
} else {
logger.log(line);
}
}
}, new LineConsumer() {