Thursday, June 30, 2016

Linear Regression using Spark MLlib in Java

In this post I read data from an HBase table and build a Spark Vector for each row to pass to LinearRegressionWithSGD.

In a Spark Vector, all the features are placed as columns. Vector has two implementations: DenseVector and SparseVector. If the data contains many zeros, prefer SparseVector to save memory. Once the feature vector is created, wrap each vector in a LabeledPoint by attaching a label. The resulting RDD of LabeledPoints can be visualized like a spreadsheet: each row is a training sample, the columns are the features, and the label is the dependent variable, i.e. the value to be predicted.
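As a quick illustration, here is a minimal sketch of both vector types and a LabeledPoint using the standard MLlib factory methods (the feature values are made up):

// Classes: org.apache.spark.mllib.linalg.Vectors, org.apache.spark.mllib.regression.LabeledPoint

// Dense vector: every feature value is stored explicitly.
Vector dense = Vectors.dense(1.0, 0.0, 3.0);

// Sparse vector: size 3, only the non-zero entries at indices 0 and 2 are stored.
Vector sparse = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});

// A LabeledPoint pairs the label (the value to predict) with its feature vector.
LabeledPoint densePoint = new LabeledPoint(0.5, dense);
LabeledPoint sparsePoint = new LabeledPoint(0.5, sparse);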

The reason I have chosen Stochastic Gradient Descent (SGD) over batch Gradient Descent (GD) is that SGD updates the weights from only a small sample of the training data in each iteration, whereas GD uses all samples for every update. Since I have a large number of training examples, SGD is much faster than GD, and I am fine with the close approximation it provides; the trade-off is that the error function is not minimized as precisely as with GD.
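As a rough illustration (a hypothetical helper, not the MLlib implementation), one SGD step for linear regression updates the weights from a single sample, while batch GD would first average the gradient over the whole dataset:

// Hypothetical sketch of a single SGD update for linear regression:
// w := w - stepSize * (w.x - y) * x, computed from ONE training sample (x, y).
// Batch GD would instead average this gradient over every sample before updating.
static double[] sgdStep(double[] w, double[] x, double y, double stepSize) {
  double prediction = 0.0;
  for (int j = 0; j < w.length; j++) {
    prediction += w[j] * x[j];
  }
  double error = prediction - y;          // residual for this single sample
  for (int j = 0; j < w.length; j++) {
    w[j] -= stepSize * error * x[j];      // move against the gradient
  }
  return w;
}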



Configuration conf = HBaseConfiguration.create();
conf.setLong("hbase.rpc.timeout", 600000);
conf.setLong("hbase.client.scanner.caching", 1000);
conf.set(TableInputFormat.INPUT_TABLE, SOURCE_TABLE);

SparkConf sparkConf = new SparkConf();
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

JavaPairRDD<ImmutableBytesWritable, Result> inputRDD =
    jsc.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

// Extract the data from the HBase table. I am taking values from 3 HBase columns
// (COLUMN_QUALIFIER_1/2/3 are placeholders for your own column qualifiers).
JavaPairRDD<String, String[]> dataRDD =
    inputRDD.mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String[]>() {

      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<String, String[]> call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
        String[] strArr = new String[3];
        Arrays.fill(strArr, "0.0");
        try {
          Result result = tuple._2;
          String keyRow = Bytes.toString(result.getRow());
          String val = null;
          if ((val = Bytes.toString(result.getValue(Bytes.toBytes(COLUMN_FAMILY), Bytes.toBytes(COLUMN_QUALIFIER_1)))) != null) {
            strArr[0] = val;
          }
          if ((val = Bytes.toString(result.getValue(Bytes.toBytes(COLUMN_FAMILY), Bytes.toBytes(COLUMN_QUALIFIER_2)))) != null) {
            strArr[1] = val;
          }
          if ((val = Bytes.toString(result.getValue(Bytes.toBytes(COLUMN_FAMILY), Bytes.toBytes(COLUMN_QUALIFIER_3)))) != null) {
            strArr[2] = val;
          }
          return new Tuple2<String, String[]>(keyRow, strArr);
        } catch (Exception e) {
          e.printStackTrace();
          return null;
        }
      }
    });



// Prepare the RDD with the labels and features. I am taking the label as a constant 0.5 here; based on your use case this may vary.
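For example (a hypothetical variant, not part of the original job), if the first HBase column held the actual target value, the mapping in the listing below could use it as the label and keep the remaining columns as features:

// Hypothetical variant: label from the first column, remaining columns as features.
public LabeledPoint call(Tuple2<String, String[]> line) {
  String[] values = line._2();
  double label = Double.parseDouble(values[0]);
  double[] features = new double[values.length - 1];
  for (int i = 1; i < values.length; i++) {
    features[i - 1] = Double.parseDouble(values[i]);
  }
  return new LabeledPoint(label, Vectors.dense(features));
}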

JavaRDD<LabeledPoint> parsedData = dataRDD.map(new Function<Tuple2<String, String[]>, LabeledPoint>() {

  private static final long serialVersionUID = 1L;

  @Override
  public LabeledPoint call(Tuple2<String, String[]> line) {
    String[] values = line._2();
    double[] v = new double[values.length];
    for (int i = 0; i < values.length; i++) {
      v[i] = Double.parseDouble(values[i]);
    }
    return new LabeledPoint(0.5, Vectors.dense(v));
  }
});

// Building the model
int numIterations = 100;
double miniBatchFraction = 1.0;
double stepSize = 0.7;

LinearRegressionWithSGD linReg = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction);
linReg.setIntercept(true);

final LinearRegressionModel model = linReg.run(JavaRDD.toRDD(parsedData));

// Evaluate the model on the training data: pair each actual label with its prediction.
JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
  new Function<LabeledPoint, Tuple2<Double, Double>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Double, Double> call(LabeledPoint point) {
      double prediction = model.predict(point.features());
      return new Tuple2<Double, Double>(point.label(), prediction);
    }
  });

double MSE = new JavaDoubleRDD(valuesAndPreds.map(
  new Function<Tuple2<Double, Double>, Object>() {

    private static final long serialVersionUID = 1L;

    public Object call(Tuple2<Double, Double> pair) {
      return Math.pow(pair._1() - pair._2(), 2.0);
    }
  }
).rdd()).mean();

logger.info("Training Mean Squared Error = " + MSE);


Working with HBase tables using Spark

In this post we'll see how to read data from an HBase table and write it to another HBase table using Spark RDDs.

To read the data from the HBase table, set the SOURCE_TABLE on the HBase configuration object and create the RDD by calling newAPIHadoopRDD.

To write to the DESTINATION_TABLE HBase table, first register it with the Job configuration object. The output format class passed to conf.setOutputFormatClass decides where the data is written.


Refer to the Spark Java API.



Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.setLong("hbase.rpc.timeout", 600000);
hbaseConf.setLong("hbase.client.scanner.caching", 1000);
hbaseConf.set(TableInputFormat.INPUT_TABLE, SOURCE_TABLE);

Job conf = Job.getInstance(hbaseConf, "Hbase Reader and Writer");
conf.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, DESTINATION_TABLE);
conf.setOutputFormatClass(TableOutputFormat.class);

SparkConf sparkConf = new SparkConf();
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

JavaPairRDD<ImmutableBytesWritable, Result> inputRDD = jsc.newAPIHadoopRDD(hbaseConf, TableInputFormat.class,
    ImmutableBytesWritable.class, Result.class);

JavaPairRDD<ImmutableBytesWritable, Put> outputRDD = inputRDD.mapToPair(
    new PairFunction<Tuple2<ImmutableBytesWritable, Result>, ImmutableBytesWritable, Put>() {

      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
        Result result = tuple._2;
        String keyRow = Bytes.toString(result.getRow());
        String valueFromHbase = Bytes.toString(result.getValue(Bytes.toBytes(SOURCE_TABLE_COLUMN_FAMILY), Bytes.toBytes(SOURCE_TABLE_COLUMN_QUALIFIER)));

        // Perform your business operations on valueFromHbase here.

        Put put = new Put(Bytes.toBytes(keyRow));
        put.addColumn(Bytes.toBytes(DESTINATION_TABLE_COLUMN_FAMILY), Bytes.toBytes(DESTINATION_TABLE_COLUMN_QUALIFIER), Bytes.toBytes(valueFromHbase));
        return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
      }
    });

// Write the Puts to the destination table and shut down the context.
outputRDD.saveAsNewAPIHadoopDataset(conf.getConfiguration());

jsc.close();