Skip to content

Commit

Permalink
feat(bridge): add WebSocket bridge (#263)
Browse files Browse the repository at this point in the history
* feat(bridge): add WebSocket bridge

* feat(bridge): send DataFrame back to WebSocket connection

* feat(bridge): broadcast data to multi connections in WebSocket bridge

* refactor: remove session in context

* set websocket conn payload type

* feat(websocket): broadcast offline message

* Revert "feat(websocket): broadcast offline message"

This reverts commit da76906.

Co-authored-by: venjiang <venjiang@gmail.com>
  • Loading branch information
xiaojian-hong and venjiang committed Dec 6, 2021
1 parent 4779f7f commit 6f527f9
Show file tree
Hide file tree
Showing 17 changed files with 507 additions and 43 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ example/same-stream-fn/stream-fn/fn1
example/same-stream-fn/stream-fn/fn2
example/same-stream-fn/zipper/zipper
example/multi-zipper/bin
example/websocket-bridge/client-1/client
example/websocket-bridge/client-2/client
example/websocket-bridge/zipper/zipper
# cli
cli/example/source/source
cli/example/stream-fn-db/stream-fn-db
Expand Down
18 changes: 18 additions & 0 deletions core/bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package core

import "github.com/yomorun/yomo/core/frame"

// Bridge is an interface of bridge which connects the clients of different transport protocols (f.e. WebSocket) with zipper.
type Bridge interface {
// Name returns the name of bridge.
Name() string

// Addr returns the address of bridge.
Addr() string

// ListenAndServe starts a server with a given handler.
ListenAndServe(handler func(ctx *Context)) error

// Send the data to clients.
Send(f frame.Frame) error
}
1 change: 1 addition & 0 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien
return c
}

// Init the options.
func (c *Client) Init(opts ...ClientOption) error {
for _, o := range opts {
o(&c.opts)
Expand Down
52 changes: 36 additions & 16 deletions core/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@ package core

import (
"fmt"
"io"
"sync"

"github.com/lucas-clemente/quic-go"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/pkg/logger"
)

type connStream struct {
id string // connection id (remote addr)
stream *quic.Stream // quic stream
}

type app struct {
id string // app id
name string // app name
Expand All @@ -33,20 +28,33 @@ func (a *app) Name() string {

var _ Connector = &connector{}

// Connector is a interface to manage the connections and applications.
type Connector interface {
Add(connID string, stream *quic.Stream)
// Add a connection.
Add(connID string, stream io.ReadWriteCloser)
// Remove a connection.
Remove(connID string)
Get(connID string) *quic.Stream
// Get a connection by connection id.
Get(connID string) io.ReadWriteCloser
// ConnID gets the connection id by appID and mae.
ConnID(appID string, name string) (string, bool)
// Write a DataFrame from a connection to another one.
Write(f *frame.DataFrame, fromID string, toID string) error
GetSnapshot() map[string]*quic.Stream
// GetSnapshot gets the snapshot of all connections.
GetSnapshot() map[string]io.ReadWriteCloser

// App gets the app by connID.
App(connID string) (*app, bool)
// AppID gets the ID of app by connID.
AppID(connID string) (string, bool)
// AppName gets the name of app by connID.
AppName(connID string) (string, bool)
// LinkApp links the app and connection.
LinkApp(connID string, appID string, name string)
// UnlinkApp removes the app by connID.
UnlinkApp(connID string, appID string, name string)

// Clean the connector.
Clean()
}

Expand All @@ -62,26 +70,30 @@ func newConnector() Connector {
}
}

func (c *connector) Add(connID string, stream *quic.Stream) {
// Add a connection.
func (c *connector) Add(connID string, stream io.ReadWriteCloser) {
logger.Debugf("%sconnector add: connID=%s", ServerLogPrefix, connID)
c.conns.Store(connID, stream)
}

// Remove a connection.
func (c *connector) Remove(connID string) {
logger.Debugf("%sconnector remove: connID=%s", ServerLogPrefix, connID)
c.conns.Delete(connID)
// c.funcs.Delete(connID)
c.apps.Delete(connID)
}

func (c *connector) Get(connID string) *quic.Stream {
// Get a connection by connection id.
func (c *connector) Get(connID string) io.ReadWriteCloser {
logger.Debugf("%sconnector get connection: connID=%s", ServerLogPrefix, connID)
if stream, ok := c.conns.Load(connID); ok {
return stream.(*quic.Stream)
return stream.(io.ReadWriteCloser)
}
return nil
}

// App gets the app by connID.
func (c *connector) App(connID string) (*app, bool) {
if result, found := c.apps.Load(connID); found {
app, ok := result.(*app)
Expand All @@ -96,20 +108,23 @@ func (c *connector) App(connID string) (*app, bool) {
return nil, false
}

// AppID gets the ID of app by connID.
func (c *connector) AppID(connID string) (string, bool) {
if app, ok := c.App(connID); ok {
return app.id, true
}
return "", false
}

// AppName gets the name of app by connID.
func (c *connector) AppName(connID string) (string, bool) {
if app, ok := c.App(connID); ok {
return app.name, true
}
return "", false
}

// ConnID gets the connection id by appID and mae.
func (c *connector) ConnID(appID string, name string) (string, bool) {
var connID string
var ok bool
Expand All @@ -131,30 +146,34 @@ func (c *connector) ConnID(appID string, name string) (string, bool) {
return connID, true
}

// Write a DataFrame from a connection to another one.
func (c *connector) Write(f *frame.DataFrame, fromID string, toID string) error {
targetStream := c.Get(toID)
if targetStream == nil {
logger.Warnf("%swill write to: [%s] -> [%s], target stream is nil", ServerLogPrefix, fromID, toID)
return fmt.Errorf("target[%s] stream is nil", toID)
}
_, err := (*targetStream).Write(f.Encode())
_, err := targetStream.Write(f.Encode())
return err
}

func (c *connector) GetSnapshot() map[string]*quic.Stream {
result := make(map[string]*quic.Stream)
// GetSnapshot gets the snapshot of all connections.
func (c *connector) GetSnapshot() map[string]io.ReadWriteCloser {
result := make(map[string]io.ReadWriteCloser)
c.conns.Range(func(key interface{}, val interface{}) bool {
result[key.(string)] = val.(*quic.Stream)
result[key.(string)] = val.(io.ReadWriteCloser)
return true
})
return result
}

// LinkApp links the app and connection.
func (c *connector) LinkApp(connID string, appID string, name string) {
logger.Debugf("%sconnector link application: connID[%s] --> app[%s::%s]", ServerLogPrefix, connID, appID, name)
c.apps.Store(connID, newApp(appID, name))
}

// UnlinkApp removes the app by connID.
func (c *connector) UnlinkApp(connID string, appID string, name string) {
logger.Debugf("%sconnector unlink application: connID[%s] x-> app[%s::%s]", ServerLogPrefix, connID, appID, name)
c.apps.Delete(connID)
Expand All @@ -178,6 +197,7 @@ func (c *connector) UnlinkApp(connID string, appID string, name string) {
// return conns
// }

// Clean the connector.
func (c *connector) Clean() {
c.conns = sync.Map{}
c.apps = sync.Map{}
Expand Down

0 comments on commit 6f527f9

Please sign in to comment.