当前位置: 首页 > news >正文

mvc 手机网站开发google网站登陆模板

mvc 手机网站开发,google网站登陆模板,织梦网站怎么做,美图秀秀网页版入口文章目录 前言StreamTask 部署启动Task 线程启动StreamTask 初始化StreamTask 执行 前言 Flink的StreamTask的启动和执行是一个复杂的过程,涉及多个关键步骤。以下是StreamTask启动和执行的主要流程: 初始化:StreamTask的初始化阶段涉及多个…

文章目录

  • 前言
  • StreamTask 部署启动
  • Task 线程启动
  • StreamTask 初始化
  • StreamTask 执行


前言

Flink的StreamTask的启动和执行是一个复杂的过程,涉及多个关键步骤。以下是StreamTask启动和执行的主要流程:

  1. 初始化:StreamTask的初始化阶段涉及多个任务,包括Operator的配置、task特定的初始化以及初始化算子的State等。在这个阶段,Flink将业务处理函数抽象为operator,并通过operatorChain将业务代码串起来执行,以完成业务逻辑的处理。同时,还会调用具体task的init方法进行初始化。
  2. 读取数据和事件:StreamTask通过mailboxProcessor读取数据和事件。
  3. 运行业务逻辑:在StreamTask的beforeInvoke方法中,主要调用生成operatorChain并执行相关的业务逻辑。这些业务逻辑可能包括Source算子和map算子等,它们将被Chain在一起并在一个线程内同步执行。
  4. 资源清理:在执行完业务逻辑后,StreamTask会进行关闭和资源清理的操作,这部分在afterInvoke阶段完成。

值得注意的是,从资源角度来看,每个TaskManager内部有多个slot,每个slot内部运行着一个subtask,即每个slot内部运行着一个StreamTask。这意味着StreamTask是由TaskManager(TM)部署并执行的本地处理单元。

总的来说,Flink的StreamTask启动和执行是一个由多个阶段和组件协同工作的过程,涉及数据的读取、业务逻辑的执行以及资源的清理等多个方面。这些步骤确保了StreamTask能够高效、准确地处理数据流,并满足实时计算和分析的需求。


StreamTask 部署启动

当 TaskExecutor 接收提交 Task 执行的请求,则调用:

TaskExecutor.submitTask(TaskDeploymentDescriptor tdd, 
JobMasterId jobMasterId,Time timeout){// 构造 Task 对象Task task = new Task(jobInformation, taskInformation, ExecutionAttemptId,AllocationId, SubtaskIndex, ....);// 启动 Task 的执行task.startTaskThread();
}

Task对象的构造方法

public Task(.....){
// 封装一个 Task信息对象 TaskInfo,(TaskInfo, JobInfo,JobMasterInfo)
this.taskInfo = new TaskInfo(....);
// 各种成员变量赋值
......
// 一个Task的执行有输入也有输出: 关于输出的抽象: ResultPartition 和
ResultSubPartitionPipelinedSubpartition// 初始化 ResultPartition 和 ResultSubPartition
final ResultPartitionWriter[] resultPartitionWriters =
shuffleEnvironment.createResultPartitionWriters(....);
this.consumableNotifyingPartitionWriters =
ConsumableNotifyingResultPartitionWriterDecorator.decorate(....);
// 一个Task的执行有输入也有输出: 关于输入的抽象: InputGate 和 InputChannel(从上有
一个Task节点拉取数据)
// InputChannel 可能有两种实现: Local Remote
// 初始化 InputGate 和 InputChannel
final IndexedInputGate[] gates = shuffleEnvironment.createInputGates(.....);
// 初始化一个用来执行 Task 的线程,目标对象,就是 Task 自己
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
}

Task 线程启动

Task 的启动,是通过启动 Task 对象的内部 executingThread 来执行 Task 的,具体逻辑在 run 方法中:


private void doRun() {
// 1、先更改 Task 的状态: CREATED ==> DEPLOYING
transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING);
// 2、准备 ExecutionConfig
final ExecutionConfig executionConfig =
serializedExecutionConfig.deserializeValue(userCodeClassLoader);
// 3、初始化输入和输出组件, 拉起 ResultPartition 和 InputGate
setupPartitionsAndGates(consumableNotifyingPartitionWriters,
inputGates);
// 4、注册 输出
for(ResultPartitionWriter partitionWriter :
consumableNotifyingPartitionWriters) {
taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
} /
/ 5、初始 环境对象 RuntimeEnvironment, 包装在 Task 执行过程中需要的各种组件
Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
....);
// 6、初始化 调用对象
// 两种最常见的类型: SourceStreamTask、OneInputStreamTask、
TwoInputStreamTask
// 父类: StreamTask
// 通过反射实例化 StreamTask 实例(可能的两种情况: SourceStreamTask,
OneInputStreamTask)
AbstractInvokable invokable =
loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// 7、先更改 Task 的状态: DEPLOYING ==> RUNNING
transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
// 8、真正把 Task 启动起来了
invokable.invoke();
// 9、StreamTask 需要正常结束,处理 buffer 中的数据
for(ResultPartitionWriter partitionWriter :
consumableNotifyingPartitionWriters) {
if(partitionWriter != null) {
partitionWriter.finish();
}
} /
/ 10、先更改 Task 的状态: RUNNING ==> FINISHED
transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);

