This repository has been archived on 2022-09-10. You can view files and clone it, but cannot push or open issues or pull requests.
tunnel/manager.go

135 lines
2.8 KiB
Go
Raw Normal View History

2022-09-09 06:05:20 +00:00
package tunnel
import (
"io"
)
type Manager struct {
2022-09-09 06:31:47 +00:00
Tunnel io.ReadWriter
Connections map[ID]*Connection
incoming chan *DataFrame
outgoing chan *DataFrame
accept chan *DataFrame
closed chan bool
newConnection chan *Connection
delConnection chan *Connection
2022-09-09 13:16:36 +00:00
logger Logger
2022-09-09 06:05:20 +00:00
}
func NewManager(tun io.ReadWriter) *Manager {
return &Manager{
2022-09-09 06:31:47 +00:00
Tunnel: tun,
Connections: make(map[ID]*Connection),
incoming: make(chan *DataFrame, 1024),
outgoing: make(chan *DataFrame, 1024),
accept: make(chan *DataFrame, 1024),
newConnection: make(chan *Connection),
delConnection: make(chan *Connection),
2022-09-09 13:16:36 +00:00
logger: DefaultLogger,
2022-09-09 06:05:20 +00:00
}
}
2022-09-09 13:16:36 +00:00
// SetLogger replaces the logger the manager writes its debug and trace
// messages to.
func (m *Manager) SetLogger(logger Logger) {
m.logger = logger
}
2022-09-09 06:05:20 +00:00
func (m *Manager) Run() {
2022-09-09 13:31:49 +00:00
m.logger.Debug("manager run")
2022-09-09 11:38:41 +00:00
onReceiveQueue := make(chan bool)
2022-09-09 06:05:20 +00:00
for {
select {
case <-m.closed:
return
case df := <-m.incoming:
2022-09-09 13:31:49 +00:00
m.logger.Trace("dataframe->tunnel: ", df)
2022-09-09 06:05:20 +00:00
go df.Encode(m.Tunnel)
2022-09-09 06:31:47 +00:00
case connection := <-m.newConnection:
m.Connections[connection.ID] = connection
2022-09-09 13:31:49 +00:00
m.logger.Tracef("connection '%s' registered", connection.ID)
2022-09-09 06:31:47 +00:00
case connection := <-m.delConnection:
2022-09-10 02:19:17 +00:00
if !connection.closed {
connection.Close()
delete(m.Connections, connection.ID)
m.logger.Tracef("connection '%s' unregistered", connection.ID)
}
2022-09-09 11:38:41 +00:00
case onReceiveQueue <- true:
go m.onReceive(onReceiveQueue)
2022-09-09 06:05:20 +00:00
}
}
}
2022-09-09 11:38:41 +00:00
func (m *Manager) onReceive(ch chan bool) {
<-ch
2022-09-09 06:31:47 +00:00
df := new(DataFrame)
2022-09-09 11:38:41 +00:00
if err := df.Decode(m.Tunnel); err != nil {
return
}
2022-09-09 13:31:49 +00:00
m.logger.Trace("dataframe<-tunnel: ", df)
2022-09-09 06:31:47 +00:00
switch df.Type {
case TypeRequest:
m.accept <- df
case TypeClosed:
connection, ok := m.Connections[df.ID]
if ok {
m.delConnection <- connection
}
case TypeNormal:
connection, ok := m.Connections[df.ID]
if ok {
connection.RX = m.outgoing
m.outgoing <- df
}
}
}
func (m *Manager) Connect() (*Connection, error) {
df := &DataFrame{
ID: NewID(),
Type: TypeRequest,
}
connection := &Connection{
2022-09-09 13:16:36 +00:00
ID: df.ID,
TX: m.incoming,
logger: DefaultLogger,
2022-09-09 06:31:47 +00:00
}
m.newConnection <- connection
m.incoming <- df
df = <-connection.RX
if df.Type != TypeConnected {
2022-09-09 11:53:46 +00:00
return nil, WrongDataFrameTypeError{ShouldBe: TypeConnected}
2022-09-09 06:31:47 +00:00
}
return connection, nil
}
func (m *Manager) Accept() (*Connection, error) {
df := <-m.accept
if df.Type != TypeRequest {
2022-09-09 11:53:46 +00:00
return nil, WrongDataFrameTypeError{ShouldBe: TypeRequest}
2022-09-09 06:31:47 +00:00
}
connection := &Connection{
2022-09-09 13:16:36 +00:00
ID: df.ID,
TX: m.incoming,
logger: DefaultLogger,
2022-09-09 06:31:47 +00:00
}
m.newConnection <- connection
df.Type = TypeConnected
m.incoming <- df
return connection, nil
}
2022-09-09 06:05:20 +00:00
// Close shuts the manager down. It always returns nil.
//
// It calls Close on every connection in m.Connections and then sends on
// m.closed, the signal Run returns on. The deferred calls run after that, in
// reverse order of declaration: outgoing, incoming, accept and finally closed
// are closed.
//
// NOTE(review): the send on m.closed blocks until something receives it, so
// Close appears to require a running Run loop — confirm callers guarantee it.
// NOTE(review): a second call would send on and close already-closed
// channels, which panics; Close looks intended to be called exactly once.
// NOTE(review): m.Connections is ranged over here while Run may still be
// modifying it from its own goroutine — verify this cannot overlap.
func (m *Manager) Close() error {
// Deferred so the channels are closed only after Run has been signalled.
defer close(m.closed)
defer close(m.accept)
defer close(m.incoming)
defer close(m.outgoing)
for _, connection := range m.Connections {
connection.Close()
}
// Tell Run to stop.
m.closed <- true
return nil
}