// Package qemuserver connects to a QEMU instance over QMP and feeds its
// screen dumps and captured audio into mediadevices stream drivers.
package qemuserver

import (
	"fmt"
	"io"
	"net/url"
	"os"
	"time"

	"git.sense-t.eu.org/ACE/ace/drivers/audio"
	"git.sense-t.eu.org/ACE/ace/drivers/video"
	"git.sense-t.eu.org/ACE/ace/lib/qemuconnection"
	"github.com/digitalocean/go-qemu/qemu"
	"github.com/digitalocean/go-qemu/qmp"
	"github.com/pion/mediadevices/pkg/driver"
	"github.com/sirupsen/logrus"
)

// waveHeaderSize is the fmt-chunk size used when computing how many bytes of
// WAV header precede the PCM data in the capture FIFO.
const waveHeaderSize = audio.FmtHeaderSizeDefault

// waveHeader is the WAV format description shared by the audio driver and the
// wavcapture command issued to QEMU.
var waveHeader *audio.WavHeader

func init() {
	waveHeader = audio.DefaultHeader()
}

// Server holds the QMP-backed QEMU domain together with the channels that
// carry captured audio (pcm) and screen dumps (ppm) to the registered drivers.
type Server struct {
	options     *Options
	qemu        *qemu.Domain
	audioHeader chan *audio.WavHeader
	pcm         chan []byte
	ppm         chan io.ReadCloser
}

// DefaultServer is a package-level Server handle. It is not assigned anywhere
// in this file.
var DefaultServer *Server

// NewServer connects to the QMP endpoint named by o.QmpAddress, wraps the
// connection in a qemu.Domain and registers the PCM (microphone) and PPM
// (camera) stream drivers with the mediadevices driver manager.
//
// For a "unix" scheme the URL path is used as the socket address; for any
// other scheme the URL host is used. On failure the already-opened QMP
// connection is released before the error is returned.
func NewServer(o *Options) (*Server, error) {
	server := &Server{
		options:     o,
		audioHeader: make(chan *audio.WavHeader, 1),
		pcm:         make(chan []byte),
		ppm:         make(chan io.ReadCloser, 60), // to be configured
	}

	u, err := url.Parse(o.QmpAddress)
	if err != nil {
		return nil, err
	}

	address := u.Host
	if u.Scheme == "unix" {
		address = u.Path
	}
	logrus.Debugf("trying to connect qmp with %s://%s", u.Scheme, address)

	qmpConnection, err := qmp.NewSocketMonitor(u.Scheme, address, o.Timeout)
	if err != nil {
		return nil, err
	}
	if err := qmpConnection.Connect(); err != nil {
		return nil, err
	}
	logrus.Debug("qmp connected")

	domain, err := qemu.NewDomain(qmpConnection, o.Name)
	if err != nil {
		// Best-effort cleanup: the original error is the one worth reporting.
		_ = qmpConnection.Disconnect()
		return nil, err
	}
	server.qemu = domain

	pcmDriver := &audio.PCMStreamDriver{
		PCM:               server.pcm,
		WaveHeader:        waveHeader,
		BufferSizeByBytes: o.Audio.BufferSize, // to be configured
	}
	if err := driver.GetManager().Register(
		pcmDriver,
		driver.Info{
			Label:      "audioFifo",
			DeviceType: driver.Microphone,
			Priority:   driver.PriorityNormal,
		},
	); err != nil {
		// Closing the domain also disconnects the underlying monitor.
		_ = domain.Close()
		return nil, err
	}

	ppmDriver := &video.PPMStreamDriver{
		Height:   o.Video.Height,
		Width:    o.Video.Width,
		FPS:      o.Video.FPS,
		PPMImage: server.ppm,
	}
	if err := driver.GetManager().Register(
		ppmDriver,
		driver.Info{
			Label:      "vnc",
			DeviceType: driver.Camera,
			Priority:   driver.PriorityNormal,
		},
	); err != nil {
		_ = domain.Close()
		return nil, err
	}

	return server, nil
}

