A queue must be processed row by row. The ID field serves as a handle that identifies each queue message to be processed. For debugging, it is useful to store error information in the queue table whenever a processing action fails. This lets you see what went wrong with a message and reproduce the problem.
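The row-by-row idea can be sketched outside the product as follows. This is a minimal Python illustration using an in-memory SQLite table, not the job definition itself. The table and column names are taken from the SQL in the steps below; the `send` callback, the `fake_send` helper, the sample messages, and the convention that a NULL status marks a pending message are assumptions made only for this example.

```python
import sqlite3


def process_queue(conn, send):
    """Process pending queue rows one at a time, recording any error on the row."""
    pending = conn.execute(
        "SELECT id, xml_message FROM output_queue_schedtour "
        "WHERE status IS NULL ORDER BY id"  # assumption: NULL status = pending
    ).fetchall()
    for message_id, xml_message in pending:
        error_msg = None
        try:
            send(xml_message)
        except Exception as exc:
            # Keep the error text so the failure can be inspected later.
            error_msg = str(exc)
        status = "DONE" if error_msg is None else "ERROR"
        conn.execute(
            "UPDATE output_queue_schedtour "
            "SET status = ?, error_message = ? WHERE id = ?",
            (status, error_msg, message_id),
        )
        conn.commit()  # one commit per message, so one failure does not undo others


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE output_queue_schedtour ("
        "id INTEGER PRIMARY KEY, xml_message TEXT, "
        "status TEXT, error_message TEXT)"
    )
    conn.executemany(
        "INSERT INTO output_queue_schedtour (id, xml_message) VALUES (?, ?)",
        [(1, "<tour id='1'/>"), (2, "<broken"), (3, "<tour id='3'/>")],
    )

    def fake_send(xml_message):
        # Hypothetical stand-in for the real HTTP call.
        if not xml_message.endswith("/>"):
            raise ValueError("malformed message")

    process_queue(conn, fake_send)
    return conn.execute(
        "SELECT id, status, error_message FROM output_queue_schedtour ORDER BY id"
    ).fetchall()
```

Calling `demo()` leaves the second message marked `ERROR` with its error text stored on the same row, while the other two messages are marked `DONE`.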
To create a job that processes output events:
1. Define a SQL Task that starts collecting errors:

       SELECT RulesEngine.StartCatchingErrors('Yes')

   Define this SQL Task as a Job Task in a new job called PROCESS_QUEUE_SCHEDTOUR.
2. Define another SQL Task that processes the next item in the queue:

       INVOKE http.send WITH
       SELECT 'http://travel.com/guideScheduler'
       ,      queue.xml_message
       FROM   output_queue_schedtour queue
       WHERE  id = ( SELECT message_id FROM in_process_message )

   Add this SQL Task as a Job Task to the PROCESS_QUEUE_SCHEDTOUR job.
3. Define another SQL Task that catches any errors encountered while processing the most recent item.

   First, define an External Set named IN_PROCESS_MESSAGE that will store any caught errors:

   | Element Name | Data Type |
   |---|---|
   | MESSAGE_ID | NUMBER |
   | ERROR_MSG | NCLOB |

   Define this External Set as the Input Parameter Set of the PROCESS_QUEUE_SCHEDTOUR job.

   Then define the SQL Task:

       UPDATE in_process_message
       SET    error_msg = RulesEngine.GetLastCaughtErrors()

   Add this SQL Task as a Job Task to the PROCESS_QUEUE_SCHEDTOUR job.
4. Define a SQL Task that stops collecting errors:

       SELECT RulesEngine.StopCatchingErrors()

   Add this SQL Task as a Job Task to the PROCESS_QUEUE_SCHEDTOUR job.
5. Define a SQL Task that records the message status in the queue table:

       UPDATE output_queue_schedtour
       SET    ( status, error_message ) =
              (
                  SELECT DECODE( error_msg, null, 'DONE', 'ERROR' )
                  ,      error_msg
                  FROM   in_process_message
              )
       WHERE  id =
              (
                  SELECT message_id
                  FROM   in_process_message
              )

   Add this SQL Task as a Job Task to the PROCESS_QUEUE_SCHEDTOUR job.
6. Set the Commit Type and Abort Mode attributes for the individual Job Tasks as follows:

   | Job Task Executes | Commit Type | Abort Mode |
   |---|---|---|
   | SELECT RulesEngine.StartCatchingErrors('Yes') | Task | Abort Job On Error |
   | INVOKE http.send | Task | Abort Task On Error |
   | UPDATE in_process_message | Task | Abort Task On Error |
   | SELECT RulesEngine.StopCatchingErrors() | Task | Abort Job On Error |
   | UPDATE output_queue_schedtour | Task | Abort Job On Error |
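To see why the sending task is the one that should not abort the job, the sequence of Job Tasks above can be modeled with a toy runner. This is a hedged Python sketch, not the product's engine. It assumes, based only on the setting names, that "Abort Task On Error" abandons the failing task and lets the job continue, while "Abort Job On Error" stops the whole job. The `Job` class, `run_demo` function, and the error text are hypothetical names invented for the illustration.

```python
class Job:
    """Toy job runner; only an illustration of the assumed abort semantics."""

    def __init__(self):
        self.catching = False   # stands in for Start/StopCatchingErrors
        self.last_error = None  # stands in for GetLastCaughtErrors
        self.log = []

    def run(self, tasks):
        for name, action, abort_mode in tasks:
            try:
                action(self)
                self.log.append((name, "ok"))
            except Exception as exc:
                if self.catching:
                    self.last_error = str(exc)
                self.log.append((name, "failed"))
                if abort_mode == "job":  # assumed meaning of Abort Job On Error
                    break
        return self.log


def run_demo(send_abort_mode):
    """Run the five tasks with a send step that always fails."""
    message = {"error_msg": None, "status": None}

    def start(job):
        job.catching = True

    def send(job):
        raise ConnectionError("guide scheduler unreachable")

    def record(job):
        message["error_msg"] = job.last_error

    def stop(job):
        job.catching = False

    def update(job):
        message["status"] = "DONE" if message["error_msg"] is None else "ERROR"

    tasks = [
        ("start catching", start, "job"),
        ("http.send", send, send_abort_mode),
        ("record error", record, "task"),
        ("stop catching", stop, "job"),
        ("update status", update, "job"),
    ]
    return Job().run(tasks), message
```

Under these assumptions, `run_demo("task")` runs all five tasks and leaves the message with status `ERROR` and its error text recorded, whereas `run_demo("job")` stops right after the failed send, so no status is ever written. That matches the intent of the table: a failed send must still reach the tasks that record the error.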