Step by Step – Producer consumer program in java

producer consumer program

 247 total views

Today we will learn about the most famous java programming interview question – The Producer consumer program.

Even though there exist a lot of implementation of this program on the internet here I will try to provide a step-by-step guide towards the solution.

Lets understand the problem first

Understanding the Problem statement

In the Producer-consumer problem scenario, there are two main entities – A Producer and A Consumer.

There is also a third entity – A Queue.

The Problem states that The developer must create two threads. A Producer and a consumer thread.

  • The producer thread must create and add data to the Queue.
  • At the same time, the consumer must be able to consume or fetch the data from the Queue.
  • Whereas the Queue must be of the fixed size.

Restriction

There are two very important restriction to this simple task

  1. The Producer will not try to create and add new data if the Queue is full.
  2. In the same way, the Consumer will not try to fetch the data from the Queue if it is empty.

Okay, I hope now that you have understood the problem lets move ahead.

In this blog, I want to emphasize more on the approach towards the problem instead of just giving the solution and program.

Create an always alive thread

This is one of the basic steps for implementing the producer-consumer program in any language.

How to create an always alive thread?

If you want to create an always alive thread then you have to write some kind of code that is always doing something.

One such entity can be an infinite-loop.

while (true)
{
  //do something
}

Once you add this infinite while loop into your Thread’s run method your thread is alive forever.

And once you write action in that loop then it will be executed for infinite time.

More on this later for now lets jump into our queue’s implementation.

Creating a Buffer or Queue

The term buffer is interchangeably used with Queue and it is the entity that will store our generated data.

Remember, our Producer will generate this data.

Moving on, As we have to design a class(Queue) let us see what kind of members will it have. We will have

  • A Linked List as the member variable – will be used as the storage unit.
  • An Integer variable – to specify the Linked list size.
  • One method insert() – to add data into the Linked list.
  • And Finally, one method fetch() – to remove the data from the Linked list.

But don’t forget about the restrictions defined above, we have to provide a secure way of access to the threads so that the restriction is followed.

Implement Synchronization

We will achieve this using sychronization.

Synchronization provides a way so that only one thread can access a resource at a moment this will stop the threads to produce and read erroneous data.

We can add it as the next point

  • Add synchronized block in our methods.

Okay enough of the waiting, lets see How our Queue looks till now

DummyQueue.java

import java.util.LinkedList;

public class DummyQueue {

	private LinkedList<Integer> list;
	private int listSize;

	public DummyQueue(int size) {
		this.list = new LinkedList<>();
		this.listSize = size;
	}

	public void insert(int data) throws InterruptedException {
		synchronized (this) {
			list.add(data);
		}
	}

	public int fetch() throws InterruptedException {
		synchronized (this) {
			return list.poll();
		}
	}
}

But wait !!!

We are not done yet, we still have to add the most important functionality into this Queue class i.e Code to restrict Producer and Consumer.

By executing the below code in insert() and fetch() method both the thread will check the size of the queue.

  • The Producer will check the upper limit of the size.
  • The Consumer will check the lower-limit i.e Zero.

The restriction will be added using wait-and-notify.

Lets see now the final code of our queue.

import java.util.LinkedList;

public class DummyQueue {

	private LinkedList<Integer> list;
	private int listSize;

	public DummyQueue(int size) {
		this.list = new LinkedList<>();
		this.listSize = size;
	}

	public void insert(int data) throws InterruptedException {
		synchronized (this) {
			while (list.size() >= listSize) {
                wait();
            }
            list.add(data);
            notify();
		}
	}

	public int fetch() throws InterruptedException {
		synchronized (this) {
			while (list.size() == 0) {
                wait();
            }
            int value = list.poll();
            notify();
            return value;
		}
	}
}

We have not created Our Producer and Consumer Threads yet but we have designed our queue keeping in mind the problem statement.

Now Lets dry-run this code for Producer and consumer threads.

For Producer,

  • The Producer thread will call the insert method
  • It will ask for the lock on the DummyQueue object because of the synchronized block.
    • If the producer gets the lock on the object it will check the size of the linked-list else it will wait for the lock.
      • If the list is full the Producer will release the lock and will go in the waiting state until the Consumer notifies it.
      • else it will add the data into the linked-list and will notify the Consumer.

For Consumer,

  • The consumer thread will call the fetch method
  • It will also ask for the lock on the DummyQueue object again because of the synchronized block.
    • If Consumer gets the lock it will check the size of the Linked-list else it will wait for the lock to be released.
      • If the list is empty the consumer will release the lock and will go to the waiting state until Producer notifies it.
      • else it will remove the element from the linked list and it will notify the Producer.

Finally, let’s create our Producer-Consumer thread. I hope you remember the First Step of making an always alive thread because it will be used now.

Creating Producer-Consumer threads

Firstly, we will create the Producer thread.

Producer Thread

Thread lProducerThread = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					int lData = 0;
					while (true) {
						dummyQueue.insert(lData);
						System.out.println("Produced data " + lData);
						lData++;
						Thread.sleep(1000);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});

Consumer Thread

Thread lConsumerThread = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while (true) {
						int value = dummyQueue.fetch();
						System.out.println("Consumed Data " + value);
						Thread.sleep(1000);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}

			}
		});

As we are iterating over an infinite-while-loop the producer will produce and the consumer will consume the data for an indefinite time or until you stop the execution explicitly.

The producer has a local integer variable. It inserts this variable’s value into the queue and will increment it for the next iteration whereas the consumer will only fetch the data from the queue in each iteration.

Complete Producer consumer program

public class ProducerConsumer {

	public static void main(String[] args) {
		DummyQueue dummyQueue = new DummyQueue(2);

		Thread lProducerThread = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					int lData = 0;
					while (true) {
						dummyQueue.insert(lData);
						System.out.println("Produced data " + lData);
						lData++;
						Thread.sleep(1000);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});

		Thread lConsumerThread = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while (true) {
						int value = dummyQueue.fetch();
						System.out.println("Consumed Data " + value);
						Thread.sleep(1000);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}

			}
		});

        //Start Producer thread		
		lProducerThread.start();
		//Start Consumer thread
		lConsumerThread.start();
		
		try {
			lProducerThread.join();
			lConsumerThread.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
	}
}

Output

Produced data 0
Consumed Data 0
Produced data 1
Consumed Data 1
Consumed Data 2
Produced data 2
Produced data 3
Consumed Data 3

In this complete program, we are using Thread join to ensure that the producer thread always stops before Consumer Thread.

Also, Thread.sleep() is not mandatory it is added so that you can better understand the output.

Leave a Reply

Your email address will not be published. Required fields are marked *