• -------------------------------------------------------------
  • ====================================

Kafka 之六 connector

kafka dewbay 5年前 (2019-04-12) 2998次浏览 已收录 0个评论 扫描二维码


           kafka Connect 是一个kafka和其他系统交互稳定的流处理。它使kafka和其他系统的数据交互变的十分容易。它可以把其他系统的数据导入到kafka中,也可以把kafka的数据导到其他系统中。kafka的 sink 功能可以把 kafka 的 topic 数据分发给其他储存系统或查询系统以便离线分析,它有下面的特征:

           1:对于Kafka connectors它有一套通用的框架

           2:可以分布式部署,也可以standalone 模式部署(生产环境建议集群部署,可以容错)

           3:REST 接口

           4:offset 自动管理 kafka 自动管理和提交 offset,这样开发者可以专注自己的业务开发


二:开发connector只要实现两个接口就可以,第一个接口是Connector,第二个接口是Task,若是开发Source,只要实现SourceConnector/SourceTask两个接口。比如把文件的数据读取到 kafka 中,/SourceTask会读取文件的每一行并把他们封装为 record 发送出去。

    第一步  实现 SourceConnector 类     public class SourceConnectorPartition extends SourceConnector{                              private String zkServers;           private String destination;              //start 方法会在 task 执行前调用,此方法多用于参数初始化              public void start(Map<String, String> props) {                    zkServers = PropertyVerify.getOrElse(props, ZK_SERVERS, null, "'zkServers' is null");                    destination = PropertyVerify.getOrElse(props, DESTINATION, null, "'destination' is null");               }              //task 停止会回调此方法              public void stop() {                   // nothing to do              }              //此方法必须重新,指定重写的 Task 类              public Class<? extends Task> taskClass() {                          return TestSourceTask.class;               }              //此方法就通常用于参数传递,把参数传递给后续的 Task,留 task 调用              public List<Map<String, String>> taskConfigs(int maxTasks) {                      List<Map<String, String>> taskConfigs = new ArrayList<Map<String, String>>();                      return taskConfigs;              }              public String version() {                  return VERSION;              }               public ConfigDef config() {              // TODO Auto-generated method stub               return null;              }     }

第二步:实现 sourcetask,自定义自己的 sourcetask public abstract class TestSourceTask implements Task { protected SourceTaskContext context; //初始化方法 public void initialize(SourceTaskContext context) { this.context = context; } //启动的时候调用此方法 @Override public abstract void start(Map<String, String> props); //循环调用后把数据封装为 SourceRecord public abstract List<SourceRecord> poll() throws InterruptedException; public void commit() throws InterruptedException public abstract void stop(); public void commitRecord(SourceRecord record) }

以上两步就开发完成了,下面源码走读一下一个 task 启动过程

     1.1 驱动类 驱动主程序 WorkerSourceTask 类

     class WorkerSourceTask extends WorkerTask

     1.2 WorkerSourceTask 第一步初始化,运行下面的方法,获取 taskConfig 中参数,此参数在自己定义的 task 中设置

   public void initialize(TaskConfig taskConfig) {        try {            this.taskConfig = taskConfig.originalsStrings();        } catch (Throwable t) {            log.error("Task {} failed initialization and will not be started.", t);            onFailure(t);        }    }

     1.3 WorkerSourceTask 执行 execute 方法

    @Override    public void execute() {        try {            //初始化 task 类中的 initialize 方法            task.initialize(new WorkerSourceTaskContext(offsetReader));             //task 中的 start 方法会执行            task.start(taskConfig);            log.info("Source task {} finished initialization and start", this);            synchronized (this) {                if (startedShutdownBeforeStartCompleted) {                    task.stop();                    return;                }                finishedStart = true;            }            while (!isStopping()) {                if (shouldPause()) {                    awaitUnpause();                    continue;                }                 if (toSend == null) {                    log.debug("Nothing to send to Kafka. Polling source for additional records");                   //调用 task 中的 poll 方法,并把 List<SourceRecord>返回赋值给 toSend                   toSend = task.poll();                }                if (toSend == null)                    continue;                log.debug("About to send " + toSend.size() + " records to Kafka");                if (!sendRecords())                    stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);            }        } catch (InterruptedException e) {            // Ignore and allow to exit.        } finally {            // It should still be safe to commit offsets since any exception would have            // simply resulted in not getting more records but all the existing records should be ok to flush            // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit            // to fail.            commitOffsets();        }    }
 private boolean sendRecords() {        int processed = 0;       //把 record 循环发送到 kafa        for (final SourceRecord record : toSend) {            byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());            byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);            log.trace("Appending record with key {}, value {}", record.key(), record.value());            // We need this queued first since the callback could happen immediately (even synchronously in some cases).            // Because of this we need to be careful about handling retries -- we always save the previously attempted            // record as part of toSend and need to use a flag to track whether we should actually add it to the outstanding            // messages and update the offsets.            synchronized (this) {                if (!lastSendFailed) {                    if (!flushing) {                        outstandingMessages.put(producerRecord, producerRecord);                    } else {                        outstandingMessagesBacklog.put(producerRecord, producerRecord);                    }                    // Offsets are converted & serialized in the OffsetWriter                    offsetWriter.offset(record.sourcePartition(), record.sourceOffset());                }            }            try {                producer.send(                        producerRecord,                        new Callback() {                            @Override                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {                                if (e != null) {                                    // Given the default settings for zero data loss, this should basically never happen --                                    // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request                                    // timeouts, callbacks with exceptions should never be invoked in practice. If the                                    // user overrode these settings, the best we can do is notify them of the failure via                                    // logging.                                    log.error("{} failed to send record to {}: {}", id, record.topic(), e);                                    log.debug("Failed record: topic {}, Kafka partition {}, key {}, value {}, source offset {}, source partition {}",                                            record.topic(), record.kafkaPartition(), record.key(), record.value(),                                            record.sourceOffset(), record.sourcePartition());                                } else {                                    log.trace("Wrote record successfully: topic {} partition {} offset {}",                                            recordMetadata.topic(), recordMetadata.partition(),                                            recordMetadata.offset());                                    commitTaskRecord(record);                                }                                recordSent(producerRecord);                            }                        });                lastSendFailed = false;            }}

露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:Kafka 之六 connector
喜欢 (1)
分享 (0)

表情 贴图 加粗 删除线 居中 斜体 签到


  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址