Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

could not find implicit value for parameter mapper: it.nerdammer.spark.hbase.conversion.FieldWriter[org.apache.spark.sql.Row] rdd.toHBaseTable("mytable") #58

Open
abstract-karshit opened this issue Sep 28, 2017 · 2 comments

Comments

@abstract-karshit
Copy link

abstract-karshit commented Sep 28, 2017

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.{col, concat, lit}
import it.nerdammer.spark.hbase._

object SparkHBase {
    def main(args: Array[String]): Unit = {
    val sparkConf = new 
    SparkConf().setAppName("HbaseSpark
"HbaseSpark").setMaster("local[*]").set("spark.hbase.host", "localhost")
    

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext
      .read
      .format("com.databricks.spark.csv")
      .option("delimiter", "\001")
      .load("/Users/11130/small")

    val df1 = df.withColumn("row_key", concat(col("_c3"), lit("_"), col("_c5"), lit("_"), col("_c0")))
    df1.registerTempTable("mytable1")
    val newDf = sqlContext.sql("Select row_key, _c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7," +
      "_c8, _c9, _c10, _c11, _c12, _c13, _c14, _c15, _c16, _c17, _c18, _c19 from mytable")

    val rdd = newDf.rdd

    rdd.toHBaseTable("mytable")
      .toColumns("event_id", "device_id", "uidx", "session_id", "server_ts", "client_ts", "event_type", "data_set_name",
        "screen_name", "card_type", "widget_item_whom", "widget_whom", "widget_v_position", "widget_item0_h_position",
        "publisher_tag", "utm_medium", "utm_source", "utm_campaign", "referrer_url", "notification_class")
      .inColumnFamily("mycf")
      .save()

    sc.stop()
  }
}

Any idea what is going wrong here?
@nicolaferraro
Copy link
Contributor

You get a RDD[Row] with newDf.rdd and it's currently not supported. It would be a good idea to support it directly.

Can you try to workaround it by using something like:

val rdd = newDf.map(row => (row(1), row(2), ...))

In order to map into a RDD of tuples, that is currently supported.

@abstract-karshit
Copy link
Author


import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.{col, concat, lit}
import it.nerdammer.spark.hbase._


object SparkHBase {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("HbaseSpark")
      .setMaster("local[*]")
      .set("spark.hbase.host", "localhost")

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext
      .read
      .format("com.databricks.spark.csv")
      .option("delimiter", "\001")
      .load("/Users/11130/small")

    val df1 = df.withColumn("row_key", concat(col("C3"), lit("_"), col("C5"), lit("_"), col("C0")))
    df1.registerTempTable("mytable")

    val newDf = sqlContext.sql("Select row_key, C0, C1, C2, C3, C4, C5, C6, C7," +
      "C8, C9, C10, C11, C12, C13, C14, C15, C16, C17, C18, C19 from mytable")

    val rdd = newDf.rdd

    val finalRdd = rdd.map(row => (row(0).toString, row(1).toString, row(2).toString, row(3).toString, row(4).toString, row(5).toString, row(6).toString,
      row(7).toString, row(8).toString, row(9).toString, row(10).toString, row(11).toString, row(12).toString, row(13).toString,
      row(14).toString, row(15).toString, row(16).toString, row(17).toString, row(18).toString, row(19).toString, row(20).toString))


    finalRdd.toHBaseTable("mytable")
      .toColumns("event_id", "device_id", "uidx", "session_id", "server_ts", "client_ts", "event_type", "data_set_name",
        "screen_name", "card_type", "widget_item_whom", "widget_whom", "widget_v_position", "widget_item0_h_position",
        "publisher_tag", "utm_medium", "utm_source", "utmCampaign", "referrer_url", "notificationClass")
      .inColumnFamily("mycf")
      .save()

    sc.stop()
  }
}

Thanks, this works. Just needed to cast any to string.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants