当前位置:首页> PHP教程> PHP精通
关键字
文章内容
Informix Dynamic Server 中的分布式事务
 
 
修改时间:[2008/08/14 12:29]    阅读次数:[904]    发表者:[起缘]
 

Uwe Weber
Informix 和 DB2 UDB 的 IT 专家, IBM Germany
2005 年 6 月 30 日

如果您正在使用 Java™ 开发数据库应用程序,并在寻找用于实现跨数据库的分布式事务解决方案,那么您应该评估一下 Java Transaction API(JTA)。本文将向您介绍分布式事务,以及如何在 Java 中使用 JTA 处理它们 —— 具体来说是在使用 DB2® UDB 或 Informix® Dynamic Server 的时候。
简介
在现代企业环境中,用多个数据库和多种品牌的数据库来存储公司数据已经不足为奇。最终,这些数据将会在不同数据库外进行比较、合并。

如果您有一个异构的数据库环境,并且计划将不同数据库中的数据收集到一个单独的应用程序中,那么您就应该可以使用传统技术执行该任务。在使用 Java 时,您将通过 JDBC 处理所有的数据库操作。清单 1 展示了在 Java 应用程序中如何连接 DB2 UDB 和 IDS 的代码片断。

清单 1. 使用 JDBC 建立到不同数据库的连接

            1 try { // load JDBC drivers
            2     Class.forName (JDBC_DRIVER_DB2);
            3     Class.forName (JDBC_DRIVER_IDS);
            4 }
            5 catch (Exception e) {
            6     // error handling
            7 }
            8
            9 try { // establish connection and proceed with operation
            10     con_db2 = DriverManager.getConnection (DBURL_DB2);
            11     con_ids = Drivermanager.getConnection (DBURL_IDS);
            12
            13     Statement stmt_db2 = con_db2.createStatement ();
            14     Statement stmt_ids = con_ids.createStatement ();
            15
            16     ResultSet rs_db2 = stmt_db2.executeQuery (SQL);
            17     ResultSet rs_ids = stmt_ids.executeQuery (SQL);
            18
            19     // do something very important with the result sets...
            20 }
            21 catch (SQLException e) {
            22     // error handling
            23 }
            

两阶段提交协议简介
清单 1 中的演示允许您修改不同数据库中的数据。代替执行查询,它可以使用 JDBC 方法 executeUpdate() 执行数据修改。

但是如果您需要在单个事务中封装到 DB2 和 IDS 表的新一行的 insert,要做什么呢?
意思就是说,如果其中一条 insert 语句失败了,就应该将数据库(这里:两种数据库!)的初始状态恢复为客户机未执行任何动作的状态。该行为可以通过使用两阶段提交(Two-Phase-Commit)协议完成。这一标准化协议描述了如何实现分布式事务(XA)或分布式工作单元(Distributed Unit of Work,DUOW)的技术,以达到跨数据库系统的一致状态(根据 ACID)。

常规事务(单阶段提交)中,由 COMMIT 或 ROLLBACK 所执行的事务终止是一种决定性的操作,与之相反,两阶段提交(Two-Phase-Commit)事务是分为两步(阶段)进行的。

首先,两阶段提交(Two-Phase-Commit)事务的启动与常规的单阶段提交(One-Phase-Commit)事务类似。接着,应用程序/客户机对该两阶段提交(Two-Phase-Commit)操作中所涉及的所有数据库执行其修改工作。现在,在最终提交该事务之前,客户机通知参与的数据库准备提交(第 1 阶段)。如果客户机从数据库收到一条“okay”,就发出命令向数据库提交该事务(第 2 阶段)。最后分布式事务(Distributed Transaction)结束。

两阶段提交(Two-Phase-Commit)中的第 1 阶段十分重要。通过首先询问数据库是否可以进行提交,一旦某一参与的数据库报告错误,就有机会立即中止整个事务。因而,第 2 阶段将由 ROLLBACK,而非 COMMIT 完成。

