Skip to main content

Queue Pattern

Case Study

Kita ingin membuat sistem subscribe ke newsletter. Ketika user memasukkan emailnya, kita simpan email tersebut ke database dan mengirimkan email notifikasi. Email notification ini hanya bersifat notifikasi, bukan sesuatu yang krusial.

package main

import (
"fmt"
"net/http"
"time"
)

func saveToDatabase(email string) error {
return nil
}

func sendEmail(email, message string) error {
// Simulate slow request
time.Sleep(5 * time.Second)
return nil
}

func register(w http.ResponseWriter, r *http.Request) {
fmt.Println("Incoming request")
email := r.FormValue("email")

if err := saveToDatabase(email); err != nil {
http.Error(w, "Failed to save to database", http.StatusInternalServerError)
return
}

// This is slow
sendEmail(email, "Please verify your email")
}

func main() {
http.HandleFunc("/register", register)
http.ListenAndServe(":8080", nil)
}

Problem: Slow HTTP Response

Email API (seperti halnya external API lainnya) biasanya relatif lambat, dan kita tidak ingin user menunggu hingga email terkirim. Toh, emailnya hanya bersifat notikasi. Di code tadi, jika sendEmail memakan waktu 5 detik (well, ini esktrim sih), maka user akan perlu menunggu 5 detik! Bad UX!

Kita ingin agar user hanya perlu menunggu datanya disimpan ke database saja. Setelah itu browser langsung memberikan response ke user.

Mari kita load test:

#!/bin/bash

echo "POST http://localhost:8080/register" | \
vegeta attack -body=body.json \
-header="Content-Type: application/x-www-form-urlencoded" \
-rate=100 -duration=30s | \
vegeta report

Code di atas akan membuat request 100 RPS selama 30 detik (total 3000 request).

Requests      [total, rate, throughput]         3000, 100.03, 85.73
Duration [total, attack, wait] 34.992s, 29.99s, 5.002s
Latencies [min, mean, 50, 90, 95, 99, max] 5s, 5.002s, 5.001s, 5.003s, 5.004s, 5.008s, 5.017s
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 66000, 22.00
Success [ratio] 100.00%
Status Codes [code:count] 200:3000

Good news! Semua request berhasil. Tapi, semua request memakan waktu >= 5 detik. Bad UX!

Mari kita coba di Python

from time import sleep
from flask import Flask, request, abort

app = Flask(__name__)


def save_to_database(email):
pass


def send_email(email, message):
# Simulate slow request
sleep(5)


@app.route('/register', methods=['POST'])
def register():
print("Incoming request")
email = request.form.get('email')

try:
save_to_database(email)
except Exception as e:
abort(500, description="Failed to save to database")

# This is slow
send_email(email, "Please verify your email")

return 'OK', 200


if __name__ == '__main__':
app.run(port=8080)

Jalankan dengan gunicorn:

gunicorn -b :8080 main:app

Let's attack!

Requests      [total, rate, throughput]         3000, 100.03, 0.11
Duration [total, attack, wait] 55.896s, 29.99s, 25.906s
Latencies [min, mean, 50, 90, 95, 99, max] 36µs, 24.614s, 25.907s, 25.912s, 26.2s, 30.001s, 30.016s
Bytes In [total, mean] 12, 0.00
Bytes Out [total, mean] 132, 0.04
Success [ratio] 0.20%
Status Codes [code:count] 0:2994 200:6

Oops, jelek sekali hasilnya, hanya 6 request yang berhasil, 2,994 request gagal!

Kenapa tidak sebagus yang Golang?

Goroutine vs Thread

Di versi Golang, tiap request baru akan di-handle oleh goroutine baru. Goroutine ini sangat ringan, sehingga Golang bisa membuat ribuan goroutine tanpa masalah.

Untuk mengecek berapa jumlah goroutine yang sedang berjalan, kita bisa menggunakan pprof. Tambahkan import _ "net/http/pprof" dan buka http://localhost:8080/debug/pprof.

Ketika load test dijalankan, terlihat bahwa jumlah goroutine yang berjalan terus bertambah.

Count	Profile
13 allocs
0 block
0 cmdline
1007 goroutine
13 heap
0 mutex
0 profile
9 threadcreate
0 trace

