apache spark - sqlContext خطأ HiveDriver على SQLException: الأسلوب غير معتمد




apache-spark jdbc (2)

في الواقع نظرت إلى هذا. تستنفد Hotornworks و cloudera الدعم للاتصال بالخلية من Spark عبر Thrift Server.

لذا فأنت تعمل على شيء مستحيل.

https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#ki_thrift_server .

تقول الروابط أن التوفير غير معطل ، ولكنه مخصص بشكل خاص للشفرة من الشرارة. يمكنني الاتصال بكافة أنواع قواعد البيانات من الشرر باستثناء الخلية.

لذلك عليك العمل على نمط مختلف من التفويض.

كما يتم توصيل كائن شرارة مباشرة إلى خلية أنها تقوم بإزالة دعم التوفير.

من سؤالك السابق ، فإنه قادر على قراءة البيانات ولكن قراءة البيانات الخاطئة. Spark 2.2 خطأ خادم التوفير في dataframe NumberFormatException عند الاستعلام جدول Hive

الشفرة

>>> df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test4",user="hive", password="hive").option("fetchsize", "10").load()
>>> df.select("*").show()
+---+----+
| id|desc|
+---+----+
| id|desc|
| id|desc|
+---+----+

المشكلة هنا في الخلية

تستخدم الطريقة الافتراضية في اللهجة الافتراضية لاقتباس المعرفات علامات الاقتباس المزدوجة. سيتم تحليل استعلام SQL مثل SELECT “dw_date” من جدول FROM بواسطة Hive لتحديد سلسلة حرفية ، بدلاً من عمود يسمى “dw_date”. من خلال استبدال علامات الاقتباس مع backticks ، يبدو أن يتم حل المشكلة. ومع ذلك ، في الاختبار الخاص بي ، يتم إدخال أسماء الأعمدة من Hive جميعها باسم الجدول مثل table.dw_date. لكن لا يمكنك أن تلتف بشكل مباشر حول البكرات حوله مثل table.dw_date . بدلا من ذلك ، نحن بحاجة إلى التفاف كل جزء على حدة

الشفرة

private case object HiveDialect extends JdbcDialect {
  override def canHandle(url : String): Boolean = url.startsWith("jdbc:hive2")
  override def quoteIdentifier(colName: String): String = {
    colName.split(‘.’).map(part => s”`$part`”).mkString(“.”)
  }
}

يرجى اتباع المشاركة أدناه لتنفيذ الحل.

https://medium.com/@viirya/custom-jdbc-dialect-for-hive-5dbb694cc2bd

https://medium.com/@huaxing/customize-spark-jdbc-data-source-to-work-with-your-dedicated-database-dialect-beec6519af27

لقد حاولت استخدام sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver") للحصول على جدول Hive في Spark دون أي نجاح. لقد فعلت البحث وقراءة أدناه:

كيفية الاتصال بخادم خلية عن بعد من الشرارة

شرارة 1.5.1 لا تعمل مع خلية jdbc 1.2.0

http://belablotski.blogspot.in/2016/01/access-hive-tables-from-spark-using.html

لقد استخدمت أحدث إصدار من Hortonworks Sandbox 2.6 وطلبت من المجتمع طرح السؤال نفسه:

https://community.hortonworks.com/questions/156828/pyspark-jdbc-py4jjavaerror-calling-o95load-javasql.html?childToView=156936#answer-156936

ما أريد القيام به هو بسيط جدا عبر pyspark :

df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="sample_07",user="maria_dev", password="maria_dev").load()

هذا أعطاني هذا الخطأ:

17/12/30 19:55:14 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10016/default
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark-client/python/pyspark/sql/readwriter.py", line 139, in load
    return self._df(self._jreader.load())
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o119.load.
: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveResultSetMetaData.isSigned(HiveResultSetMetaData.java:143)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:136)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:748)

باستخدام الخط المباشر ، فإنه يعمل بشكل جيد

beeline> !connect jdbc:hive2://localhost:10016/default maria_dev maria_dev
Connecting to jdbc:hive2://localhost:10016/default
Connected to: Spark SQL (version 2.1.1.2.6.1.0-129)
Driver: Hive JDBC (version 1.2.1000.2.6.1.0-129)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10016/default> select * from sample_07 limit 2;
+----------+-------------------------+------------+---------+--+
|   code   |       description       | total_emp  | salary  |
+----------+-------------------------+------------+---------+--+
| 00-0000  | All Occupations         | 134354250  | 40690   |
| 11-0000  | Management occupations  | 6003930    | 96150   |
+----------+-------------------------+------------+---------+--+

يمكنني أيضًا القيام بذلك:

spark = SparkSession.Builder().appName("testapp").enableHiveSupport().‌​getOrCreate()
spark.sql("select * from default.sample_07").collect()

ولكن هذا يقرأ في Hive Metadata مباشرة. أرغب في استخدام JDBC في Spark Thrift Server للحصول على أمان دقيق.

يمكنني القيام PostgreSQL مثل ذلك:

sqlContext.read.format("jdbc").options(driver="org.postgresql.Driver")

يمكنني أيضًا استخدام سكالا java.sql.{DriverManager, Connection, Statement, ResultSet} لإنشاء اتصال JDBC كجانب من العملاء للوصول إلى Spark. ولكن هذا يضع جميع البيانات في الذاكرة بشكل أساسي ثم إعادة إنشاء Dataframe يدويًا.

لذا فإن السؤال هو: هل هناك طريقة لإنشاء Dataframe Spark مع بيانات جدول Hive دون تحميل البيانات في الذاكرة إلى عميل JDBC مثل Scala وعدم استخدام SparkSession.Builder() مثل الأمثلة أعلاه؟ حالة الاستخدام الخاصة بي هي أنني بحاجة للتعامل مع أمان دقيق.


لست متأكدا إذا كنت أفهم سؤالك بشكل صحيح أم لا ، ولكن من خلال ما أفهمه سوف تحتاج إلى الحصول على جدول خلية في إطار البيانات ، لذلك لا تحتاج إلى الحصول على اتصال JDBC ، في الروابط الخاصة بك على سبيل المثال ، محاولة الاتصال بقواعد بيانات مختلفة (RDBMS) ، وليس Hive.

يرجى الاطلاع على النهج التالي ، باستخدام سياق الخلية ، يمكنك الحصول على الجدول في إطار البيانات.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setAppName("APPName")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val sqlContext = new SQLContext(sc)

val hive_df = hiveContext.sql("select * from schema.table").first()

//other way
// val hive_df= hiveContext.table ("SchemaName.TableName")

//Below will print the first line
df.first()
//count on dataframe
df.count()

}

إذا كنت تريد حقًا استخدام اتصال JDBC ، لدي المثال التالي الذي استخدمته في قاعدة بيانات Oracle ، والذي قد يساعدك.

val oracle_data = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:username/password//hostname:2134/databaseName", "dbtable" -> "Your query tmp", "driver" -> "oracle.jdbc.driver.OracleDriver"));