티스토리 뷰

코드읽기

golang httpmq 코드 분석

Нуеоп 2019. 12. 27. 13:10

golang으로 구현된 httpmq 코드 분석 해보겠습니다.

 

깃허브 주소는 아래와 같습니다.

 

https://github.com/hnlq715/httpmq

 

hnlq715/httpmq

A simple HTTP message queue written in Go with goleveldb, just like httpsqs written in C with Tokyo Cabinet. - hnlq715/httpmq

github.com

 

간단한 http 메시지 큐 golang 구현체입니다.

 

 

`main.go`에 모두 구현되어 있어 코드 읽기가 쉽습니다.

 

대부분의 메시지 큐 구현이 database에 메시지를 저장하고 꺼내가는 구조로 되어 있습니다.

따라서 어떤 database를 사용했는지 보는 것이 중요합니다.

httpmq는 내부 database로 leveldb를 사용하였습니다.

 

그리고 http로 서비스를 제공하기 위해 net/http 대신 fasthttp를 사용하였습니다.

 

httpmq는 크게 5개의 함수로 되어 있습니다.

 

init(), main(), httpmqReadMetadata()

그리고

httpmqNowGetpos(), httpmqNowPutpos()

 

init()는 httpmq 실행시 command argument 파싱을 하여 실행 옵션을 지정합니다.

ip, port와 인증, 캐시 사이즈 정도 지정할 수 있습니다.

func init() {
	defaultMaxqueue = flag.Int("maxqueue", 1000000, "the max queue length")
	ip = flag.String("ip", "0.0.0.0", "ip address to listen on")
	port = flag.String("port", "1218", "port to listen on")
	defaultAuth = flag.String("auth", "", "auth password to access httpmq")
	dbPath = flag.String("db", "level.db", "database path")
	cacheSize = flag.Int("cache", 64, "cache size(MB)")
	writeBuffer = flag.Int("buffer", 32, "write buffer(MB)")
	cpu = flag.Int("cpu", runtime.NumCPU(), "cpu number for httpmq")
	keepalive = flag.Int("k", 60, "keepalive timeout for httpmq")
	flag.Parse()

	var err error
	db, err = leveldb.OpenFile(*dbPath, &opt.Options{BlockCacheCapacity: *cacheSize,
		WriteBuffer: *writeBuffer * 1024 * 1024})
	if err != nil {
		log.Fatalln("db.Get(), err:", err)
	}
}

 

다음 main() 함수를 살펴보겠습니다.

func main() {

	//...

	putnamechan := make(chan string, 100)
	putposchan := make(chan string, 100)
	getnamechan := make(chan string, 100)
	getposchan := make(chan string, 100)

	go func(chan string, chan string) {
		for {
			name := <-putnamechan
			putpos := httpmqNowPutpos(name)
			putposchan <- putpos
		}
	}(putnamechan, putposchan)

	go func(chan string, chan string) {
		for {
			name := <-getnamechan
			getpos := httpmqNowGetpos(name)
			getposchan <- getpos
		}
	}(getnamechan, getposchan)
    
	// ...
    
}

 

채널이 4개 있습니다.

각각 put과 get에 대한 채널입니다.

 

큐는 환형큐 형태로, getpos와 putpos가 있습니다.

 

putnamechan에 큐 이름을 보내면, httpmqNowPutpos(name)을 호출하고

반환값을 putposchan에 넣어줍니다.

 

getnamechan도 마찬가지로 큐 이름을 보내면, httpmqNowGetpos(name)을 호출하고

반환값을 getposchan에 넣어줍니다.

 

 

httpmqNowPutpos()와 httpmqNowGettpos()를 보기 전에 httpmqReadMetadata()를 살펴보겠습니다.

// name.maxqueue - maxqueue
// name.putpos - putpos
// name.getpos - getpos
func httpmqReadMetadata(name string) []string {
	maxqueue := name + ".maxqueue"
	data1, _ := db.Get([]byte(maxqueue), nil)
	if len(data1) == 0 {
		data1 = []byte(strconv.Itoa(*defaultMaxqueue))
	}
	putpos := name + ".putpos"
	data2, _ := db.Get([]byte(putpos), nil)
	getpos := name + ".getpos"
	data3, _ := db.Get([]byte(getpos), nil)
	return []string{string(data1), string(data2), string(data3)}
}