Sementara untuk versi Python, kita hanya menggunakan 1 thread saja. Jika ada request baru, maka request tersebut akan menunggu sampai request sebelumnya selesai.

Untuk membuat versi Python lebih scalable, kita bisa menambahkan jumlah threadnya.

gunicorn -b :8080 --threads 100 main:app
Requests      [total, rate, throughput]         3000, 100.03, 11.67
Duration [total, attack, wait] 59.971s, 29.99s, 29.981s
Latencies [min, mean, 50, 90, 95, 99, max] 102.884ms, 22.897s, 25.911s, 30.001s, 30.001s, 30.002s, 30.012s
Bytes In [total, mean] 1400, 0.47
Bytes Out [total, mean] 15400, 5.13
Success [ratio] 23.33%
Status Codes [code:count] 0:2300 200:700

Getting better, tapi masih jelek, hanya 23.33% request yang berhasil.

Coba lebih banyak lagi threadsnya

gunicorn -b :8080 --threads 1000 main:app
Requests      [total, rate, throughput]         3000, 100.04, 85.72
Duration [total, attack, wait] 34.996s, 29.989s, 5.007s
Latencies [min, mean, 50, 90, 95, 99, max] 5.002s, 5.009s, 5.009s, 5.012s, 5.014s, 5.017s, 5.042s
Bytes In [total, mean] 6000, 2.00
Bytes Out [total, mean] 66000, 22.00
Success [ratio] 100.00%
Status Codes [code:count] 200:3000

Nah, sekarang sudah 100% request yang berhasil. Tapi, kita perlu 1000 threads untuk mendapatkan hasil ini. Memory usagenya akan lebih besar. Dan sebagian besar waktu threads tersebut hanya menunggu (tidak melakukan komputasi apapun) -> wasted.

Solution: Fire and Forget

Jadi bagaimana yang lebih efisien? Ada banyak (dan akan dikupas di bab-bab lain), kita mulai dari yang paling sederhana dulu: In Memory Queue.

Karena toh email hanya bersifat notifikasi, let's just fire and forget! Jadi, cukup simpan email ke dalam in-memory queue (fire) dan biarkan worker yang mengirimkan emailnya jalan di background (forget

package main

import (
"fmt"
"net/http"
"time"
)

var queueSize = 0
var emailQueue = make(chan string, queueSize)
var numWorkers = 1


func init() {
// Start `numWorkers` workers
for i := 0; i < numWorkers; i++ {
// These run on separate goroutines
go func() {
// Keep consuming from the emailQueue
for email := range emailQueue {
// This is the slow part
sendEmail(email, "Please verify your email")
}
}()
}
}


func saveToDatabase(email string) error {
return nil
}

func sendEmail(email, message string) error {
// Simulate slow request
time.Sleep(5 * time.Second)
fmt.Println("Sent email to", email, "with message", message)
return nil
}

func register(w http.ResponseWriter, r *http.Request) {
fmt.Println("Incoming request")
email := r.FormValue("email")

if err := saveToDatabase(email); err != nil {
http.Error(w, "Failed to save to database", http.StatusInternalServerError)
return
}

// this is fast*
emailQueue <- email
}

func main() {
http.HandleFunc("/register", register)
http.ListenAndServe(":8080", nil)
}

Di sini kita membuat emailQueue sebagai penampung email yang akan dikirimkan. Ketika register dipanggil, kita hanya memasukkan email ke dalam queue tersebut dan langsung return response ke user.

Isi emailQueue itu diproses secara terpisah di goroutine yang berjalan di background.

Requests      [total, rate, throughput]         3000, 100.03, 0.13
Duration [total, attack, wait] 55.546s, 29.99s, 25.556s
Latencies [min, mean, 50, 90, 95, 99, max] 64.459µs, 25.516s, 30.001s, 30.002s, 30.002s, 30.004s, 30.022s
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 154, 0.05
Success [ratio] 0.23%
Status Codes [code:count] 0:2993 200:7

Super jelek! Hanya 7 request yang berhasil. Dan latency-nya juga jelek. Padahal di versi Golang yang awal, 100% request berhasil.

Apa masalahnya?

Queue Size

var queueSize = 0
var emailQueue = make(chan string, queueSize)