图 1 提供了对于两阶段提交(Two-Phase-Commit)协议如何工作的图形化印象。正如所演示的,分布式事务(Distributed Transaction)使用由元组表示的描述符(例如:[x,b1])。其意思是,一个分布式事务(Distributed Transaction)包含两个元素。首先,有一个惟一全局事务 ID(global transaction id) —— 代表分布式事务(Distributed Transaction)的简单标识符 - 由 x 表示,第二个是分支 ID(branch id),它描述整个事务的一部分。一般,分支指的是一个数据库连接。如果您有一个将处理两个参与数据库的分布式事务(Distributed Transaction),您就可以用诸如 [100,1] 的描述符表示一个数据库,用诸如 [100,2] 的描述符表示另一数据库。因此本例中,就有一个编号为 100 的全局事务,其中包含两个 ID 分别为 1 和 2 的分支。

“但是”,您或许会问,“如果在两阶段提交(Two-Phase-Commit)协议的第 2 阶段中出现错误,又将发生什么事情呢?”
“的确,您将陷入麻烦中!”
实际上,稍后我们将会讨论该主题。

图 1. 两阶段提交中的时间线和应用程序流


请看 清单 2。在第 16-19 行代码中,您可能错觉地认为第 17 和 18 行的语句都是属于由 con_db2.setAutoCommit(false)(第 16 行)所定义的事务边界的一部分。而事实却是该行代码启动了一个显式事务,用于连接到由 con_db2.commit()(第 19 行)所提交的 DB2 数据库。第 18 行中所做的修改不受该事务的影响。本例没有使用两阶段提交(Two-Phase-Commit)协议,因此,它不是一个分布式事务(Distributed Transaction)。无论是到 DB2 数据库的连接,还是到 Informix Dynamic Server(IDS)的连接,它们都没有意识到彼此的存在。

清单 2. 非“两阶段提交”的应用程序

            1 try {
            2     Class.forName (JDBC_DRIVER_DB2);
            3     Class.forName (JDBC_DRIVER_IDS);
            4 }
            5 catch (Exception e) {
            6     // error handling
            7 }
            8
            9 try {
            10     con_db2 = DriverManager.getConnection (DBURL_DB2);
            11     con_ids = Drivermanager.getConnection (DBURL_IDS);
            12
            13     Statement stmt_db2 = con_db2.createStatement ();
            14     Statement stmt_ids = con_ids.createStatement ();
            15
            16     con_db2.setAutoCommit (false);
            17     stmt_db2.executeUpdate (SQL);
            18     stmt_ids.executeUpdate (SQL);
            19     con_db2.commit ();
            20
            21     // further processing
            22 }
            23 catch (SQLException e) {
            24    // error handling
            25 }
            

JTA 和事务管理器(TM)
Java Transaction API 允许您操作应用程序中的分布式事务(Distributed Transaction)。JTA 中有一组方法,它将传统的 JDBC 调用封装到了两阶段提交(Two-Phase-Commit)协议中。

在异构环境中,您通常会发现一个事务管理器(Transaction Manager),负责处理分布式事务。(实际上,事务管理器可以完成大量的工作负载平衡。)因此,不仅存在到数据库的直接连接,还有到事务管理器(Transaction Manager)的连接。这就是 JTA 发挥作用的地方:JTA 是 Java 应用程序和事务管理器(Transaction Manager)之间的接口。图 2 演示了一个包含分布式事务的典型环境。

由于存在事务管理器(Transaction Manager),它通常包含在应用程序服务器(Application Server)中,就不再有两层(Two-Tier)架构。传统的客户/服务器(Client/Server)架构已经由三层(Tree-Tier)架构所取代,三层架构包含应用程序/客户机、事务管理器(Transaction Manager)/应用程序服务器(Application Server)和数据库服务器,而数据库服务器一般称作 XA Resource。

图 2. 三层架构


 

