广

Java编程

  • IOS开发
  • android开发
  • PHP编程
  • JavaScript
  • ASP.NET
  • ASP编程
  • JSP编程
  • Java编程
  • 易语言
  • Ruby编程
  • Perl编程
  • AJAX
  • 正则表达式
  • C语言
  • 编程开发

    基于Java回顾之多线程同步的使用详解

    2018-11-14 08:49:40 次阅读 稿源:互联网
    零七广告

    首先阐述什么是同步,不同步有什么问题,然后讨论可以采取哪些措施控制同步,接下来我们会仿照回顾网络通信时那样,构建一个服务器端的“线程池”,JDK为我们提供了一个很大的concurrent工具包,最后我们会对里面的内容进行探索。

    为什么要线程同步?

    说到线程同步,大部分情况下, 我们是在针对“单对象多线程”的情况进行讨论,一般会将其分成两部分,一部分是关于“共享变量”,一部分关于“执行步骤”。

    共享变量

    当我们在线程对象(Runnable)中定义了全局变量,run方法会修改该变量时,如果有多个线程同时使用该线程对象,那么就会造成全局变量的值被同时修改,造成错误。我们来看下面的代码:
    代码如下:

    共享变量造成同步问题
     class MyRunner implements Runnable
     {
         public int sum = 0;

         public void run()
         {
             System.out.println(Thread.currentThread().getName() + " Start.");
             for (int i = 1; i <= 100; i++)
             {
                 sum += i;
             }
             try {
                 Thread.sleep(500);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);
             System.out.println(Thread.currentThread().getName() + " End.");
         }
     }

     
     private static void sharedVaribleTest() throws InterruptedException
     {
         MyRunner runner = new MyRunner();
         Thread thread1 = new Thread(runner);
         Thread thread2 = new Thread(runner);
         thread1.setDaemon(true);
         thread2.setDaemon(true);
         thread1.start();
         thread2.start();
         thread1.join();
         thread2.join();
     }

    这个示例中,线程用来计算1到100的和是多少,我们知道正确结果是5050(好像是高斯小时候玩过这个?),但是上述程序返回的结果是10100,原因是两个线程同时对sum进行操作。

    执行步骤

    我们在多个线程运行时,可能需要某些操作合在一起作为“原子操作”,即在这些操作可以看做是“单线程”的,例如我们可能希望输出结果的样子是这样的:
    代码如下:

    线程1:步骤1
     线程1:步骤2
     线程1:步骤3
     线程2:步骤1
     线程2:步骤2
     线程2:步骤3

    如果同步控制不好,出来的样子可能是这样的:
    代码如下:

    线程1:步骤1
    线程2:步骤1
    线程1:步骤2
    线程2:步骤2
    线程1:步骤3
    线程2:步骤3

    这里我们也给出一个示例代码:
    代码如下:

    执行步骤混乱带来的同步问题
     class MyNonSyncRunner implements Runnable
     {
         public void run() {
             System.out.println(Thread.currentThread().getName() + " Start.");
             for(int i = 1; i <= 5; i++)
             {
                 System.out.println(Thread.currentThread().getName() + " Running step " + i);
                 try
                 {
                     Thread.sleep(50);
                 }
                 catch(InterruptedException ex)
                 {
                     ex.printStackTrace();
                 }
             }
             System.out.println(Thread.currentThread().getName() + " End.");
         }
     }

     
     private static void syncTest() throws InterruptedException
     {
         MyNonSyncRunner runner = new MyNonSyncRunner();
         Thread thread1 = new Thread(runner);
         Thread thread2 = new Thread(runner);
         thread1.setDaemon(true);
         thread2.setDaemon(true);
         thread1.start();
         thread2.start();
         thread1.join();
         thread2.join();
     }

    如何控制线程同步

    既然线程同步有上述问题,那么我们应该如何去解决呢?针对不同原因造成的同步问题,我们可以采取不同的策略。

    控制共享变量

    我们可以采取3种方式来控制共享变量。

    将“单对象多线程”修改成“多对象多线程”

    上文提及,同步问题一般发生在“单对象多线程”的场景中,那么最简单的处理方式就是将运行模型修改成“多对象多线程”的样子,针对上面示例中的同步问题,修改后的代码如下:
    代码如下:

    解决共享变量问题方案一
     private static void sharedVaribleTest2() throws InterruptedException
     {
         Thread thread1 = new Thread(new MyRunner());
         Thread thread2 = new Thread(new MyRunner());
         thread1.setDaemon(true);
         thread2.setDaemon(true);
         thread1.start();
         thread2.start();
         thread1.join();
         thread2.join();
     }

    我们可以看到,上述代码中两个线程使用了两个不同的Runnable实例,它们在运行过程中,就不会去访问同一个全局变量。
    将“全局变量”降级为“局部变量”

    既然是共享变量造成的问题,那么我们可以将共享变量改为“不共享”,即将其修改为局部变量。这样也可以解决问题,同样针对上面的示例,这种解决方式的代码如下:
    代码如下:

    解决共享变量问题方案二
     class MyRunner2 implements Runnable
     {
         public void run()
         {
             System.out.println(Thread.currentThread().getName() + " Start.");
             int sum = 0;
             for (int i = 1; i <= 100; i++)
             {
                 sum += i;
             }
             try {
                 Thread.sleep(500);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);
             System.out.println(Thread.currentThread().getName() + " End.");
         }
     }

     
     private static void sharedVaribleTest3() throws InterruptedException
     {
         MyRunner2 runner = new MyRunner2();
         Thread thread1 = new Thread(runner);
         Thread thread2 = new Thread(runner);
         thread1.setDaemon(true);
         thread2.setDaemon(true);
         thread1.start();
         thread2.start();
         thread1.join();
         thread2.join();
     }

    我们可以看出,sum变量已经由全局变量变为run方法内部的局部变量了。
    使用ThreadLocal机制

    ThreadLocal是JDK引入的一种机制,它用于解决线程间共享变量,使用ThreadLocal声明的变量,即使在线程中属于全局变量,针对每个线程来讲,这个变量也是独立的。

    我们可以用这种方式来改造上面的代码,如下所示:
    代码如下:

    解决共享变量问题方案三
     class MyRunner3 implements Runnable
     {
         public ThreadLocal<Integer> tl = new ThreadLocal<Integer>();

         public void run()
         {
             System.out.println(Thread.currentThread().getName() + " Start.");
             for (int i = 0; i <= 100; i++)
             {
                 if (tl.get() == null)
                 {
                     tl.set(new Integer(0));
                 }
                 int sum = ((Integer)tl.get()).intValue();
                 sum+= i;
                 tl.set(new Integer(sum));
                 try {
                     Thread.sleep(10);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }

             System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + ((Integer)tl.get()).intValue());
             System.out.println(Thread.currentThread().getName() + " End.");
         }
     }

     
     private static void sharedVaribleTest4() throws InterruptedException
     {
         MyRunner3 runner = new MyRunner3();
         Thread thread1 = new Thread(runner);
         Thread thread2 = new Thread(runner);
         thread1.setDaemon(true);
         thread2.setDaemon(true);
         thread1.start();
         thread2.start();
         thread1.join();
         thread2.join();
     }

    综上三种方案,第一种方案会降低多线程执行的效率,因此,我们推荐使用第二种或者第三种方案。

    控制执行步骤

    说到执行步骤,我们可以使用synchronized关键字来解决它。
    代码如下:

    执行步骤问题解决方案
     class MySyncRunner implements Runnable
     {
         public void run() {
             synchronized(this)
             {
                 System.out.println(Thread.currentThread().getName() + " Start.");
                 for(int i = 1; i <= 5; i++)
                 {
                     System.out.println(Thread.currentThread().getName() + " Running step " + i);
                     try
                     {
                         Thread.sleep(50);
                     }
                     catch(InterruptedException ex)
                     {
                         ex.printStackTrace();
                     }
                 }
                 System.out.println(Thread.currentThread().getName() + " End.");
             }
         }
     }

     
     private static void syncTest2() throws InterruptedException
     {
         MySyncRunner runner = new MySyncRunner();
         Thread thread1 = new Thread(runner);
         Thread thread2 = new Thread(runner);
         thread1.setDaemon(true);
         thread2.setDaemon(true);
         thread1.start();
         thread2.start();
         thread1.join();
         thread2.join();
     }

    在线程同步的话题上,synchronized是一个非常重要的关键字。它的原理和数据库中事务锁的原理类似。我们在使用过程中,应该尽量缩减synchronized覆盖的范围,原因有二:1)被它覆盖的范围是串行的,效率低;2)容易产生死锁。我们来看下面的示例:
    代码如下:

    synchronized示例
     private static void syncTest3() throws InterruptedException
     {
         final List<Integer> list = new ArrayList<Integer>();

         Thread thread1 = new Thread()
         {
             public void run()
             {
                 System.out.println(Thread.currentThread().getName() + " Start.");
                 Random r = new Random(100);
                 synchronized(list)
                 {
                     for (int i = 0; i < 5; i++)
                     {
                         list.add(new Integer(r.nextInt()));
                     }
                     System.out.println("The size of list is " + list.size());
                 }
                 try
                 {
                     Thread.sleep(500);
                 }
                 catch(InterruptedException ex)
                 {
                     ex.printStackTrace();
                 }
                 System.out.println(Thread.currentThread().getName() + " End.");
             }
         };

         Thread thread2 = new Thread()
         {
             public void run()
             {
                 System.out.println(Thread.currentThread().getName() + " Start.");
                 Random r = new Random(100);
                 synchronized(list)
                 {
                     for (int i = 0; i < 5; i++)
                     {
                         list.add(new Integer(r.nextInt()));
                     }
                     System.out.println("The size of list is " + list.size());
                 }
                 try
                 {
                     Thread.sleep(500);
                 }
                 catch(InterruptedException ex)
                 {
                     ex.printStackTrace();
                 }
                 System.out.println(Thread.currentThread().getName() + " End.");
             }
         };

         thread1.start();
         thread2.start();
         thread1.join();
         thread2.join();
     }

    我们应该把需要同步的内容集中在一起,尽量不包含其他不相关的、消耗大量资源的操作,示例中线程休眠的操作显然不应该包括在里面。

    构造线程池

    我们在<基于Java回顾之网络通信的应用分析>中,已经构建了一个Socket连接池,这里我们在此基础上,构建一个线程池,完成基本的启动、休眠、唤醒、停止操作。

    基本思路还是以数组的形式保持一系列线程,通过Socket通信,客户端向服务器端发送命令,当服务器端接收到命令后,根据收到的命令对线程数组中的线程进行操作。

    Socket客户端的代码保持不变,依然采用构建Socket连接池时的代码,我们主要针对服务器端进行改造。

    首先,我们需要定义一个线程对象,它用来执行我们的业务操作,这里简化起见,只让线程进行休眠。
    代码如下:

    定义线程对象
     enum ThreadStatus
     {
         Initial,
         Running,
         Sleeping,
         Stopped
     }

     enum ThreadTask
     {
         Start,
         Stop,
         Sleep,
         Wakeup
     }

     
     class MyThread extends Thread
     {
         public ThreadStatus status = ThreadStatus.Initial;
         public ThreadTask task;
         public void run()
         {
             status = ThreadStatus.Running;
             while(true)
             {
                 try {
                     Thread.sleep(3000);
                     if (status == ThreadStatus.Sleeping)
                     {
                         System.out.println(Thread.currentThread().getName() + " 进入休眠状态。");
                         this.wait();
                     }
                 } catch (InterruptedException e) {
                     System.out.println(Thread.currentThread().getName() + " 运行过程中出现错误。");
                     status = ThreadStatus.Stopped;
                 }
             }
         }
     }

    然后,我们需要定义一个线程管理器,它用来对线程池中的线程进行管理,代码如下:
    代码如下:

    定义线程池管理对象
     class MyThreadManager
     {
         public static void manageThread(MyThread[] threads, ThreadTask task)
         {
             for (int i = 0; i < threads.length; i++)
             {
                 synchronized(threads[i])
                 {
                     manageThread(threads[i], task);
                 }
             }
             System.out.println(getThreadStatus(threads));
         }

         public static void manageThread(MyThread thread, ThreadTask task)
         {
             if (task == ThreadTask.Start)
             {
                 if (thread.status == ThreadStatus.Running)
                 {
                     return;
                 }
                 if (thread.status == ThreadStatus.Stopped)
                 {
                     thread = new MyThread();
                 }
                 thread.status = ThreadStatus.Running;
                 thread.start();

             }
             else if (task == ThreadTask.Stop)
             {
                 if (thread.status != ThreadStatus.Stopped)
                 {
                     thread.interrupt();
                     thread.status = ThreadStatus.Stopped;
                 }
             }
             else if (task == ThreadTask.Sleep)
             {
                 thread.status = ThreadStatus.Sleeping;
             }
             else if (task == ThreadTask.Wakeup)
             {
                 thread.notify();
                 thread.status = ThreadStatus.Running;
             }
         }

         public static String getThreadStatus(MyThread[] threads)
         {
             StringBuffer sb = new StringBuffer();
             for (int i = 0; i < threads.length; i++)
             {
                 sb.append(threads[i].getName() + "的状态:" + threads[i].status).append("/r/n");
             }
             return sb.toString();
         }
     }

    最后,是我们的服务器端,它不断接受客户端的请求,每收到一个连接请求,服务器端会新开一个线程,来处理后续客户端发来的各种操作指令。
    代码如下:

    定义服务器端线程池对象
     public class MyThreadPool {

         public static void main(String[] args) throws IOException
         {
             MyThreadPool pool = new MyThreadPool(5);
         }

         private int threadCount;
         private MyThread[] threads = null;

        
         public MyThreadPool(int count) throws IOException
         {
             this.threadCount = count;
             threads = new MyThread[count];
             for (int i = 0; i < threads.length; i++)
             {
                 threads[i] = new MyThread();
                 threads[i].start();
             }
             Init();
         }

         private void Init() throws IOException
         {
             ServerSocket serverSocket = new ServerSocket(5678);
             while(true)
             {
                 final Socket socket = serverSocket.accept();
                 Thread thread = new Thread()
                 {
                     public void run()
                     {
                         try
                         {
                             System.out.println("检测到一个新的Socket连接。");
                             BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                             PrintStream ps = new PrintStream(socket.getOutputStream());
                             String line = null;
                             while((line = br.readLine()) != null)
                             {
                                 System.out.println(line);
                                 if (line.equals("Count"))
                                 {
                                     System.out.println("线程池中有5个线程");
                                 }
                                 else if (line.equals("Status"))
                                 {
                                     String status = MyThreadManager.getThreadStatus(threads);
                                     System.out.println(status);
                                 }
                                 else if (line.equals("StartAll"))
                                 {
                                     MyThreadManager.manageThread(threads, ThreadTask.Start);
                                 }
                                 else if (line.equals("StopAll"))
                                 {
                                     MyThreadManager.manageThread(threads, ThreadTask.Stop);
                                 }
                                 else if (line.equals("SleepAll"))
                                 {
                                     MyThreadManager.manageThread(threads, ThreadTask.Sleep);
                                 }
                                 else if (line.equals("WakeupAll"))
                                 {
                                     MyThreadManager.manageThread(threads, ThreadTask.Wakeup);
                                 }
                                 else if (line.equals("End"))
                                 {
                                     break;
                                 }
                                 else
                                 {
                                     System.out.println("Command:" + line);
                                 }
                                 ps.println("OK");
                                 ps.flush();
                             }
                         }
                         catch(Exception ex)
                         {
                             ex.printStackTrace();
                         }
                     }
                 };
                 thread.start();
             }
         }
     }

    探索JDK中的concurrent工具包

    为了简化开发人员在进行多线程开发时的工作量,并减少程序中的bug,JDK提供了一套concurrent工具包,我们可以用它来方便的开发多线程程序。
    线程池

    我们在上面实现了一个非常“简陋”的线程池,concurrent工具包中也提供了线程池,而且使用非常方便。

    concurrent工具包中的线程池分为3类:ScheduledThreadPool、FixedThreadPool和CachedThreadPool。

    首先我们来定义一个Runnable的对象
    代码如下:

    定义Runnable对象
     class MyRunner implements Runnable
     {
         public void run() {
             System.out.println(Thread.currentThread().getName() + "运行开始");
             for(int i = 0; i < 1; i++)
             {
                 try
                 {
                     System.out.println(Thread.currentThread().getName() + "正在运行");
                     Thread.sleep(200);
                 }
                 catch(Exception ex)
                 {
                     ex.printStackTrace();
                 }
             }
             System.out.println(Thread.currentThread().getName() + "运行结束");
         }
     }

    可以看出,它的功能非常简单,只是输出了线程的执行过程。

    ScheduledThreadPool

    这和我们平时使用的ScheduledTask比较类似,或者说很像Timer,它可以使得一个线程在指定的一段时间内开始运行,并且在间隔另外一段时间后再次运行,直到线程池关闭。

    示例代码如下:
    代码如下:

    ScheduledThreadPool示例
     private static void scheduledThreadPoolTest()
     {
         final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);

         MyRunner runner = new MyRunner();

         final ScheduledFuture<?> handler1 = scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS);
         final ScheduledFuture<?> handler2 = scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS);

         scheduler.schedule(new Runnable()
         {
             public void run()
             {
                 handler1.cancel(true);
                 handler2.cancel(true);
                 scheduler.shutdown();
             }
         }, 30, TimeUnit.SECONDS
         );
     }

    FixedThreadPool

    这是一个指定容量的线程池,即我们可以指定在同一时间,线程池中最多有多个线程在运行,超出的线程,需要等线程池中有空闲线程时,才能有机会运行。

    来看下面的代码:
    代码如下:

    FixedThreadPool示例
     private static void fixedThreadPoolTest()
     {
         ExecutorService exec = Executors.newFixedThreadPool(3);
         for(int i = 0; i < 5; i++)
         {
             MyRunner runner = new MyRunner();
             exec.execute(runner);
         }
         exec.shutdown();
     }

    注意它的输出结果:
    代码如下:

    pool-1-thread-1运行开始
    pool-1-thread-1正在运行
    pool-1-thread-2运行开始
    pool-1-thread-2正在运行
    pool-1-thread-3运行开始
    pool-1-thread-3正在运行
    pool-1-thread-1运行结束
    pool-1-thread-1运行开始
    pool-1-thread-1正在运行
    pool-1-thread-2运行结束
    pool-1-thread-2运行开始
    pool-1-thread-2正在运行
    pool-1-thread-3运行结束
    pool-1-thread-1运行结束
    pool-1-thread-2运行结束

    可以看到从始至终,最多有3个线程在同时运行。
    CachedThreadPool

    这是另外一种线程池,它不需要指定容量,只要有需要,它就会创建新的线程。

    它的使用方式和FixedThreadPool非常像,来看下面的代码:
    代码如下:

    CachedThreadPool示例
     private static void cachedThreadPoolTest()
     {
         ExecutorService exec = Executors.newCachedThreadPool();
         for(int i = 0; i < 5; i++)
         {
             MyRunner runner = new MyRunner();
             exec.execute(runner);
         }
         exec.shutdown();
     }

    它的执行结果如下:
    代码如下:

    pool-1-thread-1运行开始
    pool-1-thread-1正在运行
    pool-1-thread-2运行开始
    pool-1-thread-2正在运行
    pool-1-thread-3运行开始
    pool-1-thread-3正在运行
    pool-1-thread-4运行开始
    pool-1-thread-4正在运行
    pool-1-thread-5运行开始
    pool-1-thread-5正在运行
    pool-1-thread-1运行结束
    pool-1-thread-2运行结束
    pool-1-thread-3运行结束
    pool-1-thread-4运行结束
    pool-1-thread-5运行结束

    可以看到,它创建了5个线程。
    处理线程返回值

    在有些情况下,我们需要使用线程的返回值,在上述的所有代码中,线程这是执行了某些操作,没有任何返回值。

    如何做到这一点呢?我们可以使用JDK中的Callable<T>和CompletionService<T>,前者返回单个线程的结果,后者返回一组线程的结果。
    返回单个线程的结果

    还是直接看代码吧:
    代码如下:

    Callable示例
     private static void callableTest() throws InterruptedException, ExecutionException
     {
         ExecutorService exec = Executors.newFixedThreadPool(1);
         Callable<String> call = new Callable<String>()
         {
             public String call()
             {
                 return "Hello World.";
             }
         };
         Future<String> result = exec.submit(call);
         System.out.println("线程的返回值是" + result.get());
         exec.shutdown();
     }

    执行结果如下:
    代码如下:

    线程的返回值是Hello World.

    返回线程池中每个线程的结果

    这里需要使用CompletionService<T>,代码如下:
    代码如下:

    CompletionService示例
     private static void completionServiceTest() throws InterruptedException, ExecutionException
     {
         ExecutorService exec = Executors.newFixedThreadPool(10);
         CompletionService<String> service = new ExecutorCompletionService<String>(exec);
         for (int i = 0; i < 10; i++)
         {
             Callable<String> call = new Callable<String>()
             {
                 public String call() throws InterruptedException
                 {
                     return Thread.currentThread().getName();
                 }
             };
             service.submit(call);
         }

         Thread.sleep(1000);
         for(int i = 0; i < 10; i++)
         {
             Future<String> result = service.take();
             System.out.println("线程的返回值是" + result.get());
         }
         exec.shutdown();
     }

    执行结果如下:
    代码如下:

    线程的返回值是pool-2-thread-1
    线程的返回值是pool-2-thread-2
    线程的返回值是pool-2-thread-3
    线程的返回值是pool-2-thread-5
    线程的返回值是pool-2-thread-4
    线程的返回值是pool-2-thread-6
    线程的返回值是pool-2-thread-8
    线程的返回值是pool-2-thread-7
    线程的返回值是pool-2-thread-9
    线程的返回值是pool-2-thread-10

    实现生产者-消费者模型

    对于生产者-消费者模型来说,我们应该都不会陌生,通常我们都会使用某种数据结构来实现它。在concurrent工具包中,我们可以使用BlockingQueue来实现生产者-消费者模型,如下:
    代码如下:

    BlockingQueue示例
     public class BlockingQueueSample {

         public static void main(String[] args)
         {
             blockingQueueTest();
         }

         private static void blockingQueueTest()
         {
             final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
             final int maxSleepTimeForSetter = 10;
             final int maxSleepTimerForGetter = 10;

             Runnable setter = new Runnable()
             {
                 public void run()
                 {
                     Random r = new Random();
                     while(true)
                     {
                         int value = r.nextInt(100);
                         try
                         {
                             queue.put(new Integer(value));
                             System.out.println(Thread.currentThread().getName() + "---向队列中插入值" + value);
                             Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);
                         }
                         catch(Exception ex)
                         {
                             ex.printStackTrace();
                         }
                     }
                 }
             };

             Runnable getter = new Runnable()
             {
                 public void run()
                 {
                     Random r = new Random();
                     while(true)
                     {
                         try
                         {
                             if (queue.size() == 0)
                             {
                                 System.out.println(Thread.currentThread().getName() + "---队列为空");
                             }
                             else
                             {
                                 int value = queue.take().intValue();
                                 System.out.println(Thread.currentThread().getName() + "---从队列中获取值" + value);
                             }
                             Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);
                         }
                         catch(Exception ex)
                         {
                             ex.printStackTrace();
                         }
                     }
                 }
             };

             ExecutorService exec = Executors.newFixedThreadPool(2);
             exec.execute(setter);
             exec.execute(getter);
         }
     }

    我们定义了两个线程,一个线程向Queue中添加数据,一个线程从Queue中取数据。我们可以通过控制maxSleepTimeForSetter和maxSleepTimerForGetter的值,来使得程序得出不同的结果。

    可能的执行结果如下:
    代码如下:

    pool-1-thread-1---向队列中插入值88
    pool-1-thread-2---从队列中获取值88
    pool-1-thread-1---向队列中插入值75
    pool-1-thread-2---从队列中获取值75
    pool-1-thread-2---队列为空
    pool-1-thread-2---队列为空
    pool-1-thread-2---队列为空
    pool-1-thread-1---向队列中插入值50
    pool-1-thread-2---从队列中获取值50
    pool-1-thread-2---队列为空
    pool-1-thread-2---队列为空
    pool-1-thread-2---队列为空
    pool-1-thread-2---队列为空
    pool-1-thread-2---队列为空
    pool-1-thread-1---向队列中插入值51
    pool-1-thread-1---向队列中插入值92
    pool-1-thread-2---从队列中获取值51
    pool-1-thread-2---从队列中获取值92

    因为Queue中的值和Thread的休眠时间都是随机的,所以执行结果也不是固定的。

    使用信号量来控制线程

    JDK提供了Semaphore来实现“信号量”的功能,它提供了两个方法分别用于获取和释放信号量:acquire和release,示例代码如下:
    代码如下:

    SemaPhore示例
     private static void semaphoreTest()
     {
         ExecutorService exec = Executors.newFixedThreadPool(10);
         final Semaphore semp = new Semaphore(2);

         for (int i = 0; i < 10; i++)
         {
             Runnable runner = new Runnable()
             {
                 public void run()
                 {
                     try
                     {
                         semp.acquire();
                         System.out.println(new Date() + " " + Thread.currentThread().getName() + "正在执行。");
                         Thread.sleep(5000);
                         semp.release();
                     }
                     catch(Exception ex)
                     {
                         ex.printStackTrace();
                     }
                 }
             };
             exec.execute(runner);
         }

         exec.shutdown();
     }

    执行结果如下:
    代码如下:

    Tue May 07 11:22:11 CST 2013 pool-1-thread-1正在执行。
    Tue May 07 11:22:11 CST 2013 pool-1-thread-2正在执行。
    Tue May 07 11:22:17 CST 2013 pool-1-thread-3正在执行。
    Tue May 07 11:22:17 CST 2013 pool-1-thread-4正在执行。
    Tue May 07 11:22:22 CST 2013 pool-1-thread-5正在执行。
    Tue May 07 11:22:22 CST 2013 pool-1-thread-6正在执行。
    Tue May 07 11:22:27 CST 2013 pool-1-thread-7正在执行。
    Tue May 07 11:22:27 CST 2013 pool-1-thread-8正在执行。
    Tue May 07 11:22:32 CST 2013 pool-1-thread-10正在执行。
    Tue May 07 11:22:32 CST 2013 pool-1-thread-9正在执行。

    可以看出,尽管线程池中创建了10个线程,但是同时运行的,只有2个线程。
    控制线程池中所有线程的执行步骤

    在前面,我们已经提到,可以用synchronized关键字来控制单个线程中的执行步骤,那么如果我们想要对线程池中的所有线程的执行步骤进行控制的话,应该如何实现呢?

    我们有两种方式,一种是使用CyclicBarrier,一种是使用CountDownLatch。

    CyclicBarrier使用了类似于Object.wait的机制,它的构造函数中需要接收一个整型数字,用来说明它需要控制的线程数目,当在线程的run方法中调用它的await方法时,它会保证所有的线程都执行到这一步,才会继续执行后面的步骤。

    示例代码如下:
    代码如下:

    CyclicBarrier示例
     class MyRunner2 implements Runnable
     {
         private CyclicBarrier barrier = null;
         public MyRunner2(CyclicBarrier barrier)
         {
             this.barrier = barrier;
         }

         public void run() {
             Random r = new Random();
             try
             {
                 for (int i = 0; i < 3; i++)
                 {
                     Thread.sleep(r.nextInt(10) * 1000);
                     System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--第" + (i + 1) + "次等待。");
                     barrier.await();
                 }
             }
             catch(Exception ex)
             {
                 ex.printStackTrace();
             }
         }

     }

     private static void cyclicBarrierTest()
     {
         CyclicBarrier barrier = new CyclicBarrier(3);

         ExecutorService exec = Executors.newFixedThreadPool(3);
         for (int i = 0; i < 3; i++)
         {
             exec.execute(new MyRunner2(barrier));
         }
         exec.shutdown();
     }

    执行结果如下:
    代码如下:

    Tue May 07 11:31:20 CST 2013--pool-1-thread-2--第1次等待。
    Tue May 07 11:31:21 CST 2013--pool-1-thread-3--第1次等待。
    Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第1次等待。
    Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第2次等待。
    Tue May 07 11:31:26 CST 2013--pool-1-thread-3--第2次等待。
    Tue May 07 11:31:30 CST 2013--pool-1-thread-2--第2次等待。
    Tue May 07 11:31:32 CST 2013--pool-1-thread-1--第3次等待。
    Tue May 07 11:31:33 CST 2013--pool-1-thread-3--第3次等待。
    Tue May 07 11:31:33 CST 2013--pool-1-thread-2--第3次等待。

    可以看出,thread-2到第1次等待点时,一直等到thread-1到达后才继续执行。

    CountDownLatch则是采取类似”倒计时计数器”的机制来控制线程池中的线程,它有CountDown和Await两个方法。示例代码如下:
    代码如下:

    CountDownLatch示例
     private static void countdownLatchTest() throws InterruptedException
     {
         final CountDownLatch begin = new CountDownLatch(1);
         final CountDownLatch end = new CountDownLatch(5);
         ExecutorService exec = Executors.newFixedThreadPool(5);
         for (int i = 0; i < 5; i++)
         {
             Runnable runner = new Runnable()
             {
                 public void run()
                 {
                     Random r = new Random();
                     try
                     {
                         begin.await();
                         System.out.println(Thread.currentThread().getName() + "运行开始");
                         Thread.sleep(r.nextInt(10)*1000);
                         System.out.println(Thread.currentThread().getName() + "运行结束");
                     }
                     catch(Exception ex)
                     {
                         ex.printStackTrace();
                     }
                     finally
                     {
                         end.countDown();
                     }
                 }
             };
             exec.execute(runner);
         }
         begin.countDown();
         end.await();
         System.out.println(Thread.currentThread().getName() + "运行结束");
         exec.shutdown();
     }

    执行结果如下:
    代码如下:

    pool-1-thread-1运行开始
    pool-1-thread-5运行开始
    pool-1-thread-2运行开始
    pool-1-thread-3运行开始
    pool-1-thread-4运行开始
    pool-1-thread-2运行结束
    pool-1-thread-1运行结束
    pool-1-thread-3运行结束
    pool-1-thread-5运行结束
    pool-1-thread-4运行结束
    main运行结束

    零七网部分新闻及文章转载自互联网,供读者交流和学习,若有涉及作者版权等问题请及时与我们联系,以便更正、删除或按规定办理。感谢所有提供资讯的网站,欢迎各类媒体与零七网进行文章共享合作。

    零七广告
    零七广告
    零七广告
    零七广告