Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
com.github.seancorfield/next.jdbc {:mvn/version "1.2.780"}
org.postgresql/postgresql {:mvn/version "42.3.6"}
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.2"}
com.github.vertical-blank/sql-formatter {:mvn/version "1.0.3"}
metosin/jsonista {:mvn/version "0.2.7"}}
com.github.vertical-blank/sql-formatter {:mvn/version "2.0.3"}
metosin/jsonista {:mvn/version "0.2.7"}
org.apache.commons/commons-lang3 {:mvn/version "3.12.0"}}
:aliases
{:test {:extra-paths ["test"]
:extra-deps {org.clojure/test.check {:mvn/version "0.10.0"}
clj-kondo/clj-kondo {:mvn/version "2022.05.31"}
metosin/testit {:mvn/version "0.4.0"}
lambdaisland/kaocha {:mvn/version "1.0.700"}}}
same/ish {:mvn/version "0.1.4"}
lambdaisland/kaocha {:mvn/version "1.66.1034"}
lambdaisland/kaocha-cloverage {:mvn/version "1.0.75"}}}
:runner
{:extra-deps {com.cognitect/test-runner
{:git/url "https://github.com/cognitect-labs/test-runner"
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
[com.github.seancorfield/next.jdbc "1.2.780"]
[org.postgresql/postgresql "42.3.6"]
[camel-snake-kebab/camel-snake-kebab "0.4.2"]
[com.github.vertical-blank/sql-formatter "1.0.3"]
[com.github.vertical-blank/sql-formatter "2.0.3"]
[metosin/jsonista "0.2.7"]
[com.stuartsierra/dependency "1.0.0"]])
106 changes: 95 additions & 11 deletions src/com/verybigthings/penkala/decomposition.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,53 @@
(:require [clojure.spec.alpha :as s]
[com.verybigthings.penkala.util :refer [as-vec path-prefix-join col->alias]]
[clojure.set :as set]
[clojure.string :as str]))
[clojure.string :as str])
(:import java.time.LocalDateTime
java.time.LocalTime
java.time.format.DateTimeFormatter
java.time.temporal.ChronoUnit))

(defmulti coerce-embedded-value
(fn [pg-type _]
pg-type))

(defmethod coerce-embedded-value :default [_ value] value)

(defmethod coerce-embedded-value "numeric" [_ value]
(bigdec value))

(defmethod coerce-embedded-value "real" [_ value]
(float value))

(defmethod coerce-embedded-value "money" [_ value]
(let [[_ numeric] (re-matches #"\D*(\d*[\.|,]\d*)\D*" value)
numeric' (str/replace numeric #"," ".")]
(Double/parseDouble numeric')))

(let [formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd'T'HH:mm:ss[.][SSSSSS][SSSSS][SSSS][SSS][SS][S][XXX][XX][X]")]
(defmethod coerce-embedded-value "timestamp with time zone" [_ value]
(LocalDateTime/parse value formatter)))

(let [formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd'T'HH:mm:ss[.][SSSSSS][SSSSS][SSSS][SSS][SS][S]")]
(defmethod coerce-embedded-value "timestamp without time zone" [_ value]
(LocalDateTime/parse value formatter)))

(let [formatter (DateTimeFormatter/ofPattern "HH:mm:ss[.][SSSSSS][SSSSS][SSSS][SSS][SS][S][XXX][XX][X]")]
(defmethod coerce-embedded-value "time with time zone" [_ value]
(println "1>>>>>" value)
(LocalTime/parse value formatter)))

(let [formatter (DateTimeFormatter/ofPattern "HH:mm:ss[.][SSSSSS][SSSSS][SSSS][SSS][SS][S]")]
(defmethod coerce-embedded-value "time without time zone" [_ value]

(LocalTime/parse value formatter)))

(defn coerce-embedded-row [heading row]
(->> heading
(map-indexed (fn [idx [col-name pg-type]]
(println col-name pg-type)
[(keyword col-name) (coerce-embedded-value pg-type (get row idx))]))
(into {})))

(defrecord DecompositionSchema [pk decompose-to namespace columns])

Expand Down Expand Up @@ -41,9 +87,12 @@
(s/def ::keep-nil?
boolean?)

(s/def ::embedded?
boolean?)

(s/def ::schema
(s/keys
:opt-un [::decompose-to ::pk ::schema ::keep-duplicates? ::keep-nil? ::namespace]))
:opt-un [::decompose-to ::pk ::schema ::keep-duplicates? ::keep-nil? ::embedded? ::namespace]))

(declare process-schema)

Expand All @@ -55,7 +104,7 @@
(reduce-kv
(fn [acc k v]
(if (map? v)
(assoc-in acc [:schemas (rename k)] (process-schema v))
(assoc-in acc [:schemas (rename k)] (process-schema k v))
(assoc-in acc [:renames (rename k)] v)))
schema
columns))]
Expand All @@ -69,9 +118,21 @@
schema
columns))))