包含 SQL 和 JTA 调用的 Java 应用程序。
管理分布式事务的应用程序服务器(Application Server)。
参与分布式事务的数据库。
Java 应用程序向应用程序服务器(Application Server)提交常规 SQL 语句和通用的 XA 调用。
应用程序所发送的消息由应用程序服务器(Application Server)进行处理,并使用 SQL 和数据库供应商特定的 XA 调用发送给数据库。
 

通常,应用程序服务器(Application Server)提供了应用程序可以使用的多种服务。在谈到分布式事务时,该服务就称作 XA Resource。当然,在应用程序可以使用 XA Resource 之前,首先要在应用程序服务器中注册和配置 XA Resource。

现在,如果您计划在应用程序中使用 JTA,就必须修改代码,以便还可以与应用程序服务器(Application Server)进行通信。这包括一些附加的方法调用和指定的错误/异常处理。请参阅 清单 3,以了解如何工作。

用 JTA 进行两阶段提交的必要条件
首先,在编写 JTA 应用程序时,您需要合适的 JDK。好消息就是在使用当前的 JDK 时,不需要任何附加包。(JTA 包也可以在 Sun Developer Network 上找到。)大多数的 JTA 相关类都在 javax.transaction 和 javax.transaction.xa 中。

您需要用于 DB2 UDB 和 Informix Dynamic Server 的 JDBC 驱动程序。您将需要 Type 4 JDBC 用于 Informix Dynamic Server。DB2 要求您来选择需要哪个 JDBC 驱动程序。有 Type 2、3 和 4 JDBC。在用 JTA 进行编程时,您必须使用 Type 2 或 4 JDBC 驱动程序。为了方便,本文中所演示的所有例子都使用 Type 4 JDBC 驱动程序用于 DB2。(关于各驱动程序之间差别的解释,请查阅手册。)

以上描述说明了应用程序服务器(Application Server)或事务管理器(Transaction Manager)的存在。在下面的例子中,您不会看到“外部”应用程序服务器(Application Server),因为已经使用 DB2XADataSource 和 IfxXADataSource 类直接将之构建到您的应用程序中了。如果您使用一个真正的应用程序服务器(Application Server),那么该应用程序服务器将使用这些类来连接到数据库的本地 XA 调用。

下面的例子(清单 3)演示了一个小型应用程序,该应用程序使用 JTA 实现两阶段提交(Two-Phase-Commit)协议。该例子并不完整,是为了让代码更加易读。请参阅 下载 部分,以获得完整代码。

清单 3. 两阶段提交的应用程序

            19 import java.io.BufferedReader;
            20 import java.io.FileInputStream;
            21 import java.io.IOException;
            22 import java.io.InputStreamReader;
            23
            24 import java.sql.Connection;
            25 import java.sql.SQLException;
            26 import java.sql.Statement;
            27
            28 import java.util.Properties;
            29
            30 import javax.sql.XAConnection;
            31 import javax.transaction.xa.XAException;
            32 import javax.transaction.xa.XAResource;
            33 import javax.transaction.xa.Xid;
            34
            35 import com.ibm.db2.jcc.DB2XADataSource;
            36 import com.ibm.db2.jcc.DB2Xid;
            37
            38 import com.informix.jdbcx.IfxXADataSource;
            39 import com.informix.jdbcx.IfxXid;
            

