OS/Linux

work queue

아르비스 2010. 5. 24. 15:03

작업 큐 소개

작업 큐는 2.5 버전의 Linux 커널에 추가된 최신 지연 메커니즘이다. 한번에 모든 작업을 처리하는 지연 스키마를 제공하는 태스크릿과는 달리 작업 큐는 작업 큐에 대한 핸들러 함수가 대기 상태에 있을 수 있는(태스크릿 모델에서는 가능하지 않음) 제네릭 지연 메커니즘이다. 작업 큐는 태스크릿보다 높은 지연 시간을 가질 수 있고 더 다양한 작업 지연 API를 포함하고 있다. 지연은 keventd를 통해 태스크 큐에 의해 관리되었지만 이제는 events/X라는 커널 작업자 스레드에 의해 관리된다.

작업 큐는 기능을 하반부로 지연시킬 수 있는 일반적인 방법을 제공한다. 중앙에 있는 작업 큐(struct workqueue_struct)는 작업이 배치되는 구조체이다. 작업은 work_struct 구조체로 표현되며 지연시킬 작업과 사용할 지연 함수를 식별한다(그림 3 참조). events/X 커널 스레드(CPU당 한 개)는 작업 큐에서 작업을 추출하고 하반부 핸들러 중 하나를 활성화한다(struct work_struct의 핸들러 함수에 지정된 대로).


그림 3. 작업 큐 관련 프로세스
왼쪽부터 차례대로 인터럽트 핸들러, struct work_struct, struct workqueue_struct, events/X 및 핸들러 함수를 보여 주는 프로세스 플로우차트

work_struct는 사용할 핸들러 함수를 나타내므로 작업 큐를 사용하여 다양한 핸들러에 대한 작업을 큐에 저장할 수 있다. 이제 작업 큐에 대한 API 함수를 살펴보자.

작업 큐 API

작업 큐 API는 수많은 옵션이 지원되기 때문에 태스크릿보다 약간 더 복잡하다. 먼저 작업 큐를 살펴본 후 작업 및 변형을 살펴보자.

그림 3에서 보았듯이 작업 큐의 핵심 구조체는 큐 자체이다. 이 구조체는 나중에 하반부에서 실행하기 위해 지연시킬 상반부의 작업을 큐에 저장하는 데 사용된다. 작업 큐는 create_workqueue라는 매크로를 통해 작성되며, 이 매크로는 workqueue_struct 참조를 리턴한다. 필요한 경우 나중에 다음과 같이 destroy_workqueue 함수를 호출하여 이 작업 큐를 제거할 수 있다.

struct workqueue_struct *create_workqueue( name );
void destroy_workqueue( struct workqueue_struct * );

작업 큐를 통해 통신할 작업은 work_struct 구조체로 정의된다. 일반적으로 이 구조체는 사용자가 작성한 작업 정의 구조체의 첫 번째 요소이다. (나중에 이에 대한 예제를 볼 수 있다.) 작업 큐 API는 할당된 버퍼에 있는 작업을 초기화할 수 있는 세 개의 함수를 제공한다(Listing 6 참조). INIT_WORK 매크로는 사용자가 전달한 핸들러 함수에 필요한 초기화 및 설정 기능을 제공한다. 작업을 작업 큐에 저장하기 전에 지연시킬 필요가 있을 경우 INIT_DELAYED_WORKINIT_DELAYED_WORK_DEFERRABLE 매크로를 사용할 수 있다.


Listing 6. 작업 초기화 매크로
	
INIT_WORK( work, func );
INIT_DELAYED_WORK( work, func );
INIT_DELAYED_WORK_DEFERRABLE( work, func );

작업 구조체를 초기화한 후에는 작업을 작업 큐에 저장하는 단계를 수행해야 한다. 이 작업은 몇 가지 방법으로 수행할 수 있다(Listing 7 참조). 먼저, queue_work(작업을 현재 CPU에 연결)를 사용하여 작업을 작업 큐에 저장할 수 있다. 또는 queue_work_on을 사용하여 핸들러가 실행되어야 하는 CPU를 지정할 수 있다. 두 개의 추가 함수는 지연된 작업에 대해 동일한 기능을 수행한다. (해당 구조체가 work_struct 구조체와 작업 지연 시간을 캡슐화한다.)


Listing 7. 작업 큐 함수
	
int queue_work( struct workqueue_struct *wq, struct work_struct *work );
int queue_work_on( int cpu, struct workqueue_struct *wq, struct work_struct *work );

int queue_delayed_work( struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay );

int queue_delayed_work_on( int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay );

