ResourceWorkManager.java

00001 /*
00002  * JOnAS: Java(TM) Open Application Server
00003  * Copyright (C) 1999 Bull S.A.
00004  * Contact: jonas-team@objectweb.org
00005  *
00006  * This library is free software; you can redistribute it and/or
00007  * modify it under the terms of the GNU Lesser General Public
00008  * License as published by the Free Software Foundation; either
00009  * version 2.1 of the License, or any later version.
00010  *
00011  * This library is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014  * Lesser General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU Lesser General Public
00017  * License along with this library; if not, write to the Free Software
00018  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
00019  * USA
00020  *
00021  * Initial developer(s): Eric Hardesty
00022  * Contributor(s): ______________________________________.
00023  *
00024  * --------------------------------------------------------------------------
00025  * $Id: ResourceWorkManager.java,v 1.10 2005/02/13 23:51:02 ehardesty Exp $
00026  * --------------------------------------------------------------------------
00027  */
00028 package org.objectweb.jonas.resource;
00029 
00030 import java.util.Collections;
00031 import java.util.HashMap;
00032 import java.util.LinkedList;
00033 import java.util.Map;
00034 import java.util.Vector;
00035 
00036 import javax.resource.spi.work.ExecutionContext;
00037 import javax.resource.spi.work.Work;
00038 import javax.resource.spi.work.WorkCompletedException;
00039 import javax.resource.spi.work.WorkEvent;
00040 import javax.resource.spi.work.WorkException;
00041 import javax.resource.spi.work.WorkListener;
00042 import javax.resource.spi.work.WorkManager;
00043 import javax.resource.spi.work.WorkRejectedException;
00044 import javax.transaction.NotSupportedException;
00045 import javax.transaction.SystemException;
00046 import javax.transaction.xa.Xid;
00047 
00048 import org.objectweb.jonas_ejb.container.TraceEjb;
00049 import org.objectweb.jotm.Current;
00050 import org.objectweb.transaction.jta.TransactionManager;
00051 import org.objectweb.util.monolog.api.BasicLevel;
00052 import org.objectweb.util.monolog.api.Logger;
00057 public class ResourceWorkManager implements WorkManager {
00058 
00059     private boolean      inited;
00060     private int          minThreads;
00061     private int          maxThreads;
00062     private long         waitTimeout;
00063     private long         execTimeout;
00064     private Vector       vThreads;
00065 
00066     private LinkedList   workObjs;
00067     private int freeThreads;  // free threads ready to work
00068 
00069     private final static int WO_REJECTED = 1;
00070     private final static int WO_ACCEPTED = 2;
00071     private final static int WO_STARTED = 3;
00072     private final static int WO_COMPLETED = 4;
00073 
00074     // Loggers used in the ResourceService
00075     private static Logger logger = null;
00076 
00080     private TransactionManager tm = null ;
00081 
00082     // Static hashtable: Xid ---> work Object
00083     private static Map wrkXids = Collections.synchronizedMap(new HashMap());
00084 
00085     public ResourceWorkManager(TransactionManager tmgr, int minThreads, int maxThreads, 
00086                                 int waitTimeout, int execTimeout, Logger mlogger) {
00087         // This was set to false to only allow Work objects once the RAR has been deployed, but
00088         // I think that is too restrictive, so I'm setting it to true which allows things to start
00089         // up right away.  I'm not sure if this is correct, but necessary until the spec is fixed
00090         // to allow for a ResourceAdapter prepare and then start later.
00091         inited      = true;
00092         this.minThreads  = minThreads;
00093         this.maxThreads  = maxThreads;
00094         this.waitTimeout = waitTimeout * 1000;
00095         this.execTimeout = execTimeout * 1000;
00096         vThreads    = new Vector();
00097         workObjs    = new LinkedList();
00098         tm          = tmgr;
00099         logger      = mlogger;
00100 
00101         // Start the specified number of threads
00102         for (int i = 0; i < minThreads; i++) {
00103             WorkerPool wp = new WorkerPool();
00104             vThreads.add(wp);
00105             wp.start();
00106         }
00107 
00108         // Need the Monitor thread to check on timeouts
00109         (new RWManagerMonitor()).start();
00110     }
00111 
00112 
00119     public void doWork(Work work) throws WorkException {
00120         doWork(work, INDEFINITE, null, null);
00121     }
00122 
00132     public void doWork(Work work, long timeout, ExecutionContext eCtx,
00133                         WorkListener wListen) throws WorkException {
00134 
00135         logger.log(BasicLevel.DEBUG, "work=" + work + " timeout=" + timeout
00136                    + " execCtx=" + eCtx + " wrkListen=" + wListen);
00137         int rStat = WorkEvent.WORK_COMPLETED;
00138         WorkObject wo = new WorkObject(work, timeout, eCtx, wListen, rStat);
00139         synchronized (wo.lock) {
00140             add(wo, rStat);
00141             try {
00142                 wo.lock.wait();
00143             } catch (InterruptedException ie) {
00144             }
00145         }
00146         if (wo.wException != null) {
00147             logger.log(BasicLevel.ERROR, "" + wo.wException);
00148             throw wo.wException;
00149         }
00150     }
00151 
00160     public long startWork(Work work) throws WorkException {
00161         return startWork(work, INDEFINITE, null, null);
00162     }
00163 
00175     public long startWork(Work work, long timeout, ExecutionContext eCtx,
00176                            WorkListener wListen) throws WorkException {
00177 
00178         logger.log(BasicLevel.DEBUG, "work=" + work + " timeout=" + timeout
00179                    + " execCtx=" + eCtx + " wrkListen=" + wListen);
00180         int rStat = WorkEvent.WORK_STARTED;
00181         WorkObject wo = new WorkObject(work, timeout, eCtx, wListen, rStat);
00182         synchronized (wo.lock) {
00183             add(wo, rStat);
00184             try {
00185                 wo.lock.wait();
00186             } catch (InterruptedException ie) {
00187             }
00188         }
00189 
00190         if (wo.wException != null) {
00191             throw wo.wException;
00192         }
00193         return (wo.startTime - wo.acceptTime);
00194     }
00195 
00202     public void scheduleWork(Work work) throws WorkException {
00203         scheduleWork(work, INDEFINITE, null, null);
00204     }
00205 
00215     public void scheduleWork(Work work, long timeout, 
00216                               ExecutionContext eCtx,
00217                               WorkListener wListen) throws WorkException {
00218 
00219         logger.log(BasicLevel.DEBUG, "work=" + work + " timeout=" + timeout
00220                    + " execCtx=" + eCtx + " wrkListen=" + wListen);
00221         int rStat = WorkEvent.WORK_ACCEPTED;
00222         WorkObject wo = new WorkObject(work, timeout, eCtx, wListen, rStat);
00223         add(wo, rStat);
00224     }
00225 
00230     public void setInited() {
00231         if (!inited) {
00232             synchronized (workObjs) {
00233                 inited = true;
00234                 workObjs.notifyAll();
00235             }
00236         }
00237     }
00238 
00242     public void releasePool() {
00243         for (int i = 0; i < vThreads.size(); i++) {
00244             WorkerPool wp = (WorkerPool) vThreads.get(i);
00245             wp.shutdown = true;
00246         }
00247         synchronized (workObjs) {
00248             workObjs.notifyAll();
00249         }
00250     }
00251 
00256     private void add(WorkObject wo, int rStat) throws WorkException {
00257         // Only accept scheduleWork objects before deployment is complete
00258         if (!inited) {
00259             if (rStat != WorkEvent.WORK_ACCEPTED) {
00260                 WorkRejectedException wex = new WorkRejectedException(
00261                       "Cannot call doWork or startWork before RA is deployed",
00262                       WorkException.INTERNAL);
00263                 WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_REJECTED, wo.work, wex);
00264                 if (wo.wListen != null) {
00265                     wo.wListen.workRejected(we);
00266                 }
00267                 throw wex;
00268             }
00269         }
00270 
00271         logger.log(BasicLevel.DEBUG, "workobj=" + wo + " rStat=" + rStat);
00272 
00273         // Add last to list, set accepted time and send the event if necessary
00274         synchronized (workObjs) {
00275             // If an EnvironmentContext and an Xid exists, then check Xid/Work hash
00276             if (wo.eCtx != null && wo.eCtx.getXid() != null) {
00277                 if (wrkXids.containsKey(wo.eCtx.getXid())) {
00278                     WorkRejectedException wex = new WorkRejectedException(
00279                           "Xid is already being processed by another work object",
00280                           WorkException.TX_CONCURRENT_WORK_DISALLOWED);
00281                     WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_REJECTED, wo.work, wex);
00282                     if (wo.wListen != null) {
00283                         wo.wListen.workRejected(we);
00284                     }
00285                     logger.log(BasicLevel.ERROR, "" + wex);
00286                     throw wex;
00287                 }
00288                 wrkXids.put(wo.eCtx.getXid(), wo.work);
00289             }
00290             workObjs.addLast(wo);
00291             wo.acceptTime = System.currentTimeMillis();
00292             if (wo.wListen != null) {
00293                 WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_ACCEPTED, wo.work, null);
00294                 wo.wListen.workAccepted(we);
00295             }
00296             if (freeThreads <= 0) {
00297                 WorkerPool wp = new WorkerPool();
00298                 vThreads.add(wp);
00299                 wp.start();
00300             }
00301             workObjs.notify();
00302         }
00303     }
00304 
00305 
00306     // Inner classes for ResourceWorkManager
00307     /* Wrapper class for the Work object, keeps state and time information
00308      */
00309     private class WorkObject {
00310         // Work object information
00311         Work work;
00312         ExecutionContext eCtx;
00313         WorkListener wListen;
00314         WorkException wException;
00315 
00316         // Time values
00317         long acceptTime;
00318         long executionTimeout;
00319         long startTime;
00320         long timeout;
00321 
00322         // State information
00323         Object lock;
00324         int rState;
00325         int cState;
00326         boolean rejected;
00327         
00328         public WorkObject(Work work, long timeout,
00329                            ExecutionContext eCtx,
00330                            WorkListener wListen,
00331                            int rState) {
00332 
00333             this.eCtx = eCtx;
00334             this.work = work;
00335             wException = null;
00336             this.wListen = wListen;
00337             
00338             executionTimeout = 0;
00339             acceptTime = 0;
00340             startTime = 0;
00341             long curTime = System.currentTimeMillis();
00342             if (timeout == INDEFINITE) {
00343                 this.timeout = timeout;
00344             } else {
00345                 this.timeout = curTime + timeout;
00346             }
00347             if (execTimeout > 0) {
00348                 executionTimeout = curTime + execTimeout;
00349             }
00350 
00351             lock = new Object();
00352             this.rState = rState;
00353             cState = -1;
00354             rejected = false;
00355         }
00356     }
00357 
00358 
00359     // Thread pool runs the work objects
00360     private class WorkerPool extends Thread {
00361         WorkObject wo = null;
00362         boolean shutdown = false;
00363 
00364         public WorkerPool() {
00365             super();
00366             setDaemon(true);
00367             logger.log(BasicLevel.DEBUG, getName() + " constructor");
00368         }
00369 
00370         public void run() {
00371 
00372             while (true) {
00373                 synchronized (workObjs) {
00374                     boolean waited = false;
00375                     while (workObjs.isEmpty()) {
00376                         // If a thread has waited and not gotten any work then there 
00377                         // should only be minThreads waiting, so if freeThreads is equal
00378                         // to minThreads then this thread should be stopped
00379                         if ((waited && freeThreads >= minThreads) || shutdown) {
00380                             logger.log(BasicLevel.DEBUG, getName() + " ending");
00381                             return;
00382                         }
00383                         try {
00384                             freeThreads++;
00385                             workObjs.wait(waitTimeout);
00386                             freeThreads--;
00387                             waited = true;
00388                         } catch (InterruptedException ignored) {
00389                             freeThreads--;
00390                             return;
00391                         }
00392                     }
00393                     if (inited) {
00394                         wo = (WorkObject) workObjs.removeFirst();
00395                     }
00396                 }
00397 
00398                 if (inited && wo != null) {
00399                     try {
00400                         checkTime();
00401                         if (wo.rState == WorkEvent.WORK_STARTED) {
00402                             logger.log(BasicLevel.DEBUG, "Start notification for " + wo);
00403                             synchronized (wo.lock) {
00404                                 wo.lock.notify();  // If waiting for execution start
00405                             }
00406                         }
00407                         if (wo.wListen != null) {
00408                             WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_STARTED,
00409                                                          wo.work, null, wo.startTime);
00410                             wo.wListen.workStarted(we);
00411                         }
00412                         // Need to setup up ExecutionContext if necessary
00413                         if (wo.eCtx != null) {
00414                              long timeout = wo.eCtx.getTransactionTimeout();
00415                              Xid xid = wo.eCtx.getXid();
00416                              if (xid != null) {
00417                                  try {
00418                                      if (timeout != WorkManager.UNKNOWN) {
00419                                          ((Current) tm).begin(xid, timeout);
00420                                      } else {
00421                                          ((Current) tm).begin(xid);
00422                                      }
00423                                  } catch (NotSupportedException e) {
00424                                      logger.log(BasicLevel.ERROR, "cannot start a transaction: NotSupportedException");
00425                                      WorkException wex = new WorkException(
00426                                            "Error starting a new transaction", e);
00427                                      throw wex;
00428                                  } catch (SystemException e) {
00429                                      logger.log(BasicLevel.ERROR, "cannot start a transaction:\n", e);
00430                                      WorkException wex = new WorkException(
00431                                            "Error starting a new transaction", e);
00432                                      throw wex;
00433                                  }
00434                              }
00435                         }
00436                         logger.log(BasicLevel.DEBUG, "Thread: " + getName() + " Running " + wo.work);
00437                         wo.work.run();
00438                         logger.log(BasicLevel.DEBUG, "Completed " + wo.work);
00439                         wo.wException = null;
00440                     } catch (WorkException wex) {
00441                         wo.wException = wex;
00442                     } catch (Throwable th) {
00443                         // Need to report issue and wrap with WorkException
00444                         WorkCompletedException wex = new WorkCompletedException(
00445                               "Error from work object", th);
00446                         wo.wException = wex;
00447                         logger.log(BasicLevel.ERROR, "" + wex);
00448                     } finally {
00449                         if (wo.eCtx != null && wo.eCtx.getXid() != null) {
00450                             wrkXids.remove(wo.eCtx.getXid());
00451                         }
00452                         if (wo.wListen != null && !wo.rejected) {
00453                             WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_COMPLETED,
00454                                                          wo.work, wo.wException, wo.startTime);
00455                             wo.wListen.workCompleted(we);
00456                         }
00457                         if (wo.rState == WorkEvent.WORK_COMPLETED) {
00458                             synchronized (wo.lock) {
00459                                 wo.lock.notify();  // If waiting for execution completion
00460                             }
00461                         }
00462                         // Make sure that our ThreadLocal storage from JOTM is cleared
00463                         // Another thread could have completed the transaction for us.
00464                         ((Current) tm).clearThreadTx();
00465                         wo.work = null;
00466                     }
00467                 }
00468             }
00469         }
00470 
00471         private void checkTime() throws WorkException{
00472 
00473             long curTime = System.currentTimeMillis();
00474 
00475             if (curTime > wo.timeout) {
00476                 WorkException wex = new WorkException(
00477                       "Error starting work in specified time", WorkException.START_TIMED_OUT);
00478                 if (wo.wListen != null) {
00479                     WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_REJECTED,
00480                                                  wo.work, wex);
00481                     wo.wListen.workRejected(we);
00482                 }
00483                 wo.rejected = true;
00484                 logger.log(BasicLevel.ERROR, "" + wex);
00485                 throw wex;
00486             }
00487             wo.startTime = curTime;
00488         }
00489     }
00490 
00491 
00492     // ResourceWorkManager Monitor thread to check for timeouts of work objects
00493     private class RWManagerMonitor extends Thread {
00494 
00495         public RWManagerMonitor() {
00496             super("RWManagerMonitor");
00497             setDaemon(true);
00498         }
00499 
00500         public void run() {
00501             int  cnt = 0;
00502             long curTime;
00503             while (true) {
00504                 try {
00505                     sleep(100); // 100 ms
00506                     cnt++;
00507                     if (!workObjs.isEmpty()) {
00508                         for (int i = 0; i < workObjs.size(); i++) {
00509                             WorkObject wo = (WorkObject) workObjs.get(i);
00510                             curTime = System.currentTimeMillis();
00511                             if (curTime > wo.timeout) {
00512                                 synchronized (workObjs) {
00513                                     if (workObjs.remove(wo)) {
00514                                         WorkException wex = new WorkException(
00515                                               "Error starting work in specified time",
00516                                               WorkException.START_TIMED_OUT);
00517                                         if (wo.wListen != null) {
00518                                             WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_REJECTED,
00519                                                                          wo.work, wex);
00520                                             wo.wListen.workRejected(we);
00521                                         }
00522                                         wo.wException = wex;
00523                                         throw wex;
00524                                     }
00525                                 }
00526                             }
00527                         }
00528                     }
00529 
00530                     // Check threads for execution timeout or thread issue every second
00531                     if (cnt >= 10) {
00532                         cnt = 0;
00533                         curTime = System.currentTimeMillis();
00534                         for (int i = 0; i < vThreads.size(); i++) {
00535                             WorkerPool wp = (WorkerPool) vThreads.get(i);
00536                             if (!wp.isAlive() && !wp.shutdown) {
00537                                 vThreads.remove(i);
00538                                 if (vThreads.size() < minThreads) {
00539                                     WorkerPool nwp = new WorkerPool();
00540                                     vThreads.add(nwp);
00541                                     nwp.start();
00542                                 }
00543                             } else if (wp.wo != null && wp.wo.executionTimeout > 0
00544                                        && curTime > wp.wo.executionTimeout) {
00545                                 if (wp.wo.work != null) {
00546                                     wp.wo.work.release();
00547                                 }
00548                             }
00549                         }
00550                     }
00551                 } catch (Exception ex) {
00552                     logger.log(BasicLevel.ERROR, ex.toString());
00553                 }
00554             }
00555         }
00556     }
00557 
00558 }

Generated on Tue Feb 15 15:05:21 2005 for JOnAS by  doxygen 1.3.9.1