Django Inspectdb Issue Using Oracle Database
Solution 1:
Two things:
- inspectdb doesn't officially support Oracle (see Django Docs - inspectdb), sad times.
- Django doesn't have very strong support for Oracle schemas (see unresolved Django ticket 6148). So you may have better luck if you are able to connect as the user that owns the schema, which makes the schema you wish to introspect the default schema.
I was able to get a basic model file output by changing the SELECT in introspection.py. I changed the get_table_list function in django/db/backends/oracle/introspection.py (around line 40) from:
def get_table_list(self, cursor):
    "Returns a list of table names in the current database."
    cursor.execute("SELECT TABLE_NAME FROM USER_TABLES")
    return [row[0].lower() for row in cursor.fetchall()]
to:
def get_table_list(self, cursor):
    "Returns a list of table names in the current database."
    cursor.execute("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = 'SCHEMA_TO_QUERY'")
    return [row[0].lower() for row in cursor.fetchall()]
But I gave up on Django when I read about the overall poor support for Oracle schemas.
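The modified query above hard-codes the schema name. A minimal sketch of the same lookup with the owner passed as a bind parameter follows. It assumes a cursor that accepts `%s` placeholders, as the cursor Django hands to its introspection methods does; `list_schema_tables` and `FakeCursor` are hypothetical names used only so the sketch runs without an Oracle connection.

```python
def list_schema_tables(cursor, owner):
    """Return lower-cased names of the tables owned by `owner` (hypothetical helper)."""
    cursor.execute(
        "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = UPPER(%s)",
        [owner],
    )
    return [row[0].lower() for row in cursor.fetchall()]


class FakeCursor:
    """Stand-in for a database cursor; it returns canned rows and records calls."""

    def __init__(self, rows):
        self._rows = rows
        self.executed = []

    def execute(self, sql, params=None):
        self.executed.append((sql, params))

    def fetchall(self):
        return self._rows


cursor = FakeCursor([("EMPLOYEES",), ("DEPARTMENTS",)])
tables = list_schema_tables(cursor, "schema_to_query")
print(tables)
```

Passing the owner as a parameter keeps the schema name out of the SQL text, so the same function can be pointed at a different schema without editing the query.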
Solution 2:
It works for me.
Did you check that the user has rights to see all the tables in Oracle?
Anyway, I'm curious what SQL inspectdb is actually using.
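One way to see which SQL a piece of code issues is to wrap the cursor and record every statement before passing it on. The sketch below uses sqlite3 from the standard library as a stand-in for the Oracle connection, purely so it runs anywhere; `RecordingCursor` is a hypothetical name and not part of Django.

```python
import sqlite3


class RecordingCursor:
    """Wrap a DB-API cursor and remember every statement passed to execute()."""

    def __init__(self, cursor):
        self._cursor = cursor
        self.statements = []

    def execute(self, sql, params=()):
        # Record the statement and its parameters, then run it unchanged.
        self.statements.append((sql, tuple(params)))
        return self._cursor.execute(sql, params)

    def __getattr__(self, name):
        # Delegate everything else (fetchall, description, ...) to the real cursor.
        return getattr(self._cursor, name)


connection = sqlite3.connect(":memory:")
cursor = RecordingCursor(connection.cursor())
cursor.execute("CREATE TABLE demo (id INTEGER)")
cursor.execute("SELECT name FROM sqlite_master WHERE type = ?", ("table",))
rows = cursor.fetchall()
for sql, params in cursor.statements:
    print(sql, params)
```

The same wrapper idea would apply to the cursor passed into get_table_list, where the recorded statement shows whether USER_TABLES or ALL_TABLES is being queried.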
Solution 3:
@Plecebo is on the right track. The get_table_list method is the source of the problem, but the SELECT statement given did not work.
I temporarily hard-coded the table names to quickly get what I needed:
(django/db/backends/oracle/introspection.py line 40)
def get_table_list(self, cursor):
    "Returns a list of table names in the current database."
    #cursor.execute("SELECT TABLE_NAME FROM USER_TABLES")
    return ['table_name1', 'table_name2']
    #return [row[0].lower() for row in cursor.fetchall()]
Solution 4:
As this came up in my recent search to sort out inspectdb with Django 2.0 and an Oracle 11g legacy database I'm working with, I have taken a go at fixing introspection, and so far I've managed to get output for basic tables after modifying /lib/python3.6/site-packages/django/db/backends/oracle/introspection.py (essentially replacing all user_* tables with all_* tables).
My currently working solution is the file contents below (introspection.py).
import warnings
from collections import namedtuple

import cx_Oracle

from django.db import models
from django.db.backends.base.introspection import (
    BaseDatabaseIntrospection, FieldInfo as BaseFieldInfo, TableInfo,
)
from django.utils.deprecation import RemovedInDjango21Warning

FieldInfo = namedtuple('FieldInfo', BaseFieldInfo._fields + ('is_autofield',))


class DatabaseIntrospection(BaseDatabaseIntrospection):
    # Maps type objects to Django Field types.
    data_types_reverse = {
        cx_Oracle.BLOB: 'BinaryField',
        cx_Oracle.CLOB: 'TextField',
        cx_Oracle.DATETIME: 'DateField',
        cx_Oracle.FIXED_CHAR: 'CharField',
        cx_Oracle.FIXED_NCHAR: 'CharField',
        cx_Oracle.NATIVE_FLOAT: 'FloatField',
        cx_Oracle.NCHAR: 'CharField',
        cx_Oracle.NCLOB: 'TextField',
        cx_Oracle.NUMBER: 'DecimalField',
        cx_Oracle.STRING: 'CharField',
        cx_Oracle.TIMESTAMP: 'DateTimeField',
    }

    cache_bust_counter = 1

    def get_field_type(self, data_type, description):
        if data_type == cx_Oracle.NUMBER:
            precision, scale = description[4:6]
            if scale == 0:
                if precision > 11:
                    return 'BigAutoField' if description.is_autofield else 'BigIntegerField'
                elif precision == 1:
                    return 'BooleanField'
                elif description.is_autofield:
                    return 'AutoField'
                else:
                    return 'IntegerField'
            elif scale == -127:
                return 'FloatField'

        return super().get_field_type(data_type, description)

    def get_table_list(self, cursor):
        """Return a list of table and view names in the current database."""
        # Original query, which only sees objects owned by the connected user:
        # cursor.execute("SELECT TABLE_NAME, 't' FROM USER_TABLES UNION ALL "
        #                "SELECT VIEW_NAME, 'v' FROM USER_VIEWS")
        # 'V500' is the owner of the schema being introspected here.
        cursor.execute("SELECT TABLE_NAME, 't' FROM ALL_TABLES WHERE OWNER = 'V500' ")
        return [TableInfo(row[0].lower(), row[1]) for row in cursor.fetchall()]

    def get_table_description(self, cursor, table_name):
        """
        Return a description of the table with the DB-API cursor.description
        interface.
        """
        cursor.execute("""
            SELECT
                column_name,
                data_default,
                CASE
                    WHEN char_used IS NULL THEN data_length
                    ELSE char_length
                END as internal_size,
                0 as is_autofield
            FROM ALL_TAB_COLUMNS
            WHERE table_name = UPPER(%s)""", [table_name])
        field_map = {
            column: (internal_size, default if default != 'NULL' else None, is_autofield)
            for column, default, internal_size, is_autofield in cursor.fetchall()
        }
        self.cache_bust_counter += 1
        cursor.execute("SELECT * FROM {} WHERE ROWNUM < 2 AND {} > 0".format(
            self.connection.ops.quote_name(table_name),
            self.cache_bust_counter))
        description = []
        for desc in cursor.description:
            name = desc[0]
            internal_size, default, is_autofield = field_map[name]
            name = name % {}  # cx_Oracle, for some reason, doubles percent signs.
            description.append(FieldInfo(*(
                (name.lower(),) +
                desc[1:3] +
                (internal_size, desc[4] or 0, desc[5] or 0) +
                desc[6:] +
                (default, is_autofield)
            )))
        return description

    def table_name_converter(self, name):
        """Table name comparison is case insensitive under Oracle."""
        return name.lower()

    def get_sequences(self, cursor, table_name, table_fields=()):
        # These tables don't exist in 11g (this function was added in Django 2),
        # so the original body is commented out and no sequences are reported.
        # cursor.execute("""
        #     SELECT
        #         user_tab_identity_cols.sequence_name,
        #         user_tab_identity_cols.column_name
        #     FROM
        #         user_tab_identity_cols,
        #         user_constraints,
        #         user_cons_columns cols
        #     WHERE
        #         user_constraints.constraint_name = cols.constraint_name
        #         AND user_constraints.table_name = user_tab_identity_cols.table_name
        #         AND cols.column_name = user_tab_identity_cols.column_name
        #         AND user_constraints.constraint_type = 'P'
        #         AND user_tab_identity_cols.table_name = UPPER(%s)
        # """, [table_name])
        # # Oracle allows only one identity column per table.
        # row = cursor.fetchone()
        # if row:
        #     return [{'name': row[0].lower(), 'table': table_name, 'column': row[1].lower()}]
        # # To keep backward compatibility for AutoFields that aren't Oracle
        # # identity columns.
        # for f in table_fields:
        #     if isinstance(f, models.AutoField):
        #         return [{'table': table_name, 'column': f.column}]
        return []

    def get_relations(self, cursor, table_name):
        """
        Return a dictionary of {field_name: (field_name_other_table, other_table)}
        representing all relationships to the given table.
        """
        table_name = table_name.upper()
        cursor.execute("""
            SELECT ca.column_name, cb.table_name, cb.column_name
            FROM   ALL_CONSTRAINTS, ALL_CONS_COLUMNS ca, ALL_CONS_COLUMNS cb
            WHERE  ALL_CONSTRAINTS.table_name = %s AND
                   ALL_CONSTRAINTS.constraint_name = ca.constraint_name AND
                   ALL_CONSTRAINTS.r_constraint_name = cb.constraint_name AND
                   ca.position = cb.position""", [table_name])

        relations = {}
        for row in cursor.fetchall():
            relations[row[0].lower()] = (row[2].lower(), row[1].lower())
        return relations

    def get_key_columns(self, cursor, table_name):
        cursor.execute("""
            SELECT ccol.column_name, rcol.table_name AS referenced_table, rcol.column_name AS referenced_column
            FROM ALL_CONSTRAINTS c
            JOIN ALL_CONS_COLUMNS ccol
              ON ccol.constraint_name = c.constraint_name
            JOIN ALL_CONS_COLUMNS rcol
              ON rcol.constraint_name = c.r_constraint_name
            WHERE c.table_name = %s AND c.constraint_type = 'R'""", [table_name.upper()])
        return [tuple(cell.lower() for cell in row)
                for row in cursor.fetchall()]

    def get_indexes(self, cursor, table_name):
        warnings.warn(
            "get_indexes() is deprecated in favor of get_constraints().",
            RemovedInDjango21Warning, stacklevel=2
        )
        sql = """
            SELECT LOWER(uic1.column_name) AS column_name,
                   CASE ALL_CONSTRAINTS.constraint_type
                       WHEN 'P' THEN 1 ELSE 0
                   END AS is_primary_key,
                   CASE ALL_INDEXES.uniqueness
                       WHEN 'UNIQUE' THEN 1 ELSE 0
                   END AS is_unique
            FROM   ALL_CONSTRAINTS, ALL_INDEXES, ALL_IND_COLUMNS uic1
            WHERE  ALL_CONSTRAINTS.constraint_type (+) = 'P'
              AND  ALL_CONSTRAINTS.index_name (+) = uic1.index_name
              AND  ALL_INDEXES.uniqueness (+) = 'UNIQUE'
              AND  ALL_INDEXES.index_name (+) = uic1.index_name
              AND  uic1.table_name = UPPER(%s)
              AND  uic1.column_position = 1
              AND  NOT EXISTS (
                     SELECT 1
                     FROM   ALL_IND_COLUMNS uic2
                     WHERE  uic2.index_name = uic1.index_name
                       AND  uic2.column_position = 2
                   )
        """
        cursor.execute(sql, [table_name])
        indexes = {}
        for row in cursor.fetchall():
            indexes[row[0]] = {'primary_key': bool(row[1]),
                               'unique': bool(row[2])}
        return indexes

    def get_constraints(self, cursor, table_name):
        """
        Retrieve any constraints or keys (unique, pk, fk, check, index) across
        one or more columns.
        """
        constraints = {}
        # Loop over the constraints, getting PKs, uniques, and checks
        cursor.execute("""
            SELECT
                ALL_CONSTRAINTS.constraint_name,
                LISTAGG(LOWER(cols.column_name), ',') WITHIN GROUP (ORDER BY cols.position),
                CASE ALL_CONSTRAINTS.constraint_type
                    WHEN 'P' THEN 1
                    ELSE 0
                END AS is_primary_key,
                CASE
                    WHEN ALL_CONSTRAINTS.constraint_type IN ('P', 'U') THEN 1
                    ELSE 0
                END AS is_unique,
                CASE ALL_CONSTRAINTS.constraint_type
                    WHEN 'C' THEN 1
                    ELSE 0
                END AS is_check_constraint
            FROM
                ALL_CONSTRAINTS
            LEFT OUTER JOIN
                ALL_CONS_COLUMNS cols ON ALL_CONSTRAINTS.constraint_name = cols.constraint_name
            WHERE
                ALL_CONSTRAINTS.constraint_type = ANY('P', 'U', 'C')
                AND ALL_CONSTRAINTS.table_name = UPPER(%s)
            GROUP BY ALL_CONSTRAINTS.constraint_name, ALL_CONSTRAINTS.constraint_type
        """, [table_name])
        for constraint, columns, pk, unique, check in cursor.fetchall():
            constraints[constraint] = {
                'columns': columns.split(','),
                'primary_key': pk,
                'unique': unique,
                'foreign_key': None,
                'check': check,
                'index': unique,  # All uniques come with an index
            }
        # Foreign key constraints
        cursor.execute("""
            SELECT
                cons.constraint_name,
                LISTAGG(LOWER(cols.column_name), ',') WITHIN GROUP (ORDER BY cols.position),
                LOWER(rcols.table_name),
                LOWER(rcols.column_name)
            FROM
                ALL_CONSTRAINTS cons
            INNER JOIN
                ALL_CONS_COLUMNS rcols ON rcols.constraint_name = cons.r_constraint_name AND rcols.position = 1
            LEFT OUTER JOIN
                ALL_CONS_COLUMNS cols ON cons.constraint_name = cols.constraint_name
            WHERE
                cons.constraint_type = 'R' AND
                cons.table_name = UPPER(%s)
            GROUP BY cons.constraint_name, rcols.table_name, rcols.column_name
        """, [table_name])
        for constraint, columns, other_table, other_column in cursor.fetchall():
            constraints[constraint] = {
                'primary_key': False,
                'unique': False,
                'foreign_key': (other_table, other_column),
                'check': False,
                'index': False,
                'columns': columns.split(','),
            }
        # Now get indexes
        cursor.execute("""
            SELECT
                ind.index_name,
                LOWER(ind.index_type),
                LISTAGG(LOWER(cols.column_name), ',') WITHIN GROUP (ORDER BY cols.column_position),
                LISTAGG(cols.descend, ',') WITHIN GROUP (ORDER BY cols.column_position)
            FROM
                ALL_IND_COLUMNS cols, ALL_INDEXES ind
            WHERE
                cols.table_name = UPPER(%s) AND
                NOT EXISTS (
                    SELECT 1
                    FROM ALL_CONSTRAINTS cons
                    WHERE ind.index_name = cons.index_name
                ) AND cols.index_name = ind.index_name
            GROUP BY ind.index_name, ind.index_type
        """, [table_name])
        for constraint, type_, columns, orders in cursor.fetchall():
            constraints[constraint] = {
                'primary_key': False,
                'unique': False,
                'foreign_key': None,
                'check': False,
                'index': True,
                'type': 'idx' if type_ == 'normal' else type_,
                'columns': columns.split(','),
                'orders': orders.split(','),
            }
        return constraints
That Django 1.11 is the last version to officially support 11g is something I picked up from "How to make Django 2.0 to use Oracle 11g syntax instead of 12c?", which prompted me to look at the 1.11 codebase to see what had changed (https://github.com/django/django/blob/stable/1.11.x/django/db/backends/oracle/introspection.py).
At present I'm only reading data over a read-only database connection, so I have no motivation to fix migrate for now. At the time of writing, inspectdb is outputting 6000+ tables as Python code without any major issues.
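The "replace user_* with all_*" step described in this answer can be sketched as a small text rewrite. This is only an illustration of the idea, not how the file above was produced; `use_all_views` is a hypothetical helper, and the pattern would also touch any ordinary column whose name happens to start with `user_`, so the result needs reviewing by hand.

```python
import re

# Matches the prefix of Oracle data dictionary views such as USER_TABLES
# or user_cons_columns, in either case.
_USER_VIEW = re.compile(r"\buser_(?=\w)", re.IGNORECASE)


def use_all_views(sql):
    """Swap USER_* dictionary views for their ALL_* counterparts (hypothetical helper)."""

    def swap(match):
        # Keep the case of the original prefix.
        return "ALL_" if match.group(0).isupper() else "all_"

    return _USER_VIEW.sub(swap, sql)


original = (
    "SELECT TABLE_NAME, 't' FROM USER_TABLES UNION ALL "
    "SELECT VIEW_NAME, 'v' FROM USER_VIEWS"
)
rewritten = use_all_views(original)
print(rewritten)
```

Unlike the USER_* views, the ALL_* views list objects from every schema the connected user can see, so queries that filter only on table_name may pick up same-named tables from other schemas. Adding an OWNER condition, as get_table_list does above, narrows the results to one schema.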
Solution 5:
Haaaa.. I just faced this problem and found a silly reason!!
No need to edit any lower-layer files as was done in one of the posts. You face this issue when you don't have any tables in that database. Hehe..
Create a few tables and then try again. It works like a charm.
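A quick check can tell the "no tables at all" case apart from the "tables live in another schema" case discussed in the earlier answers. The following is a minimal sketch under the same assumption as before, namely a cursor that accepts `%s` placeholders; `diagnose_empty_inspectdb` and `ScriptedCursor` are hypothetical names, and the scripted cursor only replays canned counts so the example runs without Oracle.

```python
def diagnose_empty_inspectdb(cursor, owner):
    """Explain why inspectdb might print no models (hypothetical helper)."""
    # Tables owned by the connected user: this is what USER_TABLES reports.
    cursor.execute("SELECT COUNT(*) FROM USER_TABLES")
    owned = cursor.fetchone()[0]
    # Tables in the named schema that the connected user is allowed to see.
    cursor.execute("SELECT COUNT(*) FROM ALL_TABLES WHERE OWNER = UPPER(%s)", [owner])
    visible = cursor.fetchone()[0]
    if owned:
        return "connected user owns tables"
    if visible:
        return "tables exist only in another schema"
    return "no tables found"


class ScriptedCursor:
    """Stand-in cursor that replays a fixed sequence of COUNT(*) results."""

    def __init__(self, counts):
        self._counts = list(counts)
        self._current = None

    def execute(self, sql, params=None):
        self._current = self._counts.pop(0)

    def fetchone(self):
        return (self._current,)


result = diagnose_empty_inspectdb(ScriptedCursor([0, 12]), "v500")
print(result)
```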