设为首页收藏本站

EPS数据狗论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 1202|回复: 1

Spark Streaming 中使用c3p0连接池操作mysql数据库

[复制链接]

232

主题

6282

金钱

9596

积分

高级用户

发表于 2016-9-21 16:09:26 | 显示全部楼层 |阅读模式

在Spark Streaming的应用程序中,有时候需要将计算结果保存到数据库中,为了高效这里使用批量插入,结合c3po连接池,说明一下使用方法。

  • 数据计算完成后,在foreachRDD中批量插入数据,因为是针对每一个partiton的数据操作,所以使用 rdd.foreachPartition,这里是一个批量插入页面PV和UV的操作,代码如下

    //RDD[(String,Int,Int)] 的意思是RDD[(页面名称,UV,PV)]data.foreachRDD((rdd:RDD[(String,Int,Int)],time:Time)=>{
       rdd.foreachPartition(data=>{     //从连接池中获取一个连接
         val conn = MDBManager.getMDBManager(isLocal).getConnection
         conn.setAutoCommit(false)     val sql = "insert into tableName set pageName=?,uvNum=?,pvNum=?"
         val preparedStatement = conn.prepareStatement(sql)
         data.foreach(r => {
           preparedStatement.setObject(1, r._1)
           preparedStatement.setObject(2, r._2)
           preparedStatement.setObject(3, r._3)
           preparedStatement.addBatch()
         })   //批量提交,如果数据量大,这里可以分批提交
         preparedStatement.executeBatch()
         conn.commit()
         conn.close()
    })

  • 这里创建一个单例的MDBManager,并使用c3p0获取连接,代码如下

    class MDBManager(isLocal:Boolean) extends Serializable{            
    private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true); private val prop = new Properties() private var in:InputStream = _   
    isLocal match{     case true  => in = getClass().getResourceAsStream("/c3p0.properties");     case false => in = new FileInputStream(new File(SparkFiles.get("c3p0.properties")))
      }      
    try {
       prop.load(in);
       cpds.setJdbcUrl(prop.getProperty("jdbcUrl").toString());
       cpds.setDriverClass(prop.getProperty("driverClass").toString());
       cpds.setUser(prop.getProperty("user").toString());
       cpds.setPassword(prop.getProperty("password").toString());      cpds.setMaxPoolSize(Integer.valueOf(prop.getProperty("maxPoolSize").toString()));      cpds.setMinPoolSize(Integer.valueOf(prop.getProperty("minPoolSize").toString()));      cpds.setAcquireIncrement(Integer.valueOf(prop.getProperty("acquireIncrement").toString()));      cpds.setInitialPoolSize(Integer.valueOf(prop.getProperty("initialPoolSize").toString()));      cpds.setMaxIdleTime(Integer.valueOf(prop.getProperty("maxIdleTime").toString()));
    } catch {   case ex: Exception => ex.printStackTrace()
    } def getConnection:Connection={  
         try {  
             return cpds.getConnection();  
         } catch {
           case ex:Exception => ex.printStackTrace()       null
         }  
    }   
    }object MDBManager{  var mdbManager:MDBManager=_def getMDBManager(isLocal:Boolean):MDBManager={
       synchronized{       if(mdbManager==null){
               mdbManager = new MDBManager(isLocal)
           }
       }
       mdbManager
    }
    }

  • 因为本地模式和集群模式的不同获取c3p0.properties配置文件也不一样,代码中分别提供了两种获取配件文件的方式,通过参数isLocal来确定使用哪种方式。

  • 由于使用的是mysql数据库和c3p0连接池,所以提交应用时需要添加mysql连接的jar包和c3p0的jar包,在Spark-submit中添加参数

    --jars /usr/local/spark1.3/lib/mysql-connector-java-5.1.38-bin.jar,/usr/local/spark1.3/lib/c3p0-0.9.1.2.jar

  • 提交应用时添加c3p0的配置文件,在Spark-submit中添加参数

    --files /usr/local/spark1.3/conf/c3p0.properties


作者:海纳百川

271

主题

7192

金钱

1万

积分

资深用户

发表于 2017-1-3 13:32:39 | 显示全部楼层
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

客服中心
关闭
在线时间:
周一~周五
8:30-17:30
QQ群:
653541906
联系电话:
010-85786021-8017
在线咨询
客服中心

意见反馈|网站地图|手机版|小黑屋|EPS数据狗论坛 ( 京ICP备09019565号-3 )   

Powered by BFIT! X3.4

© 2008-2028 BFIT Inc.

快速回复 返回顶部 返回列表