亚洲欧美日韩综合系列在线_91精品人妻一区二区_欧美大肥婆一级特大AA片_九色91视频免费观看_亚洲综合国产精品_av中文字幕在线不卡_久久精品色综合网_看黄色视频的软件_无卡无码高清中文字幕码2024_亚洲欧美日韩天堂网

Java 分布式事務(wù)(多數(shù)據(jù)源)

來源:kangojian 發(fā)布時(shí)間:2018-11-14 11:33:59 閱讀量:949

最近這幾天一直在整 怎么實(shí)現(xiàn)分布式事務(wù)。找了很多資料,不過大都相近類同。對(duì)Oracle、SQL Server、Mysql數(shù)已做過測(cè)試,其中Mysql5.0以上的才支持分布式事務(wù)。 

對(duì)于這些,主要是之前根本沒有接觸過分布式事務(wù),還錯(cuò)找了一些分布式事數(shù)據(jù)庫(kù)的資料,呵呵,結(jié)果不是我目前所需要的。 

測(cè)試過程中出現(xiàn)了很多錯(cuò)誤,一直都通不過,以為是用戶權(quán)限還有數(shù)據(jù)庫(kù)服務(wù)的問題,但一切都配置良好的情況下還一直都通不過。結(jié)果發(fā)現(xiàn),我導(dǎo)入的都是一些普通的JDBC連接包,于是狂搜實(shí)現(xiàn)XA事務(wù)的jar包。 

Mysql:            mysql-connector-java-5.1.6-bin.jar 

SQL Server:       sqljdbc.jar 

Oracle:           ojdbc14.jar 

用的是這些包才順利通過運(yùn)行。后面會(huì)附上這幾個(gè)jar包。 

好了,把源代碼也附上: 


import com.microsoft.sqlserver.jdbc.SQLServerXADataSource; 

import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource; 

import java.sql.*; 

import java.util.logging.Level; 

import java.util.logging.Logger; 

import javax.sql.*; 

import javax.transaction.xa.*; 

import oracle.jdbc.xa.client.OracleXADataSource; 


public class Mutil_DataSource_Test { 

    public static void main(String[] args){ 

        Mutil_DataSource_Test mdt = new Mutil_DataSource_Test(); 

        try { 

            mdt.test1(); 

        } catch (Exception ex) { 

            System.out.println("除SQLException、XAException之外的異常: \n"); 

            Logger.getLogger(Mutil_DataSource_Test.class.getName()).log(Level.SEVERE, null, ex); 

        } 

    } 


    class MyXid implements Xid{ 

        int formatId; 

        byte globalTransactionId[]; 

        byte branchQualifier[]; 

        public MyXid(){ 


        } 

        public MyXid(int formatId,byte[] globalTransactionId,byte[] branchQualifier){ 

            this.formatId = formatId; 

            this.globalTransactionId = globalTransactionId; 

            this.branchQualifier = branchQualifier; 

        } 


        public int getFormatId() { 

            return this.formatId; 

        } 

        public void setFormatId(int formatId){ 

            this.formatId = formatId; 

        } 

        public byte[] getGlobalTransactionId() { 

            return this.globalTransactionId; 

        } 

        public void setGlobalTransactionId(byte[] globalTransactionId){ 

            this.globalTransactionId = globalTransactionId; 

        } 

        public byte[] getBranchQualifier() { 

            return this.branchQualifier; 

        } 

        public void setBranchQualifier(byte[] branchQualifier){ 

            this.branchQualifier = branchQualifier; 

        } 

    } 


    //多數(shù)據(jù)庫(kù)測(cè)試 