이 작업 큐를 지원하는 네 개의 함수가 있는 글로벌 커널-글로벌 작업 큐를 사용할 수 있다. 이러한 함수(Listing 8 참조)는 작업 큐 구조체를 정의하지 않아도 된다는 점을 제외하면 Listing 7의 함수와 유사하다.


Listing 8. 커널-글로벌 작업 큐 함수
	
int schedule_work( struct work_struct *work );
int schedule_work_on( int cpu, struct work_struct *work );

int scheduled_delayed_work( struct delayed_work *dwork, unsigned long delay );
int scheduled_delayed_work_on( 
		int cpu, struct delayed_work *dwork, unsigned long delay );

또한 작업 큐의 작업을 비우거나 취소하는 데 사용할 수 있는 여러 헬퍼 함수도 있다. 작업이 완료될 때까지 특정 작업 항목 및 블록을 비우려는 경우 flush_work를 호출할 수 있다. flush_workqueue를 호출하여 지정된 작업 큐의 모든 작업을 완료할 수도 있다. 두 경우 모두 작업이 완료될 때까지 호출자가 차단된다. 커널-글로벌 작업 큐를 비우려면 flush_scheduled_work를 호출한다.

int flush_work( struct work_struct *work );
int flush_workqueue( struct workqueue_struct *wq );
void flush_scheduled_work( void );

핸들러에서 아직 실행되지 않고 있을 경우 작업을 취소할 수 있다. cancel_work_sync를 호출하면 콜백이 완료될 때까지 큐 또는 블록의 작업이 종료된다(작업이 핸들러에서 처리 중인 경우). 작업이 지연된 경우에는 cancel_delayed_work_sync를 호출할 수 있다.

int cancel_work_sync( struct work_struct *work );
int cancel_delayed_work_sync( struct delayed_work *dwork );

마지막으로, work_pending 또는 delayed_work_pending 호출을 사용하여 작업 항목이 보류 중(핸들러에서 아직 실행되지 않음)인지 여부를 확인할 수 있다.

work_pending( work );
delayed_work_pending( work );

이 점이 바로 작업 큐 API의 핵심이다. 작업 큐 API의 구현은 ./kernel/workqueue.c에서 볼 수 있으며 API 정의는 ./include/linux/workqueue.h에 있다. 이제 작업 큐 API의 간단한 예제를 살펴보자.

간단한 작업 큐 예제

다음 예제에서는 몇 가지 핵심 작업 큐 API 함수를 보여 준다. 태스크릿 예제와 마찬가지로 이 예제도 단순하게 설명하기 위해 커널 모듈의 컨텍스트에서 구현한다.

먼저 작업 구조체와 하반부를 구현하는 데 사용할 핸들러 함수를 살펴보자(Listing 9 참조). 여기서 먼저 살펴볼 부분은 작업 큐 구조체 참조(my_wq)와 my_work_t 정의이다. my_work_t typedef에는 work_struct 구조체가 앞에 있고, 그 뒤에 작업 항목을 나타내는 정수가 있다. 핸들러(콜백 함수)는 work_struct 포인터를 my_work_t 유형으로 역참조한다. 작업 항목(구조체의 정수)을 내보낸 후 작업 포인터의 참조가 해제된다.


Listing 9. 작업 구조체 및 하반부 핸들러
	
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/workqueue.h>

MODULE_LICENSE( "GPL" );

static struct workqueue_struct *my_wq;

typedef struct {
  struct work_struct my_work;
  int    x;
} my_work_t;

my_work_t *work, *work2;


static void my_wq_function( struct work_struct *work)
{
  my_work_t *my_work = (my_work_t *)work;

  printk( "my_work.x %d\n", my_work->x );

  kfree( (void *)work );

  return;
}

Listing 10은 사용자의 init_module 함수이며 create_workqueue API 함수를 사용하여 작업 큐를 작성하는 작업부터 시작한다. 작업 큐를 성공적으로 작성한 후에는 두 개의 작업 항목(kmalloc을 통해 할당됨)을 작성한다. 그런 다음 각 작업 항목이 INIT_WORK를 통해 초기화되고 작업이 정의된 후 queue_work 호출을 통해 작업 큐에 저장된다. 상반부 프로세스(여기에서 시뮬레이트한 프로세스)가 이제 완료되었다. 그런 다음 작업이, 때로는 나중에, 핸들러에 의해 처리된다(Listing 10 참조).


Listing 10. 작업 큐 및 작업 작성
	
