// Copyright (c) 2019, Arm Ltd package main import ( "fmt" "net" "os" "path" "strings" "time" "github.com/golang/glog" "golang.org/x/net/context" "google.golang.org/grpc" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" ) const ( envDisableHealthChecks = "DP_DISABLE_HEALTHCHECKS" allHealthChecks = "xids" ) // SmarterDevicePlugin implements the Kubernetes device plugin API type SmarterDevicePlugin struct { devs []*pluginapi.Device socket string deviceFile string resourceName string stop chan interface{} health chan *pluginapi.Device server *grpc.Server } // NewSmarterDevicePlugin returns an initialized SmarterDevicePlugin func NewSmarterDevicePlugin(nDevices uint, deviceFilename string, resourceIdentification string, serverSock string) *SmarterDevicePlugin { return &SmarterDevicePlugin{ devs: getDevices(nDevices), socket: serverSock, deviceFile: deviceFilename, resourceName: resourceIdentification, stop: make(chan interface{}), health: make(chan *pluginapi.Device), } } // dial establishes the gRPC communication with the registered device plugin. func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) { c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(timeout), grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout("unix", addr, timeout) }), ) if err != nil { return nil, err } return c, nil } // Start the gRPC server of the device plugin func (m *SmarterDevicePlugin) Start() error { err := m.cleanup() if err != nil { return err } sock, err := net.Listen("unix", m.socket) if err != nil { return err } m.server = grpc.NewServer([]grpc.ServerOption{}...) pluginapi.RegisterDevicePluginServer(m.server, m) go m.server.Serve(sock) // Wait for server to start by launching a blocking connexion conn, err := dial(m.socket, 60*time.Second) if err != nil { return err } conn.Close() go m.healthcheck() return nil } // Stop the gRPC server func (m *SmarterDevicePlugin) Stop() error { glog.V(0).Infof("Stopping server with socket ",m.socket) if m.server == nil { return nil } m.server.Stop() m.server = nil close(m.stop) glog.V(0).Info("Server stopped with socket ",m.socket) return m.cleanup() } // Register the device plugin for the given resourceName with Kubelet. func (m *SmarterDevicePlugin) Register(kubeletEndpoint, resourceName string) error { conn, err := dial(kubeletEndpoint, 5*time.Second) if err != nil { return err } defer conn.Close() client := pluginapi.NewRegistrationClient(conn) reqt := &pluginapi.RegisterRequest{ Version: pluginapi.Version, Endpoint: path.Base(m.socket), ResourceName: resourceName, } _, err = client.Register(context.Background(), reqt) if err != nil { return err } return nil } // ListAndWatch lists devices and update that list according to the health status func (m *SmarterDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}) for { select { case <-m.stop: return nil case d := <-m.health: // FIXME: there is no way to recover from the Unhealthy state. d.Health = pluginapi.Unhealthy s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}) } } } func (m *SmarterDevicePlugin) unhealthy(dev *pluginapi.Device) { m.health <- dev } // Allocate which return list of devices. func (m *SmarterDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { devs := m.devs responses := pluginapi.AllocateResponse{} for _, req := range reqs.ContainerRequests { response := pluginapi.ContainerAllocateResponse{ Devices: []*pluginapi.DeviceSpec{ &pluginapi.DeviceSpec{ ContainerPath: m.deviceFile, HostPath: m.deviceFile, Permissions: "rw", }, }, } for _, id := range req.DevicesIDs { if !deviceExists(devs, id) { return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id) } } responses.ContainerResponses = append(responses.ContainerResponses, &response) } return &responses, nil } func (m *SmarterDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { return &pluginapi.PreStartContainerResponse{}, nil } func (m *SmarterDevicePlugin) cleanup() error { glog.V(0).Info("Removing file ",m.socket) if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) { return err } return nil } func (m *SmarterDevicePlugin) healthcheck() { disableHealthChecks := strings.ToLower(os.Getenv(envDisableHealthChecks)) if disableHealthChecks == "all" { disableHealthChecks = allHealthChecks } _, cancel := context.WithCancel(context.Background()) var xids chan *pluginapi.Device if !strings.Contains(disableHealthChecks, "xids") { xids = make(chan *pluginapi.Device) } for { select { case <-m.stop: cancel() return case dev := <-xids: m.unhealthy(dev) } } } // Serve starts the gRPC server and register the device plugin to Kubelet func (m *SmarterDevicePlugin) Serve() error { err := m.Start() if err != nil { glog.Errorf("Could not start device plugin: %s", err) return err } glog.V(0).Info("Starting to serve on", m.socket) err = m.Register(pluginapi.KubeletSocket, m.resourceName) if err != nil { glog.Errorf("Could not register device plugin: %s", err) m.Stop() return err } glog.V(0).Info("Registered device plugin with Kubelet") return nil } func (m *SmarterDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { return &pluginapi.DevicePluginOptions{}, nil }