    public void test1() { 

        //定義所需用到的變量 

         Connection mysqlCn = null; 

        Connection sqlCn = null; 

        Connection mysqlCn2 = null; 

        Connection oraCn = null; 


        MysqlXADataSource mysqlDs = null; 

        SQLServerXADataSource sqlDs = null; 

        MysqlXADataSource mysqlDs2 = null; 

        OracleXADataSource oraDs = null; 


        XAConnection xamysqlCn = null; 

        XAConnection xasqlCn = null; 

        XAConnection xamysqlCn2 = null; 

        XAConnection xaoraCn = null; 


        XAResource xamysqlRes = null; 

        XAResource xasqlRes = null; 

        XAResource xamysqlRes2 = null; 

        XAResource xaoraRes = null; 


        Xid mysqlXid = null; 

        Xid sqlXid = null; 

        Xid mysqlXid2 = null; 

        Xid oraXid = null; 


        Statement mysqlpst = null; 

        Statement sqlpst = null; 

        Statement mysqlpst2 = null; 

        Statement orapst = null; 

    try{ 

        //獲得數(shù)據(jù)源 

        mysqlDs = new MysqlXADataSource(); 

        mysqlDs.setURL("jdbc:mysql://localhost:3306/test"); 

        mysqlDs2 = new MysqlXADataSource(); 

        mysqlDs2.setURL("jdbc:mysql://10.10.10.119:3306/test"); 

        sqlDs = new SQLServerXADataSource(); 

        sqlDs.setURL("jdbc:sqlserver://10.10.10.119:1433;DatabaseName=RTC;loginTimeout=20;user=sa;password=chgpwd122105"); 

//        sqlDs.setUser("sa"); 

//        sqlDs.setPassword("chgpwd122105"); 

//        sqlDs.setServerName("10.10.10.119"); 

//        sqlDs.setPortNumber(1433); 

//        sqlDs.setDatabaseName("RTC"); 

        oraDs = new OracleXADataSource(); 

        oraDs.setURL("jdbc:oracle:thin:@10.10.10.119:1521:WMS"); 

        //獲得連接 

        xamysqlCn = mysqlDs.getXAConnection("root", "9999"); 

System.out.println("xamysqlCn: "+xamysqlCn); 

        xasqlCn = sqlDs.getXAConnection(); 

System.out.println("xasqlCn: "+xasqlCn); 

        xamysqlCn2 = mysqlDs2.getXAConnection("root", "9999"); 

System.out.println("xamysqlCn2: "+xamysqlCn2); 

        xaoraCn = oraDs.getXAConnection("tiger", "tiger"); 

System.out.println("xaoraCn: "+xaoraCn); 


        mysqlCn = xamysqlCn.getConnection(); 

        sqlCn = xasqlCn.getConnection(); 

        mysqlCn2 = xamysqlCn2.getConnection(); 

        oraCn = xaoraCn.getConnection(); 


        mysqlpst = mysqlCn.createStatement(); 

        sqlpst = sqlCn.createStatement(); 

        mysqlpst2 = mysqlCn2.createStatement(); 

        orapst = oraCn.createStatement(); 

        //定義XAResource 

        xamysqlRes = xamysqlCn.getXAResource(); 

        xasqlRes = xasqlCn.getXAResource(); 

        xamysqlRes2 = xamysqlCn2.getXAResource(); 

        xaoraRes = xaoraCn.getXAResource(); 

        //定義Xid 

        mysqlXid = new MyXid(0, new byte[]{0x01}, new byte[]{0x02}); 

        sqlXid = new MyXid(0, new byte[]{0x01}, new byte[]{0x03}); 

        mysqlXid2 = new MyXid(0, new byte[]{0x01}, new byte[]{0x04}); 

        oraXid = new MyXid(0, new byte[]{0x01}, new byte[]{0x05}); 

        //執(zhí)行Mysql 

        xamysqlRes.start(mysqlXid, XAResource.TMNOFLAGS);        

        mysqlpst.executeUpdate("insert into test values(4,'XA','F','Class4')"); 

        xamysqlRes.end(mysqlXid, XAResource.TMSUCCESS); 

        //執(zhí)行SQLServer 

        xasqlRes.start(sqlXid, XAResource.TMNOFLAGS); 

        sqlpst.executeUpdate("insert into test values('444')"); 

        xasqlRes.end(sqlXid, XAResource.TMSUCCESS); 

        //執(zhí)行Mysql 

        xamysqlRes2.start(mysqlXid2, XAResource.TMNOFLAGS); 

        mysqlpst2.executeUpdate("insert into test values(4,'XA','F','Class4')"); 

        xamysqlRes2.end(mysqlXid2, XAResource.TMSUCCESS); 

        //執(zhí)行Oracle 

        System.out.println("xaoraRes: "+xaoraRes); 

        xaoraRes.start(oraXid, XAResource.TMNOFLAGS); 

        orapst.executeUpdate("insert into test123 values('4','44','444')"); 

        System.out.println("oraXid: "+oraXid); 

        xaoraRes.end(oraXid, XAResource.TMSUCCESS); 

        //準(zhǔn)備 

        int mysqlRea = xamysqlRes.prepare(mysqlXid); 

        int sqlRea = xasqlRes.prepare(sqlXid); 

        int mysqlRea2 = xamysqlRes2.prepare(mysqlXid2); 

        int oraRea = xaoraRes.prepare(oraXid); 

        //判斷準(zhǔn)備就緒與否  提交或回滾 

        if(mysqlRea == xamysqlRes.XA_OK && mysqlRea2 == xamysqlRes.XA_OK && oraRea == xaoraRes.XA_OK && sqlRea == xasqlRes.XA_OK){ 

//        if(mysqlRea == xamysqlRes.XA_OK && mysqlRea2 == xamysqlRes.XA_OK && oraRea == xaoraRes.XA_OK){ 

//        if(mysqlRea == xamysqlRes.XA_OK && sqlRea == xasqlRes.XA_OK && mysqlRea2 == xamysqlRes.XA_OK){ 

//        if(mysqlRea == xamysqlRes.XA_OK && mysqlRea2 == xamysqlRes.XA_OK){ 

            xamysqlRes.commit(mysqlXid, false); 

System.out.println("Mysql 事務(wù)提交成功!"); 

            xasqlRes.commit(sqlXid, false); 

System.out.println("SQLServer 事務(wù)提交成功!"); 

            xamysqlRes2.commit(mysqlXid2, false); 

System.out.println("Mysql2 事務(wù)提交成功!"); 

            xaoraRes.commit(oraXid, false); 

System.out.println("Oracle 事務(wù)提交成功!"); 

        }else{ 

            xamysqlRes.rollback(mysqlXid); 

            xasqlRes.rollback(sqlXid); 

            xamysqlRes2.rollback(mysqlXid2); 

            xaoraRes.rollback(oraXid); 

System.out.println("事務(wù)回滾成功!"); 

        } 

    }catch(SQLException ex){ 

        Logger.getLogger(Mutil_DataSource_Test.class.getName()).log(Level.SEVERE, null, ex); 

        try{ 

            xamysqlRes.rollback(mysqlXid); 

            xasqlRes.rollback(sqlXid); 

            xamysqlRes2.rollback(mysqlXid2); 

            xaoraRes.rollback(oraXid); 

        }catch(XAException e){ 

            System.out.println("回滾也出錯(cuò)咯!~"); 

            e.printStackTrace(); 

        } 

    }catch(XAException ex){ 

        Logger.getLogger(Mutil_DataSource_Test.class.getName()).log(Level.SEVERE, null, ex); 

    }finally{ 

        try { 

        //關(guān)閉 

        mysqlpst.close(); 

        mysqlCn.close(); 

        xamysqlCn.close(); 

        sqlpst.close(); 

        sqlCn.close(); 

        xasqlCn.close(); 

        mysqlpst2.close(); 

        mysqlCn2.close(); 

        xamysqlCn2.close(); 

        orapst.close(); 

        oraCn.close(); 

        xaoraCn.close(); 

        } catch (SQLException ex) { 

            Logger.getLogger(Mutil_DataSource_Test.class.getName()).log(Level.SEVERE, null, ex); 

        } 

    } 

    } 


分布式事務(wù)分為兩個(gè)階段,第一個(gè)階段相當(dāng)于是一個(gè)預(yù)提交,第二階段才是正真的提交。 

首先要實(shí)現(xiàn)的是Xid接口,formatId可以理解為一個(gè)全局事務(wù)的ID,不過我上面的代碼沒有去做一些異常的處理,還有正確的鏈接的關(guān)閉,只是自己做的一個(gè)小小的測(cè)試,以后項(xiàng)目中再去處理這些。第一階段如果出錯(cuò)或是不順利則不會(huì)提交,需要做一些回滾處理。如果順利則準(zhǔn)備提交,進(jìn)入第二階段,在第二階段可能會(huì)出現(xiàn)問題,如果第一個(gè)分支事務(wù)提交成功了,而后有一個(gè)分支事務(wù)提交失敗,這樣則會(huì)造成數(shù)據(jù)不準(zhǔn)確,目前還不知道有沒有方法可以解決些問題,好像XAResource.recover()可以處理,但具體怎么解決,在項(xiàng)目中自己還是可以想到辦法解決。 

對(duì)XA分布式事務(wù)  我是初學(xué)者,而且也沒有誰指點(diǎn),只是在網(wǎng)上找些資料。我上面的理解可能也還有些不足或是錯(cuò)誤。希望高手看到后能指點(diǎn)指點(diǎn)。

--------------------- 



標(biāo)簽: 數(shù)據(jù)庫(kù)
分享:
評(píng)論:
你還沒有登錄,請(qǐng)先