Alation Analytics V1 Shows Deleted Tables and Columns as Existing¶
Applies to Alation Analytics V1
Important
This issue is fixed in release 2020.4.x
Problem¶
A table was deleted from the Catalog and appears as a deleted object, but Alation Analytics V1 returns the value of the boolean_value field in the public.field_value table as False for this deleted table and its columns. For deleted tables and attributes, this value should be True.
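If you want to see the symptom directly, you can query the Alation Analytics V1 PostgreSQL database. Below is a minimal inspection sketch, not part of the official fix: the connection parameters, TABLE_ID, and DELETED_FIELD_ID are placeholders, and the table, column, and join names are inferred from the fix script at the end of this article (the script derives the Analytics object id from the rosemeta table id via uuid.UUID(int=...).hex and reads the field id from FIELDS_PER_OBJECT_TYPE).

# Illustrative check only; adjust the placeholders for your instance.
import uuid
import psycopg2

TABLE_ID = 1234          # placeholder: rosemeta id of a deleted table
DELETED_FIELD_ID = 0     # placeholder: internal id of the "deleted" field

# The fix script below derives the Analytics object id the same way.
oid = uuid.UUID(int=TABLE_ID).hex

conn = psycopg2.connect(dbname="alation_analytics", host="localhost",
                        user="alation", password="<password>")
cur = conn.cursor()
cur.execute("""
    SELECT fv.boolean_value
      FROM public.object_field_value ofv
      JOIN public.field_value fv ON fv.value_fp = ofv.value_fp
     WHERE ofv.oid = %s AND ofv.field_id = %s
""", (oid, DELETED_FIELD_ID))
print(cur.fetchone())  # affected releases return (False,) here; it should be True
conn.close()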
Solution¶
On your instance, run the script given below to correct the value of the boolean_value field of the public.field_value table for deleted table and attribute objects. For deleted objects, the value of the boolean_value field should be True.
This requires server-side access to the Alation host.
The script applies to releases V R7 (5.12.x) and 2020.3 (5.17.x). If you have encountered this problem on a release earlier than V R7, please contact Alation Support.
Important
Until your Alation instance is upgraded to version 2020.4, this script has to be run each time tables are deleted from the Catalog in order for the Alation Analytics database to reflect the deletions correctly. After upgrading to 2020.4, the script needs to be run only once.
To run the script:

1. Copy and save the script into a file deleted_field_update_with_attribute_R7.py.

2. Copy or move the script file to /opt/alation/alation/opt/alation/django/alation_analytics/one_off_scripts (path outside the Alation shell) on your Alation server.

3. SSH to the Alation host and enter the Alation shell:

   sudo /etc/init.d/alation shell

4. Go to /opt/alation/django/alation_analytics/one_off_scripts (path inside the shell).

5. Change user to alation and set the required permissions for the deleted_field_update_with_attribute_R7.py script file:

   sudo su alation
   sudo chown alation:alation deleted_field_update_with_attribute_R7.py
   sudo chmod +x deleted_field_update_with_attribute_R7.py

6. Run the script:

   ./deleted_field_update_with_attribute_R7.py
After the script has run, the Alation Analytics database will have the values of the boolean_value column correctly assigned for the deleted tables and attributes.
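To confirm the result, you can re-run the inspection sketch from the Problem section; for a deleted table or column, the query should now return True instead of False. A self-contained variant, under the same placeholder assumptions (connection parameters, TABLE_ID, DELETED_FIELD_ID, and the inferred object_field_value join), looks like this:

# Illustrative verification only; adjust the placeholders for your instance.
import uuid
import psycopg2

TABLE_ID = 1234          # placeholder: rosemeta id of a deleted table
DELETED_FIELD_ID = 0     # placeholder: internal id of the "deleted" field
oid = uuid.UUID(int=TABLE_ID).hex

conn = psycopg2.connect(dbname="alation_analytics", host="localhost",
                        user="alation", password="<password>")
cur = conn.cursor()
cur.execute("""
    SELECT fv.boolean_value
      FROM public.object_field_value ofv
      JOIN public.field_value fv ON fv.value_fp = ofv.value_fp
     WHERE ofv.oid = %s AND ofv.field_id = %s
""", (oid, DELETED_FIELD_ID))
print(cur.fetchone())  # expect (True,) after the fix
conn.close()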
deleted_field_update_with_attribute_R7.py¶
Note
The script can also be downloaded from the Alation Community as a file: Alation Analytics V1 Troubleshooting: Deleted Tables and Columns Appear as Existing.
#! /usr/bin/env python
import sys
import os
import uuid
from psycopg2.extensions import AsIs
import django


def init_paths():
    print("Setting python paths..")
    # Set up the environment.
    PATH = os.path.dirname(os.path.abspath(__file__))
    # add django folder to path
    sys.path.append(os.path.join(PATH, '../../'))
    # add django/lib folder to path
    sys.path.append(os.path.join(PATH, '../../lib'))
    # add django/main folder to path
    sys.path.append(os.path.join(PATH, '../../main'))
    # add alation/python and alation/security folders to path
    sys.path.append(os.path.join(PATH, '../../../python'))
    sys.path.append(os.path.join(PATH, '../../../security'))
    # add alation to path
    sys.path.append(os.path.join(PATH, '../../../'))


def setup_django():
    print("Initializing Django..")
    # let django know which settings file we're using
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")
    # set it up!
    django.setup()


def abort_blocking_queries(load_table, db_connection):
    import time
    print('Killing queries running on the %s table.' % load_table)
    cursor = db_connection.cursor()
    cursor.execute(
        "SELECT pid FROM pg_catalog.pg_locks blocking_locks "
        "WHERE relation=%s::regclass::oid", (load_table, ))
    blocking_pids = cursor.fetchall()
    for blocking_pid in blocking_pids:
        print('Killing query running on pid %s.' % blocking_pid[0])
        cursor.execute(
            "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid=%s",
            (blocking_pid[0], ))
        time.sleep(5)
        cursor.execute(
            "SELECT state FROM pg_stat_activity WHERE pid=%s", (blocking_pid[0], ))
        execution_status = cursor.fetchall()
        # A terminated backend no longer shows up in pg_stat_activity,
        # so an empty result means the kill succeeded.
        if not execution_status:
            print('Successfully killed query running on pid %s.' % blocking_pid[0])
        else:
            print('Failed to kill query running on pid %s.' % blocking_pid[0])
            return -1
    return 0


def update_deleted_field():
    from rosemeta.evaluation.db_util import get_db_conn
    from django.conf import settings
    from util.inherent_identity_extensions import alation_fp
    from alation_analytics.utils.alation_object_fields import FIELDS_PER_OBJECT_TYPE
    from alation_analytics.tasks.etl_time_period_job import TimePeriodETLJob
    from util.sql import bulk_upsert_from_values
    from alation_analytics.utils.constants import FieldValueSource
    from alation_analytics.models.models_field import ObjectFieldValue, FieldValue
    from rosemeta.models import Table
    from rosemeta.models import Attribute
    from django.db import connections

    aa_db_connection = get_db_conn(settings.DATABASES[settings.ALATION_ANALYTICS_DBNAME])
    LOAD_TABLE = ObjectFieldValue._meta.db_table
    FIELD_VALUE_LOAD_TABLE = FieldValue._meta.db_table

    result = abort_blocking_queries(LOAD_TABLE, aa_db_connection)
    result = result | abort_blocking_queries(FIELD_VALUE_LOAD_TABLE, aa_db_connection)
    if result == -1:
        print("exiting early: abort_blocking_queries was unsuccessful")
        return

    COLUMNS_FOR_UPSERT = [
        field.name + ' ' + field.db_type(connections[settings.ALATION_ANALYTICS_DBNAME])
        for field in ObjectFieldValue._meta.fields
    ]
    FIELD_VALUE_COLUMNS = [
        field.name + ' ' + field.db_type(connections[settings.ALATION_ANALYTICS_DBNAME])
        for field in FieldValue._meta.fields
    ]

    # get deleted table ids from rosemeta_table
    table_objects = [
        {'oid': uuid.UUID(int=tup[0]).hex,
         'ts_updated': (tup[2] if tup[2] is not None else tup[1])}
        for tup in Table.objects.filter(deleted=True).values_list('id', 'ts_updated', 'ts_deleted')
    ]
    field_id = FIELDS_PER_OBJECT_TYPE["table"]["deleted"]["field_id"]
    value_datatype = str(FIELDS_PER_OBJECT_TYPE["table"]["deleted"]["datatype"])
    value_unicode = str(True)
    print("total objects to be updated for table otype: %d" % len(table_objects))

    # create fingerprint for ofv and fv
    value_fp = alation_fp(0, value_datatype + value_unicode)
    FV_RETURN_FIELDS = ["value_fp"]

    # OFV update
    transformed_ofv_data = []
    value_fp_data = {}
    value_source = FieldValueSource.NATIVE_FIELD_IN_ALATION
    for objs in table_objects:
        ts_updated = objs.get('ts_updated')
        time_period_id = TimePeriodETLJob.get_time_period_id_from_date(ts_updated)
        # 27 is the internal otype id used here for tables
        transformed_ofv_data.append(
            (objs.get('oid'), 27, field_id, value_fp, value_source,
             None, ts_updated, time_period_id))
    try:
        print("starting with object_field_value table otype update")
        # we only do an update, not an upsert, to prevent new delete events from adding to OFV
        bulk_upsert_from_values(
            provide_field_types=True,
            target_table=LOAD_TABLE,
            values_list=transformed_ofv_data,
            join_fields=COLUMNS_FOR_UPSERT[1:4],
            update_only=True,
            fields_to_reset=COLUMNS_FOR_UPSERT[4:],
            conn=aa_db_connection)
        print("updated object_field_value successfully, db commit yet to be done")
        print("starting with field_value upsert")
        value_fp_data[value_fp] = FieldValue.get_field_value_tuple_for_datatype(
            value_fp, True, value_datatype)
        # we do an upsert here since without FV the join with OFV fails
        result = bulk_upsert_from_values(
            provide_field_types=True,
            target_table=FIELD_VALUE_LOAD_TABLE,
            values_list=list(value_fp_data.values()),
            join_fields=FIELD_VALUE_COLUMNS[:1],
            fields_to_reset=FIELD_VALUE_COLUMNS[1:],
            return_fields=FV_RETURN_FIELDS,
            conn=aa_db_connection)
        print("inserted number of records in field_value: %d" % len(result))
        if aa_db_connection:
            aa_db_connection.commit()
        print("commit successful, ofv and fv values updated successfully for table otype")
    except Exception as e:
        print("exception raised during object_field_value or field_value update for table otype: %s" % e)
        if aa_db_connection:
            aa_db_connection.rollback()
            aa_db_connection.close()
        return

    # clear the ofv data list used for tables so we can start with the attribute update
    del transformed_ofv_data[:]
    attribute_objects = [
        {'oid': uuid.UUID(int=tup[0]).hex,
         'ts_updated': (tup[2] if tup[2] is not None else tup[1])}
        for tup in Attribute.objects.filter(deleted=True).values_list('id', 'ts_updated', 'ts_deleted')
    ]
    for objs in attribute_objects:
        ts_updated = objs.get('ts_updated')
        time_period_id = TimePeriodETLJob.get_time_period_id_from_date(ts_updated)
        # 1 is the internal otype id used here for attributes (columns)
        transformed_ofv_data.append(
            (objs.get('oid'), 1, field_id, value_fp, value_source,
             None, ts_updated, time_period_id))
    print("total objects to be updated for attribute otype: %d" % len(attribute_objects))
    try:
        print("starting with object_field_value attribute otype update")
        # we only do an update, not an upsert, to prevent new delete events from adding to OFV
        bulk_upsert_from_values(
            provide_field_types=True,
            target_table=LOAD_TABLE,
            values_list=transformed_ofv_data,
            join_fields=COLUMNS_FOR_UPSERT[1:4],
            update_only=True,
            fields_to_reset=COLUMNS_FOR_UPSERT[4:],
            conn=aa_db_connection)
        if aa_db_connection:
            aa_db_connection.commit()
        print("commit successful, ofv values for attribute otype updated successfully")
    except Exception as e:
        print("exception raised during object_field_value or field_value update for attribute otype: %s" % e)
        if aa_db_connection:
            aa_db_connection.rollback()
            aa_db_connection.close()
        return

    aa_db_connection.close()
    print("ofv and fv updated successfully: the deleted field is now true for deleted tables and their attributes")


def run():
    # set python paths
    init_paths()
    # initialize django
    setup_django()
    # set the deleted field in OFV and FV for deleted tables and attributes
    update_deleted_field()


if __name__ == '__main__':
    run()