큐 이름을 입력받으면 key-value 저장소인 leveldb에서 `name.maxqueue`와 `name.putpos`, `name.getpos`의 value값을 배열로 반환합니다.

세 값 모두 문자열이지만, 숫자형태의 문자열입니다.

특정 이름의 큐의 큐 사이즈, putpos(입력 위치), getpos(출력 위치)를 의미합니다.

 

 

이제 httpmqNowPutpos()와 httpmqNowGettpos()를 살펴보겠습니다.

// httpmq now putpos api
// get the current putpos of httpmq for request
func httpmqNowPutpos(name string) string {
	metadata := httpmqReadMetadata(name)
	maxqueue, _ := strconv.Atoi(metadata[0])
	putpos, _ := strconv.Atoi(metadata[1])
	getpos, _ := strconv.Atoi(metadata[2])

	putpos++              // increase put queue pos
	if putpos == getpos { // queue is full
		return "0" // return 0 to reject put operation
	} else if getpos <= 1 && putpos > maxqueue { // get operation less than 1
		return "0" // and queue is full, just reject it
	} else if putpos > maxqueue { //  2nd lap
		metadata[1] = "1" // reset putpos as 1 and write to leveldb
	} else { // 1nd lap, convert int to string and write to leveldb
		metadata[1] = strconv.Itoa(putpos)
	}

	db.Put([]byte(name+".putpos"), []byte(metadata[1]), nil)

	return metadata[1]
}

httpmqNowPutpos()는 특정 이름의 큐에 값을 넣을 준비를 하는 함수입니다.

putpos의 위치를 오른쪽으로 한칸 옮기는 작업이 전부입니다.

leveldb에서 `name.putpos`의 값으로 1 증가된 값을 저장합니다.

환형큐이므로 putpos의 위치를 끝으로 옮기는 작업정도가 추가되어있습니다.

반환값은 최종 putpos의 값을 문자열로 반환합니다.

 

 

// httpmq now getpos api
// get the current getpos of httpmq for request
func httpmqNowGetpos(name string) string {
	metadata := httpmqReadMetadata(name)
	maxqueue, _ := strconv.Atoi(metadata[0])
	putpos, _ := strconv.Atoi(metadata[1])
	getpos, _ := strconv.Atoi(metadata[2])

	if getpos == 0 && putpos > 0 {
		getpos = 1 // first get operation, set getpos 1
	} else if getpos < putpos {
		getpos++ // 1nd lap, increase getpos
	} else if getpos > putpos && getpos < maxqueue {
		getpos++ // 2nd lap
	} else if getpos > putpos && getpos == maxqueue {
		getpos = 1 // 2nd first operation, set getpos 1
	} else {
		return "0" // all data in queue has been get
	}

	data := strconv.Itoa(getpos)
	db.Put([]byte(name+".getpos"), []byte(data), nil)
	return data
}

httpmqNowGetpos()는 특정 이름의 큐에 값을 꺼낼 준비를 하는 함수입니다.

getpos의 위치를 오른쪽으로 한칸 옮기는 작업이 전부입니다.

leveldb에서 `name.getpos`의 값으로 1 증가된 값을 저장합니다.

환형큐이므로 getpos의 위치를 끝으로 옮기는 작업정도가 추가되어있습니다.

반환값은 최종 getpos의 값을 문자열로 반환합니다.

 

아직까진 큐로써의 작업이 충분하지 않습니다.

getpos, putpos 위치값을 수정하는 작업은 있지만, 실제로 값을 넣고 꺼내오는 작업은 없었기 때문입니다.

 

 

이제 다시 main() 함수 나머지 부분을 살펴보겠습니다.

 

