Introduction

C#을 기준으로 Producer-Consumer를 살펴 보겠습니다.  Producer-Consumer 패턴은 상당히 널리 알려져 있는 패턴이기에, 대부분의 개발자들이 기본적으로 이해하고 있는 내용입니다. 주된 목적은, 정보 제공자와 소비자 사이를 매끄럽게 연결하는 것 입니다. 간단하게 예를 들면, 시장에 비유할 수 있습니다. 우리 집도 배추 농사를 짓고, 옆집도 배추를 재배한다고 해서 각자 알아서 김장 담그는 집을 찾아 나서야 하는 건 아닙니다. 수확한 배추를 시장 같은 곳에 내 놓으면, 김장을 담그던 혹은 배추쌈을 해 먹던 와서 적절히 사가는 것이죠.

Screen Shot 2014-12-02 at 5.42.13 pm

이러한 데이타의 흐름을 만드는 것이, Producer-Consumer Pattern의 골자가 되겠습니다.

What's the matter?

단순하게 접근한다면, Producer에서 Consumer까지 사용할 수 있는 리스트가 하나 있어서, 그곳을 다함께 사용하는 시나리오를 작성할 수 있습니다. 그러니까, 마을에 시장이 하나만 있어서, 모두들 그곳을 거치도록 만드는 것이죠. 예를 들면, 다음의 코드로 표현 할 수 있습니다.

using System;
using System.Collections.Generic;

namespace PnC 
{
	public class Producer 
	{
		Queue<int> _q;

		public Producer(Queue<int> q)
		{
			_q = q;
		}
			
		public void DoWork()
		{
			var r = new Random ();
			_q.Enqueue (r.Next (100));
		}
	}

	public class Consumer
	{
		Queue<int> _q;

		public Consumer(Queue<int> q)
		{
			_q = q;
		}

		public int DoWork()
		{
			return _q.Dequeue ();
		}
	}

	public class Program
	{
		static void Main()
		{
			Queue<int> queue = new Queue<int>();

			Producer producer1 = new Producer (queue);
			Producer producer2 = new Producer (queue);
			Consumer consumer = new Consumer (queue);

			producer1.DoWork ();
			producer2.DoWork ();
			consumer.DoWork ();
		}
	}
}

코드는 간단합니다. Producer와 Consumer를 정의하면서 DoWork이라는 함수에 간단한 작업을 같이 정의하였습니다. Main함수에서 보시는 것처럼, 2개의 Producer와 Consumer를 생성하여, 일을 시키는 내용입니다. 여기서, 가장 중요한 점은, 디자인이 모두 같은 자료 구조를 사용하도록 되어 있다는 점 입니다. 즉, Queue가 이 디자인을 관통하는 가장 큰 요소인자, 한계를 나타내는 부분입니다.

Thread

"쓰레드가 나타났다~~"

"쓰레드가 나타났다!"

위의 코드는 단일 process, 단일 thread에서는 아무 문제가 없습니다. 그렇지만, 효율적이지는 않습니다. producer와 consumer를 나눈 배경에는 두 개의 주체가 각각 일하면서 각자의 역활에만 충실할 수 있도록 고안한 것 입니다. 만약, 2개 이상의 thread가 있다면, 무슨 문제가 생길까요?

Lock

아무 문제가 없다면, 정말로 좋겠지만... 그럴리 없겠죠? 가장 큰 문제는, queue입니다. 하나 이상의 thread가 있다면, Enqueue 함수와 Dequeue 함수가 Atomic하지 않기 때문에,  문제의 소지가 있습니다. 즉, Enqueue가 일어나는 중에 다른 thread가 Dequeue를 시도하는 경우 입니다.  이러한 부분은 Lock이라는 키워드를 사용해서 고립시켜야 합니다.

using System;
using System.Threading;
using System.Collections.Generic;
using System.Collections;
using System.Linq;

namespace PnCwithThread
{
	public class SyncEvents
	{
		public SyncEvents ()
		{
			NewItemEvent = new AutoResetEvent (false);
			ExitThreadEvent = new ManualResetEvent (false);
			EventArray = new WaitHandle[2];
			EventArray [0] = NewItemEvent;
			EventArray [1] = ExitThreadEvent;
		}

