135 lines
2.8 KiB
Go
135 lines
2.8 KiB
Go
package tunnel
|
|
|
|
import (
|
|
"io"
|
|
)
|
|
|
|
type Manager struct {
|
|
Tunnel io.ReadWriter
|
|
Connections map[ID]*Connection
|
|
incoming chan *DataFrame
|
|
outgoing chan *DataFrame
|
|
accept chan *DataFrame
|
|
closed chan bool
|
|
newConnection chan *Connection
|
|
delConnection chan *Connection
|
|
logger Logger
|
|
}
|
|
|
|
func NewManager(tun io.ReadWriter) *Manager {
|
|
return &Manager{
|
|
Tunnel: tun,
|
|
Connections: make(map[ID]*Connection),
|
|
incoming: make(chan *DataFrame, 1024),
|
|
outgoing: make(chan *DataFrame, 1024),
|
|
accept: make(chan *DataFrame, 1024),
|
|
newConnection: make(chan *Connection),
|
|
delConnection: make(chan *Connection),
|
|
logger: DefaultLogger,
|
|
}
|
|
}
|
|
|
|
func (m *Manager) SetLogger(logger Logger) {
|
|
m.logger = logger
|
|
}
|
|
|
|
func (m *Manager) Run() {
|
|
m.logger.Debug("manager run")
|
|
onReceiveQueue := make(chan bool)
|
|
for {
|
|
select {
|
|
case <-m.closed:
|
|
return
|
|
case df := <-m.incoming:
|
|
m.logger.Trace("dataframe->tunnel: ", df)
|
|
go df.Encode(m.Tunnel)
|
|
case connection := <-m.newConnection:
|
|
m.Connections[connection.ID] = connection
|
|
m.logger.Tracef("connection '%s' registered", connection.ID)
|
|
case connection := <-m.delConnection:
|
|
if !connection.closed {
|
|
connection.Close()
|
|
delete(m.Connections, connection.ID)
|
|
m.logger.Tracef("connection '%s' unregistered", connection.ID)
|
|
}
|
|
case onReceiveQueue <- true:
|
|
go m.onReceive(onReceiveQueue)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) onReceive(ch chan bool) {
|
|
<-ch
|
|
df := new(DataFrame)
|
|
if err := df.Decode(m.Tunnel); err != nil {
|
|
return
|
|
}
|
|
m.logger.Trace("dataframe<-tunnel: ", df)
|
|
|
|
switch df.Type {
|
|
case TypeRequest:
|
|
m.accept <- df
|
|
case TypeClosed:
|
|
connection, ok := m.Connections[df.ID]
|
|
if ok {
|
|
m.delConnection <- connection
|
|
}
|
|
case TypeNormal:
|
|
connection, ok := m.Connections[df.ID]
|
|
if ok {
|
|
connection.RX = m.outgoing
|
|
m.outgoing <- df
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) Connect() (*Connection, error) {
|
|
df := &DataFrame{
|
|
ID: NewID(),
|
|
Type: TypeRequest,
|
|
}
|
|
connection := &Connection{
|
|
ID: df.ID,
|
|
TX: m.incoming,
|
|
logger: DefaultLogger,
|
|
}
|
|
m.newConnection <- connection
|
|
m.incoming <- df
|
|
|
|
df = <-connection.RX
|
|
if df.Type != TypeConnected {
|
|
return nil, WrongDataFrameTypeError{ShouldBe: TypeConnected}
|
|
}
|
|
return connection, nil
|
|
}
|
|
|
|
func (m *Manager) Accept() (*Connection, error) {
|
|
df := <-m.accept
|
|
if df.Type != TypeRequest {
|
|
return nil, WrongDataFrameTypeError{ShouldBe: TypeRequest}
|
|
}
|
|
connection := &Connection{
|
|
ID: df.ID,
|
|
TX: m.incoming,
|
|
logger: DefaultLogger,
|
|
}
|
|
m.newConnection <- connection
|
|
|
|
df.Type = TypeConnected
|
|
m.incoming <- df
|
|
|
|
return connection, nil
|
|
}
|
|
|
|
func (m *Manager) Close() error {
|
|
defer close(m.closed)
|
|
defer close(m.accept)
|
|
defer close(m.incoming)
|
|
defer close(m.outgoing)
|
|
for _, connection := range m.Connections {
|
|
connection.Close()
|
|
}
|
|
m.closed <- true
|
|
return nil
|
|
}
|