No module named MySQLdb on Airflow Web Interface

Solution for No module named MySQLdb on Airflow Web Interface
is Given Below:

I am trying to create and execute my first DAG on a local MYSQL db.

I want to read lines from a csv file and load it as MYSQL table using sqlalchemy. I can run the script without facing any error. However, when I try to run the script on the Airflow web interface, I see the following error note:

No module named ‘MySQLdb’

When I was trying to change the default airflow db from redis to MYSQL I encountered this error many times and solve the problem PyMySQL.

Here is my airflow.cfg settings:

sql_alchemy_conn = mysql+pymysql://user:[email protected]:3306/airflow_db

executor = CeleryExecutor

result_backend = db+mysql://user:[email protected]:3306/airflow_db

DAG definiton:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import timedelta
import os
from sync_todb import sync_todb
from airflow.utils.dates import days_ago


default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)}

dag =  DAG(
    'Content_Sync_ETL',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example']) 

t1 = PythonOperator(
    task_id='LoadMYSQL',
    python_callable=sync_todb,
    dag=dag
)

Task definition:

def sync_todb() :
       import pandas as pd
       import numpy as np
       import datetime
       import sqlalchemy 
       import os



       os.chdir(os.path.expanduser("~/Desktop/sync_perc"))
       data  = pd.read_csv('C.csv')


       data.iloc[:,0] = pd.to_datetime(data.iloc[:,0])
       data.loc[:,'download'] = data.loc[:,'download'].astype(float)




       database_username="..."
       database_password = '...'
       database_ip       = 'localhost'
       database_name="db_connect"
       database_connection = sqlalchemy.create_engine('mysql+mysqlconnector://{0}:{1}@{2}/{3}'.
                                                 format(database_username, database_password, 
                                                        database_ip, database_name))

       data.to_sql(con=database_connection, name="testt_af", if_exists="replace")

Currently, I can connect to airflow_db on MySQL and select all airflow tables. Also web interface seems OK. I can navigate through the dag file that I constructed with above settings.

As I mentioned, sync_todb() works on python as expected. When I run this DAG on Airflow Web Interface as Run Task Instance on Graph View, I get the following error:

Something bad has happened.

Python version: 3.8.11
Airflow version: 2.1.1

...
  File "/home/murat/.local/lib/python3.8/site-packages/celery/backends/database/session.py", line 87, in session_factory
    engine, session = self.create_session(dburi, **kwargs)
  File "/home/murat/.local/lib/python3.8/site-packages/celery/backends/database/session.py", line 56, in create_session
    engine = self.get_engine(dburi, **kwargs)
  File "/home/murat/.local/lib/python3.8/site-packages/celery/backends/database/session.py", line 53, in get_engine
    return create_engine(dburi, poolclass=NullPool, **kwargs)
  File "/home/murat/.local/lib/python3.8/site-packages/sqlalchemy/engine/__init__.py", line 525, in create_engine
    return strategy.create(*args, **kwargs)
  File "/home/murat/.local/lib/python3.8/site-packages/sqlalchemy/engine/strategies.py", line 87, in create
    dbapi = dialect_cls.dbapi(**dbapi_args)
  File "/home/murat/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 118, in dbapi
    return __import__("MySQLdb")
ModuleNotFoundError: No module named 'MySQLdb' 

I am sorry for making a long beginner post. Any help is appreciated.

Best.