Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions share-api/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ dependencies:
- wai-extra
- wai-middleware-prometheus
- warp
- websockets
- witch
- witherable
- x509
Expand Down
6 changes: 6 additions & 0 deletions share-api/share-api.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ library
Share.Utils.Servant.Client
Share.Utils.Servant.PathInfo
Share.Utils.Servant.RawRequest
Share.Utils.Servant.Streaming
Share.Utils.Tags
Share.Utils.Unison
Share.Web.Admin.API
Expand Down Expand Up @@ -194,6 +195,9 @@ library
Share.Web.Support.Impl
Share.Web.Support.Types
Share.Web.Types
Share.Web.UCM.HistoryComments.API
Share.Web.UCM.HistoryComments.Impl
Share.Web.UCM.HistoryComments.Queries
Share.Web.UCM.Projects.Impl
Share.Web.UCM.Sync.HashJWT
Share.Web.UCM.Sync.Impl
Expand Down Expand Up @@ -356,6 +360,7 @@ library
, wai-extra
, wai-middleware-prometheus
, warp
, websockets
, witch
, witherable
, x509
Expand Down Expand Up @@ -513,6 +518,7 @@ executable share-api
, wai-extra
, wai-middleware-prometheus
, warp
, websockets
, witch
, witherable
, x509
Expand Down
5 changes: 5 additions & 0 deletions share-api/src/Share/Postgres/IDs.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ module Share.Postgres.IDs
NamespaceTermMappingId (..),
NamespaceTypeMappingId (..),
ComponentSummaryDigest (..),
PersonalKeyId (..),

