This commit is contained in:
Sense T 2022-09-09 11:38:41 +00:00
parent 3bf499a5dc
commit ed15f041ce
2 changed files with 24 additions and 5 deletions

View File

@ -1,5 +1,10 @@
package tunnel
import (
"io"
"sync"
)
type Connection struct {
RX <-chan *DataFrame
TX chan<- *DataFrame
@ -16,7 +21,14 @@ func (c *Connection) Read(p []byte) (int, error) {
}
func (c *Connection) Write(p []byte) (int, error) {
defer recover()
var lock sync.Mutex
lock.Lock()
defer lock.Unlock()
if c.closed {
return 0, io.ErrClosedPipe
}
df := &DataFrame{
ID: c.ID,
Type: TypeNormal,
@ -30,6 +42,7 @@ func (c *Connection) Write(p []byte) (int, error) {
}
func (c *Connection) Close() error {
var lock sync.Mutex
defer close(c.TX)
if !c.closed {
c.TX <- &DataFrame{
@ -37,6 +50,8 @@ func (c *Connection) Close() error {
Type: TypeClosed,
}
}
lock.Lock()
c.closed = true
lock.Unlock()
return nil
}

View File

@ -29,6 +29,7 @@ func NewManager(tun io.ReadWriter) *Manager {
}
func (m *Manager) Run() {
onReceiveQueue := make(chan bool)
for {
select {
case <-m.closed:
@ -40,15 +41,18 @@ func (m *Manager) Run() {
case connection := <-m.delConnection:
connection.Close()
delete(m.Connections, connection.ID)
default:
go m.onReceive()
case onReceiveQueue <- true:
go m.onReceive(onReceiveQueue)
}
}
}
func (m *Manager) onReceive() {
func (m *Manager) onReceive(ch chan bool) {
<-ch
df := new(DataFrame)
df.Decode(m.Tunnel)
if err := df.Decode(m.Tunnel); err != nil {
return
}
switch df.Type {
case TypeRequest:
m.accept <- df