`

scala实现spark读取文件、清洗、入库base中

 
阅读更多

       日常工作中我们往往面对的数据都是海量的文件数据,我们如何快速通过spark将文件导入到hbase库中,我这写了一个简单的例子仅供参考,实际上数据是需要经过清洗才能放入到hbase库中的。

       由于数据文件内容涉及到公司实际项目,不便贴出,此文着重spark提出数据、清洗、入hbase库这个逻辑的实现,scala写的代码比较精简,代码如下:

 

ParseClient.java主要实现文件加载、清洗、入库的工作:

package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import scala.collection.mutable.ListBuffer

object ParseClient {
  def main(args: Array[String]) {
    val conf = new SparkConf();
conf.setAppName("ParseClient")
    conf.setMaster("local");
val sc = new SparkContext(conf);
val textRdd = sc.textFile("WW_2016-10-13~2016-10-13.txt");
---数据清洗
var smailList = new ListBuffer[String]();
val arrRdd = textRdd.flatMap { line => {
      val allList = new ListBuffer[ListBuffer[String]]();
if (line == "" || "".equals(line)) {
        allList += smailList;
smailList = new ListBuffer[String]();
} else {
        smailList += line;
}
      allList;
}
    }

    val truncArrRdd = arrRdd.map { arr => {
      val lineArr = new ListBuffer[(String, String, String, String)];
arr.foreach { element => {
        val eleArr = element.split(" ");
if (eleArr.length >= 4) {
          val tuple = (eleArr(0), eleArr(1), eleArr(2), eleArr(3));
lineArr += tuple;
}
      }
      }
      lineArr
    }
    }

    val resultRdd = truncArrRdd.map(tupleArr => {
      var serviceName: String = "";
var cosumerName: String = "";
var date: String = "";
var context: String = "";
for (tuple <- tupleArr) {
        if (tuple._3.contains("官方旗舰店")) {
          serviceName = tuple._3
        } else {
          cosumerName = tuple._3;
}
        date = tuple._1;
context = context + tuple._4 + "\n";
}

      (cosumerName, serviceName, date, context);
})

----获取hbase库模型对象、入hbase库代码实现
    val job = new HBaseCommon().getJob("jeffTest");
val put = resultRdd.map(tuple => {
      val rowkey = tuple._3 + tuple._2 + tuple._1;
val put = new Put(Bytes.toBytes(rowkey));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("content"), Bytes.toBytes(tuple._4))
      (new ImmutableBytesWritable, put)
    }).saveAsNewAPIHadoopDataset(job.getConfiguration());
sc.stop();
}
}

 

下面这个是辅助类HBaseCommon.java,主要是负责连接spark、hbase库、以及附带写了hbase库读取:

package main.scala

import java.util

import scala.collection.mutable.{ListBuffer, LinkedHashMap}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{KeyValue, HBaseConfiguration}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Get, Scan, HTable, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}

class HBaseCommon {
  /**
   * 获取初始化配置
   */
def getConfiguration(): Configuration = {
    val map: LinkedHashMap[String, String] = new LinkedHashMap[String, String]();
val HBASE_CONFIG = new Configuration();
HBASE_CONFIG.set("hbase.zookeeper.quorum", map.getOrElse("zookeeper_quorum", "host1,host2,host3"));
HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", map.getOrElse("zookeeper_port", "2181"));
val configuration = HBaseConfiguration.create(HBASE_CONFIG);
configuration;
}

  /**
   * 获取作业信息
   */
def getJob(tableName: String): Job = {
    val configuration = this.getConfiguration();
configuration.set(TableOutputFormat.OUTPUT_TABLE, tableName);
var job = new Job(configuration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job;
}

  /**
   * 读取hbase某个表中的全部内容
   */
def getTableInfo(tableName: String, sc: SparkContext): util.ArrayList[String]= {
    val configuration = this.getConfiguration()
    configuration.set(TableInputFormat.INPUT_TABLE, tableName)

    val hBaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])

    var strinfo = ""
val count = hBaseRDD.count()
    println("HBase RDD Count:" + count)
    hBaseRDD.cache()

    val table = new HTable(configuration, tableName);
val g = new Get("row1".getBytes)
    val result = table.get(g)
    val value = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))

    hBaseRDD.cache()
    println("------------------------scan----------")
    val res = hBaseRDD.take(count.toInt)
    val reslist=new util.ArrayList[String]()
    for (j <- 1 until count.toInt) {
      var rs = res(j - 1)._2
      var kvs = rs.raw
      for (kv <- kvs) {
        strinfo += ("rowkey:" + new String(kv.getRow()) +
          " cf:" + new String(kv.getFamily()) +
          " column:" + new String(kv.getQualifier()) +
          " value:" + new String(kv.getValue())+"\n")

        reslist.add(new String(kv.getValue()))
      }
    }
    reslist
  }
}

 

 

 

分享到:
评论

相关推荐

    pom.xml(Idea中用于整合Scala实现Spark代码编写的配置文件)

    这个文件是用来在Idea中用于整合Scala实现Spark代码编写的pom配置文件. 内置 JDK规定, Spark-core, SparkSQL, mysql依赖的jar包,SparkStreaming, SparkStreaming + Kafka, 向kafka 生产数据需要包, 连接 Redis 需要...

    Scala和Spark大数据分析函数式编程、数据流和机器学习

    Scala和Spark大数据分析函数式编程、数据流和机器学习

    Scala and Spark for Big Data Analytics

    Scala and Spark for Big Data Analytics by Md. Rezaul Karim English | 25 July 2017 | ISBN: 1785280848 | ASIN: B072J4L8FQ | 898 Pages | AZW3 | 20.56 MB Harness the power of Scala to program Spark and ...

    spring boot + scala + spark http驱动spark计算

    原始用的jetty做的http接口,最近有时间,研究了下spring boot + scala + spark做大数据计算

    scala与spark基础

    本资源收集了scala与大数据spark的基础的学习笔记,有兴趣的同学可以下载学习

    spark框架中wordcount的scala实现

    scala语言和python一样都是交互式的语言,操作简单。这是wordcount的scala实现,简单明了,比java实现简单很多,希望对大家有所帮助

    scala与spark文档合集

    scala与spark文档合集,有好多本,包括快学Scala,scala与spark文档合集

    基于Scala的Spark数据处理练习设计源码

    文件类型包括43个Scala源代码文件、42个Java源代码文件、2个TXT文本文件、1个city_info文件、1个course文件、1个JSON配置文件、1个dfs_in文件、1个es_in文件、1个product_info文件和1个score文件。该项目旨在熟悉...

    scala和spark的安装

    scala和spark的安装和配置,以及启动spark,分发节点。

    基于Scala的Spark模型转换为PMML格式设计源码

    文件类型包括9个XML配置文件、2个CRC文件、2个Scala源代码文件、1个名称文件、1个Markdown文档、1个Parquet数据文件、1个名称列表文件、1个TXT文本文件、1个PMML文件和1个Java源代码文件。该系统利用JPMML-Spark将...

    Scala and Spark for Big Data Analytics.pdf

    Spark itself is written with Scala and naturally, as a starting point, we will discuss a brief introduction to Scala, such as the basic aspects of its history, purposes, and how to install Scala on ...

    Scala and Spark for Big Data Analytics epub

    Scala and Spark for Big Data Analytics 英文epub 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

    解析JSON文件

    本范例是一个Scala工程,结合Java组件实现了对spark产品分析的结果json文件进行解析,将其内部的几何对象转换为记录集写入数据集里,由于Scala读写文件的效率比较高,故采用Scala与Java组件实现。

    基于Scala的Spark学习项目设计源码

    文件类型包括80个Scala源代码文件、4个XML配置文件、3个TXT文档、2个Markdown文档、2个Java源代码文件、1个GIT忽略文件、1个日志文件、1个JSON配置文件和1个Properties配置文件。该学习项目适合用于学习和实践Scala...

    基于Scala的Spark实时项目设计与实现源码

    本资源提供了一套基于Scala的Spark实时项目的设计与实现源码,包含73个文件,其中包括38个Class字节码文件,19个Scala源代码文件,10个XML配置文件,以及4个Properties配置文件。此外,还包括1个Git忽略文件和1个...

    实验七:Spark初级编程实践

    1、实验环境: ...(1) 在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数; 图3 spark统计行数 (2) 在spark-shell中读取HDFS系统文件“/user/hadoop/test.txt”

    scala&spark;

    scala&spark;-v6.0.docx中详细讲解了scala、spark的使用情况,以及例子

    osm4scala:Scala和Spark库专注于读取OpenStreetMap Pbf文件

    osm4scala:Scala和Spark库专注于读取OpenStreetMap Pbf文件

    基于Java、Scala和Spark的数据处理与分析设计源码

    本资源提供了一套基于Java、Scala和Spark的数据处理与分析设计源码,包含1381个文件,其中包括634个CRC文件,316个BK文件,35个Class字节码文件,20个XML配置文件,15个Java源代码文件,13个JAR打包文件,8个...

    Scala and Spark for Big Data Analytics_Code 源码

    Scala and Spark for Big Data Analytics_Code 源码 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

Global site tag (gtag.js) - Google Analytics