-- * Conversions
hash32AsComponentHash_,
Expand Down Expand Up @@ -104,6 +105,10 @@ newtype ComponentSummaryDigest = ComponentSummaryDigest {unComponentSummaryDiges
deriving stock (Show, Eq, Ord)
deriving (PG.EncodeValue, PG.DecodeValue) via ByteString

newtype PersonalKeyId = PersonalKeyId {unPersonalKeyId :: Int32}
deriving stock (Eq, Ord, Show)
deriving (PG.DecodeValue, PG.EncodeValue) via Int32

toHash32 :: (Coercible h Hash) => h -> Hash32
toHash32 = Hash32.fromHash . coerce

Expand Down
63 changes: 63 additions & 0 deletions share-api/src/Share/Utils/Servant/Streaming.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
module Share.Utils.Servant.Streaming
( toConduit,
cborStreamToConduit,
fromConduit,
sourceIOWithAsync,
queueToCBORStream,
queueToSourceIO,
)
where

-- Orphan instances for SourceIO

import Codec.Serialise qualified as CBOR
import Conduit
import Control.Concurrent.STM.TBMQueue qualified as STM
import Control.Monad.Except
import Data.ByteString.Builder qualified as Builder
import Ki.Unlifted qualified as Ki
import Servant
import Servant.Conduit (conduitToSourceIO)
import Servant.Types.SourceT
import Share.Prelude
import Unison.Util.Servant.CBOR
import UnliftIO.STM qualified as STM

-- | Run the provided IO action in the background while streaming results.
--
-- Servant doesn't provide any easier way to do bracketing like this, all the IO must be
-- inside the SourceIO somehow.
sourceIOWithAsync :: IO a -> SourceIO r -> SourceIO r
sourceIOWithAsync action (SourceT k) =
SourceT \k' ->
Ki.scoped \scope -> do
_ <- Ki.fork scope action
k k'

toConduit :: (MonadIO m, MonadIO n) => SourceIO o -> m (ConduitT void o n ())
toConduit sourceIO = fmap (transPipe liftIO) . liftIO $ fromSourceIO $ sourceIO

cborStreamToConduit :: (MonadIO m, MonadIO n, CBOR.Serialise o) => SourceIO (CBORStream o) -> m (ConduitT void o (ExceptT CBORStreamError n) ())
cborStreamToConduit sourceIO = toConduit sourceIO <&> \stream -> (stream .| unpackCBORBytesStream)

fromConduit :: ConduitT void o IO () -> SourceIO o
fromConduit = conduitToSourceIO

queueToCBORStream :: forall a f. (CBOR.Serialise a, Foldable f) => STM.TBMQueue (f a) -> ConduitT () (CBORStream a) IO ()
queueToCBORStream q = do
let loop :: ConduitT () (CBORStream a) IO ()
loop = do
liftIO (STM.atomically (STM.readTBMQueue q)) >>= \case
-- The queue is closed.
Nothing -> do
pure ()
Just batches -> do
batches
& foldMap (CBOR.serialiseIncremental)
& (CBORStream . Builder.toLazyByteString)
& Conduit.yield
loop
loop

queueToSourceIO :: forall a f. (CBOR.Serialise a, Foldable f) => STM.TBMQueue (f a) -> SourceIO (CBORStream a)
queueToSourceIO q = fromConduit (queueToCBORStream q)
2 changes: 2 additions & 0 deletions share-api/src/Share/Web/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Share.Web.Share.Webhooks.API qualified as Webhooks
import Share.Web.Support.API qualified as Support
import Share.Web.Types
import Share.Web.UCM.SyncV2.API qualified as SyncV2
import Unison.Server.HistoryComments.API qualified as Unison.HistoryComments
import Unison.Share.API.Projects qualified as UCMProjects
import Unison.Sync.API qualified as Unison.Sync

Expand Down Expand Up @@ -53,6 +54,7 @@ type API =
-- This path is deprecated, but is still in use by existing clients.
:<|> ("sync" :> MaybeAuthenticatedSession :> Unison.Sync.API)
:<|> ("ucm" :> "v1" :> "sync" :> MaybeAuthenticatedSession :> Unison.Sync.API)
:<|> ("ucm" :> "v1" :> "history-comments" :> MaybeAuthenticatedSession :> Unison.HistoryComments.API)
:<|> ("ucm" :> "v1" :> "projects" :> MaybeAuthenticatedSession :> UCMProjects.ProjectsAPI)
:<|> ("ucm" :> "v2" :> "sync" :> MaybeAuthenticatedUserId :> SyncV2.API)
:<|> ("admin" :> Admin.API)
Expand Down
5 changes: 5 additions & 0 deletions share-api/src/Share/Web/Authentication.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
module Share.Web.Authentication
( cookieSessionTTL,
requireAuthenticatedUser,
requireAuthenticatedUser',
UnauthenticatedError (..),
pattern MaybeAuthedUserID,
pattern AuthenticatedUser,
Expand Down Expand Up @@ -39,3 +40,7 @@ instance ToServerError UnauthenticatedError where
requireAuthenticatedUser :: Maybe Session -> WebApp UserId
requireAuthenticatedUser (AuthenticatedUser uid) = pure uid
requireAuthenticatedUser _ = Errors.respondError UnauthenticatedError

requireAuthenticatedUser' :: Maybe UserId -> WebApp UserId
requireAuthenticatedUser' (Just uid) = pure uid
requireAuthenticatedUser' _ = Errors.respondError UnauthenticatedError
9 changes: 9 additions & 0 deletions share-api/src/Share/Web/UCM/HistoryComments/API.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeOperators #-}

module Share.Web.UCM.HistoryComments.API (API) where

import Servant
import Unison.Server.HistoryComments.API qualified as HistoryComments

type API = NamedRoutes HistoryComments.Routes
96 changes: 96 additions & 0 deletions share-api/src/Share/Web/UCM/HistoryComments/Impl.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
module Share.Web.UCM.HistoryComments.Impl (server) where

import Control.Monad.Except
import Control.Monad.Trans.Maybe
import Data.Either (partitionEithers)
import Network.WebSockets.Connection
import Share.IDs
import Share.IDs qualified as IDs
import Share.Postgres qualified as PG
import Share.Postgres.Queries qualified as PGQ
import Share.Postgres.Users.Queries qualified as UserQ
import Share.Prelude
import Share.Web.App (WebApp, WebAppServer)
import Share.Web.Authentication qualified as AuthN
import Share.Web.Authorization qualified as AuthZ
import Share.Web.Errors (Unimplemented (Unimplemented), reportError, respondError)
import Share.Web.UCM.HistoryComments.Queries qualified as Q
import Unison.Server.HistoryComments.API qualified as HistoryComments
import Unison.Server.HistoryComments.Types (DownloadCommentsRequest (DownloadCommentsRequest), HistoryCommentChunk (..), UploadCommentsResponse (..))
import Unison.Server.Types
import Unison.Util.Websockets
import UnliftIO

server :: Maybe UserId -> HistoryComments.Routes WebAppServer
server mayCaller =
HistoryComments.Routes
{ downloadHistoryComments = downloadHistoryCommentsStreamImpl mayCaller,
uploadHistoryComments = uploadHistoryCommentsStreamImpl mayCaller
}

wsMessageBufferSize :: Int
wsMessageBufferSize = 100

downloadHistoryCommentsStreamImpl :: Maybe UserId -> Connection -> WebApp ()
downloadHistoryCommentsStreamImpl mayUserId (DownloadCommentsRequest {}) = do
_ <- error "AUTH CHECK HERE"
respondError Unimplemented

-- Re-run the given STM action at most n times, collecting the results into a list.
-- If the action returns Nothing, stop early and return what has been collected so far, along with a Bool indicating whether the action was exhausted.
fetchChunk :: Int -> STM (Maybe a) -> STM ([a], Bool)
fetchChunk size action = do
let go 0 = pure ([], False)
go n = do
optional action >>= \case
Nothing -> do
-- No more values available at the moment
pure ([], False)
Just Nothing -> do
-- Queue is closed
pure ([], True)
Just (Just val) -> do
(rest, exhausted) <- go (n - 1)
pure (val : rest, exhausted)
go size

uploadHistoryCommentsStreamImpl :: Maybe UserId -> BranchRef -> Connection -> WebApp ()
uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = do
callerUserId <- AuthN.requireAuthenticatedUser' mayCallerUserId
result <- withQueues @UploadCommentsResponse @HistoryCommentChunk wsMessageBufferSize wsMessageBufferSize conn \q@(Queues {receive}) -> runExceptT $ do
projectBranchSH@ProjectBranchShortHand {userHandle, projectSlug, contributorHandle} <- case IDs.fromText @ProjectBranchShortHand branchRef of
Left err -> handleErrInQueue q (UploadCommentsGenericFailure $ IDs.toText err)
Right pbsh -> pure pbsh
let projectSH = ProjectShortHand {userHandle, projectSlug}
mayInfo <- runMaybeT $ mapMaybeT PG.runTransaction $ do
project <- MaybeT $ PGQ.projectByShortHand projectSH
branch <- MaybeT $ PGQ.branchByProjectBranchShortHand projectBranchSH
contributorUser <- MaybeT $ for contributorHandle UserQ.userByHandle
pure (project, branch, contributorUser)
(project, _branch, contributorUser) <- maybe (handleErrInQueue q $ UploadCommentsProjectBranchNotFound br) pure $ mayInfo
authZ <-
lift (AuthZ.checkUploadToProjectBranchCodebase callerUserId project.projectId contributorUser.user_id) >>= \case
Left _authErr -> handleErrInQueue q (UploadCommentsNotAuthorized br)
Right authZ -> pure authZ
projectId <- error "Process Branch Ref"
let loop :: ExceptT UploadCommentsResponse WebApp ()
loop = do
(chunk, closed) <- atomically $ fetchChunk insertCommentBatchSize do
receive <&> fmap \case
HistoryCommentErrorChunk err -> (Left $ UploadCommentsGenericFailure err)
chunk -> (Right chunk)
let (errs, chunks) = partitionEithers chunk
PG.runTransaction $ Q.insertHistoryComments authZ projectId chunks
for errs $ \err -> handleErrInQueue q err
when (not closed) loop
loop
case result of
Left err -> reportError err
Right (Left err) -> reportError err
Right (Right ()) -> pure ()
where
insertCommentBatchSize = 100
handleErrInQueue :: forall o x. Queues UploadCommentsResponse o -> UploadCommentsResponse -> ExceptT UploadCommentsResponse WebApp x
handleErrInQueue Queues {send} e = do
_ <- atomically $ send e
throwError e
Loading
Loading