		public ManualResetEvent ExitThreadEvent { private set; get; }

		public AutoResetEvent NewItemEvent { private set; get; }

		public WaitHandle[] EventArray  { private set; get; }
	}

	public class Producer
	{
		public Producer (Queue<int> q, SyncEvents e)
		{
			_queue = q;
			_syncEvents = e;
			_count = 0;
		}

		public void ThreadRun ()
		{
			Random r = new Random ();
			while (!_syncEvents.ExitThreadEvent.WaitOne (0, false)) {
				lock (((ICollection)_queue).SyncRoot) {
					while (_queue.Count < 20) {
						_count++;
						_queue.Enqueue (r.Next (0, 100));
						_syncEvents.NewItemEvent.Set ();
					}
				}
			}
			Console.WriteLine ("Producer thread: produced {0} items", _count);
		}

		Queue<int> _queue;
		SyncEvents _syncEvents;
		int _count;
	}

	public class Consumer
	{
		public Consumer (Queue<int> q, SyncEvents e)
		{
			_queue = q;
			_syncEvents = e;
			_count = 0;
		}

		public void ThreadRun ()
		{
			while (WaitHandle.WaitAny (_syncEvents.EventArray) != 1) {
				lock (((ICollection)_queue).SyncRoot) {
					_count++;
					_queue.Dequeue ();
				}
			}
			Console.WriteLine ("Consumer : consumed {0} items", _count);
		}

		Queue<int> _queue;
		SyncEvents _syncEvents;
		int _count;
	}

	public class ThreadSyncSample
	{
		private static void ShowQueueContents (Queue<int> q)
		{
			lock (((ICollection)q).SyncRoot) {
				foreach (int item in q) {
					Console.Write ("{0} ", item);
				}
			}
			Console.WriteLine ();
		}

		static void Main ()
		{
			Queue<int> queue = new Queue<int> ();
			SyncEvents syncEvents = new SyncEvents ();

			Console.WriteLine ("Configuring worker thread...");

			Producer producer1 = new Producer (queue, syncEvents);
			Producer producer2 = new Producer (queue, syncEvents);
			Consumer consumer = new Consumer (queue, syncEvents);

			Thread producerThread1 = new Thread (producer1.ThreadRun);
			Thread producerThread2 = new Thread (producer2.ThreadRun);
			Thread consumerThread = new Thread (consumer.ThreadRun);

			Console.WriteLine ("Launching producer and consumer threads...");
			producerThread1.Start ();
			producerThread2.Start ();
			consumerThread.Start ();

			// for execute twice 
			Enumerable.Range (0, 2).ToList ().ForEach (t => {
				Thread.Sleep (2000);
				ShowQueueContents (queue);
			});

			Console.WriteLine ("Signaling threads to terminate...");
			syncEvents.ExitThreadEvent.Set ();
 			
			producerThread1.Join ();
			producerThread2.Join ();
			consumerThread.Join ();
		}
	}
}

Lock을 사용하는 버전으로 다시 작성하였습니다. 2개의 producer와 하나의 consumer가 사이좋게 지낼 수 있도록, lock을 통한 atomic을 구현했습니다. thread-safe producer-consumer가 만들어 진 것 입니다.

그런데 말입니다. 늘, 이러한 패턴을 친구들에게 보여주면 한 가지 이야기를 듣게 됩니다. 저 'lock'이 최선입니까? 결국, 기다림이 발생되고 성능은 지연되는게 아닌가요? 화장실을 다녀온 후의 찝찝함이랄까요, 여전히 무언가 마음에 들지 않습니다.

Result 결과

Configuring worker thread...
Launching producer and consumer threads...
78 26 89 14 49 93 81 1 48 80 80 7 91 50 42 82 94 6 37 67
64 52 53 67 75 97 53 37 76 9 98 49 71 12 26 88 1 36 25
Signaling threads to terminate...
Consumer : consumed 680792 items
Producer thread: produced 317023 items
Producer thread: produced 363789 items

 

 

Without lock 락없는 생산자-소비자

