fix: Websocket not closed when session times out (OD-2622)

This commit is contained in:
Robin Shen 2025-12-07 20:08:42 +08:00
parent 241c70d10d
commit 9c756c74ca
23 changed files with 278 additions and 610 deletions

View File

@ -197,7 +197,6 @@ import io.onedev.server.service.BuildMetricService;
import io.onedev.server.service.BuildParamService;
import io.onedev.server.service.BuildQueryPersonalizationService;
import io.onedev.server.service.BuildService;
import io.onedev.server.service.ChatService;
import io.onedev.server.service.CodeCommentMentionService;
import io.onedev.server.service.CodeCommentQueryPersonalizationService;
import io.onedev.server.service.CodeCommentReplyService;
@ -289,7 +288,6 @@ import io.onedev.server.service.impl.DefaultBuildMetricService;
import io.onedev.server.service.impl.DefaultBuildParamService;
import io.onedev.server.service.impl.DefaultBuildQueryPersonalizationService;
import io.onedev.server.service.impl.DefaultBuildService;
import io.onedev.server.service.impl.DefaultChatService;
import io.onedev.server.service.impl.DefaultCodeCommentMentionService;
import io.onedev.server.service.impl.DefaultCodeCommentQueryPersonalizationService;
import io.onedev.server.service.impl.DefaultCodeCommentReplyService;
@ -609,7 +607,6 @@ public class CoreModule extends AbstractPluginModule {
bind(PullRequestDescriptionRevisionService.class).to(DefaultPullRequestDescriptionRevisionService.class);
bind(SsoProviderService.class).to(DefaultSsoProviderService.class);
bind(SsoAccountService.class).to(DefaultSsoAccountService.class);
bind(ChatService.class).to(DefaultChatService.class);
bind(BaseAuthorizationService.class).to(DefaultBaseAuthorizationService.class);
bind(GroupEntitlementService.class).to(DefaultGroupEntitlementService.class);
bind(UserEntitlementService.class).to(DefaultUserEntitlementService.class);
@ -733,7 +730,6 @@ public class CoreModule extends AbstractPluginModule {
contributeFromPackage(ExceptionHandler.class, PageExpiredExceptionHandler.class);
contributeFromPackage(ExceptionHandler.class, WebApplicationExceptionHandler.class);
contribute(SessionListener.class, DefaultChatService.class);
contribute(SessionListener.class, DefaultWebSocketService.class);
bind(UrlService.class).to(DefaultUrlService.class);

View File

@ -123,7 +123,7 @@ public class DefaultAttachmentService implements AttachmentService, SchedulableT
@Inject
private BatchWorkExecutionService batchWorkExecutionService;
private String taskId;
private volatile String taskId;
public Object writeReplace() throws ObjectStreamException {
return new ManagedSerializedForm(AttachmentService.class);

View File

@ -196,7 +196,7 @@ public class DefaultDataService implements DataService, Serializable {
@Inject
private AlertService alertService;
private String backupTaskId;
private volatile String backupTaskId;
private Metadata getMetadata() {
return sessionFactoryService.getMetadata();

View File

@ -19,6 +19,8 @@ import org.eclipse.jetty.http.HttpCookie.SameSite;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.eclipse.jetty.server.session.DefaultSessionIdManager;
import org.eclipse.jetty.server.session.HouseKeeper;
import org.eclipse.jetty.server.session.SessionDataStoreFactory;
import org.eclipse.jetty.servlet.ErrorPageErrorHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
@ -44,6 +46,8 @@ public class DefaultJettyService implements JettyService, Provider<ServletContex
private static final int MAX_CONTENT_SIZE = 5000000;
private static final int SESSION_SCAVENGE_INTERVAL = 60;
@Inject
private SessionDataStoreFactory sessionDataStoreFactory;
@ -75,6 +79,17 @@ public class DefaultJettyService implements JettyService, Provider<ServletContex
public void start() {
server = new Server();
var sessionIdManager = new DefaultSessionIdManager(server);
var houseKeeper = new HouseKeeper();
try {
// Set the interval to clean up expired sessions
houseKeeper.setIntervalSec(SESSION_SCAVENGE_INTERVAL);
} catch (Exception e) {
throw ExceptionUtils.unchecked(e);
}
sessionIdManager.setSessionHouseKeeper(houseKeeper);
server.setSessionIdManager(sessionIdManager);
server.addBean(sessionDataStoreFactory);
servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
@ -88,8 +103,10 @@ public class DefaultJettyService implements JettyService, Provider<ServletContex
servletContextHandler.getSessionHandler().setSessionIdPathParameterName(null);
servletContextHandler.getSessionHandler().setSameSite(SameSite.LAX);
servletContextHandler.getSessionHandler().setHttpOnly(true);
var sessionTimeout = 1800;
if (settingService.getSystemSetting() != null)
servletContextHandler.getSessionHandler().setMaxInactiveInterval(settingService.getSystemSetting().getSessionTimeout() * 60);
sessionTimeout = settingService.getSystemSetting().getSessionTimeout() * 60;
servletContextHandler.getSessionHandler().setMaxInactiveInterval(sessionTimeout);
/*
* By default contributions is in reverse dependency order. We reverse the order so that

View File

@ -1,438 +0,0 @@
package io.onedev.server.service.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.shiro.subject.Subject;
import org.apache.shiro.util.ThreadContext;
import org.hibernate.criterion.Order;
import org.hibernate.criterion.Restrictions;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.cp.IAtomicLong;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.SystemMessage;
import dev.langchain4j.data.message.ToolExecutionResultMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.model.chat.request.ChatRequest;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.chat.response.PartialResponse;
import dev.langchain4j.model.chat.response.PartialResponseContext;
import dev.langchain4j.model.chat.response.PartialThinking;
import dev.langchain4j.model.chat.response.PartialThinkingContext;
import dev.langchain4j.model.chat.response.PartialToolCall;
import dev.langchain4j.model.chat.response.PartialToolCallContext;
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
import io.onedev.commons.utils.ExplicitException;
import io.onedev.commons.utils.StringUtils;
import io.onedev.server.cluster.ClusterService;
import io.onedev.server.event.Listen;
import io.onedev.server.event.system.SystemStarting;
import io.onedev.server.exception.ExceptionUtils;
import io.onedev.server.model.Chat;
import io.onedev.server.model.ChatMessage;
import io.onedev.server.model.User;
import io.onedev.server.persistence.SessionService;
import io.onedev.server.persistence.TransactionService;
import io.onedev.server.persistence.annotation.Sessional;
import io.onedev.server.persistence.annotation.Transactional;
import io.onedev.server.persistence.dao.EntityCriteria;
import io.onedev.server.security.SecurityUtils;
import io.onedev.server.service.ChatService;
import io.onedev.server.service.support.ChatResponding;
import io.onedev.server.service.support.ChatTool;
import io.onedev.server.web.SessionListener;
import io.onedev.server.web.WebSession;
import io.onedev.server.web.websocket.WebSocketService;
@Singleton
public class DefaultChatService extends BaseEntityService<Chat> implements ChatService, SessionListener {
private static final Logger logger = LoggerFactory.getLogger(DefaultChatService.class);
private static final int MAX_HISTORY_MESSAGES = 25;
private static final int MAX_HISTORY_MESSAGE_LEN = 1024;
private static final int PARTIAL_RESPONSE_NOTIFICATION_INTERVAL = 1000;
@Inject
private ExecutorService executorService;
@Inject
private TransactionService transactionService;
@Inject
private WebSocketService webSocketService;
@Inject
private ObjectMapper objectMapper;
@Inject
private SessionService sessionService;
@Inject
private ClusterService clusterService;
/*
* Use cluster wide id for anonymous chats and messages as we use id to route
* websocket observable changes to correct connections
*/
private volatile IAtomicLong nextAnonymousChatId;
private volatile IAtomicLong nextAnonymousChatMessageId;
private final Map<String, Map<Long, ChatRespondingImpl>> respondings = new ConcurrentHashMap<>();
@Override
public List<Chat> query(User user, User ai, String term, int count) {
EntityCriteria<Chat> criteria = EntityCriteria.of(Chat.class);
criteria.add(Restrictions.eq(Chat.PROP_USER, user));
criteria.add(Restrictions.eq(Chat.PROP_AI, ai));
criteria.add(Restrictions.ilike(Chat.PROP_TITLE, "%" + term + "%"));
criteria.addOrder(Order.desc(Chat.PROP_ID));
return query(criteria);
}
@Override
public void createOrUpdate(Chat chat) {
dao.persist(chat);
}
@Sessional
@Override
public ChatResponding getResponding(WebSession session, Chat chat) {
return getResponding(session.getId(), chat.getId());
}
@Transactional
@Override
public void sendRequest(WebSession session, ChatMessage request, List<ChatTool> tools, int timeoutSeconds) {
var sessionId = session.getId();
var requestId = request.getId();
var chatId = request.getChat().getId();
var anonymous = request.getChat().getUser() == null;
var modelSetting = request.getChat().getAi().getAiSetting().getModelSetting();
var streamingChatModel = modelSetting.getStreamingChatModel();
var chatModel = modelSetting.getChatModel();
var messages = request.getChat().getSortedMessages()
.stream()
.filter(it->!it.isError())
.collect(Collectors.toList());
if (messages.size() > MAX_HISTORY_MESSAGES)
messages = messages.subList(messages.size()-MAX_HISTORY_MESSAGES, messages.size());
var langchain4jMessages = new ArrayList<dev.langchain4j.data.message.ChatMessage>();
langchain4jMessages.addAll(messages.stream()
.map(it -> {
var content = it.getContent();
if (!it.equals(request))
content = StringUtils.abbreviate(content, MAX_HISTORY_MESSAGE_LEN);
if (it.isRequest())
return (dev.langchain4j.data.message.ChatMessage) new UserMessage(content);
else
return (dev.langchain4j.data.message.ChatMessage) new AiMessage(content);
})
.collect(Collectors.toList()));
var toolSpecifications = tools.stream()
.map(ChatTool::getSpecification)
.collect(Collectors.toList());
var completableFuture = new CompletableFuture<String>();
var executableFuture = executorService.submit(new Runnable() {
private StreamingChatResponseHandler newResponseHandler(Subject subject) {
return new StreamingChatResponseHandler() {
private long lastPartialResponseNotificationTime = System.currentTimeMillis();
@Override
public void onPartialResponse(PartialResponse partialResponse, PartialResponseContext context) {
ThreadContext.bind(subject);
if (completableFuture.isDone()) {
context.streamingHandle().cancel();
} else {
var responding = getResponding(sessionId, chatId, requestId);
if (responding != null) {
var content = responding.getContent();
if (content == null)
content = "";
content += partialResponse.text();
responding.content = content;
if (System.currentTimeMillis() - lastPartialResponseNotificationTime > PARTIAL_RESPONSE_NOTIFICATION_INTERVAL) {
lastPartialResponseNotificationTime = System.currentTimeMillis();
webSocketService.notifyObservableChange(Chat.getPartialResponseObservable(chatId), null);
}
}
}
}
@Override
public void onPartialThinking(PartialThinking partialThinking, PartialThinkingContext context) {
ThreadContext.bind(subject);
if (completableFuture.isDone())
context.streamingHandle().cancel();
}
@Override
public void onPartialToolCall(PartialToolCall partialToolCall, PartialToolCallContext context) {
ThreadContext.bind(subject);
if (completableFuture.isDone()) {
context.streamingHandle().cancel();
}
}
@Override
public void onCompleteResponse(ChatResponse completeResponse) {
ThreadContext.bind(subject);
sessionService.runAsync(() -> {
try {
var aiMessage = completeResponse.aiMessage();
if (aiMessage.hasToolExecutionRequests()) {
langchain4jMessages.add(aiMessage);
var toolRequests = aiMessage.toolExecutionRequests();
for (int i = 0; i < toolRequests.size(); i++) {
var toolRequest = toolRequests.get(i);
String toolName = toolRequest.name();
String toolArgs = toolRequest.arguments();
var toolResult = tools.stream()
.filter(it->it.getSpecification().name().equals(toolName))
.findFirst()
.orElseThrow(() -> new ExplicitException("Unknown tool: " + toolName))
.execute(objectMapper.readTree(toolArgs));
ToolExecutionResultMessage toolResultMessage = ToolExecutionResultMessage.from(
toolRequest.id(), toolName, toolResult);
langchain4jMessages.add(toolResultMessage);
}
var handler = newResponseHandler(SecurityUtils.getSubject());
var langchain4jChatRequest = ChatRequest.builder()
.messages(new ArrayList<>(langchain4jMessages))
.toolSpecifications(toolSpecifications)
.build();
streamingChatModel.chat(langchain4jChatRequest, handler);
} else {
completableFuture.complete(aiMessage.text());
}
} catch (Throwable t) {
completableFuture.completeExceptionally(t);
}
});
}
@Override
public void onError(Throwable error) {
ThreadContext.bind(subject);
completableFuture.completeExceptionally(error);
}
};
}
private void createResponse(String content, @Nullable Throwable throwable) {
var responding = getResponding(session.getId(), chatId, requestId);
if (responding != null) {
transactionService.run(() -> {
if (throwable != null)
logger.error("Error getting chat response", throwable);
var response = new ChatMessage();
response.setError(throwable != null);
response.setContent(content);
if (anonymous) {
var chat = SerializationUtils.clone(checkNotNull(session.getAnonymousChats().get(chatId)));
response.setChat(chat);
response.setId(nextAnonymousChatMessageId());
chat.getMessages().add(response);
session.getAnonymousChats().put(chatId, chat);
} else {
response.setChat(load(chatId));
dao.persist(response);
}
webSocketService.notifyObservableChange(Chat.getNewMessagesObservable(chatId), null);
});
};
}
private void createIncompleteResponse(String reason) {
var responding = getResponding(sessionId, chatId, requestId);
if (responding != null) {
var content = responding.getContent();
if (content == null)
content = "";
else
content += "\n\n";
content += "<b class='text text-danger'>" + reason + "</b>";
createResponse(content, null);
}
}
@Override
public void run() {
var handler = newResponseHandler(SecurityUtils.getSubject());
try {
var langchain4jChatRequest = ChatRequest.builder()
.messages(langchain4jMessages.stream()
.map(msg -> (dev.langchain4j.data.message.ChatMessage) msg)
.collect(Collectors.toList()))
.toolSpecifications(toolSpecifications)
.build();
streamingChatModel.chat(langchain4jChatRequest, handler);
transactionService.run(() -> {
var chat = anonymous ? checkNotNull(session.getAnonymousChats().get(chatId)) : load(chatId);
var requests = chat.getMessages().stream().filter(it->it.isRequest()).collect(Collectors.toList());
if (requests.size() == 1) {
var systemPrompt = String.format("""
Summarize provided message to get a compact title with below requirements:
1. Just summarize the message itself, no need to ask user for clarification or confirmation
2. It should be within %d characters
3. Only title is returned, no other text or comments
4. Display in %s
""", Chat.MAX_TITLE_LEN, session.getLocale().getDisplayLanguage());
var title = chatModel.chat(new SystemMessage(systemPrompt), new UserMessage(requests.get(0).getContent())).aiMessage().text();
if (anonymous) {
chat = SerializationUtils.clone(chat);
chat.setTitle(title);
session.getAnonymousChats().put(chatId, chat);
} else {
chat.setTitle(title);
dao.persist(chat);
}
webSocketService.notifyObservableChange(Chat.getChangeObservable(chatId), null);
}
});
var response = completableFuture.get(timeoutSeconds, TimeUnit.SECONDS);
if (StringUtils.isBlank(response))
throw new ExplicitException("Received empty response");
createResponse(response, null);
} catch (Throwable e) {
if (ExceptionUtils.find(e, InterruptedException.class) != null) {
createIncompleteResponse("Conversation cancelled");
} else if (ExceptionUtils.find(e, TimeoutException.class) != null) {
createIncompleteResponse("Conversation timed out");
} else {
logger.error("Error getting chat response", e);
String errorMessage = e.getMessage();
if (errorMessage == null)
errorMessage = "Error getting chat response, check server log for details";
createResponse(errorMessage, e);
}
} finally {
completableFuture.cancel(false);
var respondingsOfSession = respondings.get(sessionId);
if (respondingsOfSession != null) {
var responding = respondingsOfSession.get(chatId);
if (responding != null) {
respondingsOfSession.computeIfPresent(chatId, (k, v)-> {
if (v.requestId.equals(requestId))
return null;
else
return v;
});
webSocketService.notifyObservableChange(Chat.getPartialResponseObservable(chatId), null);
}
}
}
}
});
var previousResponding = respondings.computeIfAbsent(sessionId, it->new ConcurrentHashMap<>()).put(chatId, new ChatRespondingImpl(requestId, executableFuture));
if (previousResponding != null)
previousResponding.cancel();
}
private ChatRespondingImpl getResponding(String sessionId, Long chatId) {
var respondingsOfSession = respondings.get(sessionId);
if (respondingsOfSession != null) {
return respondingsOfSession.get(chatId);
}
return null;
}
private ChatRespondingImpl getResponding(String sessionId, Long chatId, Long requestId) {
var responding = getResponding(sessionId, chatId);
if (responding != null && responding.requestId.equals(requestId))
return responding;
else
return null;
}
@Listen
public void on(SystemStarting event) {
nextAnonymousChatId = clusterService.getHazelcastInstance().getCPSubsystem().getAtomicLong("nextAnonymousChatId");
nextAnonymousChatMessageId = clusterService.getHazelcastInstance().getCPSubsystem().getAtomicLong("nextAnonymousChatMessageId");
if (clusterService.isLeaderServer()) {
nextAnonymousChatId.set(1L);
nextAnonymousChatMessageId.set(1L);
}
}
@Override
public long nextAnonymousChatId() {
return nextAnonymousChatId.getAndIncrement();
}
@Override
public long nextAnonymousChatMessageId() {
return nextAnonymousChatMessageId.getAndIncrement();
}
@Override
public void sessionCreated(String sessionId) {
}
@Override
public void sessionDestroyed(String sessionId) {
var respondingsOfSession = respondings.remove(sessionId);
if (respondingsOfSession != null) {
for (var responding : respondingsOfSession.values())
responding.cancel();
}
}
private static class ChatRespondingImpl implements ChatResponding {
private final Long requestId;
private final Future<?> future;
private volatile String content;
ChatRespondingImpl(Long requestId, Future<?> future) {
this.requestId = requestId;
this.future = future;
}
@Override
public String getContent() {
return content;
}
@Override
public void cancel() {
future.cancel(true);
}
}
}

View File

@ -166,7 +166,7 @@ public class DefaultIssueChangeService extends BaseEntityService<IssueChange>
@Inject
private TransactionService transactionService;
private String taskId;
private volatile String taskId;
@Transactional
@Override

View File

@ -2,6 +2,7 @@ package io.onedev.server.web.behavior;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.wicket.Component;
import org.apache.wicket.behavior.Behavior;
@ -44,9 +45,9 @@ public abstract class ChangeObserver extends Behavior {
}
}
public static Collection<String> filterObservables(Collection<String> observingObservables,
public static Set<String> filterObservables(Collection<String> observingObservables,
Collection<String> changedObservables) {
Collection<String> observingChangedObservables = new HashSet<>();
Set<String> observingChangedObservables = new HashSet<>();
for (var observingObservable: observingObservables) {
for (var changedObservable: changedObservables) {
if (containsObservable(observingObservable, changedObservable))

View File

@ -335,7 +335,7 @@ public class ChatPanel extends Panel {
else
return Collections.emptySet();
}
});
respondingContainer.setOutputMarkupPlaceholderTag(true);

View File

@ -3,7 +3,7 @@
top: 0;
bottom: 0;
right: 0;
z-index: 2000;
z-index: 1040;
background: white;
box-shadow: 0 0 12px rgba(0,0,0,0.2);
}

View File

@ -279,7 +279,7 @@ public abstract class TaskButton extends AjaxButton {
@Inject
private TaskScheduler taskScheduler;
private String taskId;
private volatile String taskId;
@Listen
public void on(SystemStarted event) {

View File

@ -35,6 +35,7 @@ import io.onedev.server.web.editable.BeanContext;
import io.onedev.server.web.page.admin.AdministrationPage;
import io.onedev.server.web.page.user.UserCssResourceReference;
import io.onedev.server.web.page.user.basicsetting.UserBasicSettingPage;
import io.onedev.server.web.util.WicketUtils;
import io.onedev.server.web.util.editbean.NewUserBean;
public class NewUserPage extends AdministrationPage {
@ -51,7 +52,10 @@ public class NewUserPage extends AdministrationPage {
protected void onInitialize() {
super.onInitialize();
var editor = BeanContext.edit("editor", bean, Sets.newHashSet(User.PROP_NOTIFY_OWN_EVENTS), true);
var excludeProperties = Sets.newHashSet(User.PROP_NOTIFY_OWN_EVENTS);
if (!WicketUtils.isSubscriptionActive())
excludeProperties.add(User.PROP_TYPE);
var editor = BeanContext.edit("editor", bean, excludeProperties, true);
Form<?> form = new Form<Void>("form") {

View File

@ -17,7 +17,10 @@
<div wicket:id="sessionFeedback" id="session-feedback"></div>
<div id="ajax-loading-indicator"><div><wicket:t>Please wait...</wicket:t></div></div>
<div class="websocket-error">
<div class="connection-error">
<wicket:t>Connection lost or session expired, reload to recover</wicket:t>
</div>
<div class="page-error">
<wicket:t>Page is in error, reload to recover</wicket:t>
</div>
<div wicket:id="rootComponents"></div>

View File

@ -10,7 +10,6 @@ import java.time.ZoneId;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import javax.inject.Inject;
@ -28,6 +27,7 @@ import org.apache.wicket.ajax.AjaxRequestTarget;
import org.apache.wicket.ajax.attributes.AjaxRequestAttributes;
import org.apache.wicket.ajax.attributes.AjaxRequestAttributes.Method;
import org.apache.wicket.core.request.handler.IPartialPageRequestHandler;
import org.apache.wicket.event.IEvent;
import org.apache.wicket.markup.ComponentTag;
import org.apache.wicket.markup.head.IHeaderResponse;
import org.apache.wicket.markup.head.JavaScriptHeaderItem;
@ -40,8 +40,7 @@ import org.apache.wicket.markup.html.panel.FeedbackPanel;
import org.apache.wicket.markup.repeater.RepeatingView;
import org.apache.wicket.model.LoadableDetachableModel;
import org.apache.wicket.protocol.ws.api.WebSocketBehavior;
import org.apache.wicket.protocol.ws.api.WebSocketRequestHandler;
import org.apache.wicket.protocol.ws.api.message.TextMessage;
import org.apache.wicket.protocol.ws.api.event.WebSocketPushPayload;
import org.apache.wicket.request.IRequestParameters;
import org.apache.wicket.request.cycle.RequestCycle;
import org.apache.wicket.request.http.WebRequest;
@ -55,11 +54,9 @@ import org.unbescape.javascript.JavaScriptEscape;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
import io.onedev.commons.bootstrap.Bootstrap;
import io.onedev.commons.loader.AppLoader;
import io.onedev.server.OneDev;
import io.onedev.server.commandhandler.Upgrade;
import io.onedev.server.event.ListenerRegistry;
@ -80,7 +77,7 @@ import io.onedev.server.web.page.security.LoginPage;
import io.onedev.server.web.page.serverinit.ServerInitPage;
import io.onedev.server.web.page.simple.SimplePage;
import io.onedev.server.web.util.WicketUtils;
import io.onedev.server.web.websocket.WebSocketMessages;
import io.onedev.server.web.websocket.ObservablesChanged;
import io.onedev.server.web.websocket.WebSocketService;
public abstract class BasePage extends WebPage {
@ -105,6 +102,12 @@ public abstract class BasePage extends WebPage {
@Inject
protected ListenerRegistry listenerRegistry;
@Inject
private WebSocketService webSocketService;
@Inject
private ObjectMapper objectMapper;
public BasePage(PageParameters params) {
super(params);
@ -148,6 +151,15 @@ public abstract class BasePage extends WebPage {
setResponsePage(getPageClass(), getPageParameters());
}
@Override
public void onEvent(IEvent<?> event) {
super.onEvent(event);
if (event.getPayload() instanceof WebSocketPushPayload payload
&& payload.getMessage() instanceof ObservablesChanged observablesChanged) {
notifyObservablesChange(payload.getHandler(), observablesChanged.getObservables());
}
}
@Override
protected void onInitialize() {
super.onInitialize();
@ -205,8 +217,8 @@ public abstract class BasePage extends WebPage {
String.valueOf(OneDev.getInstance().getBootDate().getTime()),
SpriteImage.getVersionedHref(IconScope.class, null),
popStateBehavior.getCallbackFunction(explicit("data")).toString(),
OneDev.getInstance(ObjectMapper.class).writeValueAsString(getRemoveAutosaveKeys()),
OneDev.getInstance(ObjectMapper.class).writeValueAsString(translations))));
objectMapper.writeValueAsString(getRemoveAutosaveKeys()),
objectMapper.writeValueAsString(translations))));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
@ -295,20 +307,7 @@ public abstract class BasePage extends WebPage {
add(rootComponents = new RepeatingView("rootComponents"));
add(new WebSocketBehavior() {
@Override
protected void onMessage(WebSocketRequestHandler handler, TextMessage message) {
super.onMessage(handler, message);
if (message.getText().startsWith(WebSocketMessages.OBSERVABLE_CHANGED)) {
List<String> observables = Splitter.on('\n').splitToList(
message.getText().substring(WebSocketMessages.OBSERVABLE_CHANGED.length()+1));
notifyObservablesChange(handler, observables);
}
}
});
add(new WebSocketBehavior() {});
add(zoneIdDetectBehavior = new AbstractDefaultAjaxBehavior() {
@ -389,7 +388,7 @@ public abstract class BasePage extends WebPage {
@Override
protected void onAfterRender() {
AppLoader.getInstance(WebSocketService.class).observe(this);
webSocketService.observe(this);
super.onAfterRender();
}

View File

@ -216,18 +216,22 @@ a.tree-junction {
margin-left: 0.4rem;
}
/* WEB SOCKET ERROR */
.websocket-error {
.connection-error, .page-error {
z-index: 1050;
padding: 0.75rem 1.25rem;
display: none;
position: fixed;
max-width: 75%;
font-weight: bold;
top: 0;
background: var(--danger);
background: var(--light-danger);
border-radius: 0 0 0.42rem 0.42rem;
color: white;
color: var(--danger);
}
.dark-mode .connection-error, .dark-mode .page-error {
background: var(--dark-mode-light-danger);
color: var(--danger);
}
textarea {
resize: none;

View File

@ -339,29 +339,29 @@ onedev.server = {
}
},
setupWebsocketCallback: function() {
var messagesToSent = [];
function sendMessages() {
if (onedev.server.ajaxRequests.count == 0) {
for (var i in messagesToSent)
Wicket.WebSocket.send(messagesToSent[i]);
messagesToSent = [];
} else {
setTimeout(sendMessages, 100);
}
setupWebSocketHandler: function() {
function showError($error) {
$error.css("left", ($(window).width()-$error.outerWidth()) / 2);
$error.slideDown("slow");
}
Wicket.Event.subscribe("/websocket/message", function(jqEvent, message) {
if (message.indexOf("ObservableChanged:") != -1) {
if (messagesToSent.indexOf(message) == -1) {
messagesToSent.push(message);
sendMessages();
}
} else if (message == "ErrorMessage") {
var $websocketError = $(".websocket-error");
$websocketError.css("left", ($(window).width()-$websocketError.outerWidth()) / 2);
$websocketError.slideDown("slow");
function testConnection() {
if (Wicket.WebSocket.INSTANCE.ws && Wicket.WebSocket.INSTANCE.ws.readyState == WebSocket.OPEN) {
Wicket.WebSocket.send("TestConnection");
setTimeout(testConnection, 60000);
}
}
Wicket.Event.subscribe("/websocket/open", function(jqEvent) {
testConnection();
});
Wicket.Event.subscribe("/websocket/closed", function(jqEvent) {
showError($(".connection-error"));
});
Wicket.Event.subscribe("/websocket/error", function(jqEvent) {
showError($(".connection-error"));
});
Wicket.Event.subscribe("/websocket/message", function(jqEvent, message) {
if (message == "ErrorMessage")
showError($(".page-error"));
});
},
@ -900,7 +900,7 @@ onedev.server = {
onedev.server.setupAjaxLoadingIndicator();
onedev.server.form.setupDirtyCheck();
onedev.server.setupWebsocketCallback();
onedev.server.setupWebSocketHandler();
onedev.server.mouseState.track();
onedev.server.ajaxRequests.track();
onedev.server.setupInputClear();

View File

@ -17,11 +17,11 @@ public class TestPage extends BasePage {
@Override
protected void onInitialize() {
super.onInitialize();
add(new Link<Void>("test") {
@Override
public void onClick() {
System.out.println("bb");
public void onClick() {
}
});

View File

@ -22,7 +22,7 @@ public class DefaultUploadService implements UploadService, SchedulableTask {
@Inject
private TaskScheduler taskScheduler;
private String taskId;
private volatile String taskId;
private final Map<String, FileUpload> uploads = new ConcurrentHashMap<>();

View File

@ -21,6 +21,7 @@ import org.apache.wicket.protocol.ws.api.registry.IKey;
import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry;
import org.apache.wicket.protocol.ws.api.registry.PageIdKey;
import org.apache.wicket.protocol.ws.api.registry.SimpleWebSocketConnectionRegistry;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.joda.time.DateTime;
import org.jspecify.annotations.Nullable;
import org.quartz.ScheduleBuilder;
@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import io.onedev.commons.loader.ManagedSerializedForm;
import io.onedev.commons.utils.StringUtils;
import io.onedev.server.cluster.ClusterRunnable;
import io.onedev.server.cluster.ClusterService;
import io.onedev.server.event.Listen;
@ -52,7 +52,7 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene
private static final Logger logger = LoggerFactory.getLogger(DefaultWebSocketService.class);
private static final int KEEP_ALIVE_INTERVAL = 30;
private static final int KEEP_ALIVE_INTERVAL = 60;
@Inject
private Application application;
@ -72,9 +72,11 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene
private final Map<String, Pair<PageKey, Date>> notifiedObservables = new ConcurrentHashMap<>();
private String keepAliveTaskId;
private volatile String keepAliveTaskId;
private String notifiedObservableCleanupTaskId;
private volatile String notifiedObservableCleanupTaskId;
private volatile Thread checkMessageQueueThread;
public Object writeReplace() throws ObjectStreamException {
return new ManagedSerializedForm(WebSocketService.class);
@ -106,6 +108,14 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene
@Override
public void sessionDestroyed(String sessionId) {
for (IWebSocketConnection connection : connectionRegistry.getConnections(application, sessionId)) {
try {
connection.close(StatusCode.NORMAL, "Session destroyed");
IKey pageKey = ((WebSocketConnection) connection).getPageKey().getPageId();
connectionRegistry.removeConnection(application, sessionId, pageKey);
} catch (Exception e) {
}
}
registeredObservables.remove(sessionId);
}
@ -123,13 +133,11 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene
}
}
private void notifyObservables(IWebSocketConnection connection, Collection<String> observables) {
String message = WebSocketMessages.OBSERVABLE_CHANGED + ":" + StringUtils.join(observables, "\n");
try {
connection.sendMessage(message);
} catch (Exception e) {
logger.error("Error sending websocket message: " + message, e);
}
private void notifyObservablesChange(IWebSocketConnection connection, Set<String> observables) {
((WebSocketConnection) connection).queueMessage(new ObservablesChanged(observables));
var thread = checkMessageQueueThread;
if (thread != null)
thread.interrupt();
}
@Override
@ -157,7 +165,7 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene
var registeredChangedObservables =
filterObservables(registeredObservables, observables);
if (!registeredChangedObservables.isEmpty())
notifyObservables(connection, registeredChangedObservables);
notifyObservablesChange(connection, registeredChangedObservables);
}
}
}
@ -198,7 +206,7 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene
@Override
public ScheduleBuilder<?> getScheduleBuilder() {
return SimpleScheduleBuilder.repeatSecondlyForever(TOLERATE_SECONDS);
return SimpleScheduleBuilder.repeatSecondlyForever(1);
}
@Override
@ -211,10 +219,29 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene
}
});
checkMessageQueueThread = new Thread(() -> {
while (checkMessageQueueThread != null) {
for (var connection: connectionRegistry.getConnections(application)) {
if (connection.isOpen())
((WebSocketConnection) connection).checkMessageQueue();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
});
checkMessageQueueThread.start();
}
@Listen
public void on(SystemStopping event) {
var thread = checkMessageQueueThread;
checkMessageQueueThread = null;
if (thread != null)
thread.interrupt();
if (keepAliveTaskId != null)
taskScheduler.unschedule(keepAliveTaskId);
if (notifiedObservableCleanupTaskId != null)
@ -234,7 +261,7 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene
}
private void notifyPastObservables(IWebSocketConnection connection) {
PageKey pageKey = ((WebSocketConnection) connection).getPageKey();
PageKey pageKey = ((WebSocketConnection) connection).getPageKey();
Collection<String> registeredObservables = getRegisteredObservables(connection);
if (registeredObservables != null) {
Set<String> observables = new HashSet<>();
@ -247,7 +274,7 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene
}
}
if (!observables.isEmpty())
notifyObservables(connection, observables);
notifyObservablesChange(connection, observables);
}
}

View File

@ -0,0 +1,35 @@
package io.onedev.server.web.websocket;
import java.util.Collection;
import java.util.Set;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
public class ObservablesChanged implements IWebSocketPushMessage {
private final Set<String> observables;
public ObservablesChanged(Set<String> observables) {
this.observables = observables;
}
public Collection<String> getObservables() {
return observables;
}
@Override
public boolean equals(Object o) {
if (o instanceof ObservablesChanged observablesChanged)
return new EqualsBuilder().append(observables, observablesChanged.observables).isEquals();
else
return false;
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(observables).toHashCode();
}
}

View File

@ -18,48 +18,63 @@ package io.onedev.server.web.websocket;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.wicket.protocol.ws.api.AbstractWebSocketConnection;
import org.apache.shiro.subject.Subject;
import org.apache.shiro.util.ThreadContext;
import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor;
import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
import org.apache.wicket.util.lang.Args;
import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
import org.eclipse.jetty.websocket.api.Session;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.onedev.server.OneDev;
import io.onedev.server.persistence.SessionService;
/**
* A wrapper around Jetty9's native WebSocketConnection.
*
* @since 6.2
*/
public class WebSocketConnection extends AbstractWebSocketConnection
{
public class WebSocketConnection implements IWebSocketConnection {
private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final AbstractWebSocketProcessor webSocketProcessor;
private final Session session;
private final PageKey pageKey;
private final Subject subject;
private final Set<IWebSocketPushMessage> messageQueue = new LinkedHashSet<>();
private boolean working;
/**
* Constructor.
*
* @param session
* the jetty websocket connection
*/
public WebSocketConnection(Session session, AbstractWebSocketProcessor webSocketProcessor, PageKey pageKey)
public WebSocketConnection(Session session, AbstractWebSocketProcessor webSocketProcessor, PageKey pageKey, Subject subject)
{
super(webSocketProcessor);
this.session = Args.notNull(session, "connection");
this.webSocketProcessor = webSocketProcessor;
this.session = session;
this.pageKey = pageKey;
this.subject = subject;
}
@Override
public boolean isOpen()
{
public boolean isOpen() {
return session.isOpen();
}
public Session getSession() {
return session;
}
@Override
public void close(int code, String reason)
{
if (isOpen())
{
public void close(int code, String reason) {
if (isOpen()) {
session.close(code, reason);
}
}
@ -69,8 +84,7 @@ public class WebSocketConnection extends AbstractWebSocketConnection
}
@Override
public IWebSocketConnection sendMessage(String message) throws IOException
{
public IWebSocketConnection sendMessage(String message) throws IOException {
checkClosed();
session.getRemote().sendStringByFuture(message);
@ -78,9 +92,7 @@ public class WebSocketConnection extends AbstractWebSocketConnection
}
@Override
public IWebSocketConnection sendMessage(byte[] message, int offset, int length)
throws IOException
{
public IWebSocketConnection sendMessage(byte[] message, int offset, int length) throws IOException {
checkClosed();
ByteBuffer buf = ByteBuffer.wrap(message, offset, length);
@ -88,11 +100,66 @@ public class WebSocketConnection extends AbstractWebSocketConnection
return this;
}
private void checkClosed()
{
if (!isOpen())
{
private void checkClosed() {
if (!isOpen()) {
throw new IllegalStateException("The connection is closed.");
}
}
@Override
public void sendMessage(IWebSocketPushMessage message) {
webSocketProcessor.broadcastMessage(message);
}
public synchronized boolean queueMessage(IWebSocketPushMessage message) {
return messageQueue.add(message);
}
public synchronized void checkMessageQueue() {
if (!messageQueue.isEmpty() && !working) {
ThreadContext.bind(subject);
working = true;
OneDev.getInstance(SessionService.class).runAsync(new Runnable() {
@Nullable
private IWebSocketPushMessage getNextMessage() {
synchronized (WebSocketConnection.this) {
var it = messageQueue.iterator();
if (it.hasNext()) {
var message = it.next();
it.remove();
return message;
} else {
return null;
}
}
}
@Override
public void run() {
try {
while (true) {
IWebSocketPushMessage message = getNextMessage();
if (message != null) {
webSocketProcessor.broadcastMessage(message);
} else {
break;
}
}
} catch (Throwable e) {
logger.error("Error processing websocket message", e);
try {
sendMessage(WebSocketMessages.ERROR_MESSAGE);
} catch (Throwable e2) {
}
} finally {
synchronized (WebSocketConnection.this) {
working = false;
}
}
}
});
}
}
}

View File

@ -3,9 +3,9 @@ package io.onedev.server.web.websocket;
public interface WebSocketMessages {
static final String ERROR_MESSAGE = "ErrorMessage";
static final String OBSERVABLE_CHANGED = "ObservableChanged";
static final String KEEP_ALIVE = "KeepAlive";
static final String TEST_CONNECTION = "TestConnection";
}

View File

@ -16,10 +16,7 @@
*/
package io.onedev.server.web.websocket;
import java.io.IOException;
import org.apache.shiro.subject.Subject;
import org.apache.shiro.util.ThreadContext;
import org.apache.wicket.protocol.http.WebApplication;
import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor;
import org.eclipse.jetty.websocket.api.Session;
@ -29,11 +26,6 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.onedev.commons.loader.AppLoader;
import io.onedev.commons.utils.ExceptionUtils;
import io.onedev.server.OneDev;
import io.onedev.server.persistence.SessionService;
/**
* An {@link org.apache.wicket.protocol.ws.api.IWebSocketProcessor processor} that integrates with
* Jetty 9.x {@link Session web socket} implementation.
@ -65,75 +57,36 @@ public class WebSocketProcessor extends AbstractWebSocketProcessor implements We
@Override
public void onWebSocketConnect(final Session session) {
run(new Runnable() {
@Override
public void run() {
PageKey pageKey = new PageKey(getSessionId(), getRegistryKey());
WebSocketConnection connection = new WebSocketConnection(session, WebSocketProcessor.this, pageKey);
onConnect(connection);
}
});
PageKey pageKey = new PageKey(getSessionId(), getRegistryKey());
Subject subject = (Subject) request.getHttpServletRequest().getAttribute(WebSocketFilter.SHIRO_SUBJECT);
WebSocketConnection connection = new WebSocketConnection(session, WebSocketProcessor.this, pageKey, subject);
onConnect(connection);
}
private void run(Runnable runnable) {
if (OneDev.getInstance().isReady()) {
SessionService sessionService = AppLoader.getInstance(SessionService.class);
Subject subject = (Subject) request.getHttpServletRequest()
.getAttribute(WebSocketFilter.SHIRO_SUBJECT);
ThreadContext.bind(subject);
sessionService.run(runnable);
}
}
@Override
public void onWebSocketText(final String message) {
run(new Runnable() {
@Override
public void run() {
onMessage(message);
}
});
if (!message.equals(WebSocketMessages.TEST_CONNECTION))
onMessage(message);
}
@Override
public void onWebSocketBinary(final byte[] payload, final int offset, final int len) {
run(new Runnable() {
@Override
public void run() {
onMessage(payload, offset, len);
}
});
onMessage(payload, offset, len);
}
@Override
public void onWebSocketClose(final int statusCode, final String reason) {
run(new Runnable() {
@Override
public void run() {
onClose(statusCode, reason);
}
});
onClose(statusCode, reason);
}
@Override
public void onWebSocketError(Throwable throwable) {
IOException ioException = ExceptionUtils.find(throwable, IOException.class);
if (ioException != null && "Broken pipe".equals(ioException.getMessage()))
logger.debug("WebSocket closed", throwable);
else
logger.error("An error occurred when using WebSocket.", throwable);
logger.error("An error occurred when using WebSocket", throwable);
}
@Override
public void onOpen(Object connection) {
onWebSocketConnect((Session)connection);
}
}

@ -1 +1 @@
Subproject commit c61b935709e58a4b17d83143afee2319c052d7fc
Subproject commit af138317275f62237fceea4958145fcbbd48610f