[donghua@hdp ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper hdp:2181 --replication-factor 1 --partitions 1 --topic kafka_hive_topic
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "kafka_hive_topic".
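The warning appears because Kafka embeds topic names in metric names, and there a period and an underscore end up as the same character. As a result, a topic such as `kafka.hive.topic` could not be told apart from `kafka_hive_topic`. The sketch below shows that check. It assumes the collision rule is simply "replace '.' with '_' and compare". `topics_collide` is a hypothetical helper, not part of the Kafka CLI:

```shell
#!/bin/sh
# Hypothetical helper (not a Kafka tool): succeeds (exit 0) when two *different*
# topic names would map to the same metric name, ASSUMING Kafka treats '.'
# and '_' as the same character in metric names.
topics_collide() {
  a=$(printf '%s' "$1" | tr '.' '_')   # normalise periods to underscores
  b=$(printf '%s' "$2" | tr '.' '_')
  [ "$1" != "$2" ] && [ "$a" = "$b" ]
}

if topics_collide "kafka.hive.topic" "kafka_hive_topic"; then
  echo "kafka.hive.topic collides with kafka_hive_topic"
fi
```

To check a new name against topics that already exist, the same comparison could be looped over the output of `kafka-topics.sh --list --zookeeper hdp:2181`.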
2. Create the Hive table
[donghua@hdp ~]$ beeline -u "jdbc:hive2://hdp.dbaglobe.com:2181/demodb;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n donghua -p x
Connecting to jdbc:hive2://hdp.dbaglobe.com:2181/demodb;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
19/03/09 16:44:13 [main]: INFO jdbc.HiveConnection: Connected to hdp:10000
Connected to: Apache Hive (version 3.1.0.3.1.0.0-78)
Driver: Hive JDBC (version 3.1.0.3.1.0.0-78)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.0.3.1.0.0-78 by Apache Hive
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb>
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> CREATE EXTERNAL TABLE kafka_hive_table
. . . . . . . . . . . . . . . . . . . . . . > (`Country Name` string , `Language` string, `_id` struct<`$oid`:string>)
. . . . . . . . . . . . . . . . . . . . . . > STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
. . . . . . . . . . . . . . . . . . . . . . > TBLPROPERTIES
. . . . . . . . . . . . . . . . . . . . . . > ("kafka.topic" = "kafka_hive_topic", "kafka.bootstrap.servers"="hdp:6667");
No rows affected (4.747 seconds)
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> desc kafka_hive_table;
+---------------+----------------------+--------------------+
| col_name      | data_type            | comment            |
+---------------+----------------------+--------------------+
| country name  | string               | from deserializer  |
| language      | string               | from deserializer  |
| _id           | struct<$oid:string>  | from deserializer  |
| __key         | binary               | from deserializer  |
| __partition   | int                  | from deserializer  |
| __offset      | bigint               | from deserializer  |
| __timestamp   | bigint               | from deserializer  |
+---------------+----------------------+--------------------+
7 rows selected (0.359 seconds)
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> !outputformat tsv2
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> !brief
verbose: off
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> show create table kafka_hive_table;
createtab_stmt
CREATE EXTERNAL TABLE `kafka_hive_table`(
`country name` string COMMENT 'from deserializer',
`language` string COMMENT 'from deserializer',
`_id` struct<$oid:string> COMMENT 'from deserializer',
`__key` binary COMMENT 'from deserializer',
`__partition` int COMMENT 'from deserializer',
`__offset` bigint COMMENT 'from deserializer',
`__timestamp` bigint COMMENT 'from deserializer')
ROW FORMAT SERDE
'org.apache.hadoop.hive.kafka.KafkaSerDe'
STORED BY
'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
WITH SERDEPROPERTIES (
'serialization.format'='1')
LOCATION
'hdfs://hdp.dbaglobe.com:8020/warehouse/tablespace/external/hive/demodb.db/kafka_hive_table'
TBLPROPERTIES (
'bucketing_version'='2',
'hive.kafka.max.retries'='6',
'hive.kafka.metadata.poll.timeout.ms'='30000',
'hive.kafka.optimistic.commit'='false',
'hive.kafka.poll.timeout.ms'='5000',
'kafka.bootstrap.servers'='hdp:6667',
'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe',
'kafka.topic'='kafka_hive_topic',
'kafka.write.semantic'='AT_LEAST_ONCE',
'transient_lastDdlTime'='1552121132')
27 rows selected (0.109 seconds)
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb>
3. Ingest some data into the Kafka topic
[donghua@hdp ~]$ /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list hdp:6667 --topic kafka_hive_topic
>{"Country Name":"Afrika","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd6"},"ISO":0}
>{"Country Name":"Oseanië","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd7"},"ISO":0}
>^C
[donghua@hdp ~]$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --topic kafka_hive_topic --bootstrap-server hdp:6667 --from-beginning
{"Country Name":"Afrika","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd6"},"ISO":0}
{"Country Name":"Oseanië","Language":"af","_id":{"$oid":"55a0f1d420a4d760b5fbdbd7"},"ISO":0}
^C
Processed a total of 2 messages
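Typing records into the console producer by hand is error-prone. A small script can emit the same records, one JSON object per line, and its output can be piped into the producer. The sketch below assumes the producer turns each input line into one message. `emit_record` is a hypothetical helper, and the values are the two sample records above:

```shell
#!/bin/sh
# Hypothetical helper: print one JSON message per line, in the same shape as
# the records typed into the console producer above.
# Arguments: $1 = country name, $2 = language code, $3 = ObjectId hex string.
# Values are inserted verbatim, so they must not contain '"' or '\'.
emit_record() {
  printf '{"Country Name":"%s","Language":"%s","_id":{"$oid":"%s"},"ISO":0}\n' \
    "$1" "$2" "$3"
}

emit_record "Afrika"  "af" "55a0f1d420a4d760b5fbdbd6"
emit_record "Oseanië" "af" "55a0f1d420a4d760b5fbdbd7"
```

If this is saved as `emit_records.sh` (a hypothetical file name), the output can go straight to the producer: `sh emit_records.sh | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list hdp:6667 --topic kafka_hive_topic`.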
4. Query the Hive table
0: jdbc:hive2://hdp.dbaglobe.com:2181/demodb> select t.`Country Name` as Name, t.`Language` as lang, t.`__offset` from kafka_hive_table t;
INFO : Compiling command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c): select t.`Country Name` as Name, t.`Language` as lang, t.`__offset` from kafka_hive_table t
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:name, type:string, comment:null), FieldSchema(name:lang, type:string, comment:null), FieldSchema(name:t.__offset, type:bigint, comment:null)], properties:null)
INFO : Completed compiling command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c); Time taken: 0.289 seconds
INFO : Executing command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c): select t.`Country Name` as Name, t.`Language` as lang, t.`__offset` from kafka_hive_table t
INFO : Completed executing command(queryId=hive_20190309175638_f3a01692-28be-4683-bc66-32854615782c); Time taken: 0.006 seconds
INFO : OK
+----------+-------+-------------+
| name     | lang  | t.__offset  |
+----------+-------+-------------+
| Afrika   | af    | 12          |
| Oseanië  | af    | 13          |
+----------+-------+-------------+
2 rows selected (0.379 seconds)
Additional Finding
If the Kafka topic contains empty messages, the Hive query results can be wrong. In my test, the messages at offsets 3 and 4 were empty strings, and rows 3 and 4 of the query result were incorrect: they repeated the data from row 2.
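One way to keep empty messages out of the topic is to drop blank lines before they reach the console producer. This assumes the empty messages came from blank lines typed or piped into the producer. The sketch only prevents new empty messages. It does not change what Hive returns for empty messages already in the topic. `drop_blank_lines` is a hypothetical name:

```shell
#!/bin/sh
# Hypothetical filter: pass through only lines containing at least one
# non-whitespace character, so no empty message is handed to the producer.
drop_blank_lines() {
  grep -v '^[[:space:]]*$'
}

# Two real records with an empty line and a whitespace-only line between them.
printf '%s\n' '{"Language":"af"}' '' '   ' '{"Language":"en"}' | drop_blank_lines
```

The same `grep -v '^[[:space:]]*$'` can also sit directly in front of `kafka-console-producer.sh --broker-list hdp:6667 --topic kafka_hive_topic` in a pipeline.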
Are there any performance metrics available for this?