본문으로 바로가기
본문으로 바로가기

AvroConfluent

입력출력별칭

설명

Apache Avro는 이진 인코딩을 사용하여 효율적인 데이터 처리를 지원하는 행 지향 직렬화 포맷입니다. AvroConfluent 포맷은 Confluent 스키마 레지스트리 (또는 API 호환 서비스)를 사용하여 Avro로 인코딩된 메시지를 읽고 쓸 수 있도록 지원합니다.

각 메시지는 Confluent wire 형식을 사용합니다. 즉, 매직 바이트(0x00) 뒤에 4바이트 big-endian 스키마 ID가 오고, 그 뒤에 Avro 이진 datum이 이어집니다. 읽을 때 ClickHouse는 레지스트리를 쿼리하여 스키마 ID를 조회합니다. 쓸 때 ClickHouse는 출력 컬럼에서 파생된 스키마를 등록하고, 생성된 ID를 각 행 앞에 덧붙입니다. 스키마는 최적의 성능을 위해 캐시됩니다.

데이터 타입 매핑

아래 표는 Apache Avro 형식에서 지원되는 모든 데이터 타입과 INSERTSELECT 쿼리에서 이에 대응하는 ClickHouse 데이터 타입을 보여줍니다.

Avro 데이터 타입 INSERTClickHouse 데이터 타입Avro 데이터 타입 SELECT
boolean, int, long, float, doubleInt(8\16\32), UInt(8\16\32)int
boolean, int, long, float, doubleInt64, UInt64long
boolean, int, long, float, doubleFloat32float
boolean, int, long, float, doubleFloat64double
bytes, string, fixed, enumStringbytes 또는 string *
bytes, string, fixedFixedString(N)fixed(N)
enumEnum(8\16)enum
array(T)Array(T)array(T)
map(V, K)Map(V, K)map(string, K)
union(null, T), union(T, null)Nullable(T)union(null, T)
union(T1, T2, …) **Variant(T1, T2, …)union(T1, T2, …) **
nullNullable(Nothing)null
int (date) ***Date, Date32int (date) ***
long (timestamp-millis) ***DateTime64(3)long (timestamp-millis) ***
long (timestamp-micros) ***DateTime64(6)long (timestamp-micros) ***
bytes (decimal) ***DateTime64(N)bytes (decimal) ***
intIPv4int
fixed(16)IPv6fixed(16)
bytes (decimal) ***Decimal(P, S)bytes (decimal) ***
string (uuid) ***UUIDstring (uuid) ***
fixed(16)Int128/UInt128fixed(16)
fixed(32)Int256/UInt256fixed(32)
recordTuplerecord

* 기본값은 bytes이며, output_format_avro_string_column_pattern 설정으로 제어됩니다.

** Variant type은 필드 값으로 null을 암묵적으로 허용하므로, 예를 들어 Avro union(T1, T2, null)Variant(T1, T2)로 변환됩니다. 따라서 ClickHouse에서 Avro를 생성할 때에는 스키마 추론 과정에서 어떤 값이 실제로 null인지 알 수 없으므로 Avro union 타입 집합에 항상 null 타입을 포함해야 합니다.

*** Avro logical types

지원되지 않는 Avro logical 데이터 타입:

  • time-millis
  • time-micros
  • duration

포맷 설정

Setting설명기본값
input_format_avro_allow_missing_fields스키마에서 필드를 찾을 수 없는 경우 오류를 발생시키는 대신 기본값을 사용할지 여부입니다.0
input_format_avro_null_as_default널을 허용하지 않는 컬럼에 null 값을 삽입할 때 오류를 발생시키는 대신 기본값을 사용할지 여부입니다.0
format_avro_schema_registry_urlConfluent 스키마 레지스트리 URL입니다. 기본 인증을 사용하는 경우, URL 인코딩된 자격 증명을 URL 경로에 직접 포함할 수 있습니다.
format_avro_schema_registry_connection_timeout스키마 레지스트리 HTTP 클라이언트의 연결 타임아웃(초)입니다(스키마 가져오기와 등록 모두에 사용됨). 0보다 커야 하며 600(10분)보다 작아야 합니다.1
format_avro_schema_registry_send_timeout스키마 레지스트리 HTTP 클라이언트의 전송 타임아웃(초)입니다. 0보다 커야 하며 600(10분)보다 작아야 합니다.1
format_avro_schema_registry_receive_timeout스키마 레지스트리 HTTP 클라이언트의 수신 타임아웃(초)입니다. 0보다 커야 하며 600(10분)보다 작아야 합니다.1
output_format_avro_confluent_subject출력용: 스키마 레지스트리에서 스키마가 등록되는 subject 이름입니다. 쓰기 시 필요합니다.
output_format_avro_string_column_pattern출력용: Avro string으로 직렬화할 String 컬럼의 정규식 패턴입니다(기본값은 bytes).

예제

Kafka에서 읽기

Avro로 인코딩된 Kafka 토픽을 Kafka 테이블 엔진으로 읽으려면, 스키마 레지스트리의 URL을 지정하기 위해 format_avro_schema_registry_url 설정을 사용하십시오.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;

Kafka에 쓰기

AvroConfluent 메시지를 Kafka 토픽에 쓰려면 스키마 레지스트리 URL과 subject 이름을 모두 설정하십시오. 스키마는 처음 기록할 때 레지스트리에 자동으로 등록됩니다.

CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');

기본 인증 사용

스키마 레지스트리에 기본 인증이 필요한 경우(예: Confluent Cloud를 사용하는 경우), format_avro_schema_registry_url 설정에 URL 인코딩된 자격 증명을 제공할 수 있습니다.

CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';

문제 해결

Kafka consumer의 오류를 디버깅하고 수집 진행 상황을 모니터링하려면 system.kafka_consumers system table에서 쿼리를 실행합니다. 배포에 레플리카가 여러 개 있는 경우(예: ClickHouse Cloud) clusterAllReplicas 테이블 함수를 사용해야 합니다.

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

스키마 해석 관련 문제가 발생하면 kafkacatclickhouse-local을 사용하여 문제를 진단할 수 있습니다.

$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c