diff --git a/cmd/server/server.go b/cmd/server/server.go index fc7adef..483ce4a 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -38,7 +38,8 @@ func (s *Server) Run() error { gin.SetMode(gin.ReleaseMode) } - if err := qemuserver.Setup(s.options.Qemu); err != nil { + qemu, err := qemuserver.NewServer(s.options.Qemu) + if err != nil { return err } @@ -46,6 +47,13 @@ func (s *Server) Run() error { if err != nil { return err } + webServer.RTCConnector.QEMU = qemu + + go func() { + if err := qemu.Run(); err != nil { + logrus.Fatal("cannot run qemuserver with error: ", err) + } + }() return webServer.Run() } diff --git a/lib/audiodriver/wavfifo.go b/lib/audiodriver/wavfifo.go index 245ff33..ffe9f07 100644 --- a/lib/audiodriver/wavfifo.go +++ b/lib/audiodriver/wavfifo.go @@ -3,10 +3,7 @@ package audiodriver import ( "context" "encoding/binary" - "errors" "io" - "os" - "time" "github.com/pion/mediadevices/pkg/io/audio" "github.com/pion/mediadevices/pkg/prop" @@ -24,46 +21,39 @@ const BufferSize = 512 const BitsPerByte = 8 type WavFIFODriver struct { - FIFOFile string - f *os.File - waveHeader *WavHeader + PCM <-chan [BufferSize]byte + WaveHeader <-chan *WavHeader closed <-chan struct{} cancel func() } -func New(fifoname string) *WavFIFODriver { - return &WavFIFODriver{ - FIFOFile: fifoname, - } +func New() *WavFIFODriver { + return &WavFIFODriver{} } func (w *WavFIFODriver) Open() error { + defer logrus.Debug("device opened") ctx, cancel := context.WithCancel(context.Background()) w.closed = ctx.Done() w.cancel = cancel - f, err := os.Open(w.FIFOFile) - if err != nil { - return err - } - - w.f = f - w.waveHeader = &WavHeader{} - return w.waveHeader.Parse(f) + return nil } func (w *WavFIFODriver) Close() error { defer w.cancel() - return w.f.Close() + return nil } func (w *WavFIFODriver) Properties() []prop.Media { + waveHeader := <-w.WaveHeader + logrus.Debugf("wave header: %v", waveHeader) return []prop.Media{ { Audio: prop.Audio{ - SampleRate: int(w.waveHeader.SampleRate), - ChannelCount: int(w.waveHeader.NumChannels), - Latency: time.Millisecond * time.Duration(BufferSize) / time.Duration(w.waveHeader.SampleRate) / time.Duration(w.waveHeader.NumChannels), + SampleRate: int(waveHeader.SampleRate), + ChannelCount: int(waveHeader.NumChannels), + Latency: waveHeader.GetLatnecy(), IsFloat: false, // just 8bit or 16bit with qemu IsBigEndian: false, // qemu should be little endian IsInterleaved: true, @@ -73,46 +63,18 @@ func (w *WavFIFODriver) Properties() []prop.Media { } func (w *WavFIFODriver) AudioRecord(p prop.Media) (audio.Reader, error) { - logrus.Debugf("wave header: %v", w.waveHeader) - offset := FmtHeaderOffset + FmtHeaderIDSize + FmtHeaderChunkSizeSize + int64(w.waveHeader.Size) + DataChunkIDSize + DataChunkSizeSize - if _, err := w.f.Seek( - offset, - io.SeekStart, - ); err != nil { - return nil, err - } - - var pcm <-chan [BufferSize]byte - go func() { - samples := make(chan [BufferSize]byte, BufferSize) - defer close(samples) - - pcm = samples - for { - var b [BufferSize]byte - if _, err := w.f.Read(b[:]); err != nil { - if errors.Is(err, io.EOF) { - return - } - logrus.Errorf("read audio stream error: %v", err) - continue - } - samples <- b - } - }() - reader := func() (wave.Audio, func(), error) { select { case <-w.closed: return nil, func() {}, io.EOF - case pcmData, ok := <-pcm: + case pcmData, ok := <-w.PCM: logrus.Debug("got %d bytes pcm data", len(pcmData)) if !ok { return nil, func() {}, io.ErrClosedPipe } a := wave.NewInt16Interleaved(wave.ChunkInfo{ - Len: BufferSize / int(w.waveHeader.BitsPerSample/BitsPerByte), + Len: BufferSize / int(p.SampleSize/BitsPerByte), Channels: p.ChannelCount, SamplingRate: p.SampleRate, }) diff --git a/lib/audiodriver/wavheader.go b/lib/audiodriver/wavheader.go index e3d6f9f..4e3613a 100644 --- a/lib/audiodriver/wavheader.go +++ b/lib/audiodriver/wavheader.go @@ -3,8 +3,10 @@ package audiodriver import ( "bytes" "encoding/binary" + "encoding/json" "errors" "io" + "time" ) // Skip riff header and `fmt ` just 16 bytes @@ -25,8 +27,18 @@ type WavHeader struct { BitsPerSample uint16 } -func (w *WavHeader) Parse(f io.ReadSeeker) error { - if _, err := f.Seek(FmtHeaderOffset, io.SeekStart); err != nil { +func NewHeader(f io.Reader) (*WavHeader, error) { + w := &WavHeader{} + if err := w.Parse(f); err != nil { + return nil, err + } + return w, nil +} + +func (w *WavHeader) Parse(f io.Reader) error { + // skip headers + var headers [FmtHeaderOffset]byte + if _, err := f.Read(headers[:]); err != nil { return err } @@ -76,10 +88,19 @@ func (w *WavHeader) Parse(f io.ReadSeeker) error { w.BlockAlign = binary.LittleEndian.Uint16(ba[:]) var bps [2]byte - if _, err := f.Read(ba[:]); err != nil { + if _, err := f.Read(bps[:]); err != nil { return err } w.BitsPerSample = binary.LittleEndian.Uint16(bps[:]) return nil } + +func (w *WavHeader) String() string { + b, _ := json.Marshal(w) + return string(b) +} + +func (w *WavHeader) GetLatnecy() time.Duration { + return time.Millisecond * time.Duration(BufferSize) / time.Duration(w.SampleRate) / time.Duration(w.NumChannels) +} diff --git a/lib/webrtcconnection/connection.go b/lib/webrtcconnection/connection.go index d9dc9ff..19a3adb 100644 --- a/lib/webrtcconnection/connection.go +++ b/lib/webrtcconnection/connection.go @@ -1,9 +1,11 @@ package webrtcconnection import ( + "git.sense-t.eu.org/ACE/ace/servers/qemuserver" "github.com/pion/mediadevices" "github.com/pion/mediadevices/pkg/codec/opus" "github.com/pion/mediadevices/pkg/codec/x264" + "github.com/pion/mediadevices/pkg/driver" "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" ) @@ -14,6 +16,7 @@ type Connection struct { option *Options api *webrtc.API stream mediadevices.MediaStream + QEMU *qemuserver.Server } func New(o *Options) (*Connection, error) { @@ -30,6 +33,17 @@ func New(o *Options) (*Connection, error) { connection.api = webrtc.NewAPI(webrtc.WithMediaEngine(me)) + logrus.Debug("list devices:") + logrus.Debug("------") + devices := driver.GetManager().Query(func(d driver.Driver) bool { return true }) + for _, device := range devices { + logrus.Debug(device.ID()) + logrus.Debug(device.Info()) + logrus.Debug(device.Properties()) + logrus.Debug(device.Status()) + logrus.Debug("------") + } + s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{ Video: func(mtc *mediadevices.MediaTrackConstraints) {}, Audio: func(mtc *mediadevices.MediaTrackConstraints) {}, @@ -44,7 +58,7 @@ func New(o *Options) (*Connection, error) { } func (c *Connection) Regist(offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { - logrus.Debug("received offer ", offer.Type.String()) + logrus.Debug("received offer ", offer) rtc, err := c.api.NewPeerConnection(webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ @@ -80,7 +94,7 @@ func (c *Connection) Regist(offer *webrtc.SessionDescription) (*webrtc.SessionDe } } - rtc.OnDataChannel(dataChannel) + rtc.OnDataChannel(c.dataChannel) if err := rtc.SetRemoteDescription(*offer); err != nil { return nil, err diff --git a/lib/webrtcconnection/datachannel.go b/lib/webrtcconnection/datachannel.go index cde1c31..de56c6c 100644 --- a/lib/webrtcconnection/datachannel.go +++ b/lib/webrtcconnection/datachannel.go @@ -4,15 +4,14 @@ import ( "encoding/json" "time" - "git.sense-t.eu.org/ACE/ace/servers/qemuserver" "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" ) -func dataChannel(d *webrtc.DataChannel) { +func (c *Connection) dataChannel(d *webrtc.DataChannel) { d.OnOpen(func() { for { - status := qemuserver.GetStatus().String() + status := c.QEMU.GetStatus().String() currentTime := time.Now().UnixMilli() b, err := json.Marshal(map[string]any{ @@ -40,7 +39,7 @@ func dataChannel(d *webrtc.DataChannel) { if !msg.IsString { return } - if err := qemuserver.SendEvent(msg.Data); err != nil { + if err := c.QEMU.SendEvent(msg.Data); err != nil { logrus.Errorf( "cannot parse message from '%s-%d' to qemu controll event: %v", d.Label(), *d.ID(), err, diff --git a/servers/qemuserver/server.go b/servers/qemuserver/server.go index 4bc0ac7..798b5e6 100644 --- a/servers/qemuserver/server.go +++ b/servers/qemuserver/server.go @@ -3,6 +3,8 @@ package qemuserver import ( "fmt" "net/url" + "os" + "time" "git.sense-t.eu.org/ACE/ace/lib/audiodriver" "git.sense-t.eu.org/ACE/ace/lib/qemuconnection" @@ -19,7 +21,9 @@ type Server struct { RX chan *qemuconnection.Event TX chan qemu.Status } - qemu *qemu.Domain + qemu *qemu.Domain + audioHeader chan *audiodriver.WavHeader + pcm chan [audiodriver.BufferSize]byte } var DefaultServer *Server @@ -30,7 +34,9 @@ func NewServer(o *Options) (*Server, error) { } server := &Server{ - options: o, + options: o, + audioHeader: make(chan *audiodriver.WavHeader, 1), + pcm: make(chan [audiodriver.BufferSize]byte), } server.QmpConnector.RX = make(chan *qemuconnection.Event) server.QmpConnector.TX = make(chan qemu.Status) @@ -62,8 +68,9 @@ func NewServer(o *Options) (*Server, error) { } server.qemu = qemu + audio := audiodriver.New() if err := driver.GetManager().Register( - audiodriver.New(o.AudioPipe), + audio, driver.Info{ Label: "audioFifo", DeviceType: driver.Microphone, @@ -72,6 +79,9 @@ func NewServer(o *Options) (*Server, error) { ); err != nil { return nil, err } + audio.PCM = server.pcm + audio.WaveHeader = server.audioHeader + if err := driver.GetManager().Register( vncdriver.NewVnc(o.VNCAddress), driver.Info{ @@ -91,7 +101,7 @@ func (s *Server) Run() error { defer logrus.Debug("qemu server exit") defer s.qemu.Close() - go s.startCapture() + s.startCapture() logrus.Debug("qemu capture start") for ev := range s.QmpConnector.RX { @@ -115,46 +125,71 @@ func (s *Server) Run() error { } func (s *Server) startCapture() { - if _, err := s.qemu.Run(qmp.Command{ - Execute: "human-monitor-command", - Args: map[string]string{ - "command-line": fmt.Sprintf( - "wavcapture %s %s", - s.options.AudioPipe, - s.options.AudioDevice, - ), - }, - }); err != nil { - logrus.Fatal("run audio command failed: ", err) - } - logrus.Debug("audio capture set") -} - -func Setup(o *Options) error { - DefaultServer, err := NewServer(o) - if err != nil { - return err - } go func() { - if err := DefaultServer.Run(); err != nil { - logrus.Fatal("cannot run qemuserver with error: ", err) + f, err := os.Open(s.options.AudioPipe) + if err != nil { + logrus.Fatal(err) + } + defer f.Close() + + logrus.Debug("start reading from fifo") + + waveHeader, err := audiodriver.NewHeader(f) + if err != nil { + logrus.Fatal(err) + } + logrus.Debug(waveHeader) + s.audioHeader <- waveHeader + close(s.audioHeader) // only once + + // skip riff data chunk ID and size, 8 bytes + var _dataChunkHeader [audiodriver.DataChunkIDSize + audiodriver.DataChunkSizeSize]byte + if _, err := f.Read(_dataChunkHeader[:]); err != nil { + logrus.Fatal(err) + } + + defer close(s.pcm) + for { + var b [audiodriver.BufferSize]byte + if _, err := f.Read(b[:]); err != nil { + logrus.Error(err) + } + select { + case s.pcm <- b: + case <-time.After(waveHeader.GetLatnecy()): + } } }() - return nil + + go func() { + if _, err := s.qemu.Run(qmp.Command{ + Execute: "human-monitor-command", + Args: map[string]string{ + "command-line": fmt.Sprintf( + "wavcapture %s %s", + s.options.AudioPipe, + s.options.AudioDevice, + ), + }, + }); err != nil { + logrus.Fatal("run audio command failed: ", err) + } + logrus.Debug("audio capture set") + }() } -func SendEvent(b []byte) error { +func (s *Server) SendEvent(b []byte) error { ev, err := qemuconnection.ParseEvent(b) if err != nil { return err } - DefaultServer.QmpConnector.RX <- ev + s.QmpConnector.RX <- ev return nil } -func GetStatus() qemu.Status { - DefaultServer.QmpConnector.RX <- &qemuconnection.Event{ +func (s *Server) GetStatus() qemu.Status { + s.QmpConnector.RX <- &qemuconnection.Event{ Type: qemuconnection.QueryStatusEvent, } - return <-DefaultServer.QmpConnector.TX + return <-s.QmpConnector.TX } diff --git a/servers/webserver/handlers.go b/servers/webserver/handlers.go index 298d883..9c78679 100644 --- a/servers/webserver/handlers.go +++ b/servers/webserver/handlers.go @@ -36,7 +36,7 @@ func (s *Server) exchangeSDP(c *gin.Context) { return } - answer, err := s.rtcConnector.Regist(offer) + answer, err := s.RTCConnector.Regist(offer) if err != nil { c.JSON(http.StatusInternalServerError, Response{ Succeed: false, diff --git a/servers/webserver/server.go b/servers/webserver/server.go index 4c6fd13..df085f5 100644 --- a/servers/webserver/server.go +++ b/servers/webserver/server.go @@ -9,7 +9,7 @@ import ( type Server struct { options *Options webServer *gin.Engine - rtcConnector *webrtcconnection.Connection + RTCConnector *webrtcconnection.Connection } func NewServer(o *Options) (*Server, error) { @@ -21,7 +21,7 @@ func NewServer(o *Options) (*Server, error) { s := &Server{ options: o, webServer: gin.New(), - rtcConnector: rtc, + RTCConnector: rtc, } return s, nil } diff --git a/web/src/components/AceScreen.vue b/web/src/components/AceScreen.vue index d51b91a..201a3e2 100644 --- a/web/src/components/AceScreen.vue +++ b/web/src/components/AceScreen.vue @@ -20,12 +20,10 @@ const controlEventTypes = { restart: 0, }; -const makeEvent = (evType, args) => { - return { - type: eventTypes[evType], - args: args, - }; -}; +const makeEvent = (evType, args) => ({ + type: eventTypes[evType], + args: args, +}); // eslint-disable-next-line const sendSpecialKey = (key) => { @@ -139,7 +137,7 @@ onMounted(() => { ajax .exchangeSDP(offer) .then((answer) => - pc.setRemoteDescription(new RTCSessionDescription(answer)) + pc.setRemoteDescription(new RTCSessionDescription(answer.data)) ); }); });