Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions lib/migration_generator/migration_generator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,14 @@ defmodule AshPostgres.MigrationGenerator do
version in versions
end)
|> Enum.each(fn {version, mod} ->
runner_opts =
[
all: true,
prefix: prefix
]
|> maybe_put_mod_attribute(mod, :disable_ddl_transaction)
|> maybe_put_mod_attribute(mod, :disable_migration_lock)

Ecto.Migration.Runner.run(
repo,
[],
Expand All @@ -563,8 +571,7 @@ defmodule AshPostgres.MigrationGenerator do
:forward,
:down,
:down,
all: true,
prefix: prefix
runner_opts
)

Ecto.Migration.SchemaMigration.down(repo, repo.config(), version, prefix: prefix)
Expand Down Expand Up @@ -3858,4 +3865,14 @@ defmodule AshPostgres.MigrationGenerator do

defp to_ordered_object(value) when is_list(value), do: Enum.map(value, &to_ordered_object/1)
defp to_ordered_object(value), do: value

defp maybe_put_mod_attribute(opts, mod, attribute) do
migration_config = mod.__migration__()

case migration_config[attribute] do
nil -> opts
false -> opts
value -> Keyword.put(opts, attribute, value)
end
end
end
47 changes: 41 additions & 6 deletions lib/migration_generator/operation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ defmodule AshPostgres.MigrationGenerator.Operation do
end
end

def concurrent_option(table, multitenancy, schema) do
if multitenancy.strategy == :context do
# For tenant migrations, prefix is a function call
"concurrently: AshPostgres.MigrationHelper.maybe_index_concurrently?(:#{as_atom(table)}, repo(), prefix())"
else
# For regular migrations, prefix is a string or nil
prefix_arg = if schema, do: "\"#{schema}\"", else: "nil"

"concurrently: AshPostgres.MigrationHelper.maybe_index_concurrently?(:#{as_atom(table)}, repo(), #{prefix_arg})"
end
end

def on_delete(%{on_delete: {:nilify, columns}}) when is_list(columns) do
"on_delete: {:nilify, #{inspect(columns)}}"
end
Expand Down Expand Up @@ -903,30 +915,46 @@ defmodule AshPostgres.MigrationGenerator.Operation do
},
table: table,
schema: schema,
multitenancy: multitenancy
multitenancy: multitenancy,
concurrently: concurrently
}) do
keys = index_keys(keys, all_tenants?, multitenancy)

index_name = index_name || "#{table}_#{name}_index"

concurrent_opt =
if concurrently do
concurrent_option(table, multitenancy, schema)
else
nil
end

base_opts =
join([
"name: \"#{index_name}\"",
option("prefix", schema),
option("nulls_distinct", nils_distinct?),
concurrent_opt
])

cond do
base_filter && where ->
where = "(#{where}) AND (#{base_filter})"

"create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{join(["name: \"#{index_name}\"", option("prefix", schema), option("nulls_distinct", nils_distinct?), option("where", where)])})"
"create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{join([base_opts, option("where", where)])})"

base_filter ->
base_filter = "(#{base_filter})"

"create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], where: \"#{base_filter}\", #{join(["name: \"#{index_name}\"", option("prefix", schema), option("nulls_distinct", nils_distinct?)])})"
"create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], where: \"#{base_filter}\", #{base_opts})"

where ->
where = "(#{where})"

"create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{join(["name: \"#{index_name}\"", option("prefix", schema), option("nulls_distinct", nils_distinct?), option("where", where)])})"
"create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{join([base_opts, option("where", where)])})"

true ->
"create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{join(["name: \"#{index_name}\"", option("prefix", schema), option("nulls_distinct", nils_distinct?)])})"
"create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{base_opts})"
end
end

Expand Down Expand Up @@ -1007,11 +1035,18 @@ defmodule AshPostgres.MigrationGenerator.Operation do
{where, base_filter} -> %{index | where: base_filter <> " AND " <> where}
end

concurrent_opt =
if index.concurrently do
concurrent_option(table, multitenancy, schema)
else
nil
end

opts =
join([
option(:name, index.name),
option(:unique, index.unique),
option(:concurrently, index.concurrently),
concurrent_opt,
option(:using, index.using),
option(:prefix, index.prefix),
option(:where, index.where),
Expand Down
70 changes: 70 additions & 0 deletions lib/migration_helper.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# SPDX-FileCopyrightText: 2025 ash_postgres contributors <https://github.com/ash-project/ash_postgres/graphs.contributors>
#
# SPDX-License-Identifier: MIT

defmodule AshPostgres.MigrationHelper do
@moduledoc """
Helper functions for AshPostgres migrations.

This module provides utilities for migrations, particularly for handling
concurrent index creation in various scenarios.
"""

@doc """
Determines whether a concurrent index should be used.

Returns `false` (disables concurrent) in three scenarios:
1. When running in test environment
2. When inside a transaction
3. When the table has no existing records (for tenant migrations)

## Examples

# In a regular migration:
create index(:posts, [:title], concurrently: maybe_index_concurrently?(:posts, repo()))

# In a tenant migration:
create index(:posts, [:title], concurrently: maybe_index_concurrently?(:posts, repo(), prefix()))

"""
def maybe_index_concurrently?(table, repo, prefix \\ nil) do
cond do
Mix.env() == :test ->
false

repo.in_transaction?() ->
false

table_empty?(repo, table, prefix) ->
false

true ->
true
end
end

defp table_empty?(repo, table, prefix) do
table_name = to_string(table)

quoted_table =
if prefix do
Ecto.Adapters.SQL.quote_name({prefix, table_name})
else
Ecto.Adapters.SQL.quote_name(table_name)
end

[[exists]] =
Ecto.Adapters.SQL.query!(repo, "SELECT EXISTS(SELECT 1 FROM #{quoted_table} LIMIT 1)", [])

!exists
end

defp quote_name({schema, table}) do
quote_name(schema) <> "." <> quote_name(table)
end

defp quote_name(name) do
name = name |> to_string()
~s("#{String.replace(name, ~s("), ~s(""))}")
end
end
21 changes: 19 additions & 2 deletions lib/multitenancy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ defmodule AshPostgres.MultiTenancy do
|> Enum.filter(& &1)
|> Enum.map(&load_migration!/1)
|> Enum.each(fn {version, mod} ->
runner_opts =
[
all: true,
prefix: tenant_name
]
|> maybe_put_mod_attribute(mod, :disable_ddl_transaction)
|> maybe_put_mod_attribute(mod, :disable_migration_lock)

Ecto.Migration.Runner.run(
repo,
[],
Expand All @@ -54,8 +62,7 @@ defmodule AshPostgres.MultiTenancy do
:forward,
:up,
:up,
all: true,
prefix: tenant_name
runner_opts
)

Ecto.Migration.SchemaMigration.up(repo, repo.config(), version, prefix: tenant_name)
Expand Down Expand Up @@ -121,4 +128,14 @@ defmodule AshPostgres.MultiTenancy do
defp tenant_name_regex do
~r/^[a-zA-Z0-9_-]+$/
end

defp maybe_put_mod_attribute(opts, mod, attribute) do
migration_config = mod.__migration__()

case migration_config[attribute] do
nil -> opts
false -> opts
value -> Keyword.put(opts, attribute, value)
end
end
end
Loading
Loading