ace/servers/qemuserver/server.go
2022-09-27 16:59:47 +08:00

174 lines
3.7 KiB
Go

package qemuserver
import (
"fmt"
"net/url"
"os"
"time"
"git.sense-t.eu.org/ACE/ace/lib/audiodriver"
"git.sense-t.eu.org/ACE/ace/lib/qemuconnection"
"github.com/digitalocean/go-qemu/qemu"
"github.com/digitalocean/go-qemu/qmp"
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/driver/vncdriver"
"github.com/sirupsen/logrus"
)
const waveHeaderSize = audiodriver.FmtHeaderSizeDefault
var waveHeader *audiodriver.WavHeader
func init() {
waveHeader = audiodriver.DefaultHeader()
}
type Server struct {
options *Options
qemu *qemu.Domain
audioHeader chan *audiodriver.WavHeader
pcm chan [audiodriver.BufferSize]byte
}
var DefaultServer *Server
func NewServer(o *Options) (*Server, error) {
if err := o.MakeFIFO(); err != nil {
return nil, err
}
server := &Server{
options: o,
audioHeader: make(chan *audiodriver.WavHeader, 1),
pcm: make(chan [audiodriver.BufferSize]byte),
}
u, err := url.Parse(o.QmpAddress)
if err != nil {
return nil, err
}
var address string
if u.Scheme == "unix" {
address = u.Path
} else {
address = u.Host
}
logrus.Debugf("trying to connect qmp with %s://%s", u.Scheme, address)
qmpConnection, err := qmp.NewSocketMonitor(u.Scheme, address, o.Timeout)
if err != nil {
return nil, err
}
if err := qmpConnection.Connect(); err != nil {
return nil, err
}
logrus.Debug("qmp connected")
qemu, err := qemu.NewDomain(qmpConnection, o.Name)
if err != nil {
return nil, err
}
server.qemu = qemu
audio := audiodriver.New()
if err := driver.GetManager().Register(
audio,
driver.Info{
Label: "audioFifo",
DeviceType: driver.Microphone,
Priority: driver.PriorityNormal,
},
); err != nil {
return nil, err
}
audio.PCM = server.pcm
audio.WaveHeader = waveHeader
if err := driver.GetManager().Register(
vncdriver.NewVnc(o.VNCAddress),
driver.Info{
Label: "vnc",
DeviceType: driver.Camera,
Priority: driver.PriorityNormal,
},
); err != nil {
return nil, err
}
return server, nil
}
func (s *Server) Run() error {
logrus.Debug("qemu server running")
f, err := os.Open(s.options.AudioPipe)
if err != nil {
logrus.Fatal(err)
}
defer f.Close()
logrus.Debug("start reading fifo")
go func() {
logrus.Debug("setting audio capture")
if _, err := s.qemu.Run(qmp.Command{
Execute: "human-monitor-command",
Args: map[string]string{
"command-line": fmt.Sprintf(
"wavcapture %s %s %d %d %d",
s.options.AudioPipe,
s.options.AudioDevice,
waveHeader.SampleRate,
waveHeader.BitsPerSample,
waveHeader.NumChannels,
),
},
}); err != nil {
logrus.Fatal("run audio command failed: ", err)
}
logrus.Debug("audio capture set")
}()
logrus.Debug("skip wave headers, to the PCM!")
// skip to pcm data, for 44 bytes.
var _dataChunkHeader [audiodriver.FmtHeaderOffset +
audiodriver.FmtHeaderIDSize + audiodriver.FmtHeaderChunkSizeSize + waveHeaderSize +
audiodriver.DataChunkIDSize + audiodriver.DataChunkSizeSize]byte
if _, err := f.Read(_dataChunkHeader[:]); err != nil {
logrus.Fatal(err)
}
defer close(s.pcm)
for {
var b [audiodriver.BufferSize]byte
if _, err := f.Read(b[:]); err != nil {
logrus.Error(err)
}
select {
case s.pcm <- b:
case <-time.After(waveHeader.GetLatnecy()):
}
}
}
func (s *Server) SendEvent(b []byte) error {
ev, err := qemuconnection.ParseEvent(b)
if err != nil {
return err
}
for _, cmd := range ev.ToQemuCommand() {
_, err := s.qemu.Run(cmd)
if err != nil {
return err
}
}
return nil
}
func (s *Server) GetStatus() qemu.Status {
status, err := s.qemu.Status()
if err != nil {
return qemu.StatusIOError
}
return status
}