smarter-device-manager/server.go

243 lines
6.0 KiB
Go

// Copyright (c) 2019, Arm Ltd
package main
import (
"fmt"
"net"
"os"
"path"
"strings"
"time"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
// envDisableHealthChecks is the name of the environment variable that
// healthcheck reads to decide which health checks are switched off.
const envDisableHealthChecks = "DP_DISABLE_HEALTHCHECKS"

// allHealthChecks is the value healthcheck substitutes when the environment
// variable is set to "all".
const allHealthChecks = "xids"
// SmarterDevicePlugin implements the Kubernetes device plugin API for a
// single host device file that is exposed to containers.
type SmarterDevicePlugin struct {
	// devs is the device list reported by ListAndWatch and consulted by
	// Allocate when validating requested device IDs.
	devs []*pluginapi.Device
	// socket is the path of the unix socket the gRPC server listens on.
	socket string
	// deviceFile is the path handed out by Allocate as both the host path
	// and the container path.
	deviceFile string
	// resourceName is the resource name Serve registers with the kubelet.
	resourceName string

	// stop is closed by Stop to end ListAndWatch and the health check loop.
	stop chan interface{}
	// health carries devices that ListAndWatch should mark as unhealthy.
	health chan *pluginapi.Device

	// server is the gRPC server created by Start; Stop resets it to nil.
	server *grpc.Server
}
// NewSmarterDevicePlugin builds a SmarterDevicePlugin that is ready to be
// started.
//
// nDevices is passed to getDevices to create the device list, deviceFilename
// is the device path handed to containers, resourceIdentification is the
// resource name to register, and serverSock is the unix socket path for the
// plugin's gRPC server. The gRPC server itself is not created here.
func NewSmarterDevicePlugin(nDevices uint, deviceFilename string, resourceIdentification string, serverSock string) *SmarterDevicePlugin {
	plugin := new(SmarterDevicePlugin)

	plugin.devs = getDevices(nDevices)
	plugin.socket = serverSock
	plugin.deviceFile = deviceFilename
	plugin.resourceName = resourceIdentification

	// Both channels are unbuffered.
	plugin.stop = make(chan interface{})
	plugin.health = make(chan *pluginapi.Device)

	return plugin
}
// dial opens a gRPC client connection to the unix socket at unixSocketPath.
//
// The connection is insecure (no transport credentials) and the call blocks
// until the connection is up or timeout has elapsed. It returns the
// connection, or a nil connection and the error reported by grpc.Dial.
func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) {
	// Connect over a unix domain socket instead of gRPC's default transport.
	unixDialer := func(addr string, d time.Duration) (net.Conn, error) {
		return net.DialTimeout("unix", addr, d)
	}

	opts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithTimeout(timeout),
		grpc.WithDialer(unixDialer),
	}

	conn, err := grpc.Dial(unixSocketPath, opts...)
	if err != nil {
		return nil, err
	}
	return conn, nil
}
// Start launches the gRPC server of the device plugin on m.socket and then
// starts the health check loop.
//
// A stale socket file is removed first. Once the server goroutine is running,
// Start makes a blocking connection to its own socket (waiting up to 60
// seconds) to confirm the server is reachable. If that probe fails the server
// is stopped again and m.server is reset, so neither the listener nor the
// serving goroutine is leaked.
//
// It returns the first error from removing the socket, listening on it, or
// the probe connection.
func (m *SmarterDevicePlugin) Start() error {
	if err := m.cleanup(); err != nil {
		return err
	}

	sock, err := net.Listen("unix", m.socket)
	if err != nil {
		return err
	}

	// Keep a local handle so the goroutine below never reads m.server, which
	// Stop may reset to nil concurrently.
	server := grpc.NewServer()
	m.server = server
	pluginapi.RegisterDevicePluginServer(server, m)

	go func() {
		if err := server.Serve(sock); err != nil {
			glog.Errorf("gRPC server on socket %s terminated: %v", m.socket, err)
		}
	}()

	// Wait for the server to start by launching a blocking connection.
	conn, err := dial(m.socket, 60*time.Second)
	if err != nil {
		// Tear the half-started server down; Stop closes its listener.
		server.Stop()
		m.server = nil
		return err
	}
	conn.Close()

	go m.healthcheck()
	return nil
}
// Stop shuts down the gRPC server, closes m.stop so that ListAndWatch and the
// health check loop return, and removes the socket file.
//
// If no server is running (m.server is nil) Stop only logs and returns nil.
// Otherwise it returns the error, if any, from removing the socket file.
func (m *SmarterDevicePlugin) Stop() error {
	// Info rather than Infof: the message has no format verb, so Infof would
	// render the socket path as "%!(EXTRA string=...)".
	glog.V(0).Info("Stopping server with socket ", m.socket)
	if m.server == nil {
		return nil
	}

	m.server.Stop()
	m.server = nil
	close(m.stop)

	glog.V(0).Info("Server stopped with socket ", m.socket)
	return m.cleanup()
}
// Register announces this device plugin to the kubelet under resourceName.
//
// It connects to the registration socket at kubeletEndpoint (waiting up to 5
// seconds) and sends a RegisterRequest carrying the plugin API version, the
// base name of the plugin's own socket, and the resource name. It returns any
// error from connecting or from the registration call.
func (m *SmarterDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
	conn, err := dial(kubeletEndpoint, 5*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()

	request := &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,
		Endpoint:     path.Base(m.socket),
		ResourceName: resourceName,
	}

	registration := pluginapi.NewRegistrationClient(conn)
	_, err = registration.Register(context.Background(), request)
	return err
}
// ListAndWatch sends the current device list on stream s and then re-sends it
// every time a device is reported on m.health, after marking that device
// Unhealthy.
//
// It returns nil when the plugin is stopped (m.stop is closed). It returns
// the stream context's error when the stream is cancelled, and the Send error
// when a response cannot be delivered, so a dead stream does not keep looping
// and consuming health events.
func (m *SmarterDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
	if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
		return err
	}

	ctx := s.Context()
	for {
		select {
		case <-m.stop:
			return nil
		case <-ctx.Done():
			// The peer went away or the RPC was cancelled.
			return ctx.Err()
		case d := <-m.health:
			// FIXME: there is no way to recover from the Unhealthy state.
			d.Health = pluginapi.Unhealthy
			if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
				return err
			}
		}
	}
}
// unhealthy hands dev to ListAndWatch through the unbuffered m.health channel
// so that it is reported as unhealthy.
//
// The call blocks until a ListAndWatch stream receives the device. If the
// plugin is stopped first (m.stop is closed) it gives up instead of blocking
// forever, which would otherwise wedge the calling goroutine.
func (m *SmarterDevicePlugin) unhealthy(dev *pluginapi.Device) {
	select {
	case m.health <- dev:
	case <-m.stop:
	}
}
// Allocate answers an allocation request from the kubelet.
//
// For every container request it checks that each requested device ID is
// present in m.devs and then responds with one DeviceSpec that maps
// m.deviceFile to the same path inside the container with "rw" permissions.
// The requested IDs are only validated; they do not change the response.
// An unknown device ID aborts the whole request with an error.
func (m *SmarterDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	known := m.devs
	result := new(pluginapi.AllocateResponse)

	for _, containerReq := range reqs.ContainerRequests {
		for _, id := range containerReq.DevicesIDs {
			if deviceExists(known, id) {
				continue
			}
			return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
		}

		spec := &pluginapi.DeviceSpec{
			ContainerPath: m.deviceFile,
			HostPath:      m.deviceFile,
			Permissions:   "rw",
		}
		containerResp := &pluginapi.ContainerAllocateResponse{
			Devices: []*pluginapi.DeviceSpec{spec},
		}
		result.ContainerResponses = append(result.ContainerResponses, containerResp)
	}

	return result, nil
}
// PreStartContainer does no work for this plugin: it ignores its arguments
// and always returns an empty response and a nil error.
func (m *SmarterDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
	empty := new(pluginapi.PreStartContainerResponse)
	return empty, nil
}
// GetPreferredAllocation expresses no preference: it ignores its arguments
// and always returns an empty response and a nil error.
func (m *SmarterDevicePlugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
	empty := new(pluginapi.PreferredAllocationResponse)
	return empty, nil
}
// cleanup removes the plugin's socket file from disk. A socket file that does
// not exist is not treated as an error; any other removal error is returned.
func (m *SmarterDevicePlugin) cleanup() error {
	glog.V(0).Info("Removing file ", m.socket)

	err := os.Remove(m.socket)
	if err == nil || os.IsNotExist(err) {
		return nil
	}
	return err
}
// healthcheck runs the plugin's health check loop until m.stop is closed.
//
// The DP_DISABLE_HEALTHCHECKS environment variable (compared in lower case)
// selects which checks are disabled; the value "all" is treated as the full
// list of checks. Devices received on the xids channel are forwarded to
// m.unhealthy.
//
// NOTE: nothing in this function sends on xids, so as written the loop only
// ever wakes up when the plugin is stopped.
func (m *SmarterDevicePlugin) healthcheck() {
	disabled := strings.ToLower(os.Getenv(envDisableHealthChecks))
	if disabled == "all" {
		disabled = allHealthChecks
	}

	// When the check is disabled xids stays nil; receiving from a nil channel
	// blocks forever, which switches that select case off.
	var xids chan *pluginapi.Device
	if !strings.Contains(disabled, "xids") {
		xids = make(chan *pluginapi.Device)
	}

	for {
		select {
		case <-m.stop:
			return
		case dev := <-xids:
			m.unhealthy(dev)
		}
	}
}
// Serve starts the gRPC server and registers the device plugin with the
// kubelet under m.resourceName.
//
// If the server cannot be started the error is logged and returned. If
// registration fails the server is stopped again (a failure to stop is
// logged) and the registration error is returned.
func (m *SmarterDevicePlugin) Serve() error {
	if err := m.Start(); err != nil {
		glog.Errorf("Could not start device plugin: %s", err)
		return err
	}
	// The trailing space matters: Info does not insert one between two
	// string operands.
	glog.V(0).Info("Starting to serve on ", m.socket)

	if err := m.Register(pluginapi.KubeletSocket, m.resourceName); err != nil {
		glog.Errorf("Could not register device plugin: %s", err)
		if stopErr := m.Stop(); stopErr != nil {
			glog.Errorf("Could not stop device plugin after failed registration: %s", stopErr)
		}
		return err
	}
	glog.V(0).Info("Registered device plugin with Kubelet")

	return nil
}
// GetDevicePluginOptions reports the plugin's options. It ignores its
// arguments and always returns an empty DevicePluginOptions and a nil error.
func (m *SmarterDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
	opts := new(pluginapi.DevicePluginOptions)
	return opts, nil
}