티스토리 뷰

argo는 kubernetes api를 이용하여 다양한 서비스를 제공하는 오픈소스 입니다.

 

컨테이너를 생성하고 관리할 수 있어서 파이프라인, 워크플로우에서 활용할 수 있습니다.

kubeflow도 파이프라인 관리에 내부적으로 argo를 사용하고 있습니다.

 

argo는 argo-cd, argo-event, argo-workflow 등 다양하게 활용되고 있습니다.

ci/cd에서도 argo 활용이 두드러지지만 저는 workflow로 사용하면서 느낀 점과 그 과정에서 argo 소스코드를 살펴본 부분을 공유드리겠습니다.

 

jenkins pipeline이나 python airflow도 파이프라인, 워크플로우를 처리하는 좋은 솔루션입니다.

하지만 모두 약간씩 아쉬움이 있습니다.

 

별도의 제약조건 없이 거의 모든 것을 파이프라인, 워크플로우로 처리하려면 argo-workflow가 최선입니다.

 

여러 번에 걸쳐서 argo 소스코드 일부를 같이 읽고 생각을 말씀드리겠습니다.

 

https://github.com/argoproj/argo

 

argoproj/argo

Argo Workflows: Get stuff done with Kubernetes. Contribute to argoproj/argo development by creating an account on GitHub.

github.com

 

먼저 argo를 실행하게 되면, argo-cli로 명령을 보낼 수 있습니다.

그리고 RestAPI로도 제어가 가능합니다.

 

Pod를 띄우고 실행하고 진행상황을 모니터링 할 수 있습니다.

 

argo 라이브러리를 이용해서 자체 서비스를 만들다보니 API 제공이 궁금했습니다.

 

https://github.com/argoproj/argo/blob/master/pkg/apiclient/workflow/workflow.proto

 

argoproj/argo

Argo Workflows: Get stuff done with Kubernetes. Contribute to argoproj/argo development by creating an account on GitHub.

github.com

grpc 인터페이스를 제공하고 있습니다.

 

service WorkflowService {
  rpc CreateWorkflow (WorkflowCreateRequest) returns (Workflow) 
  rpc GetWorkflow (WorkflowGetRequest) returns (Workflow) 
  rpc ListWorkflows (WorkflowListRequest) returns (WorkflowList) 
  rpc WatchWorkflows (WatchWorkflowsRequest) returns (stream WorkflowWatchEvent) 
  rpc DeleteWorkflow (WorkflowDeleteRequest) returns (WorkflowDeleteResponse) 
  rpc RetryWorkflow (WorkflowRetryRequest) returns (Workflow) 
  rpc ResubmitWorkflow (WorkflowResubmitRequest) returns (Workflow) 
  rpc ResumeWorkflow (WorkflowResumeRequest) returns (Workflow) 
  rpc SuspendWorkflow (WorkflowSuspendRequest) returns (Workflow) 
  rpc TerminateWorkflow (WorkflowTerminateRequest) returns (Workflow) 
  rpc StopWorkflow (WorkflowStopRequest) returns (Workflow) 
  rpc LintWorkflow (WorkflowLintRequest) returns (Workflow) 
  rpc PodLogs (WorkflowLogRequest) returns (stream LogEntry) 
  rpc SubmitWorkflow (WorkflowSubmitRequest) returns (Workflow) 
}

 

그리고 grpc-gateway를 이용하여 각 인터페이스의 RestAPI도 제공합니다.

 

 

이제 grpc 서버를 실행하는 부분을 살펴보겠습니다.

 

https://github.com/argoproj/argo/blob/master/server/apiserver/argoserver.go

 

argoproj/argo

Argo Workflows: Get stuff done with Kubernetes. Contribute to argoproj/argo development by creating an account on GitHub.

github.com

먼저 argoserver.go에는 두 서버 생성 함수가 있습니다.

 

newGRPCServer()

newHTTPServer()

 