// Run creates the audio FIFO and starts three goroutines: one that takes a
// screen dump on every frame tick, one that reads PCM data from the FIFO, and
// one that asks QEMU to start wavcapture into the FIFO. It then blocks
// forever; it only returns (with an error) when the setup itself fails.
func (s *Server) Run() error {
	logrus.Debug("qemu server running")

	// A frame rate that converts to zero would panic with a division by zero
	// inside the capture goroutine, and a negative one would never tick, so
	// reject both before anything is started.
	fps := time.Duration(s.options.Video.FPS)
	if fps <= 0 {
		return fmt.Errorf("invalid video FPS %v: must be positive", s.options.Video.FPS)
	}

	path, err := s.options.MakeFIFO()
	if err != nil {
		return err
	}

	// Screen capture: one ScreenDump per tick, pushed to the ppm channel.
	go func() {
		logrus.Debug("screen capture start")
		defer close(s.ppm)

		ticker := time.NewTicker(time.Second / fps) // to be configured
		defer ticker.Stop()

		for range ticker.C {
			ppm, err := s.qemu.ScreenDump()
			if err != nil {
				logrus.Error(err)
				continue
			}
			s.ppm <- ppm
		}
	}()

	// Audio reader: skip the WAV header, then forward fixed-size PCM chunks.
	go func() {
		f, err := os.Open(path)
		if err != nil {
			logrus.Fatal(err)
		}
		defer f.Close()

		logrus.Debug("start reading fifo")
		logrus.Debug("skip wave headers, to the PCM!")
		// skip to pcm data, for 44 bytes.
		var dataChunkHeader [audio.FmtHeaderOffset +
			audio.FmtHeaderIDSize + audio.FmtHeaderChunkSizeSize + waveHeaderSize +
			audio.DataChunkIDSize + audio.DataChunkSizeSize]byte
		// ReadFull guarantees the whole header is consumed; a single Read may
		// return short and leave header bytes at the start of the PCM stream.
		if _, err := io.ReadFull(f, dataChunkHeader[:]); err != nil {
			logrus.Fatal(err)
		}

		logrus.Debug("start reading PCM")
		defer close(s.pcm)

		for {
			b := make([]byte, 2048) // to be configured
			// Fill the chunk completely so no zero padding is forwarded as
			// audio, and stop once the stream ends or fails instead of
			// logging the same error forever.
			if _, err := io.ReadFull(f, b); err != nil {
				logrus.Error(err)
				return
			}

			// Drop the chunk if the consumer does not take it in time.
			select {
			case s.pcm <- b:
			case <-time.After(waveHeader.GetLatnecy(s.options.Audio.BufferSize)):
			}
		}
	}()

	// Ask QEMU to start writing captured audio into the FIFO.
	go func() {
		logrus.Debug("setting audio capture")
		if _, err := s.qemu.Run(qmp.Command{
			Execute: "human-monitor-command",
			Args: map[string]string{
				"command-line": fmt.Sprintf(
					"wavcapture %s %s %d %d %d",
					path,
					s.options.Audio.Device,
					waveHeader.SampleRate,
					waveHeader.BitsPerSample,
					waveHeader.NumChannels,
				),
			},
		}); err != nil {
			logrus.Fatal("run audio command failed: ", err)
		}
		logrus.Debug("audio capture set")
	}()

	select {}
}

// SendEvent parses b as an input event and runs each resulting QEMU command in
// order. It returns the parse error, or the first command error encountered.
func (s *Server) SendEvent(b []byte) error {
	ev, err := qemuconnection.ParseEvent(b)
	if err != nil {
		return err
	}

	for _, cmd := range ev.ToQemuCommand() {
		if _, err := s.qemu.Run(cmd); err != nil {
			return err
		}
	}

	return nil
}

// GetStatus returns the status reported by the QEMU domain, or
// qemu.StatusIOError when the status query itself fails.
func (s *Server) GetStatus() qemu.Status {
	status, err := s.qemu.Status()
	if err != nil {
		return qemu.StatusIOError
	}
	return status
}