您现在的位置:圣剑网 >> 文章频道 >> 网络技术 >> 正文
用Crontab打造简易工作流引擎
新闻来源:文章频道发布时间:2016-11-24 13:16我有话说(11)

1. 引言


众所周知,Oozie(1, 2)是基于时间条件与数据生成来做工作流调度的,但是Oozie的数据触发条件只支持HDFS路径,故而面临着这样的问题:


无法判断Hive partition是否已存在;

无法判断Elasticsearch index是否已写入完成;

...

因此,灵活可扩展的工作流引擎才是正确姿势!下面,我将介绍如何用Crontab来打造一个类似于Oozie的简易工作流引擎;对标Oozie,其应满足功能:


时间条件与数据生成触发任务,如Oozie coordinator的datasets与input-events;

支持触发条件的轮询;

支持任务并行执行,如Oozie workflow的fork与join;

捕获错误及运行状态日志。

2. 实现


触发条件


判断Hive partition是否已存在,思路比较简单——show partitions <tb_name>后能否grep到该partition:


# check wheter $1's partition ($2) exists

hive_partition_exists() {

    table_name=$1

    partition=$2

    hive -e "show partitions ${table_name}" | grep ${partition}

    [ $? == 0 ]

}

获取Hive 表的最后一个partition,grep命令配合正则表达式中的Lookahead匹配:


# get latest hive partition

latest_hive_partition() {

    table_name=$1

    partition_name=$2

    hive -e "show partitions ${table_name}" | tail -1 | grep -Po "(?<=${partition_name}=).*"

}

在检查ES index是否写入完成时,可用思路——定时flush index,然后判断当前时刻的doc数较上一时刻是否发生变化;若变化,则说明正在写入。Shell脚本处理json太蛋疼了,故不给出代码啦。


条件轮询


所谓“条件轮询”,是指如果数据未生成,则会一直轮询该条件是否满足。我们采用while循环中sleep的方式来实现条件轮询:


hive_partition_exists etl.ad_tb1 ${log_partition}

ad1_exists=$?

hive_partition_exists etl.ad_tb2 ${log_partition}

ad2_exists=$?

while (( ${ad1_exists} != 0 || ${ad2_exists} != 0))

do

    echo "`date -d "now"`: log partitions ${log_partition} not exist, and waiting" >> ${log_path}

    sleep 1m

    hive_partition_exists etl.ad_tb1 ${log_partition}

    ad1_exists=$?

    hive_partition_exists etl.ad_tb2 ${log_partition}

    ad2_exists=$?

done

实例


接下来,以Hive写Elasticsearch的为例,说明如何用crontab做定时Hive任务。hiveql脚本如下:


add jar /path/to/jars/elasticsearch-hadoop-2.3.1.jar;

set mapred.job.name=ad_tag-${LOG_PARTITION}~~${TAG_PARTITION};

set hive.map.aggr = false;


insert overwrite table ad_tag

select media, a.dvc as dvc, case when c1_arr is null then array('empty') else c1_arr end as c1_arr, '${LOG_PARTITION}' as week_time

from (

select dvc, app_name as media

from ad_log

where is_exposure = '1' and day_time between date_sub('${LOG_PARTITION}', 6) and '${LOG_PARTITION}'

group by dvc, app_name

) a 

left outer join (

select dvc, collect_set(c1) as c1_arr

from tag

lateral view inline(tag) in_tb

where day_time = '${TAG_PARTITION}'

group by dvc

) b

on a.dvc = b.dvc;

为了实现任务的并行执行,我用到Linux命令中的&:


log_partition=`date -d "5 day ago" "+%Y-%m-%d"`

tag_partition=$(latest_hive_partition tag.dmp_tag  day_time)

log_path="${log_partition}.log"


echo "`date -d "now"`: log partitions ${log_partition} exist" >> ${log_path}

echo "`date -d "now"`: latest tag partition ${tag_partition}" >> ${log_path}

hive -f ad_tag1.hql --hivevar LOG_PARTITION=${log_partition} --hivevar TAG_PARTITION=${tag_partition} & hive -f ad_tag2.hql --hivevar LOG_PARTITION=${log_partition} --hivevar TAG_PARTITION=${tag_partition}


exit 1

PS: 当手动执行脚本是OK的,但是crontab去执行时却出错,最可能的原因是crontab未能正确加载用户的环境变量;故可以在运行脚本中加入:


source /etc/profile

source /path/to/.bashrc

但是,用crontab做工作流调度,会存在如下问题:


无法很好地管理任务之间的依赖关系;

无法更好地监控任务的运行状况;

因Shell脚本的编程处理能力的限制,无法更自由地做扩展。

网友评论

关于圣剑 - 广告服务 - 招聘信息 - 友情连接 - 保护隐私权 - 意见反馈 - 帮助中心 - 联系我们

Copyright © 2016 Sjian.Net Inc. All Rights Reserved.大千网络科技有限公司 版权所有

不良信息举报中心 陕ICP备10005927号 陕公网安备61010402000003号