func main() {
	runtime.GOMAXPROCS(*cpu)

	sync := &opt.WriteOptions{Sync: true}

	putnamechan := make(chan string, 100)
	putposchan := make(chan string, 100)
	getnamechan := make(chan string, 100)
	getposchan := make(chan string, 100)

	go func(chan string, chan string) {
		for {
			name := <-putnamechan
			putpos := httpmqNowPutpos(name)
			putposchan <- putpos
		}
	}(putnamechan, putposchan)

	go func(chan string, chan string) {
		for {
			name := <-getnamechan
			getpos := httpmqNowGetpos(name)
			getposchan <- getpos
		}
	}(getnamechan, getposchan)

	m := func(ctx *fasthttp.RequestCtx) {
		var data string
		var buf []byte
		auth := string(ctx.FormValue("auth"))
		name := string(ctx.FormValue("name"))
		opt := string(ctx.FormValue("opt"))
		pos := string(ctx.FormValue("pos"))
		num := string(ctx.FormValue("num"))
		charset := string(ctx.FormValue("charset"))

		if *defaultAuth != "" && *defaultAuth != auth {
			ctx.Write([]byte("HTTPMQ_AUTH_FAILED"))
			return
		}

		method := string(ctx.Method())
		if method == "GET" {
			data = string(ctx.FormValue("data"))
		} else if method == "POST" {
			if string(ctx.Request.Header.ContentType()) == "application/x-www-form-urlencoded" {
				data = string(ctx.FormValue("data"))
			} else {
				buf = ctx.PostBody()
			}
		}

		if len(name) == 0 || len(opt) == 0 {
			ctx.Write([]byte("HTTPMQ_ERROR"))
			return
		}

		ctx.Response.Header.Set("Connection", "keep-alive")
		ctx.Response.Header.Set("Cache-Control", "no-cache")
		ctx.Response.Header.Set("Content-type", "text/plain")
		if len(charset) > 0 {
			ctx.Response.Header.Set("Content-type", "text/plain; charset="+charset)
		}

		if opt == "put" {
			if len(data) == 0 && len(buf) == 0 {
				ctx.Write([]byte("HTTPMQ_PUT_ERROR"))
				return
			}

			putnamechan <- name
			putpos := <-putposchan

			if putpos != "0" {
				queueName := name + putpos
				if data != "" {
					db.Put([]byte(queueName), []byte(data), nil)
				} else if len(buf) > 0 {
					db.Put([]byte(queueName), buf, nil)
				}
				ctx.Response.Header.Set("Pos", putpos)
				ctx.Write([]byte("HTTPMQ_PUT_OK"))
			} else {
				ctx.Write([]byte("HTTPMQ_PUT_END"))
			}
		} else if opt == "get" {
			getnamechan <- name
			getpos := <-getposchan

			if getpos == "0" {
				ctx.Write([]byte("HTTPMQ_GET_END"))
			} else {
				queueName := name + getpos
				v, err := db.Get([]byte(queueName), nil)
				if err == nil {
					ctx.Response.Header.Set("Pos", getpos)
					ctx.Write(v)
				} else {
					ctx.Write([]byte("HTTPMQ_GET_ERROR"))
				}
			}
		} else if opt == "status" {
			metadata := httpmqReadMetadata(name)
			maxqueue, _ := strconv.Atoi(metadata[0])
			putpos, _ := strconv.Atoi(metadata[1])
			getpos, _ := strconv.Atoi(metadata[2])

			var ungetnum float64
			var putTimes, getTimes string
			if putpos >= getpos {
				ungetnum = math.Abs(float64(putpos - getpos))
				putTimes = "1st lap"
				getTimes = "1st lap"
			} else if putpos < getpos {
				ungetnum = math.Abs(float64(maxqueue - getpos + putpos))
				putTimes = "2nd lap"
				getTimes = "1st lap"
			}

			buf := fmt.Sprintf("HTTP Simple Queue Service v%s\n", VERSION)
			buf += fmt.Sprintf("------------------------------\n")
			buf += fmt.Sprintf("Queue Name: %s\n", name)
			buf += fmt.Sprintf("Maximum number of queues: %d\n", maxqueue)
			buf += fmt.Sprintf("Put position of queue (%s): %d\n", putTimes, putpos)
			buf += fmt.Sprintf("Get position of queue (%s): %d\n", getTimes, getpos)
			buf += fmt.Sprintf("Number of unread queue: %g\n\n", ungetnum)

			ctx.Write([]byte(buf))
		} else if opt == "view" {
			v, err := db.Get([]byte(name+pos), nil)
			if err == nil {
				ctx.Write([]byte(v))
			} else {
				ctx.Write([]byte("HTTPMQ_VIEW_ERROR"))
			}
		} else if opt == "reset" {
			maxqueue := strconv.Itoa(*defaultMaxqueue)
			db.Put([]byte(name+".maxqueue"), []byte(maxqueue), sync)
			db.Put([]byte(name+".putpos"), []byte("0"), sync)
			db.Put([]byte(name+".getpos"), []byte("0"), sync)
			ctx.Write([]byte("HTTPMQ_RESET_OK"))
		} else if opt == "maxqueue" {
			maxqueue, _ := strconv.Atoi(num)
			if maxqueue > 0 && maxqueue <= 10000000 {
				db.Put([]byte(name+".maxqueue"), []byte(num), sync)
				ctx.Write([]byte("HTTPMQ_MAXQUEUE_OK"))
			} else {
				ctx.Write([]byte("HTTPMQ_MAXQUEUE_CANCLE"))
			}
		}
	}

	log.Fatal(fasthttp.ListenAndServe(*ip+":"+*port, m))
}

 

