- Set plugins

  Copy `event_plugins` into `$AIRFLOW_HOME/plugins` folder.

- Set example DAG

  Copy `kafka_event_plugin.py` into `$AIRFLOW_HOME/dags` folder and modify.

  - Set timezone to local timezone

    ```python
    local_tz = pendulum.timezone("<local timezone>")
    ```

  - Set kafka broker

    ```python
    broker = 'localhost:9092'
    ```
  - Change `<email address>` in `KafkaStatusEmailOperator`. You also need to set the smtp server in `$AIRFLOW_HOME/airflow.cfg`.

    ```python
    send_status_email = KafkaStatusEmailOperator(
        task_id="send_status_mail",
        sensor_name="kafka_trigger_test.my_consumer",
        to="<email address>",
        trigger_rule=TriggerRule.ALL_FAILED,
    )
    ```
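    A minimal sketch of the `[smtp]` section in `airflow.cfg`; the host, port, and sender address are placeholders to replace with your own mail server settings (add `smtp_user` / `smtp_password` if your server requires authentication):

    ```
    [smtp]
    smtp_host = <your smtp host>
    smtp_starttls = True
    smtp_ssl = False
    smtp_port = 587
    smtp_mail_from = <sender address>
    ```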
- Create `etl-finish` and `job-finish` topics in kafka (or modify the value of the `topic` field within wanted messages in `kafka_event_plugin.py`). One way to create the topics is sketched below.
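  A minimal sketch of creating both topics programmatically, assuming the `kafka-python` package is installed (creating them with the `kafka-topics.sh` CLI works just as well):

  ```python
  from kafka.admin import KafkaAdminClient, NewTopic

  # broker address is assumed to match the example DAG setting
  admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
  admin.create_topics([
      NewTopic(name="etl-finish", num_partitions=1, replication_factor=1),
      NewTopic(name="job-finish", num_partitions=1, replication_factor=1),
  ])
  admin.close()
  ```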
- Set config for event plugins
  - Copy `event_plugins/common/storage/default.cfg` to your config folder and modify the settings if needed, e.g., set `create_table_if_not_exist = True` if you want event_plugins to automatically create the table if it does not exist.
  - Set the `AIRFLOW_EVENT_PLUGINS_CONFIG` environment variable to the location of the config. (Or directly modify `default.cfg`, which is not recommended.)

    ```bash
    export AIRFLOW_EVENT_PLUGINS_CONFIG=<your config>
    ```
- Start airflow services. The DAG should look like the picture below, but without status.
- Since `schedule_interval` is `None` in the example DAG, manually trigger the DAG from the UI.

- Modify and send test messages.
  - Change the value of `timestamp` and `partition_values` to the day of execution.
  - Produce the messages below to kafka and check if the status of the task in the DAG changed (a producer sketch follows the message list).
    ```
    # topic: etl-finish
    {"db": "db0", "table": "table0", "partition_fields": "", "partition_values": "", "timestamp": 1575190675}
    {"db": "db1", "table": "table1", "partition_fields": "yyyymm", "partition_values": "201911", "timestamp": 1575190675}

    # topic: job-finish
    {"job_name": "jn0", "is_success": true, "duration": 10, "timestamp": 1575190675}
    ```
Note: if there are messages unreceived or not matched, it may look like the picture below, and mail will be sent to the address.

Check the execution workflow in `KafkaConsumerOperator`:
```python
# kafka sensor
my_consumer = KafkaConsumerOperator(
    # the name displayed in UI
    task_id='my_consumer',
    # uses dag_id.task_id if not given; should be unique in database
    sensor_name="kafka_trigger_test.my_consumer",
    # parameters for kafka
    broker=broker,
    group_id='airflow-test',
    client_id='airflow-test',
    # wanted messages
    msgs=kafka_msgs,
    # the frequency to start the kafka consumer and listen for messages
    poke_interval=10,
    # ttl in seconds; if no messages are received, it times out and marks the task failed.
    # a crontab string is also available, e.g., "0 22 * * *" for a daily triggered DAG
    timeout=120,
    # whether to mark dummy tasks success or not
    mark_success=True,
    # skip the task without failing if set to True, check airflow sensor for more information
    soft_fail=False,
    # there are poke and reschedule modes for sensors, check airflow sensor for more information
    mode='reschedule',
    # whether to print the db status or not
    debug_mode=True,
)
```