在第 19-39 行中,您看到了该应用程序中所使用的所有类。大多数类是您所知道的。第 30-33 行中导入的类是使用 JTA 所必要的。同样有意思的是第 35、36 和 38、39 行中的数据库供应商的特定类。xyzXADataSource 类包含了用于启用两阶段提交协议的本地 XA 代码。


            44 class DBX {
            45
            46     private Properties props;
            47     private String propertyfile = "jtadb2ifmx.properties";
            48
            56     DBX () {
            57
            58         Connection      db2con = null;
            59         Connection      ifxcon = null;
            60         DB2XADataSource db2ds = null;
            61         IfxXADataSource ifxds = null;
            62         Xid             db2xid = null;
            63         Xid             ifxxid = null;
            64         XAConnection    db2xacon = null;
            65         XAConnection    ifxxacon = null;
            66         XAResource      db2xares = null;
            67         XAResource      ifxxares = null;
            68
            69
            70         // read the properties
            71         props = new Properties ();
            72
            73         try {
            74             props.load (new FileInputStream (propertyfile));
            75         }
            76         catch (IOException io) {
            77             System.err.println ("Error while accessing the properties file (" +
            78                                 propertyfile + "). Abort.");
            79             System.exit (1);
            80         }
            

DBX 类仅仅包含一个私有成员,用于负责属性文件。在该文件中,有一些数据库特定的设置,例如到引擎的端口或登录信息。

该类的构造函数实例化了 SQL 和 XA 相关类:

Connection: 表示到数据库的传统 SQL(JDBC)连接。
DB2XADataSource 和 IfxXADataSource: 这些类包含到数据库的本地 XA 调用。使用这些类来启用两阶段提交协议(Two-Phase-Commit-Protocol)。如果有一个应用程序服务器(Application Server),就不需要在程序中处理这些类,因为应用程序服务器(Application Server)封装乐应用程序的这部分。
Xid: 指一个 XA 事务。本例中,使用了两个不同的数据库,所以需要两个不同的 Xid —— 每个数据库连接(分支)一个。
XAConnection: JTA 中的一部分。该类允许您启动(提交、准备提交 ...)分布式事务(Distributed Transaction)。
XAResource: 该资源指的是应用程序服务器(Application Server)所提供的一个服务。同样,本例中,我们不使用应用程序服务器(Application Server)。因此,必须在该应用程序中进行创建和初始化。
 


            83         db2ds = initDB2XADataSource ();
            84         ifxds = initIfxXADataSource ();
            

这些代码行调用一个方法来设置 XADataSource(参见下面)。


            360     IfxXADataSource initIfxXADataSource () {
            361
            362         System.out.print ("Create an IDS XA data source: ");
            363         IfxXADataSource ds = new IfxXADataSource ();
            364         ds.setDescription ("IDS XA data source");
            365         ds.setServerName (props.getProperty ("ifx.connection.instancename"));
            366         ds.setIfxIFXHOST (props.getProperty ("ifx.connection.host"));
            367         ds.setPortNumber (Integer.parseInt
            368             (props.getProperty ("ifx.connection.port")));
            369         ds.setDatabaseName (props.getProperty ("ifx.connection.databasename"));
            370
            371         System.out.println ("Okay.");
            372         return ds;
            373     }
            

为了方便,这里同时演示了用于 XADataSource 的 IDS 和 DB2 设置,因为它们十分相似。

在安装 IfxDataSource(第 363 行)之后,需要将多个设置指定到数据源对象。这些设置是从属性文件读取的。在设置传统的 JDBC 数据库连接时,所做的这些设置可以与数据库 URL 相比。请注意,没有将任何登录信息指定给数据源对象。登录信息仍然是数据库连接本身中的一部分。

正如上面所提到的,如果存在应用程序服务器(Application Server),还可以由它来进行这一初始化。

在用正确的参数初始化 XADataSource 之后,就将 XADataSource 返回给方法调用者。


            85         db2xacon = initDB2XAConnection (db2ds);
            86         ifxxacon = initIfxXAConnection (ifxds);
            

在第 85 和 86 行的代码中,创建了到数据库的 XA Connection。下面描述了如何初始化这些 XA Connection。


            329     XAConnection initIfxXAConnection (IfxXADataSource ifxdatasource) {
            330
            331         XAConnection xacon = null;
            332
            333
            334         try {
            335             System.out.print ("Set up IDS XA connection: ");
            336             xacon = ifxdatasource.getXAConnection (
            337                 props.getProperty ("ifx.connection.username"),
            338                 props.getProperty ("ifx.connection.password"));
            339
            340             System.out.println ("Okay.");
            341         }
            342         catch (SQLException e) {
            343             sqlerr (e);
            344         }
            345
            346         return xacon;
            347     }
            

为了设置 XAConnection,要使用前面初始化的 DataSource 对象。第 336 行使用 XADataSource 创建了 XAConnection。为了完成 XAConnection,只需要将身份验证信息传递给该对象。


            87         db2xares = initXAResource (db2xacon);
            88         ifxxares = initXAResource (ifxxacon);
            

现在,您准备创建 XAResource 对象了。这些对象将允许您操作两阶段提交(Two-Phase-Commit)。


            388     XAResource initXAResource (XAConnection xacon) {
            389
            390         XAResource xares = null;
            391
            392
            393         try {
            394             System.out.print ("Setting up a XA resource: ");
            395             xares = xacon.getXAResource ();
            396             System.out.println ("Okay.");
            397         }
            398         catch (SQLException e) {
            399             sqlerr (e);
            400         }
            401
            402         return xares;
            403     }
            

XAResource 对象的安装没有什么特别的。该对象是通过调用 XAConnection 中的 getXAResource() 来创建的。

在完成所有关于 XA 的准备之后,就创建到数据库的 JDBC 连接。


            89         db2con = getDatabaseConnection (db2xacon);
            90         ifxcon = getDatabaseConnection (ifxxacon);
            

在 getDatabaseConnection() 方法中,建立了一个 JDBC 数据库连接。


            250     Connection getDatabaseConnection (XAConnection xacon) {
            251
            252         Connection con = null;
            253
            254         try {
            255             System.out.print ("Establish database connection: ");
            256             con = xacon.getConnection ();
            257             System.out.println ("Okay.");
            258         }
            259         catch (SQLException e) {
            260             sqlerr (e);
            261         }
            262
            263         return con;
            264     }
            

这看上去有些混乱。既然已经在第 336 行中设置了 XAConnection,我们为何还需要 JDBC 连接呢?我们为何仍然需要一个“传统”连接的理由是所有其他 JDBC 操作和类(Statement、ResultSet ...)都基于或使用 Connection 对象。如果您看一看 JDBC 类的层次结构图,将会发现 XAConnection 并非是 Connection,反之亦然。XAConnection(实际上,它是 ConnectionPool 的子类)使用 Connection(层次化)。


            93         db2xid = createDB2XID ();
            94         ifxxid = createIfxXID ();
            

启动 XA 事务之前的最后一步就是为数据库创建 XA ID 对象。在分布式事务(Distributed Transaction)中进行操作时,总是要使用这个 xid。


            183     Xid createIfxXID () {
            184
            185         Xid xid = null;
            186
            187         byte [] gid = new byte[1];
            188         byte [] bid = new byte[1];
            189
            190         gid[0] =
            191             (Byte.decode (props.getProperty ("xid.global"))).byteValue ();
            192         bid[0] =
            193             (Byte.decode (props.getProperty ("xid.branch.ifx"))).byteValue ();
            194
            195         System.out.print ("Creating an XID (" + Byte.toString (gid[0]) + ", " +
            196                           Byte.toString (bid[0]) + ") for Informix: ");
            197
            198         xid = new IfxXid (0, gid, bid);
            199         System.out.println ("Okay.");
            200         return xid;
            201     }
            

createIfxXID 方法创建一个 XID(这里:用于 IDS 连接)。正如“两阶段提交协议简介”小节中提到的,XA 事务包含定义该事务的两个元素。上面例子中的重要部分在第 198 行中。IDS XID 是同三个参数创建的。第一个参数是 format ID,它描述在什么格式中构建分布式事务(Distributed Transaction)。您可以省略这一格式信息。第二个参数定义了全局事务 ID(global transaction ID)。该 ID 对于所有参与数据库来说是惟一的。第三个参数表示该全局事务中的事务分支。

在(为 DB2 和 IDS)构建 XID 之后,我们可以使用它们来修改单个事务中的数据。


            98         execBranch (db2con, db2xares, db2xid);
            99         execBranch (ifxcon, ifxxares, ifxxid);
            

execBranch() 方法包含了上面为每个连接所定义的 XA 事务中的修改。


            215     void execBranch (Connection con, XAResource xares, Xid xid) {
            216
            217         String sql = props.getProperty ("sql.statement");
            218
            219         try {
            220             xares.start (xid, javax.transaction.xa.XAResource.TMNOFLAGS);
            221
            222                 Statement stmt = con.createStatement ();
            223                 stmt.executeUpdate (sql);
            224
            225             xares.end (xid, javax.transaction.xa.XAResource.TMSUCCESS);
            226         }
            227         catch (XAException e) {
            228             System.err.println ("XA exception caught:");
            229             System.err.println ("Cause  : " + e.getCause ());
            230             System.err.println ("Message: " + e.getMessage ());
            231             e.printStackTrace ();
            232         }
            233         catch (SQLException e) {
            234             sqlerr (e);
            235         }
            236     }
            

第 219-226 行代码包含了分布式事务(Distributed Transaction)中为相应分支所使用的真正 SQL 语句。分支边界在第 220 行中以 start 方法开始。传递给该方法的参数就是我们已经知道的事务 ID,而第二个参数包含了用于该 XA 事务的一些附加信息。因为这是第一个两阶段提交(Two-Phase-Commit)协议操作,所以不需要向该方法传递任何特殊信息。TMNOFLAGS 说明了这一事实。分支边界终止于第 225 行。标志 TMSUCCESS 描述所有操作都成功。

在 IDS 和 DB2 的分支都执行之后,全局事务就准备提交这些修改。当然,在可以向数据库传送最后的提交之前,必须询问数据库是否准备进行提交。


            104         if (prepareCommit (db2xares, db2xid) == XAResource.XA_OK &&
            105             prepareCommit (ifxxares, ifxxid) == XAResource.XA_OK) {
            106             // both branches are ready to commit
            107             commitBranch (db2xares, db2xid);
            108             commitBranch (ifxxares, ifxxid);
            109         }
            110         else {
            111             // a resource reported an error
            112             rollbackBranch (db2xares, db2xid);
            113             rollbackBranch (ifxxares, ifxxid);
            114         }
            116     } // end of constructor
            

第 104 和 105 行通知数据库准备提交。如果数据库报告 XAResource.XA_OK,就可以提交整个事务。否则,该事务就将被 ROLLBACK 中止。


            417     int prepareCommit (XAResource xares, Xid xid) {
            418
            419         int rc = 0;
            420
            421         System.out.print ("Prepare XA branch (" +
            422             Byte.toString ((xid.getGlobalTransactionId ())[0]) + ", " +
            423             Byte.toString ((xid.getBranchQualifier ())[0]) + "): ");
            424
            425         try {
            426             xares.prepare (xid);
            427         }
            428         catch (XAException e) {
            429             xaerr (e);
            430         }
            431
            432         System.out.println ("Okay.");
            433         return rc;
            434     }
            

prepareCommit() 方法中最重要的一行在第 426 行中。prepare 方法引起数据库调用两阶段提交协议(Two-Phase-Commit)的“第 1 阶段”。

根据“第 1 阶段”的结果,将提交或中止该分布式事务(Distributed Transaction)。下面是将用于发出这些必要操作的两个方法。


            128     void commitBranch (XAResource xares, Xid xid) {
            129
            130         System.out.print ("Commit XA branch (" +
            131             Byte.toString ((xid.getGlobalTransactionId ())[0]) + ", " +
            132             Byte.toString ((xid.getBranchQualifier ())[0]) + "): ");
            133
            134         try {
            135             // second parameter is 'false' since we have a two phase commit
            136             xares.commit (xid, false);
            137         }
            138         catch (XAException e) {
            139             xaerr (e);
            140         }
            141
            142         System.out.println ("Okay.");
            143