From 708732a37b608691604af7e035f235c018aeff61 Mon Sep 17 00:00:00 2001 From: Sense T Date: Sun, 2 Oct 2022 11:29:57 +0000 Subject: [PATCH] some fix --- cmd/config/config.go | 2 +- drivers/audio/wavheader.go | 8 +- lib/audiodriver/wavfifo.go | 98 ------------------------ lib/audiodriver/wavheader.go | 118 ----------------------------- lib/webrtcconnection/connection.go | 8 +- lib/webrtcconnection/options.go | 10 +-- servers/qemuserver/options.go | 54 +++++++++---- servers/qemuserver/server.go | 59 +++++++-------- 8 files changed, 76 insertions(+), 281 deletions(-) delete mode 100644 lib/audiodriver/wavfifo.go delete mode 100644 lib/audiodriver/wavheader.go diff --git a/cmd/config/config.go b/cmd/config/config.go index 531be6c..4fd3c31 100644 --- a/cmd/config/config.go +++ b/cmd/config/config.go @@ -47,7 +47,7 @@ func NewConfig() *Config { } func genconf(c *cli.Context) error { - f, err := os.OpenFile(c.String("config"), os.O_CREATE|os.O_WRONLY, 0644) + f, err := os.OpenFile(c.String("config"), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) if err != nil { return err } diff --git a/drivers/audio/wavheader.go b/drivers/audio/wavheader.go index 6e6f62d..a499abe 100644 --- a/drivers/audio/wavheader.go +++ b/drivers/audio/wavheader.go @@ -11,7 +11,7 @@ import ( // Skip riff header and `fmt ` just 16 bytes const ( - FmtHeaderOffset = 0x0c + FmtHeaderOffset = 0x0c // 12 FmtHeaderIDSize = 4 FmtHeaderChunkSizeSize = 4 FmtHeaderSizeDefault = 16 @@ -49,8 +49,8 @@ func DefaultHeader() *WavHeader { func (w *WavHeader) Parse(f io.Reader) error { // skip headers - var headers [FmtHeaderOffset]byte - if _, err := f.Read(headers[:]); err != nil { + var _headers [FmtHeaderOffset]byte + if _, err := f.Read(_headers[:]); err != nil { return err } @@ -58,7 +58,7 @@ func (w *WavHeader) Parse(f io.Reader) error { if _, err := f.Read(id[:]); err != nil { return err } - if bytes.Equal(id[:], []byte{'f', 'm', 't', '0'}[:]) { + if bytes.Equal(id[:], []byte("fmt ")) { return errors.New("bad header") } w.ID = id diff --git a/lib/audiodriver/wavfifo.go b/lib/audiodriver/wavfifo.go deleted file mode 100644 index 0d16517..0000000 --- a/lib/audiodriver/wavfifo.go +++ /dev/null @@ -1,98 +0,0 @@ -package audiodriver - -import ( - "context" - "encoding/binary" - "io" - "time" - - "github.com/pion/mediadevices/pkg/io/audio" - "github.com/pion/mediadevices/pkg/prop" - "github.com/pion/mediadevices/pkg/wave" - "github.com/sirupsen/logrus" -) - -const ( - DataChunkIDSize = 4 - DataChunkSizeSize = 4 -) - -// BufferSize for pcm bytes -const BufferSize = 512 -const BitsPerByte = 8 - -type WavFIFODriver struct { - PCM <-chan [BufferSize]byte - WaveHeader *WavHeader - closed <-chan struct{} - cancel func() -} - -func New() *WavFIFODriver { - return &WavFIFODriver{} -} - -func (w *WavFIFODriver) Open() error { - defer logrus.Debug("device opened") - ctx, cancel := context.WithCancel(context.Background()) - w.closed = ctx.Done() - w.cancel = cancel - - return nil -} - -func (w *WavFIFODriver) Close() error { - w.cancel() - return nil -} - -func (w *WavFIFODriver) Properties() []prop.Media { - logrus.Debugf("wave header: %v", w.WaveHeader) - return []prop.Media{ - { - Audio: prop.Audio{ - SampleRate: int(w.WaveHeader.SampleRate), - ChannelCount: int(w.WaveHeader.NumChannels), - SampleSize: int(w.WaveHeader.BitsPerSample), - Latency: w.WaveHeader.GetLatnecy(), - IsFloat: false, // just 8bit or 16bit with qemu - IsBigEndian: false, // qemu should be little endian - IsInterleaved: true, - }, - }, - } -} - -func (w *WavFIFODriver) AudioRecord(p prop.Media) (audio.Reader, error) { - logrus.Debug(p) - - reader := func() (wave.Audio, func(), error) { - a := wave.NewInt16Interleaved(wave.ChunkInfo{ - Len: BufferSize / int(p.SampleSize/BitsPerByte), - Channels: p.ChannelCount, - SamplingRate: p.SampleRate, - }) - select { - case <-w.closed: - return nil, func() {}, io.EOF - case pcmData, ok := <-w.PCM: - logrus.Debug("got %d bytes pcm data", len(pcmData)) - if !ok { - return nil, func() {}, io.ErrClosedPipe - } - copy(a.Data, bytesTo16BitSamples(pcmData[:])) - case <-time.After(p.Latency): - } - return a, func() {}, nil - } - return audio.ReaderFunc(reader), nil -} - -func bytesTo16BitSamples(b []byte) []int16 { - samples := make([]int16, 0) - for i := 0; i < len(b); i += 2 { - sample := binary.LittleEndian.Uint16(b[i : i+1]) - samples = append(samples, int16(sample)) - } - return samples -} diff --git a/lib/audiodriver/wavheader.go b/lib/audiodriver/wavheader.go deleted file mode 100644 index 2f0c2dd..0000000 --- a/lib/audiodriver/wavheader.go +++ /dev/null @@ -1,118 +0,0 @@ -package audiodriver - -import ( - "bytes" - "encoding/binary" - "encoding/json" - "errors" - "io" - "time" -) - -// Skip riff header and `fmt ` just 16 bytes -const ( - FmtHeaderOffset = 0x0c - FmtHeaderIDSize = 4 - FmtHeaderChunkSizeSize = 4 - FmtHeaderSizeDefault = 16 -) - -type WavHeader struct { - ID [4]byte - Size uint32 - AudioFormat uint16 - NumChannels uint16 - SampleRate uint32 - ByteRate uint32 - BlockAlign uint16 - BitsPerSample uint16 -} - -func NewHeader(f io.Reader) (*WavHeader, error) { - w := &WavHeader{} - if err := w.Parse(f); err != nil { - return nil, err - } - return w, nil -} - -func DefaultHeader() *WavHeader { - return &WavHeader{ - Size: uint32(FmtHeaderSizeDefault), - AudioFormat: 1, - NumChannels: 2, - SampleRate: 48000, // opus only support 48kHz - BlockAlign: 4, - BitsPerSample: 16, - } -} - -func (w *WavHeader) Parse(f io.Reader) error { - // skip headers - var headers [FmtHeaderOffset]byte - if _, err := f.Read(headers[:]); err != nil { - return err - } - - var id [4]byte - if _, err := f.Read(id[:]); err != nil { - return err - } - if bytes.Equal(id[:], []byte{'f', 'm', 't', '0'}[:]) { - return errors.New("bad header") - } - w.ID = id - - var size [4]byte - if _, err := f.Read(size[:]); err != nil { - return err - } - w.Size = binary.LittleEndian.Uint32(size[:]) - - var af [2]byte - if _, err := f.Read(af[:]); err != nil { - return err - } - w.AudioFormat = binary.LittleEndian.Uint16(af[:]) - - var nc [2]byte - if _, err := f.Read(nc[:]); err != nil { - return err - } - w.NumChannels = binary.LittleEndian.Uint16(nc[:]) - - var sr [4]byte - if _, err := f.Read(sr[:]); err != nil { - return err - } - w.SampleRate = binary.LittleEndian.Uint32(sr[:]) - - var br [4]byte - if _, err := f.Read(br[:]); err != nil { - return err - } - w.ByteRate = binary.LittleEndian.Uint32(br[:]) - - var ba [2]byte - if _, err := f.Read(ba[:]); err != nil { - return err - } - w.BlockAlign = binary.LittleEndian.Uint16(ba[:]) - - var bps [2]byte - if _, err := f.Read(bps[:]); err != nil { - return err - } - w.BitsPerSample = binary.LittleEndian.Uint16(bps[:]) - - return nil -} - -func (w *WavHeader) String() string { - b, _ := json.Marshal(w) - return string(b) -} - -func (w *WavHeader) GetLatnecy() time.Duration { - return time.Millisecond * time.Duration(BufferSize) / time.Duration(w.SampleRate) / time.Duration(w.NumChannels) -} diff --git a/lib/webrtcconnection/connection.go b/lib/webrtcconnection/connection.go index d2494ec..48db210 100644 --- a/lib/webrtcconnection/connection.go +++ b/lib/webrtcconnection/connection.go @@ -6,7 +6,6 @@ import ( "github.com/pion/mediadevices/pkg/codec/opus" "github.com/pion/mediadevices/pkg/codec/x264" "github.com/pion/mediadevices/pkg/driver" - "github.com/pion/mediadevices/pkg/prop" "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" ) @@ -24,7 +23,7 @@ func New(o *Options) (*Connection, error) { connection := &Connection{ option: o, } - codecSelector, err := setupCodec(o.Video.BPS) + codecSelector, err := setupCodec(o.VideoBPS) if err != nil { return nil, err } @@ -45,10 +44,7 @@ func New(o *Options) (*Connection, error) { } s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{ - Video: func(mtc *mediadevices.MediaTrackConstraints) { - mtc.Height = prop.Int(o.Video.Height) - mtc.Width = prop.Int(o.Video.Width) - }, + Video: func(mtc *mediadevices.MediaTrackConstraints) {}, Audio: func(mtc *mediadevices.MediaTrackConstraints) {}, Codec: codecSelector, }) diff --git a/lib/webrtcconnection/options.go b/lib/webrtcconnection/options.go index 38d41fd..d584926 100644 --- a/lib/webrtcconnection/options.go +++ b/lib/webrtcconnection/options.go @@ -2,11 +2,7 @@ package webrtcconnection type Options struct { STUNServers []string `yaml:"stun_servers"` - Video struct { - Height int `yaml:"height"` - Width int `yaml:"width"` - BPS int `yaml:"bps"` - } `yaml:"video"` + VideoBPS int `yaml:"video_bps"` } func ExampleOptions() *Options { @@ -15,9 +11,7 @@ func ExampleOptions() *Options { "stun:stun.l.google.com:19302", "stun:wetofu.me:3478", }, + VideoBPS: 500_000, } - options.Video.BPS = 500_000 - options.Video.Height = 768 - options.Video.Width = 1024 return options } diff --git a/servers/qemuserver/options.go b/servers/qemuserver/options.go index 7596494..007708e 100644 --- a/servers/qemuserver/options.go +++ b/servers/qemuserver/options.go @@ -1,36 +1,58 @@ package qemuserver import ( + "fmt" "net/url" "os" + "path" "syscall" "time" ) type Options struct { - QmpAddress string `yaml:"address"` - Timeout time.Duration `yaml:"timeout"` - Name string `yaml:"name"` - VNCAddress string `yaml:"vnc"` - AudioPipe string `yaml:"audio_pipe"` - AudioDevice string `yaml:"audio_device"` + QmpAddress string `yaml:"address"` + Timeout time.Duration `yaml:"timeout"` + Name string `yaml:"name"` + Audio AudioOptions `yaml:"audio_options"` + Video VideoOptions `yaml:"video_options"` +} + +type AudioOptions struct { + Device string `yaml:"device"` + BufferSize uint16 `yaml:"buffer_size"` // in bytes +} + +type VideoOptions struct { + Height int `yaml:"height"` + Width int `yaml:"width"` + FPS float32 `yaml:"fps"` } func ExampleOptions() *Options { return &Options{ QmpAddress: (&url.URL{ - Scheme: "unix", - Path: "/tmp/qemu.sock", + Scheme: "tcp", + Host: "localhost:4444", }).String(), - Timeout: time.Duration(60 * time.Second), - Name: "ace-qemu", - VNCAddress: "localhost:5900", - AudioPipe: "/tmp/audio", - AudioDevice: "snd0", + Timeout: time.Duration(30 * time.Second), + Name: "ace-qemu", + Audio: AudioOptions{ + Device: "snd0", + BufferSize: 2048, + }, + Video: VideoOptions{ + Height: 1024, + Width: 768, + FPS: 60, + }, } } -func (o *Options) MakeFIFO() error { - os.Remove(o.AudioPipe) - return syscall.Mkfifo(o.AudioPipe, 0600) +func (o *Options) MakeFIFO() (string, error) { + path := path.Join( + os.TempDir(), + fmt.Sprintf("%s-%s-audio", o.Name, o.Audio.Device), + ) + os.Remove(path) + return path, syscall.Mkfifo(path, 0600) } diff --git a/servers/qemuserver/server.go b/servers/qemuserver/server.go index f3c9219..d7608d2 100644 --- a/servers/qemuserver/server.go +++ b/servers/qemuserver/server.go @@ -9,7 +9,6 @@ import ( "git.sense-t.eu.org/ACE/ace/drivers/audio" "git.sense-t.eu.org/ACE/ace/drivers/video" - "git.sense-t.eu.org/ACE/ace/lib/audiodriver" "git.sense-t.eu.org/ACE/ace/lib/qemuconnection" "github.com/digitalocean/go-qemu/qemu" "github.com/digitalocean/go-qemu/qmp" @@ -36,15 +35,11 @@ type Server struct { var DefaultServer *Server func NewServer(o *Options) (*Server, error) { - if err := o.MakeFIFO(); err != nil { - return nil, err - } - server := &Server{ options: o, audioHeader: make(chan *audio.WavHeader, 1), pcm: make(chan []byte), - ppm: make(chan io.ReadCloser), + ppm: make(chan io.ReadCloser, 60), // to be configured } u, err := url.Parse(o.QmpAddress) @@ -77,7 +72,7 @@ func NewServer(o *Options) (*Server, error) { audio := &audio.PCMStreamDriver{ PCM: server.pcm, WaveHeader: waveHeader, - BufferSizeByBytes: 2048, // to be configured + BufferSizeByBytes: o.Audio.BufferSize, // to be configured } if err := driver.GetManager().Register( audio, @@ -91,9 +86,9 @@ func NewServer(o *Options) (*Server, error) { } video := &video.PPMStreamDriver{ - Height: 768, - Width: 1024, - FPS: 60, + Height: o.Video.Height, + Width: o.Video.Width, + FPS: o.Video.FPS, PPMImage: server.ppm, } if err := driver.GetManager().Register( @@ -112,9 +107,26 @@ func NewServer(o *Options) (*Server, error) { func (s *Server) Run() error { logrus.Debug("qemu server running") + path, err := s.options.MakeFIFO() + if err != nil { + return err + } go func() { - f, err := os.Open(s.options.AudioPipe) + logrus.Debug("screen capture start") + defer close(s.ppm) + for range time.Tick(time.Second / time.Duration(s.options.Video.FPS)) { // to be configured + ppm, err := s.qemu.ScreenDump() + if err != nil { + logrus.Error(err) + continue + } + s.ppm <- ppm + } + }() + + go func() { + f, err := os.Open(path) if err != nil { logrus.Fatal(err) } @@ -123,9 +135,9 @@ func (s *Server) Run() error { logrus.Debug("skip wave headers, to the PCM!") // skip to pcm data, for 44 bytes. - var _dataChunkHeader [audiodriver.FmtHeaderOffset + - audiodriver.FmtHeaderIDSize + audiodriver.FmtHeaderChunkSizeSize + waveHeaderSize + - audiodriver.DataChunkIDSize + audiodriver.DataChunkSizeSize]byte + var _dataChunkHeader [audio.FmtHeaderOffset + + audio.FmtHeaderIDSize + audio.FmtHeaderChunkSizeSize + waveHeaderSize + + audio.DataChunkIDSize + audio.DataChunkSizeSize]byte if _, err := f.Read(_dataChunkHeader[:]); err != nil { logrus.Fatal(err) } @@ -139,7 +151,7 @@ func (s *Server) Run() error { } select { case s.pcm <- b: - case <-time.After(waveHeader.GetLatnecy(2048)): + case <-time.After(waveHeader.GetLatnecy(s.options.Audio.BufferSize)): } } }() @@ -151,8 +163,8 @@ func (s *Server) Run() error { Args: map[string]string{ "command-line": fmt.Sprintf( "wavcapture %s %s %d %d %d", - s.options.AudioPipe, - s.options.AudioDevice, + path, + s.options.Audio.Device, waveHeader.SampleRate, waveHeader.BitsPerSample, waveHeader.NumChannels, @@ -164,19 +176,6 @@ func (s *Server) Run() error { logrus.Debug("audio capture set") }() - go func() { - logrus.Debug("screen capture start") - defer close(s.ppm) - for range time.Tick(time.Second / 60) { // to be configured - ppm, err := s.qemu.ScreenDump() - if err != nil { - logrus.Error(err) - continue - } - s.ppm <- ppm - } - }() - select {} }