Send server step log progressively

This commit is contained in:
Robin Shen 2021-06-09 21:35:27 +08:00
parent 54aebb356c
commit 4a0bbd277e
6 changed files with 86 additions and 42 deletions

View File

@ -494,7 +494,7 @@ public class DefaultJobManager implements JobManager, Runnable, CodePullAuthoriz
}
@Override
public Map<String, byte[]> runServerStep(List<Integer> stepPosition, File filesDir,
public Map<String, byte[]> doRunServerStep(List<Integer> stepPosition, File filesDir,
Map<String, String> placeholderValues, SimpleLogger logger) {
return sessionManager.call(new Callable<Map<String, byte[]>>() {
@ -644,6 +644,7 @@ public class DefaultJobManager implements JobManager, Runnable, CodePullAuthoriz
}
} finally {
jobContexts.remove(jobToken);
jobContext.onJobFinished();
}
}
} catch (Throwable e) {

View File

@ -1,6 +1,7 @@
package io.onedev.server.buildspec.job;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@ -43,6 +44,8 @@ public abstract class JobContext {
private final Map<String, Integer> cacheCounts = new ConcurrentHashMap<>();
protected final Collection<Thread> serverStepThreads = new ArrayList<>();
public JobContext(String projectName, Long buildNumber, File projectGitDir,
List<Action> actions, String cpuRequirement, String memoryRequirement,
ObjectId commitId, Collection<CacheSpec> caches, int cacheTTL,
@ -116,14 +119,36 @@ public abstract class JobContext {
public Map<String, Integer> getCacheCounts() {
return cacheCounts;
}
public abstract void notifyJobRunning();
public abstract void reportJobWorkspace(String jobWorkspace);
public abstract Map<String, byte[]> runServerStep(List<Integer> stepPosition,
public Map<String, byte[]> runServerStep(List<Integer> stepPosition,
File filesDir, Map<String, String> placeholderValues, SimpleLogger logger) {
Thread thread = Thread.currentThread();
synchronized (serverStepThreads) {
serverStepThreads.add(thread);
}
try {
return doRunServerStep(stepPosition, filesDir, placeholderValues, logger);
} finally {
synchronized (serverStepThreads) {
serverStepThreads.remove(thread);
}
}
}
protected abstract Map<String, byte[]> doRunServerStep(List<Integer> stepPosition,
File filesDir, Map<String, String> placeholderValues, SimpleLogger logger);
public abstract void copyDependencies(File targetDir);
public void onJobFinished() {
synchronized (serverStepThreads) {
for (Thread thread: serverStepThreads)
thread.interrupt();
}
}
}

View File

@ -9,7 +9,7 @@ import javax.annotation.Nullable;
public class JobExecution {
private final Future<?> future;
private final long timeout;
private volatile long beginTime;

View File

@ -6,6 +6,8 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.eclipse.jgit.lib.ObjectId;
import io.onedev.k8shelper.CacheInstance;
@ -31,6 +33,7 @@ public interface JobManager {
void reportJobCaches(String jobToken, Collection<CacheInstance> cacheInstances);
@Nullable
Map<String, byte[]> runServerStep(String jobToken, List<Integer> stepPosition,
File filesDir, Map<String, String> placeholderValues, SimpleLogger logger);

View File

@ -26,7 +26,11 @@ public class JerseyApplication extends ResourceConfig {
getConfiguration().getRuntimeType());
property(disableMoxy, true);
property(ServerProperties.BV_SEND_ERROR_IN_RESPONSE, true);
// Add this in order to send build log entries as soon as possible in
// KubernetesResource.runServerStep
property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0);
// add the default Jackson exception mappers
register(JacksonFeature.class);

View File

@ -32,7 +32,6 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.lang.SerializationUtils;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.onedev.commons.utils.ExplicitException;
@ -41,12 +40,11 @@ import io.onedev.commons.utils.StringUtils;
import io.onedev.commons.utils.TarUtils;
import io.onedev.k8shelper.CacheAllocationRequest;
import io.onedev.k8shelper.CacheInstance;
import io.onedev.k8shelper.ServerExecutionResult;
import io.onedev.k8shelper.KubernetesHelper;
import io.onedev.server.buildspec.job.Job;
import io.onedev.server.buildspec.job.JobContext;
import io.onedev.server.buildspec.job.JobManager;
import io.onedev.server.rest.annotation.Api;
import io.onedev.server.util.ExceptionUtils;
import io.onedev.server.util.SimpleLogger;
@Api(internal=true)
@ -105,41 +103,54 @@ public class KubernetesResource {
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@POST
public Response runServerStep(InputStream is) {
File filesDir = FileUtils.createTempDir();
try {
int length = readInt(is);
List<Integer> stepPosition = new ArrayList<>();
for (int i=0; i<length; i++)
stepPosition.add(readInt(is));
Map<String, String> placeholderValues = new HashMap<>();
length = readInt(is);
for (int i=0; i<length; i++)
placeholderValues.put(readString(is), readString(is));
TarUtils.untar(is, filesDir);
List<String> logMessages = new ArrayList<>();
Map<String, byte[]> outputFiles = jobManager.runServerStep(getJobToken(), stepPosition,
filesDir, placeholderValues, new SimpleLogger() {
StreamingOutput os = new StreamingOutput() {
@Override
public void log(String message) {
logMessages.add(message);
}
});
ServerExecutionResult result = new ServerExecutionResult(logMessages, outputFiles);
return Response.ok(SerializationUtils.serialize(result)).build();
} catch (Exception e) {
String errorMessage = ExceptionUtils.getExpectedError(e);
if (errorMessage == null)
errorMessage = Throwables.getStackTraceAsString(e);
return Response.serverError().entity(errorMessage).build();
} finally {
FileUtils.deleteDir(filesDir);
}
@Override
public void write(OutputStream output) throws IOException {
File filesDir = FileUtils.createTempDir();
try {
int length = readInt(is);
List<Integer> stepPosition = new ArrayList<>();
for (int i=0; i<length; i++)
stepPosition.add(readInt(is));
Map<String, String> placeholderValues = new HashMap<>();
length = readInt(is);
for (int i=0; i<length; i++)
placeholderValues.put(readString(is), readString(is));
TarUtils.untar(is, filesDir);
Map<String, byte[]> outputFiles = jobManager.runServerStep(getJobToken(), stepPosition,
filesDir, placeholderValues, new SimpleLogger() {
@Override
public void log(String message) {
// While testing, ngrok.io buffers response and build can not get log entries
// timely. This won't happen on pagekite however
KubernetesHelper.writeInt(output, 1);
KubernetesHelper.writeString(output, message);
try {
output.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
if (outputFiles == null)
outputFiles = new HashMap<>();
byte[] bytes = SerializationUtils.serialize((Serializable) outputFiles);
KubernetesHelper.writeInt(output, 2);
KubernetesHelper.writeInt(output, bytes.length);
output.write(bytes);
} finally {
FileUtils.deleteDir(filesDir);
}
}
};
return Response.ok(os).build();
}
@Path("/download-dependencies")