Giải Thích Về Thuật Toán Điều Phối Hàng Đợi Tin Nhắn Skynet - nói dối e blog

Giải Thích Về Thuật Toán Điều Phối Hàng Đợi Tin Nhắn Skynet

Trong thời gian gần đây, một số bạn đồng nghiệp đã đặt câu hỏi về việc tại sao thiết kế hàng đợi tin nhắn của skynet lại sử dụng một mảng cờ flags độc lập. Do thời gian trôi qua khá lâu, bản thân tôi cũng suýt quên lý do ban đầu. Nhân dịp này, tôi đã bổ sung thêm chú thích trong mã nguồn để tránh quên mất chi tiết quan trọng.

Trước đây tôi từng viết một bài blog về chủ đề này, và phần bình luận dưới bài viết cũng có nhiều thảo luận sôi nổi. Hôm nay, tôi xin trình bày lại chi tiết quan trọng này một cách rõ ràng hơn:

Thiết kế ban đầu với mảng cờ flags

Skynet sử dụng một hàng đợi vòng (circular queue) để quản lý hàng đợi tin nhắn cấp hai. Mã nguồn minh họa như sau:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#define GP(p) ((p) % MAX_GLOBAL_MQ)
static void 
skynet_globalmq_push(struct message_queue * queue) {
  struct global_queue *q= Q;
  uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1));
  q->queue[tail] = queue;
  __sync_synchronize();
  q->flag[tail] = true;
}
struct message_queue * 
skynet_globalmq_pop() {
  struct global_queue *q = Q;
  uint32_t head = q->head;
  uint32_t head_ptr = GP(head);
  if (head_ptr == GP(q->tail)) {
    return NULL;
  }
  if(!q->flag[head_ptr]) {
    return NULL;
  }
  __sync_synchronize();
  struct message_queue * mq = q->queue[head_ptr];
  if (!__sync_bool_compare_and_swap(&q->head, head, head+1)) {
    return NULL;
  }
  q->flag[head_ptr] = false;
  return mq;
}

Câu hỏi từ cộng đồng: Tại sao không dùng con trỏ NULL?

Một số bạn thắc mắc: “Tại sao không đơn giản kiểm tra giá trị con trỏ trong mảng queue có phải NULL hay không, thay vì dùng mảng flags riêng biệt?” Câu hỏi này đã được thảo luận trong PR #68 của skynet. Mã nguồn có thể viết lại như sau:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#define GP(p) ((p) % MAX_GLOBAL_MQ)
static void 
skynet_globalmq_push(struct message_queue * queue) {
  struct global_queue *q= Q;
  uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1));
  // Nếu luồng bị treo tại đây, q->queue[tail] sẽ chứa giá trị cũ
  // dù tail đã được tăng lên
  q->queue[tail] = queue;
}
struct message_queue * 
skynet_globalmq_pop() {
  struct global_queue *q = Q;
  uint32_t head = q->head;
  uint32_t head_ptr = GP(head);
  if (head_ptr == GP(q->tail)) {
    return NULL;
  }
  if (!q->queue[head_ptr]) {
    return NULL;
  }
  struct message_queue * mq = q->queue[head_ptr];
  // Không đảm bảo đọc được giá trị mới nhất từ push()
  // Có thể là giá trị từ vòng lặp trước của hàng đợi
  if (!__sync_bool_compare_and_swap(&q->head, head, head+1)) {
    return NULL;
  }
  q->queue[head_ptr] = NULL;
  return mq;
}

Vấn đề tiềm ẩn trong thiết kế đơn giản

Thiết kế này ẩn chứa rủi ro nghiêm trọng: Khi hàng đợi 64K phần tử quay vòng hết một vòng, nếu một luồng push bị treo sau khi tăng tail nhưng chưa kịp ghi giá trị mới vào queue[tail], luồng pop có thể đọc phải giá trị con trỏ lỗi thời. Dù xác suất xảy ra cực thấp, nhưng trong giai đoạn thử nghiệm đầu tiên của skynet, lỗi này đã thực sự xuất hiện.

Điều này chứng minh rằng lập trình đa luồng đòi hỏi sự cẩn trọng tuyệt đối. Một thay đổi nhỏ cũng có thể dẫn đến lỗi nghiêm trọng nếu không hiểu rõ cơ chế hoạt động bên trong.

Giới hạn về quy mô dịch vụ

Thiết kế ban đầu có một hạn chế: Khi số lượng dịch vụ hoạt động (có tin nhắn) vượt quá 64K, hệ thống sẽ gặp lỗi. Mặc dù trong thực tế, một nút skynet khó có thể đạt tới giới hạn này (vì các dịch vụ không có tin nhắn sẽ không nằm trong hàng đợi toàn cục), nhưng về mặt lý thuyết, skynet có khả năng hỗ trợ số lượng dịch vụ lớn hơn nhiều.

Cải tiến với danh sách liên kết phụ trợ

Để khắc phục giới hạn này, tôi đã bổ sung một danh sách liên kết để lưu trữ các dịch vụ không thể đưa vào hàng đợi ngay lập tức khi hàng đợi đầy. Khi lấy phần tử ra khỏi hàng đợi, hệ thống sẽ cố gắng đưa các dịch vụ này trở lại hàng đợi chính. Mã nguồn cập nhật như sau:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static void 
skynet_globalmq_push(struct message_queue * queue) {
  struct global_queue *q= Q;
  if (q->flag[GP(q->tail)]) {
    // Khi hàng đợi đầy, lưu vào danh sách liên kết
    assert(queue->next == NULL);
    struct message_queue * last;
    do {
      last = q->list;
      queue->next = last;
    } while(!__sync_bool_compare_and_swap(&q->list, last, queue));
    return;
  }
  uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1));
  // Nếu luồng bị treo tại đây, q->queue[tail] sẽ chứa giá trị cũ
  // dù tail đã được tăng lên
  q->queue[tail] = queue;
  __sync_synchronize();
  q->flag[tail] = true;
}
struct message_queue * 
skynet_globalmq_pop() {
  struct global_queue *q = Q;
  uint32_t head = q->head;
  if (head == q->tail) {
    // Hàng đợi rỗng
    return NULL;
  }
  uint32_t head_ptr = GP(head);
  struct message_queue * list = q->list;
  if (list) {
    // Nếu danh sách liên kết không rỗng, cố gắng đưa vào hàng đợi
    struct message_queue *newhead = list->next;
    if (__sync
0%