package qemuserver import ( "fmt" "net/url" "os" "time" "git.sense-t.eu.org/ACE/ace/lib/audiodriver" "git.sense-t.eu.org/ACE/ace/lib/qemuconnection" "github.com/digitalocean/go-qemu/qemu" "github.com/digitalocean/go-qemu/qmp" "github.com/pion/mediadevices/pkg/driver" "github.com/pion/mediadevices/pkg/driver/vncdriver" "github.com/sirupsen/logrus" ) type Server struct { options *Options QmpConnector struct { RX chan *qemuconnection.Event TX chan qemu.Status } qemu *qemu.Domain audioHeader chan *audiodriver.WavHeader pcm chan [audiodriver.BufferSize]byte } var DefaultServer *Server func NewServer(o *Options) (*Server, error) { if err := o.MakeFIFO(); err != nil { return nil, err } server := &Server{ options: o, audioHeader: make(chan *audiodriver.WavHeader, 1), pcm: make(chan [audiodriver.BufferSize]byte), } server.QmpConnector.RX = make(chan *qemuconnection.Event) server.QmpConnector.TX = make(chan qemu.Status) u, err := url.Parse(o.QmpAddress) if err != nil { return nil, err } var address string if u.Scheme == "unix" { address = u.Path } else { address = u.Host } logrus.Debugf("trying to connect qmp with %s://%s", u.Scheme, address) qmpConnection, err := qmp.NewSocketMonitor(u.Scheme, address, o.Timeout) if err != nil { return nil, err } if err := qmpConnection.Connect(); err != nil { return nil, err } logrus.Debug("qmp connected") qemu, err := qemu.NewDomain(qmpConnection, o.Name) if err != nil { return nil, err } server.qemu = qemu audio := audiodriver.New() if err := driver.GetManager().Register( audio, driver.Info{ Label: "audioFifo", DeviceType: driver.Microphone, Priority: driver.PriorityNormal, }, ); err != nil { return nil, err } audio.PCM = server.pcm audio.WaveHeader = server.audioHeader if err := driver.GetManager().Register( vncdriver.NewVnc(o.VNCAddress), driver.Info{ Label: "vnc", DeviceType: driver.Camera, Priority: driver.PriorityNormal, }, ); err != nil { return nil, err } return server, nil } func (s *Server) Run() error { logrus.Debug("qemu server running") defer logrus.Debug("qemu server exit") defer s.qemu.Close() s.startCapture() logrus.Debug("qemu capture start") for ev := range s.QmpConnector.RX { if ev.Type == qemuconnection.QueryStatusEvent { status, err := s.qemu.Status() if err != nil { logrus.Error("get qemu status error: ", err) continue } s.QmpConnector.TX <- status continue } for _, cmd := range ev.ToQemuCommand() { _, err := s.qemu.Run(cmd) if err != nil { logrus.Error("run command error: ", err) } } } return nil } func (s *Server) startCapture() { go func() { f, err := os.Open(s.options.AudioPipe) if err != nil { logrus.Fatal(err) } defer f.Close() logrus.Debug("start reading from fifo") waveHeader, err := audiodriver.NewHeader(f) if err != nil { logrus.Fatal(err) } logrus.Debug(waveHeader) s.audioHeader <- waveHeader close(s.audioHeader) // only once // skip riff data chunk ID and size, 8 bytes var _dataChunkHeader [audiodriver.DataChunkIDSize + audiodriver.DataChunkSizeSize]byte if _, err := f.Read(_dataChunkHeader[:]); err != nil { logrus.Fatal(err) } defer close(s.pcm) for { var b [audiodriver.BufferSize]byte if _, err := f.Read(b[:]); err != nil { logrus.Error(err) } select { case s.pcm <- b: case <-time.After(waveHeader.GetLatnecy()): } } }() go func() { if _, err := s.qemu.Run(qmp.Command{ Execute: "human-monitor-command", Args: map[string]string{ "command-line": fmt.Sprintf( "wavcapture %s %s", s.options.AudioPipe, s.options.AudioDevice, ), }, }); err != nil { logrus.Fatal("run audio command failed: ", err) } logrus.Debug("audio capture set") }() } func (s *Server) SendEvent(b []byte) error { ev, err := qemuconnection.ParseEvent(b) if err != nil { return err } s.QmpConnector.RX <- ev return nil } func (s *Server) GetStatus() qemu.Status { s.QmpConnector.RX <- &qemuconnection.Event{ Type: qemuconnection.QueryStatusEvent, } return <-s.QmpConnector.TX }