...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/apiserver/workflow/workflow.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/apiserver/workflow

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package workflow
    15  
    16  import (
    17  	"encoding/json"
    18  	"fmt"
    19  	"net/http"
    20  	"sort"
    21  
    22  	"github.com/gin-gonic/gin"
    23  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    24  	ctrl "sigs.k8s.io/controller-runtime"
    25  
    26  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    27  	"github.com/chaos-mesh/chaos-mesh/pkg/apiserver/utils"
    28  	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
    29  	config "github.com/chaos-mesh/chaos-mesh/pkg/config/dashboard"
    30  	"github.com/chaos-mesh/chaos-mesh/pkg/core"
    31  )
    32  
// log is the package-level logger; entries are emitted under the name "workflow api".
var log = ctrl.Log.WithName("workflow api")
    34  
// StatusResponse is the JSON body returned by handlers that only need to
// report an outcome, e.g. {"status": "success"} after a successful delete.
type StatusResponse struct {
	// Status is the outcome text, serialized under the "status" key.
	Status string `json:"status"`
}
    39  
    40  func Register(r *gin.RouterGroup, s *Service) {
    41  	endpoint := r.Group("/workflows")
    42  	endpoint.GET("", s.listWorkflows)
    43  	endpoint.POST("", s.createWorkflow)
    44  	endpoint.GET("/:uid", s.getWorkflowDetailByUID)
    45  	endpoint.PUT("/:uid", s.updateWorkflow)
    46  	endpoint.DELETE("/:uid", s.deleteWorkflow)
    47  }
    48  