int init_module( void )
{
  int ret;

  my_wq = create_workqueue("my_queue");
  if (my_wq) {

    /* Queue some work (item 1) */
    work = (my_work_t *)kmalloc(sizeof(my_work_t), GFP_KERNEL);
    if (work) {

      INIT_WORK( (struct work_struct *)work, my_wq_function );

      work->x = 1;

      ret = queue_work( my_wq, (struct work_struct *)work );

    }

    /* Queue some additional work (item 2) */
    work2 = (my_work_t *)kmalloc(sizeof(my_work_t), GFP_KERNEL);
    if (work2) {

      INIT_WORK( (struct work_struct *)work2, my_wq_function );

      work2->x = 2;

      ret = queue_work( my_wq, (struct work_struct *)work2 );

    }

  }

  return 0;
}

Listing 11에서는 마지막 요소를 보여 준다. 이 정리 모듈에서는 특정 작업 큐를 비우고(핸들러가 작업 처리를 완료할 때까지 차단됨) 작업 큐를 삭제한다.


Listing 11. 작업 큐 비우기 및 삭제
	
void cleanup_module( void )
{
  flush_workqueue( my_wq );

  destroy_workqueue( my_wq );

  return;
}


태스크릿과 작업 큐의 차이점

태스크릿과 작업 큐를 간단하게 소개하는 이 기사에서는 작업을 상반부에서 하반부로 지연시키는 두 가지 스키마를 살펴보았다. 태스크릿은 간단하고 직접적이기 때문에 지연 시간이 짧은 메커니즘을 제공하는 반면 작업 큐는 여러 작업 항목을 큐에 저장할 수 있는 유연한 API를 제공한다. 두 스키마 모두 인터럽트 컨텍스트의 작업을 지연시키지만 태스크릿만 실행부터 완료까지 자동으로 실행되며 작업 큐는 필요한 경우 핸들러를 대기 상태로 설정할 수 있다. 두 방법 모두 작업 지연에 유용하므로 사용자의 특정 요구에 따라 선택해서 사용하면 된다


example.
#include <linux/kernel.h>
#include <linux/kthread.h>
#include <linux/module.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/timer.h>

struct kthread_t
{
  struct task_struct *thread;
  int running;
};

struct kthread_t *kthread1 = NULL;
struct kthread_t *kthread2 = NULL;

//struct work_struct *work1, *work2;

static struct workqueue_struct *my_wq;

static int lte_sdio_tx_thread(void *unused);
static int lte_sdio_rx_thread(void *unused);

static DECLARE_WORK(work1,(void *)lte_sdio_tx_thread);
static DECLARE_WORK(work2,(void *)lte_sdio_rx_thread);


void cleanup_queue( void )
{
  flush_workqueue( my_wq );

  destroy_workqueue( my_wq );

  return;
}


int lte_sdio_start_thread(void)
{
  int ret;

 my_wq = create_workqueue("my_queue");
 if (my_wq)
 {
  /* Queue some work (item 1) */
  kthread1 = kmalloc(sizeof(struct kthread_t), GFP_KERNEL);
  if(kthread1)
  {
   memset(kthread1, 0, sizeof(struct kthread_t));

   kthread1->running = 1;
   kthread1->thread = kthread_run(lte_sdio_tx_thread, NULL, "lte_sdio_tx_thread");

   printk(" Tx thread create \n");
   ret = queue_work( my_wq, &work1 );
  }

  /* Queue some work (item 2) */
  kthread2 = kmalloc(sizeof(struct kthread_t), GFP_KERNEL);
  if(kthread2)
  {
   memset(kthread2, 0, sizeof(struct kthread_t));

   kthread2->running = 1;
   kthread2->thread = kthread_run(lte_sdio_rx_thread, NULL, "lte_sdio_rx_thread");
   printk(" Rx thread create \n");
   ret = queue_work( my_wq, &work2 );
  }
 }
  
  return 0;
}

int lte_sdio_end_thread(void)
{
  kthread_stop(kthread1->thread);
   printk(" Tx Thread stop \n");
  kthread_stop(kthread2->thread);
   printk(" Rx Thread stop \n");

 cleanup_queue();

  return 0;
}


static int lte_sdio_tx_thread(void *unused)
{
  int tx_count =0;

  while(1)
  {
    msleep_interruptible(1000);
    if(kthread_should_stop())
       break;
 
    printk(KERN_ALERT "LTE TX [%d]Thread\n", ++tx_count);
  }
  return 0;
}

static int lte_sdio_rx_thread(void *unused)
{
  int rx_count = 0; 
  while(1)
  {
    msleep_interruptible(1000);
    if(kthread_should_stop())
      break;

    printk(KERN_ALERT "LTE RX [%d]Thread\n", ++rx_count);
  }
  return 0;
}

module_init(lte_sdio_start_thread);
module_exit(lte_sdio_end_thread);

MODULE_LICENSE("GPL");