Alation Analytics V1 Shows Deleted Tables and Columns as Existing

Applies to Alation Analytics V1

Important

This issue is fixed in release 2020.4.x

Problem

A table was deleted from the Catalog and appears as a deleted object, but Alation Analytics V1 returns the values of the boolean_value field in the public.field_value table as False for this deleted table and its columns. For the deleted tables and attributes, this value should be equal to True.

Solution

On your instance, run the script given below to correct the value of the boolean_value field of the public.field_value table for the deleted tables and attributes objects. For deleted objects, the value of the boolean_value field should be equal to True.

This requires server-side access to the Alation host.

The script applies to releases V R7 (5.12.x) and 2020.3 (5.17.x). If you have encountered this problem on a release earlier than V R7, please contact Alation Support.

Important

Until your Alation instance is upgraded to version 2020.4, this script has to be run every time after some tables are deleted from the Catalog in order for the Alation Analytics database to reflect the deletion correctly. The script needs to be run only once after upgrading to 2020.4.

To run the script,

  1. Copy and save the script into a file deleted_field_update_with_attribute_R7.py.

  2. Copy or move the script file to /opt/alation/alation/opt/alation/django/alation_analytics/one_off_scripts (path outside the Alation shell) on your Alation server.

  3. SSH to the Alation host and enter the Alation shell:

    sudo /etc/init.d/alation shell
    
  4. Go to /opt/alation/django/alation_analytics/one_off_scripts (path inside the shell).

  5. Change user to alation and set the required permissions for the deleted_field_update_with_attribute_R7.py script file:

    sudo su alation
    sudo chown alation:alation deleted_field_update_with_attribute_R7.py
    
    sudo chmod +x deleted_field_update_with_attribute_R7.py
    
  6. Run the script:

    ./deleted_field_update_with_attribute_R7.py
    

After the script has run, the Alation Analytics will have the values of the boolean_value column correctly assigned to the deleted tables and attributes.

deleted_field_update_with_attribute_R7.py

Note

The script can also be downloaded from the Alation Community as a file: Alation Analytics V1 Troubleshooting: Deleted Tables and Columns Appear as Existing.

#! /usr/bin/env python
import sys
import os
import uuid
from psycopg2.extensions import AsIs
import django


def init_paths():
    print("Setting python paths..")
    # Set up the environment.
    PATH = os.path.dirname(os.path.abspath(__file__))
    # add django folder to path
    sys.path.append(os.path.join(PATH, '../../'))
    # add django/lib folder to path
    sys.path.append(os.path.join(PATH, '../../lib'))
    # add main?
    sys.path.append(os.path.join(PATH, '../../main'))
    # add alation/python and alation/security folders to path
    sys.path.append(os.path.join(PATH, '../../../python'))
    sys.path.append(os.path.join(PATH, '../../../security'))
    # add alation to path
    sys.path.append(os.path.join(PATH, '../../../'))

def setup_django():
    print("Initializing Django..")
    # let django know which settings file we're using
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")
    # set it up!
    django.setup()


def abort_blocking_queries(load_table, db_connection):
        import time
        print('Killing queries running on the %s tables.', load_table)
        cursor = db_connection.cursor()
        cursor.execute("SELECT pid FROM pg_catalog.pg_locks blocking_locks ""WHERE relation=%s::regclass::oid", (load_table, ))
        blocking_pids = cursor.fetchall()
        for blocking_pid in blocking_pids:
            print('Killing query running on pid %s.', blocking_pid)
            cursor.execute("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid=%s",(blocking_pid[0], ))
            time.sleep(5)
            cursor.execute("SELECT state FROM pg_stat_activity WHERE pid=%s", (blocking_pid[0], ))
            execution_status = cursor.fetchall()
            if execution_status:
                print('Successfully killed query running on pid %s.', blocking_pid[0])
            else:
                print('failed to kill query running on pid %s.', blocking_pid[0])
                return -1
        return 0