// Service bundles the dependencies shared by the workflow HTTP handlers.
type Service struct {
	// conf is the dashboard configuration; listWorkflows reads ClusterScoped
	// and TargetNamespace from it to pick a default namespace.
	conf *config.ChaosDashboardConfig
	// store is used by the handlers to look up persisted workflow records by UID.
	store core.WorkflowStore
}
    54  
    55  func NewService(conf *config.ChaosDashboardConfig, store core.WorkflowStore) *Service {
    56  	return &Service{conf: conf, store: store}
    57  }
    58  
    59  // @Summary List workflows from Kubernetes cluster.
    60  // @Description List workflows from Kubernetes cluster.
    61  // @Tags workflows
    62  // @Produce json
    63  // @Param namespace query string false "namespace, given empty string means list from all namespace"
    64  // @Param status query string false "status" Enums(Initializing, Running, Errored, Finished)
    65  // @Success 200 {array} core.WorkflowMeta
    66  // @Router /workflows [get]
    67  // @Failure 500 {object} utils.APIError
    68  func (it *Service) listWorkflows(c *gin.Context) {
    69  	namespace := c.Query("namespace")
    70  	if len(namespace) == 0 && !it.conf.ClusterScoped &&
    71  		len(it.conf.TargetNamespace) != 0 {
    72  		namespace = it.conf.TargetNamespace
    73  	}
    74  
    75  	result := make([]core.WorkflowMeta, 0)
    76  
    77  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
    78  	if err != nil {
    79  		_ = c.Error(utils.ErrInvalidRequest.WrapWithNoMessage(err))
    80  		return
    81  	}
    82  	repo := core.NewKubeWorkflowRepository(kubeClient)
    83  
    84  	if namespace != "" {
    85  		workflowFromNs, err := repo.ListByNamespace(c.Request.Context(), namespace)
    86  		if err != nil {
    87  			utils.SetErrorForGinCtx(c, err)
    88  			return
    89  		}
    90  		result = append(result, workflowFromNs...)
    91  	} else {
    92  		allWorkflow, err := repo.List(c.Request.Context())
    93  		if err != nil {
    94  			utils.SetErrorForGinCtx(c, err)
    95  			return
    96  		}
    97  		result = append(result, allWorkflow...)
    98  	}
    99  
   100  	// enriching with ID
   101  	for index, item := range result {
   102  		entity, err := it.store.FindByUID(c.Request.Context(), string(item.UID))
   103  		if err != nil {
   104  			log.Info("warning: workflow does not have a record in database",
   105  				"namespaced name", fmt.Sprintf("%s/%s", item.Namespace, item.Name),
   106  				"uid", item.UID,
   107  			)
   108  		}
   109  
   110  		if entity != nil {
   111  			result[index].ID = entity.ID
   112  		}
   113  	}
   114  
   115  	sort.Slice(result, func(i, j int) bool {
   116  		return result[i].CreatedAt.After(result[i].CreatedAt)
   117  	})
   118  
   119  	c.JSON(http.StatusOK, result)
   120  }
   121  
   122  // @Summary Get detailed information about the specified workflow.
   123  // @Description Get detailed information about the specified workflow. If that object is not existed in kubernetes, it will only return ths persisted data in the database.
   124  // @Tags workflows
   125  // @Produce json
   126  // @Param uid path string true "uid"
   127  // @Router /workflows/{uid} [GET]
   128  // @Success 200 {object} core.WorkflowDetail
   129  // @Failure 400 {object} utils.APIError
   130  // @Failure 500 {object} utils.APIError
   131  func (it *Service) getWorkflowDetailByUID(c *gin.Context) {
   132  	uid := c.Param("uid")
   133  
   134  	entity, err := it.store.FindByUID(c.Request.Context(), uid)
   135  	if err != nil {
   136  		utils.SetErrorForGinCtx(c, err)
   137  		return
   138  	}
   139  
   140  	namespace := entity.Namespace
   141  	name := entity.Name
   142  
   143  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   144  	if err != nil {
   145  		if apierrors.IsNotFound(err) {
   146  			// if not exists in kubernetes anymore, return the persisted entity directly.
   147  			workflowDetail, err := core.WorkflowEntity2WorkflowDetail(entity)
   148  			if err != nil {
   149  				utils.SetErrorForGinCtx(c, err)
   150  				return
   151  			}
   152  			c.JSON(http.StatusOK, workflowDetail)
   153  			return
   154  		}
   155  		_ = c.Error(utils.ErrInvalidRequest.WrapWithNoMessage(err))
   156  		return
   157  	}
   158  
   159  	// enriching the topology and spec/status with CR in kubernetes
   160  	repo := core.NewKubeWorkflowRepository(kubeClient)
   161  
   162  	workflowCRInKubernetes, err := repo.Get(c.Request.Context(), namespace, name)
   163  	if err != nil {
   164  		utils.SetErrorForGinCtx(c, err)
   165  		return
   166  	}
   167  	result, err := core.WorkflowEntity2WorkflowDetail(entity)
   168  	if err != nil {
   169  		utils.SetErrorForGinCtx(c, err)
   170  		return
   171  	}
   172  	result.Topology = workflowCRInKubernetes.Topology
   173  	result.KubeObject = workflowCRInKubernetes.KubeObject
   174  
   175  	c.JSON(http.StatusOK, result)
   176  }
   177  
   178  // @Summary Create a new workflow.
   179  // @Description Create a new workflow.
   180  // @Tags workflows
   181  // @Produce json
   182  // @Param request body v1alpha1.Workflow true "Request body"
   183  // @Success 200 {object} core.WorkflowDetail
   184  // @Failure 400 {object} utils.APIError
   185  // @Failure 500 {object} utils.APIError
   186  // @Router /workflows/new [post]
   187  func (it *Service) createWorkflow(c *gin.Context) {
   188  	payload := v1alpha1.Workflow{}
   189  
   190  	err := json.NewDecoder(c.Request.Body).Decode(&payload)
   191  	if err != nil {
   192  		_ = c.Error(utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
   193  		return
   194  	}
   195  
   196  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   197  	if err != nil {
   198  		_ = c.Error(utils.ErrInvalidRequest.WrapWithNoMessage(err))
   199  		return
   200  	}
   201  
   202  	repo := core.NewKubeWorkflowRepository(kubeClient)
   203  
   204  	result, err := repo.Create(c.Request.Context(), payload)
   205  	if err != nil {
   206  		_ = c.Error(utils.ErrInternalServer.WrapWithNoMessage(err))
   207  		return
   208  	}
   209  	c.JSON(http.StatusOK, result)
   210  }
   211  
   212  // @Summary Delete the specified workflow.
   213  // @Description Delete the specified workflow.
   214  // @Tags workflows
   215  // @Produce json
   216  // @Param uid path string true "uid"
   217  // @Success 200 {object} StatusResponse
   218  // @Failure 400 {object} utils.APIError
   219  // @Failure 404 {object} utils.APIError
   220  // @Failure 500 {object} utils.APIError
   221  // @Router /workflows/{uid} [delete]
   222  func (it *Service) deleteWorkflow(c *gin.Context) {
   223  	uid := c.Param("uid")
   224  
   225  	entity, err := it.store.FindByUID(c.Request.Context(), uid)
   226  	if err != nil {
   227  		utils.SetErrorForGinCtx(c, err)
   228  		return
   229  	}
   230  
   231  	namespace := entity.Namespace
   232  	name := entity.Name
   233  
   234  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   235  	if err != nil {
   236  		_ = c.Error(utils.ErrInvalidRequest.WrapWithNoMessage(err))
   237  		return
   238  	}
   239  
   240  	repo := core.NewKubeWorkflowRepository(kubeClient)
   241  
   242  	err = repo.Delete(c.Request.Context(), namespace, name)
   243  	if err != nil {
   244  		utils.SetErrorForGinCtx(c, err)
   245  		return
   246  	}
   247  	c.JSON(http.StatusOK, StatusResponse{Status: "success"})
   248  }
   249  
   250  // @Summary Update a workflow.
   251  // @Description Update a workflow.
   252  // @Tags workflows
   253  // @Produce json
   254  // @Param uid path string true "uid"
   255  // @Param request body v1alpha1.Workflow true "Request body"
   256  // @Success 200 {object} core.WorkflowDetail
   257  // @Failure 400 {object} utils.APIError
   258  // @Failure 500 {object} utils.APIError
   259  // @Router /workflows/{uid} [put]
   260  func (it *Service) updateWorkflow(c *gin.Context) {
   261  	payload := v1alpha1.Workflow{}
   262  
   263  	err := json.NewDecoder(c.Request.Body).Decode(&payload)
   264  	if err != nil {
   265  		_ = c.Error(utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
   266  		return
   267  	}
   268  	uid := c.Param("uid")
   269  	entity, err := it.store.FindByUID(c.Request.Context(), uid)
   270  	if err != nil {
   271  		utils.SetErrorForGinCtx(c, err)
   272  		return
   273  	}
   274  
   275  	namespace := entity.Namespace
   276  	name := entity.Name
   277  
   278  	if namespace != payload.Namespace {
   279  		_ = c.Error(utils.ErrInvalidRequest.Wrap(err,
   280  			"namespace is not consistent, pathParameter: %s, metaInRaw: %s",
   281  			namespace,
   282  			payload.Namespace),
   283  		)
   284  		return
   285  	}
   286  	if name != payload.Name {
   287  		_ = c.Error(utils.ErrInvalidRequest.Wrap(err,
   288  			"name is not consistent, pathParameter: %s, metaInRaw: %s",
   289  			name,
   290  			payload.Name),
   291  		)
   292  		return
   293  	}
   294  
   295  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   296  	if err != nil {
   297  		_ = c.Error(utils.ErrInvalidRequest.WrapWithNoMessage(err))
   298  		return
   299  	}
   300  
   301  	repo := core.NewKubeWorkflowRepository(kubeClient)
   302  
   303  	result, err := repo.Update(c.Request.Context(), namespace, name, payload)
   304  	if err != nil {
   305  		_ = c.Error(utils.ErrInternalServer.WrapWithNoMessage(err))
   306  		return
   307  	}
   308  
   309  	c.JSON(http.StatusOK, result)
   310  }
   311