...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/workflow/workflow.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/workflow

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package workflow
    17  
    18  import (
    19  	"encoding/json"
    20  	"fmt"
    21  	"net/http"
    22  	"sort"
    23  
    24  	"github.com/gin-gonic/gin"
    25  	"github.com/go-logr/logr"
    26  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
    30  	config "github.com/chaos-mesh/chaos-mesh/pkg/config"
    31  	"github.com/chaos-mesh/chaos-mesh/pkg/curl"
    32  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
    33  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    34  )
    35  
    36  func Register(r *gin.RouterGroup, s *Service) {
    37  	endpoint := r.Group("/workflows")
    38  	endpoint.GET("", s.listWorkflows)
    39  	endpoint.POST("", s.createWorkflow)
    40  	endpoint.GET("/:uid", s.getWorkflowDetailByUID)
    41  	endpoint.PUT("/:uid", s.updateWorkflow)
    42  	endpoint.DELETE("/:uid", s.deleteWorkflow)
    43  	endpoint.POST("/render-task/http", s.renderHTTPTask)
    44  	endpoint.POST("/parse-task/http", s.parseHTTPTask)
    45  	endpoint.POST("/validate-task/http", s.isValidRenderedHTTPTask)
    46  }
    47  
    48  // Service defines a handler service for workflows.
    49  type Service struct {
    50  	conf   *config.ChaosDashboardConfig
    51  	store  core.WorkflowStore
    52  	logger logr.Logger
    53  }
    54  
    55  func NewService(conf *config.ChaosDashboardConfig, store core.WorkflowStore, logger logr.Logger) *Service {
    56  	return &Service{conf: conf, store: store, logger: logger}
    57  }
    58  
    59  // @Summary Render a task which sends HTTP request
    60  // @Description Render a task which sends HTTP request
    61  // @Tags workflows
    62  // @Produce json
    63  // @Param request body curl.RequestForm true "Origin HTTP Request"
    64  // @Success 200 {object} v1alpha1.Template
    65  // @Failure 400 {object} utils.APIError
    66  // @Failure 500 {object} utils.APIError
    67  // @Router /workflows/render-task/http [post]
    68  func (it *Service) renderHTTPTask(c *gin.Context) {
    69  	requestBody := curl.RequestForm{}
    70  	if err := c.ShouldBindJSON(&requestBody); err != nil {
    71  		utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
    72  		return
    73  	}
    74  	result, err := curl.RenderWorkflowTaskTemplate(requestBody)
    75  	if err != nil {
    76  		utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
    77  		return
    78  	}
    79  	c.JSON(http.StatusOK, result)
    80  }
    81  
    82  // @Summary Validate the given template is a valid rendered HTTP Task
    83  // @Description Validate the given template is a valid rendered HTTP Task
    84  // @Tags workflows
    85  // @Produce json
    86  // @Param request body v1alpha1.Template true "Rendered Task"
    87  // @Router /workflows/validate-task/http [post]
    88  // @Success 200 {object} bool
    89  // @Failure 400 {object} utils.APIError
    90  // @Failure 500 {object} utils.APIError
    91  func (it *Service) isValidRenderedHTTPTask(c *gin.Context) {
    92  	requestBody := v1alpha1.Template{}
    93  	if err := c.ShouldBindJSON(&requestBody); err != nil {
    94  		utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
    95  		return
    96  	}
    97  	result := curl.IsValidRenderedTask(&requestBody)
    98  	c.JSON(http.StatusOK, result)
    99  }
   100  
   101  // @Summary Parse the rendered task back to the original request
   102  // @Description Parse the rendered task back to the original request
   103  // @Tags workflows
   104  // @Produce json
   105  // @Param request body v1alpha1.Template true "Rendered Task"
   106  // @Router /workflows/parse-task/http [post]
   107  // @Success 200 {object} curl.RequestForm
   108  // @Failure 400 {object} utils.APIError
   109  // @Failure 500 {object} utils.APIError
   110  func (it *Service) parseHTTPTask(c *gin.Context) {
   111  	requestBody := v1alpha1.Template{}
   112  	if err := c.ShouldBindJSON(&requestBody); err != nil {
   113  		utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
   114  		return
   115  	}
   116  	result, err := curl.ParseWorkflowTaskTemplate(&requestBody)
   117  	if err != nil {
   118  		utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
   119  		return
   120  	}
   121  	c.JSON(http.StatusOK, result)
   122  }
   123  
   124  // @Summary List workflows from Kubernetes cluster.
   125  // @Description List workflows from Kubernetes cluster.
   126  // @Tags workflows
   127  // @Produce json
   128  // @Param namespace query string false "namespace, given empty string means list from all namespace"
   129  // @Param status query string false "status" Enums(Initializing, Running, Errored, Finished)
   130  // @Success 200 {array} core.WorkflowMeta
   131  // @Router /workflows [get]
   132  // @Failure 500 {object} utils.APIError
   133  func (it *Service) listWorkflows(c *gin.Context) {
   134  	namespace := c.Query("namespace")
   135  	if len(namespace) == 0 && !it.conf.ClusterScoped &&
   136  		len(it.conf.TargetNamespace) != 0 {
   137  		namespace = it.conf.TargetNamespace
   138  	}
   139  
   140  	result := make([]core.WorkflowMeta, 0)
   141  
   142  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   143  	if err != nil {
   144  		_ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
   145  		return
   146  	}
   147  	repo := core.NewKubeWorkflowRepository(kubeClient)
   148  
   149  	if namespace != "" {
   150  		workflowFromNs, err := repo.ListByNamespace(c.Request.Context(), namespace)
   151  		if err != nil {
   152  			utils.SetAPImachineryError(c, err)
   153  			return
   154  		}
   155  		result = append(result, workflowFromNs...)
   156  	} else {
   157  		allWorkflow, err := repo.List(c.Request.Context())
   158  		if err != nil {
   159  			utils.SetAPImachineryError(c, err)
   160  			return
   161  		}
   162  		result = append(result, allWorkflow...)
   163  	}
   164  
   165  	// enriching with ID
   166  	for index, item := range result {
   167  		entity, err := it.store.FindByUID(c.Request.Context(), string(item.UID))
   168  		if err != nil {
   169  			it.logger.Info("warning: workflow does not have a record in database",
   170  				"namespaced name", fmt.Sprintf("%s/%s", item.Namespace, item.Name),
   171  				"uid", item.UID,
   172  			)
   173  		}
   174  
   175  		if entity != nil {
   176  			result[index].ID = entity.ID
   177  		}
   178  	}
   179  
   180  	sort.Slice(result, func(i, j int) bool {
   181  		return result[i].CreatedAt.After(result[i].CreatedAt)
   182  	})
   183  
   184  	c.JSON(http.StatusOK, result)
   185  }
   186  
   187  // @Summary Get detailed information about the specified workflow.
   188  // @Description Get detailed information about the specified workflow. If that object is not existed in kubernetes, it will only return ths persisted data in the database.
   189  // @Tags workflows
   190  // @Produce json
   191  // @Param uid path string true "uid"
   192  // @Router /workflows/{uid} [GET]
   193  // @Success 200 {object} core.WorkflowDetail
   194  // @Failure 400 {object} utils.APIError
   195  // @Failure 500 {object} utils.APIError
   196  func (it *Service) getWorkflowDetailByUID(c *gin.Context) {
   197  	uid := c.Param("uid")
   198  
   199  	entity, err := it.store.FindByUID(c.Request.Context(), uid)
   200  	if err != nil {
   201  		utils.SetAPImachineryError(c, err)
   202  		return
   203  	}
   204  
   205  	namespace := entity.Namespace
   206  	name := entity.Name
   207  
   208  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   209  	if err != nil {
   210  		if apierrors.IsNotFound(err) {
   211  			// if not exists in kubernetes anymore, return the persisted entity directly.
   212  			workflowDetail, err := core.WorkflowEntity2WorkflowDetail(entity)
   213  			if err != nil {
   214  				utils.SetAPImachineryError(c, err)
   215  				return
   216  			}
   217  			c.JSON(http.StatusOK, workflowDetail)
   218  			return
   219  		}
   220  		_ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
   221  		return
   222  	}
   223  
   224  	// enriching the topology and spec/status with CR in kubernetes
   225  	repo := core.NewKubeWorkflowRepository(kubeClient)
   226  
   227  	workflowCRInKubernetes, err := repo.Get(c.Request.Context(), namespace, name)
   228  	if err != nil {
   229  		utils.SetAPImachineryError(c, err)
   230  		return
   231  	}
   232  	result, err := core.WorkflowEntity2WorkflowDetail(entity)
   233  	if err != nil {
   234  		utils.SetAPImachineryError(c, err)
   235  		return
   236  	}
   237  	result.Topology = workflowCRInKubernetes.Topology
   238  	result.KubeObject = workflowCRInKubernetes.KubeObject
   239  
   240  	c.JSON(http.StatusOK, result)
   241  }
   242  
   243  // @Summary Create a new workflow.
   244  // @Description Create a new workflow.
   245  // @Tags workflows
   246  // @Produce json
   247  // @Param request body v1alpha1.Workflow true "Request body"
   248  // @Success 200 {object} core.WorkflowDetail
   249  // @Failure 400 {object} utils.APIError
   250  // @Failure 500 {object} utils.APIError
   251  // @Router /workflows [post]
   252  func (it *Service) createWorkflow(c *gin.Context) {
   253  	payload := v1alpha1.Workflow{}
   254  
   255  	err := json.NewDecoder(c.Request.Body).Decode(&payload)
   256  	if err != nil {
   257  		_ = c.Error(utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
   258  		return
   259  	}
   260  
   261  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   262  	if err != nil {
   263  		utils.SetAPImachineryError(c, err)
   264  		return
   265  	}
   266  
   267  	repo := core.NewKubeWorkflowRepository(kubeClient)
   268  
   269  	result, err := repo.Create(c.Request.Context(), payload)
   270  	if err != nil {
   271  		utils.SetAPImachineryError(c, err)
   272  		return
   273  	}
   274  	c.JSON(http.StatusOK, result)
   275  }
   276  
   277  // @Summary Delete the specified workflow.
   278  // @Description Delete the specified workflow.
   279  // @Tags workflows
   280  // @Produce json
   281  // @Param uid path string true "uid"
   282  // @Success 200 {object} utils.Response
   283  // @Failure 400 {object} utils.APIError
   284  // @Failure 404 {object} utils.APIError
   285  // @Failure 500 {object} utils.APIError
   286  // @Router /workflows/{uid} [delete]
   287  func (it *Service) deleteWorkflow(c *gin.Context) {
   288  	uid := c.Param("uid")
   289  
   290  	entity, err := it.store.FindByUID(c.Request.Context(), uid)
   291  	if err != nil {
   292  		utils.SetAPImachineryError(c, err)
   293  		return
   294  	}
   295  
   296  	namespace := entity.Namespace
   297  	name := entity.Name
   298  
   299  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   300  	if err != nil {
   301  		_ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
   302  		return
   303  	}
   304  
   305  	repo := core.NewKubeWorkflowRepository(kubeClient)
   306  
   307  	err = repo.Delete(c.Request.Context(), namespace, name)
   308  	if err != nil {
   309  		utils.SetAPImachineryError(c, err)
   310  		return
   311  	}
   312  	c.JSON(http.StatusOK, utils.ResponseSuccess)
   313  }
   314  
   315  // @Summary Update a workflow.
   316  // @Description Update a workflow.
   317  // @Tags workflows
   318  // @Produce json
   319  // @Param uid path string true "uid"
   320  // @Param request body v1alpha1.Workflow true "Request body"
   321  // @Success 200 {object} core.WorkflowDetail
   322  // @Failure 400 {object} utils.APIError
   323  // @Failure 500 {object} utils.APIError
   324  // @Router /workflows/{uid} [put]
   325  func (it *Service) updateWorkflow(c *gin.Context) {
   326  	payload := v1alpha1.Workflow{}
   327  
   328  	err := json.NewDecoder(c.Request.Body).Decode(&payload)
   329  	if err != nil {
   330  		utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
   331  		return
   332  	}
   333  	uid := c.Param("uid")
   334  	entity, err := it.store.FindByUID(c.Request.Context(), uid)
   335  	if err != nil {
   336  		utils.SetAPImachineryError(c, err)
   337  		return
   338  	}
   339  
   340  	namespace := entity.Namespace
   341  	name := entity.Name
   342  
   343  	if namespace != payload.Namespace {
   344  		utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err,
   345  			"namespace is not consistent, pathParameter: %s, metaInRaw: %s",
   346  			namespace,
   347  			payload.Namespace))
   348  		return
   349  	}
   350  	if name != payload.Name {
   351  		utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err,
   352  			"name is not consistent, pathParameter: %s, metaInRaw: %s",
   353  			name,
   354  			payload.Name))
   355  		return
   356  	}
   357  
   358  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   359  	if err != nil {
   360  		utils.SetAPImachineryError(c, err)
   361  		return
   362  	}
   363  
   364  	repo := core.NewKubeWorkflowRepository(kubeClient)
   365  
   366  	result, err := repo.Update(c.Request.Context(), namespace, name, payload)
   367  	if err != nil {
   368  		utils.SetAPImachineryError(c, err)
   369  		return
   370  	}
   371  
   372  	c.JSON(http.StatusOK, result)
   373  }
   374