In this post I read data from an HBase table and build Spark Vectors to pass to LinearRegressionWithSGD.
In a Spark Vector, all of the features are placed as columns. Vector has two implementations, DenseVector and SparseVector; if the data contains many zeros, it is better to go with a SparseVector to save memory. Once a feature vector is created, wrap it in a LabeledPoint by attaching a label to it. The resulting set of LabeledPoints can be visualized as the rows and columns of a spreadsheet: each row is a training sample, the columns are the features, and the first column is the label, i.e. the dependent variable to be predicted.
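As a minimal sketch (the numbers below are invented for illustration, not taken from the HBase table), a dense vector, a sparse vector, and a LabeledPoint are created like this:

```java
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Dense vector: every feature value is stored explicitly.
Vector dense = Vectors.dense(1.0, 0.0, 3.0);

// Sparse vector: only the non-zero entries are stored (size 3, values at indices 0 and 2).
Vector sparse = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});

// LabeledPoint = label (the value to predict) plus the feature vector for one training row.
LabeledPoint row = new LabeledPoint(0.5, dense);
```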
The reason I chose Stochastic Gradient Descent (SGD) over batch Gradient Descent (GD) is that SGD uses only one training example per update, whereas GD considers all samples in every iteration. Since I have a large number of training examples, SGD is much faster than GD, and I am fine with the close approximation it provides; the trade-off is that the error function is not minimized as precisely with SGD as with GD.
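To make the difference concrete, with weights w, step size α, n training examples, and J_i(w) the error on the i-th example, the two update rules look like this:

GD:  w ← w − α · (1/n) · Σ_{i=1..n} ∇J_i(w)   (every update scans all n examples)
SGD: w ← w − α · ∇J_i(w)                       (every update uses a single example i)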
```java
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// Configure the HBase connection and point TableInputFormat at the source table.
Configuration conf = HBaseConfiguration.create();
conf.setLong("hbase.rpc.timeout", 600000);
conf.setLong("hbase.client.scanner.caching", 1000);
conf.set(TableInputFormat.INPUT_TABLE, SOURCE_TABLE);

SparkConf sparkConf = new SparkConf();
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Read the HBase table as an RDD of (row key, Result) pairs.
JavaPairRDD<ImmutableBytesWritable, Result> inputRDD = jsc.newAPIHadoopRDD(
        conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

// Extracting the data from the HBase table. I am taking values from 3 HBase columns.
JavaPairRDD<String, String[]> dataRDD = inputRDD.mapToPair(
        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String[]>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String[]> call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                // Default every feature to "0.0" so a missing column does not break parsing later.
                String[] strArr = new String[3];
                Arrays.fill(strArr, "0.0");
                try {
                    Result result = tuple._2;
                    String keyRow = Bytes.toString(result.getRow());
                    String val = null;
                    // Each feature comes from its own column qualifier; the *_1/_2/_3
                    // constants are placeholders for your actual column names.
                    if ((val = Bytes.toString(result.getValue(Bytes.toBytes(COLUMN_FAMILY), Bytes.toBytes(COLUMN_QUALIFIER_1)))) != null) {
                        strArr[0] = val;
                    }
                    if ((val = Bytes.toString(result.getValue(Bytes.toBytes(COLUMN_FAMILY), Bytes.toBytes(COLUMN_QUALIFIER_2)))) != null) {
                        strArr[1] = val;
                    }
                    if ((val = Bytes.toString(result.getValue(Bytes.toBytes(COLUMN_FAMILY), Bytes.toBytes(COLUMN_QUALIFIER_3)))) != null) {
                        strArr[2] = val;
                    }
                    return new Tuple2<String, String[]>(keyRow, strArr);
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });
```
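One caveat: the mapToPair above returns null whenever a row cannot be read, and those nulls would fail the parsing step further down with a NullPointerException. A small filter, which is my own addition rather than part of the original job, can drop them first (it reuses the Function import shown in the next snippet); the next step would then map over cleanRDD instead of dataRDD:

```java
// Hypothetical clean-up step: drop rows whose extraction failed and returned null.
JavaPairRDD<String, String[]> cleanRDD = dataRDD.filter(
        new Function<Tuple2<String, String[]>, Boolean>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(Tuple2<String, String[]> tuple) {
                return tuple != null && tuple._2() != null;
            }
        });
```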
Next, prepare the RDD with the labels and features. I am taking the label as 0.5; based on your use case this may vary.
```java
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

import scala.Tuple2;

// Turn each (row key, feature strings) pair into a LabeledPoint with a constant label of 0.5.
JavaRDD<LabeledPoint> parsedData = dataRDD.map(
        new Function<Tuple2<String, String[]>, LabeledPoint>() {

            private static final long serialVersionUID = 1L;

            @Override
            public LabeledPoint call(Tuple2<String, String[]> line) {
                String[] values = line._2();
                double[] v = new double[values.length];
                for (int i = 0; i < values.length; i++) {
                    v[i] = Double.parseDouble(values[i]);
                }
                return new LabeledPoint(0.5, Vectors.dense(v));
            }
        });

// Building the model
int numIterations = 100;
double miniBatchFraction = 1.0;
double stepSize = 0.7;

LinearRegressionWithSGD linReg = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction);
linReg.setIntercept(true);
final LinearRegressionModel model = linReg.run(JavaRDD.toRDD(parsedData));

// Evaluate the model on the training data: pair each actual label with its prediction.
JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
        new Function<LabeledPoint, Tuple2<Double, Double>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<Double, Double> call(LabeledPoint point) {
                double prediction = model.predict(point.features());
                return new Tuple2<Double, Double>(point.label(), prediction);
            }
        });

// Training error: mean of the squared differences between label and prediction.
double MSE = new JavaDoubleRDD(valuesAndPreds.map(
        new Function<Tuple2<Double, Double>, Object>() {

            private static final long serialVersionUID = 1L;

            public Object call(Tuple2<Double, Double> pair) {
                return Math.pow(pair._1() - pair._2(), 2.0);
            }
        }).rdd()).mean();

logger.info("Training Mean Squared Error = " + MSE);
```
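Once the model is trained, predicting for a new observation is just a matter of passing it a feature vector of the same length. A minimal sketch, with invented feature values and reusing the Vector/Vectors imports from the earlier snippet:

```java
// Hypothetical new observation with the same three features used during training.
Vector newPoint = Vectors.dense(1.2, 3.4, 5.6);
double predicted = model.predict(newPoint);
logger.info("Prediction for " + newPoint + " = " + predicted);
```

One more note: LinearRegressionWithSGD runs mini-batch SGD under the hood, so with miniBatchFraction = 1.0 each iteration actually samples the full data set; lowering that fraction is what gives the per-update sampling behaviour described earlier.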