Airflow 2.2.3 + Celery + MySQL 8: Building a Robust Distributed Scheduling Cluster

Author: Marionxue · 2022-01-05 19:34:18

Previously we went over Airflow's basic architecture, and then how to deploy Airflow inside containers. Today let's look at how to build a robust distributed scheduling cluster with Airflow and Celery.

1 Cluster Environment

As before, we install the Airflow cluster on Ubuntu 20.04.3 LTS, this time on three identically configured servers for testing. In the previous article [1] we already installed all of the Airflow components on the Bigdata1 server (follow the link if you haven't read it yet), so now we only need to install the worker component on the other two nodes.

Component | Bigdata1(A) | Bigdata2(B) | Bigdata3(C)
Webserver | ✓ | |
Scheduler | ✓ | |
Worker | ✓ | ✓ | ✓

The docker-compose.yml in the previous article did not separate the deployment files from the data directories, which makes later maintenance inconvenient. So we can stop the services and split the database and data directories off from the deployment files:

- Deployment files (docker-compose.yaml / .env): under /apps/airflow
- MySQL and its configuration files: under /data/mysql
- Airflow data directory: under /data/airflow

Splitting things up like this makes unified management much easier later on; a minimal sketch of the split is shown below.
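As a rough sketch, assuming the old stack was deployed from the current directory (adjust the paths to your own layout):

    # stop the running stack before moving anything
    docker-compose down
    # create the new layout
    mkdir -pv /apps/airflow /data/mysql /data/airflow
    # relocate the deployment files; data directories are moved the same way
    mv docker-compose.yaml .env /apps/airflow/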

2 Deploying the Worker Service

Preparation:

    mkdir -pv /data/airflow/{dags,plugins}
    mkdir -pv /apps/airflow
    mkdir -pv /logs/airflow

The worker deployment file:

    ---
    version: '3'
    x-airflow-common:
      &airflow-common
      # In order to add custom dependencies or upgrade provider packages you can use your extended image.
      # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
      # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
      image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.3}
      # build: .
      environment:
        &airflow-common-env
        AIRFLOW__CORE__EXECUTOR: CeleryExecutor
        AIRFLOW__CORE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow  # set your MySQL user and password
        AIRFLOW__CELERY__RESULT_BACKEND: db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow  # set your MySQL user and password
        AIRFLOW__CELERY__BROKER_URL: redis://:xxxx@$${REDIS_HOST}:7480/0  # set your Redis password
        AIRFLOW__CORE__FERNET_KEY: ''
        AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
        AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
        AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
        _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
      volumes:
        - /data/airflow/dags:/opt/airflow/dags
        - /logs/airflow:/opt/airflow/logs
        - /data/airflow/plugins:/opt/airflow/plugins
        - /data/airflow/airflow.cfg:/opt/airflow/airflow.cfg
      user: "${AIRFLOW_UID:-50000}:0"

    services:
      airflow-worker:
        <<: *airflow-common
        command: celery worker
        healthcheck:
          test:
            - "CMD-SHELL"
            - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
          interval: 10s
          timeout: 10s
          retries: 5
        environment:
          <<: *airflow-common-env
          # Required to handle warm shutdown of the celery workers properly
          # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
          DUMB_INIT_SETSID: "0"
        restart: always
        hostname: bigdata-20-194  # set the container hostname so you can tell which worker it is in flower
        depends_on:
          airflow-init:
            condition: service_completed_successfully

      airflow-init:
        <<: *airflow-common
        entrypoint: /bin/bash
        # yamllint disable rule:line-length
        command:
          - -c
          - |
            function ver() {
              printf "%04d%04d%04d%04d" $${1//./ }
            }
            airflow_version=$$(gosu airflow airflow version)
            airflow_version_comparable=$$(ver $${airflow_version})
            min_airflow_version=2.2.0
            min_airflow_version_comparable=$$(ver $${min_airflow_version})
            if (( airflow_version_comparable < min_airflow_version_comparable )); then
              echo
              echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
              echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
              echo
              exit 1
            fi
            if [[ -z "${AIRFLOW_UID}" ]]; then
              echo
              echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
              echo "If you are on Linux, you SHOULD follow the instructions below to set "
              echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
              echo "For other operating systems you can get rid of the warning with manually created .env file:"
              echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
              echo
            fi
            one_meg=1048576
            mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
            cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
            disk_available=$$(df / | tail -1 | awk '{print $$4}')
            warning_resources="false"
            if (( mem_available < 4000 )); then
              echo
              echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
              echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
              echo
              warning_resources="true"
            fi
            if (( cpus_available < 2 )); then
              echo
              echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
              echo "At least 2 CPUs recommended. You have $${cpus_available}"
              echo
              warning_resources="true"
            fi
            if (( disk_available < one_meg * 10 )); then
              echo
              echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
              echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
              echo
              warning_resources="true"
            fi
            if [[ $${warning_resources} == "true" ]]; then
              echo
              echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
              echo "Please follow the instructions to increase amount of resources available:"
              echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
              echo
            fi
            mkdir -p /sources/logs /sources/dags /sources/plugins
            chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
            exec /entrypoint airflow version
        # yamllint enable rule:line-length
        environment:
          <<: *airflow-common-env
          _AIRFLOW_DB_UPGRADE: 'true'
          _AIRFLOW_WWW_USER_CREATE: 'true'
          _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
          _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
        user: "0:0"
        volumes:
          - .:/sources

      airflow-cli:
        <<: *airflow-common
        profiles:
          - debug
        environment:
          <<: *airflow-common-env
          CONNECTION_CHECK_MAX_COUNT: "0"
        # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
        command:
          - bash
          - -c
          - airflow
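One detail to watch: docker-compose only substitutes variables written with a single `$` from the `.env` file; `$$` is an escaped dollar sign, so `$${MYSQL_HOST}` reaches the container as the literal text `${MYSQL_HOST}` and is never expanded. If MYSQL_HOST and REDIS_HOST are meant to come from `.env`, write them with a single `$` in the connection strings above. A hypothetical `.env` under that assumption (all values are placeholders, not from the original article):

    # /apps/airflow/.env -- example values only, adjust for your environment
    AIRFLOW_UID=1000
    AIRFLOW_IMAGE_NAME=apache/airflow:2.2.3
    MYSQL_HOST=192.168.0.10
    REDIS_HOST=192.168.0.10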

Run the initialization check to verify that the environment is ready:

    cd /apps/airflow/
    echo -e "AIRFLOW_UID=$(id -u)" > .env  # note: AIRFLOW_UID must be the UID of a regular (non-root) user, and that user must have permission to create the persistent directories
    docker-compose up airflow-init

If the database already exists, the initialization check leaves the existing data untouched. Next, start the airflow-worker service:

    docker-compose up -d

Then install the airflow-worker service on the bigdata3 node in the same way. Once deployment is complete, you can check the broker's status in flower.
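If you'd rather check from the command line, the same probe that the compose healthcheck uses can be run by hand against the worker service defined above:

    # ping all registered Celery workers through the running worker container
    docker-compose exec airflow-worker \
        celery --app airflow.executors.celery_executor.app inspect ping

Each live worker should reply with pong.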

3 Persisting the Configuration File

In most cases, a multi-worker Airflow cluster needs Airflow's configuration file persisted and synchronized to all nodes. To do that, modify the volumes of x-airflow-common in docker-compose.yaml so that airflow.cfg is mounted into the container; you can copy the file out of a running container first and then edit it.
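For example, a sketch of pulling the default config out of the running worker container (service name and paths as defined in the compose file above):

    # copy airflow.cfg from the first airflow-worker container to the host data directory
    docker cp "$(docker-compose ps -q airflow-worker | head -1)":/opt/airflow/airflow.cfg /data/airflow/airflow.cfg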

When first setting this up, the values of some of the environment variables in the docker-compose file need to be written into airflow.cfg, for example:

    [core]
    dags_folder = /opt/airflow/dags
    hostname_callable = socket.getfqdn
    default_timezone = Asia/Shanghai  # set the timezone
    executor = CeleryExecutor
    sql_alchemy_conn = mysql+mysqldb://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
    sql_engine_encoding = utf-8
    sql_alchemy_pool_enabled = True
    sql_alchemy_pool_size = 5
    sql_alchemy_max_overflow = 10
    sql_alchemy_pool_recycle = 1800
    sql_alchemy_pool_pre_ping = True
    sql_alchemy_schema =
    parallelism = 32
    max_active_tasks_per_dag = 16
    dags_are_paused_at_creation = True
    max_active_runs_per_dag = 16
    load_examples = True
    load_default_connections = True
    plugins_folder = /opt/airflow/plugins
    execute_tasks_new_python_interpreter = False
    fernet_key =
    donot_pickle = True
    dagbag_import_timeout = 30.0
    dagbag_import_error_tracebacks = True
    dagbag_import_error_traceback_depth = 2
    dag_file_processor_timeout = 50
    task_runner = StandardTaskRunner
    default_impersonation =
    security =
    unit_test_mode = False
    enable_xcom_pickling = False
    killed_task_cleanup_time = 60
    dag_run_conf_overrides_params = True
    dag_discovery_safe_mode = True
    default_task_retries = 0
    default_task_weight_rule = downstream
    min_serialized_dag_update_interval = 30
    min_serialized_dag_fetch_interval = 10
    max_num_rendered_ti_fields_per_task = 30
    check_slas = True
    xcom_backend = airflow.models.xcom.BaseXCom
    lazy_load_plugins = True
    lazy_discover_providers = True
    max_db_retries = 3
    hide_sensitive_var_conn_fields = True
    sensitive_var_conn_names =
    default_pool_task_slot_count = 128

    [logging]
    base_log_folder = /opt/airflow/logs
    remote_logging = False
    remote_log_conn_id =
    google_key_path =
    remote_base_log_folder =
    encrypt_s3_logs = False
    logging_level = INFO
    fab_logging_level = WARNING
    logging_config_class =
    colored_console_log = True
    colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
    colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
    log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
    simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
    task_log_prefix_template =
    log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
    log_processor_filename_template = {{ filename }}.log
    dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
    task_log_reader = task
    extra_logger_names =
    worker_log_server_port = 8793

    [metrics]
    statsd_on = False
    statsd_host = localhost
    statsd_port = 8125
    statsd_prefix = airflow
    statsd_allow_list =
    stat_name_handler =
    statsd_datadog_enabled = False
    statsd_datadog_tags =

    [secrets]
    backend =
    backend_kwargs =

    [cli]
    api_client = airflow.api.client.local_client
    endpoint_url = http://localhost:8080

    [debug]
    fail_fast = False

    [api]
    enable_experimental_api = False
    auth_backend = airflow.api.auth.backend.deny_all
    maximum_page_limit = 100
    fallback_page_limit = 100
    google_oauth2_audience =
    google_key_path =
    access_control_allow_headers =
    access_control_allow_methods =
    access_control_allow_origins =

    [lineage]
    backend =

    [atlas]
    sasl_enabled = False
    host =
    port = 21000
    username =
    password =

    [operators]
    default_owner = airflow
    default_cpus = 1
    default_ram = 512
    default_disk = 512
    default_gpus = 0
    default_queue = default
    allow_illegal_arguments = False

    [hive]
    default_hive_mapred_queue =

    [webserver]
    base_url = https://devopsman.cn/airflow  # custom airflow domain
    default_ui_timezone = Asia/Shanghai  # set the default UI timezone
    web_server_host = 0.0.0.0
    web_server_port = 8080
    web_server_ssl_cert =
    web_server_ssl_key =
    web_server_master_timeout = 120
    web_server_worker_timeout = 120
    worker_refresh_batch_size = 1
    worker_refresh_interval = 6000
    reload_on_plugin_change = False
    secret_key = emEfndkf3QWZ5zVLE1kVMg==
    workers = 4
    worker_class = sync
    access_logfile = -
    error_logfile = -
    access_logformat =
    expose_config = False
    expose_hostname = True
    expose_stacktrace = True
    dag_default_view = tree
    dag_orientation = LR
    log_fetch_timeout_sec = 5
    log_fetch_delay_sec = 2
    log_auto_tailing_offset = 30
    log_animation_speed = 1000
    hide_paused_dags_by_default = False
    page_size = 100
    navbar_color = #fff
    default_dag_run_display_number = 25
    enable_proxy_fix = False
    proxy_fix_x_for = 1
    proxy_fix_x_proto = 1
    proxy_fix_x_host = 1
    proxy_fix_x_port = 1
    proxy_fix_x_prefix = 1
    cookie_secure = False
    cookie_samesite = Lax
    default_wrap = False
    x_frame_enabled = True
    show_recent_stats_for_completed_runs = True
    update_fab_perms = True
    session_lifetime_minutes = 43200
    auto_refresh_interval = 3

    [email]
    email_backend = airflow.utils.email.send_email_smtp
    email_conn_id = smtp_default
    default_email_on_retry = True
    default_email_on_failure = True

    [smtp]
    # mail server settings
    smtp_host = localhost
    smtp_starttls = True
    smtp_ssl = False
    smtp_port = 25
    smtp_mail_from = [email protected]
    smtp_timeout = 30
    smtp_retry_limit = 5

    [sentry]
    sentry_on = false
    sentry_dsn =

    [celery_kubernetes_executor]
    kubernetes_queue = kubernetes

    [celery]
    celery_app_name = airflow.executors.celery_executor
    worker_concurrency = 16
    worker_umask = 0o077
    broker_url = redis://:xxxx@$${REDIS_HOST}:7480/0
    result_backend = db+mysql://airflow:aaaa@$${MYSQL_HOST}:3306/airflow
    flower_host = 0.0.0.0
    flower_url_prefix =
    flower_port = 5555
    flower_basic_auth =
    sync_parallelism = 0
    celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
    ssl_active = False
    ssl_key =
    ssl_cert =
    ssl_cacert =
    pool = prefork
    operation_timeout = 1.0
    task_track_started = True
    task_adoption_timeout = 600
    task_publish_max_retries = 3
    worker_precheck = False

    [celery_broker_transport_options]

    [dask]
    cluster_address = 127.0.0.1:8786
    tls_ca =
    tls_cert =
    tls_key =

    [scheduler]
    job_heartbeat_sec = 5
    scheduler_heartbeat_sec = 5
    num_runs = -1
    scheduler_idle_sleep_time = 1
    min_file_process_interval = 30
    dag_dir_list_interval = 300
    print_stats_interval = 30
    pool_metrics_interval = 5.0
    scheduler_health_check_threshold = 30
    orphaned_tasks_check_interval = 300.0
    child_process_log_directory = /opt/airflow/logs/scheduler
    scheduler_zombie_task_threshold = 300
    catchup_by_default = True
    max_tis_per_query = 512
    use_row_level_locking = True
    max_dagruns_to_create_per_loop = 10
    max_dagruns_per_loop_to_schedule = 20
    schedule_after_task_execution = True
    parsing_processes = 2
    file_parsing_sort_mode = modified_time
    use_job_schedule = True
    allow_trigger_in_future = False
    dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
    trigger_timeout_check_interval = 15

    [triggerer]
    default_capacity = 1000

    [kerberos]
    ccache = /tmp/airflow_krb5_ccache
    principal = airflow
    reinit_frequency = 3600
    kinit_path = kinit
    keytab = airflow.keytab
    forwardable = True
    include_ip = True

    [github_enterprise]
    api_rev = v3

    [elasticsearch]
    host =
    log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
    end_of_log_mark = end_of_log
    frontend =
    write_stdout = False
    json_format = False
    json_fields = asctime, filename, lineno, levelname, message
    host_field = host
    offset_field = offset

    [elasticsearch_configs]
    use_ssl = False
    verify_certs = True

    [kubernetes]
    pod_template_file =
    worker_container_repository =
    worker_container_tag =
    namespace = default
    delete_worker_pods = True
    delete_worker_pods_on_failure = False
    worker_pods_creation_batch_size = 1
    multi_namespace_mode = False
    in_cluster = True
    kube_client_request_args =
    delete_option_kwargs =
    enable_tcp_keepalive = True
    tcp_keep_idle = 120
    tcp_keep_intvl = 30
    tcp_keep_cnt = 6
    verify_ssl = True
    worker_pods_pending_timeout = 300
    worker_pods_pending_timeout_check_interval = 120
    worker_pods_queued_check_interval = 60
    worker_pods_pending_timeout_batch_size = 100

    [smart_sensor]
    use_smart_sensor = False
    shard_code_upper_limit = 10000
    shards = 5
    sensors_enabled = NamedHivePartitionSensor

After the changes are done, restart the services:

    docker-compose restart
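Keep in mind that AIRFLOW__SECTION__KEY environment variables take precedence over airflow.cfg, so anything still set in the compose file wins over the mounted config. A quick way to confirm which value is actually in effect inside a container:

    # print the effective setting as the container sees it
    docker-compose exec airflow-worker airflow config get-value core executor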

4 Data Synchronization

Because Airflow now runs three worker nodes, a configuration change on one node has to be propagated to all the others, and the dags and plugins directories need to be synchronized in real time as well: if the scheduler dispatches a task to a node that cannot find the corresponding DAG file, the task fails. We therefore use lsyncd for real-time data synchronization:

    apt-get install lsyncd -y

Set up public-key SSH access between the nodes:

    ssh-keygen -t rsa -b 4096 -C "airflow-sync" -f ~/.ssh/airflow-sync  # generate a key pair named airflow-sync
    for ip in 100 200; do ssh-copy-id -i ~/.ssh/airflow-sync.pub ${USERNAME}@192.168.0.$ip -p 12022; done

After that, the other nodes can be reached using the private key.
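A quick way to verify, reusing the port and key from the commands above:

    # should print the remote node's hostname without prompting for a password
    ssh -p 12022 -i ~/.ssh/airflow-sync ${USERNAME}@192.168.0.100 hostname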

Now edit the sync configuration file. For the full set of lsyncd configuration parameters, see the official documentation [2].

    settings {
        logfile = "/var/log/lsyncd.log",        -- log file
        statusFile = "/var/log/lsyncd.status",  -- sync status information
        pidfile = "/var/run/lsyncd.pid",
        statusInterval = 1,
        nodaemon = false,                       -- run as a daemon
        inotifyMode = "CloseWrite",
        maxProcesses = 1,
        maxDelays = 1,
    }
    sync {
        default.rsync,
        source = "/data/airflow",
        target = "192.168.0.100:/data/airflow",
        delete = true,
        rsync = {
            binary = "/usr/bin/rsync",
            compress = false,
            archive = true,
            owner = true,
            perms = true,
            whole_file = false,
            rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-sync"
        },
    }
    sync {
        default.rsync,
        source = "/data/airflow",
        target = "192.168.0.200:/data/airflow",
        delete = true,
        rsync = {
            binary = "/usr/bin/rsync",
            compress = false,
            archive = true,
            owner = true,
            perms = true,
            whole_file = false,
            rsh = "/usr/bin/ssh -p 12022 -l suoper -o StrictHostKeyChecking=no -i /home/username/.ssh/airflow-sync"
        },
    }

The meaning of each parameter is documented on the official site. Here we use rsync's rsh option to define the ssh command, which handles setups that use a private key, a custom port, and similar hardening. Of course, you can also configure passwordless access instead and then use default.rsync or default.rsyncssh directly.
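For comparison, a minimal default.rsyncssh block under the assumption of passwordless SSH on the default port might look like this (a sketch, not what is deployed above):

    -- default.rsyncssh transfers changes with rsync and handles moves/renames over ssh
    sync {
        default.rsyncssh,
        source    = "/data/airflow",
        host      = "192.168.0.100",
        targetdir = "/data/airflow",
    }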

Register lsyncd as a systemd service:

    cat <<EOF > /etc/systemd/system/lsyncd.service
    [Unit]
    Description=lsyncd
    ConditionFileIsExecutable=/usr/bin/lsyncd
    After=network-online.target
    Wants=network-online.target

    [Service]
    StartLimitBurst=10
    ExecStart=/usr/bin/lsyncd /etc/lsyncd.conf
    Restart=on-failure
    RestartSec=120
    EnvironmentFile=-/etc/sysconfig/aliyun
    KillMode=process

    [Install]
    WantedBy=multi-user.target
    EOF
    systemctl daemon-reload
    systemctl enable --now lsyncd.service  # start the service and enable it at boot

That takes care of synchronizing the data (dags, plugins, airflow.cfg). Later, in a CI/CD scenario, you can simply upload DAG files to the Bigdata1 node and the other two nodes will pick them up automatically. If anything goes wrong, you can debug by checking the logs:

    lsyncd -log all /etc/lsyncd.conf
    tail -f /var/log/lsyncd.log

5 Reverse Proxy [3]

If you need to put Airflow behind a reverse proxy, e.g. at https://lab.mycompany.com/myorg/airflow/, you can do it with the following configuration.

Configure base_url in airflow.cfg:

    base_url = http://my_host/myorg/airflow
    enable_proxy_fix = True

The nginx configuration:

    server {
        listen 80;
        server_name lab.mycompany.com;

        location /myorg/airflow/ {
            proxy_pass http://localhost:8080;
            proxy_set_header Host $http_host;
            proxy_redirect off;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
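With both pieces in place, a quick smoke test against Airflow's unauthenticated health endpoint (hostname taken from the example above):

    # expect an HTTP 200 response if the proxy and base_url line up
    curl -I http://lab.mycompany.com/myorg/airflow/health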

And with that, the installation of the distributed Airflow scheduling cluster is essentially complete.

If you've read this far, you're probably already using Airflow or at least interested in it. Here's a learning resource to take with you:

https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-12/1

References

[1] Airflow 2.2.3 + MySQL 8.0.27: https://mp.weixin.qq.com/s/VncpyXcTtlvnDkFrsAZ5lQ

[2] lsyncd config file: https://lsyncd.github.io/lsyncd/manual/config/file/

[3] airflow-behind-proxy: https://airflow.apache.org/docs/apache-airflow/stable/howto/run-behind-proxy.html