00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 package org.objectweb.jonas.resource;
00029
00030 import java.util.Collections;
00031 import java.util.HashMap;
00032 import java.util.LinkedList;
00033 import java.util.Map;
00034 import java.util.Vector;
00035
00036 import javax.resource.spi.work.ExecutionContext;
00037 import javax.resource.spi.work.Work;
00038 import javax.resource.spi.work.WorkCompletedException;
00039 import javax.resource.spi.work.WorkEvent;
00040 import javax.resource.spi.work.WorkException;
00041 import javax.resource.spi.work.WorkListener;
00042 import javax.resource.spi.work.WorkManager;
00043 import javax.resource.spi.work.WorkRejectedException;
00044 import javax.transaction.NotSupportedException;
00045 import javax.transaction.SystemException;
00046 import javax.transaction.xa.Xid;
00047
00048 import org.objectweb.jonas_ejb.container.TraceEjb;
00049 import org.objectweb.jotm.Current;
00050 import org.objectweb.transaction.jta.TransactionManager;
00051 import org.objectweb.util.monolog.api.BasicLevel;
00052 import org.objectweb.util.monolog.api.Logger;
00057 public class ResourceWorkManager implements WorkManager {
00058
00059 private boolean inited;
00060 private int minThreads;
00061 private int maxThreads;
00062 private long waitTimeout;
00063 private long execTimeout;
00064 private Vector vThreads;
00065
00066 private LinkedList workObjs;
00067 private int freeThreads;
00068
00069 private final static int WO_REJECTED = 1;
00070 private final static int WO_ACCEPTED = 2;
00071 private final static int WO_STARTED = 3;
00072 private final static int WO_COMPLETED = 4;
00073
00074
00075 private static Logger logger = null;
00076
00080 private TransactionManager tm = null ;
00081
00082
00083 private static Map wrkXids = Collections.synchronizedMap(new HashMap());
00084
/**
 * Creates the work manager, starts {@code minThreads} worker threads and
 * the monitor thread.
 *
 * @param tmgr transaction manager used by the workers (cast to
 *        org.objectweb.jotm.Current when a transaction is imported)
 * @param minThreads number of worker threads started immediately
 * @param maxThreads stored as-is
 * @param waitTimeout idle wait of a worker; multiplied by 1000 and used as
 *        the millisecond argument of wait(), so presumably seconds
 * @param execTimeout execution timeout; multiplied by 1000 like waitTimeout
 * @param mlogger logger installed in the static logger field
 */
public ResourceWorkManager(TransactionManager tmgr, int minThreads, int maxThreads,
        int waitTimeout, int execTimeout, Logger mlogger) {

    inited = true;
    this.minThreads = minThreads;
    this.maxThreads = maxThreads;
    // Multiply as long: an int product overflows for values above ~2.1 million.
    this.waitTimeout = waitTimeout * 1000L;
    this.execTimeout = execTimeout * 1000L;
    vThreads = new Vector();
    workObjs = new LinkedList();
    tm = tmgr;
    logger = mlogger;

    // Start the initial set of workers.
    for (int i = 0; i < minThreads; i++) {
        WorkerPool wp = new WorkerPool();
        vThreads.add(wp);
        wp.start();
    }

    // Start the thread that watches start timeouts and worker health.
    (new RWManagerMonitor()).start();
}
00111
00112
00119 public void doWork(Work work) throws WorkException {
00120 doWork(work, INDEFINITE, null, null);
00121 }
00122
00132 public void doWork(Work work, long timeout, ExecutionContext eCtx,
00133 WorkListener wListen) throws WorkException {
00134
00135 logger.log(BasicLevel.DEBUG, "work=" + work + " timeout=" + timeout
00136 + " execCtx=" + eCtx + " wrkListen=" + wListen);
00137 int rStat = WorkEvent.WORK_COMPLETED;
00138 WorkObject wo = new WorkObject(work, timeout, eCtx, wListen, rStat);
00139 synchronized (wo.lock) {
00140 add(wo, rStat);
00141 try {
00142 wo.lock.wait();
00143 } catch (InterruptedException ie) {
00144 }
00145 }
00146 if (wo.wException != null) {
00147 logger.log(BasicLevel.ERROR, "" + wo.wException);
00148 throw wo.wException;
00149 }
00150 }
00151
00160 public long startWork(Work work) throws WorkException {
00161 return startWork(work, INDEFINITE, null, null);
00162 }
00163
00175 public long startWork(Work work, long timeout, ExecutionContext eCtx,
00176 WorkListener wListen) throws WorkException {
00177
00178 logger.log(BasicLevel.DEBUG, "work=" + work + " timeout=" + timeout
00179 + " execCtx=" + eCtx + " wrkListen=" + wListen);
00180 int rStat = WorkEvent.WORK_STARTED;
00181 WorkObject wo = new WorkObject(work, timeout, eCtx, wListen, rStat);
00182 synchronized (wo.lock) {
00183 add(wo, rStat);
00184 try {
00185 wo.lock.wait();
00186 } catch (InterruptedException ie) {
00187 }
00188 }
00189
00190 if (wo.wException != null) {
00191 throw wo.wException;
00192 }
00193 return (wo.startTime - wo.acceptTime);
00194 }
00195
00202 public void scheduleWork(Work work) throws WorkException {
00203 scheduleWork(work, INDEFINITE, null, null);
00204 }
00205
00215 public void scheduleWork(Work work, long timeout,
00216 ExecutionContext eCtx,
00217 WorkListener wListen) throws WorkException {
00218
00219 logger.log(BasicLevel.DEBUG, "work=" + work + " timeout=" + timeout
00220 + " execCtx=" + eCtx + " wrkListen=" + wListen);
00221 int rStat = WorkEvent.WORK_ACCEPTED;
00222 WorkObject wo = new WorkObject(work, timeout, eCtx, wListen, rStat);
00223 add(wo, rStat);
00224 }
00225
00230 public void setInited() {
00231 if (!inited) {
00232 synchronized (workObjs) {
00233 inited = true;
00234 workObjs.notifyAll();
00235 }
00236 }
00237 }
00238
00242 public void releasePool() {
00243 for (int i = 0; i < vThreads.size(); i++) {
00244 WorkerPool wp = (WorkerPool) vThreads.get(i);
00245 wp.shutdown = true;
00246 }
00247 synchronized (workObjs) {
00248 workObjs.notifyAll();
00249 }
00250 }
00251
00256 private void add(WorkObject wo, int rStat) throws WorkException {
00257
00258 if (!inited) {
00259 if (rStat != WorkEvent.WORK_ACCEPTED) {
00260 WorkRejectedException wex = new WorkRejectedException(
00261 "Cannot call doWork or startWork before RA is deployed",
00262 WorkException.INTERNAL);
00263 WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_REJECTED, wo.work, wex);
00264 if (wo.wListen != null) {
00265 wo.wListen.workRejected(we);
00266 }
00267 throw wex;
00268 }
00269 }
00270
00271 logger.log(BasicLevel.DEBUG, "workobj=" + wo + " rStat=" + rStat);
00272
00273
00274 synchronized (workObjs) {
00275
00276 if (wo.eCtx != null && wo.eCtx.getXid() != null) {
00277 if (wrkXids.containsKey(wo.eCtx.getXid())) {
00278 WorkRejectedException wex = new WorkRejectedException(
00279 "Xid is already being processed by another work object",
00280 WorkException.TX_CONCURRENT_WORK_DISALLOWED);
00281 WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_REJECTED, wo.work, wex);
00282 if (wo.wListen != null) {
00283 wo.wListen.workRejected(we);
00284 }
00285 logger.log(BasicLevel.ERROR, "" + wex);
00286 throw wex;
00287 }
00288 wrkXids.put(wo.eCtx.getXid(), wo.work);
00289 }
00290 workObjs.addLast(wo);
00291 wo.acceptTime = System.currentTimeMillis();
00292 if (wo.wListen != null) {
00293 WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_ACCEPTED, wo.work, null);
00294 wo.wListen.workAccepted(we);
00295 }
00296 if (freeThreads <= 0) {
00297 WorkerPool wp = new WorkerPool();
00298 vThreads.add(wp);
00299 wp.start();
00300 }
00301 workObjs.notify();
00302 }
00303 }
00304
00305
00306
00307
00308
00309 private class WorkObject {
00310
00311 Work work;
00312 ExecutionContext eCtx;
00313 WorkListener wListen;
00314 WorkException wException;
00315
00316
00317 long acceptTime;
00318 long executionTimeout;
00319 long startTime;
00320 long timeout;
00321
00322
00323 Object lock;
00324 int rState;
00325 int cState;
00326 boolean rejected;
00327
00328 public WorkObject(Work work, long timeout,
00329 ExecutionContext eCtx,
00330 WorkListener wListen,
00331 int rState) {
00332
00333 this.eCtx = eCtx;
00334 this.work = work;
00335 wException = null;
00336 this.wListen = wListen;
00337
00338 executionTimeout = 0;
00339 acceptTime = 0;
00340 startTime = 0;
00341 long curTime = System.currentTimeMillis();
00342 if (timeout == INDEFINITE) {
00343 this.timeout = timeout;
00344 } else {
00345 this.timeout = curTime + timeout;
00346 }
00347 if (execTimeout > 0) {
00348 executionTimeout = curTime + execTimeout;
00349 }
00350
00351 lock = new Object();
00352 this.rState = rState;
00353 cState = -1;
00354 rejected = false;
00355 }
00356 }
00357
00358
00359
00360 private class WorkerPool extends Thread {
00361 WorkObject wo = null;
00362 boolean shutdown = false;
00363
00364 public WorkerPool() {
00365 super();
00366 setDaemon(true);
00367 logger.log(BasicLevel.DEBUG, getName() + " constructor");
00368 }
00369
00370 public void run() {
00371
00372 while (true) {
00373 synchronized (workObjs) {
00374 boolean waited = false;
00375 while (workObjs.isEmpty()) {
00376
00377
00378
00379 if ((waited && freeThreads >= minThreads) || shutdown) {
00380 logger.log(BasicLevel.DEBUG, getName() + " ending");
00381 return;
00382 }
00383 try {
00384 freeThreads++;
00385 workObjs.wait(waitTimeout);
00386 freeThreads--;
00387 waited = true;
00388 } catch (InterruptedException ignored) {
00389 freeThreads--;
00390 return;
00391 }
00392 }
00393 if (inited) {
00394 wo = (WorkObject) workObjs.removeFirst();
00395 }
00396 }
00397
00398 if (inited && wo != null) {
00399 try {
00400 checkTime();
00401 if (wo.rState == WorkEvent.WORK_STARTED) {
00402 logger.log(BasicLevel.DEBUG, "Start notification for " + wo);
00403 synchronized (wo.lock) {
00404 wo.lock.notify();
00405 }
00406 }
00407 if (wo.wListen != null) {
00408 WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_STARTED,
00409 wo.work, null, wo.startTime);
00410 wo.wListen.workStarted(we);
00411 }
00412
00413 if (wo.eCtx != null) {
00414 long timeout = wo.eCtx.getTransactionTimeout();
00415 Xid xid = wo.eCtx.getXid();
00416 if (xid != null) {
00417 try {
00418 if (timeout != WorkManager.UNKNOWN) {
00419 ((Current) tm).begin(xid, timeout);
00420 } else {
00421 ((Current) tm).begin(xid);
00422 }
00423 } catch (NotSupportedException e) {
00424 logger.log(BasicLevel.ERROR, "cannot start a transaction: NotSupportedException");
00425 WorkException wex = new WorkException(
00426 "Error starting a new transaction", e);
00427 throw wex;
00428 } catch (SystemException e) {
00429 logger.log(BasicLevel.ERROR, "cannot start a transaction:\n", e);
00430 WorkException wex = new WorkException(
00431 "Error starting a new transaction", e);
00432 throw wex;
00433 }
00434 }
00435 }
00436 logger.log(BasicLevel.DEBUG, "Thread: " + getName() + " Running " + wo.work);
00437 wo.work.run();
00438 logger.log(BasicLevel.DEBUG, "Completed " + wo.work);
00439 wo.wException = null;
00440 } catch (WorkException wex) {
00441 wo.wException = wex;
00442 } catch (Throwable th) {
00443
00444 WorkCompletedException wex = new WorkCompletedException(
00445 "Error from work object", th);
00446 wo.wException = wex;
00447 logger.log(BasicLevel.ERROR, "" + wex);
00448 } finally {
00449 if (wo.eCtx != null && wo.eCtx.getXid() != null) {
00450 wrkXids.remove(wo.eCtx.getXid());
00451 }
00452 if (wo.wListen != null && !wo.rejected) {
00453 WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_COMPLETED,
00454 wo.work, wo.wException, wo.startTime);
00455 wo.wListen.workCompleted(we);
00456 }
00457 if (wo.rState == WorkEvent.WORK_COMPLETED) {
00458 synchronized (wo.lock) {
00459 wo.lock.notify();
00460 }
00461 }
00462
00463
00464 ((Current) tm).clearThreadTx();
00465 wo.work = null;
00466 }
00467 }
00468 }
00469 }
00470
00471 private void checkTime() throws WorkException{
00472
00473 long curTime = System.currentTimeMillis();
00474
00475 if (curTime > wo.timeout) {
00476 WorkException wex = new WorkException(
00477 "Error starting work in specified time", WorkException.START_TIMED_OUT);
00478 if (wo.wListen != null) {
00479 WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_REJECTED,
00480 wo.work, wex);
00481 wo.wListen.workRejected(we);
00482 }
00483 wo.rejected = true;
00484 logger.log(BasicLevel.ERROR, "" + wex);
00485 throw wex;
00486 }
00487 wo.startTime = curTime;
00488 }
00489 }
00490
00491
00492
00493 private class RWManagerMonitor extends Thread {
00494
00495 public RWManagerMonitor() {
00496 super("RWManagerMonitor");
00497 setDaemon(true);
00498 }
00499
00500 public void run() {
00501 int cnt = 0;
00502 long curTime;
00503 while (true) {
00504 try {
00505 sleep(100);
00506 cnt++;
00507 if (!workObjs.isEmpty()) {
00508 for (int i = 0; i < workObjs.size(); i++) {
00509 WorkObject wo = (WorkObject) workObjs.get(i);
00510 curTime = System.currentTimeMillis();
00511 if (curTime > wo.timeout) {
00512 synchronized (workObjs) {
00513 if (workObjs.remove(wo)) {
00514 WorkException wex = new WorkException(
00515 "Error starting work in specified time",
00516 WorkException.START_TIMED_OUT);
00517 if (wo.wListen != null) {
00518 WorkEvent we = new WorkEvent(wo.work, WorkEvent.WORK_REJECTED,
00519 wo.work, wex);
00520 wo.wListen.workRejected(we);
00521 }
00522 wo.wException = wex;
00523 throw wex;
00524 }
00525 }
00526 }
00527 }
00528 }
00529
00530
00531 if (cnt >= 10) {
00532 cnt = 0;
00533 curTime = System.currentTimeMillis();
00534 for (int i = 0; i < vThreads.size(); i++) {
00535 WorkerPool wp = (WorkerPool) vThreads.get(i);
00536 if (!wp.isAlive() && !wp.shutdown) {
00537 vThreads.remove(i);
00538 if (vThreads.size() < minThreads) {
00539 WorkerPool nwp = new WorkerPool();
00540 vThreads.add(nwp);
00541 nwp.start();
00542 }
00543 } else if (wp.wo != null && wp.wo.executionTimeout > 0
00544 && curTime > wp.wo.executionTimeout) {
00545 if (wp.wo.work != null) {
00546 wp.wo.work.release();
00547 }
00548 }
00549 }
00550 }
00551 } catch (Exception ex) {
00552 logger.log(BasicLevel.ERROR, ex.toString());
00553 }
00554 }
00555 }
00556 }
00557
00558 }