This repository has been archived on 2022-09-10. You can view files and clone it, but cannot push or open issues or pull requests.
tunnel/manager.go

136 lines
2.9 KiB
Go

package tunnel
import (
"io"
)
type Manager struct {
Tunnel io.ReadWriter
Connections map[ID]*Connection
incoming chan *DataFrame
outgoing chan *DataFrame
accept chan *DataFrame
closed chan bool
newConnection chan *Connection
delConnection chan *Connection
logger Logger
}
func NewManager(tun io.ReadWriter) *Manager {
return &Manager{
Tunnel: tun,
Connections: make(map[ID]*Connection),
incoming: make(chan *DataFrame, 1024),
outgoing: make(chan *DataFrame, 1024),
accept: make(chan *DataFrame, 1024),
newConnection: make(chan *Connection),
delConnection: make(chan *Connection),
closed: make(chan bool),
logger: DefaultLogger,
}
}
func (m *Manager) SetLogger(logger Logger) {
m.logger = logger
}
func (m *Manager) Run() {
m.logger.Debug("manager run")
onReceiveQueue := make(chan bool)
for {
select {
case <-m.closed:
return
case df := <-m.incoming:
m.logger.Trace("dataframe->tunnel: ", df)
go df.Encode(m.Tunnel)
case connection := <-m.newConnection:
m.Connections[connection.ID] = connection
m.logger.Tracef("connection '%s' registered", connection.ID)
case connection := <-m.delConnection:
if !connection.closed {
connection.Close()
delete(m.Connections, connection.ID)
m.logger.Tracef("connection '%s' unregistered", connection.ID)
}
case onReceiveQueue <- true:
go m.onReceive(onReceiveQueue)
}
}
}
func (m *Manager) onReceive(ch chan bool) {
<-ch
df := new(DataFrame)
if err := df.Decode(m.Tunnel); err != nil {
return
}
m.logger.Trace("dataframe<-tunnel: ", df)
switch df.Type {
case TypeRequest:
m.accept <- df
case TypeClosed:
connection, ok := m.Connections[df.ID]
if ok {
m.delConnection <- connection
}
case TypeNormal:
connection, ok := m.Connections[df.ID]
if ok {
connection.RX = m.outgoing
m.outgoing <- df
}
}
}
func (m *Manager) Connect() (*Connection, error) {
df := &DataFrame{
ID: NewID(),
Type: TypeRequest,
}
connection := &Connection{
ID: df.ID,
TX: m.incoming,
logger: DefaultLogger,
}
m.newConnection <- connection
m.incoming <- df
df = <-connection.RX
if df.Type != TypeConnected {
return nil, WrongDataFrameTypeError{ShouldBe: TypeConnected}
}
return connection, nil
}
func (m *Manager) Accept() (*Connection, error) {
df := <-m.accept
if df.Type != TypeRequest {
return nil, WrongDataFrameTypeError{ShouldBe: TypeRequest}
}
connection := &Connection{
ID: df.ID,
TX: m.incoming,
logger: DefaultLogger,
}
m.newConnection <- connection
df.Type = TypeConnected
m.incoming <- df
return connection, nil
}
func (m *Manager) Close() error {
defer close(m.closed)
defer close(m.accept)
defer close(m.incoming)
defer close(m.outgoing)
for _, connection := range m.Connections {
connection.Close()
}
m.closed <- true
return nil
}