def update_deleted_field():
    from rosemeta.evaluation.db_util import get_db_conn
    from django.conf import settings
    from util.inherent_identity_extensions import alation_fp
    from alation_analytics.utils.alation_object_fields import FIELDS_PER_OBJECT_TYPE
    from alation_analytics.tasks.etl_time_period_job import TimePeriodETLJob
    from util.sql import bulk_upsert_from_values
    from alation_analytics.utils.constants import FieldValueSource
    from alation_analytics.utils.alation_object_fields import FIELDS_PER_OBJECT_TYPE
    from alation_analytics.models.models_field import ObjectFieldValue,FieldValue
    from rosemeta.models import Table
    from rosemeta.models import Attribute
    from django.db import connections
    aa_db_connection = get_db_conn(settings.DATABASES[settings.ALATION_ANALYTICS_DBNAME])

    LOAD_TABLE = ObjectFieldValue._meta.db_table
    FIELD_VALUE_LOAD_TABLE = FieldValue._meta.db_table


    result = abort_blocking_queries(LOAD_TABLE,aa_db_connection)
    result =  result | abort_blocking_queries(FIELD_VALUE_LOAD_TABLE,aa_db_connection)

    if result ==-1:
        print("exiting inbetween as abort_blocking_queries is unsuccessfull")
        return


    COLUMNS_FOR_UPSERT = [
        field.name + ' ' + field.db_type(connections[settings.ALATION_ANALYTICS_DBNAME])
        for field in ObjectFieldValue._meta.fields
    ]

    FIELD_VALUE_COLUMNS = [
        field.name + ' ' + field.db_type(connections[settings.ALATION_ANALYTICS_DBNAME])
        for field in FieldValue._meta.fields
    ]

    #get deleted table ids from rosemeta_table
    table_objects = [ {'oid':uuid.UUID(int = tup[0]).hex,'ts_updated': (tup[2] if tup[2] is not None else tup[1])} for tup in Table.objects.filter(deleted=True).values_list('id','ts_updated','ts_deleted')]
    field_id = FIELDS_PER_OBJECT_TYPE["table"]["deleted"]["field_id"]
    value_datatype = str(FIELDS_PER_OBJECT_TYPE["table"]["deleted"]["datatype"])
    value_unicode=str(True)

    print("total objects to be updated for table otype  %d "% len(table_objects))

    #create fingerprint for ofv and fv
    value_fp = alation_fp(0, value_datatype + value_unicode)
    FV_RETURN_FIELDS = ["value_fp"]

    #OFV update
    transformed_ofv_data =[]
    value_fp_data = {}
    value_source = FieldValueSource.NATIVE_FIELD_IN_ALATION
    for objs in table_objects:
        ts_updated = objs.get('ts_updated')
        time_period_id = TimePeriodETLJob.get_time_period_id_from_date(ts_updated)
        transformed_ofv_data.append((objs.get('oid'),27,field_id,value_fp,value_source,None,ts_updated,time_period_id))
    try:
        print("starting with object_field_value table otype  update !!!")
        #we only do update no upsert to prevent new delete events from adding to OFV
        bulk_upsert_from_values(
            provide_field_types=True,
            target_table=LOAD_TABLE,
            values_list=transformed_ofv_data,
            join_fields=COLUMNS_FOR_UPSERT[1:4],
            update_only=True,
            fields_to_reset=COLUMNS_FOR_UPSERT[4:],
            conn=aa_db_connection)

        print("updated  successfully into object_field_value , db commit yet to be done!!")
        print("starting with field_value  upsert !!!")
        value_fp_data[value_fp] = FieldValue.get_field_value_tuple_for_datatype(value_fp, True, value_datatype)
        #we do a upsert here since without FV join with OFV fails
        result = bulk_upsert_from_values(
            provide_field_types=True,
            target_table=FIELD_VALUE_LOAD_TABLE,
            values_list=value_fp_data.values(),
            join_fields=FIELD_VALUE_COLUMNS[:1],
            fields_to_reset=FIELD_VALUE_COLUMNS[1:],
            return_fields=FV_RETURN_FIELDS,
            conn=aa_db_connection)

        print("inserted number of records in field_value  %d  " % len(result))
        if aa_db_connection:
            aa_db_connection.commit()
        print("commit successfull, ofv and fv values updated successfully for table otype")
    except Exception as e:
        print("exception raised during object_field_value or field_value update for table otype ", e)
        if aa_db_connection:
            aa_db_connection.rollback()
            aa_db_connection.close()
        return

    #clearing ofv data list which we used for tables so that we can start with attribute update
    del(transformed_ofv_data[:])
    attribute_objects = [ {'oid':uuid.UUID(int = tup[0]).hex,'ts_updated': (tup[2] if tup[2] is not None else tup[1])} for tup in Attribute.objects.filter(deleted=True).values_list('id','ts_updated','ts_deleted')]
    for objs in attribute_objects:
        ts_updated = objs.get('ts_updated')
        time_period_id = TimePeriodETLJob.get_time_period_id_from_date(ts_updated)
        transformed_ofv_data.append((objs.get('oid'),1,field_id,value_fp,value_source,None,ts_updated,time_period_id))

    print("total objects to be updated for attribute otype   %d "% len(attribute_objects))

    try:
        print("starting with object_field_value attribute otype   update !!!")
        #we only do update no upsert to prevent new delete events from adding to OFV
        bulk_upsert_from_values(
            provide_field_types=True,
            target_table=LOAD_TABLE,
            values_list=transformed_ofv_data,
            join_fields=COLUMNS_FOR_UPSERT[1:4],
            update_only=True,
            fields_to_reset=COLUMNS_FOR_UPSERT[4:],
            conn=aa_db_connection)
        if aa_db_connection:
            aa_db_connection.commit()
        print("commit successfull, ofv values for attribute otype  updated successfully")
    except Exception as e:
        print("exception raised during object_field_value or field_value update for attribute", e)
        if aa_db_connection:
            aa_db_connection.rollback()
            aa_db_connection.close()
        return

    aa_db_connection.close()
    print("ofv and fv are updated successfully with deleted field as true for actual deleted tables and its attributes ")

def run():
    #set python path
    init_paths()
    #initilize django
    setup_django()
    #call deleted method which takes care of setting up deleted field in OFV and FV
    update_deleted_field()

if __name__ == '__main__':
           run()