본문 바로가기

프로그래밍

[펌] Lock-Free Queue

출처: 온라인서버제작자모임

Lock-Free Queue

  • 일반적인 방식의 Thread Safe Queue

  • #include "ace/Thread_Mutex.h"
     template 
     class NormalQueueT
     {
     private:
         int    _size;
         ACE_Thread_Mutex _lock;
    
         // NODE데이타를 저장할 구조체
        struct Node
         {
             T Data;
             Node *NextNode;
         };
    
         Node* _mNode;        // 노드 할당을 위해서 사용한다.
         Node* _nextNode;    // 다음노드를 포인트한다. 
    
         Node* _startNode;    // 시작노드를 포인트한다.
         Node* _endNode;        // 마지막노드를 포인트한다.
    
    public:
         NormalQueueT()
         {
             _size = 0;
         }
    
         void Enqueue(T t)
         {
             ACE_Guard guard(_lock);
             _mNode = new Node;
             _mNode->Data = t;
             _mNode->NextNode = NULL;
             if (_size == 0)
             {
                 _startNode = _mNode;
                 _endNode = _mNode;
             }
             else
             {
                 _endNode->NextNode = _mNode;
                 _endNode = _mNode;
             }
             _size++;
         }
    
         int Dequeue(T& t)
         {
             ACE_Guard guard(_lock);
             if (_size == 0)
                 return -1;
             t = _startNode->Data;
             _nextNode = _startNode->NextNode;
             delete _startNode;
             _startNode = _nextNode;
             _size--;
             return 0;
         }
     };
    
  • Lock-Free 방식으로 구현한 Thread Safe Queue

  • #include 
     template 
     class QueueT
     {
     private:
         long _size;
         // NODE데이타를 저장할 구조체
        struct Node
         {
             T data;
             Node* next;
    
             Node& operator=(const Node& ot)
             {
                 data = ot.data;
                 next = ot.next;
                 return *this;
             }
    
             bool operator==(const Node& ot)
             {
                 return (data == ot.data && next == ot.next);
             }
         };
    
         Node* _head;        // 시작노드를 포인트한다.
         Node* _tail;        // 마지막노드를 포인트한다.
    
    public:
         QueueT()
         {
             _size = 0;
             _head = new Node;
             _head->next = NULL;
             _tail = _head;
         }
    
         void Enqueue(T t)
         {
             Node* node = new Node;
             node->data = t;
             node->next = NULL;
    
             while(true)
             {
                 Node* tail = _tail;
                 Node* next = tail->next;
                
                 if (tail == _tail)
                 {
                     if (next == NULL)
                     {
                         if ( InterlockedCompareExchangePointer((PVOID*)&tail->next, node, next) == next )
                         {
                             InterlockedCompareExchangePointer((PVOID*)&_tail, node, tail);
                             break;
                         }
                     }
                     else
                     {
                         InterlockedCompareExchangePointer((PVOID*)&_tail, next, tail);
                     }
    
                 }
             }
    
             InterlockedExchangeAdd(&_size, 1);
         }
    
         int Dequeue(T& t)
         {
             if (_size == 0)
                 return -1;
    
             while(true)
             {
                 Node* head = _head;
                 Node* tail = _tail;
                 Node* next = head->next;
                 if (head == _head)
                 {
                     if (head == tail)
                     {
                         if (next == NULL)
                             return -1;
                     }
                     else
                     {
                         t = next->data;
                         if ( InterlockedCompareExchangePointer((PVOID*)&_head, next, head) == head )
                         {
                             delete head;
                             break;
                         }
                     }
                 }
             }
             InterlockedExchangeAdd(&_size, -1);
             return 0;
         }
     };
    
  • 스레드 10개씩을 생성하여 Enqueue 10만개, Dequeue 10만개 하여 작업이 완료되는 시점으로 성능을 비교하였다.

    • Enqueue는 Lock-Free 방식이 10배 정도 빠름.
    • Dequeue는 Lock-Free 방식이 2배 정도 빠름.
    • 스레드 개수를 4개씩으로 변경하여도 비슷한 수치가 나옴.
  • 주의사항

    • Dequeue함수의 delete head; 부분을 Hazard Pointer 등의 기법으로 대체하여야 합니다. 그렇지 않을 경우 exception이 발생할 수 있습니다.