package tunnel import ( "io" ) type Manager struct { Tunnel io.ReadWriter Connections map[ID]*Connection incoming chan *DataFrame outgoing chan *DataFrame accept chan *DataFrame closed chan bool newConnection chan *Connection delConnection chan *Connection logger Logger } func NewManager(tun io.ReadWriter) *Manager { return &Manager{ Tunnel: tun, Connections: make(map[ID]*Connection), incoming: make(chan *DataFrame, 1024), outgoing: make(chan *DataFrame, 1024), accept: make(chan *DataFrame, 1024), newConnection: make(chan *Connection), delConnection: make(chan *Connection), closed: make(chan bool), logger: DefaultLogger, } } func (m *Manager) SetLogger(logger Logger) { m.logger = logger } func (m *Manager) Run() { m.logger.Debug("manager run") onReceiveQueue := make(chan bool) for { select { case <-m.closed: return case df := <-m.incoming: m.logger.Trace("dataframe->tunnel: ", df) go df.Encode(m.Tunnel) case connection := <-m.newConnection: m.Connections[connection.ID] = connection m.logger.Tracef("connection '%s' registered", connection.ID) case connection := <-m.delConnection: if !connection.closed { connection.Close() delete(m.Connections, connection.ID) m.logger.Tracef("connection '%s' unregistered", connection.ID) } case onReceiveQueue <- true: go m.onReceive(onReceiveQueue) } } } func (m *Manager) onReceive(ch chan bool) { <-ch df := new(DataFrame) if err := df.Decode(m.Tunnel); err != nil { return } m.logger.Trace("dataframe<-tunnel: ", df) switch df.Type { case TypeRequest: m.accept <- df case TypeClosed: connection, ok := m.Connections[df.ID] if ok { m.delConnection <- connection } case TypeNormal: connection, ok := m.Connections[df.ID] if ok { connection.RX = m.outgoing m.outgoing <- df } } } func (m *Manager) Connect() (*Connection, error) { df := &DataFrame{ ID: NewID(), Type: TypeRequest, } connection := &Connection{ ID: df.ID, TX: m.incoming, logger: DefaultLogger, } m.newConnection <- connection m.incoming <- df df = <-connection.RX if df.Type != TypeConnected { return nil, WrongDataFrameTypeError{ShouldBe: TypeConnected} } return connection, nil } func (m *Manager) Accept() (*Connection, error) { df := <-m.accept if df.Type != TypeRequest { return nil, WrongDataFrameTypeError{ShouldBe: TypeRequest} } connection := &Connection{ ID: df.ID, TX: m.incoming, logger: DefaultLogger, } m.newConnection <- connection df.Type = TypeConnected m.incoming <- df return connection, nil } func (m *Manager) Close() error { defer close(m.closed) defer close(m.accept) defer close(m.incoming) defer close(m.outgoing) for _, connection := range m.Connections { connection.Close() } m.closed <- true return nil }