StreamTask 初始化

StreamTask 初始化指的就是 SourceStreamTask 和 OneInputStreamTask 的实例对象的构建!Task 这个类,只是一个笼统意义上的 Task,就是一个通用 Task 的抽象,不管是批处理的,还是流式处理的,不管是 源Task, 还是逻辑处理 Task, 都被抽象成 Task 来进行调度执行!

private SourceStreamTask(Environment env, Object lock) throws Exception {super(env,null,FatalExitExceptionHandler.INSTANCE,StreamTaskActionExecutor.synchronizedExecutor(lock));this.lock = Preconditions.checkNotNull(lock);this.sourceThread = new LegacySourceFunctionThread();getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);}@Overrideprotected void init() {// we check if the source is actually inducing the checkpoints, rather// than the triggerSourceFunction<?> source = mainOperator.getUserFunction();if (source instanceof ExternallyInducedSource) {externallyInducedCheckpoints = true;ExternallyInducedSource.CheckpointTrigger triggerHook =new ExternallyInducedSource.CheckpointTrigger() {@Overridepublic void triggerCheckpoint(long checkpointId) throws FlinkException {// TODO - we need to see how to derive those. We should probably not// encode this in the// TODO -   source's trigger message, but do a handshake in this task// between the trigger// TODO -   message from the master, and the source's trigger// notificationfinal CheckpointOptions checkpointOptions =CheckpointOptions.forConfig(CheckpointType.CHECKPOINT,CheckpointStorageLocationReference.getDefault(),configuration.isExactlyOnceCheckpointMode(),configuration.isUnalignedCheckpointsEnabled(),configuration.getAlignedCheckpointTimeout().toMillis());final long timestamp = System.currentTimeMillis();final CheckpointMetaData checkpointMetaData =new CheckpointMetaData(checkpointId, timestamp, timestamp);try {SourceStreamTask.super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions).get();} catch (RuntimeException e) {throw e;} catch (Exception e) {throw new FlinkException(e.getMessage(), e);}}};((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);}getEnvironment().getMetricGroup().getIOMetricGroup().gauge(MetricNames.CHECKPOINT_START_DELAY_TIME,this::getAsyncCheckpointStartDelayNanos);recordWriter.setMaxOverdraftBuffersPerGate(0);}

StreamTask 执行

核心步骤如下:

public final void invoke() throws Exception {
// Task 正式工作之前
beforeInvoke();
// Task 开始工作: 针对数据执行正儿八经的逻辑处理
runMailboxLoop();
// Task 要结束
afterInvoke();
// Task 最后执行清理
cleanUpInvoke();
}

总结一下要点:

  • 在 beforeInvoke() 中,主要是初始化 OperatorChain,然后调用 init() 执行初始化,然后恢复状态,更改 Task 自己的状态为 isRunning = true
  • 在 runMailboxLoop() 中,主要是不停的处理 mail,这里是 FLink-1.10 的一项改进,使用了mailbox 模型来处理任务
  • 在 afterInvoke() 中,主要是完成 Task 要结束之前需要完成的一些细节,比如,把 Buffer 中还没flush 的数据 flush 出来
  • 在 cleanUpInvoke() 中,主要做一些资源的释放,执行各种关闭动作:set false,interrupt,
    shutdown,close,cleanup,dispose 等
http://www.laogonggong.com/news/94786.html

相关文章:

  • 老域名重新做网站沈阳看男科哪里医院男科好
  • 上海市建设小学网站西安网站建设ruiqinet
  • 深圳建设网站公司哪家好前端wordpress后端python
  • 自己做文学网站赚钱吗媒介盒子
  • 做视频解析网站播放器和接口滕州网站建设助企网络
  • 有了源码然后如何做网站页面设计文献
  • 网上花店网站建设规划书网站后台登陆网址是多少
  • seo网站优化方案摘要长沙网站制作首页
  • 如何提高网站用户体验黄骅信誉楼罗茂莲事件
  • 做资讯网站需要什么条件深圳十大传媒公司
  • 成都公司做网站的维护官网内容是什么工作
  • 帝国cms 调用网站名称wordpress 标题入库
  • 网站备案 种类绥阳网站建设
  • 数据分析师报考条件及科目长春seo服务
  • 哈尔滨模板建站品牌wordpress文件名乱码
  • 国外html响应式网站南宁seo计费管理
  • 网站母版页怎么做企业形象
  • 网站开发代理网站怎么做白色字
  • 泰安集团网站建设方案外贸网站的推广
  • 专业网站建设86215湖北建站管理系统信息
  • 网站开发最好开源 wordpress 主题
  • 网站注册好域名怎么办oppo手机应用商店
  • 3g版网站制作asp故障解答网站模板
  • 网上购物网站开发背景网站设计制作费用
  • 网站静态图怎么做百度做的网站国外可以打开吗
  • 建微信网站模板昆明做网站比较牛的
  • wordpress 站点地址wordpress首页不显示指定分类文章
  • 文昌网站 做炸饺子网站建设教育类旧式网站
  • 各网站的风格及特点芜湖建设公司网站
  • 通化工程建设信息网站吉安建设局官方网站