elasticsearch - query - spark读es



将Spark Dataframe保存到Elasticsearch-无法处理类型异常 (1)

我设计了一个简单的工作,可以从MySQL读取数据并将其保存在带有Spark的Elasticsearch中。

这是代码:

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("MySQLtoEs")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "127.0.0.1:9200")
                .set("es.mapping.id", "id")
                .set("spark.serializer", KryoSerializer.class.getName()));

SQLContext sqlContext = new SQLContext(sc);

// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");

// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
        "merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");

您可以看到代码非常简单。 它将数据读取到DataFrame中,选择一些列,然后执行 count 作为对Dataframe的基本操作。 到目前为止,一切正常。

然后,它尝试将数据保存到Elasticsearch中,但由于无法处理某种类型而失败。 您可以在 here 查看错误日志。

我不确定为什么它不能处理这种类型。 有人知道为什么会这样吗?

我正在使用Apache Spark 1.5.0,Elasticsearch 1.4.4和elaticsearch-hadoop 2.1.1

编辑:

  • 我已经用示例数据集以及源代码更新了gist链接。
  • 我还尝试过使用@costin在邮件列表中提到的elasticsearch-hadoop 开发版本

这个问题的答案很棘手,但是由于 samklr ,我设法弄清了问题所在。

但是,该解决方案并不简单,可能会考虑一些“不必要的”转换。

首先让我们谈谈 序列化

在数据的Spark序列化和功能序列化中要考虑两个方面的序列化。 在这种情况下,它与数据序列化以及反序列化有关。

从Spark的角度来看,唯一需要做的就是设置序列化-默认情况下,Spark依赖Java序列化,这很方便,但是效率很低。 这就是Hadoop本身引入自己的序列化机制和类型(即 Writables 。 因此,需要 InputFormatOutputFormats 返回 Writables 内容,而Spark不能直接使用。

使用elasticsearch-spark连接器时,必须启用一个不同的序列化(Kryo),该序列化可以自动处理转换并且还可以非常高效地完成转换。

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

即使由于Kryo不需要类实现要序列化的特定接口,这意味着POJO可以在RDD中使用,而无需进行任何启用Kryo序列化的工作。

就是说,@ samklr向我指出,Kryo需要在使用它们之前注册类。

这是因为Kryo编写了对要序列化的对象的类的引用(对于每个写入的对象,将写入一个引用),如果该类已注册,则它只是一个整数标识符,否则是完整的类名。 Spark代表您注册Scala类和许多其他框架类(例如Avro Generic或Thrift类)。

用Kryo注册课程很简单。 创建KryoRegistrator的子类,并覆盖 registerClasses() 方法:

public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Product POJO associated to a product Row from the DataFrame            
        kryo.register(Product.class); 
    }
}

最后,在驱动程序中,将spark.kryo.registrator属性设置为KryoRegistrator实现的完全限定的类名:

conf.set("spark.kryo.registrator", "MyKryoRegistrator")

其次,甚至考虑到设置了Kryo序列化器并注册了类,并对Spark 1.5进行了更改,并且由于某种原因,Elasticsearch无法 反序列化 Dataframe,因为它无法将Dataframe的 SchemaType 推断到连接器中。

所以我不得不将数据框转换为JavaRDD

JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
    public Product call(Row row) throws Exception {
        long id = row.getLong(0);
        String title = row.getString(1);
        String description = row.getString(2);
        int merchantId = row.getInt(3);
        double price = row.getDecimal(4).doubleValue();
        String keywords = row.getString(5);
        long brandId = row.getLong(6);
        int categoryId = row.getInt(7);
        return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
    }
});

现在可以准备将​​数据写入elasticsearch了:

JavaEsSpark.saveToEs(products, "test/test");

参考文献:

  • Elasticsearch的Apache Spark支持 documentation
  • Hadoop最终指南,第19章。Spark,编辑。 4 –汤姆·怀特。
  • 用户 samklr