Storm - Source Code Analysis - Topology Submit - Executor
Published: 2019-06-23


In the worker, each executor is created by calling (executor/mk-executor worker e).

(defn mk-executor [worker executor-id]
  (let [executor-data (mk-executor-data worker executor-id) ;; 1. mk-executor-data
        _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
        task-datas (->> executor-data
                        :task-ids
                        (map (fn [t] [t (task/mk-task executor-data t)])) ;; 2. mk-task
                        (into {})
                        (HashMap.))
        _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
        report-error-and-die (:report-error-and-die executor-data)
        component-id (:component-id executor-data)
        ;; 3. create threads
        ;; starting the batch-transfer->worker ensures that anything publishing to that queue
        ;; doesn't block (because it's a single threaded queue and the caching/consumer started
        ;; trick isn't thread-safe)
        system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
        handlers (with-error-reaction report-error-and-die
                   (mk-threads executor-data task-datas))
        threads (concat handlers system-threads)]
    ;; use schedule-recurring to emit SYSTEM_TICK periodically (this triggers the spout pending rotate)
    (setup-ticks! worker executor-data)
    ;; ...

1. mk-executor-data

(defn mk-executor-data [worker executor-id]
  (let [worker-context (worker-context worker)
        task-ids (executor-id->tasks executor-id) ;; the tasks this executor contains
        component-id (.getComponentId worker-context (first task-ids)) ;; the component it belongs to
        storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
        executor-type (executor-type worker-context component-id) ;; executor type: bolt or spout
        batch-transfer->worker (disruptor/disruptor-queue ;; the executor's send buffer queue
                                 (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                 :claim-strategy :single-threaded
                                 :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
        ]
    (recursive-map
     :worker worker
     :worker-context worker-context
     :executor-id executor-id
     :task-ids task-ids
     :component-id component-id
     :open-or-prepare-was-called? (atom false)
     :storm-conf storm-conf
     :receive-queue ((:executor-receive-queue-map worker) executor-id) ;; the disruptor queue that belongs to this executor
     :storm-id (:storm-id worker)
     :conf (:conf worker)
     :shared-executor-data (HashMap.)
     :storm-active-atom (:storm-active-atom worker)
     :batch-transfer-queue batch-transfer->worker
     :transfer-fn (mk-executor-transfer-fn batch-transfer->worker) ;; (1.1)
     :suicide-fn (:suicide-fn worker)
     :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
     :type executor-type
     ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
     :stats (mk-executor-stats <> (sampling-rate storm-conf)) ;; (1.2)
     :interval->task->metric-registry (HashMap.)
     :task->component (:task->component worker)
     :stream->component->grouper (outbound-components worker-context component-id)
     :report-error (throttled-report-error-fn <>)
     :report-error-and-die (fn [error] ;; writes the error under the error directory in ZooKeeper so other daemons can see it
                             ((:report-error <>) error)
                             ((:suicide-fn <>)))
     :deserializer (KryoTupleDeserializer. storm-conf worker-context)
     :sampler (mk-stats-sampler storm-conf) ;; (1.3) mk-stats-sampler
     ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
     )))
 

1.1 mk-executor-transfer-fn

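An executor buffers the tuples it needs to send in the batch-transfer->worker queue.

As the comments below explain, an extra overflow buffer is created so that the component does not block when a large number of tuples have not been processed in time. The spout stops calling nextTuple only when this buffer is full as well. The overflow buffer matters most for spout executors.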

        ;; the overflow buffer is used to ensure that spouts never block when emitting
        ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
        ;; prevents deadlock from occurring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
        ;; buffers filled up)
        ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
        ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple,
        ;; preventing memory issues
        overflow-buffer (LinkedList.)]

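mk-executor-transfer-fn returns a fn that puts [task, tuple] into either the overflow-buffer or the batch-transfer->worker queue.

Note that this is the executor's transfer-fn, which is not the same thing as the worker's transfer-fn. The shared name is unfortunate and easy to confuse.

The executor's transfer-fn buffers tuples in the executor's batch-transfer->worker queue, while the worker's transfer-fn sends tuples to the worker's transfer queue.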

;; in its own function so that it can be mocked out by tracked topologies
(defn mk-executor-transfer-fn [batch-transfer->worker]
  (fn this
    ([task tuple block? ^List overflow-buffer]
      ;; the overflow buffer exists and is not empty, so the queue was already full:
      ;; append directly to the overflow buffer
      (if (and overflow-buffer (not (.isEmpty overflow-buffer)))
        (.add overflow-buffer [task tuple])
        (try-cause
          (disruptor/publish batch-transfer->worker [task tuple] block?)
        (catch InsufficientCapacityException e
          (if overflow-buffer
            (.add overflow-buffer [task tuple])
            (throw e))
          ))))
    ([task tuple overflow-buffer]
      (this task tuple (nil? overflow-buffer) overflow-buffer))
    ([task tuple]
      (this task tuple nil)
      )))
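The fallback logic above can be tried in isolation. The following is a minimal sketch, assuming a bounded java.util.concurrent.ArrayBlockingQueue as a stand-in for the Disruptor queue; mk-transfer-sketch and the sample values are hypothetical names for illustration, not part of Storm.

```clojure
(import '(java.util LinkedList)
        '(java.util.concurrent ArrayBlockingQueue))

;; Hypothetical stand-in for mk-executor-transfer-fn: ArrayBlockingQueue
;; replaces the Disruptor queue, and a false return from .offer plays the
;; role of InsufficientCapacityException.
(defn mk-transfer-sketch [^ArrayBlockingQueue queue]
  (fn [task tuple ^LinkedList overflow-buffer]
    (if (and overflow-buffer (not (.isEmpty overflow-buffer)))
      ;; Earlier tuples are already waiting in the overflow buffer,
      ;; so this one is appended behind them.
      (.add overflow-buffer [task tuple])
      ;; .offer does not block; it returns false when the queue is full.
      (when-not (.offer queue [task tuple])
        (if overflow-buffer
          (.add overflow-buffer [task tuple])
          (throw (IllegalStateException. "queue full and no overflow buffer")))))))

(def queue (ArrayBlockingQueue. 2))   ; room for two pairs only
(def overflow (LinkedList.))
(def transfer! (mk-transfer-sketch queue))

;; Five emits: two fit in the queue, three spill into the overflow buffer.
(doseq [i (range 5)]
  (transfer! 1 (str "tuple-" i) overflow))
```

After the loop, the queue holds the first two pairs and the overflow buffer holds the remaining three, in emit order.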

1.2 mk-executor-stats <> (sampling-rate storm-conf)

 

1.3 mk-stats-sampler

mk-stats-sampler creates a sampler from the sampling-rate in conf.

(defn mk-stats-sampler [conf]
  (even-sampler (sampling-rate conf)))

The sampler created here is an even-sampler:

(defn even-sampler [freq]
  (let [freq (int freq)
        start (int 0)
        r (java.util.Random.)
        curr (MutableInt. -1)
        target (MutableInt. (.nextInt r freq))] ;; a random value in [0, freq)
    (with-meta
      (fn []
        (let [i (.increment curr)]
          (when (>= i freq)
            (.set curr start)
            (.set target (.nextInt r freq))))
        (= (.get curr) (.get target))) ;; Clojure has no assignment operator, so equality is written = instead of ==
      {:rate freq})))

(defn sampler-rate [sampler]
  (:rate (meta sampler)))

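even-sampler returns a fn and attaches metadata ({:rate freq}) to it through with-meta.

So the rate can be read back from the sampler's metadata with (:rate (meta sampler)).

The sampler itself is just a fn, and every call returns (= curr target).

curr is incremented on every call, starting from start (0). Until it reaches target, the fn returns false.
When curr equals target, the fn returns true.
After that the fn returns false again while curr keeps growing. Once curr reaches freq, curr is reset to 0 and a new random target is drawn.

The net effect is that calling the sampler repeatedly yields exactly one true, at a random position, in every run of freq calls, and false for all the others.

That is what makes it a sampler: a sample is taken only when it returns true.

A simpler sampler is possible. For a rate of 20%, you could take one item and then skip the next four. The problem is that such sampling is too regular. If the data happens to follow the same pattern, for example every fifth item is the same, the samples are biased.

The implementation here adds randomness to avoid that.
Its use of closures and metadata is also worth borrowing.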
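To see this behaviour without a Storm build, here is a minimal sketch of the same idea. It assumes Clojure atoms in place of Storm's MutableInt; even-sampler-sketch is a hypothetical name, not Storm's function.

```clojure
;; Hypothetical re-implementation of the even-sampler idea using atoms
;; instead of Storm's MutableInt.
(defn even-sampler-sketch [freq]
  (let [freq   (int freq)
        r      (java.util.Random.)
        curr   (atom -1)
        target (atom (.nextInt r freq))]   ; random position in [0, freq)
    (with-meta
      (fn []
        ;; Advance the counter; when a window of freq calls is used up,
        ;; start a new window and draw a new target position.
        (when (>= (swap! curr inc) freq)
          (reset! curr 0)
          (reset! target (.nextInt r freq)))
        (= @curr @target))
      {:rate freq})))

(def sampler (even-sampler-sketch 20))

;; 1000 calls cover 50 complete windows of 20 calls each.
(def hits (count (filter true? (repeatedly 1000 sampler))))

;; The rate travels with the fn as metadata.
(def rate (:rate (meta sampler)))
```

Because each window of 20 calls contains exactly one hit, hits is 50 on every run, while the positions of the hits differ from run to run.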

 

2. mk-task: create the tasks

(task/mk-task executor-data t)

 

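3. Create threads

3.1 batch-transfer-queue handler thread (the executor's send thread)

The handler takes messages from the batch-transfer-queue. Until the end of a batch is reached, each message is added to the ArrayList held in cached-emit.

When the end of the batch is reached, the cached messages are handed to the worker's transfer-fn, which forwards them toward the transfer-queue (a spout presumably never sends tuples to itself).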

(defn start-batch-transfer->worker-handler! [worker executor-data]
  (let [worker-transfer-fn (:transfer-fn worker)
        cached-emit (MutableObject. (ArrayList.)) ;; caches all messages until the end of the batch
        storm-conf (:storm-conf executor-data)
        serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
        ]
    (disruptor/consume-loop*
      (:batch-transfer-queue executor-data)
      (disruptor/handler [o seq-id batch-end?]
        (let [^ArrayList alist (.getObject cached-emit)]
          (.add alist o)
          (when batch-end?
            (worker-transfer-fn serializer alist)
            (.setObject cached-emit (ArrayList.))
            )))
      :kill-fn (:report-error-and-die executor-data))))
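The caching step can be sketched without the Disruptor. In the sketch below, mk-batching-handler and flush-fn are hypothetical names, an atom stands in for MutableObject, and the handler is called by hand instead of by a consume loop.

```clojure
(import '(java.util ArrayList))

;; Hypothetical handler factory: buffers items and passes the whole batch
;; to flush-fn when batch-end? is true, then starts a fresh list.
(defn mk-batching-handler [flush-fn]
  (let [cached (atom (ArrayList.))]
    (fn [item batch-end?]
      (let [^ArrayList alist @cached]
        (.add alist item)
        (when batch-end?
          (flush-fn alist)
          ;; A new list is created rather than clearing the old one,
          ;; because flush-fn may still hold a reference to it.
          (reset! cached (ArrayList.)))))))

(def flushed (atom []))
(def handler (mk-batching-handler #(swap! flushed conj (vec %))))

(handler [1 "a"] false)
(handler [2 "b"] false)
(handler [3 "c"] true)    ; end of the first batch
(handler [4 "d"] true)    ; a batch of one
```

Two batches reach flush-fn: the first with three pairs, the second with one.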

 

Worker, transfer-fn

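The worker's transfer-fn splits the tasks into local and remote.

For local tasks, local-transfer sends the messages to the corresponding receive-queue.
For remote tasks, disruptor/publish sends them to the transfer-queue.

Storm uses Kryo as its Java serialization framework.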

(defn mk-transfer-fn [worker]
  (let [local-tasks (-> worker :task-ids set)
        local-transfer (:transfer-local-fn worker)
        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
    (fn [^KryoTupleSerializer serializer tuple-batch]
      (let [local (ArrayList.)
            remote (ArrayList.)]
        (fast-list-iter [[task tuple :as pair] tuple-batch]
          (if (local-tasks task)
            (.add local pair)
            (.add remote pair)
            ))
        (local-transfer local)
        ;; not using map because the lazy seq shows up in perf profiles
        (let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote] [task (.serialize serializer tuple)])]
          (disruptor/publish transfer-queue serialized-pairs))))))
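The local/remote split can be illustrated with plain Clojure collections. This is a sketch only: mk-worker-transfer-sketch is a hypothetical name, the two transfer fns are passed in as arguments, and pr-str stands in for Kryo serialization.

```clojure
;; Hypothetical sketch of the routing step: pairs whose task id is in
;; local-tasks go to local-transfer as they are; all other pairs are
;; serialized first and then go to remote-transfer.
(defn mk-worker-transfer-sketch
  [local-tasks local-transfer remote-transfer serialize]
  (fn [tuple-batch]
    (let [{local true remote false}
          (group-by (fn [[task _]] (contains? local-tasks task)) tuple-batch)]
      (local-transfer (vec local))
      (remote-transfer
        (mapv (fn [[task tuple]] [task (serialize tuple)]) remote)))))

(def local-out (atom []))
(def remote-out (atom []))

(def worker-transfer!
  (mk-worker-transfer-sketch #{1 2}                      ; tasks on this worker
                             #(swap! local-out into %)
                             #(swap! remote-out into %)
                             pr-str))                    ; stand-in serializer

(worker-transfer! [[1 "x"] [7 "y"] [2 "z"]])
```

Tasks 1 and 2 are local, so their tuples arrive unchanged in local-out. Task 7 is remote, so its tuple arrives in remote-out in serialized form.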

 

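3.2 The executor's execution threads

mk-threads is wrapped in a try...catch. If an exception is thrown, the error is written to ZooKeeper so that the other daemons learn about it promptly.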

handlers (with-error-reaction report-error-and-die
           (mk-threads executor-data task-datas))
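One plausible shape for such a wrapper is a small macro. The sketch below is an assumption about how this kind of helper can be written, not Storm's actual definition of with-error-reaction; the name with-error-reaction-sketch is hypothetical.

```clojure
;; Hypothetical macro: evaluates body, and if anything is thrown, passes
;; the Throwable to error-fn instead of letting it propagate.
(defmacro with-error-reaction-sketch [error-fn & body]
  `(try
     ~@body
     (catch Throwable t#
       (~error-fn t#))))

(def reported (atom nil))

;; Stand-in for report-error-and-die: it only records the message here.
(defn report! [^Throwable t]
  (reset! reported (.getMessage t)))

(def ok-result
  (with-error-reaction-sketch report!
    (+ 1 2)))

(def failed-result
  (with-error-reaction-sketch report!
    (throw (RuntimeException. "boom"))))
```

When the body succeeds, its value is returned and error-fn is never called. When it throws, the result is whatever error-fn returns.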

Reposted from: http://mltga.baihongyu.com/