put의 http 요청이 올 경우

`putnamechan <- name`으로 putnamechan에 큐 이름을 보냅니다.

main() 함수 윗 부분의 고루틴에서 putnamechan에 메시지가 들어올 경우, httpmqNowPutpos()를 호출하고, 응답값인 putpos를 putposchan에 보냅니다.

 

메시지를 넣어야할 putpos를 알아냈으므로, leveldb에서 `name.putpos` 키에 해당하는 값을 넣기만 하면 됩니다.

if opt == "put" {
	if len(data) == 0 && len(buf) == 0 {
		ctx.Write([]byte("HTTPMQ_PUT_ERROR"))
		return
	}

	putnamechan <- name
	putpos := <-putposchan

	if putpos != "0" {
		queueName := name + putpos
		if data != "" {
			db.Put([]byte(queueName), []byte(data), nil)
		} else if len(buf) > 0 {
			db.Put([]byte(queueName), buf, nil)
		}
		ctx.Response.Header.Set("Pos", putpos)
		ctx.Write([]byte("HTTPMQ_PUT_OK"))
	} else {
		ctx.Write([]byte("HTTPMQ_PUT_END"))
	}
} 

 

 

get 요청도 마찬가지로

`getnamechan <- name`으로 getnamechan에 큐 이름을 보냅니다.

main() 함수 윗부분의 고루틴에서 getnamechan에 메시지가 들어올 경우, httpmqNowGetpos()를 호출하고, 응답값인 getpos를 getposchan에 보냅니다.

 

메시지를 읽어야할 getpos를 알아냈으므로, leveldb에서 `name.getpos` 키에 해당하는 값을 조회하기만 하면 됩니다.

else if opt == "get" {
	getnamechan <- name
	getpos := <-getposchan

	if getpos == "0" {
		ctx.Write([]byte("HTTPMQ_GET_END"))
	} else {
		queueName := name + getpos
		v, err := db.Get([]byte(queueName), nil)
		if err == nil {
			ctx.Response.Header.Set("Pos", getpos)
			ctx.Write(v)
		} else {
			ctx.Write([]byte("HTTPMQ_GET_ERROR"))
		}
	}
} 

 

이름이 "foo", "bar"인 큐가 2개 있다고 가정하겠습니다.

leveldb에는 아래와 같이 값이 저장될 것입니다. (maxqueue값은 생략하였습니다.)

 

key value
foo.getpos 4
foo.putpos 7
bar.getpos 2
bar.putpos 6
foo.4 javascript
bar.2 seoul
bar.3 daejeon
foo.5 python
bar.4 daegu
foo.6 rust
bar.5 busan
foo.7 golang

 

 

마지막으로 reset은 어떻게 동작하는지 살펴보겠습니다.

 

else if opt == "reset" {
	maxqueue := strconv.Itoa(*defaultMaxqueue)
	db.Put([]byte(name+".maxqueue"), []byte(maxqueue), sync)
	db.Put([]byte(name+".putpos"), []byte("0"), sync)
	db.Put([]byte(name+".getpos"), []byte("0"), sync)
	ctx.Write([]byte("HTTPMQ_RESET_OK"))
} 

 

meta정보에 해당하는 maxqueue, putpos, getpos만 지웁니다.

 

이름마다 사이즈가 고정된 저장소를 이용하는 큐이므로, 동적으로 다양하게 생성하여 사용하는 큐로는 적합하지 않습니다.

 

'코드읽기' 카테고리의 다른 글

github.com/argoproj/argo 코드 살펴보기  (0) 2020.05.07
golang grpc conn 객체 관리  (0) 2020.05.07
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함