lock을 없애고 싶다. 없애고 싶은데, 도대체 어떻게 하죠? lock이 생겨난 이유에 대해서 곰곰히 생각해 볼 필요가 있습니다. 결국에는, queue라는 사실이 떠 오르실 거예요. 즉, queue의 한계점인 셈입니다. 그렇기에, lock을 없애기 위해서는 queue를 버려야 합니다. 우리가 이제부터 사용할 자료구조는 BlockingCollection<T>이 되겠습니다. BlockingCollection은 내부적으로 ConcurrentQueue를 사용합니다.

using System;
using System.Threading;
using System.Collections.Generic;
using System.Collections;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Linq;

namespace PnCwithoutLock
{
	public class Producer
	{
		public Producer (BlockingCollection<int> q, CancellationTokenSource e)
		{
			_queue = q;
			_endTokenSource = e;
			_count = 0;
		}

		public void ThreadRun ()
		{
			Random r = new Random ();
			while (!_endTokenSource.IsCancellationRequested) {
				while (_queue.Count < 20) {
					_count++;
					_queue.Add (r.Next (0, 100));
				}
			}
			Console.WriteLine ("Producer thread: produced {0} items", _count);
		}

		int _count;
		BlockingCollection<int> _queue;
		CancellationTokenSource _endTokenSource;
	}

	public class Consumer
	{
		public Consumer (BlockingCollection<int> q, CancellationTokenSource e)
		{
			_queue = q;
			_endTokenSource = e;
			_count = 0;
		}

		public void ThreadRun ()
		{
			while (!_endTokenSource.IsCancellationRequested) {
				_count++;
				_queue.Take ();
			}

			Console.WriteLine ("Consumer : consumed {0} items", _count);
		}

		int _count;
		BlockingCollection<int> _queue;
		CancellationTokenSource _endTokenSource;
	}

	public class ThreadSyncSample
	{
		private static void ShowQueueContents (BlockingCollection<int> q)
		{
			foreach (int item in q) {
				Console.Write ("{0} ", item);
			}
		}

		static void Main ()
		{
			BlockingCollection<int> queue = new BlockingCollection<int> ();
			CancellationTokenSource endTokenSource = new CancellationTokenSource ();

			Console.WriteLine ("Configuring worker thread...");

			Producer producer1 = new Producer (queue, endTokenSource);
			Producer producer2 = new Producer (queue, endTokenSource);
			Consumer consumer = new Consumer (queue, endTokenSource);

			var tasks = new  [] {
				new Task (producer1.ThreadRun),
				new Task (producer2.ThreadRun),
				new Task (consumer.ThreadRun)
			};

			Console.WriteLine ("Launching producer and consumer threads...");
			Array.ForEach (tasks, t => t.Start ());

			// for execute twice 
			Enumerable.Range (0, 2).ToList ().ForEach (t => {
				Thread.Sleep (2000);
				ShowQueueContents (queue);
			});

			Console.WriteLine ("Signaling threads to terminate...");
			endTokenSource.Cancel ();
			Task.WaitAll (tasks);
		}
	}
}

앞의 소스와 큰 차이는 없습니다. BlockingCollection을 사용하면서, 교통 정리를 위해서   CancellationTokenSource를 부가적으로 사용하게 되었습니다. 모든 thread가 해당 token을 참조하면서, 끝나는 시점에 대한 공유를 하고 있습니다.

Result 결과

 

Configuring worker thread...
Launching producer and consumer threads...
41 33 99 57 76 41 98 61 39 41 22 19 99 63 92 70 59 52 84 12
22 92
Signaling threads to terminate...
Producer thread: produced 4825693 items
Consumer : consumed 9607867 items
Producer thread: produced 4782195 items

 

Conclusion

Producer와 Consumer는 생각보다 자주 쓰이는 패턴입니다. 그리고 효율을 위해서 Thread를 분리하는 일도 중요하답니다. 이러한 가운데, 더욱 효율적으로 작성할 수 있는 방법이, BlockingCollection을 이용하는 것 입니다. 같은 내용임에도 결과에서 보이는 것처럼 성능 차이가 꽤 발생되고 있습니다. 즉, lock을 사용하실 때는 더욱 주의를 필요로 하고, 가급적 사용하지 않는 것이 프로그램의 전체적인 성능이 좋아진다고 볼 수 있겠습니다.

References