No module named MySQLdb on Airflow Web Interface
I am trying to create and execute my first DAG against a local MySQL database.
I want to read lines from a CSV file and load them into a MySQL table using SQLAlchemy. The script runs without any error on its own. However, when I try to run it from the Airflow web interface, I see the following error:
No module named 'MySQLdb'
When I was changing the default Airflow db from redis to MySQL, I encountered this error many times and solved it by switching to PyMySQL.
Here are my airflow.cfg settings:
sql_alchemy_conn = mysql+pymysql://user:[email protected]:3306/airflow_db
executor = CeleryExecutor
result_backend = db+mysql://user:[email protected]:3306/airflow_db
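One detail in these settings may be worth a closer look: sql_alchemy_conn names a driver explicitly (+pymysql), while result_backend does not. The sketch below is only an illustration of how a MySQL URL scheme maps to the Python module that gets imported. It rests on two assumptions: that SQLAlchemy falls back to the mysqlclient driver (imported as MySQLdb) when no driver suffix is given, and that Celery strips the leading "db+" before handing the rest to SQLAlchemy. The helper name dbapi_module_for and the credentials in the URLs are hypothetical.

```python
# Assumption: with no "+driver" suffix, SQLAlchemy's MySQL dialect defaults
# to the mysqlclient driver, which is imported as "MySQLdb".
DEFAULT_MYSQL_MODULE = "MySQLdb"

# Import names of the DBAPI modules behind a few explicit driver suffixes.
DRIVER_MODULES = {
    "mysqldb": "MySQLdb",
    "pymysql": "pymysql",
    "mysqlconnector": "mysql.connector",
}


def dbapi_module_for(url: str) -> str:
    """Return the DBAPI module name a MySQL connection URL would import."""
    scheme = url.split("://", 1)[0]
    # Assumption: Celery's result_backend uses a leading "db+" to select its
    # database backend and passes the remainder on as the SQLAlchemy URL.
    if scheme.startswith("db+"):
        scheme = scheme[len("db+"):]
    dialect, _, driver = scheme.partition("+")
    if dialect != "mysql":
        raise ValueError(f"not a MySQL URL: {url!r}")
    if not driver:
        return DEFAULT_MYSQL_MODULE
    return DRIVER_MODULES[driver]


# Placeholder credentials and host; hypothetical, not the real settings.
print(dbapi_module_for("mysql+pymysql://user:secret@localhost:3306/airflow_db"))
print(dbapi_module_for("db+mysql://user:secret@localhost:3306/airflow_db"))
print(dbapi_module_for("db+mysql+pymysql://user:secret@localhost:3306/airflow_db"))
```

If those assumptions hold, only the result_backend form without a driver suffix would ask for MySQLdb, so writing it as db+mysql+pymysql://... might be worth trying.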
DAG definition:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import timedelta
import os
from sync_todb import sync_todb
from airflow.utils.dates import days_ago
default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)}
dag = DAG(
    'Content_Sync_ETL',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'])
t1 = PythonOperator(
    task_id='LoadMYSQL',
    python_callable=sync_todb,
    dag=dag
)
Task definition:
def sync_todb():
    import pandas as pd
    import numpy as np
    import datetime
    import sqlalchemy
    import os
    os.chdir(os.path.expanduser("~/Desktop/sync_perc"))
    data = pd.read_csv('C.csv')
    data.iloc[:,0] = pd.to_datetime(data.iloc[:,0])
    data.loc[:,'download'] = data.loc[:,'download'].astype(float)
    database_username = "..."
    database_password = '...'
    database_ip = 'localhost'
    database_name = "db_connect"
    database_connection = sqlalchemy.create_engine(
        'mysql+mysqlconnector://{0}:{1}@{2}/{3}'.format(
            database_username, database_password,
            database_ip, database_name))
    data.to_sql(con=database_connection, name="testt_af", if_exists="replace")
Currently, I can connect to airflow_db on MySQL and select from all of the Airflow tables. The web interface also seems OK: I can navigate through the DAG that I built with the settings above.
As I mentioned, sync_todb() works as expected when I call it directly from Python. When I trigger this DAG from the Airflow web interface (Run on the Task Instance in Graph View), I get the following error:
Something bad has happened.
Python version: 3.8.11
Airflow version: 2.1.1
...
File "/home/murat/.local/lib/python3.8/site-packages/celery/backends/database/session.py", line 87, in session_factory
engine, session = self.create_session(dburi, **kwargs)
File "/home/murat/.local/lib/python3.8/site-packages/celery/backends/database/session.py", line 56, in create_session
engine = self.get_engine(dburi, **kwargs)
File "/home/murat/.local/lib/python3.8/site-packages/celery/backends/database/session.py", line 53, in get_engine
return create_engine(dburi, poolclass=NullPool, **kwargs)
File "/home/murat/.local/lib/python3.8/site-packages/sqlalchemy/engine/__init__.py", line 525, in create_engine
return strategy.create(*args, **kwargs)
File "/home/murat/.local/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py", line 87, in create
dbapi = dialect_cls.dbapi(**dbapi_args)
File "/home/murat/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 118, in dbapi
return __import__("MySQLdb")
ModuleNotFoundError: No module named 'MySQLdb'
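Since the same function runs fine from a plain Python shell, one thing that could be checked is which MySQL driver modules are visible to the interpreter that Airflow and Celery actually use. The following is a minimal sketch using only the standard library; the helper name importable is hypothetical, and it only reports whether a module can be found, not whether a connection would succeed.

```python
import importlib.util
import sys


def importable(module_name: str) -> bool:
    """Return True if this interpreter can locate module_name."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised for dotted names whose parent package is missing,
        # e.g. "mysql.connector" when "mysql" is not installed.
        return False


# Show which interpreter is running, then probe the three driver modules
# that appear in this post (MySQLdb, PyMySQL, MySQL Connector).
print("interpreter:", sys.executable)
for name in ("MySQLdb", "pymysql", "mysql.connector"):
    print(f"{name}: {'found' if importable(name) else 'missing'}")
```

Running this once from the shell and once with the Python that launches the Airflow webserver and Celery worker would show whether both environments see the same drivers.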
I am sorry for making a long beginner post. Any help is appreciated.
Best.