Masalahnya ada di queueSize yang kita set ke 0. Itu berarti, emailQueue <- email akan menjadi blocking call. Itu akan ter-unblock ketika ada goroutine lain yang mengambil <- emailQueue. Jadi, register justru akan terblock oleh goroutine lain yang menjalankan sendEmail.

Ok, mari kita set jadi 1000.

var queueSize = 1000
var emailQueue = make(chan string, queueSize)

Attack!!

Requests      [total, rate, throughput]         3000, 100.04, 16.82
Duration [total, attack, wait] 59.99s, 29.989s, 30.001s
Latencies [min, mean, 50, 90, 95, 99, max] 182µs, 19.946s, 30.001s, 30.002s, 30.002s, 30.005s, 30.01s
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 22198, 7.40
Success [ratio] 33.63%
Status Codes [code:count] 0:1991 200:1009

Sudah lebih bagus, tapi masih banyak yang gagal.

Ok, set lagi lebih besar jadi 3,000

Requests      [total, rate, throughput]         3000, 100.03, 100.03
Duration [total, attack, wait] 29.99s, 29.99s, 545.5µs
Latencies [min, mean, 50, 90, 95, 99, max] 130.875µs, 967.627µs, 765.663µs, 1.554ms, 2.202ms, 5.006ms, 16.508ms
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 66000, 22.00
Success [ratio] 100.00%
Status Codes [code:count] 200:3000

Nice, 100% success. Bonus: perhatikan latencynya. p99nya hanya 5 ms. Milisecond, bukan second! Jauh lebih cepat!

Tapi - jika jeli - akan notice bahwa pengiriman email jalan belakangan dan sangat lambat. Ketika load test sudah selesai, masih banyak sekali email yang belum terkirim. Mengapa?

Num Of Workers

var numWorkers = 1

func init() {
for i := 0; i < numWorkers; i++ {
go func() {
...
}
}
}

Proses pengiram lambat karena kita hanya punya 1 goroutine yang mengirimkan email. Ibarat restoran yang hanya punya 1 koki, maka jika banyak orderan (dan assuming kita punya banyak sekali pelayan), maka orderan akan terakumulasi di dalam queue.

Let's calculate

  • Rate pengiriman email adalah 5 detik/email
  • Ada 30 detik * 100 request / detik = 3000 email

Maka kita butuh 3000 email * 5 detik / email = 15,000 detik = 4 jam 10 menit untuk mengirimkan semua email.

Jika ada 100 workers, maka kita hanya butuh 150 detik = 2 menit 30 detik. Jika ada 1000 workers, maka kita hanya butuh 15 detik.

Ok, mari kita set jadi 1000 workers

var queueSize = 3000
var emailQueue = make(chan string, queueSize)
var numWorkers = 1000

Perhatikan timestamp lognya, hanya sebentar delay antara request dan pengiriman emailnya!

Python Final Version

Versi Python-nya kurang lebih mirip, hanya saja tidak memakai goroutine, melainkan menggunakan threading dan Queue:

from flask import Flask, request, abort
from queue import Queue
from threading import Thread
import time

app = Flask(__name__)

queueSize = 3000
emailQueue = Queue(maxsize=queueSize)
numWorkers = 1000


def send_email(email, message):
time.sleep(5)
print(time.strftime("%Y-%m-%d %H:%M:%S"),
"Sent email to", email, "with message", message)


def worker():
while True:
email = emailQueue.get()
send_email(email, "Please verify your email")
emailQueue.task_done()


def save_to_database(email):
pass


@app.route('/register', methods=['POST'])
def register():
print(time.strftime("%Y-%m-%d %H:%M:%S"), "Incoming request")
email = request.form.get('email')
try:
save_to_database(email)
# fire-and-forget
emailQueue.put(email)
return 'OK', 200
except Exception as e:
abort(500, description="Failed to save to database")


def start_workers():
for i in range(numWorkers):
t = Thread(target=worker)
t.daemon = True
t.start()


start_workers()

Next Problems

Eits jangan senang dulu, masih ada banyak problem lain yang perlu diatasi :P

  • Jika server mati, maka goroutine yang mengirimkan email juga akan mati.
  • Bagaimana jika server kita mati? Email yang belum terkirim akan hilang.
  • Bagaimana jika email server memiliki rate limit? Kita tidak bisa terus menambah worker.