그리고 Run()에서 위 두 함수를 차례로 실행해서 서버 객체를 만들고

go routine으로 각 서버 객체에서 Serve() 함수를 동시에 (정확히는 논블로킹으로) 실행합니다.

 

func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(string)) {
  ...

  grpcServer := as.newGRPCServer(instanceIDService, offloadRepo, wfArchive, configMap.Links)
  httpServer := as.newHTTPServer(ctx, port, artifactServer)

  ...

  // Cmux is used to support servicing gRPC and HTTP1.1+JSON on the same port
  tcpm := cmux.New(conn)
  httpL := tcpm.Match(cmux.HTTP1Fast())
  grpcL := tcpm.Match(cmux.Any())

  go as.configController.Run(as.stopCh, as.restartOnConfigChange)
  go func() { as.checkServeErr("grpcServer", grpcServer.Serve(grpcL)) }()
  go func() { as.checkServeErr("httpServer", httpServer.Serve(httpL)) }()
  go func() { as.checkServeErr("tcpm", tcpm.Serve()) }()

  <-as.stopCh
}

 

cmux는 여러 프로토콜의 커넥션을 multiplexing해주는 라이브러리 입니다.

https://github.com/soheilhy/cmux

 

soheilhy/cmux

Connection multiplexer for GoLang: serve different services on the same port! - soheilhy/cmux

github.com

grpc와 http를 동시에 연결을 맺어줍니다.

 

 

newGRPCServer() 내부는 다음과 같습니다.

func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, links []*v1alpha1.Link) *grpc.Server {
  serverLog := log.NewEntry(log.StandardLogger())

  sOpts := []grpc.ServerOption{
    // Set both the send and receive the bytes limit to be 100MB
    // The proper way to achieve high performance is to have pagination
    // while we work toward that, we can have high limit first
    grpc.MaxRecvMsgSize(MaxGRPCMessageSize),
    grpc.MaxSendMsgSize(MaxGRPCMessageSize),
    grpc.ConnectionTimeout(300 * time.Second),
    grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
      grpc_logrus.UnaryServerInterceptor(serverLog),
      grpcutil.PanicLoggerUnaryServerInterceptor(serverLog),
      grpcutil.ErrorTranslationUnaryServerInterceptor,
      as.authenticator.UnaryServerInterceptor(),
    )),
    grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
      grpc_logrus.StreamServerInterceptor(serverLog),
      grpcutil.PanicLoggerStreamServerInterceptor(serverLog),
      grpcutil.ErrorTranslationStreamServerInterceptor,
      as.authenticator.StreamServerInterceptor(),
    )),
  }

  grpcServer := grpc.NewServer(sOpts...)

  infopkg.RegisterInfoServiceServer(grpcServer, info.NewInfoServer(as.managedNamespace, links))
  workflowpkg.RegisterWorkflowServiceServer(grpcServer, workflow.NewWorkflowServer(instanceIDService, offloadNodeStatusRepo))
  workflowtemplatepkg.RegisterWorkflowTemplateServiceServer(grpcServer, workflowtemplate.NewWorkflowTemplateServer(instanceIDService))
  cronworkflowpkg.RegisterCronWorkflowServiceServer(grpcServer, cronworkflow.NewCronWorkflowServer(instanceIDService))
  workflowarchivepkg.RegisterArchivedWorkflowServiceServer(grpcServer, workflowarchive.NewWorkflowArchiveServer(wfArchive))
  clusterwftemplatepkg.RegisterClusterWorkflowTemplateServiceServer(grpcServer, clusterworkflowtemplate.NewClusterWorkflowTemplateServer(instanceIDService))
  return grpcServer
}

inteceptor를 어떻게 활용하는지 잘 봐두면 좋겠습니다.

'코드읽기' 카테고리의 다른 글

golang grpc conn 객체 관리  (0) 2020.05.07
golang httpmq 코드 분석  (0) 2019.12.27
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함