diff --git a/Dockerfile b/Dockerfile index a423920f..670299a7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,13 +10,13 @@ COPY go.sum go.sum # RUN go mod download # Copy the go source -COPY main.go main.go +COPY cmd/ cmd/ COPY api/ api/ COPY pkg/ pkg/ COPY vendor/ vendor/ # Build -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -mod vendor -o curve-operator main.go +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -mod vendor -o curve-operator cmd/main.go ## # Use debian-9 as base image to package the curve-operator binary @@ -41,7 +41,7 @@ RUN echo "deb http://mirrors.aliyun.com/ubuntu/ focal main restricted" > /etc/ap # Install utility tools RUN apt-get update -y && \ - apt-get install -y coreutils dnsutils iputils-ping iproute2 telnet curl vim less wget graphviz unzip tcpdump gdb && \ + apt-get install -y coreutils dnsutils iputils-ping iproute2 telnet curl vim less wget graphviz unzip tcpdump gdb udev gdisk && \ apt-get clean # Install Go diff --git a/Makefile b/Makefile index d8b222dd..2dc607d2 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ test: generate fmt vet manifests # Build curve-operator binary curve-operator: generate fmt vet - go build -o bin/curve-operator main.go + go build -o bin/curve-operator cmd/main.go # Run against the configured Kubernetes cluster in ~/.kube/config run: generate fmt vet diff --git a/api/v1/curvecluster_types.go b/api/v1/curvecluster_types.go index 360f2d5e..eea7a105 100644 --- a/api/v1/curvecluster_types.go +++ b/api/v1/curvecluster_types.go @@ -139,6 +139,9 @@ type CurveClusterSpec struct { // +optional Monitor MonitorSpec `json:"monitor,omitempty"` + + // +optional + EnableReport bool `json:"enableReport,omitempty"` } // CurveClusterStatus defines the observed state of CurveCluster diff --git a/cmd/curve/discover.go b/cmd/curve/discover.go new file mode 100644 index 00000000..51703d4d --- /dev/null +++ b/cmd/curve/discover.go @@ -0,0 +1,37 @@ +package curve + +import ( + 
"context" + "github.com/opencurve/curve-operator/pkg/clusterd" + "github.com/opencurve/curve-operator/pkg/discover" + "github.com/spf13/cobra" + "k8s.io/klog" + "os" + "time" +) + +var ( + DiscoverCmd = &cobra.Command{ + Use: "discover", + Short: "Discover devices", + Hidden: true, // do not advertise to end users + } + + // interval between discovering devices + discoverDevicesInterval time.Duration +) + +func init() { + DiscoverCmd.Flags().DurationVar(&discoverDevicesInterval, "discover-interval", 60*time.Minute, + "interval between discovering devices (default 60m)") + DiscoverCmd.Run = startDiscover +} + +func startDiscover(cmd *cobra.Command, args []string) { + clusterdContext := clusterd.NewContext() + err := discover.Run(context.TODO(), clusterdContext, discoverDevicesInterval) + if err != nil { + klog.Error(err) + os.Exit(1) + } +} diff --git a/main.go b/cmd/curve/operator.go similarity index 76% rename from main.go rename to cmd/curve/operator.go index 9207f836..ac5b2809 100644 --- a/main.go +++ b/cmd/curve/operator.go @@ -1,48 +1,48 @@ -/* - - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package main +package curve import ( "fmt" - "os" - + operatorv1 "github.com/opencurve/curve-operator/api/v1" + "github.com/opencurve/curve-operator/pkg/clusterd" + "github.com/opencurve/curve-operator/pkg/controllers" + "github.com/opencurve/curve-operator/pkg/discover" "github.com/spf13/cobra" "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "os" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" - - operatorv1 "github.com/opencurve/curve-operator/api/v1" - "github.com/opencurve/curve-operator/pkg/clusterd" - "github.com/opencurve/curve-operator/pkg/controllers" ) var ( scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") + + OperatorCmd = &cobra.Command{ + Use: "operator", + // TODO: Rewrite this long message. + Long: `The Curve-Operator is a daemon to deploy Curve and auto it on kubernetes. + It support for Curve storage to natively integrate with cloud-native environments.`, + } ) func init() { _ = clientgoscheme.AddToScheme(scheme) _ = operatorv1.AddToScheme(scheme) // +kubebuilder:scaffold:scheme + + options, err := NewCurveOptions() + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + options.AddFlags(OperatorCmd.Flags()) + OperatorCmd.Run = func(cmd *cobra.Command, args []string) { + setupLog.Error(options.Run(), "failed to run curve-operator") + os.Exit(1) + } } type CurveOptions struct { @@ -58,32 +58,6 @@ func NewCurveOptions() (*CurveOptions, error) { }, nil } -func main() { - opts, err := NewCurveOptions() - if err != nil { - fmt.Fprintf(os.Stderr, "error: %v\n", err) - os.Exit(1) - } - - cmd := &cobra.Command{ - Use: "curve-operator", - // TODO: Rewrite this long message. - Long: `The Curve-Operator is a daemon to deploy Curve and auto it on kubernetes. 
- It support for Curve storage to natively integrate with cloud-native environments.`, - Run: func(cmd *cobra.Command, args []string) { - setupLog.Error(opts.Run(), "failed to run curve-operator") - os.Exit(1) - }, - } - - opts.AddFlags(cmd.Flags()) - - if err := cmd.Execute(); err != nil { - fmt.Fprintf(os.Stderr, "error: %v\n", err) - os.Exit(1) - } -} - func (opts *CurveOptions) Run() error { ctrl.SetLogger(zap.New(zap.UseDevMode(true))) @@ -132,6 +106,13 @@ func (opts *CurveOptions) Run() error { } // +kubebuilder:scaffold:builder + // reconcile discover daemonSet + err = discover.ReconcileDiscoveryDaemon() + if err != nil { + setupLog.Error(err, "problem discover") + os.Exit(1) + } + setupLog.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") diff --git a/cmd/curve/root.go b/cmd/curve/root.go new file mode 100644 index 00000000..6f2ac7b0 --- /dev/null +++ b/cmd/curve/root.go @@ -0,0 +1,11 @@ +package curve + +import ( + "github.com/spf13/cobra" +) + +var RootCmd = &cobra.Command{ + Use: "curve", + Short: "Curve (curve.io) Kubernetes operator and user tools", + Hidden: false, +} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 00000000..91fa9d47 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,20 @@ +package main + +import ( + "fmt" + "github.com/opencurve/curve-operator/cmd/curve" +) + +func main() { + addCommands() + if err := curve.RootCmd.Execute(); err != nil { + fmt.Printf("curve error: %+v\n", err) + } +} + +func addCommands() { + curve.RootCmd.AddCommand( + curve.OperatorCmd, + curve.DiscoverCmd, + ) +} diff --git a/config/crd/bases/operator.curve.io_curveclusters.yaml b/config/crd/bases/operator.curve.io_curveclusters.yaml index ad1b94d8..e448ded3 100644 --- a/config/crd/bases/operator.curve.io_curveclusters.yaml +++ b/config/crd/bases/operator.curve.io_curveclusters.yaml @@ -1,23 +1,11 @@ - --- -apiVersion: apiextensions.k8s.io/v1beta1 +apiVersion: 
apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.2.5 - creationTimestamp: null + controller-gen.kubebuilder.io/version: (unknown) name: curveclusters.operator.curve.io spec: - additionalPrinterColumns: - - JSONPath: .spec.hostDataDir - name: HostDataDir - type: string - - JSONPath: .spec.curveVersion.image - name: Version - type: string - - JSONPath: .status.phase - name: Phase - type: string group: operator.curve.io names: kind: CurveCluster @@ -25,247 +13,252 @@ spec: plural: curveclusters singular: curvecluster scope: Namespaced - subresources: - status: {} - validation: - openAPIV3Schema: - description: CurveCluster is the Schema for the curveclusters API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: CurveClusterSpec defines the desired state of CurveCluster - properties: - cleanupConfirm: - description: Indicates user intent when deleting a cluster; blocks orchestration - and should not be set if cluster deletion is not imminent. 
- nullable: true - type: string - curveVersion: - description: CurveVersionSpec represents the settings for the Curve - version - properties: - image: - type: string - imagePullPolicy: - description: PullPolicy describes a policy for if/when to pull a - container image - enum: - - IfNotPresent - - Always - - Never - - "" - type: string - type: object - etcd: - description: EtcdSpec is the spec of etcd - properties: - clientPort: - type: integer - config: - additionalProperties: + versions: + - additionalPrinterColumns: + - jsonPath: .spec.hostDataDir + name: HostDataDir + type: string + - jsonPath: .spec.curveVersion.image + name: Version + type: string + - jsonPath: .status.phase + name: Phase + type: string + name: v1 + schema: + openAPIV3Schema: + description: CurveCluster is the Schema for the curveclusters API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: CurveClusterSpec defines the desired state of CurveCluster + properties: + cleanupConfirm: + description: Indicates user intent when deleting a cluster; blocks + orchestration and should not be set if cluster deletion is not imminent. 
+ nullable: true + type: string + curveVersion: + description: CurveVersionSpec represents the settings for the Curve + version + properties: + image: type: string - type: object - peerPort: - type: integer - type: object - hostDataDir: - type: string - mds: - description: MdsSpec is the spec of mds - properties: - config: - additionalProperties: + imagePullPolicy: + description: PullPolicy describes a policy for if/when to pull + a container image + enum: + - IfNotPresent + - Always + - Never + - "" type: string - type: object - dummyPort: - type: integer - port: - type: integer - type: object - monitor: - properties: - enable: - type: boolean - grafana: - properties: - containerImage: - type: string - dataDir: - type: string - listenPort: - type: integer - passWord: - type: string - userName: - type: string - type: object - monitorHost: - type: string - nodeExporter: - properties: - containerImage: - type: string - listenPort: - type: integer - type: object - prometheus: - properties: - containerImage: - type: string - dataDir: - type: string - listenPort: - type: integer - retentionSize: - type: string - retentionTime: + type: object + enableReport: + type: boolean + etcd: + description: EtcdSpec is the spec of etcd + properties: + clientPort: + type: integer + config: + additionalProperties: type: string - type: object - type: object - nodes: - items: + type: object + peerPort: + type: integer + type: object + hostDataDir: type: string - type: array - snapShotClone: - description: SnapShotCloneSpec is the spec of snapshot clone - properties: - dummyPort: - type: integer - enable: - type: boolean - port: - type: integer - proxyPort: - type: integer - s3Config: - description: S3ConfigSpec is the spec of s3 config - properties: - ak: - type: string - bucketName: - type: string - nosAddress: - type: string - sk: + mds: + description: MdsSpec is the spec of mds + properties: + config: + additionalProperties: type: string - type: object - type: object - storage: - 
description: StorageScopeSpec is the spec of storage scope - properties: - copySets: - type: integer - devices: - items: - description: DevicesSpec represents a disk to use in the cluster + type: object + dummyPort: + type: integer + port: + type: integer + type: object + monitor: + properties: + enable: + type: boolean + grafana: properties: - mountPath: + containerImage: type: string - name: + dataDir: type: string - percentage: + listenPort: type: integer + passWord: + type: string + userName: + type: string type: object - type: array - nodes: - items: + monitorHost: type: string - type: array - port: - type: integer - selectedNodes: - items: + nodeExporter: properties: - devices: - items: - description: DevicesSpec represents a disk to use in the - cluster - properties: - mountPath: - type: string - name: - type: string - percentage: - type: integer - type: object - type: array - node: + containerImage: type: string + listenPort: + type: integer type: object - type: array - useSelectedNodes: - type: boolean - type: object - type: object - status: - description: CurveClusterStatus defines the observed state of CurveCluster - properties: - conditions: - description: Condition contains current service state of cluster such - as progressing/Ready/Failure... - items: + prometheus: + properties: + containerImage: + type: string + dataDir: + type: string + listenPort: + type: integer + retentionSize: + type: string + retentionTime: + type: string + type: object + type: object + nodes: + items: + type: string + type: array + snapShotClone: + description: SnapShotCloneSpec is the spec of snapshot clone properties: - lastTransitionTime: - description: LastTransitionTime specifies last time the condition - transitioned from one status to another. - format: date-time - type: string - message: - description: Message is a human readable message indicating details - about last transition. 
- type: string - observedGeneration: - description: ObservedGeneration - format: int64 + dummyPort: type: integer - reason: - description: Reason is a unique, one-word, CamelCase reason for - the condition's last transition. - type: string - status: - description: Status is the status of condition Can be True, False - or Unknown. - type: string - type: - description: Type is the type of condition. + enable: + type: boolean + port: + type: integer + proxyPort: + type: integer + s3Config: + description: S3ConfigSpec is the spec of s3 config + properties: + ak: + type: string + bucketName: + type: string + nosAddress: + type: string + sk: + type: string + type: object + type: object + storage: + description: StorageScopeSpec is the spec of storage scope + properties: + copySets: + type: integer + devices: + items: + description: DevicesSpec represents a disk to use in the cluster + properties: + mountPath: + type: string + name: + type: string + percentage: + type: integer + type: object + type: array + nodes: + items: + type: string + type: array + port: + type: integer + selectedNodes: + items: + properties: + devices: + items: + description: DevicesSpec represents a disk to use in the + cluster + properties: + mountPath: + type: string + name: + type: string + percentage: + type: integer + type: object + type: array + node: + type: string + type: object + type: array + useSelectedNodes: + type: boolean + type: object + type: object + status: + description: CurveClusterStatus defines the observed state of CurveCluster + properties: + conditions: + description: Condition contains current service state of cluster such + as progressing/Ready/Failure... + items: + properties: + lastTransitionTime: + description: LastTransitionTime specifies last time the condition + transitioned from one status to another. + format: date-time + type: string + message: + description: Message is a human readable message indicating + details about last transition. 
+ type: string + observedGeneration: + description: ObservedGeneration + format: int64 + type: integer + reason: + description: Reason is a unique, one-word, CamelCase reason + for the condition's last transition. + type: string + status: + description: Status is the status of condition Can be True, + False or Unknown. + type: string + type: + description: Type is the type of condition. + type: string + type: object + type: array + curveVersion: + description: CurveVersion shows curve version info on status field + properties: + image: type: string type: object - type: array - curveVersion: - description: CurveVersion shows curve version info on status field - properties: - image: - type: string - type: object - message: - description: Message shows summary message of cluster from ClusterState - such as 'Curve Cluster Created successfully' - type: string - phase: - description: Phase is a summary of cluster state. It can be translated - from the last conditiontype - type: string - type: object - type: object - version: v1 - versions: - - name: v1 + message: + description: Message shows summary message of cluster from ClusterState + such as 'Curve Cluster Created successfully' + type: string + phase: + description: Phase is a summary of cluster state. 
It can be translated + from the last conditiontype + type: string + type: object + type: object served: true storage: true -status: - acceptedNames: - kind: "" - plural: "" - conditions: [] - storedVersions: [] + subresources: + status: {} diff --git a/config/crd/bases/operator.curve.io_curvefs.yaml b/config/crd/bases/operator.curve.io_curvefs.yaml index f6c3a5a8..6647e359 100644 --- a/config/crd/bases/operator.curve.io_curvefs.yaml +++ b/config/crd/bases/operator.curve.io_curvefs.yaml @@ -1,23 +1,11 @@ - --- -apiVersion: apiextensions.k8s.io/v1beta1 +apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.2.5 - creationTimestamp: null + controller-gen.kubebuilder.io/version: (unknown) name: curvefs.operator.curve.io spec: - additionalPrinterColumns: - - JSONPath: .spec.hostDataDir - name: HostDataDir - type: string - - JSONPath: .spec.curveVersion.image - name: Version - type: string - - JSONPath: .status.phase - name: Phase - type: string group: operator.curve.io names: kind: Curvefs @@ -25,215 +13,218 @@ spec: plural: curvefs singular: curvefs scope: Namespaced - subresources: - status: {} - validation: - openAPIV3Schema: - description: Curvefs is the Schema for the curvefsclusters API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. 
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: CurvefsSpec defines the desired state of Curvefs - properties: - cleanupConfirm: - description: Indicates user intent when deleting a cluster; blocks orchestration - and should not be set if cluster deletion is not imminent. - nullable: true - type: string - curveVersion: - description: CurveVersionSpec represents the settings for the Curve - version - properties: - image: - type: string - imagePullPolicy: - description: PullPolicy describes a policy for if/when to pull a - container image - enum: - - IfNotPresent - - Always - - Never - - "" - type: string - type: object - etcd: - description: EtcdSpec is the spec of etcd - properties: - clientPort: - type: integer - config: - additionalProperties: - type: string - type: object - peerPort: - type: integer - type: object - hostDataDir: - type: string - mds: - description: MdsSpec is the spec of mds - properties: - config: - additionalProperties: + versions: + - additionalPrinterColumns: + - jsonPath: .spec.hostDataDir + name: HostDataDir + type: string + - jsonPath: .spec.curveVersion.image + name: Version + type: string + - jsonPath: .status.phase + name: Phase + type: string + name: v1 + schema: + openAPIV3Schema: + description: Curvefs is the Schema for the curvefsclusters API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. 
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: CurvefsSpec defines the desired state of Curvefs + properties: + cleanupConfirm: + description: Indicates user intent when deleting a cluster; blocks + orchestration and should not be set if cluster deletion is not imminent. + nullable: true + type: string + curveVersion: + description: CurveVersionSpec represents the settings for the Curve + version + properties: + image: type: string - type: object - dummyPort: - type: integer - port: - type: integer - type: object - metaserver: - description: MdsSpec is the spec of mds - properties: - config: - additionalProperties: + imagePullPolicy: + description: PullPolicy describes a policy for if/when to pull + a container image + enum: + - IfNotPresent + - Always + - Never + - "" type: string - type: object - copySets: - type: integer - externalPort: - type: integer - port: - type: integer - type: object - monitor: - properties: - enable: - type: boolean - grafana: - properties: - containerImage: - type: string - dataDir: + type: object + etcd: + description: EtcdSpec is the spec of etcd + properties: + clientPort: + type: integer + config: + additionalProperties: type: string - listenPort: - type: integer - passWord: + type: object + peerPort: + type: integer + type: object + hostDataDir: + type: string + mds: + description: MdsSpec is the spec of mds + properties: + config: + additionalProperties: type: string - userName: + type: object + dummyPort: + type: integer + port: + type: integer + type: object + metaserver: + description: MdsSpec is the spec of mds + properties: + config: + additionalProperties: type: string - type: object - monitorHost: + type: object + copySets: + type: integer + externalPort: + type: integer + port: + type: integer + type: object + monitor: + properties: + enable: + type: boolean + grafana: + properties: + 
containerImage: + type: string + dataDir: + type: string + listenPort: + type: integer + passWord: + type: string + userName: + type: string + type: object + monitorHost: + type: string + nodeExporter: + properties: + containerImage: + type: string + listenPort: + type: integer + type: object + prometheus: + properties: + containerImage: + type: string + dataDir: + type: string + listenPort: + type: integer + retentionSize: + type: string + retentionTime: + type: string + type: object + type: object + nodes: + items: type: string - nodeExporter: - properties: - containerImage: - type: string - listenPort: - type: integer - type: object - prometheus: + type: array + snapShotClone: + description: SnapShotCloneSpec is the spec of snapshot clone + properties: + dummyPort: + type: integer + enable: + type: boolean + port: + type: integer + proxyPort: + type: integer + s3Config: + description: S3ConfigSpec is the spec of s3 config + properties: + ak: + type: string + bucketName: + type: string + nosAddress: + type: string + sk: + type: string + type: object + type: object + type: object + status: + description: CurvefsStatus defines the observed state of Curvefs + properties: + conditions: + description: Condition contains current service state of cluster such + as progressing/Ready/Failure... + items: properties: - containerImage: + lastTransitionTime: + description: LastTransitionTime specifies last time the condition + transitioned from one status to another. + format: date-time type: string - dataDir: + message: + description: Message is a human readable message indicating + details about last transition. type: string - listenPort: + observedGeneration: + description: ObservedGeneration + format: int64 type: integer - retentionSize: + reason: + description: Reason is a unique, one-word, CamelCase reason + for the condition's last transition. type: string - retentionTime: + status: + description: Status is the status of condition Can be True, + False or Unknown. 
type: string - type: object - type: object - nodes: - items: - type: string - type: array - snapShotClone: - description: SnapShotCloneSpec is the spec of snapshot clone - properties: - dummyPort: - type: integer - enable: - type: boolean - port: - type: integer - proxyPort: - type: integer - s3Config: - description: S3ConfigSpec is the spec of s3 config - properties: - ak: - type: string - bucketName: - type: string - nosAddress: - type: string - sk: + type: + description: Type is the type of condition. type: string type: object - type: object - type: object - status: - description: CurvefsStatus defines the observed state of Curvefs - properties: - conditions: - description: Condition contains current service state of cluster such - as progressing/Ready/Failure... - items: + type: array + curveVersion: + description: CurveVersion shows curve version info on status field properties: - lastTransitionTime: - description: LastTransitionTime specifies last time the condition - transitioned from one status to another. - format: date-time - type: string - message: - description: Message is a human readable message indicating details - about last transition. - type: string - observedGeneration: - description: ObservedGeneration - format: int64 - type: integer - reason: - description: Reason is a unique, one-word, CamelCase reason for - the condition's last transition. - type: string - status: - description: Status is the status of condition Can be True, False - or Unknown. - type: string - type: - description: Type is the type of condition. + image: type: string type: object - type: array - curveVersion: - description: CurveVersion shows curve version info on status field - properties: - image: - type: string - type: object - message: - description: Message shows summary message of cluster from ClusterState - such as 'Curve Cluster Created successfully' - type: string - phase: - description: Phase is a summary of cluster state. 
It can be translated - from the last conditiontype - type: string - type: object - type: object - version: v1 - versions: - - name: v1 + message: + description: Message shows summary message of cluster from ClusterState + such as 'Curve Cluster Created successfully' + type: string + phase: + description: Phase is a summary of cluster state. It can be translated + from the last conditiontype + type: string + type: object + type: object served: true storage: true -status: - acceptedNames: - kind: "" - plural: "" - conditions: [] - storedVersions: [] + subresources: + status: {} diff --git a/config/deploy/manifests.yaml b/config/deploy/manifests.yaml index 58c0793d..ba988f9d 100644 --- a/config/deploy/manifests.yaml +++ b/config/deploy/manifests.yaml @@ -5,24 +5,13 @@ metadata: control-plane: curve-operator name: curve --- -apiVersion: apiextensions.k8s.io/v1beta1 +apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.2.5 - creationTimestamp: null + controller-gen.kubebuilder.io/version: (unknown) name: curveclusters.operator.curve.io spec: - additionalPrinterColumns: - - JSONPath: .spec.hostDataDir - name: HostDataDir - type: string - - JSONPath: .spec.curveVersion.image - name: Version - type: string - - JSONPath: .status.phase - name: Phase - type: string group: operator.curve.io names: kind: CurveCluster @@ -30,269 +19,263 @@ spec: plural: curveclusters singular: curvecluster scope: Namespaced - subresources: - status: {} - validation: - openAPIV3Schema: - description: CurveCluster is the Schema for the curveclusters API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. 
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: CurveClusterSpec defines the desired state of CurveCluster - properties: - cleanupConfirm: - description: Indicates user intent when deleting a cluster; blocks orchestration - and should not be set if cluster deletion is not imminent. - nullable: true - type: string - curveVersion: - description: CurveVersionSpec represents the settings for the Curve - version - properties: - image: - type: string - imagePullPolicy: - description: PullPolicy describes a policy for if/when to pull a - container image - enum: - - IfNotPresent - - Always - - Never - - "" - type: string - type: object - etcd: - description: EtcdSpec is the spec of etcd - properties: - clientPort: - type: integer - config: - additionalProperties: + versions: + - additionalPrinterColumns: + - jsonPath: .spec.hostDataDir + name: HostDataDir + type: string + - jsonPath: .spec.curveVersion.image + name: Version + type: string + - jsonPath: .status.phase + name: Phase + type: string + name: v1 + schema: + openAPIV3Schema: + description: CurveCluster is the Schema for the curveclusters API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. 
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: CurveClusterSpec defines the desired state of CurveCluster + properties: + cleanupConfirm: + description: Indicates user intent when deleting a cluster; blocks + orchestration and should not be set if cluster deletion is not imminent. + nullable: true + type: string + curveVersion: + description: CurveVersionSpec represents the settings for the Curve + version + properties: + image: type: string - type: object - peerPort: - type: integer - type: object - hostDataDir: - type: string - mds: - description: MdsSpec is the spec of mds - properties: - config: - additionalProperties: + imagePullPolicy: + description: PullPolicy describes a policy for if/when to pull + a container image + enum: + - IfNotPresent + - Always + - Never + - "" type: string - type: object - dummyPort: - type: integer - port: - type: integer - type: object - monitor: - properties: - enable: - type: boolean - grafana: - properties: - containerImage: - type: string - dataDir: - type: string - listenPort: - type: integer - passWord: - type: string - userName: - type: string - type: object - monitorHost: - type: string - nodeExporter: - properties: - containerImage: - type: string - listenPort: - type: integer - type: object - prometheus: - properties: - containerImage: - type: string - dataDir: - type: string - listenPort: - type: integer - retentionSize: - type: string - retentionTime: + type: object + enableReport: + type: boolean + etcd: + description: EtcdSpec is the spec of etcd + properties: + 
clientPort: + type: integer + config: + additionalProperties: type: string - type: object - type: object - nodes: - items: + type: object + peerPort: + type: integer + type: object + hostDataDir: type: string - type: array - snapShotClone: - description: SnapShotCloneSpec is the spec of snapshot clone - properties: - dummyPort: - type: integer - enable: - type: boolean - port: - type: integer - proxyPort: - type: integer - s3Config: - description: S3ConfigSpec is the spec of s3 config - properties: - ak: - type: string - bucketName: - type: string - nosAddress: - type: string - sk: + mds: + description: MdsSpec is the spec of mds + properties: + config: + additionalProperties: type: string - type: object - type: object - storage: - description: StorageScopeSpec is the spec of storage scope - properties: - copySets: - type: integer - devices: - items: - description: DevicesSpec represents a disk to use in the cluster + type: object + dummyPort: + type: integer + port: + type: integer + type: object + monitor: + properties: + enable: + type: boolean + grafana: properties: - mountPath: + containerImage: type: string - name: + dataDir: type: string - percentage: + listenPort: type: integer + passWord: + type: string + userName: + type: string type: object - type: array - nodes: - items: + monitorHost: type: string - type: array - port: - type: integer - selectedNodes: - items: + nodeExporter: properties: - devices: - items: - description: DevicesSpec represents a disk to use in the - cluster - properties: - mountPath: - type: string - name: - type: string - percentage: - type: integer - type: object - type: array - node: + containerImage: type: string + listenPort: + type: integer type: object - type: array - useSelectedNodes: - type: boolean - type: object - type: object - status: - description: CurveClusterStatus defines the observed state of CurveCluster - properties: - conditions: - description: Condition contains current service state of cluster such - as 
progressing/Ready/Failure... - items: + prometheus: + properties: + containerImage: + type: string + dataDir: + type: string + listenPort: + type: integer + retentionSize: + type: string + retentionTime: + type: string + type: object + type: object + nodes: + items: + type: string + type: array + snapShotClone: + description: SnapShotCloneSpec is the spec of snapshot clone properties: - lastTransitionTime: - description: LastTransitionTime specifies last time the condition - transitioned from one status to another. - format: date-time - type: string - message: - description: Message is a human readable message indicating details - about last transition. - type: string - observedGeneration: - description: ObservedGeneration - format: int64 + dummyPort: type: integer - reason: - description: Reason is a unique, one-word, CamelCase reason for - the condition's last transition. - type: string - status: - description: Status is the status of condition Can be True, False - or Unknown. - type: string - type: - description: Type is the type of condition. 
+ enable: + type: boolean + port: + type: integer + proxyPort: + type: integer + s3Config: + description: S3ConfigSpec is the spec of s3 config + properties: + ak: + type: string + bucketName: + type: string + nosAddress: + type: string + sk: + type: string + type: object + type: object + storage: + description: StorageScopeSpec is the spec of storage scope + properties: + copySets: + type: integer + devices: + items: + description: DevicesSpec represents a disk to use in the cluster + properties: + mountPath: + type: string + name: + type: string + percentage: + type: integer + type: object + type: array + nodes: + items: + type: string + type: array + port: + type: integer + selectedNodes: + items: + properties: + devices: + items: + description: DevicesSpec represents a disk to use in the + cluster + properties: + mountPath: + type: string + name: + type: string + percentage: + type: integer + type: object + type: array + node: + type: string + type: object + type: array + useSelectedNodes: + type: boolean + type: object + type: object + status: + description: CurveClusterStatus defines the observed state of CurveCluster + properties: + conditions: + description: Condition contains current service state of cluster such + as progressing/Ready/Failure... + items: + properties: + lastTransitionTime: + description: LastTransitionTime specifies last time the condition + transitioned from one status to another. + format: date-time + type: string + message: + description: Message is a human readable message indicating + details about last transition. + type: string + observedGeneration: + description: ObservedGeneration + format: int64 + type: integer + reason: + description: Reason is a unique, one-word, CamelCase reason + for the condition's last transition. + type: string + status: + description: Status is the status of condition Can be True, + False or Unknown. + type: string + type: + description: Type is the type of condition. 
+ type: string + type: object + type: array + curveVersion: + description: CurveVersion shows curve version info on status field + properties: + image: type: string type: object - type: array - curveVersion: - description: CurveVersion shows curve version info on status field - properties: - image: - type: string - type: object - message: - description: Message shows summary message of cluster from ClusterState - such as 'Curve Cluster Created successfully' - type: string - phase: - description: Phase is a summary of cluster state. It can be translated - from the last conditiontype - type: string - type: object - type: object - version: v1 - versions: - - name: v1 + message: + description: Message shows summary message of cluster from ClusterState + such as 'Curve Cluster Created successfully' + type: string + phase: + description: Phase is a summary of cluster state. It can be translated + from the last conditiontype + type: string + type: object + type: object served: true storage: true -status: - acceptedNames: - kind: "" - plural: "" - conditions: [] - storedVersions: [] + subresources: + status: {} --- -apiVersion: apiextensions.k8s.io/v1beta1 +apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.2.5 - creationTimestamp: null + controller-gen.kubebuilder.io/version: (unknown) name: curvefs.operator.curve.io spec: - additionalPrinterColumns: - - JSONPath: .spec.hostDataDir - name: HostDataDir - type: string - - JSONPath: .spec.curveVersion.image - name: Version - type: string - - JSONPath: .status.phase - name: Phase - type: string group: operator.curve.io names: kind: Curvefs @@ -300,218 +283,221 @@ spec: plural: curvefs singular: curvefs scope: Namespaced - subresources: - status: {} - validation: - openAPIV3Schema: - description: Curvefs is the Schema for the curvefsclusters API - properties: - apiVersion: - description: 'APIVersion defines the versioned schema of this 
representation - of an object. Servers should convert recognized schemas to the latest - internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' - type: string - kind: - description: 'Kind is a string value representing the REST resource this - object represents. Servers may infer this from the endpoint the client - submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' - type: string - metadata: - type: object - spec: - description: CurvefsSpec defines the desired state of Curvefs - properties: - cleanupConfirm: - description: Indicates user intent when deleting a cluster; blocks orchestration - and should not be set if cluster deletion is not imminent. - nullable: true - type: string - curveVersion: - description: CurveVersionSpec represents the settings for the Curve - version - properties: - image: - type: string - imagePullPolicy: - description: PullPolicy describes a policy for if/when to pull a - container image - enum: - - IfNotPresent - - Always - - Never - - "" - type: string - type: object - etcd: - description: EtcdSpec is the spec of etcd - properties: - clientPort: - type: integer - config: - additionalProperties: - type: string - type: object - peerPort: - type: integer - type: object - hostDataDir: - type: string - mds: - description: MdsSpec is the spec of mds - properties: - config: - additionalProperties: + versions: + - additionalPrinterColumns: + - jsonPath: .spec.hostDataDir + name: HostDataDir + type: string + - jsonPath: .spec.curveVersion.image + name: Version + type: string + - jsonPath: .status.phase + name: Phase + type: string + name: v1 + schema: + openAPIV3Schema: + description: Curvefs is the Schema for the curvefsclusters API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this 
representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: CurvefsSpec defines the desired state of Curvefs + properties: + cleanupConfirm: + description: Indicates user intent when deleting a cluster; blocks + orchestration and should not be set if cluster deletion is not imminent. + nullable: true + type: string + curveVersion: + description: CurveVersionSpec represents the settings for the Curve + version + properties: + image: type: string - type: object - dummyPort: - type: integer - port: - type: integer - type: object - metaserver: - description: MdsSpec is the spec of mds - properties: - config: - additionalProperties: + imagePullPolicy: + description: PullPolicy describes a policy for if/when to pull + a container image + enum: + - IfNotPresent + - Always + - Never + - "" type: string - type: object - copySets: - type: integer - externalPort: - type: integer - port: - type: integer - type: object - monitor: - properties: - enable: - type: boolean - grafana: - properties: - containerImage: - type: string - dataDir: + type: object + etcd: + description: EtcdSpec is the spec of etcd + properties: + clientPort: + type: integer + config: + additionalProperties: type: string - listenPort: - type: integer - passWord: + type: object + peerPort: + type: integer + type: object + hostDataDir: + type: string + mds: + description: MdsSpec is the spec of mds + properties: + config: + 
additionalProperties: type: string - userName: + type: object + dummyPort: + type: integer + port: + type: integer + type: object + metaserver: + description: MdsSpec is the spec of mds + properties: + config: + additionalProperties: type: string - type: object - monitorHost: + type: object + copySets: + type: integer + externalPort: + type: integer + port: + type: integer + type: object + monitor: + properties: + enable: + type: boolean + grafana: + properties: + containerImage: + type: string + dataDir: + type: string + listenPort: + type: integer + passWord: + type: string + userName: + type: string + type: object + monitorHost: + type: string + nodeExporter: + properties: + containerImage: + type: string + listenPort: + type: integer + type: object + prometheus: + properties: + containerImage: + type: string + dataDir: + type: string + listenPort: + type: integer + retentionSize: + type: string + retentionTime: + type: string + type: object + type: object + nodes: + items: type: string - nodeExporter: - properties: - containerImage: - type: string - listenPort: - type: integer - type: object - prometheus: + type: array + snapShotClone: + description: SnapShotCloneSpec is the spec of snapshot clone + properties: + dummyPort: + type: integer + enable: + type: boolean + port: + type: integer + proxyPort: + type: integer + s3Config: + description: S3ConfigSpec is the spec of s3 config + properties: + ak: + type: string + bucketName: + type: string + nosAddress: + type: string + sk: + type: string + type: object + type: object + type: object + status: + description: CurvefsStatus defines the observed state of Curvefs + properties: + conditions: + description: Condition contains current service state of cluster such + as progressing/Ready/Failure... + items: properties: - containerImage: + lastTransitionTime: + description: LastTransitionTime specifies last time the condition + transitioned from one status to another. 
+ format: date-time type: string - dataDir: + message: + description: Message is a human readable message indicating + details about last transition. type: string - listenPort: + observedGeneration: + description: ObservedGeneration + format: int64 type: integer - retentionSize: - type: string - retentionTime: - type: string - type: object - type: object - nodes: - items: - type: string - type: array - snapShotClone: - description: SnapShotCloneSpec is the spec of snapshot clone - properties: - dummyPort: - type: integer - enable: - type: boolean - port: - type: integer - proxyPort: - type: integer - s3Config: - description: S3ConfigSpec is the spec of s3 config - properties: - ak: - type: string - bucketName: + reason: + description: Reason is a unique, one-word, CamelCase reason + for the condition's last transition. type: string - nosAddress: + status: + description: Status is the status of condition Can be True, + False or Unknown. type: string - sk: + type: + description: Type is the type of condition. type: string type: object - type: object - type: object - status: - description: CurvefsStatus defines the observed state of Curvefs - properties: - conditions: - description: Condition contains current service state of cluster such - as progressing/Ready/Failure... - items: + type: array + curveVersion: + description: CurveVersion shows curve version info on status field properties: - lastTransitionTime: - description: LastTransitionTime specifies last time the condition - transitioned from one status to another. - format: date-time - type: string - message: - description: Message is a human readable message indicating details - about last transition. - type: string - observedGeneration: - description: ObservedGeneration - format: int64 - type: integer - reason: - description: Reason is a unique, one-word, CamelCase reason for - the condition's last transition. 
- type: string - status: - description: Status is the status of condition Can be True, False - or Unknown. - type: string - type: - description: Type is the type of condition. + image: type: string type: object - type: array - curveVersion: - description: CurveVersion shows curve version info on status field - properties: - image: - type: string - type: object - message: - description: Message shows summary message of cluster from ClusterState - such as 'Curve Cluster Created successfully' - type: string - phase: - description: Phase is a summary of cluster state. It can be translated - from the last conditiontype - type: string - type: object - type: object - version: v1 - versions: - - name: v1 + message: + description: Message shows summary message of cluster from ClusterState + such as 'Curve Cluster Created successfully' + type: string + phase: + description: Phase is a summary of cluster state. It can be translated + from the last conditiontype + type: string + type: object + type: object served: true storage: true -status: - acceptedNames: - kind: "" - plural: "" - conditions: [] - storedVersions: [] + subresources: + status: {} --- apiVersion: v1 kind: ServiceAccount @@ -555,9 +541,20 @@ rules: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - creationTimestamp: null name: curve-operator-role rules: +- apiGroups: + - apps + resources: + - daemonsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - apps resources: @@ -737,9 +734,23 @@ spec: spec: containers: - args: + - operator - --enable-leader-election=true command: - ./curve-operator + env: + - name: DISCOVER_DISK + value: "true" + - name: DISCOVER_UDEV_BLACKLIST + value: (?i)loop[0-9]+,(?i)fd[0-9]+,(?i)sr[0-9]+,(?i)ram[0-9]+,(?i)dm-[0-9]+,(?i)md[0-9]+,(?i)zd[0-9]+,(?i)rbd[0-9]+,(?i)nbd[0-9]+ + - name: DISCOVER_INTERVAL + value: 60m + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: OPERATOR_IMAGE + 
value: curve2operator/curve-operator:v1.0.6 image: curve2operator/curve-operator:v1.0.6 name: curve-operator resources: diff --git a/config/manager/curve-operator.yaml b/config/manager/curve-operator.yaml index 80e8ae90..8e3e96f0 100644 --- a/config/manager/curve-operator.yaml +++ b/config/manager/curve-operator.yaml @@ -35,8 +35,22 @@ spec: - command: - ./curve-operator args: + - operator - --enable-leader-election=true - image: curve2operator/curve-operator:v1.0.0 + image: curve2operator/curve-operator:v1.0.6 + env: + - name: DISCOVER_DISK + value: "true" + - name: DISCOVER_UDEV_BLACKLIST + value: "(?i)loop[0-9]+,(?i)fd[0-9]+,(?i)sr[0-9]+,(?i)ram[0-9]+,(?i)dm-[0-9]+,(?i)md[0-9]+,(?i)zd[0-9]+,(?i)rbd[0-9]+,(?i)nbd[0-9]+" + - name: DISCOVER_INTERVAL + value: "60m" + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: OPERATOR_IMAGE + value: curve2operator/curve-operator:v1.0.6 name: curve-operator resources: limits: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 159a19dd..f29d7295 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -1,11 +1,21 @@ - --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - creationTimestamp: null name: curve-operator-role rules: +- apiGroups: + - apps + resources: + - daemonsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - apps resources: diff --git a/pkg/clusterd/context.go b/pkg/clusterd/context.go index 179180d5..d3551100 100644 --- a/pkg/clusterd/context.go +++ b/pkg/clusterd/context.go @@ -1,8 +1,11 @@ package clusterd import ( + "fmt" + "github.com/opencurve/curve-operator/pkg/util/exec" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "os" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -15,4 +18,24 @@ type Context struct { // Represents the Client provided by the controller-runtime package to interact with Kubernetes objects Client client.Client + + Executor *exec.CommandExecutor +} + 
+func NewContext() *Context { + ctx := &Context{ + Executor: &exec.CommandExecutor{}, + } + config, err := rest.InClusterConfig() + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + ctx.KubeConfig = config + ctx.Clientset, err = kubernetes.NewForConfig(config) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + return ctx } diff --git a/pkg/clusterd/disk.go b/pkg/clusterd/disk.go new file mode 100644 index 00000000..70d1b04c --- /dev/null +++ b/pkg/clusterd/disk.go @@ -0,0 +1,197 @@ +package clusterd + +import ( + "errors" + "fmt" + "github.com/opencurve/curve-operator/pkg/util/exec" + "github.com/opencurve/curve-operator/pkg/util/sys" + "k8s.io/klog" + "path" + "regexp" + "strconv" + "strings" +) + +// GetDeviceEmpty check whether a device is completely empty +func GetDeviceEmpty(device *sys.LocalDisk) bool { + return len(device.Partitions) == 0 && device.Filesystem == "" +} + +func DiscoverDevicesWithFilter(executor exec.Executor, blackList string) ([]*sys.LocalDisk, error) { + var disks []*sys.LocalDisk + devices, err := sys.ListDevices(executor) + if err != nil { + return nil, err + } + + for _, d := range devices { + + if filterByBlackList(d, blackList) { + continue + } + + // Populate device information coming from lsblk + disk, err := PopulateDeviceInfo(d, executor) + if err != nil { + klog.Errorf("skipping device %q. %v", d, err) + continue + } + + // Populate udev information coming from udev + disk, err = PopulateDeviceUdevInfo(d, executor, disk) + if err != nil { + // go on without udev info + // not ideal for our filesystem check later but we can't really fail either... + klog.Errorf("failed to get udev info for device %q. 
%v", d, err) + } + + // Test if device has child, if so we skip it and only consider the partitions + // which will come in later iterations of the loop + // We only test if the type is 'disk', this is a property reported by lsblk + // and means it's a parent block device + if disk.Type == sys.DiskType { + deviceChild, err := sys.ListDevicesChild(executor, fmt.Sprintf("/dev/%s", d)) + if err != nil { + klog.Errorf("failed to detect child devices for device %q, assuming they are none. %v", d, err) + } + // lsblk will output at least 2 lines if they are partitions, one for the parent + // and N for the child + if len(deviceChild) > 1 { + klog.Errorf("skipping device %q because it has child, considering the child instead.", d) + continue + } + } + + disks = append(disks, disk) + } + klog.Info("discovered disks are:") + for _, disk := range disks { + klog.Infof("%+v", disk) + } + + return disks, nil +} + +// DiscoverDevices returns all the details of devices available on the local node +func DiscoverDevices(executor exec.Executor, blackList string) ([]*sys.LocalDisk, error) { + disks, err := DiscoverDevicesWithFilter(executor, blackList) + if err != nil { + return nil, err + } + return disks, nil +} + +// PopulateDeviceInfo returns the information of the specified block device +func PopulateDeviceInfo(d string, executor exec.Executor) (*sys.LocalDisk, error) { + diskProps, err := sys.GetDeviceProperties(d, executor) + if err != nil { + return nil, err + } + + diskType, ok := diskProps["TYPE"] + if !ok { + return nil, errors.New("diskType is empty") + } + + // get the UUID for disks + var diskUUID string + if diskType == sys.DiskType { + uuid, err := sys.GetDiskUUID(d, executor) + if err != nil { + klog.Error(err) + } else { + diskUUID = uuid + } + } + + disk := &sys.LocalDisk{Name: d, UUID: diskUUID} + + if val, ok := diskProps["TYPE"]; ok { + disk.Type = val + } + if val, ok := diskProps["SIZE"]; ok { + if size, err := strconv.ParseUint(val, 10, 64); err == nil { + 
disk.Size = size + } + } + if val, ok := diskProps["ROTA"]; ok { + if rotates, err := strconv.ParseBool(val); err == nil { + disk.Rotational = rotates + } + } + if val, ok := diskProps["RO"]; ok { + if ro, err := strconv.ParseBool(val); err == nil { + disk.Readonly = ro + } + } + if val, ok := diskProps["PKNAME"]; ok { + if val != "" { + disk.Parent = path.Base(val) + } + } + if val, ok := diskProps["NAME"]; ok { + disk.RealPath = val + } + if val, ok := diskProps["KNAME"]; ok { + disk.KernelName = path.Base(val) + } + if val, ok := diskProps["FSTYPE"]; ok && val != "" { + disk.Filesystem = path.Base(val) + } + if val, ok := diskProps["MOUNTPOINT"]; ok && val != "" { + disk.Mountpoint = path.Base(val) + } + + return disk, nil +} + +// PopulateDeviceUdevInfo fills the udev info into the block device information +func PopulateDeviceUdevInfo(d string, executor exec.Executor, disk *sys.LocalDisk) (*sys.LocalDisk, error) { + udevInfo, err := sys.GetUdevInfo(d, executor) + if err != nil { + return disk, err + } + // parse udev info output + if val, ok := udevInfo["DEVLINKS"]; ok { + disk.DevLinks = val + } + if val, ok := udevInfo["ID_FS_TYPE"]; ok { + disk.Filesystem = val + } + if val, ok := udevInfo["ID_SERIAL"]; ok { + disk.Serial = val + } + + if val, ok := udevInfo["ID_VENDOR"]; ok { + disk.Vendor = val + } + + if val, ok := udevInfo["ID_MODEL"]; ok { + disk.Model = val + } + + if val, ok := udevInfo["ID_WWN_WITH_EXTENSION"]; ok { + disk.WWNVendorExtension = val + } + + if val, ok := udevInfo["ID_WWN"]; ok { + disk.WWN = val + } + + return disk, nil +} + +func filterByBlackList(device string, blackList string) bool { + blackListArray := strings.Split(blackList, ",") + for _, item := range blackListArray { + matched, err := regexp.MatchString(item, device) + if err != nil { + klog.Errorf("regexp.Match err: %v", err) + continue + } + if matched { + return true + } + } + return false +} diff --git a/pkg/controllers/cluster.go b/pkg/controllers/cluster.go index 
af086766..0e702d29 100644 --- a/pkg/controllers/cluster.go +++ b/pkg/controllers/cluster.go @@ -52,9 +52,11 @@ func reconcileSharedServer(c *daemon.Cluster) ([]daemon.NodeInfo, []*topology.De logger.Info("create config template configmap successfully") - err = createReportConfigMap(c) - if err != nil { - return nil, nil, err + if c.EnableReport { + err = createReportConfigMap(c) + if err != nil { + return nil, nil, err + } } // Start etcd cluster @@ -109,10 +111,12 @@ func reconcileCurveDaemons(c *daemon.Cluster) error { } } - // report cluster - err = runReportCronJob(c, c.SnapShotClone.Enable) - if err != nil { - return err + if c.EnableReport { + // report cluster + err = runReportCronJob(c, c.SnapShotClone.Enable) + if err != nil { + return err + } } return nil @@ -141,10 +145,12 @@ func reconcileCurveFSDaemons(c *daemon.Cluster) error { } } - // report cluster - err = runReportCronJob(c, c.SnapShotClone.Enable) - if err != nil { - return err + if c.EnableReport { + // report cluster + err = runReportCronJob(c, c.SnapShotClone.Enable) + if err != nil { + return err + } } return nil diff --git a/pkg/controllers/curvecluster_controller.go b/pkg/controllers/curvecluster_controller.go index 1e46a443..f0e9b083 100644 --- a/pkg/controllers/curvecluster_controller.go +++ b/pkg/controllers/curvecluster_controller.go @@ -79,6 +79,7 @@ func NewCurveClusterReconciler( // +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=apps,resources=daemonsets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=batch,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete @@ -188,6 +189,8 
@@ func (c *ClusterController) reconcileCurveCluster(clusterObj *curvev1.CurveClust cluster.DataDirHostPath = path.Join(clusterObj.Spec.HostDataDir, "data") cluster.LogDirHostPath = path.Join(clusterObj.Spec.HostDataDir, "logs") cluster.ConfDirHostPath = path.Join(clusterObj.Spec.HostDataDir, "conf") + cluster.EnableReport = clusterObj.Spec.EnableReport + c.clusterMap[cluster.Namespace] = cluster log.Log.Info("reconcileing CurveCluster in namespace", "namespace", cluster.Namespace) diff --git a/pkg/daemon/cluster.go b/pkg/daemon/cluster.go index 8635247d..d070c982 100644 --- a/pkg/daemon/cluster.go +++ b/pkg/daemon/cluster.go @@ -30,6 +30,8 @@ type Cluster struct { DataDirHostPath string LogDirHostPath string ConfDirHostPath string + + EnableReport bool } func (c *Cluster) GetUUID() string { return c.UUID } diff --git a/pkg/discover/discover.go b/pkg/discover/discover.go new file mode 100644 index 00000000..b523bff1 --- /dev/null +++ b/pkg/discover/discover.go @@ -0,0 +1,558 @@ +package discover + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "github.com/opencurve/curve-operator/pkg/clusterd" + "github.com/opencurve/curve-operator/pkg/k8sutil" + "github.com/opencurve/curve-operator/pkg/util/sys" + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog" + "os" + "os/exec" + "os/signal" + "regexp" + "strings" + "syscall" + "time" +) + +var ( + // AppLabel is the app label + AppLabel = "app" + // AppName is the name of the pod + AppName = "curve-discover" + // NodeAttr is the attribute of that node + NodeAttr = "curve.io/node" + // LocalDiskCMData is the data name of the config map storing devices + LocalDiskCMData = "devices" + // LocalDiskCMName is name of the config map storing devices + LocalDiskCMName = "local-device-%s" + nodeName string + namespace string + lastDevice string + cmName string + cm *v1.ConfigMap + udevEventPeriod = 
time.Duration(5) * time.Second +) + +func Run(ctx context.Context, context *clusterd.Context, probeInterval time.Duration) error { + if context == nil { + return fmt.Errorf("nil context") + } + klog.Infof("device discovery interval is %q", probeInterval.String()) + nodeName = os.Getenv(k8sutil.NodeNameEnvVar) + namespace = os.Getenv(k8sutil.PodNamespaceEnvVar) + cmName = fmt.Sprintf(LocalDiskCMName, nodeName) + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGTERM) + + err := updateDeviceCM(ctx, context) + if err != nil { + klog.Errorf("failed to update device configmap: %v", err) + return err + } + + udevEvents := make(chan struct{}) + go udevBlockMonitor(udevEvents, udevEventPeriod) + for { + select { + case <-sigc: + klog.Infof("shutdown signal received, exiting...") + return nil + case <-time.After(probeInterval): + if err := updateDeviceCM(ctx, context); err != nil { + klog.Errorf("failed to update device configmap during probe interval. %v", err) + } + case _, ok := <-udevEvents: + if ok { + klog.Info("trigger probe from udev event") + if err := updateDeviceCM(ctx, context); err != nil { + klog.Errorf("failed to update device configmap triggered from udev event. 
%v", err) + } + } else { + klog.Warningf("disabling udev monitoring") + udevEvents = nil + } + } + } +} + +func matchUdevEvent(text string, matches, exclusions []string) (bool, error) { + for _, match := range matches { + matched, err := regexp.MatchString(match, text) + if err != nil { + return false, fmt.Errorf("failed to search string: %v", err) + } + if matched { + hasExclusion := false + for _, exclusion := range exclusions { + matched, err = regexp.MatchString(exclusion, text) + if err != nil { + return false, fmt.Errorf("failed to search string: %v", err) + } + if matched { + hasExclusion = true + break + } + } + if !hasExclusion { + klog.Infof("udevadm monitor: matched event: %s", text) + return true, nil + } + } + } + return false, nil +} + +// Scans `udevadm monitor` output for block sub-system events. Each line of +// output matching a set of substrings is sent to the provided channel. An event +// is returned if it passes any matches tests, and passes all exclusion tests. +func rawUdevBlockMonitor(c chan struct{}, matches, exclusions []string) { + defer close(c) + + // stdbuf -oL performs line buffered output + cmd := exec.Command("stdbuf", "-oL", "udevadm", "monitor", "-u", "-s", "block") + stdout, err := cmd.StdoutPipe() + if err != nil { + klog.Warningf("Cannot open udevadm stdout: %v", err) + return + } + defer stdout.Close() + + err = cmd.Start() + if err != nil { + klog.Warningf("Cannot start udevadm monitoring: %v", err) + return + } + + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + text := scanner.Text() + klog.Infof("udevadm monitor: %s", text) + match, err := matchUdevEvent(text, matches, exclusions) + if err != nil { + klog.Warningf("udevadm filtering failed: %v", err) + return + } + if match { + c <- struct{}{} + } + } + + if err := scanner.Err(); err != nil { + klog.Errorf("udevadm monitor scanner error: %v", err) + } + + klog.Info("udevadm monitor finished") +} + +// Monitors udev for block device changes, and collapses 
these events such that +// only one event is emitted per period in order to deal with flapping. +func udevBlockMonitor(c chan struct{}, period time.Duration) { + defer close(c) + var udevFilter []string + + // return any add or remove events, but none that match device mapper + // events. string matching is case-insensitive + events := make(chan struct{}) + + blackList := getDiscoverUdevBlackList() + udevFilter = strings.Split(blackList, ",") + klog.Infof("using the regular expressions %q", udevFilter) + + go rawUdevBlockMonitor(events, + []string{"(?i)add", "(?i)remove"}, + udevFilter) + + timeout := time.NewTimer(period) + defer timeout.Stop() + for { + _, ok := <-events + if !ok { + return + } + if !timeout.Stop() { + <-timeout.C + } + timeout.Reset(period) + for { + select { + case <-timeout.C: + case _, ok := <-events: + if !ok { + return + } + continue + } + break + } + c <- struct{}{} + } +} + +func ignoreDevice(dev sys.LocalDisk) bool { + return strings.Contains(strings.ToUpper(dev.DevLinks), "USB") +} + +func checkMatchingDevice(checkDev sys.LocalDisk, devices []sys.LocalDisk) *sys.LocalDisk { + for i, dev := range devices { + if ignoreDevice(dev) { + continue + } + // check if devices should be considered the same. the uuid can be + // unstable, so we also use the reported serial and device name, which + // appear to be more stable. + if checkDev.UUID != "" && dev.UUID != "" && checkDev.UUID == dev.UUID { + return &devices[i] + } + + // on virt-io devices in libvirt, the serial is reported as an empty + // string, so also account for that. + if checkDev.Serial == dev.Serial && checkDev.Serial != "" { + return &devices[i] + } + + if checkDev.Name == dev.Name { + return &devices[i] + } + } + return nil +} + +// note that the idea of equality here may not be intuitive. equality of device +// sets refers to a state in which no change has been observed between the sets +// of devices that would warrant changes to their consumption by storage +// daemons. 
for example, if a device appears to have been wiped vs a device +// appears to now be in use. +func checkDeviceListsEqual(oldDevs, newDevs []sys.LocalDisk) bool { + for _, oldDev := range oldDevs { + if ignoreDevice(oldDev) { + continue + } + match := checkMatchingDevice(oldDev, newDevs) + if match == nil { + // device has been removed + return false + } + if !oldDev.Empty && match.Empty { + // device has changed from non-empty to empty + return false + } + if oldDev.Partitions != nil && match.Partitions == nil { + return false + } + } + + for _, newDev := range newDevs { + if ignoreDevice(newDev) { + continue + } + match := checkMatchingDevice(newDev, oldDevs) + if match == nil { + // device has been added + return false + } + // the matching case is handled in the previous join + } + + return true +} + +// DeviceListsEqual checks whether 2 lists are equal or not +func DeviceListsEqual(old, new string) (bool, error) { + var oldDevs []sys.LocalDisk + var newDevs []sys.LocalDisk + + err := json.Unmarshal([]byte(old), &oldDevs) + if err != nil { + return false, fmt.Errorf("cannot unmarshal devices: %+v", err) + } + + err = json.Unmarshal([]byte(new), &newDevs) + if err != nil { + return false, fmt.Errorf("cannot unmarshal devices: %+v", err) + } + + return checkDeviceListsEqual(oldDevs, newDevs), nil +} + +func updateDeviceCM(ctx context.Context, clusterdContext *clusterd.Context) error { + klog.Infof("updating device configmap") + devices, err := probeDevices(clusterdContext) + if err != nil { + klog.Errorf("failed to probe devices: %v", err) + return err + } + deviceJSON, err := json.Marshal(devices) + if err != nil { + klog.Errorf("failed to marshal: %v", err) + return err + } + + deviceStr := string(deviceJSON) + if cm == nil { + cm, err = clusterdContext.Clientset.CoreV1().ConfigMaps(namespace).Get(cmName, metav1.GetOptions{}) + } + if err == nil { + lastDevice = cm.Data[LocalDiskCMData] + klog.Infof("last devices %s", lastDevice) + } else { + if 
!kerrors.IsNotFound(err) { + klog.Errorf("failed to get configmap: %v", err) + return err + } + + data := make(map[string]string, 1) + data[LocalDiskCMData] = deviceStr + + // the map doesn't exist yet, create it now + cm = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: namespace, + Labels: map[string]string{ + AppLabel: AppName, + NodeAttr: nodeName, + }, + }, + Data: data, + } + + cm, err = clusterdContext.Clientset.CoreV1().ConfigMaps(namespace).Create(cm) + if err != nil { + klog.Errorf("failed to create configmap: %v", err) + return fmt.Errorf("failed to create local device map %s: %+v", cmName, err) + } + lastDevice = deviceStr + } + devicesEqual, err := DeviceListsEqual(lastDevice, deviceStr) + if err != nil { + return fmt.Errorf("failed to compare device lists: %v", err) + } + if !devicesEqual { + data := make(map[string]string, 1) + data[LocalDiskCMData] = deviceStr + cm.Data = data + cm, err = clusterdContext.Clientset.CoreV1().ConfigMaps(namespace).Update(cm) + if err != nil { + klog.Errorf("failed to update configmap %s: %v", cmName, err) + return err + } + } + return nil +} + +func logDevices(devices []*sys.LocalDisk) { + var devicesList []string + for _, device := range devices { + klog.Infof("localdevice %q: %+v", device.Name, device) + devicesList = append(devicesList, device.Name) + } + klog.Infof("localdevices: %q", strings.Join(devicesList, ", ")) +} + +func probeDevices(context *clusterd.Context) ([]sys.LocalDisk, error) { + devices := make([]sys.LocalDisk, 0) + blackList := getDiscoverUdevBlackList() + localDevices, err := clusterd.DiscoverDevices(context.Executor, blackList) + if err != nil { + return devices, fmt.Errorf("failed initial hardware discovery. 
%+v", err) + } + + logDevices(localDevices) + + for _, device := range localDevices { + if device == nil { + continue + } + + partitions, _, err := sys.GetDevicePartitions(device.Name, context.Executor) + if err != nil { + klog.Errorf("failed to check device partitions %s: %v", device.Name, err) + continue + } + + // check if there is a file system on the device + fs, err := sys.GetDeviceFilesystems(device.Name, context.Executor) + if err != nil { + klog.Errorf("failed to check device filesystem %s: %v", device.Name, err) + continue + } + device.Partitions = partitions + device.Filesystem = fs + device.Empty = clusterd.GetDeviceEmpty(device) + + devices = append(devices, *device) + } + + klog.Infof("available devices: %+v", devices) + return devices, nil +} + +func getDiscoverUdevBlackList() string { + // get discoverDaemonUdevBlacklist from the environment variable + // if user doesn't provide any regex; generate the default regex + // else use the regex provided by user + discoverUdev := os.Getenv(k8sutil.DiscoverUdevBlacklist) + if discoverUdev == "" { + // loop,fd0,sr0,/dev/ram*,/dev/dm-,/dev/md,/dev/rbd*,/dev/zd* + discoverUdev = "(?i)loop[0-9]+,(?i)fd[0-9]+,(?i)sr[0-9]+,(?i)ram[0-9]+,(?i)dm-[0-9]+,(?i)md[0-9]+,(?i)zd[0-9]+,(?i)rbd[0-9]+,(?i)nbd[0-9]+" + } + return discoverUdev +} + +func ReconcileDiscoveryDaemon() (err error) { + clusterCtx := clusterd.NewContext() + namespace := os.Getenv(k8sutil.PodNamespaceEnvVar) + discoverImage := os.Getenv(k8sutil.OperatorImage) + if EnableDiscoverDisk() { + return Start(clusterCtx, namespace, discoverImage) + } else { + return Stop(clusterCtx, namespace) + } +} + +func EnableDiscoverDisk() bool { + return os.Getenv(k8sutil.DiscoverDisk) == "true" +} + +func Start(ctx *clusterd.Context, namespace, discoverImage string) error { + err := createDiscoverDaemonSet(ctx, namespace, discoverImage) + if err != nil { + return fmt.Errorf("failed to start discover daemonset. 
%v", err) + } + return nil +} + +func createDiscoverDaemonSet(ctx *clusterd.Context, namespace, discoverImage string) error { + discoveryParameters := []string{"discover"} + discoverInterval := os.Getenv(k8sutil.DiscoverInterval) + if len(discoverInterval) > 0 { + discoveryParameters = append(discoveryParameters, "--discover-interval", + discoverInterval) + } + + privileged := true + ds := &apps.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: AppName, + Namespace: namespace, + }, + Spec: apps.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + AppLabel: AppName, + }, + }, + UpdateStrategy: apps.DaemonSetUpdateStrategy{ + Type: apps.RollingUpdateDaemonSetStrategyType, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + AppLabel: AppName, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "discover", + Image: discoverImage, + Args: discoveryParameters, + SecurityContext: &v1.SecurityContext{ + Privileged: &privileged, + }, + VolumeMounts: []v1.VolumeMount{ + { + Name: "dev", + MountPath: "/dev", + // discovery pod could fail to start if /dev is mounted ro + ReadOnly: false, + }, + { + Name: "sys", + MountPath: "/sys", + ReadOnly: true, + }, + { + Name: "udev", + MountPath: "/run/udev", + ReadOnly: true, + }, + }, + Env: []v1.EnvVar{ + k8sutil.NamespaceEnvVar(), + k8sutil.NodeEnvVar(), + k8sutil.NameEnvVar(), + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: "dev", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/dev", + }, + }, + }, + { + Name: "sys", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/sys", + }, + }, + }, + { + Name: "udev", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/run/udev", + }, + }, + }, + }, + HostNetwork: false, + }, + }, + }, + } + // Get the operator pod details to attach the owner reference to the discover daemon set + operatorPod, err 
:= k8sutil.GetRunningPod(ctx.Clientset) + if err != nil { + klog.Errorf("failed to get operator pod. %+v", err) + } else { + k8sutil.SetOwnerRefsWithoutBlockOwner(&ds.ObjectMeta, operatorPod.OwnerReferences) + } + + _, err = ctx.Clientset.AppsV1().DaemonSets(namespace).Create(ds) + if err != nil { + if !kerrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create rook-discover daemon set. %+v", err) + } + klog.Infof("rook-discover daemonset already exists, updating ...") + _, err = ctx.Clientset.AppsV1().DaemonSets(namespace).Update(ds) + if err != nil { + return fmt.Errorf("failed to update rook-discover daemon set. %+v", err) + } + } else { + klog.Infof("curve-discover daemonset started") + } + return nil + +} + +func Stop(ctx *clusterd.Context, namespace string) error { + err := ctx.Clientset.AppsV1().DaemonSets(namespace).Delete(AppName, &metav1.DeleteOptions{}) + if err != nil && !kerrors.IsNotFound(err) { + return err + } + return nil +} diff --git a/pkg/k8sutil/k8sutil.go b/pkg/k8sutil/k8sutil.go new file mode 100644 index 00000000..a3b396fb --- /dev/null +++ b/pkg/k8sutil/k8sutil.go @@ -0,0 +1,31 @@ +package k8sutil + +import v1 "k8s.io/api/core/v1" + +const ( + NodeNameEnvVar = "NODE_NAME" + PodNameEnvVar = "POD_NAME" + PodNamespaceEnvVar = "POD_NAMESPACE" +) + +const ( + DiscoverUdevBlacklist = "DISCOVER_UDEV_BLACKLIST" + DiscoverDisk = "DISCOVER_DISK" + DiscoverInterval = "DISCOVER_INTERVAL" + OperatorImage = "OPERATOR_IMAGE" +) + +// NamespaceEnvVar namespace env var +func NamespaceEnvVar() v1.EnvVar { + return v1.EnvVar{Name: PodNamespaceEnvVar, ValueFrom: &v1.EnvVarSource{FieldRef: &v1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}} +} + +// NameEnvVar pod name env var +func NameEnvVar() v1.EnvVar { + return v1.EnvVar{Name: PodNameEnvVar, ValueFrom: &v1.EnvVarSource{FieldRef: &v1.ObjectFieldSelector{FieldPath: "metadata.name"}}} +} + +// NodeEnvVar node env var +func NodeEnvVar() v1.EnvVar { + return v1.EnvVar{Name: NodeNameEnvVar, 
ValueFrom: &v1.EnvVarSource{FieldRef: &v1.ObjectFieldSelector{FieldPath: "spec.nodeName"}}} +} diff --git a/pkg/k8sutil/pod.go b/pkg/k8sutil/pod.go index e33bf506..9b84ff46 100644 --- a/pkg/k8sutil/pod.go +++ b/pkg/k8sutil/pod.go @@ -1,10 +1,12 @@ package k8sutil import ( + "fmt" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "os" ) func GetPodsByLabelSelector(clientset kubernetes.Interface, namespace string, selector string) (*v1.PodList, error) { @@ -16,3 +18,22 @@ func GetPodsByLabelSelector(clientset kubernetes.Interface, namespace string, se } return pods, nil } + +// GetRunningPod reads the name and namespace of a pod from the +// environment, and returns the pod (if it exists). +func GetRunningPod(clientset kubernetes.Interface) (*v1.Pod, error) { + podName := os.Getenv(PodNameEnvVar) + if podName == "" { + return nil, fmt.Errorf("cannot detect the pod name. Please provide it using the downward API in the manifest file") + } + podNamespace := os.Getenv(PodNamespaceEnvVar) + if podName == "" { + return nil, fmt.Errorf("cannot detect the pod namespace. 
Please provide it using the downward API in the manifest file") + } + + pod, err := clientset.CoreV1().Pods(podNamespace).Get(podName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return pod, nil +} diff --git a/pkg/util/exec/exec.go b/pkg/util/exec/exec.go new file mode 100644 index 00000000..324c7f54 --- /dev/null +++ b/pkg/util/exec/exec.go @@ -0,0 +1,61 @@ +package exec + +import ( + "fmt" + "k8s.io/klog" + "os/exec" + "strings" +) + +// Executor is the main interface for all the exec commands +type Executor interface { + ExecuteCommandWithOutput(command string, arg ...string) (string, error) +} + +// CommandExecutor is the type of the Executor +type CommandExecutor struct{} + +// ExecuteCommandWithOutput executes a command with output +func (*CommandExecutor) ExecuteCommandWithOutput(command string, arg ...string) (string, error) { + logCommand(command, arg...) + cmd := exec.Command(command, arg...) + return runCommandWithOutput(cmd, false) +} + +func runCommandWithOutput(cmd *exec.Cmd, combinedOutput bool) (string, error) { + var output []byte + var err error + var out string + + if combinedOutput { + output, err = cmd.CombinedOutput() + } else { + output, err = cmd.Output() + if err != nil { + output = []byte(fmt.Sprintf("%s. 
// assertErrorType extracts a human readable detail string from an error
// returned by running a command: the captured stderr for an *exec.ExitError,
// the error text for an *exec.Error, and the empty string for anything else.
func assertErrorType(err error) string {
	if exitErr, ok := err.(*exec.ExitError); ok {
		return string(exitErr.Stderr)
	}
	if execErr, ok := err.(*exec.Error); ok {
		return execErr.Error()
	}
	return ""
}
`json:"rotational"` + // ReadOnly is the boolean whether the device is readonly + Readonly bool `json:"readOnly"` + // Partitions is a partition slice + Partitions []Partition + // Filesystem is the filesystem currently on the device + Filesystem string `json:"filesystem"` + // Mountpoint is the mountpoint of the filesystem's on the device + Mountpoint string `json:"mountpoint"` + // Vendor is the device vendor + Vendor string `json:"vendor"` + // Model is the device model + Model string `json:"model"` + // WWN is the world wide name of the device + WWN string `json:"wwn"` + // WWNVendorExtension is the WWN_VENDOR_EXTENSION from udev info + WWNVendorExtension string `json:"wwnVendorExtension"` + // Empty checks whether the device is completely empty + Empty bool `json:"empty"` + // RealPath is the device pathname behind the PVC, behind /mnt//name + RealPath string `json:"real-path,omitempty"` + // KernelName is the kernel name of the device + KernelName string `json:"kernel-name,omitempty"` + // Whether this device should be encrypted + Encrypted bool `json:"encrypted,omitempty"` +} + +// ListDevices list all devices available on a machine +func ListDevices(executor exec.Executor) ([]string, error) { + devices, err := executor.ExecuteCommandWithOutput("lsblk", "--all", "--noheadings", "--list", "--output", "KNAME") + if err != nil { + return nil, fmt.Errorf("failed to list all devices: %+v", err) + } + + return strings.Split(devices, "\n"), nil +} + +// GetDevicePartitions gets partitions on a given device +func GetDevicePartitions(device string, executor exec.Executor) (partitions []Partition, unusedSpace uint64, err error) { + + var devicePath string + splitDevicePath := strings.Split(device, "/") + if len(splitDevicePath) == 1 { + devicePath = fmt.Sprintf("/dev/%s", device) //device path for OSD on devices. 
+ } else { + devicePath = device //use the exact device path (like /mnt/) in case of PVC block device + } + + output, err := executor.ExecuteCommandWithOutput("lsblk", devicePath, + "--bytes", "--pairs", "--output", "NAME,SIZE,TYPE,PKNAME") + klog.Infof("Output: %+v", output) + if err != nil { + return nil, 0, fmt.Errorf("failed to get device %s partitions. %+v", device, err) + } + partInfo := strings.Split(output, "\n") + var deviceSize uint64 + var totalPartitionSize uint64 + for _, info := range partInfo { + props := parseKeyValuePairString(info) + name := props["NAME"] + if name == device { + // found the main device + klog.Info("Device found - ", name) + deviceSize, err = strconv.ParseUint(props["SIZE"], 10, 64) + if err != nil { + return nil, 0, fmt.Errorf("failed to get device %s size. %+v", device, err) + } + } else if props["PKNAME"] == device && props["TYPE"] == PartType { + // found a partition + p := Partition{Name: name} + p.Size, err = strconv.ParseUint(props["SIZE"], 10, 64) + if err != nil { + return nil, 0, fmt.Errorf("failed to get partition %s size. 
%+v", name, err) + } + totalPartitionSize += p.Size + + info, err := GetUdevInfo(name, executor) + if err != nil { + return nil, 0, err + } + if v, ok := info["PARTNAME"]; ok { + p.Label = v + } + if v, ok := info["ID_PART_ENTRY_NAME"]; ok { + p.Label = v + } + if v, ok := info["ID_FS_TYPE"]; ok { + p.Filesystem = v + } + + partitions = append(partitions, p) + } + } + + if deviceSize > 0 { + unusedSpace = deviceSize - totalPartitionSize + } + return partitions, unusedSpace, nil +} + +// GetDeviceProperties gets device properties +func GetDeviceProperties(device string, executor exec.Executor) (map[string]string, error) { + // As we are mounting the block mode PVs on /mnt we use the entire path, + // e.g., if the device path is /mnt/example-pvc then its taken completely + // else if its just vdb then the following is used + devicePath := strings.Split(device, "/") + if len(devicePath) == 1 { + device = fmt.Sprintf("/dev/%s", device) + } + return GetDevicePropertiesFromPath(device, executor) +} + +// GetDevicePropertiesFromPath gets a device property from a path +func GetDevicePropertiesFromPath(devicePath string, executor exec.Executor) (map[string]string, error) { + output, err := executor.ExecuteCommandWithOutput("lsblk", devicePath, + "--bytes", "--nodeps", "--pairs", "--paths", "--output", "SIZE,ROTA,RO,TYPE,PKNAME,NAME,KNAME,MOUNTPOINT,FSTYPE") + if err != nil { + klog.Errorf("failed to execute lsblk. 
output: %s", output) + return nil, err + } + klog.Infof("lsblk output: %q", output) + + return parseKeyValuePairString(output), nil +} + +// GetUdevInfo gets udev information +func GetUdevInfo(device string, executor exec.Executor) (map[string]string, error) { + output, err := executor.ExecuteCommandWithOutput("udevadm", "info", "--query=property", fmt.Sprintf("/dev/%s", device)) + if err != nil { + return nil, err + } + klog.Infof("udevadm info output: %q", output) + + return parseUdevInfo(output), nil +} + +// GetDeviceFilesystems get the file systems available +func GetDeviceFilesystems(device string, executor exec.Executor) (string, error) { + devicePath := strings.Split(device, "/") + if len(devicePath) == 1 { + device = fmt.Sprintf("/dev/%s", device) + } + output, err := executor.ExecuteCommandWithOutput("udevadm", "info", "--query=property", device) + if err != nil { + return "", err + } + + return parseFS(output), nil +} + +// GetDiskUUID look up the UUID for a disk. +func GetDiskUUID(device string, executor exec.Executor) (string, error) { + if _, err := osexec.LookPath(sgdiskCmd); err != nil { + return "", errors.Wrap(err, "sgdisk not found") + } + + devicePath := strings.Split(device, "/") + if len(devicePath) == 1 { + device = fmt.Sprintf("/dev/%s", device) + } + + output, err := executor.ExecuteCommandWithOutput(sgdiskCmd, "--print", device) + if err != nil { + return "", errors.Wrapf(err, "sgdisk failed. output=%s", output) + } + + return parseUUID(device, output) +} + +// finds the disk uuid in the output of sgdisk +func parseUUID(device, output string) (string, error) { + + // find the line with the uuid + lines := strings.Split(output, "\n") + for _, line := range lines { + // If GPT is not found in a disk, sgdisk creates a new GPT in memory and reports its UUID. + // This ID changes each call and is not appropriate to identify the device. 
// parseUdevInfo converts the newline separated KEY=value output of
// `udevadm info --query=property` into a map. Each line is split at its first
// equals sign only, so a value that itself contains '=' is kept intact. Lines
// without an equals sign are ignored.
func parseUdevInfo(output string) map[string]string {
	lines := strings.Split(output, "\n")
	result := make(map[string]string, len(lines))
	for _, line := range lines {
		pair := strings.SplitN(line, "=", 2)
		if len(pair) == 2 {
			result[pair[0]] = pair[1]
		}
	}
	return result
}
/dev/mapper/ocs-deviceset-thin-1-data-0hmfgp-block-dmcrypt +func ListDevicesChild(executor exec.Executor, device string) ([]string, error) { + childListRaw, err := executor.ExecuteCommandWithOutput("lsblk", "--noheadings", "--path", "--list", "--output", "NAME", device) + if err != nil { + return []string{}, fmt.Errorf("failed to list child devices of %q. %v", device, err) + } + + return strings.Split(childListRaw, "\n"), nil +}