(defn process-schema- [schema]
(defn process-embedded-schema [schema column]
(cond
(and (:embedded? schema) column)
(-> schema (dissoc :embedded?) (assoc :embedded column))

(and (:embedded? schema) (not column))
(throw (ex-info "Root schema can't have :embedded? true" {:schema schema}))

:else
schema))

(defn process-schema- [column schema]
(-> schema
process-schema-columns
(process-embedded-schema column)
(update :pk as-vec)))

(def process-schema (memoize process-schema-))
Expand All @@ -90,10 +151,20 @@
(defn assoc-descendants [acc schemas idx row]
(reduce-kv
(fn [m k v]
(let [descendant (build (get acc k) v idx row)]
(if descendant
(assoc m k descendant)
m)))
(if-let [embedded (:embedded v)]
(let [{:keys [heading body]} (get row embedded)
processed (reduce
(fn [acc [idx row]]
(->> row
(coerce-embedded-row heading)
(build acc v idx)))
{}
(map-indexed (fn [idx v] [idx v]) (as-vec body)))]
(assoc m k processed))
(let [descendant (build (get acc k) v idx row)]
(if descendant
(assoc m k descendant)
m))))
acc
schemas))

Expand Down Expand Up @@ -163,11 +234,13 @@
(expand-transformed-coll transformed)
transformed)))



(defn decompose
"Decomposes the data based on the schema."
[schema data]
(when (and (seq schema) (seq data))
(let [schema' (process-schema schema)
(let [schema' (process-schema nil schema)
mapping (reduce
(fn [acc [idx row]]
(build acc schema' idx row))
Expand Down Expand Up @@ -280,12 +353,23 @@
(assoc acc alias join-schema)))
acc)))
columns
(:joins relation))]
(:joins relation))
columns-with-joined-and-embeds (reduce
(fn [acc alias]
(let [col-id (get-in relation [:aliases->ids alias])
{:keys [type relation]} (get-in relation [:columns col-id])]
(if (= :embed type)
(let [overrides (get-in overrides [:schema alias])
schema (infer-schema relation overrides [])]
(assoc acc alias (assoc schema :embedded? true)))
acc)))
columns-with-joined
(:projection relation))]
(map->DecompositionSchema
{:pk pk
:decompose-to decompose-to
:namespace namespace
:schema columns-with-joined
:schema columns-with-joined-and-embeds
:processor processor
:keep-nil? keep-nil?
:keep-duplicates? keep-duplicates?})))))
8 changes: 6 additions & 2 deletions src/com/verybigthings/penkala/next_jdbc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@
(with-relations (concat tables views))))))

(defn prettify-sql [sql]
(SqlFormatter/format sql))
(.. SqlFormatter
(of "postgresql")
(format sql)))

(defn validate-relation [env rel]
(let [rel' (get env rel)]
Expand All @@ -142,6 +144,7 @@
relation' (if (keyword? relation) (validate-relation env relation) relation)
sqlvec (r/get-select-query relation' env params)
decomposition-schema (d/infer-schema relation' decomposition-schema-overrides)]
;;(clojure.pprint/pprint decomposition-schema)
;;(-> sqlvec first prettify-sql println)
;;(println (rest sqlvec))
(->> (jdbc/execute! db sqlvec default-next-jdbc-options)
Expand Down Expand Up @@ -246,4 +249,5 @@

(require '[com.verybigthings.penkala.helpers :refer [param]])

(r/get-select-query (r/where posts-rel [:= :user-id (param :user/id)]) {} {:user/id 1}))
(r/get-select-query (r/where posts-rel [:= :user-id (param :user/id)]) {} {:user/id 1}))

26 changes: 25 additions & 1 deletion src/com/verybigthings/penkala/relation.clj
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
(-extend [this col-name extend-expression])
(-extend-with-aggregate [this col-name agg-expression])
(-extend-with-window [this col-name window-expression partitions orders])
(-extend-with-embedded [this col-name relation])
(-rename [this prev-col-name next-col-name])
(-select [this projection])
(-select-all-but [this projection])
Expand Down Expand Up @@ -904,6 +905,16 @@
(assoc-in [:ids->aliases id] col-name)
(assoc-in [:aliases->ids col-name] id)
(update :projection conj col-name))))
(-extend-with-embedded [this col-name relation]
(when (contains? (:aliases->ids this) col-name)
(throw (ex-info (str "Column " col-name " already-exists") {:column col-name :relation this})))
(let [id (keyword (gensym "column-"))]
(-> this
(assoc-in [:columns id] {:type :embed
:relation relation})
(assoc-in [:ids->aliases id] col-name)
(assoc-in [:aliases->ids col-name] id)
(update :projection conj col-name))))
(-select [this projection]
(let [processed-projection (process-projection this (s/conform ::column-list projection))]
(assoc this :projection processed-projection)))
Expand Down Expand Up @@ -1278,6 +1289,16 @@
:orders (s/? ::orders))
:ret ::relation)

