gitpod/components/gitpod-protocol/go/reconnecting-ws.go
2021-03-12 15:40:12 +01:00

157 lines
4.0 KiB
Go

// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.
package protocol
import (
"errors"
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/gitpod-io/gitpod/common-go/log"
)
// The ReconnectingWebsocket represents a Reconnecting WebSocket connection.
type ReconnectingWebsocket struct {
url string
reqHeader http.Header
handshakeTimeout time.Duration
minReconnectionDelay time.Duration
maxReconnectionDelay time.Duration
reconnectionDelayGrowFactor float64
closedCh chan struct{}
connCh chan chan *websocket.Conn
errCh chan error
}
// NewReconnectingWebsocket creates a new instance of ReconnectingWebsocket
func NewReconnectingWebsocket(url string, reqHeader http.Header) *ReconnectingWebsocket {
return &ReconnectingWebsocket{
url: url,
reqHeader: reqHeader,
minReconnectionDelay: 2 * time.Second,
maxReconnectionDelay: 30 * time.Second,
reconnectionDelayGrowFactor: 1.5,
handshakeTimeout: 2 * time.Second,
connCh: make(chan chan *websocket.Conn),
closedCh: make(chan struct{}),
errCh: make(chan error),
}
}
// Close closes the underlying webscoket connection.
func (rc *ReconnectingWebsocket) Close() error {
close(rc.closedCh)
return nil
}
// WriteObject writes the JSON encoding of v as a message.
// See the documentation for encoding/json Marshal for details about the conversion of Go values to JSON.
func (rc *ReconnectingWebsocket) WriteObject(v interface{}) error {
for {
connCh := make(chan *websocket.Conn, 1)
select {
case <-rc.closedCh:
return errors.New("closed")
case rc.connCh <- connCh:
}
conn := <-connCh
err := conn.WriteJSON(v)
if err == nil {
return nil
}
if !websocket.IsUnexpectedCloseError(err) {
return err
}
select {
case <-rc.closedCh:
return errors.New("closed")
case rc.errCh <- err:
}
}
}
// ReadObject reads the next JSON-encoded message from the connection and stores it in the value pointed to by v.
// See the documentation for the encoding/json Unmarshal function for details about the conversion of JSON to a Go value.
func (rc *ReconnectingWebsocket) ReadObject(v interface{}) error {
for {
connCh := make(chan *websocket.Conn, 1)
select {
case <-rc.closedCh:
return errors.New("closed")
case rc.connCh <- connCh:
}
conn := <-connCh
err := conn.ReadJSON(v)
if err == nil {
return nil
}
if !websocket.IsUnexpectedCloseError(err) {
return err
}
select {
case <-rc.closedCh:
return errors.New("closed")
case rc.errCh <- err:
}
}
}
// Dial creates a new client connection.
func (rc *ReconnectingWebsocket) Dial() {
var conn *websocket.Conn
defer func() {
if conn == nil {
return
}
log.WithField("url", rc.url).Warn("connection is permanently closed")
conn.Close()
}()
conn = rc.connect()
for {
select {
case <-rc.closedCh:
return
case connCh := <-rc.connCh:
connCh <- conn
case err := <-rc.errCh:
log.WithError(err).WithField("url", rc.url).Warn("connection has been closed, reconnecting...")
conn.Close()
time.Sleep(1 * time.Second)
conn = rc.connect()
}
}
}
func (rc *ReconnectingWebsocket) connect() *websocket.Conn {
delay := rc.minReconnectionDelay
for {
dialer := websocket.Dialer{HandshakeTimeout: rc.handshakeTimeout}
conn, _, err := dialer.Dial(rc.url, rc.reqHeader)
if err == nil {
log.WithField("url", rc.url).Info("connection was successfully established")
return conn
}
log.WithError(err).WithField("url", rc.url).Errorf("failed to connect, trying again in %d seconds...", uint32(delay.Seconds()))
select {
case <-rc.closedCh:
return nil
case <-time.After(delay):
delay = time.Duration(float64(delay) * rc.reconnectionDelayGrowFactor)
if delay > rc.maxReconnectionDelay {
delay = rc.maxReconnectionDelay
}
}
}
}