diff --git a/server-core/src/main/java/io/onedev/server/CoreModule.java b/server-core/src/main/java/io/onedev/server/CoreModule.java index 90e25a57d5..866b76dc33 100644 --- a/server-core/src/main/java/io/onedev/server/CoreModule.java +++ b/server-core/src/main/java/io/onedev/server/CoreModule.java @@ -197,7 +197,6 @@ import io.onedev.server.service.BuildMetricService; import io.onedev.server.service.BuildParamService; import io.onedev.server.service.BuildQueryPersonalizationService; import io.onedev.server.service.BuildService; -import io.onedev.server.service.ChatService; import io.onedev.server.service.CodeCommentMentionService; import io.onedev.server.service.CodeCommentQueryPersonalizationService; import io.onedev.server.service.CodeCommentReplyService; @@ -289,7 +288,6 @@ import io.onedev.server.service.impl.DefaultBuildMetricService; import io.onedev.server.service.impl.DefaultBuildParamService; import io.onedev.server.service.impl.DefaultBuildQueryPersonalizationService; import io.onedev.server.service.impl.DefaultBuildService; -import io.onedev.server.service.impl.DefaultChatService; import io.onedev.server.service.impl.DefaultCodeCommentMentionService; import io.onedev.server.service.impl.DefaultCodeCommentQueryPersonalizationService; import io.onedev.server.service.impl.DefaultCodeCommentReplyService; @@ -609,7 +607,6 @@ public class CoreModule extends AbstractPluginModule { bind(PullRequestDescriptionRevisionService.class).to(DefaultPullRequestDescriptionRevisionService.class); bind(SsoProviderService.class).to(DefaultSsoProviderService.class); bind(SsoAccountService.class).to(DefaultSsoAccountService.class); - bind(ChatService.class).to(DefaultChatService.class); bind(BaseAuthorizationService.class).to(DefaultBaseAuthorizationService.class); bind(GroupEntitlementService.class).to(DefaultGroupEntitlementService.class); bind(UserEntitlementService.class).to(DefaultUserEntitlementService.class); @@ -733,7 +730,6 @@ public class CoreModule extends AbstractPluginModule { contributeFromPackage(ExceptionHandler.class, PageExpiredExceptionHandler.class); contributeFromPackage(ExceptionHandler.class, WebApplicationExceptionHandler.class); - contribute(SessionListener.class, DefaultChatService.class); contribute(SessionListener.class, DefaultWebSocketService.class); bind(UrlService.class).to(DefaultUrlService.class); diff --git a/server-core/src/main/java/io/onedev/server/attachment/DefaultAttachmentService.java b/server-core/src/main/java/io/onedev/server/attachment/DefaultAttachmentService.java index eedfe11027..282876334e 100644 --- a/server-core/src/main/java/io/onedev/server/attachment/DefaultAttachmentService.java +++ b/server-core/src/main/java/io/onedev/server/attachment/DefaultAttachmentService.java @@ -123,7 +123,7 @@ public class DefaultAttachmentService implements AttachmentService, SchedulableT @Inject private BatchWorkExecutionService batchWorkExecutionService; - private String taskId; + private volatile String taskId; public Object writeReplace() throws ObjectStreamException { return new ManagedSerializedForm(AttachmentService.class); diff --git a/server-core/src/main/java/io/onedev/server/data/DefaultDataService.java b/server-core/src/main/java/io/onedev/server/data/DefaultDataService.java index a5d01ffb7b..98df5d6fd7 100644 --- a/server-core/src/main/java/io/onedev/server/data/DefaultDataService.java +++ b/server-core/src/main/java/io/onedev/server/data/DefaultDataService.java @@ -196,7 +196,7 @@ public class DefaultDataService implements DataService, Serializable { @Inject private AlertService alertService; - private String backupTaskId; + private volatile String backupTaskId; private Metadata getMetadata() { return sessionFactoryService.getMetadata(); diff --git a/server-core/src/main/java/io/onedev/server/jetty/DefaultJettyService.java b/server-core/src/main/java/io/onedev/server/jetty/DefaultJettyService.java index 0172d58a7d..2a605a8c6a 100644 --- a/server-core/src/main/java/io/onedev/server/jetty/DefaultJettyService.java +++ b/server-core/src/main/java/io/onedev/server/jetty/DefaultJettyService.java @@ -19,6 +19,8 @@ import org.eclipse.jetty.http.HttpCookie.SameSite; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.server.session.DefaultSessionIdManager; +import org.eclipse.jetty.server.session.HouseKeeper; import org.eclipse.jetty.server.session.SessionDataStoreFactory; import org.eclipse.jetty.servlet.ErrorPageErrorHandler; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -44,6 +46,8 @@ public class DefaultJettyService implements JettyService, Provider implements ChatService, SessionListener { - - private static final Logger logger = LoggerFactory.getLogger(DefaultChatService.class); - - private static final int MAX_HISTORY_MESSAGES = 25; - - private static final int MAX_HISTORY_MESSAGE_LEN = 1024; - - private static final int PARTIAL_RESPONSE_NOTIFICATION_INTERVAL = 1000; - - @Inject - private ExecutorService executorService; - - @Inject - private TransactionService transactionService; - - @Inject - private WebSocketService webSocketService; - - @Inject - private ObjectMapper objectMapper; - - @Inject - private SessionService sessionService; - - @Inject - private ClusterService clusterService; - - /* - * Use cluster wide id for anonymous chats and messages as we use id to route - * websocket observable changes to correct connections - */ - private volatile IAtomicLong nextAnonymousChatId; - - private volatile IAtomicLong nextAnonymousChatMessageId; - - private final Map> respondings = new ConcurrentHashMap<>(); - - @Override - public List query(User user, User ai, String term, int count) { - EntityCriteria criteria = EntityCriteria.of(Chat.class); - criteria.add(Restrictions.eq(Chat.PROP_USER, user)); - criteria.add(Restrictions.eq(Chat.PROP_AI, ai)); - criteria.add(Restrictions.ilike(Chat.PROP_TITLE, "%" + term + "%")); - criteria.addOrder(Order.desc(Chat.PROP_ID)); - return query(criteria); - } - - @Override - public void createOrUpdate(Chat chat) { - dao.persist(chat); - } - - @Sessional - @Override - public ChatResponding getResponding(WebSession session, Chat chat) { - return getResponding(session.getId(), chat.getId()); - } - - @Transactional - @Override - public void sendRequest(WebSession session, ChatMessage request, List tools, int timeoutSeconds) { - var sessionId = session.getId(); - var requestId = request.getId(); - var chatId = request.getChat().getId(); - var anonymous = request.getChat().getUser() == null; - - var modelSetting = request.getChat().getAi().getAiSetting().getModelSetting(); - var streamingChatModel = modelSetting.getStreamingChatModel(); - var chatModel = modelSetting.getChatModel(); - - var messages = request.getChat().getSortedMessages() - .stream() - .filter(it->!it.isError()) - .collect(Collectors.toList()); - - if (messages.size() > MAX_HISTORY_MESSAGES) - messages = messages.subList(messages.size()-MAX_HISTORY_MESSAGES, messages.size()); - - var langchain4jMessages = new ArrayList(); - langchain4jMessages.addAll(messages.stream() - .map(it -> { - var content = it.getContent(); - if (!it.equals(request)) - content = StringUtils.abbreviate(content, MAX_HISTORY_MESSAGE_LEN); - if (it.isRequest()) - return (dev.langchain4j.data.message.ChatMessage) new UserMessage(content); - else - return (dev.langchain4j.data.message.ChatMessage) new AiMessage(content); - }) - .collect(Collectors.toList())); - - var toolSpecifications = tools.stream() - .map(ChatTool::getSpecification) - .collect(Collectors.toList()); - var completableFuture = new CompletableFuture(); - var executableFuture = executorService.submit(new Runnable() { - - private StreamingChatResponseHandler newResponseHandler(Subject subject) { - return new StreamingChatResponseHandler() { - - private long lastPartialResponseNotificationTime = System.currentTimeMillis(); - - @Override - public void onPartialResponse(PartialResponse partialResponse, PartialResponseContext context) { - ThreadContext.bind(subject); - if (completableFuture.isDone()) { - context.streamingHandle().cancel(); - } else { - var responding = getResponding(sessionId, chatId, requestId); - if (responding != null) { - var content = responding.getContent(); - if (content == null) - content = ""; - content += partialResponse.text(); - responding.content = content; - if (System.currentTimeMillis() - lastPartialResponseNotificationTime > PARTIAL_RESPONSE_NOTIFICATION_INTERVAL) { - lastPartialResponseNotificationTime = System.currentTimeMillis(); - webSocketService.notifyObservableChange(Chat.getPartialResponseObservable(chatId), null); - } - } - } - } - - @Override - public void onPartialThinking(PartialThinking partialThinking, PartialThinkingContext context) { - ThreadContext.bind(subject); - if (completableFuture.isDone()) - context.streamingHandle().cancel(); - } - - @Override - public void onPartialToolCall(PartialToolCall partialToolCall, PartialToolCallContext context) { - ThreadContext.bind(subject); - if (completableFuture.isDone()) { - context.streamingHandle().cancel(); - } - } - - @Override - public void onCompleteResponse(ChatResponse completeResponse) { - ThreadContext.bind(subject); - sessionService.runAsync(() -> { - try { - var aiMessage = completeResponse.aiMessage(); - if (aiMessage.hasToolExecutionRequests()) { - langchain4jMessages.add(aiMessage); - var toolRequests = aiMessage.toolExecutionRequests(); - for (int i = 0; i < toolRequests.size(); i++) { - var toolRequest = toolRequests.get(i); - String toolName = toolRequest.name(); - String toolArgs = toolRequest.arguments(); - var toolResult = tools.stream() - .filter(it->it.getSpecification().name().equals(toolName)) - .findFirst() - .orElseThrow(() -> new ExplicitException("Unknown tool: " + toolName)) - .execute(objectMapper.readTree(toolArgs)); - ToolExecutionResultMessage toolResultMessage = ToolExecutionResultMessage.from( - toolRequest.id(), toolName, toolResult); - langchain4jMessages.add(toolResultMessage); - } - - var handler = newResponseHandler(SecurityUtils.getSubject()); - var langchain4jChatRequest = ChatRequest.builder() - .messages(new ArrayList<>(langchain4jMessages)) - .toolSpecifications(toolSpecifications) - .build(); - streamingChatModel.chat(langchain4jChatRequest, handler); - } else { - completableFuture.complete(aiMessage.text()); - } - } catch (Throwable t) { - completableFuture.completeExceptionally(t); - } - }); - } - - @Override - public void onError(Throwable error) { - ThreadContext.bind(subject); - completableFuture.completeExceptionally(error); - } - - }; - } - - private void createResponse(String content, @Nullable Throwable throwable) { - var responding = getResponding(session.getId(), chatId, requestId); - if (responding != null) { - transactionService.run(() -> { - if (throwable != null) - logger.error("Error getting chat response", throwable); - var response = new ChatMessage(); - response.setError(throwable != null); - response.setContent(content); - if (anonymous) { - var chat = SerializationUtils.clone(checkNotNull(session.getAnonymousChats().get(chatId))); - response.setChat(chat); - response.setId(nextAnonymousChatMessageId()); - chat.getMessages().add(response); - session.getAnonymousChats().put(chatId, chat); - } else { - response.setChat(load(chatId)); - dao.persist(response); - } - webSocketService.notifyObservableChange(Chat.getNewMessagesObservable(chatId), null); - }); - }; - } - - private void createIncompleteResponse(String reason) { - var responding = getResponding(sessionId, chatId, requestId); - if (responding != null) { - var content = responding.getContent(); - if (content == null) - content = ""; - else - content += "\n\n"; - content += "" + reason + ""; - createResponse(content, null); - } - } - - @Override - public void run() { - var handler = newResponseHandler(SecurityUtils.getSubject()); - try { - var langchain4jChatRequest = ChatRequest.builder() - .messages(langchain4jMessages.stream() - .map(msg -> (dev.langchain4j.data.message.ChatMessage) msg) - .collect(Collectors.toList())) - .toolSpecifications(toolSpecifications) - .build(); - - streamingChatModel.chat(langchain4jChatRequest, handler); - transactionService.run(() -> { - var chat = anonymous ? checkNotNull(session.getAnonymousChats().get(chatId)) : load(chatId); - var requests = chat.getMessages().stream().filter(it->it.isRequest()).collect(Collectors.toList()); - if (requests.size() == 1) { - var systemPrompt = String.format(""" - Summarize provided message to get a compact title with below requirements: - 1. Just summarize the message itself, no need to ask user for clarification or confirmation - 2. It should be within %d characters - 3. Only title is returned, no other text or comments - 4. Display in %s - """, Chat.MAX_TITLE_LEN, session.getLocale().getDisplayLanguage()); - var title = chatModel.chat(new SystemMessage(systemPrompt), new UserMessage(requests.get(0).getContent())).aiMessage().text(); - if (anonymous) { - chat = SerializationUtils.clone(chat); - chat.setTitle(title); - session.getAnonymousChats().put(chatId, chat); - } else { - chat.setTitle(title); - dao.persist(chat); - } - webSocketService.notifyObservableChange(Chat.getChangeObservable(chatId), null); - } - }); - var response = completableFuture.get(timeoutSeconds, TimeUnit.SECONDS); - if (StringUtils.isBlank(response)) - throw new ExplicitException("Received empty response"); - createResponse(response, null); - } catch (Throwable e) { - if (ExceptionUtils.find(e, InterruptedException.class) != null) { - createIncompleteResponse("Conversation cancelled"); - } else if (ExceptionUtils.find(e, TimeoutException.class) != null) { - createIncompleteResponse("Conversation timed out"); - } else { - logger.error("Error getting chat response", e); - String errorMessage = e.getMessage(); - if (errorMessage == null) - errorMessage = "Error getting chat response, check server log for details"; - createResponse(errorMessage, e); - } - } finally { - completableFuture.cancel(false); - var respondingsOfSession = respondings.get(sessionId); - if (respondingsOfSession != null) { - var responding = respondingsOfSession.get(chatId); - if (responding != null) { - respondingsOfSession.computeIfPresent(chatId, (k, v)-> { - if (v.requestId.equals(requestId)) - return null; - else - return v; - }); - webSocketService.notifyObservableChange(Chat.getPartialResponseObservable(chatId), null); - } - } - } - } - }); - - var previousResponding = respondings.computeIfAbsent(sessionId, it->new ConcurrentHashMap<>()).put(chatId, new ChatRespondingImpl(requestId, executableFuture)); - if (previousResponding != null) - previousResponding.cancel(); - } - - private ChatRespondingImpl getResponding(String sessionId, Long chatId) { - var respondingsOfSession = respondings.get(sessionId); - if (respondingsOfSession != null) { - return respondingsOfSession.get(chatId); - } - return null; - } - - private ChatRespondingImpl getResponding(String sessionId, Long chatId, Long requestId) { - var responding = getResponding(sessionId, chatId); - if (responding != null && responding.requestId.equals(requestId)) - return responding; - else - return null; - } - - @Listen - public void on(SystemStarting event) { - nextAnonymousChatId = clusterService.getHazelcastInstance().getCPSubsystem().getAtomicLong("nextAnonymousChatId"); - nextAnonymousChatMessageId = clusterService.getHazelcastInstance().getCPSubsystem().getAtomicLong("nextAnonymousChatMessageId"); - if (clusterService.isLeaderServer()) { - nextAnonymousChatId.set(1L); - nextAnonymousChatMessageId.set(1L); - } - } - - @Override - public long nextAnonymousChatId() { - return nextAnonymousChatId.getAndIncrement(); - } - - @Override - public long nextAnonymousChatMessageId() { - return nextAnonymousChatMessageId.getAndIncrement(); - } - - @Override - public void sessionCreated(String sessionId) { - } - - @Override - public void sessionDestroyed(String sessionId) { - var respondingsOfSession = respondings.remove(sessionId); - if (respondingsOfSession != null) { - for (var responding : respondingsOfSession.values()) - responding.cancel(); - } - } - - private static class ChatRespondingImpl implements ChatResponding { - - private final Long requestId; - - private final Future future; - - private volatile String content; - - ChatRespondingImpl(Long requestId, Future future) { - this.requestId = requestId; - this.future = future; - } - - @Override - public String getContent() { - return content; - } - - @Override - public void cancel() { - future.cancel(true); - } - - } -} \ No newline at end of file diff --git a/server-core/src/main/java/io/onedev/server/service/impl/DefaultIssueChangeService.java b/server-core/src/main/java/io/onedev/server/service/impl/DefaultIssueChangeService.java index 8f8da53a2a..28b6dae0a3 100644 --- a/server-core/src/main/java/io/onedev/server/service/impl/DefaultIssueChangeService.java +++ b/server-core/src/main/java/io/onedev/server/service/impl/DefaultIssueChangeService.java @@ -166,7 +166,7 @@ public class DefaultIssueChangeService extends BaseEntityService @Inject private TransactionService transactionService; - private String taskId; + private volatile String taskId; @Transactional @Override diff --git a/server-core/src/main/java/io/onedev/server/web/behavior/ChangeObserver.java b/server-core/src/main/java/io/onedev/server/web/behavior/ChangeObserver.java index 9a72f5efb4..95efb76dbc 100644 --- a/server-core/src/main/java/io/onedev/server/web/behavior/ChangeObserver.java +++ b/server-core/src/main/java/io/onedev/server/web/behavior/ChangeObserver.java @@ -2,6 +2,7 @@ package io.onedev.server.web.behavior; import java.util.Collection; import java.util.HashSet; +import java.util.Set; import org.apache.wicket.Component; import org.apache.wicket.behavior.Behavior; @@ -44,9 +45,9 @@ public abstract class ChangeObserver extends Behavior { } } - public static Collection filterObservables(Collection observingObservables, + public static Set filterObservables(Collection observingObservables, Collection changedObservables) { - Collection observingChangedObservables = new HashSet<>(); + Set observingChangedObservables = new HashSet<>(); for (var observingObservable: observingObservables) { for (var changedObservable: changedObservables) { if (containsObservable(observingObservable, changedObservable)) diff --git a/server-core/src/main/java/io/onedev/server/web/component/ai/chat/ChatPanel.java b/server-core/src/main/java/io/onedev/server/web/component/ai/chat/ChatPanel.java index 1644b93e12..b2d798d8aa 100644 --- a/server-core/src/main/java/io/onedev/server/web/component/ai/chat/ChatPanel.java +++ b/server-core/src/main/java/io/onedev/server/web/component/ai/chat/ChatPanel.java @@ -335,7 +335,7 @@ public class ChatPanel extends Panel { else return Collections.emptySet(); } - + }); respondingContainer.setOutputMarkupPlaceholderTag(true); diff --git a/server-core/src/main/java/io/onedev/server/web/component/ai/chat/chat.css b/server-core/src/main/java/io/onedev/server/web/component/ai/chat/chat.css index c026e1b488..95e153db85 100644 --- a/server-core/src/main/java/io/onedev/server/web/component/ai/chat/chat.css +++ b/server-core/src/main/java/io/onedev/server/web/component/ai/chat/chat.css @@ -3,7 +3,7 @@ top: 0; bottom: 0; right: 0; - z-index: 2000; + z-index: 1040; background: white; box-shadow: 0 0 12px rgba(0,0,0,0.2); } diff --git a/server-core/src/main/java/io/onedev/server/web/component/taskbutton/TaskButton.java b/server-core/src/main/java/io/onedev/server/web/component/taskbutton/TaskButton.java index 2c1f8543be..f39a228632 100644 --- a/server-core/src/main/java/io/onedev/server/web/component/taskbutton/TaskButton.java +++ b/server-core/src/main/java/io/onedev/server/web/component/taskbutton/TaskButton.java @@ -279,7 +279,7 @@ public abstract class TaskButton extends AjaxButton { @Inject private TaskScheduler taskScheduler; - private String taskId; + private volatile String taskId; @Listen public void on(SystemStarted event) { diff --git a/server-core/src/main/java/io/onedev/server/web/page/admin/usermanagement/NewUserPage.java b/server-core/src/main/java/io/onedev/server/web/page/admin/usermanagement/NewUserPage.java index bcbd6a0c22..b0bc63f811 100644 --- a/server-core/src/main/java/io/onedev/server/web/page/admin/usermanagement/NewUserPage.java +++ b/server-core/src/main/java/io/onedev/server/web/page/admin/usermanagement/NewUserPage.java @@ -35,6 +35,7 @@ import io.onedev.server.web.editable.BeanContext; import io.onedev.server.web.page.admin.AdministrationPage; import io.onedev.server.web.page.user.UserCssResourceReference; import io.onedev.server.web.page.user.basicsetting.UserBasicSettingPage; +import io.onedev.server.web.util.WicketUtils; import io.onedev.server.web.util.editbean.NewUserBean; public class NewUserPage extends AdministrationPage { @@ -51,7 +52,10 @@ public class NewUserPage extends AdministrationPage { protected void onInitialize() { super.onInitialize(); - var editor = BeanContext.edit("editor", bean, Sets.newHashSet(User.PROP_NOTIFY_OWN_EVENTS), true); + var excludeProperties = Sets.newHashSet(User.PROP_NOTIFY_OWN_EVENTS); + if (!WicketUtils.isSubscriptionActive()) + excludeProperties.add(User.PROP_TYPE); + var editor = BeanContext.edit("editor", bean, excludeProperties, true); Form form = new Form("form") { diff --git a/server-core/src/main/java/io/onedev/server/web/page/base/BasePage.html b/server-core/src/main/java/io/onedev/server/web/page/base/BasePage.html index cb6dad599f..b2e5ab97a5 100644 --- a/server-core/src/main/java/io/onedev/server/web/page/base/BasePage.html +++ b/server-core/src/main/java/io/onedev/server/web/page/base/BasePage.html @@ -17,7 +17,10 @@
Please wait...
-
+
+ Connection lost or session expired, reload to recover +
+
Page is in error, reload to recover
diff --git a/server-core/src/main/java/io/onedev/server/web/page/base/BasePage.java b/server-core/src/main/java/io/onedev/server/web/page/base/BasePage.java index 25c9970386..2d42ec7c0f 100644 --- a/server-core/src/main/java/io/onedev/server/web/page/base/BasePage.java +++ b/server-core/src/main/java/io/onedev/server/web/page/base/BasePage.java @@ -10,7 +10,6 @@ import java.time.ZoneId; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Locale; import javax.inject.Inject; @@ -28,6 +27,7 @@ import org.apache.wicket.ajax.AjaxRequestTarget; import org.apache.wicket.ajax.attributes.AjaxRequestAttributes; import org.apache.wicket.ajax.attributes.AjaxRequestAttributes.Method; import org.apache.wicket.core.request.handler.IPartialPageRequestHandler; +import org.apache.wicket.event.IEvent; import org.apache.wicket.markup.ComponentTag; import org.apache.wicket.markup.head.IHeaderResponse; import org.apache.wicket.markup.head.JavaScriptHeaderItem; @@ -40,8 +40,7 @@ import org.apache.wicket.markup.html.panel.FeedbackPanel; import org.apache.wicket.markup.repeater.RepeatingView; import org.apache.wicket.model.LoadableDetachableModel; import org.apache.wicket.protocol.ws.api.WebSocketBehavior; -import org.apache.wicket.protocol.ws.api.WebSocketRequestHandler; -import org.apache.wicket.protocol.ws.api.message.TextMessage; +import org.apache.wicket.protocol.ws.api.event.WebSocketPushPayload; import org.apache.wicket.request.IRequestParameters; import org.apache.wicket.request.cycle.RequestCycle; import org.apache.wicket.request.http.WebRequest; @@ -55,11 +54,9 @@ import org.unbescape.javascript.JavaScriptEscape; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; -import com.google.common.base.Splitter; import com.google.common.collect.Sets; import io.onedev.commons.bootstrap.Bootstrap; -import io.onedev.commons.loader.AppLoader; import io.onedev.server.OneDev; import io.onedev.server.commandhandler.Upgrade; import io.onedev.server.event.ListenerRegistry; @@ -80,7 +77,7 @@ import io.onedev.server.web.page.security.LoginPage; import io.onedev.server.web.page.serverinit.ServerInitPage; import io.onedev.server.web.page.simple.SimplePage; import io.onedev.server.web.util.WicketUtils; -import io.onedev.server.web.websocket.WebSocketMessages; +import io.onedev.server.web.websocket.ObservablesChanged; import io.onedev.server.web.websocket.WebSocketService; public abstract class BasePage extends WebPage { @@ -105,6 +102,12 @@ public abstract class BasePage extends WebPage { @Inject protected ListenerRegistry listenerRegistry; + + @Inject + private WebSocketService webSocketService; + + @Inject + private ObjectMapper objectMapper; public BasePage(PageParameters params) { super(params); @@ -148,6 +151,15 @@ public abstract class BasePage extends WebPage { setResponsePage(getPageClass(), getPageParameters()); } + @Override + public void onEvent(IEvent event) { + super.onEvent(event); + if (event.getPayload() instanceof WebSocketPushPayload payload + && payload.getMessage() instanceof ObservablesChanged observablesChanged) { + notifyObservablesChange(payload.getHandler(), observablesChanged.getObservables()); + } + } + @Override protected void onInitialize() { super.onInitialize(); @@ -205,8 +217,8 @@ public abstract class BasePage extends WebPage { String.valueOf(OneDev.getInstance().getBootDate().getTime()), SpriteImage.getVersionedHref(IconScope.class, null), popStateBehavior.getCallbackFunction(explicit("data")).toString(), - OneDev.getInstance(ObjectMapper.class).writeValueAsString(getRemoveAutosaveKeys()), - OneDev.getInstance(ObjectMapper.class).writeValueAsString(translations)))); + objectMapper.writeValueAsString(getRemoveAutosaveKeys()), + objectMapper.writeValueAsString(translations)))); } catch (JsonProcessingException e) { throw new RuntimeException(e); } @@ -295,20 +307,7 @@ public abstract class BasePage extends WebPage { add(rootComponents = new RepeatingView("rootComponents")); - add(new WebSocketBehavior() { - - @Override - protected void onMessage(WebSocketRequestHandler handler, TextMessage message) { - super.onMessage(handler, message); - - if (message.getText().startsWith(WebSocketMessages.OBSERVABLE_CHANGED)) { - List observables = Splitter.on('\n').splitToList( - message.getText().substring(WebSocketMessages.OBSERVABLE_CHANGED.length()+1)); - notifyObservablesChange(handler, observables); - } - } - - }); + add(new WebSocketBehavior() {}); add(zoneIdDetectBehavior = new AbstractDefaultAjaxBehavior() { @@ -389,7 +388,7 @@ public abstract class BasePage extends WebPage { @Override protected void onAfterRender() { - AppLoader.getInstance(WebSocketService.class).observe(this); + webSocketService.observe(this); super.onAfterRender(); } diff --git a/server-core/src/main/java/io/onedev/server/web/page/base/base.css b/server-core/src/main/java/io/onedev/server/web/page/base/base.css index 187c7d923b..2dd3f75ca4 100644 --- a/server-core/src/main/java/io/onedev/server/web/page/base/base.css +++ b/server-core/src/main/java/io/onedev/server/web/page/base/base.css @@ -216,18 +216,22 @@ a.tree-junction { margin-left: 0.4rem; } -/* WEB SOCKET ERROR */ -.websocket-error { +.connection-error, .page-error { z-index: 1050; padding: 0.75rem 1.25rem; display: none; position: fixed; max-width: 75%; + font-weight: bold; top: 0; - background: var(--danger); + background: var(--light-danger); border-radius: 0 0 0.42rem 0.42rem; - color: white; + color: var(--danger); } +.dark-mode .connection-error, .dark-mode .page-error { + background: var(--dark-mode-light-danger); + color: var(--danger); +} textarea { resize: none; diff --git a/server-core/src/main/java/io/onedev/server/web/page/base/base.js b/server-core/src/main/java/io/onedev/server/web/page/base/base.js index 2c6bcb5f50..0bad3ad526 100644 --- a/server-core/src/main/java/io/onedev/server/web/page/base/base.js +++ b/server-core/src/main/java/io/onedev/server/web/page/base/base.js @@ -339,29 +339,29 @@ onedev.server = { } }, - setupWebsocketCallback: function() { - var messagesToSent = []; - function sendMessages() { - if (onedev.server.ajaxRequests.count == 0) { - for (var i in messagesToSent) - Wicket.WebSocket.send(messagesToSent[i]); - messagesToSent = []; - } else { - setTimeout(sendMessages, 100); - } + setupWebSocketHandler: function() { + function showError($error) { + $error.css("left", ($(window).width()-$error.outerWidth()) / 2); + $error.slideDown("slow"); } - - Wicket.Event.subscribe("/websocket/message", function(jqEvent, message) { - if (message.indexOf("ObservableChanged:") != -1) { - if (messagesToSent.indexOf(message) == -1) { - messagesToSent.push(message); - sendMessages(); - } - } else if (message == "ErrorMessage") { - var $websocketError = $(".websocket-error"); - $websocketError.css("left", ($(window).width()-$websocketError.outerWidth()) / 2); - $websocketError.slideDown("slow"); + function testConnection() { + if (Wicket.WebSocket.INSTANCE.ws && Wicket.WebSocket.INSTANCE.ws.readyState == WebSocket.OPEN) { + Wicket.WebSocket.send("TestConnection"); + setTimeout(testConnection, 60000); } + } + Wicket.Event.subscribe("/websocket/open", function(jqEvent) { + testConnection(); + }); + Wicket.Event.subscribe("/websocket/closed", function(jqEvent) { + showError($(".connection-error")); + }); + Wicket.Event.subscribe("/websocket/error", function(jqEvent) { + showError($(".connection-error")); + }); + Wicket.Event.subscribe("/websocket/message", function(jqEvent, message) { + if (message == "ErrorMessage") + showError($(".page-error")); }); }, @@ -900,7 +900,7 @@ onedev.server = { onedev.server.setupAjaxLoadingIndicator(); onedev.server.form.setupDirtyCheck(); - onedev.server.setupWebsocketCallback(); + onedev.server.setupWebSocketHandler(); onedev.server.mouseState.track(); onedev.server.ajaxRequests.track(); onedev.server.setupInputClear(); diff --git a/server-core/src/main/java/io/onedev/server/web/page/test/TestPage.java b/server-core/src/main/java/io/onedev/server/web/page/test/TestPage.java index 64379a774b..f1727e618c 100644 --- a/server-core/src/main/java/io/onedev/server/web/page/test/TestPage.java +++ b/server-core/src/main/java/io/onedev/server/web/page/test/TestPage.java @@ -17,11 +17,11 @@ public class TestPage extends BasePage { @Override protected void onInitialize() { super.onInitialize(); + add(new Link("test") { @Override - public void onClick() { - System.out.println("bb"); + public void onClick() { } }); diff --git a/server-core/src/main/java/io/onedev/server/web/upload/DefaultUploadService.java b/server-core/src/main/java/io/onedev/server/web/upload/DefaultUploadService.java index 297b13ee58..889b0ebbc7 100644 --- a/server-core/src/main/java/io/onedev/server/web/upload/DefaultUploadService.java +++ b/server-core/src/main/java/io/onedev/server/web/upload/DefaultUploadService.java @@ -22,7 +22,7 @@ public class DefaultUploadService implements UploadService, SchedulableTask { @Inject private TaskScheduler taskScheduler; - private String taskId; + private volatile String taskId; private final Map uploads = new ConcurrentHashMap<>(); diff --git a/server-core/src/main/java/io/onedev/server/web/websocket/DefaultWebSocketService.java b/server-core/src/main/java/io/onedev/server/web/websocket/DefaultWebSocketService.java index fddfe2adc6..457cf9a487 100644 --- a/server-core/src/main/java/io/onedev/server/web/websocket/DefaultWebSocketService.java +++ b/server-core/src/main/java/io/onedev/server/web/websocket/DefaultWebSocketService.java @@ -21,6 +21,7 @@ import org.apache.wicket.protocol.ws.api.registry.IKey; import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry; import org.apache.wicket.protocol.ws.api.registry.PageIdKey; import org.apache.wicket.protocol.ws.api.registry.SimpleWebSocketConnectionRegistry; +import org.eclipse.jetty.websocket.api.StatusCode; import org.joda.time.DateTime; import org.jspecify.annotations.Nullable; import org.quartz.ScheduleBuilder; @@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; import io.onedev.commons.loader.ManagedSerializedForm; -import io.onedev.commons.utils.StringUtils; import io.onedev.server.cluster.ClusterRunnable; import io.onedev.server.cluster.ClusterService; import io.onedev.server.event.Listen; @@ -52,7 +52,7 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene private static final Logger logger = LoggerFactory.getLogger(DefaultWebSocketService.class); - private static final int KEEP_ALIVE_INTERVAL = 30; + private static final int KEEP_ALIVE_INTERVAL = 60; @Inject private Application application; @@ -72,9 +72,11 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene private final Map> notifiedObservables = new ConcurrentHashMap<>(); - private String keepAliveTaskId; + private volatile String keepAliveTaskId; - private String notifiedObservableCleanupTaskId; + private volatile String notifiedObservableCleanupTaskId; + + private volatile Thread checkMessageQueueThread; public Object writeReplace() throws ObjectStreamException { return new ManagedSerializedForm(WebSocketService.class); @@ -106,6 +108,14 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene @Override public void sessionDestroyed(String sessionId) { + for (IWebSocketConnection connection : connectionRegistry.getConnections(application, sessionId)) { + try { + connection.close(StatusCode.NORMAL, "Session destroyed"); + IKey pageKey = ((WebSocketConnection) connection).getPageKey().getPageId(); + connectionRegistry.removeConnection(application, sessionId, pageKey); + } catch (Exception e) { + } + } registeredObservables.remove(sessionId); } @@ -123,13 +133,11 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene } } - private void notifyObservables(IWebSocketConnection connection, Collection observables) { - String message = WebSocketMessages.OBSERVABLE_CHANGED + ":" + StringUtils.join(observables, "\n"); - try { - connection.sendMessage(message); - } catch (Exception e) { - logger.error("Error sending websocket message: " + message, e); - } + private void notifyObservablesChange(IWebSocketConnection connection, Set observables) { + ((WebSocketConnection) connection).queueMessage(new ObservablesChanged(observables)); + var thread = checkMessageQueueThread; + if (thread != null) + thread.interrupt(); } @Override @@ -157,7 +165,7 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene var registeredChangedObservables = filterObservables(registeredObservables, observables); if (!registeredChangedObservables.isEmpty()) - notifyObservables(connection, registeredChangedObservables); + notifyObservablesChange(connection, registeredChangedObservables); } } } @@ -198,7 +206,7 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene @Override public ScheduleBuilder getScheduleBuilder() { - return SimpleScheduleBuilder.repeatSecondlyForever(TOLERATE_SECONDS); + return SimpleScheduleBuilder.repeatSecondlyForever(1); } @Override @@ -211,10 +219,29 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene } }); + + checkMessageQueueThread = new Thread(() -> { + while (checkMessageQueueThread != null) { + for (var connection: connectionRegistry.getConnections(application)) { + if (connection.isOpen()) + ((WebSocketConnection) connection).checkMessageQueue(); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + }); + checkMessageQueueThread.start(); } @Listen public void on(SystemStopping event) { + var thread = checkMessageQueueThread; + checkMessageQueueThread = null; + if (thread != null) + thread.interrupt(); + if (keepAliveTaskId != null) taskScheduler.unschedule(keepAliveTaskId); if (notifiedObservableCleanupTaskId != null) @@ -234,7 +261,7 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene } private void notifyPastObservables(IWebSocketConnection connection) { - PageKey pageKey = ((WebSocketConnection) connection).getPageKey(); + PageKey pageKey = ((WebSocketConnection) connection).getPageKey(); Collection registeredObservables = getRegisteredObservables(connection); if (registeredObservables != null) { Set observables = new HashSet<>(); @@ -247,7 +274,7 @@ public class DefaultWebSocketService implements WebSocketService, SessionListene } } if (!observables.isEmpty()) - notifyObservables(connection, observables); + notifyObservablesChange(connection, observables); } } diff --git a/server-core/src/main/java/io/onedev/server/web/websocket/ObservablesChanged.java b/server-core/src/main/java/io/onedev/server/web/websocket/ObservablesChanged.java new file mode 100644 index 0000000000..ecac98dd11 --- /dev/null +++ b/server-core/src/main/java/io/onedev/server/web/websocket/ObservablesChanged.java @@ -0,0 +1,35 @@ +package io.onedev.server.web.websocket; + +import java.util.Collection; +import java.util.Set; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; + +public class ObservablesChanged implements IWebSocketPushMessage { + + private final Set observables; + + public ObservablesChanged(Set observables) { + this.observables = observables; + } + + public Collection getObservables() { + return observables; + } + + @Override + public boolean equals(Object o) { + if (o instanceof ObservablesChanged observablesChanged) + return new EqualsBuilder().append(observables, observablesChanged.observables).isEquals(); + else + return false; + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37).append(observables).toHashCode(); + } + +} \ No newline at end of file diff --git a/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketConnection.java b/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketConnection.java index 5010035278..4250fe0df4 100644 --- a/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketConnection.java +++ b/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketConnection.java @@ -18,48 +18,63 @@ package io.onedev.server.web.websocket; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.LinkedHashSet; +import java.util.Set; -import org.apache.wicket.protocol.ws.api.AbstractWebSocketConnection; +import org.apache.shiro.subject.Subject; +import org.apache.shiro.util.ThreadContext; import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor; import org.apache.wicket.protocol.ws.api.IWebSocketConnection; -import org.apache.wicket.util.lang.Args; +import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; import org.eclipse.jetty.websocket.api.Session; +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.onedev.server.OneDev; +import io.onedev.server.persistence.SessionService; /** * A wrapper around Jetty9's native WebSocketConnection. * * @since 6.2 */ -public class WebSocketConnection extends AbstractWebSocketConnection -{ +public class WebSocketConnection implements IWebSocketConnection { + + private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class); + + private final AbstractWebSocketProcessor webSocketProcessor; + private final Session session; private final PageKey pageKey; + + private final Subject subject; + + private final Set messageQueue = new LinkedHashSet<>(); + + private boolean working; - /** - * Constructor. - * - * @param session - * the jetty websocket connection - */ - public WebSocketConnection(Session session, AbstractWebSocketProcessor webSocketProcessor, PageKey pageKey) + public WebSocketConnection(Session session, AbstractWebSocketProcessor webSocketProcessor, PageKey pageKey, Subject subject) { - super(webSocketProcessor); - this.session = Args.notNull(session, "connection"); + this.webSocketProcessor = webSocketProcessor; + this.session = session; this.pageKey = pageKey; + this.subject = subject; } @Override - public boolean isOpen() - { + public boolean isOpen() { return session.isOpen(); } + public Session getSession() { + return session; + } + @Override - public void close(int code, String reason) - { - if (isOpen()) - { + public void close(int code, String reason) { + if (isOpen()) { session.close(code, reason); } } @@ -69,8 +84,7 @@ public class WebSocketConnection extends AbstractWebSocketConnection } @Override - public IWebSocketConnection sendMessage(String message) throws IOException - { + public IWebSocketConnection sendMessage(String message) throws IOException { checkClosed(); session.getRemote().sendStringByFuture(message); @@ -78,9 +92,7 @@ public class WebSocketConnection extends AbstractWebSocketConnection } @Override - public IWebSocketConnection sendMessage(byte[] message, int offset, int length) - throws IOException - { + public IWebSocketConnection sendMessage(byte[] message, int offset, int length) throws IOException { checkClosed(); ByteBuffer buf = ByteBuffer.wrap(message, offset, length); @@ -88,11 +100,66 @@ public class WebSocketConnection extends AbstractWebSocketConnection return this; } - private void checkClosed() - { - if (!isOpen()) - { + private void checkClosed() { + if (!isOpen()) { throw new IllegalStateException("The connection is closed."); } } + + @Override + public void sendMessage(IWebSocketPushMessage message) { + webSocketProcessor.broadcastMessage(message); + } + + public synchronized boolean queueMessage(IWebSocketPushMessage message) { + return messageQueue.add(message); + } + + public synchronized void checkMessageQueue() { + if (!messageQueue.isEmpty() && !working) { + ThreadContext.bind(subject); + working = true; + OneDev.getInstance(SessionService.class).runAsync(new Runnable() { + + @Nullable + private IWebSocketPushMessage getNextMessage() { + synchronized (WebSocketConnection.this) { + var it = messageQueue.iterator(); + if (it.hasNext()) { + var message = it.next(); + it.remove(); + return message; + } else { + return null; + } + } + } + + @Override + public void run() { + try { + while (true) { + IWebSocketPushMessage message = getNextMessage(); + if (message != null) { + webSocketProcessor.broadcastMessage(message); + } else { + break; + } + } + } catch (Throwable e) { + logger.error("Error processing websocket message", e); + try { + sendMessage(WebSocketMessages.ERROR_MESSAGE); + } catch (Throwable e2) { + } + } finally { + synchronized (WebSocketConnection.this) { + working = false; + } + } + } + }); + } + } + } diff --git a/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketMessages.java b/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketMessages.java index 751ddcf27e..c142d421d3 100644 --- a/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketMessages.java +++ b/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketMessages.java @@ -3,9 +3,9 @@ package io.onedev.server.web.websocket; public interface WebSocketMessages { static final String ERROR_MESSAGE = "ErrorMessage"; - - static final String OBSERVABLE_CHANGED = "ObservableChanged"; - + static final String KEEP_ALIVE = "KeepAlive"; + static final String TEST_CONNECTION = "TestConnection"; + } diff --git a/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketProcessor.java b/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketProcessor.java index 7f92d41ac0..7019826f6b 100644 --- a/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketProcessor.java +++ b/server-core/src/main/java/io/onedev/server/web/websocket/WebSocketProcessor.java @@ -16,10 +16,7 @@ */ package io.onedev.server.web.websocket; -import java.io.IOException; - import org.apache.shiro.subject.Subject; -import org.apache.shiro.util.ThreadContext; import org.apache.wicket.protocol.http.WebApplication; import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor; import org.eclipse.jetty.websocket.api.Session; @@ -29,11 +26,6 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.onedev.commons.loader.AppLoader; -import io.onedev.commons.utils.ExceptionUtils; -import io.onedev.server.OneDev; -import io.onedev.server.persistence.SessionService; - /** * An {@link org.apache.wicket.protocol.ws.api.IWebSocketProcessor processor} that integrates with * Jetty 9.x {@link Session web socket} implementation. @@ -65,75 +57,36 @@ public class WebSocketProcessor extends AbstractWebSocketProcessor implements We @Override public void onWebSocketConnect(final Session session) { - run(new Runnable() { - - @Override - public void run() { - PageKey pageKey = new PageKey(getSessionId(), getRegistryKey()); - WebSocketConnection connection = new WebSocketConnection(session, WebSocketProcessor.this, pageKey); - onConnect(connection); - } - - }); + PageKey pageKey = new PageKey(getSessionId(), getRegistryKey()); + Subject subject = (Subject) request.getHttpServletRequest().getAttribute(WebSocketFilter.SHIRO_SUBJECT); + WebSocketConnection connection = new WebSocketConnection(session, WebSocketProcessor.this, pageKey, subject); + onConnect(connection); } - - private void run(Runnable runnable) { - if (OneDev.getInstance().isReady()) { - SessionService sessionService = AppLoader.getInstance(SessionService.class); - Subject subject = (Subject) request.getHttpServletRequest() - .getAttribute(WebSocketFilter.SHIRO_SUBJECT); - ThreadContext.bind(subject); - sessionService.run(runnable); - } - } - + @Override public void onWebSocketText(final String message) { - run(new Runnable() { - - @Override - public void run() { - onMessage(message); - } - - }); + if (!message.equals(WebSocketMessages.TEST_CONNECTION)) + onMessage(message); } @Override public void onWebSocketBinary(final byte[] payload, final int offset, final int len) { - run(new Runnable() { - - @Override - public void run() { - onMessage(payload, offset, len); - } - - }); + onMessage(payload, offset, len); } @Override public void onWebSocketClose(final int statusCode, final String reason) { - run(new Runnable() { - - @Override - public void run() { - onClose(statusCode, reason); - } - - }); + onClose(statusCode, reason); } @Override public void onWebSocketError(Throwable throwable) { - IOException ioException = ExceptionUtils.find(throwable, IOException.class); - if (ioException != null && "Broken pipe".equals(ioException.getMessage())) - logger.debug("WebSocket closed", throwable); - else - logger.error("An error occurred when using WebSocket.", throwable); + logger.error("An error occurred when using WebSocket", throwable); } @Override public void onOpen(Object connection) { onWebSocketConnect((Session)connection); } + } diff --git a/server-ee b/server-ee index c61b935709..af13831727 160000 --- a/server-ee +++ b/server-ee @@ -1 +1 @@ -Subproject commit c61b935709e58a4b17d83143afee2319c052d7fc +Subproject commit af138317275f62237fceea4958145fcbbd48610f