...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/workflow/workflow.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/workflow

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package workflow
    17  
    18  import (
    19  	"encoding/json"
    20  	"fmt"
    21  	"net/http"
    22  	"sort"
    23  
    24  	"github.com/gin-gonic/gin"
    25  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    26  	ctrl "sigs.k8s.io/controller-runtime"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
    30  	config "github.com/chaos-mesh/chaos-mesh/pkg/config/dashboard"
    31  	"github.com/chaos-mesh/chaos-mesh/pkg/curl"
    32  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
    33  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    34  )
    35  
    36  var log = ctrl.Log.WithName("workflow api")
    37  
    38  // StatusResponse defines a common status struct.
    39  type StatusResponse struct {
    40  	Status string `json:"status"`
    41  }
    42  
    43  func Register(r *gin.RouterGroup, s *Service) {
    44  	endpoint := r.Group("/workflows")
    45  	endpoint.GET("", s.listWorkflows)
    46  	endpoint.POST("", s.createWorkflow)
    47  	endpoint.GET("/:uid", s.getWorkflowDetailByUID)
    48  	endpoint.PUT("/:uid", s.updateWorkflow)
    49  	endpoint.DELETE("/:uid", s.deleteWorkflow)
    50  	endpoint.POST("/render-task/http", s.renderHTTPTask)
    51  	endpoint.POST("/parse-task/http", s.parseHTTPTask)
    52  	endpoint.POST("/validate-task/http", s.isValidRenderedHTTPTask)
    53  }
    54  
    55  // Service defines a handler service for workflows.
    56  type Service struct {
    57  	conf  *config.ChaosDashboardConfig
    58  	store core.WorkflowStore
    59  }
    60  
    61  func NewService(conf *config.ChaosDashboardConfig, store core.WorkflowStore) *Service {
    62  	return &Service{conf: conf, store: store}
    63  }
    64  
    65  // @Summary Render a task which sends HTTP request
    66  // @Description Render a task which sends HTTP request
    67  // @Tags workflows
    68  // @Produce json
    69  // @Param request body curl.RequestForm true "Origin HTTP Request"
    70  // @Router /workflows/render-task/http [post]
    71  // @Success 200 {object} v1alpha1.Template
    72  // @Failure 400 {object} utils.APIError
    73  // @Failure 500 {object} utils.APIError
    74  func (it *Service) renderHTTPTask(c *gin.Context) {
    75  	requestBody := curl.RequestForm{}
    76  	if err := c.ShouldBindJSON(&requestBody); err != nil {
    77  		utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
    78  		return
    79  	}
    80  	result, err := curl.RenderWorkflowTaskTemplate(requestBody)
    81  	if err != nil {
    82  		utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
    83  		return
    84  	}
    85  	c.JSON(http.StatusOK, result)
    86  }
    87  
    88  // @Summary Validate the given template is a valid rendered HTTP Task
    89  // @Description Validate the given template is a valid rendered HTTP Task
    90  // @Tags workflows
    91  // @Produce json
    92  // @Param request body v1alpha1.Template true "Rendered Task"
    93  // @Router /workflows/validate-task/http [post]
    94  // @Success 200 {object} bool
    95  // @Failure 400 {object} utils.APIError
    96  // @Failure 500 {object} utils.APIError
    97  func (it *Service) isValidRenderedHTTPTask(c *gin.Context) {
    98  	requestBody := v1alpha1.Template{}
    99  	if err := c.ShouldBindJSON(&requestBody); err != nil {
   100  		utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
   101  		return
   102  	}
   103  	result := curl.IsValidRenderedTask(&requestBody)
   104  	c.JSON(http.StatusOK, result)
   105  }
   106  
   107  // @Summary Parse the rendered task back to the original request
   108  // @Description Parse the rendered task back to the original request
   109  // @Tags workflows
   110  // @Produce json
   111  // @Param request body v1alpha1.Template true "Rendered Task"
   112  // @Router /workflows/parse-task/http [post]
   113  // @Success 200 {object} curl.RequestForm
   114  // @Failure 400 {object} utils.APIError
   115  // @Failure 500 {object} utils.APIError
   116  func (it *Service) parseHTTPTask(c *gin.Context) {
   117  	requestBody := v1alpha1.Template{}
   118  	if err := c.ShouldBindJSON(&requestBody); err != nil {
   119  		utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
   120  		return
   121  	}
   122  	result, err := curl.ParseWorkflowTaskTemplate(&requestBody)
   123  	if err != nil {
   124  		utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
   125  		return
   126  	}
   127  	c.JSON(http.StatusOK, result)
   128  }
   129  
   130  // @Summary List workflows from Kubernetes cluster.
   131  // @Description List workflows from Kubernetes cluster.
   132  // @Tags workflows
   133  // @Produce json
   134  // @Param namespace query string false "namespace, given empty string means list from all namespace"
   135  // @Param status query string false "status" Enums(Initializing, Running, Errored, Finished)
   136  // @Success 200 {array} core.WorkflowMeta
   137  // @Router /workflows [get]
   138  // @Failure 500 {object} utils.APIError
   139  func (it *Service) listWorkflows(c *gin.Context) {
   140  	namespace := c.Query("namespace")
   141  	if len(namespace) == 0 && !it.conf.ClusterScoped &&
   142  		len(it.conf.TargetNamespace) != 0 {
   143  		namespace = it.conf.TargetNamespace
   144  	}
   145  
   146  	result := make([]core.WorkflowMeta, 0)
   147  
   148  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   149  	if err != nil {
   150  		_ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
   151  		return
   152  	}
   153  	repo := core.NewKubeWorkflowRepository(kubeClient)
   154  
   155  	if namespace != "" {
   156  		workflowFromNs, err := repo.ListByNamespace(c.Request.Context(), namespace)
   157  		if err != nil {
   158  			utils.SetAPImachineryError(c, err)
   159  			return
   160  		}
   161  		result = append(result, workflowFromNs...)
   162  	} else {
   163  		allWorkflow, err := repo.List(c.Request.Context())
   164  		if err != nil {
   165  			utils.SetAPImachineryError(c, err)
   166  			return
   167  		}
   168  		result = append(result, allWorkflow...)
   169  	}
   170  
   171  	// enriching with ID
   172  	for index, item := range result {
   173  		entity, err := it.store.FindByUID(c.Request.Context(), string(item.UID))
   174  		if err != nil {
   175  			log.Info("warning: workflow does not have a record in database",
   176  				"namespaced name", fmt.Sprintf("%s/%s", item.Namespace, item.Name),
   177  				"uid", item.UID,
   178  			)
   179  		}
   180  
   181  		if entity != nil {
   182  			result[index].ID = entity.ID
   183  		}
   184  	}
   185  
   186  	sort.Slice(result, func(i, j int) bool {
   187  		return result[i].CreatedAt.After(result[i].CreatedAt)
   188  	})
   189  
   190  	c.JSON(http.StatusOK, result)
   191  }
   192  
   193  // @Summary Get detailed information about the specified workflow.
   194  // @Description Get detailed information about the specified workflow. If that object is not existed in kubernetes, it will only return ths persisted data in the database.
   195  // @Tags workflows
   196  // @Produce json
   197  // @Param uid path string true "uid"
   198  // @Router /workflows/{uid} [GET]
   199  // @Success 200 {object} core.WorkflowDetail
   200  // @Failure 400 {object} utils.APIError
   201  // @Failure 500 {object} utils.APIError
   202  func (it *Service) getWorkflowDetailByUID(c *gin.Context) {
   203  	uid := c.Param("uid")
   204  
   205  	entity, err := it.store.FindByUID(c.Request.Context(), uid)
   206  	if err != nil {
   207  		utils.SetAPImachineryError(c, err)
   208  		return
   209  	}
   210  
   211  	namespace := entity.Namespace
   212  	name := entity.Name
   213  
   214  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   215  	if err != nil {
   216  		if apierrors.IsNotFound(err) {
   217  			// if not exists in kubernetes anymore, return the persisted entity directly.
   218  			workflowDetail, err := core.WorkflowEntity2WorkflowDetail(entity)
   219  			if err != nil {
   220  				utils.SetAPImachineryError(c, err)
   221  				return
   222  			}
   223  			c.JSON(http.StatusOK, workflowDetail)
   224  			return
   225  		}
   226  		_ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
   227  		return
   228  	}
   229  
   230  	// enriching the topology and spec/status with CR in kubernetes
   231  	repo := core.NewKubeWorkflowRepository(kubeClient)
   232  
   233  	workflowCRInKubernetes, err := repo.Get(c.Request.Context(), namespace, name)
   234  	if err != nil {
   235  		utils.SetAPImachineryError(c, err)
   236  		return
   237  	}
   238  	result, err := core.WorkflowEntity2WorkflowDetail(entity)
   239  	if err != nil {
   240  		utils.SetAPImachineryError(c, err)
   241  		return
   242  	}
   243  	result.Topology = workflowCRInKubernetes.Topology
   244  	result.KubeObject = workflowCRInKubernetes.KubeObject
   245  
   246  	c.JSON(http.StatusOK, result)
   247  }
   248  
   249  // @Summary Create a new workflow.
   250  // @Description Create a new workflow.
   251  // @Tags workflows
   252  // @Produce json
   253  // @Param request body v1alpha1.Workflow true "Request body"
   254  // @Success 200 {object} core.WorkflowDetail
   255  // @Failure 400 {object} utils.APIError
   256  // @Failure 500 {object} utils.APIError
   257  // @Router /workflows [post]
   258  func (it *Service) createWorkflow(c *gin.Context) {
   259  	payload := v1alpha1.Workflow{}
   260  
   261  	err := json.NewDecoder(c.Request.Body).Decode(&payload)
   262  	if err != nil {
   263  		_ = c.Error(utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
   264  		return
   265  	}
   266  
   267  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   268  	if err != nil {
   269  		utils.SetAPImachineryError(c, err)
   270  		return
   271  	}
   272  
   273  	repo := core.NewKubeWorkflowRepository(kubeClient)
   274  
   275  	result, err := repo.Create(c.Request.Context(), payload)
   276  	if err != nil {
   277  		utils.SetAPImachineryError(c, err)
   278  		return
   279  	}
   280  	c.JSON(http.StatusOK, result)
   281  }
   282  
   283  // @Summary Delete the specified workflow.
   284  // @Description Delete the specified workflow.
   285  // @Tags workflows
   286  // @Produce json
   287  // @Param uid path string true "uid"
   288  // @Success 200 {object} StatusResponse
   289  // @Failure 400 {object} utils.APIError
   290  // @Failure 404 {object} utils.APIError
   291  // @Failure 500 {object} utils.APIError
   292  // @Router /workflows/{uid} [delete]
   293  func (it *Service) deleteWorkflow(c *gin.Context) {
   294  	uid := c.Param("uid")
   295  
   296  	entity, err := it.store.FindByUID(c.Request.Context(), uid)
   297  	if err != nil {
   298  		utils.SetAPImachineryError(c, err)
   299  		return
   300  	}
   301  
   302  	namespace := entity.Namespace
   303  	name := entity.Name
   304  
   305  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   306  	if err != nil {
   307  		_ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
   308  		return
   309  	}
   310  
   311  	repo := core.NewKubeWorkflowRepository(kubeClient)
   312  
   313  	err = repo.Delete(c.Request.Context(), namespace, name)
   314  	if err != nil {
   315  		utils.SetAPImachineryError(c, err)
   316  		return
   317  	}
   318  	c.JSON(http.StatusOK, StatusResponse{Status: "success"})
   319  }
   320  
   321  // @Summary Update a workflow.
   322  // @Description Update a workflow.
   323  // @Tags workflows
   324  // @Produce json
   325  // @Param uid path string true "uid"
   326  // @Param request body v1alpha1.Workflow true "Request body"
   327  // @Success 200 {object} core.WorkflowDetail
   328  // @Failure 400 {object} utils.APIError
   329  // @Failure 500 {object} utils.APIError
   330  // @Router /workflows/{uid} [put]
   331  func (it *Service) updateWorkflow(c *gin.Context) {
   332  	payload := v1alpha1.Workflow{}
   333  
   334  	err := json.NewDecoder(c.Request.Body).Decode(&payload)
   335  	if err != nil {
   336  		utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
   337  		return
   338  	}
   339  	uid := c.Param("uid")
   340  	entity, err := it.store.FindByUID(c.Request.Context(), uid)
   341  	if err != nil {
   342  		utils.SetAPImachineryError(c, err)
   343  		return
   344  	}
   345  
   346  	namespace := entity.Namespace
   347  	name := entity.Name
   348  
   349  	if namespace != payload.Namespace {
   350  		utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err,
   351  			"namespace is not consistent, pathParameter: %s, metaInRaw: %s",
   352  			namespace,
   353  			payload.Namespace))
   354  		return
   355  	}
   356  	if name != payload.Name {
   357  		utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err,
   358  			"name is not consistent, pathParameter: %s, metaInRaw: %s",
   359  			name,
   360  			payload.Name))
   361  		return
   362  	}
   363  
   364  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   365  	if err != nil {
   366  		utils.SetAPImachineryError(c, err)
   367  		return
   368  	}
   369  
   370  	repo := core.NewKubeWorkflowRepository(kubeClient)
   371  
   372  	result, err := repo.Update(c.Request.Context(), namespace, name, payload)
   373  	if err != nil {
   374  		utils.SetAPImachineryError(c, err)
   375  		return
   376  	}
   377  
   378  	c.JSON(http.StatusOK, result)
   379  }
   380