001/*
002 * (C) Copyright 2006-2016 Nuxeo SA (http://nuxeo.com/) and others.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 *
016 * Contributors:
017 *     George Lefter
018 *     Florent Guillaume
019 */
020package org.nuxeo.ecm.directory.sql;
021
022import java.sql.Connection;
023import java.sql.SQLException;
024import java.util.LinkedHashMap;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.Map;
028
029import javax.transaction.Synchronization;
030
031import org.apache.commons.lang.StringUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.nuxeo.ecm.core.cache.CacheService;
035import org.nuxeo.ecm.core.schema.SchemaManager;
036import org.nuxeo.ecm.core.schema.types.Field;
037import org.nuxeo.ecm.core.schema.types.Schema;
038import org.nuxeo.ecm.core.storage.sql.ColumnType;
039import org.nuxeo.ecm.core.storage.sql.jdbc.db.Column;
040import org.nuxeo.ecm.core.storage.sql.jdbc.db.Table;
041import org.nuxeo.ecm.core.storage.sql.jdbc.dialect.Dialect;
042import org.nuxeo.ecm.directory.AbstractDirectory;
043import org.nuxeo.ecm.directory.DirectoryCSVLoader;
044import org.nuxeo.ecm.directory.DirectoryException;
045import org.nuxeo.ecm.directory.Session;
046import org.nuxeo.runtime.api.Framework;
047import org.nuxeo.runtime.datasource.ConnectionHelper;
048import org.nuxeo.runtime.transaction.TransactionHelper;
049
050public class SQLDirectory extends AbstractDirectory {
051
052    protected class TxSessionCleaner implements Synchronization {
053        private final SQLSession session;
054
055        Throwable initContext = captureInitContext();
056
057        protected TxSessionCleaner(SQLSession session) {
058            this.session = session;
059        }
060
061        protected Throwable captureInitContext() {
062            if (!log.isDebugEnabled()) {
063                return null;
064            }
065            return new Throwable("SQL directory session init context in " + SQLDirectory.this);
066        }
067
068        protected void checkIsNotLive() {
069            try {
070                if (!session.isLive()) {
071                    return;
072                }
073                if (initContext != null) {
074                    log.warn("Closing a sql directory session for you " + session, initContext);
075                } else {
076                    log.warn("Closing a sql directory session for you " + session);
077                }
078                if (!TransactionHelper.isTransactionActiveOrMarkedRollback()) {
079                    log.warn("Closing sql directory session outside a transaction" + session);
080                }
081                session.close();
082            } catch (DirectoryException e) {
083                log.error("Cannot state on sql directory session before commit " + SQLDirectory.this, e);
084            }
085
086        }
087
088        @Override
089        public void beforeCompletion() {
090            checkIsNotLive();
091        }
092
093        @Override
094        public void afterCompletion(int status) {
095            checkIsNotLive();
096        }
097
098    }
099
100    public static final Log log = LogFactory.getLog(SQLDirectory.class);
101
102    public static final String TENANT_ID_FIELD = "tenantId";
103
104    private final boolean nativeCase;
105
106    private Table table;
107
108    private Schema schema;
109
110    private Map<String, Field> schemaFieldMap;
111
112    private List<String> storedFieldNames;
113
114    private volatile Dialect dialect;
115
116    public SQLDirectory(SQLDirectoryDescriptor descriptor) {
117        super(descriptor);
118        nativeCase = Boolean.TRUE.equals(descriptor.nativeCase);
119
120        // register the references to other directories
121        addReferences(descriptor.getInverseReferences());
122        addReferences(descriptor.getTableReferences());
123
124        // cache parameterization
125        cache.setEntryCacheName(descriptor.cacheEntryName);
126        cache.setEntryCacheWithoutReferencesName(descriptor.cacheEntryWithoutReferencesName);
127        cache.setNegativeCaching(descriptor.negativeCaching);
128
129        // Cache fallback
130        CacheService cacheService = Framework.getLocalService(CacheService.class);
131        if (cacheService != null) {
132            if (descriptor.cacheEntryName == null && descriptor.getCacheMaxSize() != 0) {
133                cache.setEntryCacheName("cache-" + getName());
134                cacheService.registerCache("cache-" + getName(),
135                        descriptor.getCacheMaxSize(),
136                        descriptor.getCacheTimeout() / 60);
137            }
138            if (descriptor.cacheEntryWithoutReferencesName == null && descriptor.getCacheMaxSize() != 0) {
139                cache.setEntryCacheWithoutReferencesName(
140                        "cacheWithoutReference-" + getName());
141                cacheService.registerCache("cacheWithoutReference-" + getName(),
142                        descriptor.getCacheMaxSize(),
143                        descriptor.getCacheTimeout() / 60);
144            }
145        }
146    }
147
148    @Override
149    public SQLDirectoryDescriptor getDescriptor() {
150        return (SQLDirectoryDescriptor) descriptor;
151    }
152
153    /**
154     * Lazily initializes the connection.
155     *
156     * @return {@code true} if CSV data should be loaded
157     * @since 8.4
158     */
159    protected boolean initConnectionIfNeeded() {
160        // double checked locking with volatile pattern to ensure concurrent lazy init
161        if (dialect == null) {
162            synchronized (this) {
163                if (dialect == null) {
164                    return initConnection();
165                }
166            }
167        }
168        return false;
169    }
170
171    /**
172     * Initializes the table.
173     *
174     * @return {@code true} if CSV data should be loaded
175     * @since 6.0
176     */
177    protected boolean initConnection() {
178        SQLDirectoryDescriptor descriptor = getDescriptor();
179
180        try (Connection sqlConnection = getConnection()) {
181            dialect = Dialect.createDialect(sqlConnection, null);
182            // setup table and fields maps
183            String tableName = descriptor.tableName == null ? descriptor.name : descriptor.tableName;
184            table = SQLHelper.addTable(tableName, dialect, useNativeCase());
185            SchemaManager schemaManager = Framework.getLocalService(SchemaManager.class);
186            schema = schemaManager.getSchema(getSchema());
187            if (schema == null) {
188                throw new DirectoryException("schema not found: " + getSchema());
189            }
190            schemaFieldMap = new LinkedHashMap<>();
191            storedFieldNames = new LinkedList<>();
192            boolean hasPrimary = false;
193            for (Field f : schema.getFields()) {
194                String fieldName = f.getName().getLocalName();
195                schemaFieldMap.put(fieldName, f);
196
197                if (!isReference(fieldName)) {
198                    // list of fields that are actually stored in the table of
199                    // the current directory and not read from an external
200                    // reference
201                    storedFieldNames.add(fieldName);
202
203                    boolean isId = fieldName.equals(getIdField());
204                    ColumnType type = ColumnType.fromField(f);
205                    if (isId && descriptor.isAutoincrementIdField()) {
206                        type = ColumnType.AUTOINC;
207                    }
208                    Column column = SQLHelper.addColumn(table, fieldName, type, useNativeCase());
209                    if (isId) {
210                        if (descriptor.isAutoincrementIdField()) {
211                            column.setIdentity(true);
212                        }
213                        column.setPrimary(true);
214                        column.setNullable(false);
215                        hasPrimary = true;
216                    }
217                }
218            }
219            if (!hasPrimary) {
220                throw new DirectoryException(String.format(
221                        "Directory '%s' id field '%s' is not present in schema '%s'", getName(), getIdField(),
222                        getSchema()));
223            }
224
225            SQLHelper helper = new SQLHelper(sqlConnection, table, descriptor.getCreateTablePolicy());
226            boolean loadData = helper.setupTable();
227            return loadData;
228        } catch (SQLException e) {
229            // exception on close
230            throw new DirectoryException(e);
231        }
232    }
233
234    public Connection getConnection() throws DirectoryException {
235        SQLDirectoryDescriptor descriptor = getDescriptor();
236        if (StringUtils.isBlank(descriptor.dataSourceName)) {
237            throw new DirectoryException("Missing dataSource for SQL directory: " + getName());
238        }
239        try {
240            return ConnectionHelper.getConnection(descriptor.dataSourceName);
241        } catch (SQLException e) {
242            throw new DirectoryException("Cannot connect to SQL directory '" + getName() + "': " + e.getMessage(), e);
243        }
244    }
245
246    @Override
247    public Session getSession() throws DirectoryException {
248        boolean loadData = initConnectionIfNeeded();
249        SQLSession session = new SQLSession(this, getDescriptor());
250        addSession(session);
251        if (loadData && descriptor.getDataFileName() != null) {
252            Schema schema = Framework.getService(SchemaManager.class).getSchema(getSchema());
253            DirectoryCSVLoader.loadData(descriptor.getDataFileName(), descriptor.getDataFileCharacterSeparator(),
254                    schema, session::createEntry);
255        }
256        return session;
257    }
258
259    protected void addSession(final SQLSession session) throws DirectoryException {
260        super.addSession(session);
261        registerInTx(session);
262    }
263
264    protected void registerInTx(final SQLSession session) throws DirectoryException {
265        if (!TransactionHelper.isTransactionActive()) {
266            return;
267        }
268        TransactionHelper.registerSynchronization(new TxSessionCleaner(session));
269    }
270
271    public Map<String, Field> getSchemaFieldMap() {
272        return schemaFieldMap;
273    }
274
275    public List<String> getStoredFieldNames() {
276        return storedFieldNames;
277    }
278
279    public Table getTable() {
280        return table;
281    }
282
283    public Dialect getDialect() {
284        return dialect;
285    }
286
287    public boolean useNativeCase() {
288        return nativeCase;
289    }
290
291    @Override
292    public boolean isMultiTenant() {
293        return table.getColumn(TENANT_ID_FIELD) != null;
294    }
295
296    @Override
297    public String toString() {
298        return "SQLDirectory [name=" + descriptor.name + "]";
299    }
300
301}