(defn extend-with-embedded [rel col-name other-rel]
(-extend-with-embedded rel col-name other-rel))

(s/fdef extend-with-embedded
:args (s/cat
:rel ::relation
:col-name keyword?
:other-rel ::relation)
:ret ::relation)

(defn rename
"Renames a column. If you rename a column, you must use a new name to reference it after that

Expand Down Expand Up @@ -1787,4 +1808,7 @@
:args (s/cat
:rel ::cte
:is-materialized (s/or :nil nil? :boolean boolean?))
:ret ::cte)
:ret ::cte)

(def empty-relation
(->Relation {:namespace false}))
62 changes: 61 additions & 1 deletion src/com/verybigthings/penkala/statement.clj
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,57 @@
(update :query conj (str "DISTINCT ON" (-> query join-comma wrap-parens))))))
acc))

(defn get-projection-for-embedded
([rel]
(let [cursor (get-projection-for-embedded rel {:columns #{} :path-prefix []})]
(-> cursor :columns sort)))
([{:keys [projection joins] :as rel} {:keys [columns path-prefix] :as cursor}]
(let [columns' (reduce
(fn [acc alias]
(let [col-id (get-in rel [:aliases->ids alias])
col-def (get-in rel [:columns col-id])
path-prefix-names (mapv name path-prefix)
col-path (conj path-prefix-names (name alias))
col-alias (if (seq path-prefix) (path-prefix-join col-path) (name alias))]
(conj acc col-alias)))
columns
projection)]
(reduce-kv
(fn [acc alias {:keys [relation projection]}]
(get-projection-for-embedded (update relation :projection #(or projection %)) (update acc :path-prefix conj alias)))
(assoc cursor :columns columns')
(get-in rel [:joins])))))

(defn compile-embedded [acc env rel embedded-relation]
(let [projection (get-projection-for-embedded embedded-relation)
embedded-id (-> "embedded-" gensym name)
data-relation-name (str "t1-" embedded-id)
data-and-types-relation-name (str "t2" embedded-id)
[query & params] (binding [*scopes* (conj *scopes* {:env env :rel rel})]
(format-select-query-without-params-resolution env (assoc embedded-relation :parent rel)))
heading (->> projection
(map (fn [alias]
(str "array['" alias "', pg_typeof(" (q data-relation-name) "." (q alias) ")::text]")))
join-comma)
body (->> projection
(map (fn [alias]
(str "to_json(" (q data-relation-name) "." (q alias) ")")))
join-comma)
final-query ["SELECT json_build_object"
(wrap-parens "'heading', array_to_json"
(wrap-parens (q data-and-types-relation-name) "." (q "heading"))
","
"'body', array_to_json"
(wrap-parens (q data-and-types-relation-name) "." (q "body")))
"FROM"
(wrap-parens "SELECT array[" heading "] as heading, "
"array_agg(array[" body "]) AS body FROM"
(wrap-parens query) spc (q data-relation-name) " GROUP BY heading")
(q data-and-types-relation-name)]]
(-> acc
(update :params into params)
(update :query conj (join-space final-query)))))

(defn with-projection
([acc env rel]
(let [{:keys [query params]} (with-projection empty-acc env rel [])]
Expand Down Expand Up @@ -601,6 +652,12 @@
(update :params into params)
(update :query conj (str (join-space query) spc "AS" spc (q col-alias)))))

(and (not (seq path-prefix)) (= :embed col-type))
(let [{:keys [query params]} (compile-embedded empty-acc env rel (:relation col-def))]
(-> acc
(update :params into params)
(update :query conj (str (-> query join-space wrap-parens) spc "AS" spc (q col-alias)))))

(and (not (seq path-prefix)) (= :window col-type))
(let [{:keys [value-expression partition-by order-by]} col-def
order-by-query-params (when order-by (compile-order-by empty-acc env rel order-by))
Expand Down Expand Up @@ -694,6 +751,9 @@
(update :params into params)
(update :query conj (join-space ["FROM" query "AS" (q (get-rel-alias-with-prefix env rel-name))]))))

(nil? (get-in rel [:spec :name]))
acc

:else
(let [rel-name (get-rel-alias rel)]
(update acc :query into [(if (:only rel) "FROM ONLY" "FROM")
Expand Down Expand Up @@ -891,7 +951,7 @@

(defn with-cte [env acc cte]
(let [cte-query (get-in cte [:spec :query])
materialized? (:materialized? cte-query)
materialized? (get-in cte [:spec :cte :materialized?])
[query & params] (cte-query env)
cte-query (cond-> []
(get-in cte [:spec :cte :recursive?])
Expand Down
Loading