From ed15f041cea8811b1a50415cd7ac118849b9ca9b Mon Sep 17 00:00:00 2001 From: Sense T Date: Fri, 9 Sep 2022 11:38:41 +0000 Subject: [PATCH] done --- connection.go | 17 ++++++++++++++++- manager.go | 12 ++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/connection.go b/connection.go index 15728cc..091f4d5 100644 --- a/connection.go +++ b/connection.go @@ -1,5 +1,10 @@ package tunnel +import ( + "io" + "sync" +) + type Connection struct { RX <-chan *DataFrame TX chan<- *DataFrame @@ -16,7 +21,14 @@ func (c *Connection) Read(p []byte) (int, error) { } func (c *Connection) Write(p []byte) (int, error) { - defer recover() + var lock sync.Mutex + lock.Lock() + defer lock.Unlock() + + if c.closed { + return 0, io.ErrClosedPipe + } + df := &DataFrame{ ID: c.ID, Type: TypeNormal, @@ -30,6 +42,7 @@ func (c *Connection) Write(p []byte) (int, error) { } func (c *Connection) Close() error { + var lock sync.Mutex defer close(c.TX) if !c.closed { c.TX <- &DataFrame{ @@ -37,6 +50,8 @@ func (c *Connection) Close() error { Type: TypeClosed, } } + lock.Lock() c.closed = true + lock.Unlock() return nil } diff --git a/manager.go b/manager.go index 993cac4..ba2fb0e 100644 --- a/manager.go +++ b/manager.go @@ -29,6 +29,7 @@ func NewManager(tun io.ReadWriter) *Manager { } func (m *Manager) Run() { + onReceiveQueue := make(chan bool) for { select { case <-m.closed: @@ -40,15 +41,18 @@ func (m *Manager) Run() { case connection := <-m.delConnection: connection.Close() delete(m.Connections, connection.ID) - default: - go m.onReceive() + case onReceiveQueue <- true: + go m.onReceive(onReceiveQueue) } } } -func (m *Manager) onReceive() { +func (m *Manager) onReceive(ch chan bool) { + <-ch df := new(DataFrame) - df.Decode(m.Tunnel) + if err := df.Decode(m.Tunnel); err != nil { + return + } switch df.Type { case TypeRequest: m.accept <- df