From 9f04039cc47e9d46a5709e20833d0b5f48fed1b2 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 21 Mar 2025 11:22:54 +0100 Subject: [PATCH 01/17] Rename EdgeAgent to edge-agent --- Package.swift | 6 +++++- Sources/{EdgeAgent => edge-agent}/EdgeAgent.swift | 1 - 2 files changed, 5 insertions(+), 2 deletions(-) rename Sources/{EdgeAgent => edge-agent}/EdgeAgent.swift (99%) diff --git a/Package.swift b/Package.swift index 64326ce..1358534 100644 --- a/Package.swift +++ b/Package.swift @@ -6,6 +6,10 @@ let package = Package( platforms: [ .macOS(.v15) ], + products: [ + .executable(name: "edge", targets: ["edge"]), + .executable(name: "edge-agent", targets: ["edge-agent"]), + ], dependencies: [ .package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.5.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"), @@ -27,7 +31,7 @@ let package = Package( /// The EdgeAgent executable. It's currently here for development purposes, and will be /// moved to a separate package in the future. .executableTarget( - name: "EdgeAgent", + name: "edge-agent", dependencies: [ .product(name: "ArgumentParser", package: "swift-argument-parser"), .product(name: "Logging", package: "swift-log"), diff --git a/Sources/EdgeAgent/EdgeAgent.swift b/Sources/edge-agent/EdgeAgent.swift similarity index 99% rename from Sources/EdgeAgent/EdgeAgent.swift rename to Sources/edge-agent/EdgeAgent.swift index a96ad8a..f529fc6 100644 --- a/Sources/EdgeAgent/EdgeAgent.swift +++ b/Sources/edge-agent/EdgeAgent.swift @@ -9,4 +9,3 @@ struct EdgeCLI: ParsableCommand { subcommands: [] ) } - From decbdeeaf968b5943d4c1a864aff78ab0471033d Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 21 Mar 2025 11:23:56 +0100 Subject: [PATCH 02/17] Add grpc & protobuf --- Package.resolved | 146 ++++++++- Package.swift | 12 + .../services/v1/edge_agent_v1_service.proto | 15 + Scripts/EdgeAgentProto.sh | 44 +++ Sources/EdgeAgentProto/EdgeAgentProto.swift | 1 + .../v1/edge_agent_v1_service.grpc.swift | 299 ++++++++++++++++++ .../v1/edge_agent_v1_service.pb.swift | 83 +++++ .../EdgeAgentProto/Proto/edge_agent.protoset | 7 + 8 files changed, 606 insertions(+), 1 deletion(-) create mode 100644 Proto/edge/agent/services/v1/edge_agent_v1_service.proto create mode 100755 Scripts/EdgeAgentProto.sh create mode 100644 Sources/EdgeAgentProto/EdgeAgentProto.swift create mode 100644 Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift create mode 100644 Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift create mode 100644 Sources/EdgeAgentProto/Proto/edge_agent.protoset diff --git a/Package.resolved b/Package.resolved index cb0d8c5..64404f9 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,6 +1,42 @@ { - "originHash" : "ad1e11fc32cf3dd4130057a00a66ff5133a2eb146b272e199dd59131ab027200", + "originHash" : "c2c065d7c9448e4b645d2a0edd296de4574c21d10373de58a2e755c67267d8df", "pins" : [ + { + "identity" : "grpc-swift", + "kind" : "remoteSourceControl", + "location" : "https://github.com/grpc/grpc-swift.git", + "state" : { + "revision" : "c4d6281784f50bf2e60d3af45e83be1194056062", + "version" : "2.1.2" + } + }, + { + "identity" : "grpc-swift-nio-transport", + "kind" : "remoteSourceControl", + "location" : "https://github.com/grpc/grpc-swift-nio-transport.git", + "state" : { + "revision" : "dac5f358f2ed36cc782f4c5476117398e62cb53c", + "version" : "1.0.2" + } + }, + { + "identity" : "grpc-swift-protobuf", + "kind" : "remoteSourceControl", + "location" : "https://github.com/grpc/grpc-swift-protobuf.git", + "state" : { + "revision" : "63982ca29f11d2c6a7e559c1899fee812dd55cd9", + "version" : "1.1.0" + } + }, + { + "identity" : "swift-algorithms", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-algorithms.git", + "state" : { + "revision" : "87e50f483c54e6efd60e885f7f5aa946cee68023", + "version" : "1.2.1" + } + }, { "identity" : "swift-argument-parser", "kind" : "remoteSourceControl", @@ -10,6 +46,42 @@ "version" : "1.5.0" } }, + { + "identity" : "swift-atomics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-atomics.git", + "state" : { + "revision" : "cd142fd2f64be2100422d658e7411e39489da985", + "version" : "1.2.0" + } + }, + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections.git", + "state" : { + "revision" : "671108c96644956dddcd89dd59c203dcdb36cec7", + "version" : "1.1.4" + } + }, + { + "identity" : "swift-http-structured-headers", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-http-structured-headers.git", + "state" : { + "revision" : "d01361d32e14ae9b70ea5bd308a3794a198a2706", + "version" : "1.2.0" + } + }, + { + "identity" : "swift-http-types", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-http-types.git", + "state" : { + "revision" : "ef18d829e8b92d731ad27bb81583edd2094d1ce3", + "version" : "1.3.1" + } + }, { "identity" : "swift-log", "kind" : "remoteSourceControl", @@ -18,6 +90,78 @@ "revision" : "3d8596ed08bd13520157f0355e35caed215ffbfa", "version" : "1.6.3" } + }, + { + "identity" : "swift-nio", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio.git", + "state" : { + "revision" : "c51907a839e63ebf0ba2076bba73dd96436bd1b9", + "version" : "2.81.0" + } + }, + { + "identity" : "swift-nio-extras", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-extras.git", + "state" : { + "revision" : "00f3f72d2f9942d0e2dc96057ab50a37ced150d4", + "version" : "1.25.0" + } + }, + { + "identity" : "swift-nio-http2", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-http2.git", + "state" : { + "revision" : "170f4ca06b6a9c57b811293cebcb96e81b661310", + "version" : "1.35.0" + } + }, + { + "identity" : "swift-nio-ssl", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-ssl.git", + "state" : { + "revision" : "0cc3528ff48129d64ab9cab0b1cd621634edfc6b", + "version" : "2.29.3" + } + }, + { + "identity" : "swift-nio-transport-services", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-transport-services.git", + "state" : { + "revision" : "3c394067c08d1225ba8442e9cffb520ded417b64", + "version" : "1.23.1" + } + }, + { + "identity" : "swift-numerics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-numerics.git", + "state" : { + "revision" : "e0ec0f5f3af6f3e4d5e7a19d2af26b481acb6ba8", + "version" : "1.0.3" + } + }, + { + "identity" : "swift-protobuf", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-protobuf.git", + "state" : { + "revision" : "d72aed98f8253ec1aa9ea1141e28150f408cf17f", + "version" : "1.29.0" + } + }, + { + "identity" : "swift-system", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-system.git", + "state" : { + "revision" : "a34201439c74b53f0fd71ef11741af7e7caf01e1", + "version" : "1.4.2" + } } ], "version" : 3 diff --git a/Package.swift b/Package.swift index 1358534..efd72d7 100644 --- a/Package.swift +++ b/Package.swift @@ -13,6 +13,9 @@ let package = Package( dependencies: [ .package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.5.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"), + .package(url: "https://github.com/grpc/grpc-swift.git", from: "2.0.0"), + .package(url: "https://github.com/grpc/grpc-swift-nio-transport.git", from: "1.0.0"), + .package(url: "https://github.com/grpc/grpc-swift-protobuf.git", from: "1.0.0"), ], targets: [ /// The main executable provided by edge-cli. @@ -63,5 +66,14 @@ let package = Package( .product(name: "Logging", package: "swift-log") ] ), + + /// Protobuf definitions for the EdgeAgent service. + .target( + name: "EdgeAgentProto", + dependencies: [ + .product(name: "GRPCCore", package: "grpc-swift"), + .product(name: "GRPCProtobuf", package: "grpc-swift-protobuf"), + ] + ), ] ) diff --git a/Proto/edge/agent/services/v1/edge_agent_v1_service.proto b/Proto/edge/agent/services/v1/edge_agent_v1_service.proto new file mode 100644 index 0000000..95856cc --- /dev/null +++ b/Proto/edge/agent/services/v1/edge_agent_v1_service.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package edge.agent.services.v1; + +service EdgeAgentService { + rpc RunContainer(RunContainerRequest) returns (stream RunContainerResponse); +} + +message RunContainerRequest { + +} + +message RunContainerResponse { + +} \ No newline at end of file diff --git a/Scripts/EdgeAgentProto.sh b/Scripts/EdgeAgentProto.sh new file mode 100755 index 0000000..af1e117 --- /dev/null +++ b/Scripts/EdgeAgentProto.sh @@ -0,0 +1,44 @@ +#!/bin/bash + +# This scripts generates the EdgeAgentProto Swift code. It requires `protoc` to be available in the PATH. +# It should be run from the root of the project. + +# Get the binary path +BIN_PATH=$(swift build --show-bin-path) +PROTOC_GEN_GRPC_PATH="$BIN_PATH/protoc-gen-grpc-swift" +PROTOC_GEN_SWIFT_PATH="$BIN_PATH/protoc-gen-swift" + +# Check if protoc-gen-grpc-swift exists, and build it if needed +if [ ! -f "$PROTOC_GEN_GRPC_PATH" ]; then + echo "protoc-gen-grpc-swift not found. Building it now..." + swift build --product protoc-gen-grpc-swift +fi + +rm -rf Sources/EdgeAgentProto/Proto +mkdir -p Sources/EdgeAgentProto/Proto + +protoc \ + --plugin $PROTOC_GEN_GRPC_PATH \ + --grpc-swift_out=Sources/EdgeAgentProto/Proto \ + --grpc-swift_opt=Visibility=Public \ + --grpc-swift_opt=Server=True \ + --grpc-swift_opt=Client=True \ + --include_imports \ + --descriptor_set_out=Sources/EdgeAgentProto/Proto/edge_agent.protoset \ + --experimental_allow_proto3_optional \ + -I=Proto \ + Proto/edge/agent/services/v1/*.proto + +# Check if protoc-gen-swift exists +if [ ! -f "$PROTOC_GEN_SWIFT_PATH" ]; then + echo "protoc-gen-swift not found. Building it now..." + swift build --product protoc-gen-swift +fi + +protoc \ + --plugin $PROTOC_GEN_SWIFT_PATH \ + --swift_out=Sources/EdgeAgentProto/Proto \ + --swift_opt=Visibility=Public \ + --experimental_allow_proto3_optional \ + -I=Proto \ + Proto/edge/agent/services/v1/*.proto \ No newline at end of file diff --git a/Sources/EdgeAgentProto/EdgeAgentProto.swift b/Sources/EdgeAgentProto/EdgeAgentProto.swift new file mode 100644 index 0000000..84a06e4 --- /dev/null +++ b/Sources/EdgeAgentProto/EdgeAgentProto.swift @@ -0,0 +1 @@ +@_exported import GRPCCore diff --git a/Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift b/Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift new file mode 100644 index 0000000..7e3030d --- /dev/null +++ b/Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift @@ -0,0 +1,299 @@ +// DO NOT EDIT. +// swift-format-ignore-file +// +// Generated by the gRPC Swift generator plugin for the protocol buffer compiler. +// Source: edge/agent/services/v1/edge_agent_v1_service.proto +// +// For information on using the generated types, please see the documentation: +// https://github.com/grpc/grpc-swift + +import GRPCCore +import GRPCProtobuf + +// MARK: - edge.agent.services.v1.EdgeAgentService + +/// Namespace containing generated types for the "edge.agent.services.v1.EdgeAgentService" service. +public enum Edge_Agent_Services_V1_EdgeAgentService { + /// Service descriptor for the "edge.agent.services.v1.EdgeAgentService" service. + public static let descriptor = GRPCCore.ServiceDescriptor(fullyQualifiedService: "edge.agent.services.v1.EdgeAgentService") + /// Namespace for method metadata. + public enum Method { + /// Namespace for "RunContainer" metadata. + public enum RunContainer { + /// Request type for "RunContainer". + public typealias Input = Edge_Agent_Services_V1_RunContainerRequest + /// Response type for "RunContainer". + public typealias Output = Edge_Agent_Services_V1_RunContainerResponse + /// Descriptor for "RunContainer". + public static let descriptor = GRPCCore.MethodDescriptor( + service: GRPCCore.ServiceDescriptor(fullyQualifiedService: "edge.agent.services.v1.EdgeAgentService"), + method: "RunContainer" + ) + } + /// Descriptors for all methods in the "edge.agent.services.v1.EdgeAgentService" service. + public static let descriptors: [GRPCCore.MethodDescriptor] = [ + RunContainer.descriptor + ] + } +} + +extension GRPCCore.ServiceDescriptor { + /// Service descriptor for the "edge.agent.services.v1.EdgeAgentService" service. + public static let edge_agent_services_v1_EdgeAgentService = GRPCCore.ServiceDescriptor(fullyQualifiedService: "edge.agent.services.v1.EdgeAgentService") +} + +// MARK: edge.agent.services.v1.EdgeAgentService (server) + +extension Edge_Agent_Services_V1_EdgeAgentService { + /// Streaming variant of the service protocol for the "edge.agent.services.v1.EdgeAgentService" service. + /// + /// This protocol is the lowest-level of the service protocols generated for this service + /// giving you the most flexibility over the implementation of your service. This comes at + /// the cost of more verbose and less strict APIs. Each RPC requires you to implement it in + /// terms of a request stream and response stream. Where only a single request or response + /// message is expected, you are responsible for enforcing this invariant is maintained. + /// + /// Where possible, prefer using the stricter, less-verbose ``ServiceProtocol`` + /// or ``SimpleServiceProtocol`` instead. + public protocol StreamingServiceProtocol: GRPCCore.RegistrableRPCService { + /// Handle the "RunContainer" method. + /// + /// - Parameters: + /// - request: A streaming request of `Edge_Agent_Services_V1_RunContainerRequest` messages. + /// - context: Context providing information about the RPC. + /// - Throws: Any error which occurred during the processing of the request. Thrown errors + /// of type `RPCError` are mapped to appropriate statuses. All other errors are converted + /// to an internal error. + /// - Returns: A streaming response of `Edge_Agent_Services_V1_RunContainerResponse` messages. + func runContainer( + request: GRPCCore.StreamingServerRequest, + context: GRPCCore.ServerContext + ) async throws -> GRPCCore.StreamingServerResponse + } + + /// Service protocol for the "edge.agent.services.v1.EdgeAgentService" service. + /// + /// This protocol is higher level than ``StreamingServiceProtocol`` but lower level than + /// the ``SimpleServiceProtocol``, it provides access to request and response metadata and + /// trailing response metadata. If you don't need these then consider using + /// the ``SimpleServiceProtocol``. If you need fine grained control over your RPCs then + /// use ``StreamingServiceProtocol``. + public protocol ServiceProtocol: Edge_Agent_Services_V1_EdgeAgentService.StreamingServiceProtocol { + /// Handle the "RunContainer" method. + /// + /// - Parameters: + /// - request: A request containing a single `Edge_Agent_Services_V1_RunContainerRequest` message. + /// - context: Context providing information about the RPC. + /// - Throws: Any error which occurred during the processing of the request. Thrown errors + /// of type `RPCError` are mapped to appropriate statuses. All other errors are converted + /// to an internal error. + /// - Returns: A streaming response of `Edge_Agent_Services_V1_RunContainerResponse` messages. + func runContainer( + request: GRPCCore.ServerRequest, + context: GRPCCore.ServerContext + ) async throws -> GRPCCore.StreamingServerResponse + } + + /// Simple service protocol for the "edge.agent.services.v1.EdgeAgentService" service. + /// + /// This is the highest level protocol for the service. The API is the easiest to use but + /// doesn't provide access to request or response metadata. If you need access to these + /// then use ``ServiceProtocol`` instead. + public protocol SimpleServiceProtocol: Edge_Agent_Services_V1_EdgeAgentService.ServiceProtocol { + /// Handle the "RunContainer" method. + /// + /// - Parameters: + /// - request: A `Edge_Agent_Services_V1_RunContainerRequest` message. + /// - response: A response stream of `Edge_Agent_Services_V1_RunContainerResponse` messages. + /// - context: Context providing information about the RPC. + /// - Throws: Any error which occurred during the processing of the request. Thrown errors + /// of type `RPCError` are mapped to appropriate statuses. All other errors are converted + /// to an internal error. + func runContainer( + request: Edge_Agent_Services_V1_RunContainerRequest, + response: GRPCCore.RPCWriter, + context: GRPCCore.ServerContext + ) async throws + } +} + +// Default implementation of 'registerMethods(with:)'. +extension Edge_Agent_Services_V1_EdgeAgentService.StreamingServiceProtocol { + public func registerMethods(with router: inout GRPCCore.RPCRouter) where Transport: GRPCCore.ServerTransport { + router.registerHandler( + forMethod: Edge_Agent_Services_V1_EdgeAgentService.Method.RunContainer.descriptor, + deserializer: GRPCProtobuf.ProtobufDeserializer(), + serializer: GRPCProtobuf.ProtobufSerializer(), + handler: { request, context in + try await self.runContainer( + request: request, + context: context + ) + } + ) + } +} + +// Default implementation of streaming methods from 'StreamingServiceProtocol'. +extension Edge_Agent_Services_V1_EdgeAgentService.ServiceProtocol { + public func runContainer( + request: GRPCCore.StreamingServerRequest, + context: GRPCCore.ServerContext + ) async throws -> GRPCCore.StreamingServerResponse { + let response = try await self.runContainer( + request: GRPCCore.ServerRequest(stream: request), + context: context + ) + return response + } +} + +// Default implementation of methods from 'ServiceProtocol'. +extension Edge_Agent_Services_V1_EdgeAgentService.SimpleServiceProtocol { + public func runContainer( + request: GRPCCore.ServerRequest, + context: GRPCCore.ServerContext + ) async throws -> GRPCCore.StreamingServerResponse { + return GRPCCore.StreamingServerResponse( + metadata: [:], + producer: { writer in + try await self.runContainer( + request: request.message, + response: writer, + context: context + ) + return [:] + } + ) + } +} + +// MARK: edge.agent.services.v1.EdgeAgentService (client) + +extension Edge_Agent_Services_V1_EdgeAgentService { + /// Generated client protocol for the "edge.agent.services.v1.EdgeAgentService" service. + /// + /// You don't need to implement this protocol directly, use the generated + /// implementation, ``Client``. + public protocol ClientProtocol: Sendable { + /// Call the "RunContainer" method. + /// + /// - Parameters: + /// - request: A request containing a single `Edge_Agent_Services_V1_RunContainerRequest` message. + /// - serializer: A serializer for `Edge_Agent_Services_V1_RunContainerRequest` messages. + /// - deserializer: A deserializer for `Edge_Agent_Services_V1_RunContainerResponse` messages. + /// - options: Options to apply to this RPC. + /// - handleResponse: A closure which handles the response, the result of which is + /// returned to the caller. Returning from the closure will cancel the RPC if it + /// hasn't already finished. + /// - Returns: The result of `handleResponse`. + func runContainer( + request: GRPCCore.ClientRequest, + serializer: some GRPCCore.MessageSerializer, + deserializer: some GRPCCore.MessageDeserializer, + options: GRPCCore.CallOptions, + onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result + ) async throws -> Result where Result: Sendable + } + + /// Generated client for the "edge.agent.services.v1.EdgeAgentService" service. + /// + /// The ``Client`` provides an implementation of ``ClientProtocol`` which wraps + /// a `GRPCCore.GRPCCClient`. The underlying `GRPCClient` provides the long-lived + /// means of communication with the remote peer. + public struct Client: ClientProtocol where Transport: GRPCCore.ClientTransport { + private let client: GRPCCore.GRPCClient + + /// Creates a new client wrapping the provided `GRPCCore.GRPCClient`. + /// + /// - Parameters: + /// - client: A `GRPCCore.GRPCClient` providing a communication channel to the service. + public init(wrapping client: GRPCCore.GRPCClient) { + self.client = client + } + + /// Call the "RunContainer" method. + /// + /// - Parameters: + /// - request: A request containing a single `Edge_Agent_Services_V1_RunContainerRequest` message. + /// - serializer: A serializer for `Edge_Agent_Services_V1_RunContainerRequest` messages. + /// - deserializer: A deserializer for `Edge_Agent_Services_V1_RunContainerResponse` messages. + /// - options: Options to apply to this RPC. + /// - handleResponse: A closure which handles the response, the result of which is + /// returned to the caller. Returning from the closure will cancel the RPC if it + /// hasn't already finished. + /// - Returns: The result of `handleResponse`. + public func runContainer( + request: GRPCCore.ClientRequest, + serializer: some GRPCCore.MessageSerializer, + deserializer: some GRPCCore.MessageDeserializer, + options: GRPCCore.CallOptions = .defaults, + onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result + ) async throws -> Result where Result: Sendable { + try await self.client.serverStreaming( + request: request, + descriptor: Edge_Agent_Services_V1_EdgeAgentService.Method.RunContainer.descriptor, + serializer: serializer, + deserializer: deserializer, + options: options, + onResponse: handleResponse + ) + } + } +} + +// Helpers providing default arguments to 'ClientProtocol' methods. +extension Edge_Agent_Services_V1_EdgeAgentService.ClientProtocol { + /// Call the "RunContainer" method. + /// + /// - Parameters: + /// - request: A request containing a single `Edge_Agent_Services_V1_RunContainerRequest` message. + /// - options: Options to apply to this RPC. + /// - handleResponse: A closure which handles the response, the result of which is + /// returned to the caller. Returning from the closure will cancel the RPC if it + /// hasn't already finished. + /// - Returns: The result of `handleResponse`. + public func runContainer( + request: GRPCCore.ClientRequest, + options: GRPCCore.CallOptions = .defaults, + onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result + ) async throws -> Result where Result: Sendable { + try await self.runContainer( + request: request, + serializer: GRPCProtobuf.ProtobufSerializer(), + deserializer: GRPCProtobuf.ProtobufDeserializer(), + options: options, + onResponse: handleResponse + ) + } +} + +// Helpers providing sugared APIs for 'ClientProtocol' methods. +extension Edge_Agent_Services_V1_EdgeAgentService.ClientProtocol { + /// Call the "RunContainer" method. + /// + /// - Parameters: + /// - message: request message to send. + /// - metadata: Additional metadata to send, defaults to empty. + /// - options: Options to apply to this RPC, defaults to `.defaults`. + /// - handleResponse: A closure which handles the response, the result of which is + /// returned to the caller. Returning from the closure will cancel the RPC if it + /// hasn't already finished. + /// - Returns: The result of `handleResponse`. + public func runContainer( + _ message: Edge_Agent_Services_V1_RunContainerRequest, + metadata: GRPCCore.Metadata = [:], + options: GRPCCore.CallOptions = .defaults, + onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result + ) async throws -> Result where Result: Sendable { + let request = GRPCCore.ClientRequest( + message: message, + metadata: metadata + ) + return try await self.runContainer( + request: request, + options: options, + onResponse: handleResponse + ) + } +} \ No newline at end of file diff --git a/Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift b/Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift new file mode 100644 index 0000000..417340b --- /dev/null +++ b/Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift @@ -0,0 +1,83 @@ +// DO NOT EDIT. +// swift-format-ignore-file +// swiftlint:disable all +// +// Generated by the Swift generator plugin for the protocol buffer compiler. +// Source: edge/agent/services/v1/edge_agent_v1_service.proto +// +// For information on using the generated types, please see the documentation: +// https://github.com/apple/swift-protobuf/ + +import SwiftProtobuf + +// If the compiler emits an error on this type, it is because this file +// was generated by a version of the `protoc` Swift plug-in that is +// incompatible with the version of SwiftProtobuf to which you are linking. +// Please ensure that you are building against the same version of the API +// that was used to generate this file. +fileprivate struct _GeneratedWithProtocGenSwiftVersion: SwiftProtobuf.ProtobufAPIVersionCheck { + struct _2: SwiftProtobuf.ProtobufAPIVersion_2 {} + typealias Version = _2 +} + +public struct Edge_Agent_Services_V1_RunContainerRequest: Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} +} + +public struct Edge_Agent_Services_V1_RunContainerResponse: Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} +} + +// MARK: - Code below here is support for the SwiftProtobuf runtime. + +fileprivate let _protobuf_package = "edge.agent.services.v1" + +extension Edge_Agent_Services_V1_RunContainerRequest: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = _protobuf_package + ".RunContainerRequest" + public static let _protobuf_nameMap = SwiftProtobuf._NameMap() + + public mutating func decodeMessage(decoder: inout D) throws { + // Load everything into unknown fields + while try decoder.nextFieldNumber() != nil {} + } + + public func traverse(visitor: inout V) throws { + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest, rhs: Edge_Agent_Services_V1_RunContainerRequest) -> Bool { + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension Edge_Agent_Services_V1_RunContainerResponse: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = _protobuf_package + ".RunContainerResponse" + public static let _protobuf_nameMap = SwiftProtobuf._NameMap() + + public mutating func decodeMessage(decoder: inout D) throws { + // Load everything into unknown fields + while try decoder.nextFieldNumber() != nil {} + } + + public func traverse(visitor: inout V) throws { + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: Edge_Agent_Services_V1_RunContainerResponse, rhs: Edge_Agent_Services_V1_RunContainerResponse) -> Bool { + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} diff --git a/Sources/EdgeAgentProto/Proto/edge_agent.protoset b/Sources/EdgeAgentProto/Proto/edge_agent.protoset new file mode 100644 index 0000000..d631245 --- /dev/null +++ b/Sources/EdgeAgentProto/Proto/edge_agent.protoset @@ -0,0 +1,7 @@ + +„ +2edge/agent/services/v1/edge_agent_v1_service.protoedge.agent.services.v1" +RunContainerRequest" +RunContainerResponse2 +EdgeAgentServicek + RunContainer+.edge.agent.services.v1.RunContainerRequest,.edge.agent.services.v1.RunContainerResponse0bproto3 \ No newline at end of file From 2bcbea530e629fe1614ae942beeedfac388c9252 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 21 Mar 2025 12:03:28 +0100 Subject: [PATCH 03/17] Set up ServiceLifecycle and GRPC server in edge-agent --- Package.resolved | 47 ++++++++++++++++++- Package.swift | 9 +++- .../{EdgeAgentProto.sh => EdgeAgentGRPC.sh} | 12 ++--- .../EdgeAgentGRPC.swift} | 0 .../v1/edge_agent_v1_service.grpc.swift | 0 .../v1/edge_agent_v1_service.pb.swift | 0 .../Proto/edge_agent.protoset | 0 Sources/edge-agent/EdgeAgent.swift | 43 +++++++++++++++-- 8 files changed, 100 insertions(+), 11 deletions(-) rename Scripts/{EdgeAgentProto.sh => EdgeAgentGRPC.sh} (75%) rename Sources/{EdgeAgentProto/EdgeAgentProto.swift => EdgeAgentGRPC/EdgeAgentGRPC.swift} (100%) rename Sources/{EdgeAgentProto => EdgeAgentGRPC}/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift (100%) rename Sources/{EdgeAgentProto => EdgeAgentGRPC}/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift (100%) rename Sources/{EdgeAgentProto => EdgeAgentGRPC}/Proto/edge_agent.protoset (100%) diff --git a/Package.resolved b/Package.resolved index 64404f9..244d3e2 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "c2c065d7c9448e4b645d2a0edd296de4574c21d10373de58a2e755c67267d8df", + "originHash" : "c013fa6f6ecad5140db085594899898d778f2daf9a180f4cbb9ac3815060f23f", "pins" : [ { "identity" : "grpc-swift", @@ -10,6 +10,15 @@ "version" : "2.1.2" } }, + { + "identity" : "grpc-swift-extras", + "kind" : "remoteSourceControl", + "location" : "https://github.com/grpc/grpc-swift-extras.git", + "state" : { + "revision" : "c32c03dcfa0957025c52c27a1c0b7a070a199727", + "version" : "1.0.0" + } + }, { "identity" : "grpc-swift-nio-transport", "kind" : "remoteSourceControl", @@ -46,6 +55,15 @@ "version" : "1.5.0" } }, + { + "identity" : "swift-async-algorithms", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-async-algorithms.git", + "state" : { + "revision" : "4c3ea81f81f0a25d0470188459c6d4bf20cf2f97", + "version" : "1.0.3" + } + }, { "identity" : "swift-atomics", "kind" : "remoteSourceControl", @@ -64,6 +82,15 @@ "version" : "1.1.4" } }, + { + "identity" : "swift-distributed-tracing", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-distributed-tracing.git", + "state" : { + "revision" : "a64a0abc2530f767af15dd88dda7f64d5f1ff9de", + "version" : "1.2.0" + } + }, { "identity" : "swift-http-structured-headers", "kind" : "remoteSourceControl", @@ -154,6 +181,24 @@ "version" : "1.29.0" } }, + { + "identity" : "swift-service-context", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-service-context.git", + "state" : { + "revision" : "8946c930cae601452149e45d31d8ddfac973c3c7", + "version" : "1.2.0" + } + }, + { + "identity" : "swift-service-lifecycle", + "kind" : "remoteSourceControl", + "location" : "https://github.com/swift-server/swift-service-lifecycle.git", + "state" : { + "revision" : "7ee57f99fbe0073c3700997186721e74d925b59b", + "version" : "2.7.0" + } + }, { "identity" : "swift-system", "kind" : "remoteSourceControl", diff --git a/Package.swift b/Package.swift index efd72d7..1c86e66 100644 --- a/Package.swift +++ b/Package.swift @@ -16,6 +16,8 @@ let package = Package( .package(url: "https://github.com/grpc/grpc-swift.git", from: "2.0.0"), .package(url: "https://github.com/grpc/grpc-swift-nio-transport.git", from: "1.0.0"), .package(url: "https://github.com/grpc/grpc-swift-protobuf.git", from: "1.0.0"), + .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.7.0"), + .package(url: "https://github.com/grpc/grpc-swift-extras.git", from: "1.0.0"), ], targets: [ /// The main executable provided by edge-cli. @@ -38,6 +40,11 @@ let package = Package( dependencies: [ .product(name: "ArgumentParser", package: "swift-argument-parser"), .product(name: "Logging", package: "swift-log"), + .product(name: "GRPCNIOTransportHTTP2", package: "grpc-swift-nio-transport"), + .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), + .product(name: "GRPCServiceLifecycle", package: "grpc-swift-extras"), + .product(name: "GRPCHealthService", package: "grpc-swift-extras"), + .target(name: "EdgeAgentGRPC"), ] ), @@ -69,7 +76,7 @@ let package = Package( /// Protobuf definitions for the EdgeAgent service. .target( - name: "EdgeAgentProto", + name: "EdgeAgentGRPC", dependencies: [ .product(name: "GRPCCore", package: "grpc-swift"), .product(name: "GRPCProtobuf", package: "grpc-swift-protobuf"), diff --git a/Scripts/EdgeAgentProto.sh b/Scripts/EdgeAgentGRPC.sh similarity index 75% rename from Scripts/EdgeAgentProto.sh rename to Scripts/EdgeAgentGRPC.sh index af1e117..30176ba 100755 --- a/Scripts/EdgeAgentProto.sh +++ b/Scripts/EdgeAgentGRPC.sh @@ -1,6 +1,6 @@ #!/bin/bash -# This scripts generates the EdgeAgentProto Swift code. It requires `protoc` to be available in the PATH. +# This scripts generates the EdgeAgentGRPC Swift code. It requires `protoc` to be available in the PATH. # It should be run from the root of the project. # Get the binary path @@ -14,17 +14,17 @@ if [ ! -f "$PROTOC_GEN_GRPC_PATH" ]; then swift build --product protoc-gen-grpc-swift fi -rm -rf Sources/EdgeAgentProto/Proto -mkdir -p Sources/EdgeAgentProto/Proto +rm -rf Sources/EdgeAgentGRPC/Proto +mkdir -p Sources/EdgeAgentGRPC/Proto protoc \ --plugin $PROTOC_GEN_GRPC_PATH \ - --grpc-swift_out=Sources/EdgeAgentProto/Proto \ + --grpc-swift_out=Sources/EdgeAgentGRPC/Proto \ --grpc-swift_opt=Visibility=Public \ --grpc-swift_opt=Server=True \ --grpc-swift_opt=Client=True \ --include_imports \ - --descriptor_set_out=Sources/EdgeAgentProto/Proto/edge_agent.protoset \ + --descriptor_set_out=Sources/EdgeAgentGRPC/Proto/edge_agent.protoset \ --experimental_allow_proto3_optional \ -I=Proto \ Proto/edge/agent/services/v1/*.proto @@ -37,7 +37,7 @@ fi protoc \ --plugin $PROTOC_GEN_SWIFT_PATH \ - --swift_out=Sources/EdgeAgentProto/Proto \ + --swift_out=Sources/EdgeAgentGRPC/Proto \ --swift_opt=Visibility=Public \ --experimental_allow_proto3_optional \ -I=Proto \ diff --git a/Sources/EdgeAgentProto/EdgeAgentProto.swift b/Sources/EdgeAgentGRPC/EdgeAgentGRPC.swift similarity index 100% rename from Sources/EdgeAgentProto/EdgeAgentProto.swift rename to Sources/EdgeAgentGRPC/EdgeAgentGRPC.swift diff --git a/Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift b/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift similarity index 100% rename from Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift rename to Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift diff --git a/Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift b/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift similarity index 100% rename from Sources/EdgeAgentProto/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift rename to Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift diff --git a/Sources/EdgeAgentProto/Proto/edge_agent.protoset b/Sources/EdgeAgentGRPC/Proto/edge_agent.protoset similarity index 100% rename from Sources/EdgeAgentProto/Proto/edge_agent.protoset rename to Sources/EdgeAgentGRPC/Proto/edge_agent.protoset diff --git a/Sources/edge-agent/EdgeAgent.swift b/Sources/edge-agent/EdgeAgent.swift index f529fc6..d3169d6 100644 --- a/Sources/edge-agent/EdgeAgent.swift +++ b/Sources/edge-agent/EdgeAgent.swift @@ -1,11 +1,48 @@ import ArgumentParser +import EdgeAgentGRPC import Foundation +import GRPCHealthService +import GRPCNIOTransportHTTP2 +import GRPCServiceLifecycle +import Logging +import ServiceLifecycle @main -struct EdgeCLI: ParsableCommand { +struct EdgeAgent: AsyncParsableCommand { static let configuration = CommandConfiguration( commandName: "edge-agent", - abstract: "Edge Agent", - subcommands: [] + abstract: "Edge Agent" ) + + @Option(name: .shortAndLong, help: "The port to listen on for incoming connections.") + var port: Int = 50051 + + func run() async throws { + let logger = Logger(label: "apache-edge.agent") + + logger.info("Starting Edge Agent on port \(port)") + + let transport = HTTP2ServerTransport.Posix( + address: .ipv4(host: "0.0.0.0", port: port), + transportSecurity: .plaintext + ) + + let healthService = HealthService() + + let grpcServer = GRPCServer( + transport: transport, + services: [ + healthService + ] + ) + + let group = ServiceGroup( + services: [ + grpcServer + ], + logger: logger + ) + + try await group.run() + } } From 146a04e005e8731c967c97077330c08ada51e1c3 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 28 Mar 2025 16:38:27 +0100 Subject: [PATCH 04/17] Rename Generate-Proto.sh --- Scripts/{EdgeAgentGRPC.sh => Generate-Proto.sh} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename Scripts/{EdgeAgentGRPC.sh => Generate-Proto.sh} (100%) diff --git a/Scripts/EdgeAgentGRPC.sh b/Scripts/Generate-Proto.sh similarity index 100% rename from Scripts/EdgeAgentGRPC.sh rename to Scripts/Generate-Proto.sh From 590e8f165ecd41fe55af33f32ca19a3cf5c25698 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 28 Mar 2025 16:38:48 +0100 Subject: [PATCH 05/17] Define initial protocol for running containers --- .../services/v1/edge_agent_v1_service.proto | 44 +- .../v1/edge_agent_v1_service.grpc.swift | 82 ++-- .../v1/edge_agent_v1_service.pb.swift | 419 +++++++++++++++++- .../EdgeAgentGRPC/Proto/edge_agent.protoset | Bin 263 -> 879 bytes 4 files changed, 509 insertions(+), 36 deletions(-) diff --git a/Proto/edge/agent/services/v1/edge_agent_v1_service.proto b/Proto/edge/agent/services/v1/edge_agent_v1_service.proto index 95856cc..f2494d8 100644 --- a/Proto/edge/agent/services/v1/edge_agent_v1_service.proto +++ b/Proto/edge/agent/services/v1/edge_agent_v1_service.proto @@ -3,13 +3,53 @@ syntax = "proto3"; package edge.agent.services.v1; service EdgeAgentService { - rpc RunContainer(RunContainerRequest) returns (stream RunContainerResponse); + // Upload a container image to the agent, and run it. + // The first message in the stream MUST be the header. + rpc RunContainer(stream RunContainerRequest) returns (stream RunContainerResponse); } message RunContainerRequest { - + oneof request_type { + /// The first message in the stream MUST be the header. + Header header = 1; + + /// A chunk of the container tarball. + Chunk chunk = 2; + + /// After uploading the container, control messages can be sent to the agent. + ControlCommand control = 3; + } + + message Header { + // Unique name for the container image + string image_name = 1; + } + + message Chunk { + // Binary chunk of the container tarball + bytes data = 1; + } + + message ControlCommand { + oneof command { + Run run = 1; + } + + message Run { + // Whether to run the container with a debugger + bool debug = 1; + } + } } message RunContainerResponse { + oneof response_type { + Started started = 1; + } + message Started { + // The port that the debugger is listening on. + // If this is 0, the container is not running with a debugger. + uint32 debug_port = 1; + } } \ No newline at end of file diff --git a/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift b/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift index 7e3030d..7f19f41 100644 --- a/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift +++ b/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift @@ -58,6 +58,11 @@ extension Edge_Agent_Services_V1_EdgeAgentService { public protocol StreamingServiceProtocol: GRPCCore.RegistrableRPCService { /// Handle the "RunContainer" method. /// + /// > Source IDL Documentation: + /// > + /// > Upload a container image to the agent, and run it. + /// > The first message in the stream MUST be the header. + /// /// - Parameters: /// - request: A streaming request of `Edge_Agent_Services_V1_RunContainerRequest` messages. /// - context: Context providing information about the RPC. @@ -81,15 +86,20 @@ extension Edge_Agent_Services_V1_EdgeAgentService { public protocol ServiceProtocol: Edge_Agent_Services_V1_EdgeAgentService.StreamingServiceProtocol { /// Handle the "RunContainer" method. /// + /// > Source IDL Documentation: + /// > + /// > Upload a container image to the agent, and run it. + /// > The first message in the stream MUST be the header. + /// /// - Parameters: - /// - request: A request containing a single `Edge_Agent_Services_V1_RunContainerRequest` message. + /// - request: A streaming request of `Edge_Agent_Services_V1_RunContainerRequest` messages. /// - context: Context providing information about the RPC. /// - Throws: Any error which occurred during the processing of the request. Thrown errors /// of type `RPCError` are mapped to appropriate statuses. All other errors are converted /// to an internal error. /// - Returns: A streaming response of `Edge_Agent_Services_V1_RunContainerResponse` messages. func runContainer( - request: GRPCCore.ServerRequest, + request: GRPCCore.StreamingServerRequest, context: GRPCCore.ServerContext ) async throws -> GRPCCore.StreamingServerResponse } @@ -102,15 +112,20 @@ extension Edge_Agent_Services_V1_EdgeAgentService { public protocol SimpleServiceProtocol: Edge_Agent_Services_V1_EdgeAgentService.ServiceProtocol { /// Handle the "RunContainer" method. /// + /// > Source IDL Documentation: + /// > + /// > Upload a container image to the agent, and run it. + /// > The first message in the stream MUST be the header. + /// /// - Parameters: - /// - request: A `Edge_Agent_Services_V1_RunContainerRequest` message. + /// - request: A stream of `Edge_Agent_Services_V1_RunContainerRequest` messages. /// - response: A response stream of `Edge_Agent_Services_V1_RunContainerResponse` messages. /// - context: Context providing information about the RPC. /// - Throws: Any error which occurred during the processing of the request. Thrown errors /// of type `RPCError` are mapped to appropriate statuses. All other errors are converted /// to an internal error. func runContainer( - request: Edge_Agent_Services_V1_RunContainerRequest, + request: GRPCCore.RPCAsyncSequence, response: GRPCCore.RPCWriter, context: GRPCCore.ServerContext ) async throws @@ -136,29 +151,19 @@ extension Edge_Agent_Services_V1_EdgeAgentService.StreamingServiceProtocol { // Default implementation of streaming methods from 'StreamingServiceProtocol'. extension Edge_Agent_Services_V1_EdgeAgentService.ServiceProtocol { - public func runContainer( - request: GRPCCore.StreamingServerRequest, - context: GRPCCore.ServerContext - ) async throws -> GRPCCore.StreamingServerResponse { - let response = try await self.runContainer( - request: GRPCCore.ServerRequest(stream: request), - context: context - ) - return response - } } // Default implementation of methods from 'ServiceProtocol'. extension Edge_Agent_Services_V1_EdgeAgentService.SimpleServiceProtocol { public func runContainer( - request: GRPCCore.ServerRequest, + request: GRPCCore.StreamingServerRequest, context: GRPCCore.ServerContext ) async throws -> GRPCCore.StreamingServerResponse { return GRPCCore.StreamingServerResponse( metadata: [:], producer: { writer in try await self.runContainer( - request: request.message, + request: request.messages, response: writer, context: context ) @@ -178,8 +183,13 @@ extension Edge_Agent_Services_V1_EdgeAgentService { public protocol ClientProtocol: Sendable { /// Call the "RunContainer" method. /// + /// > Source IDL Documentation: + /// > + /// > Upload a container image to the agent, and run it. + /// > The first message in the stream MUST be the header. + /// /// - Parameters: - /// - request: A request containing a single `Edge_Agent_Services_V1_RunContainerRequest` message. + /// - request: A streaming request producing `Edge_Agent_Services_V1_RunContainerRequest` messages. /// - serializer: A serializer for `Edge_Agent_Services_V1_RunContainerRequest` messages. /// - deserializer: A deserializer for `Edge_Agent_Services_V1_RunContainerResponse` messages. /// - options: Options to apply to this RPC. @@ -188,7 +198,7 @@ extension Edge_Agent_Services_V1_EdgeAgentService { /// hasn't already finished. /// - Returns: The result of `handleResponse`. func runContainer( - request: GRPCCore.ClientRequest, + request: GRPCCore.StreamingClientRequest, serializer: some GRPCCore.MessageSerializer, deserializer: some GRPCCore.MessageDeserializer, options: GRPCCore.CallOptions, @@ -214,8 +224,13 @@ extension Edge_Agent_Services_V1_EdgeAgentService { /// Call the "RunContainer" method. /// + /// > Source IDL Documentation: + /// > + /// > Upload a container image to the agent, and run it. + /// > The first message in the stream MUST be the header. + /// /// - Parameters: - /// - request: A request containing a single `Edge_Agent_Services_V1_RunContainerRequest` message. + /// - request: A streaming request producing `Edge_Agent_Services_V1_RunContainerRequest` messages. /// - serializer: A serializer for `Edge_Agent_Services_V1_RunContainerRequest` messages. /// - deserializer: A deserializer for `Edge_Agent_Services_V1_RunContainerResponse` messages. /// - options: Options to apply to this RPC. @@ -224,13 +239,13 @@ extension Edge_Agent_Services_V1_EdgeAgentService { /// hasn't already finished. /// - Returns: The result of `handleResponse`. public func runContainer( - request: GRPCCore.ClientRequest, + request: GRPCCore.StreamingClientRequest, serializer: some GRPCCore.MessageSerializer, deserializer: some GRPCCore.MessageDeserializer, options: GRPCCore.CallOptions = .defaults, onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result ) async throws -> Result where Result: Sendable { - try await self.client.serverStreaming( + try await self.client.bidirectionalStreaming( request: request, descriptor: Edge_Agent_Services_V1_EdgeAgentService.Method.RunContainer.descriptor, serializer: serializer, @@ -246,15 +261,20 @@ extension Edge_Agent_Services_V1_EdgeAgentService { extension Edge_Agent_Services_V1_EdgeAgentService.ClientProtocol { /// Call the "RunContainer" method. /// + /// > Source IDL Documentation: + /// > + /// > Upload a container image to the agent, and run it. + /// > The first message in the stream MUST be the header. + /// /// - Parameters: - /// - request: A request containing a single `Edge_Agent_Services_V1_RunContainerRequest` message. + /// - request: A streaming request producing `Edge_Agent_Services_V1_RunContainerRequest` messages. /// - options: Options to apply to this RPC. /// - handleResponse: A closure which handles the response, the result of which is /// returned to the caller. Returning from the closure will cancel the RPC if it /// hasn't already finished. /// - Returns: The result of `handleResponse`. public func runContainer( - request: GRPCCore.ClientRequest, + request: GRPCCore.StreamingClientRequest, options: GRPCCore.CallOptions = .defaults, onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result ) async throws -> Result where Result: Sendable { @@ -272,23 +292,29 @@ extension Edge_Agent_Services_V1_EdgeAgentService.ClientProtocol { extension Edge_Agent_Services_V1_EdgeAgentService.ClientProtocol { /// Call the "RunContainer" method. /// + /// > Source IDL Documentation: + /// > + /// > Upload a container image to the agent, and run it. + /// > The first message in the stream MUST be the header. + /// /// - Parameters: - /// - message: request message to send. /// - metadata: Additional metadata to send, defaults to empty. /// - options: Options to apply to this RPC, defaults to `.defaults`. + /// - producer: A closure producing request messages to send to the server. The request + /// stream is closed when the closure returns. /// - handleResponse: A closure which handles the response, the result of which is /// returned to the caller. Returning from the closure will cancel the RPC if it /// hasn't already finished. /// - Returns: The result of `handleResponse`. public func runContainer( - _ message: Edge_Agent_Services_V1_RunContainerRequest, metadata: GRPCCore.Metadata = [:], options: GRPCCore.CallOptions = .defaults, + requestProducer producer: @Sendable @escaping (GRPCCore.RPCWriter) async throws -> Void, onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result ) async throws -> Result where Result: Sendable { - let request = GRPCCore.ClientRequest( - message: message, - metadata: metadata + let request = GRPCCore.StreamingClientRequest( + metadata: metadata, + producer: producer ) return try await self.runContainer( request: request, diff --git a/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift b/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift index 417340b..08ec686 100644 --- a/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift +++ b/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift @@ -8,6 +8,7 @@ // For information on using the generated types, please see the documentation: // https://github.com/apple/swift-protobuf/ +import Foundation import SwiftProtobuf // If the compiler emits an error on this type, it is because this file @@ -25,8 +26,111 @@ public struct Edge_Agent_Services_V1_RunContainerRequest: Sendable { // `Message` and `Message+*Additions` files in the SwiftProtobuf library for // methods supported on all messages. + public var requestType: Edge_Agent_Services_V1_RunContainerRequest.OneOf_RequestType? = nil + + //// The first message in the stream MUST be the header. + public var header: Edge_Agent_Services_V1_RunContainerRequest.Header { + get { + if case .header(let v)? = requestType {return v} + return Edge_Agent_Services_V1_RunContainerRequest.Header() + } + set {requestType = .header(newValue)} + } + + //// A chunk of the container tarball. + public var chunk: Edge_Agent_Services_V1_RunContainerRequest.Chunk { + get { + if case .chunk(let v)? = requestType {return v} + return Edge_Agent_Services_V1_RunContainerRequest.Chunk() + } + set {requestType = .chunk(newValue)} + } + + //// After uploading the container, control messages can be sent to the agent. + public var control: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand { + get { + if case .control(let v)? = requestType {return v} + return Edge_Agent_Services_V1_RunContainerRequest.ControlCommand() + } + set {requestType = .control(newValue)} + } + public var unknownFields = SwiftProtobuf.UnknownStorage() + public enum OneOf_RequestType: Equatable, Sendable { + //// The first message in the stream MUST be the header. + case header(Edge_Agent_Services_V1_RunContainerRequest.Header) + //// A chunk of the container tarball. + case chunk(Edge_Agent_Services_V1_RunContainerRequest.Chunk) + //// After uploading the container, control messages can be sent to the agent. + case control(Edge_Agent_Services_V1_RunContainerRequest.ControlCommand) + + } + + public struct Header: Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + /// Unique name for the container image + public var imageName: String = String() + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + } + + public struct Chunk: @unchecked Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + /// Binary chunk of the container tarball + public var data: Data = Data() + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + } + + public struct ControlCommand: Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + public var command: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.OneOf_Command? = nil + + public var run: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run { + get { + if case .run(let v)? = command {return v} + return Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run() + } + set {command = .run(newValue)} + } + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public enum OneOf_Command: Equatable, Sendable { + case run(Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run) + + } + + public struct Run: Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + /// Whether to run the container with a debugger + public var debug: Bool = false + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + } + + public init() {} + } + public init() {} } @@ -35,8 +139,37 @@ public struct Edge_Agent_Services_V1_RunContainerResponse: Sendable { // `Message` and `Message+*Additions` files in the SwiftProtobuf library for // methods supported on all messages. + public var responseType: Edge_Agent_Services_V1_RunContainerResponse.OneOf_ResponseType? = nil + + public var started: Edge_Agent_Services_V1_RunContainerResponse.Started { + get { + if case .started(let v)? = responseType {return v} + return Edge_Agent_Services_V1_RunContainerResponse.Started() + } + set {responseType = .started(newValue)} + } + public var unknownFields = SwiftProtobuf.UnknownStorage() + public enum OneOf_ResponseType: Equatable, Sendable { + case started(Edge_Agent_Services_V1_RunContainerResponse.Started) + + } + + public struct Started: Sendable { + // SwiftProtobuf.Message conformance is added in an extension below. See the + // `Message` and `Message+*Additions` files in the SwiftProtobuf library for + // methods supported on all messages. + + /// The port that the debugger is listening on. + /// If this is 0, the container is not running with a debugger. + public var debugPort: UInt32 = 0 + + public var unknownFields = SwiftProtobuf.UnknownStorage() + + public init() {} + } + public init() {} } @@ -46,18 +179,231 @@ fileprivate let _protobuf_package = "edge.agent.services.v1" extension Edge_Agent_Services_V1_RunContainerRequest: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { public static let protoMessageName: String = _protobuf_package + ".RunContainerRequest" - public static let _protobuf_nameMap = SwiftProtobuf._NameMap() + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "header"), + 2: .same(proto: "chunk"), + 3: .same(proto: "control"), + ] public mutating func decodeMessage(decoder: inout D) throws { - // Load everything into unknown fields - while try decoder.nextFieldNumber() != nil {} + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { + var v: Edge_Agent_Services_V1_RunContainerRequest.Header? + var hadOneofValue = false + if let current = self.requestType { + hadOneofValue = true + if case .header(let m) = current {v = m} + } + try decoder.decodeSingularMessageField(value: &v) + if let v = v { + if hadOneofValue {try decoder.handleConflictingOneOf()} + self.requestType = .header(v) + } + }() + case 2: try { + var v: Edge_Agent_Services_V1_RunContainerRequest.Chunk? + var hadOneofValue = false + if let current = self.requestType { + hadOneofValue = true + if case .chunk(let m) = current {v = m} + } + try decoder.decodeSingularMessageField(value: &v) + if let v = v { + if hadOneofValue {try decoder.handleConflictingOneOf()} + self.requestType = .chunk(v) + } + }() + case 3: try { + var v: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand? + var hadOneofValue = false + if let current = self.requestType { + hadOneofValue = true + if case .control(let m) = current {v = m} + } + try decoder.decodeSingularMessageField(value: &v) + if let v = v { + if hadOneofValue {try decoder.handleConflictingOneOf()} + self.requestType = .control(v) + } + }() + default: break + } + } } public func traverse(visitor: inout V) throws { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every if/case branch local when no optimizations + // are enabled. https://github.com/apple/swift-protobuf/issues/1034 and + // https://github.com/apple/swift-protobuf/issues/1182 + switch self.requestType { + case .header?: try { + guard case .header(let v)? = self.requestType else { preconditionFailure() } + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + }() + case .chunk?: try { + guard case .chunk(let v)? = self.requestType else { preconditionFailure() } + try visitor.visitSingularMessageField(value: v, fieldNumber: 2) + }() + case .control?: try { + guard case .control(let v)? = self.requestType else { preconditionFailure() } + try visitor.visitSingularMessageField(value: v, fieldNumber: 3) + }() + case nil: break + } try unknownFields.traverse(visitor: &visitor) } public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest, rhs: Edge_Agent_Services_V1_RunContainerRequest) -> Bool { + if lhs.requestType != rhs.requestType {return false} + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension Edge_Agent_Services_V1_RunContainerRequest.Header: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = Edge_Agent_Services_V1_RunContainerRequest.protoMessageName + ".Header" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .standard(proto: "image_name"), + ] + + public mutating func decodeMessage(decoder: inout D) throws { + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { try decoder.decodeSingularStringField(value: &self.imageName) }() + default: break + } + } + } + + public func traverse(visitor: inout V) throws { + if !self.imageName.isEmpty { + try visitor.visitSingularStringField(value: self.imageName, fieldNumber: 1) + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest.Header, rhs: Edge_Agent_Services_V1_RunContainerRequest.Header) -> Bool { + if lhs.imageName != rhs.imageName {return false} + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension Edge_Agent_Services_V1_RunContainerRequest.Chunk: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = Edge_Agent_Services_V1_RunContainerRequest.protoMessageName + ".Chunk" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "data"), + ] + + public mutating func decodeMessage(decoder: inout D) throws { + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { try decoder.decodeSingularBytesField(value: &self.data) }() + default: break + } + } + } + + public func traverse(visitor: inout V) throws { + if !self.data.isEmpty { + try visitor.visitSingularBytesField(value: self.data, fieldNumber: 1) + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest.Chunk, rhs: Edge_Agent_Services_V1_RunContainerRequest.Chunk) -> Bool { + if lhs.data != rhs.data {return false} + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension Edge_Agent_Services_V1_RunContainerRequest.ControlCommand: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = Edge_Agent_Services_V1_RunContainerRequest.protoMessageName + ".ControlCommand" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "run"), + ] + + public mutating func decodeMessage(decoder: inout D) throws { + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { + var v: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run? + var hadOneofValue = false + if let current = self.command { + hadOneofValue = true + if case .run(let m) = current {v = m} + } + try decoder.decodeSingularMessageField(value: &v) + if let v = v { + if hadOneofValue {try decoder.handleConflictingOneOf()} + self.command = .run(v) + } + }() + default: break + } + } + } + + public func traverse(visitor: inout V) throws { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every if/case branch local when no optimizations + // are enabled. https://github.com/apple/swift-protobuf/issues/1034 and + // https://github.com/apple/swift-protobuf/issues/1182 + try { if case .run(let v)? = self.command { + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + } }() + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand, rhs: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand) -> Bool { + if lhs.command != rhs.command {return false} + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.protoMessageName + ".Run" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "debug"), + ] + + public mutating func decodeMessage(decoder: inout D) throws { + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { try decoder.decodeSingularBoolField(value: &self.debug) }() + default: break + } + } + } + + public func traverse(visitor: inout V) throws { + if self.debug != false { + try visitor.visitSingularBoolField(value: self.debug, fieldNumber: 1) + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run, rhs: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run) -> Bool { + if lhs.debug != rhs.debug {return false} if lhs.unknownFields != rhs.unknownFields {return false} return true } @@ -65,18 +411,79 @@ extension Edge_Agent_Services_V1_RunContainerRequest: SwiftProtobuf.Message, Swi extension Edge_Agent_Services_V1_RunContainerResponse: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { public static let protoMessageName: String = _protobuf_package + ".RunContainerResponse" - public static let _protobuf_nameMap = SwiftProtobuf._NameMap() + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .same(proto: "started"), + ] public mutating func decodeMessage(decoder: inout D) throws { - // Load everything into unknown fields - while try decoder.nextFieldNumber() != nil {} + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { + var v: Edge_Agent_Services_V1_RunContainerResponse.Started? + var hadOneofValue = false + if let current = self.responseType { + hadOneofValue = true + if case .started(let m) = current {v = m} + } + try decoder.decodeSingularMessageField(value: &v) + if let v = v { + if hadOneofValue {try decoder.handleConflictingOneOf()} + self.responseType = .started(v) + } + }() + default: break + } + } } public func traverse(visitor: inout V) throws { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every if/case branch local when no optimizations + // are enabled. https://github.com/apple/swift-protobuf/issues/1034 and + // https://github.com/apple/swift-protobuf/issues/1182 + try { if case .started(let v)? = self.responseType { + try visitor.visitSingularMessageField(value: v, fieldNumber: 1) + } }() try unknownFields.traverse(visitor: &visitor) } public static func ==(lhs: Edge_Agent_Services_V1_RunContainerResponse, rhs: Edge_Agent_Services_V1_RunContainerResponse) -> Bool { + if lhs.responseType != rhs.responseType {return false} + if lhs.unknownFields != rhs.unknownFields {return false} + return true + } +} + +extension Edge_Agent_Services_V1_RunContainerResponse.Started: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { + public static let protoMessageName: String = Edge_Agent_Services_V1_RunContainerResponse.protoMessageName + ".Started" + public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ + 1: .standard(proto: "debug_port"), + ] + + public mutating func decodeMessage(decoder: inout D) throws { + while let fieldNumber = try decoder.nextFieldNumber() { + // The use of inline closures is to circumvent an issue where the compiler + // allocates stack space for every case branch when no optimizations are + // enabled. https://github.com/apple/swift-protobuf/issues/1034 + switch fieldNumber { + case 1: try { try decoder.decodeSingularUInt32Field(value: &self.debugPort) }() + default: break + } + } + } + + public func traverse(visitor: inout V) throws { + if self.debugPort != 0 { + try visitor.visitSingularUInt32Field(value: self.debugPort, fieldNumber: 1) + } + try unknownFields.traverse(visitor: &visitor) + } + + public static func ==(lhs: Edge_Agent_Services_V1_RunContainerResponse.Started, rhs: Edge_Agent_Services_V1_RunContainerResponse.Started) -> Bool { + if lhs.debugPort != rhs.debugPort {return false} if lhs.unknownFields != rhs.unknownFields {return false} return true } diff --git a/Sources/EdgeAgentGRPC/Proto/edge_agent.protoset b/Sources/EdgeAgentGRPC/Proto/edge_agent.protoset index d6312451e665bf5fede5ce0aaa66d08458e60ec4..6342e51c3e09817ee93b1ce38376bc5326af1d85 100644 GIT binary patch literal 879 zcmbW0K}*9h7>2ENoBiCT#37?1$Sw|))>cm*1UG~UBBKMZrCY-`v}sM#fk*HD3xA&e z2$L@z?6AZ1*1XC4^v&C^@KXh!##8D>Q_6&!Q@)r_XzngNSIHsD;lc~c9Vg?dNXe1f zba2yIdYpyV`8MG&C|EybBAPSG1Nv6bT#!eo&S(@<-ZoB+?w;>E>jBOhvok>FU@IsY z_nNMA*%rVA|SGlO$rXq%L9h8B}#_asn_< zWK$uF5{lQi0t=NVDcHwQ!AOL_P8^8{Pt*f^wm%JMsxD=`@*N}qlNYS4@%H~=Rns!C zg2@b~T#*)3VmdCS2;U9#qGzdw@P54})Vcnkp?J?|=gWYWG_HK@JWE+l$q;I}h`69} z8T5LypxW+?v{&_6V(c!|N>vq#F~cn70{M-=LS-n0-aa%q)gVmie;9CZFUR#xP54L; eI7y(snw*?(77(Y|zS!)r4$(C(jd2x!o2y?;jv)pB delta 63 zcmaFQ*3QJt)xtE9*_T Date: Fri, 28 Mar 2025 16:39:20 +0100 Subject: [PATCH 06/17] Add swift nio dependency, update target dependencies --- Package.swift | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Package.swift b/Package.swift index 1c86e66..c6cdcdb 100644 --- a/Package.swift +++ b/Package.swift @@ -18,6 +18,7 @@ let package = Package( .package(url: "https://github.com/grpc/grpc-swift-protobuf.git", from: "1.0.0"), .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.7.0"), .package(url: "https://github.com/grpc/grpc-swift-extras.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), ], targets: [ /// The main executable provided by edge-cli. @@ -25,8 +26,11 @@ let package = Package( name: "edge", dependencies: [ .product(name: "ArgumentParser", package: "swift-argument-parser"), - .target(name: "EdgeCLI"), .product(name: "Logging", package: "swift-log"), + .product(name: "_NIOFileSystem", package: "swift-nio"), + .product(name: "GRPCNIOTransportHTTP2", package: "grpc-swift-nio-transport"), + .target(name: "EdgeAgentGRPC"), + .target(name: "EdgeCLI"), ], resources: [ .copy("Resources") @@ -44,7 +48,9 @@ let package = Package( .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), .product(name: "GRPCServiceLifecycle", package: "grpc-swift-extras"), .product(name: "GRPCHealthService", package: "grpc-swift-extras"), + .product(name: "_NIOFileSystem", package: "swift-nio"), .target(name: "EdgeAgentGRPC"), + .target(name: "Shell"), ] ), From f85bc1d085934248a625c6832ea5c291c76b2d94 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 28 Mar 2025 16:39:47 +0100 Subject: [PATCH 07/17] Add agent connection options to RunCommand --- Sources/edge/AgentConnectionOptions.swift | 9 +++++++++ Sources/edge/Commands/RunCommand.swift | 2 ++ 2 files changed, 11 insertions(+) create mode 100644 Sources/edge/AgentConnectionOptions.swift diff --git a/Sources/edge/AgentConnectionOptions.swift b/Sources/edge/AgentConnectionOptions.swift new file mode 100644 index 0000000..d3aa0d6 --- /dev/null +++ b/Sources/edge/AgentConnectionOptions.swift @@ -0,0 +1,9 @@ +import ArgumentParser + +struct AgentConnectionOptions: ParsableArguments { + @Option(name: .long, help: "The host of the Edge Agent to connect to.") + var agentHost: String + + @Option(name: .long, help: "The port of the Edge Agent to connect to.") + var agentPort: Int = 50051 +} diff --git a/Sources/edge/Commands/RunCommand.swift b/Sources/edge/Commands/RunCommand.swift index 13dda5f..a9daa2b 100644 --- a/Sources/edge/Commands/RunCommand.swift +++ b/Sources/edge/Commands/RunCommand.swift @@ -25,6 +25,8 @@ struct RunCommand: AsyncParsableCommand { @Flag(name: .shortAndLong, help: "Attach a debugger to the container") var debug: Bool = false + @OptionGroup var agentConnectionOptions: AgentConnectionOptions + func run() async throws { let logger = Logger(label: "apache-edge.cli.run") From 75bfea18154be75d36fb7504cd829c207443ef2e Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 28 Mar 2025 16:40:18 +0100 Subject: [PATCH 08/17] Call into edge agent to run the container, instead of running it from edge CLI --- Sources/edge/Commands/RunCommand.swift | 77 ++++++++++++++++++-------- 1 file changed, 55 insertions(+), 22 deletions(-) diff --git a/Sources/edge/Commands/RunCommand.swift b/Sources/edge/Commands/RunCommand.swift index a9daa2b..e9deea3 100644 --- a/Sources/edge/Commands/RunCommand.swift +++ b/Sources/edge/Commands/RunCommand.swift @@ -1,8 +1,13 @@ import ArgumentParser import ContainerBuilder +import EdgeAgentGRPC import EdgeCLI import Foundation +import GRPCCore +import GRPCNIOTransportHTTP2 import Logging +import NIO +import NIOFileSystem import Shell struct RunCommand: AsyncParsableCommand { @@ -85,30 +90,58 @@ struct RunCommand: AsyncParsableCommand { outputPath: outputPath ) - logger.info( - "Loading into Docker", - metadata: [ - "imageName": .string(imageName), - "path": .string(outputPath), - ] - ) - try await Shell.run(["docker", "load", "-i", outputPath]) - - if debug { - logger.info( - "Running container with debugger", - metadata: ["imageName": .string(imageName)] + try await withGRPCClient( + transport: .http2NIOPosix( + target: .dns( + host: agentConnectionOptions.agentHost, + port: agentConnectionOptions.agentPort + ), + transportSecurity: .plaintext ) - try await Shell.run([ - "docker", "run", "--rm", "-it", "-p", "4242:4242", - "--cap-add=SYS_PTRACE", "--security-opt", "seccomp=unconfined", imageName, - "ds2", "gdbserver", "0.0.0.0:4242", "/bin/\(executableTarget.name)", - ]) - return - } + ) { client in + let agent = Edge_Agent_Services_V1_EdgeAgentService.Client(wrapping: client) + try await agent.runContainer { writer in + // First, send the header. + try await writer.write( + .with { + $0.header.imageName = imageName + } + ) - logger.info("Running container", metadata: ["imageName": .string(imageName)]) - try await Shell.run(["docker", "run", "--rm", imageName]) + // Send the chunks + let fileHandle = try await FileSystem.shared.openFile( + forReadingAt: FilePath(outputPath) + ) + do { + for try await chunk in fileHandle.readChunks() { + try await writer.write( + .with { + $0.requestType = .chunk( + .with { $0.data = Data(buffer: chunk) } + ) + } + ) + } + } catch { + try await fileHandle.close() + throw error + } + try await fileHandle.close() + + // Send the control command to start the container. + try await writer.write( + .with { + $0.requestType = .control( + .with { $0.command = .run(.with { $0.debug = debug }) } + ) + } + ) + } onResponse: { response in + for try await message in response.messages { + print(message) + } + } + } } } From f3befb19aaf9cab64c507801e0755c1d91979497 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 28 Mar 2025 16:40:48 +0100 Subject: [PATCH 09/17] Add DockerCLI type for calling into docker --- Sources/edge-agent/DockerCLI.swift | 67 ++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 Sources/edge-agent/DockerCLI.swift diff --git a/Sources/edge-agent/DockerCLI.swift b/Sources/edge-agent/DockerCLI.swift new file mode 100644 index 0000000..661ceea --- /dev/null +++ b/Sources/edge-agent/DockerCLI.swift @@ -0,0 +1,67 @@ +import Shell + +/// Represents the Docker CLI interface for managing container images and running containers. +public struct DockerCLI: Sendable { + public let command: String + + public init(command: String = "docker") { + self.command = command + } + + /// Options for the Docker run command. + public enum RunOption: Sendable { + /// Remove the container when it exits. + case rm + + /// Keep STDIN open even if not attached. + case interactive + + /// Allocate a pseudo-TTY. + case tty + + /// Publish a container's port to the host. + case publishPort(hostPort: UInt16, containerPort: UInt16) + + /// Add Linux capabilities. + case capAdd(String) + + /// Security options. + case securityOpt(String) + + /// The arguments to pass to the Docker run command. + var arguments: [String] { + switch self { + case .rm: + return ["--rm"] + case .interactive: + return ["-i"] + case .tty: + return ["-t"] + case .publishPort(let hostPort, let containerPort): + return ["-p", "\(hostPort):\(containerPort)"] + case .capAdd(let capability): + return ["--cap-add=\(capability)"] + case .securityOpt(let option): + return ["--security-opt", option] + } + } + } + + /// Load a Docker image from a tar archive. + @discardableResult + public func load(filePath: String) async throws -> String { + let arguments = [command, "load", "-i", filePath] + return try await Shell.run(arguments) + } + + /// Run a Docker container. + @discardableResult + public func run( + options: [RunOption] = [], + image: String, + command: [String] = [] + ) async throws -> String { + let arguments = [self.command, "run"] + options.flatMap(\.arguments) + [image] + command + return try await Shell.run(arguments) + } +} From e7ce9c0d5a262e3b77ca4917ce906014844f4448 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 28 Mar 2025 16:41:04 +0100 Subject: [PATCH 10/17] Exclude protoset from build --- Package.swift | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Package.swift b/Package.swift index c6cdcdb..319c955 100644 --- a/Package.swift +++ b/Package.swift @@ -86,6 +86,9 @@ let package = Package( dependencies: [ .product(name: "GRPCCore", package: "grpc-swift"), .product(name: "GRPCProtobuf", package: "grpc-swift-protobuf"), + ], + exclude: [ + "Proto/edge_agent.protoset" ] ), ] From 9031bd2d0931c887ec0d164a006a1fc9c77a4f31 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 28 Mar 2025 16:41:20 +0100 Subject: [PATCH 11/17] Set agent log level to trace for debug builds --- Sources/edge-agent/EdgeAgent.swift | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Sources/edge-agent/EdgeAgent.swift b/Sources/edge-agent/EdgeAgent.swift index d3169d6..bcb522a 100644 --- a/Sources/edge-agent/EdgeAgent.swift +++ b/Sources/edge-agent/EdgeAgent.swift @@ -18,6 +18,14 @@ struct EdgeAgent: AsyncParsableCommand { var port: Int = 50051 func run() async throws { + LoggingSystem.bootstrap { label in + var handler = StreamLogHandler.standardError(label: label) + #if DEBUG + handler.logLevel = .trace + #endif + return handler + } + let logger = Logger(label: "apache-edge.agent") logger.info("Starting Edge Agent on port \(port)") From cb2792988028873389e7b6fc9d37d7c6345f3160 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 28 Mar 2025 16:42:19 +0100 Subject: [PATCH 12/17] Initial EdgeAgentService implementation --- .../Services/EdgeAgentService.swift | 49 +++++ .../RunContainerRequestHandler+Proto.swift | 48 ++++ .../Services/RunContainerRequestHandler.swift | 205 ++++++++++++++++++ 3 files changed, 302 insertions(+) create mode 100644 Sources/edge-agent/Services/EdgeAgentService.swift create mode 100644 Sources/edge-agent/Services/RunContainerRequestHandler+Proto.swift create mode 100644 Sources/edge-agent/Services/RunContainerRequestHandler.swift diff --git a/Sources/edge-agent/Services/EdgeAgentService.swift b/Sources/edge-agent/Services/EdgeAgentService.swift new file mode 100644 index 0000000..aade6a1 --- /dev/null +++ b/Sources/edge-agent/Services/EdgeAgentService.swift @@ -0,0 +1,49 @@ +import EdgeAgentGRPC + +struct EdgeAgentService: Edge_Agent_Services_V1_EdgeAgentService.ServiceProtocol { + func runContainer( + request: StreamingServerRequest, + context: ServerContext + ) async throws -> StreamingServerResponse { + return StreamingServerResponse { + ( + writer: RPCWriter + ) async throws -> Metadata in + try await withThrowingDiscardingTaskGroup { group in + var handler = RunContainerRequestHandler() + + // Add a task to write outgoing events to the response. + group.addTask { [events = handler.events] in + for try await event in events { + try await writer.write(event.proto) + } + } + + // Iterate over incoming messages, converting each from protobuf before passing it + // to the request handler. + for try await message in request.messages { + switch message.requestType { + case .header(let header): + let header = try RunContainerRequestHandler.Header(validating: header) + try await handler.handle(header) + case .chunk(let chunk): + let chunk = try RunContainerRequestHandler.Chunk(validating: chunk) + try await handler.handle(chunk) + case .control(let control): + let control = try RunContainerRequestHandler.ControlCommand( + validating: control + ) + try await handler.handle(control) + case nil: + throw RPCError( + code: .invalidArgument, + message: "Invalid request: Unknown message type" + ) + } + } + } + + return Metadata() + } + } +} diff --git a/Sources/edge-agent/Services/RunContainerRequestHandler+Proto.swift b/Sources/edge-agent/Services/RunContainerRequestHandler+Proto.swift new file mode 100644 index 0000000..38b5650 --- /dev/null +++ b/Sources/edge-agent/Services/RunContainerRequestHandler+Proto.swift @@ -0,0 +1,48 @@ +import EdgeAgentGRPC + +extension RunContainerRequestHandler.Header { + /// Initialize a header from a protobuf header, validating the contents. + init(validating proto: Edge_Agent_Services_V1_RunContainerRequest.Header) throws { + guard !proto.imageName.isEmpty else { + throw RPCError(code: .invalidArgument, message: "Image name cannot be empty") + } + + self.imageName = proto.imageName + } +} + +extension RunContainerRequestHandler.Chunk { + init(validating proto: Edge_Agent_Services_V1_RunContainerRequest.Chunk) throws { + guard !proto.data.isEmpty else { + throw RPCError(code: .invalidArgument, message: "Chunk data cannot be empty") + } + + self.data = proto.data + } +} + +extension RunContainerRequestHandler.ControlCommand { + init(validating proto: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand) throws { + switch proto.command { + case .run(let run): + self = .run(Run(debug: run.debug)) + case nil: + throw RPCError(code: .invalidArgument, message: "Control command cannot be unspecified") + } + } +} + +extension RunContainerRequestHandler.Event { + var proto: Edge_Agent_Services_V1_RunContainerResponse { + .with { + switch self { + case .containerStarted(let containerStarted): + $0.responseType = .started( + .with { + $0.debugPort = containerStarted.debugPort + } + ) + } + } + } +} diff --git a/Sources/edge-agent/Services/RunContainerRequestHandler.swift b/Sources/edge-agent/Services/RunContainerRequestHandler.swift new file mode 100644 index 0000000..1c069f6 --- /dev/null +++ b/Sources/edge-agent/Services/RunContainerRequestHandler.swift @@ -0,0 +1,205 @@ +import Foundation +import Logging +import NIOFileSystem +import Shell + +/// A state machine that handles the request to run a container. +struct RunContainerRequestHandler { + enum State { + /// This is the initial state. The handler is waiting for the header. + case waitingForHeader + + /// After the header is received, the handler transitions to the `acceptingChunks`. In this + /// state, a file handle is opened for writing and chunks are being accepted. + case acceptingChunks(AcceptingChunks) + + /// Container is running, with associated data about the running container. + case running(Running) + + struct AcceptingChunks { + let header: Header + var writer: BufferedWriter + var imagePath: FilePath + var fileHandle: WriteFileHandle + } + + struct Running { + let imageName: String + let debugPort: UInt32 + } + } + + /// The header of the request. + struct Header { + let imageName: String + } + + struct Chunk { + let data: Data + } + + enum ControlCommand { + case run(Run) + + struct Run { + var debug: Bool + } + } + + enum Event { + case containerStarted(ContainerStarted) + + struct ContainerStarted { + let debugPort: UInt32 + } + } + + enum Error: Swift.Error { + /// A message was received before the header. + case expectedHeader + + /// A header message was received, but not expected. + case unexpectedHeader + + /// A chunk message was received, but not expected. + case unexpectedChunk + + /// An internal inconsistency was detected. This is a programming error in the agent. + case internalInconsistency + + /// An unexpected control command was received. + case unexpectedControlCommand(ControlCommand) + + /// The container failed to start. + case containerStartFailed(Swift.Error) + } + + public let events: AsyncStream + private let eventsContinuation: AsyncStream.Continuation + + private var state: State = .waitingForHeader + private let dockerCLI = DockerCLI() + private let logger = Logger(label: "edge-agent.run-container") + + init() { + let (stream, continuation) = AsyncStream.makeStream(bufferingPolicy: .unbounded) + self.events = stream + self.eventsContinuation = continuation + } + + mutating func handle(_ header: Header) async throws { + guard case .waitingForHeader = self.state else { + throw Error.unexpectedHeader + } + + // Create a file for writing in the temporary directory. + let uuid = UUID().uuidString + let fileName = "container-\(header.imageName).\(uuid).tar" + let path = try await FileSystem.shared.temporaryDirectory.appending(fileName) + logger.info("Writing container image", metadata: ["path": .string(path.string)]) + let writeHandle = try await FileSystem.shared.openFile( + forWritingAt: path, + options: .newFile(replaceExisting: false) + ) + let writer = writeHandle.bufferedWriter() + + self.state = .acceptingChunks( + State.AcceptingChunks( + header: header, + writer: writer, + imagePath: path, + fileHandle: writeHandle + ) + ) + } + + mutating func handle(_ chunk: Chunk) async throws { + guard case .acceptingChunks(var state) = self.state else { + throw Error.unexpectedChunk + } + + logger.debug("Writing chunk", metadata: ["size": .string("\(chunk.data.count) bytes")]) + try await state.writer.write(contentsOf: chunk.data) + self.state = .acceptingChunks(state) + } + + mutating func handle(_ control: ControlCommand) async throws { + switch (state, control) { + case (.waitingForHeader, _): + throw Error.expectedHeader + + case (.acceptingChunks(var acceptingState), .run(let run)): + // Finalize writing the container image + try await acceptingState.writer.flush() + try await acceptingState.fileHandle.close() + + // Load the container image into Docker + let imagePath = acceptingState.imagePath.string + logger.info( + "Loading container image into Docker", + metadata: ["path": .string(imagePath)] + ) + try await dockerCLI.load(filePath: imagePath) + + let imageName = acceptingState.header.imageName + var runOptions: [DockerCLI.RunOption] = [.rm] + var debugPort: UInt32 = 0 + + if run.debug { + // Configure for debugging + debugPort = 4242 + logger.info( + "Starting container in debug mode", + metadata: ["image": .string(imageName), "port": .string("\(debugPort)")] + ) + runOptions.append(contentsOf: [ + .publishPort(hostPort: UInt16(debugPort), containerPort: 4242), + .capAdd("SYS_PTRACE"), + .securityOpt("seccomp=unconfined"), + ]) + + do { + try await dockerCLI.run( + options: runOptions, + image: imageName, + command: ["ds2", "gdbserver", "0.0.0.0:\(debugPort)", "/bin/\(imageName)"] + ) + logger.info( + "Container started in debug mode successfully", + metadata: ["image": .string(imageName)] + ) + } catch { + logger.error( + "Failed to start container in debug mode", + metadata: ["error": .string("\(error)")] + ) + throw Error.containerStartFailed(error) + } + } else { + // Start the container without debugging + logger.info( + "Starting container without debugging", + metadata: ["image": .string(imageName)] + ) + try await dockerCLI.run(options: runOptions, image: imageName) + logger.info( + "Container started successfully", + metadata: ["image": .string(imageName)] + ) + } + + eventsContinuation.yield(.containerStarted(.init(debugPort: debugPort))) + + // Update state to running + self.state = .running( + State.Running( + imageName: imageName, + debugPort: debugPort + ) + ) + + case (.running, _): + throw Error.unexpectedControlCommand(control) + } + } +} From ad8435600d47774a7cec52d11da579cdd0b8fd01 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 28 Mar 2025 16:42:29 +0100 Subject: [PATCH 13/17] Add EdgeAgentService to GRPC server --- Sources/edge-agent/EdgeAgent.swift | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Sources/edge-agent/EdgeAgent.swift b/Sources/edge-agent/EdgeAgent.swift index bcb522a..750884e 100644 --- a/Sources/edge-agent/EdgeAgent.swift +++ b/Sources/edge-agent/EdgeAgent.swift @@ -36,11 +36,16 @@ struct EdgeAgent: AsyncParsableCommand { ) let healthService = HealthService() + healthService.provider.updateStatus( + .serving, + forService: Edge_Agent_Services_V1_EdgeAgentService.descriptor + ) let grpcServer = GRPCServer( transport: transport, services: [ - healthService + healthService, + EdgeAgentService(), ] ) From 37753fb8d94814e9752c2ca8e000d0b3d16a56de Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Thu, 3 Apr 2025 11:37:31 +0200 Subject: [PATCH 14/17] Add Swift Crypto for Linux compatibility --- Package.resolved | 20 ++++++++++++++++++- Package.swift | 4 +++- .../ContainerBuilder/ContainerBuilder.swift | 4 ++-- Sources/edge/Commands/RunCommand.swift | 4 ++-- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/Package.resolved b/Package.resolved index 244d3e2..1323d3a 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "c013fa6f6ecad5140db085594899898d778f2daf9a180f4cbb9ac3815060f23f", + "originHash" : "ce6c19e46ec922ba5d029b6794aada7f6001f93288dfbff669536ddf80aa411e", "pins" : [ { "identity" : "grpc-swift", @@ -55,6 +55,15 @@ "version" : "1.5.0" } }, + { + "identity" : "swift-asn1", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-asn1.git", + "state" : { + "revision" : "ae33e5941bb88d88538d0a6b19ca0b01e6c76dcf", + "version" : "1.3.1" + } + }, { "identity" : "swift-async-algorithms", "kind" : "remoteSourceControl", @@ -82,6 +91,15 @@ "version" : "1.1.4" } }, + { + "identity" : "swift-crypto", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-crypto.git", + "state" : { + "revision" : "a6ce32a18b81b04ce7e897d1d98df6eb2da04786", + "version" : "3.12.2" + } + }, { "identity" : "swift-distributed-tracing", "kind" : "remoteSourceControl", diff --git a/Package.swift b/Package.swift index 319c955..aea7ad3 100644 --- a/Package.swift +++ b/Package.swift @@ -19,6 +19,7 @@ let package = Package( .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.7.0"), .package(url: "https://github.com/grpc/grpc-swift-extras.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), + .package(url: "https://github.com/apple/swift-crypto.git", from: "3.12.2"), ], targets: [ /// The main executable provided by edge-cli. @@ -68,7 +69,8 @@ let package = Package( .target( name: "ContainerBuilder", dependencies: [ - .target(name: "Shell") + .target(name: "Shell"), + .product(name: "Crypto", package: "swift-crypto"), ] ), diff --git a/Sources/ContainerBuilder/ContainerBuilder.swift b/Sources/ContainerBuilder/ContainerBuilder.swift index 13b60e2..17dfbb5 100644 --- a/Sources/ContainerBuilder/ContainerBuilder.swift +++ b/Sources/ContainerBuilder/ContainerBuilder.swift @@ -1,4 +1,4 @@ -import CryptoKit +import Crypto import Foundation import Shell @@ -136,7 +136,7 @@ public func buildDockerContainerImage( try FileManager.default.removeItem(at: tempDir) } -// Calculate SHA256 hash using CryptoKit +// Calculate SHA256 hash using Swift Crypto private func sha256(data: Data) -> String { let digest = SHA256.hash(data: data) return digest.map { String(format: "%02x", $0) }.joined() diff --git a/Sources/edge/Commands/RunCommand.swift b/Sources/edge/Commands/RunCommand.swift index e9deea3..5b36621 100644 --- a/Sources/edge/Commands/RunCommand.swift +++ b/Sources/edge/Commands/RunCommand.swift @@ -17,7 +17,7 @@ struct RunCommand: AsyncParsableCommand { var description: String { switch self { case .noExecutableTarget: - return String(localized: "No executable target found in package") + return "No executable target found in package" } } } @@ -118,7 +118,7 @@ struct RunCommand: AsyncParsableCommand { try await writer.write( .with { $0.requestType = .chunk( - .with { $0.data = Data(buffer: chunk) } + .with { $0.data = Data(chunk.readableBytesView) } ) } ) From c4d929b5364c1aec0f2613f28ff4bd7145b13f2e Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 4 Apr 2025 09:01:49 +0200 Subject: [PATCH 15/17] Use TransportServices on macOS --- Sources/edge/Commands/RunCommand.swift | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/Sources/edge/Commands/RunCommand.swift b/Sources/edge/Commands/RunCommand.swift index 5b36621..e93d79d 100644 --- a/Sources/edge/Commands/RunCommand.swift +++ b/Sources/edge/Commands/RunCommand.swift @@ -90,15 +90,23 @@ struct RunCommand: AsyncParsableCommand { outputPath: outputPath ) - try await withGRPCClient( - transport: .http2NIOPosix( - target: .dns( - host: agentConnectionOptions.agentHost, - port: agentConnectionOptions.agentPort - ), + let target = ResolvableTargets.DNS( + host: agentConnectionOptions.agentHost, + port: agentConnectionOptions.agentPort + ) + #if os(macOS) + let transport = try HTTP2ClientTransport.TransportServices( + target: target, + transportSecurity: .plaintext + ) + #else + let transport = try HTTP2ClientTransport.Posix( + target: target, transportSecurity: .plaintext ) - ) { client in + #endif + + try await withGRPCClient(transport: transport) { client in let agent = Edge_Agent_Services_V1_EdgeAgentService.Client(wrapping: client) try await agent.runContainer { writer in // First, send the header. From 7575b2dd465dfbd18dc12d377e5a659245fb9945 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Fri, 4 Apr 2025 09:02:03 +0200 Subject: [PATCH 16/17] agent: Host on ipv6 instead of ipv4 --- Sources/edge-agent/EdgeAgent.swift | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Sources/edge-agent/EdgeAgent.swift b/Sources/edge-agent/EdgeAgent.swift index 750884e..c90b97d 100644 --- a/Sources/edge-agent/EdgeAgent.swift +++ b/Sources/edge-agent/EdgeAgent.swift @@ -30,23 +30,23 @@ struct EdgeAgent: AsyncParsableCommand { logger.info("Starting Edge Agent on port \(port)") - let transport = HTTP2ServerTransport.Posix( - address: .ipv4(host: "0.0.0.0", port: port), - transportSecurity: .plaintext - ) - let healthService = HealthService() healthService.provider.updateStatus( .serving, forService: Edge_Agent_Services_V1_EdgeAgentService.descriptor ) + let services: [any RegistrableRPCService] = [ + healthService, + EdgeAgentService(), + ] + let grpcServer = GRPCServer( - transport: transport, - services: [ - healthService, - EdgeAgentService(), - ] + transport: .http2NIOPosix( + address: .ipv6(host: "::", port: port), + transportSecurity: .plaintext + ), + services: services ) let group = ServiceGroup( From 8e7387f7feb0a3d25bc539b2733a7c7d2961aac2 Mon Sep 17 00:00:00 2001 From: Robbert Brandsma Date: Sat, 5 Apr 2025 12:27:21 +0200 Subject: [PATCH 17/17] Remove edge agent and proto from the package --- Package.resolved | 53 +- Package.swift | 56 +- .../services/v1/edge_agent_v1_service.proto | 55 -- Sources/EdgeAgentGRPC/EdgeAgentGRPC.swift | 1 - .../v1/edge_agent_v1_service.grpc.swift | 325 ------------ .../v1/edge_agent_v1_service.pb.swift | 490 ------------------ .../EdgeAgentGRPC/Proto/edge_agent.protoset | Bin 879 -> 0 bytes Sources/Shell/Shell.swift | 111 ---- Sources/edge-agent/DockerCLI.swift | 67 --- Sources/edge-agent/EdgeAgent.swift | 61 --- .../Services/EdgeAgentService.swift | 49 -- .../RunContainerRequestHandler+Proto.swift | 48 -- .../Services/RunContainerRequestHandler.swift | 205 -------- 13 files changed, 17 insertions(+), 1504 deletions(-) delete mode 100644 Proto/edge/agent/services/v1/edge_agent_v1_service.proto delete mode 100644 Sources/EdgeAgentGRPC/EdgeAgentGRPC.swift delete mode 100644 Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift delete mode 100644 Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift delete mode 100644 Sources/EdgeAgentGRPC/Proto/edge_agent.protoset delete mode 100644 Sources/Shell/Shell.swift delete mode 100644 Sources/edge-agent/DockerCLI.swift delete mode 100644 Sources/edge-agent/EdgeAgent.swift delete mode 100644 Sources/edge-agent/Services/EdgeAgentService.swift delete mode 100644 Sources/edge-agent/Services/RunContainerRequestHandler+Proto.swift delete mode 100644 Sources/edge-agent/Services/RunContainerRequestHandler.swift diff --git a/Package.resolved b/Package.resolved index 1323d3a..5d616db 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,22 +1,21 @@ { - "originHash" : "ce6c19e46ec922ba5d029b6794aada7f6001f93288dfbff669536ddf80aa411e", + "originHash" : "3fdea633ce1bf54ea9439217287fd68d6551bf835fa2286e18d142cd9d8865c0", "pins" : [ { - "identity" : "grpc-swift", + "identity" : "edge-agent-common", "kind" : "remoteSourceControl", - "location" : "https://github.com/grpc/grpc-swift.git", + "location" : "https://github.com/apache-edge/edge-agent-common", "state" : { - "revision" : "c4d6281784f50bf2e60d3af45e83be1194056062", - "version" : "2.1.2" + "revision" : "952035635c630d366dfbcff04af1934c7c051f23" } }, { - "identity" : "grpc-swift-extras", + "identity" : "grpc-swift", "kind" : "remoteSourceControl", - "location" : "https://github.com/grpc/grpc-swift-extras.git", + "location" : "https://github.com/grpc/grpc-swift.git", "state" : { - "revision" : "c32c03dcfa0957025c52c27a1c0b7a070a199727", - "version" : "1.0.0" + "revision" : "c4d6281784f50bf2e60d3af45e83be1194056062", + "version" : "2.1.2" } }, { @@ -64,15 +63,6 @@ "version" : "1.3.1" } }, - { - "identity" : "swift-async-algorithms", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-async-algorithms.git", - "state" : { - "revision" : "4c3ea81f81f0a25d0470188459c6d4bf20cf2f97", - "version" : "1.0.3" - } - }, { "identity" : "swift-atomics", "kind" : "remoteSourceControl", @@ -100,15 +90,6 @@ "version" : "3.12.2" } }, - { - "identity" : "swift-distributed-tracing", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-distributed-tracing.git", - "state" : { - "revision" : "a64a0abc2530f767af15dd88dda7f64d5f1ff9de", - "version" : "1.2.0" - } - }, { "identity" : "swift-http-structured-headers", "kind" : "remoteSourceControl", @@ -199,24 +180,6 @@ "version" : "1.29.0" } }, - { - "identity" : "swift-service-context", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-service-context.git", - "state" : { - "revision" : "8946c930cae601452149e45d31d8ddfac973c3c7", - "version" : "1.2.0" - } - }, - { - "identity" : "swift-service-lifecycle", - "kind" : "remoteSourceControl", - "location" : "https://github.com/swift-server/swift-service-lifecycle.git", - "state" : { - "revision" : "7ee57f99fbe0073c3700997186721e74d925b59b", - "version" : "2.7.0" - } - }, { "identity" : "swift-system", "kind" : "remoteSourceControl", diff --git a/Package.swift b/Package.swift index aea7ad3..698ccff 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version: 6.0 +// swift-tools-version: 6.1 import PackageDescription let package = Package( @@ -7,19 +7,18 @@ let package = Package( .macOS(.v15) ], products: [ - .executable(name: "edge", targets: ["edge"]), - .executable(name: "edge-agent", targets: ["edge-agent"]), + .executable(name: "edge", targets: ["edge"]) ], dependencies: [ .package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.5.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"), - .package(url: "https://github.com/grpc/grpc-swift.git", from: "2.0.0"), .package(url: "https://github.com/grpc/grpc-swift-nio-transport.git", from: "1.0.0"), - .package(url: "https://github.com/grpc/grpc-swift-protobuf.git", from: "1.0.0"), - .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.7.0"), - .package(url: "https://github.com/grpc/grpc-swift-extras.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), .package(url: "https://github.com/apple/swift-crypto.git", from: "3.12.2"), + .package( + url: "https://github.com/apache-edge/edge-agent-common.git", + revision: "952035635c630d366dfbcff04af1934c7c051f23" + ), ], targets: [ /// The main executable provided by edge-cli. @@ -30,7 +29,7 @@ let package = Package( .product(name: "Logging", package: "swift-log"), .product(name: "_NIOFileSystem", package: "swift-nio"), .product(name: "GRPCNIOTransportHTTP2", package: "grpc-swift-nio-transport"), - .target(name: "EdgeAgentGRPC"), + .product(name: "EdgeAgentGRPC", package: "edge-agent-common"), .target(name: "EdgeCLI"), ], resources: [ @@ -38,29 +37,12 @@ let package = Package( ] ), - /// The EdgeAgent executable. It's currently here for development purposes, and will be - /// moved to a separate package in the future. - .executableTarget( - name: "edge-agent", - dependencies: [ - .product(name: "ArgumentParser", package: "swift-argument-parser"), - .product(name: "Logging", package: "swift-log"), - .product(name: "GRPCNIOTransportHTTP2", package: "grpc-swift-nio-transport"), - .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), - .product(name: "GRPCServiceLifecycle", package: "grpc-swift-extras"), - .product(name: "GRPCHealthService", package: "grpc-swift-extras"), - .product(name: "_NIOFileSystem", package: "swift-nio"), - .target(name: "EdgeAgentGRPC"), - .target(name: "Shell"), - ] - ), - /// Contains everything EdgeCLI, except for the command line interface. .target( name: "EdgeCLI", dependencies: [ .target(name: "ContainerBuilder"), - .target(name: "Shell"), + .product(name: "Shell", package: "edge-agent-common"), .product(name: "Logging", package: "swift-log"), ] ), @@ -69,29 +51,9 @@ let package = Package( .target( name: "ContainerBuilder", dependencies: [ - .target(name: "Shell"), + .product(name: "Shell", package: "edge-agent-common"), .product(name: "Crypto", package: "swift-crypto"), ] ), - - /// Utility for executing shell commands. - .target( - name: "Shell", - dependencies: [ - .product(name: "Logging", package: "swift-log") - ] - ), - - /// Protobuf definitions for the EdgeAgent service. - .target( - name: "EdgeAgentGRPC", - dependencies: [ - .product(name: "GRPCCore", package: "grpc-swift"), - .product(name: "GRPCProtobuf", package: "grpc-swift-protobuf"), - ], - exclude: [ - "Proto/edge_agent.protoset" - ] - ), ] ) diff --git a/Proto/edge/agent/services/v1/edge_agent_v1_service.proto b/Proto/edge/agent/services/v1/edge_agent_v1_service.proto deleted file mode 100644 index f2494d8..0000000 --- a/Proto/edge/agent/services/v1/edge_agent_v1_service.proto +++ /dev/null @@ -1,55 +0,0 @@ -syntax = "proto3"; - -package edge.agent.services.v1; - -service EdgeAgentService { - // Upload a container image to the agent, and run it. - // The first message in the stream MUST be the header. - rpc RunContainer(stream RunContainerRequest) returns (stream RunContainerResponse); -} - -message RunContainerRequest { - oneof request_type { - /// The first message in the stream MUST be the header. - Header header = 1; - - /// A chunk of the container tarball. - Chunk chunk = 2; - - /// After uploading the container, control messages can be sent to the agent. - ControlCommand control = 3; - } - - message Header { - // Unique name for the container image - string image_name = 1; - } - - message Chunk { - // Binary chunk of the container tarball - bytes data = 1; - } - - message ControlCommand { - oneof command { - Run run = 1; - } - - message Run { - // Whether to run the container with a debugger - bool debug = 1; - } - } -} - -message RunContainerResponse { - oneof response_type { - Started started = 1; - } - - message Started { - // The port that the debugger is listening on. - // If this is 0, the container is not running with a debugger. - uint32 debug_port = 1; - } -} \ No newline at end of file diff --git a/Sources/EdgeAgentGRPC/EdgeAgentGRPC.swift b/Sources/EdgeAgentGRPC/EdgeAgentGRPC.swift deleted file mode 100644 index 84a06e4..0000000 --- a/Sources/EdgeAgentGRPC/EdgeAgentGRPC.swift +++ /dev/null @@ -1 +0,0 @@ -@_exported import GRPCCore diff --git a/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift b/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift deleted file mode 100644 index 7f19f41..0000000 --- a/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.grpc.swift +++ /dev/null @@ -1,325 +0,0 @@ -// DO NOT EDIT. -// swift-format-ignore-file -// -// Generated by the gRPC Swift generator plugin for the protocol buffer compiler. -// Source: edge/agent/services/v1/edge_agent_v1_service.proto -// -// For information on using the generated types, please see the documentation: -// https://github.com/grpc/grpc-swift - -import GRPCCore -import GRPCProtobuf - -// MARK: - edge.agent.services.v1.EdgeAgentService - -/// Namespace containing generated types for the "edge.agent.services.v1.EdgeAgentService" service. -public enum Edge_Agent_Services_V1_EdgeAgentService { - /// Service descriptor for the "edge.agent.services.v1.EdgeAgentService" service. - public static let descriptor = GRPCCore.ServiceDescriptor(fullyQualifiedService: "edge.agent.services.v1.EdgeAgentService") - /// Namespace for method metadata. - public enum Method { - /// Namespace for "RunContainer" metadata. - public enum RunContainer { - /// Request type for "RunContainer". - public typealias Input = Edge_Agent_Services_V1_RunContainerRequest - /// Response type for "RunContainer". - public typealias Output = Edge_Agent_Services_V1_RunContainerResponse - /// Descriptor for "RunContainer". - public static let descriptor = GRPCCore.MethodDescriptor( - service: GRPCCore.ServiceDescriptor(fullyQualifiedService: "edge.agent.services.v1.EdgeAgentService"), - method: "RunContainer" - ) - } - /// Descriptors for all methods in the "edge.agent.services.v1.EdgeAgentService" service. - public static let descriptors: [GRPCCore.MethodDescriptor] = [ - RunContainer.descriptor - ] - } -} - -extension GRPCCore.ServiceDescriptor { - /// Service descriptor for the "edge.agent.services.v1.EdgeAgentService" service. - public static let edge_agent_services_v1_EdgeAgentService = GRPCCore.ServiceDescriptor(fullyQualifiedService: "edge.agent.services.v1.EdgeAgentService") -} - -// MARK: edge.agent.services.v1.EdgeAgentService (server) - -extension Edge_Agent_Services_V1_EdgeAgentService { - /// Streaming variant of the service protocol for the "edge.agent.services.v1.EdgeAgentService" service. - /// - /// This protocol is the lowest-level of the service protocols generated for this service - /// giving you the most flexibility over the implementation of your service. This comes at - /// the cost of more verbose and less strict APIs. Each RPC requires you to implement it in - /// terms of a request stream and response stream. Where only a single request or response - /// message is expected, you are responsible for enforcing this invariant is maintained. - /// - /// Where possible, prefer using the stricter, less-verbose ``ServiceProtocol`` - /// or ``SimpleServiceProtocol`` instead. - public protocol StreamingServiceProtocol: GRPCCore.RegistrableRPCService { - /// Handle the "RunContainer" method. - /// - /// > Source IDL Documentation: - /// > - /// > Upload a container image to the agent, and run it. - /// > The first message in the stream MUST be the header. - /// - /// - Parameters: - /// - request: A streaming request of `Edge_Agent_Services_V1_RunContainerRequest` messages. - /// - context: Context providing information about the RPC. - /// - Throws: Any error which occurred during the processing of the request. Thrown errors - /// of type `RPCError` are mapped to appropriate statuses. All other errors are converted - /// to an internal error. - /// - Returns: A streaming response of `Edge_Agent_Services_V1_RunContainerResponse` messages. - func runContainer( - request: GRPCCore.StreamingServerRequest, - context: GRPCCore.ServerContext - ) async throws -> GRPCCore.StreamingServerResponse - } - - /// Service protocol for the "edge.agent.services.v1.EdgeAgentService" service. - /// - /// This protocol is higher level than ``StreamingServiceProtocol`` but lower level than - /// the ``SimpleServiceProtocol``, it provides access to request and response metadata and - /// trailing response metadata. If you don't need these then consider using - /// the ``SimpleServiceProtocol``. If you need fine grained control over your RPCs then - /// use ``StreamingServiceProtocol``. - public protocol ServiceProtocol: Edge_Agent_Services_V1_EdgeAgentService.StreamingServiceProtocol { - /// Handle the "RunContainer" method. - /// - /// > Source IDL Documentation: - /// > - /// > Upload a container image to the agent, and run it. - /// > The first message in the stream MUST be the header. - /// - /// - Parameters: - /// - request: A streaming request of `Edge_Agent_Services_V1_RunContainerRequest` messages. - /// - context: Context providing information about the RPC. - /// - Throws: Any error which occurred during the processing of the request. Thrown errors - /// of type `RPCError` are mapped to appropriate statuses. All other errors are converted - /// to an internal error. - /// - Returns: A streaming response of `Edge_Agent_Services_V1_RunContainerResponse` messages. - func runContainer( - request: GRPCCore.StreamingServerRequest, - context: GRPCCore.ServerContext - ) async throws -> GRPCCore.StreamingServerResponse - } - - /// Simple service protocol for the "edge.agent.services.v1.EdgeAgentService" service. - /// - /// This is the highest level protocol for the service. The API is the easiest to use but - /// doesn't provide access to request or response metadata. If you need access to these - /// then use ``ServiceProtocol`` instead. - public protocol SimpleServiceProtocol: Edge_Agent_Services_V1_EdgeAgentService.ServiceProtocol { - /// Handle the "RunContainer" method. - /// - /// > Source IDL Documentation: - /// > - /// > Upload a container image to the agent, and run it. - /// > The first message in the stream MUST be the header. - /// - /// - Parameters: - /// - request: A stream of `Edge_Agent_Services_V1_RunContainerRequest` messages. - /// - response: A response stream of `Edge_Agent_Services_V1_RunContainerResponse` messages. - /// - context: Context providing information about the RPC. - /// - Throws: Any error which occurred during the processing of the request. Thrown errors - /// of type `RPCError` are mapped to appropriate statuses. All other errors are converted - /// to an internal error. - func runContainer( - request: GRPCCore.RPCAsyncSequence, - response: GRPCCore.RPCWriter, - context: GRPCCore.ServerContext - ) async throws - } -} - -// Default implementation of 'registerMethods(with:)'. -extension Edge_Agent_Services_V1_EdgeAgentService.StreamingServiceProtocol { - public func registerMethods(with router: inout GRPCCore.RPCRouter) where Transport: GRPCCore.ServerTransport { - router.registerHandler( - forMethod: Edge_Agent_Services_V1_EdgeAgentService.Method.RunContainer.descriptor, - deserializer: GRPCProtobuf.ProtobufDeserializer(), - serializer: GRPCProtobuf.ProtobufSerializer(), - handler: { request, context in - try await self.runContainer( - request: request, - context: context - ) - } - ) - } -} - -// Default implementation of streaming methods from 'StreamingServiceProtocol'. -extension Edge_Agent_Services_V1_EdgeAgentService.ServiceProtocol { -} - -// Default implementation of methods from 'ServiceProtocol'. -extension Edge_Agent_Services_V1_EdgeAgentService.SimpleServiceProtocol { - public func runContainer( - request: GRPCCore.StreamingServerRequest, - context: GRPCCore.ServerContext - ) async throws -> GRPCCore.StreamingServerResponse { - return GRPCCore.StreamingServerResponse( - metadata: [:], - producer: { writer in - try await self.runContainer( - request: request.messages, - response: writer, - context: context - ) - return [:] - } - ) - } -} - -// MARK: edge.agent.services.v1.EdgeAgentService (client) - -extension Edge_Agent_Services_V1_EdgeAgentService { - /// Generated client protocol for the "edge.agent.services.v1.EdgeAgentService" service. - /// - /// You don't need to implement this protocol directly, use the generated - /// implementation, ``Client``. - public protocol ClientProtocol: Sendable { - /// Call the "RunContainer" method. - /// - /// > Source IDL Documentation: - /// > - /// > Upload a container image to the agent, and run it. - /// > The first message in the stream MUST be the header. - /// - /// - Parameters: - /// - request: A streaming request producing `Edge_Agent_Services_V1_RunContainerRequest` messages. - /// - serializer: A serializer for `Edge_Agent_Services_V1_RunContainerRequest` messages. - /// - deserializer: A deserializer for `Edge_Agent_Services_V1_RunContainerResponse` messages. - /// - options: Options to apply to this RPC. - /// - handleResponse: A closure which handles the response, the result of which is - /// returned to the caller. Returning from the closure will cancel the RPC if it - /// hasn't already finished. - /// - Returns: The result of `handleResponse`. - func runContainer( - request: GRPCCore.StreamingClientRequest, - serializer: some GRPCCore.MessageSerializer, - deserializer: some GRPCCore.MessageDeserializer, - options: GRPCCore.CallOptions, - onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result - ) async throws -> Result where Result: Sendable - } - - /// Generated client for the "edge.agent.services.v1.EdgeAgentService" service. - /// - /// The ``Client`` provides an implementation of ``ClientProtocol`` which wraps - /// a `GRPCCore.GRPCCClient`. The underlying `GRPCClient` provides the long-lived - /// means of communication with the remote peer. - public struct Client: ClientProtocol where Transport: GRPCCore.ClientTransport { - private let client: GRPCCore.GRPCClient - - /// Creates a new client wrapping the provided `GRPCCore.GRPCClient`. - /// - /// - Parameters: - /// - client: A `GRPCCore.GRPCClient` providing a communication channel to the service. - public init(wrapping client: GRPCCore.GRPCClient) { - self.client = client - } - - /// Call the "RunContainer" method. - /// - /// > Source IDL Documentation: - /// > - /// > Upload a container image to the agent, and run it. - /// > The first message in the stream MUST be the header. - /// - /// - Parameters: - /// - request: A streaming request producing `Edge_Agent_Services_V1_RunContainerRequest` messages. - /// - serializer: A serializer for `Edge_Agent_Services_V1_RunContainerRequest` messages. - /// - deserializer: A deserializer for `Edge_Agent_Services_V1_RunContainerResponse` messages. - /// - options: Options to apply to this RPC. - /// - handleResponse: A closure which handles the response, the result of which is - /// returned to the caller. Returning from the closure will cancel the RPC if it - /// hasn't already finished. - /// - Returns: The result of `handleResponse`. - public func runContainer( - request: GRPCCore.StreamingClientRequest, - serializer: some GRPCCore.MessageSerializer, - deserializer: some GRPCCore.MessageDeserializer, - options: GRPCCore.CallOptions = .defaults, - onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result - ) async throws -> Result where Result: Sendable { - try await self.client.bidirectionalStreaming( - request: request, - descriptor: Edge_Agent_Services_V1_EdgeAgentService.Method.RunContainer.descriptor, - serializer: serializer, - deserializer: deserializer, - options: options, - onResponse: handleResponse - ) - } - } -} - -// Helpers providing default arguments to 'ClientProtocol' methods. -extension Edge_Agent_Services_V1_EdgeAgentService.ClientProtocol { - /// Call the "RunContainer" method. - /// - /// > Source IDL Documentation: - /// > - /// > Upload a container image to the agent, and run it. - /// > The first message in the stream MUST be the header. - /// - /// - Parameters: - /// - request: A streaming request producing `Edge_Agent_Services_V1_RunContainerRequest` messages. - /// - options: Options to apply to this RPC. - /// - handleResponse: A closure which handles the response, the result of which is - /// returned to the caller. Returning from the closure will cancel the RPC if it - /// hasn't already finished. - /// - Returns: The result of `handleResponse`. - public func runContainer( - request: GRPCCore.StreamingClientRequest, - options: GRPCCore.CallOptions = .defaults, - onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result - ) async throws -> Result where Result: Sendable { - try await self.runContainer( - request: request, - serializer: GRPCProtobuf.ProtobufSerializer(), - deserializer: GRPCProtobuf.ProtobufDeserializer(), - options: options, - onResponse: handleResponse - ) - } -} - -// Helpers providing sugared APIs for 'ClientProtocol' methods. -extension Edge_Agent_Services_V1_EdgeAgentService.ClientProtocol { - /// Call the "RunContainer" method. - /// - /// > Source IDL Documentation: - /// > - /// > Upload a container image to the agent, and run it. - /// > The first message in the stream MUST be the header. - /// - /// - Parameters: - /// - metadata: Additional metadata to send, defaults to empty. - /// - options: Options to apply to this RPC, defaults to `.defaults`. - /// - producer: A closure producing request messages to send to the server. The request - /// stream is closed when the closure returns. - /// - handleResponse: A closure which handles the response, the result of which is - /// returned to the caller. Returning from the closure will cancel the RPC if it - /// hasn't already finished. - /// - Returns: The result of `handleResponse`. - public func runContainer( - metadata: GRPCCore.Metadata = [:], - options: GRPCCore.CallOptions = .defaults, - requestProducer producer: @Sendable @escaping (GRPCCore.RPCWriter) async throws -> Void, - onResponse handleResponse: @Sendable @escaping (GRPCCore.StreamingClientResponse) async throws -> Result - ) async throws -> Result where Result: Sendable { - let request = GRPCCore.StreamingClientRequest( - metadata: metadata, - producer: producer - ) - return try await self.runContainer( - request: request, - options: options, - onResponse: handleResponse - ) - } -} \ No newline at end of file diff --git a/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift b/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift deleted file mode 100644 index 08ec686..0000000 --- a/Sources/EdgeAgentGRPC/Proto/edge/agent/services/v1/edge_agent_v1_service.pb.swift +++ /dev/null @@ -1,490 +0,0 @@ -// DO NOT EDIT. -// swift-format-ignore-file -// swiftlint:disable all -// -// Generated by the Swift generator plugin for the protocol buffer compiler. -// Source: edge/agent/services/v1/edge_agent_v1_service.proto -// -// For information on using the generated types, please see the documentation: -// https://github.com/apple/swift-protobuf/ - -import Foundation -import SwiftProtobuf - -// If the compiler emits an error on this type, it is because this file -// was generated by a version of the `protoc` Swift plug-in that is -// incompatible with the version of SwiftProtobuf to which you are linking. -// Please ensure that you are building against the same version of the API -// that was used to generate this file. -fileprivate struct _GeneratedWithProtocGenSwiftVersion: SwiftProtobuf.ProtobufAPIVersionCheck { - struct _2: SwiftProtobuf.ProtobufAPIVersion_2 {} - typealias Version = _2 -} - -public struct Edge_Agent_Services_V1_RunContainerRequest: Sendable { - // SwiftProtobuf.Message conformance is added in an extension below. See the - // `Message` and `Message+*Additions` files in the SwiftProtobuf library for - // methods supported on all messages. - - public var requestType: Edge_Agent_Services_V1_RunContainerRequest.OneOf_RequestType? = nil - - //// The first message in the stream MUST be the header. - public var header: Edge_Agent_Services_V1_RunContainerRequest.Header { - get { - if case .header(let v)? = requestType {return v} - return Edge_Agent_Services_V1_RunContainerRequest.Header() - } - set {requestType = .header(newValue)} - } - - //// A chunk of the container tarball. - public var chunk: Edge_Agent_Services_V1_RunContainerRequest.Chunk { - get { - if case .chunk(let v)? = requestType {return v} - return Edge_Agent_Services_V1_RunContainerRequest.Chunk() - } - set {requestType = .chunk(newValue)} - } - - //// After uploading the container, control messages can be sent to the agent. - public var control: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand { - get { - if case .control(let v)? = requestType {return v} - return Edge_Agent_Services_V1_RunContainerRequest.ControlCommand() - } - set {requestType = .control(newValue)} - } - - public var unknownFields = SwiftProtobuf.UnknownStorage() - - public enum OneOf_RequestType: Equatable, Sendable { - //// The first message in the stream MUST be the header. - case header(Edge_Agent_Services_V1_RunContainerRequest.Header) - //// A chunk of the container tarball. - case chunk(Edge_Agent_Services_V1_RunContainerRequest.Chunk) - //// After uploading the container, control messages can be sent to the agent. - case control(Edge_Agent_Services_V1_RunContainerRequest.ControlCommand) - - } - - public struct Header: Sendable { - // SwiftProtobuf.Message conformance is added in an extension below. See the - // `Message` and `Message+*Additions` files in the SwiftProtobuf library for - // methods supported on all messages. - - /// Unique name for the container image - public var imageName: String = String() - - public var unknownFields = SwiftProtobuf.UnknownStorage() - - public init() {} - } - - public struct Chunk: @unchecked Sendable { - // SwiftProtobuf.Message conformance is added in an extension below. See the - // `Message` and `Message+*Additions` files in the SwiftProtobuf library for - // methods supported on all messages. - - /// Binary chunk of the container tarball - public var data: Data = Data() - - public var unknownFields = SwiftProtobuf.UnknownStorage() - - public init() {} - } - - public struct ControlCommand: Sendable { - // SwiftProtobuf.Message conformance is added in an extension below. See the - // `Message` and `Message+*Additions` files in the SwiftProtobuf library for - // methods supported on all messages. - - public var command: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.OneOf_Command? = nil - - public var run: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run { - get { - if case .run(let v)? = command {return v} - return Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run() - } - set {command = .run(newValue)} - } - - public var unknownFields = SwiftProtobuf.UnknownStorage() - - public enum OneOf_Command: Equatable, Sendable { - case run(Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run) - - } - - public struct Run: Sendable { - // SwiftProtobuf.Message conformance is added in an extension below. See the - // `Message` and `Message+*Additions` files in the SwiftProtobuf library for - // methods supported on all messages. - - /// Whether to run the container with a debugger - public var debug: Bool = false - - public var unknownFields = SwiftProtobuf.UnknownStorage() - - public init() {} - } - - public init() {} - } - - public init() {} -} - -public struct Edge_Agent_Services_V1_RunContainerResponse: Sendable { - // SwiftProtobuf.Message conformance is added in an extension below. See the - // `Message` and `Message+*Additions` files in the SwiftProtobuf library for - // methods supported on all messages. - - public var responseType: Edge_Agent_Services_V1_RunContainerResponse.OneOf_ResponseType? = nil - - public var started: Edge_Agent_Services_V1_RunContainerResponse.Started { - get { - if case .started(let v)? = responseType {return v} - return Edge_Agent_Services_V1_RunContainerResponse.Started() - } - set {responseType = .started(newValue)} - } - - public var unknownFields = SwiftProtobuf.UnknownStorage() - - public enum OneOf_ResponseType: Equatable, Sendable { - case started(Edge_Agent_Services_V1_RunContainerResponse.Started) - - } - - public struct Started: Sendable { - // SwiftProtobuf.Message conformance is added in an extension below. See the - // `Message` and `Message+*Additions` files in the SwiftProtobuf library for - // methods supported on all messages. - - /// The port that the debugger is listening on. - /// If this is 0, the container is not running with a debugger. - public var debugPort: UInt32 = 0 - - public var unknownFields = SwiftProtobuf.UnknownStorage() - - public init() {} - } - - public init() {} -} - -// MARK: - Code below here is support for the SwiftProtobuf runtime. - -fileprivate let _protobuf_package = "edge.agent.services.v1" - -extension Edge_Agent_Services_V1_RunContainerRequest: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { - public static let protoMessageName: String = _protobuf_package + ".RunContainerRequest" - public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ - 1: .same(proto: "header"), - 2: .same(proto: "chunk"), - 3: .same(proto: "control"), - ] - - public mutating func decodeMessage(decoder: inout D) throws { - while let fieldNumber = try decoder.nextFieldNumber() { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every case branch when no optimizations are - // enabled. https://github.com/apple/swift-protobuf/issues/1034 - switch fieldNumber { - case 1: try { - var v: Edge_Agent_Services_V1_RunContainerRequest.Header? - var hadOneofValue = false - if let current = self.requestType { - hadOneofValue = true - if case .header(let m) = current {v = m} - } - try decoder.decodeSingularMessageField(value: &v) - if let v = v { - if hadOneofValue {try decoder.handleConflictingOneOf()} - self.requestType = .header(v) - } - }() - case 2: try { - var v: Edge_Agent_Services_V1_RunContainerRequest.Chunk? - var hadOneofValue = false - if let current = self.requestType { - hadOneofValue = true - if case .chunk(let m) = current {v = m} - } - try decoder.decodeSingularMessageField(value: &v) - if let v = v { - if hadOneofValue {try decoder.handleConflictingOneOf()} - self.requestType = .chunk(v) - } - }() - case 3: try { - var v: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand? - var hadOneofValue = false - if let current = self.requestType { - hadOneofValue = true - if case .control(let m) = current {v = m} - } - try decoder.decodeSingularMessageField(value: &v) - if let v = v { - if hadOneofValue {try decoder.handleConflictingOneOf()} - self.requestType = .control(v) - } - }() - default: break - } - } - } - - public func traverse(visitor: inout V) throws { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every if/case branch local when no optimizations - // are enabled. https://github.com/apple/swift-protobuf/issues/1034 and - // https://github.com/apple/swift-protobuf/issues/1182 - switch self.requestType { - case .header?: try { - guard case .header(let v)? = self.requestType else { preconditionFailure() } - try visitor.visitSingularMessageField(value: v, fieldNumber: 1) - }() - case .chunk?: try { - guard case .chunk(let v)? = self.requestType else { preconditionFailure() } - try visitor.visitSingularMessageField(value: v, fieldNumber: 2) - }() - case .control?: try { - guard case .control(let v)? = self.requestType else { preconditionFailure() } - try visitor.visitSingularMessageField(value: v, fieldNumber: 3) - }() - case nil: break - } - try unknownFields.traverse(visitor: &visitor) - } - - public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest, rhs: Edge_Agent_Services_V1_RunContainerRequest) -> Bool { - if lhs.requestType != rhs.requestType {return false} - if lhs.unknownFields != rhs.unknownFields {return false} - return true - } -} - -extension Edge_Agent_Services_V1_RunContainerRequest.Header: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { - public static let protoMessageName: String = Edge_Agent_Services_V1_RunContainerRequest.protoMessageName + ".Header" - public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ - 1: .standard(proto: "image_name"), - ] - - public mutating func decodeMessage(decoder: inout D) throws { - while let fieldNumber = try decoder.nextFieldNumber() { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every case branch when no optimizations are - // enabled. https://github.com/apple/swift-protobuf/issues/1034 - switch fieldNumber { - case 1: try { try decoder.decodeSingularStringField(value: &self.imageName) }() - default: break - } - } - } - - public func traverse(visitor: inout V) throws { - if !self.imageName.isEmpty { - try visitor.visitSingularStringField(value: self.imageName, fieldNumber: 1) - } - try unknownFields.traverse(visitor: &visitor) - } - - public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest.Header, rhs: Edge_Agent_Services_V1_RunContainerRequest.Header) -> Bool { - if lhs.imageName != rhs.imageName {return false} - if lhs.unknownFields != rhs.unknownFields {return false} - return true - } -} - -extension Edge_Agent_Services_V1_RunContainerRequest.Chunk: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { - public static let protoMessageName: String = Edge_Agent_Services_V1_RunContainerRequest.protoMessageName + ".Chunk" - public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ - 1: .same(proto: "data"), - ] - - public mutating func decodeMessage(decoder: inout D) throws { - while let fieldNumber = try decoder.nextFieldNumber() { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every case branch when no optimizations are - // enabled. https://github.com/apple/swift-protobuf/issues/1034 - switch fieldNumber { - case 1: try { try decoder.decodeSingularBytesField(value: &self.data) }() - default: break - } - } - } - - public func traverse(visitor: inout V) throws { - if !self.data.isEmpty { - try visitor.visitSingularBytesField(value: self.data, fieldNumber: 1) - } - try unknownFields.traverse(visitor: &visitor) - } - - public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest.Chunk, rhs: Edge_Agent_Services_V1_RunContainerRequest.Chunk) -> Bool { - if lhs.data != rhs.data {return false} - if lhs.unknownFields != rhs.unknownFields {return false} - return true - } -} - -extension Edge_Agent_Services_V1_RunContainerRequest.ControlCommand: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { - public static let protoMessageName: String = Edge_Agent_Services_V1_RunContainerRequest.protoMessageName + ".ControlCommand" - public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ - 1: .same(proto: "run"), - ] - - public mutating func decodeMessage(decoder: inout D) throws { - while let fieldNumber = try decoder.nextFieldNumber() { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every case branch when no optimizations are - // enabled. https://github.com/apple/swift-protobuf/issues/1034 - switch fieldNumber { - case 1: try { - var v: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run? - var hadOneofValue = false - if let current = self.command { - hadOneofValue = true - if case .run(let m) = current {v = m} - } - try decoder.decodeSingularMessageField(value: &v) - if let v = v { - if hadOneofValue {try decoder.handleConflictingOneOf()} - self.command = .run(v) - } - }() - default: break - } - } - } - - public func traverse(visitor: inout V) throws { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every if/case branch local when no optimizations - // are enabled. https://github.com/apple/swift-protobuf/issues/1034 and - // https://github.com/apple/swift-protobuf/issues/1182 - try { if case .run(let v)? = self.command { - try visitor.visitSingularMessageField(value: v, fieldNumber: 1) - } }() - try unknownFields.traverse(visitor: &visitor) - } - - public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand, rhs: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand) -> Bool { - if lhs.command != rhs.command {return false} - if lhs.unknownFields != rhs.unknownFields {return false} - return true - } -} - -extension Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { - public static let protoMessageName: String = Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.protoMessageName + ".Run" - public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ - 1: .same(proto: "debug"), - ] - - public mutating func decodeMessage(decoder: inout D) throws { - while let fieldNumber = try decoder.nextFieldNumber() { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every case branch when no optimizations are - // enabled. https://github.com/apple/swift-protobuf/issues/1034 - switch fieldNumber { - case 1: try { try decoder.decodeSingularBoolField(value: &self.debug) }() - default: break - } - } - } - - public func traverse(visitor: inout V) throws { - if self.debug != false { - try visitor.visitSingularBoolField(value: self.debug, fieldNumber: 1) - } - try unknownFields.traverse(visitor: &visitor) - } - - public static func ==(lhs: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run, rhs: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand.Run) -> Bool { - if lhs.debug != rhs.debug {return false} - if lhs.unknownFields != rhs.unknownFields {return false} - return true - } -} - -extension Edge_Agent_Services_V1_RunContainerResponse: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { - public static let protoMessageName: String = _protobuf_package + ".RunContainerResponse" - public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ - 1: .same(proto: "started"), - ] - - public mutating func decodeMessage(decoder: inout D) throws { - while let fieldNumber = try decoder.nextFieldNumber() { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every case branch when no optimizations are - // enabled. https://github.com/apple/swift-protobuf/issues/1034 - switch fieldNumber { - case 1: try { - var v: Edge_Agent_Services_V1_RunContainerResponse.Started? - var hadOneofValue = false - if let current = self.responseType { - hadOneofValue = true - if case .started(let m) = current {v = m} - } - try decoder.decodeSingularMessageField(value: &v) - if let v = v { - if hadOneofValue {try decoder.handleConflictingOneOf()} - self.responseType = .started(v) - } - }() - default: break - } - } - } - - public func traverse(visitor: inout V) throws { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every if/case branch local when no optimizations - // are enabled. https://github.com/apple/swift-protobuf/issues/1034 and - // https://github.com/apple/swift-protobuf/issues/1182 - try { if case .started(let v)? = self.responseType { - try visitor.visitSingularMessageField(value: v, fieldNumber: 1) - } }() - try unknownFields.traverse(visitor: &visitor) - } - - public static func ==(lhs: Edge_Agent_Services_V1_RunContainerResponse, rhs: Edge_Agent_Services_V1_RunContainerResponse) -> Bool { - if lhs.responseType != rhs.responseType {return false} - if lhs.unknownFields != rhs.unknownFields {return false} - return true - } -} - -extension Edge_Agent_Services_V1_RunContainerResponse.Started: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding { - public static let protoMessageName: String = Edge_Agent_Services_V1_RunContainerResponse.protoMessageName + ".Started" - public static let _protobuf_nameMap: SwiftProtobuf._NameMap = [ - 1: .standard(proto: "debug_port"), - ] - - public mutating func decodeMessage(decoder: inout D) throws { - while let fieldNumber = try decoder.nextFieldNumber() { - // The use of inline closures is to circumvent an issue where the compiler - // allocates stack space for every case branch when no optimizations are - // enabled. https://github.com/apple/swift-protobuf/issues/1034 - switch fieldNumber { - case 1: try { try decoder.decodeSingularUInt32Field(value: &self.debugPort) }() - default: break - } - } - } - - public func traverse(visitor: inout V) throws { - if self.debugPort != 0 { - try visitor.visitSingularUInt32Field(value: self.debugPort, fieldNumber: 1) - } - try unknownFields.traverse(visitor: &visitor) - } - - public static func ==(lhs: Edge_Agent_Services_V1_RunContainerResponse.Started, rhs: Edge_Agent_Services_V1_RunContainerResponse.Started) -> Bool { - if lhs.debugPort != rhs.debugPort {return false} - if lhs.unknownFields != rhs.unknownFields {return false} - return true - } -} diff --git a/Sources/EdgeAgentGRPC/Proto/edge_agent.protoset b/Sources/EdgeAgentGRPC/Proto/edge_agent.protoset deleted file mode 100644 index 6342e51c3e09817ee93b1ce38376bc5326af1d85..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 879 zcmbW0K}*9h7>2ENoBiCT#37?1$Sw|))>cm*1UG~UBBKMZrCY-`v}sM#fk*HD3xA&e z2$L@z?6AZ1*1XC4^v&C^@KXh!##8D>Q_6&!Q@)r_XzngNSIHsD;lc~c9Vg?dNXe1f zba2yIdYpyV`8MG&C|EybBAPSG1Nv6bT#!eo&S(@<-ZoB+?w;>E>jBOhvok>FU@IsY z_nNMA*%rVA|SGlO$rXq%L9h8B}#_asn_< zWK$uF5{lQi0t=NVDcHwQ!AOL_P8^8{Pt*f^wm%JMsxD=`@*N}qlNYS4@%H~=Rns!C zg2@b~T#*)3VmdCS2;U9#qGzdw@P54})Vcnkp?J?|=gWYWG_HK@JWE+l$q;I}h`69} z8T5LypxW+?v{&_6V(c!|N>vq#F~cn70{M-=LS-n0-aa%q)gVmie;9CZFUR#xP54L; eI7y(snw*?(77(Y|zS!)r4$(C(jd2x!o2y?;jv)pB diff --git a/Sources/Shell/Shell.swift b/Sources/Shell/Shell.swift deleted file mode 100644 index 32741df..0000000 --- a/Sources/Shell/Shell.swift +++ /dev/null @@ -1,111 +0,0 @@ -import Foundation -import Logging - -/// Utility for executing shell commands. -public enum Shell { - static let logger = Logger(label: "apache-edge.shell") - - /// Error thrown when a process execution fails. - public enum Error: Swift.Error, LocalizedError { - case nonZeroExit(command: [String], exitCode: Int32) - - public var errorDescription: String? { - switch self { - case .nonZeroExit(let command, let exitCode): - return - "Command '\(command.joined(separator: " "))' failed with exit code \(exitCode)" - } - } - } - - /// Run a CLI command. - /// - /// This method executes a command in a subprocess. If the command is not successful - /// (indicated by a non-zero exit code), an error is thrown. - /// - /// - Parameter arguments: An array of command-line arguments to execute. - /// - Returns: A string containing the command's standard output and standard error. - /// - Throws: An error if the command execution fails - @discardableResult public static func run(_ arguments: [String]) async throws -> String { - logger.info("Running command", metadata: ["command": .array(arguments.map { .string($0) })]) - - let process = Process() - - // Create pipes for stdout and stderr to both capture and display output - let stdoutPipe = Pipe() - let stderrPipe = Pipe() - let stdoutCapture = Pipe() - let stderrCapture = Pipe() - - process.executableURL = URL(fileURLWithPath: "/usr/bin/env") - process.arguments = arguments - - process.standardOutput = stdoutPipe - process.standardError = stderrPipe - process.environment = [ - // TODO: Don't hardcode the path to the Swift toolchain – manage our own toolchain instead. - "PATH": - "/Library/Developer/Toolchains/swift-6.0.3-RELEASE.xctoolchain/usr/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin", - "TOOLCHAINS": "org.swift.603202412101a", - ] - - stdoutPipe.fileHandleForReading.readabilityHandler = { fileHandle in - let data = fileHandle.availableData - if !data.isEmpty { - FileHandle.standardOutput.write(data) - stdoutCapture.fileHandleForWriting.write(data) - } - } - - stderrPipe.fileHandleForReading.readabilityHandler = { fileHandle in - let data = fileHandle.availableData - if !data.isEmpty { - FileHandle.standardError.write(data) - stderrCapture.fileHandleForWriting.write(data) - } - } - - try process.run() - - return try await withTaskCancellationHandler { - try await withCheckedThrowingContinuation { continuation in - process.terminationHandler = { proc in - // Clean up handlers - stdoutPipe.fileHandleForReading.readabilityHandler = nil - stderrPipe.fileHandleForReading.readabilityHandler = nil - - // Close write handles to ensure we can read all data - stdoutCapture.fileHandleForWriting.closeFile() - stderrCapture.fileHandleForWriting.closeFile() - - if process.terminationStatus == 0 { - // Read captured output - let stdoutData = stdoutCapture.fileHandleForReading.readDataToEndOfFile() - let stderrData = stderrCapture.fileHandleForReading.readDataToEndOfFile() - - // Combine stdout and stderr - let combinedData = stdoutData + stderrData - let output = String(data: combinedData, encoding: .utf8) ?? "" - continuation.resume(returning: output) - } else { - continuation.resume( - throwing: Error.nonZeroExit( - command: arguments, - exitCode: process.terminationStatus - ) - ) - } - } - } - } onCancel: { - // Kill the process when the task is cancelled - logger.trace( - "Task cancelled, terminating process", - metadata: [ - "command": .array(arguments.map { .string($0) }) - ] - ) - process.terminate() - } - } -} diff --git a/Sources/edge-agent/DockerCLI.swift b/Sources/edge-agent/DockerCLI.swift deleted file mode 100644 index 661ceea..0000000 --- a/Sources/edge-agent/DockerCLI.swift +++ /dev/null @@ -1,67 +0,0 @@ -import Shell - -/// Represents the Docker CLI interface for managing container images and running containers. -public struct DockerCLI: Sendable { - public let command: String - - public init(command: String = "docker") { - self.command = command - } - - /// Options for the Docker run command. - public enum RunOption: Sendable { - /// Remove the container when it exits. - case rm - - /// Keep STDIN open even if not attached. - case interactive - - /// Allocate a pseudo-TTY. - case tty - - /// Publish a container's port to the host. - case publishPort(hostPort: UInt16, containerPort: UInt16) - - /// Add Linux capabilities. - case capAdd(String) - - /// Security options. - case securityOpt(String) - - /// The arguments to pass to the Docker run command. - var arguments: [String] { - switch self { - case .rm: - return ["--rm"] - case .interactive: - return ["-i"] - case .tty: - return ["-t"] - case .publishPort(let hostPort, let containerPort): - return ["-p", "\(hostPort):\(containerPort)"] - case .capAdd(let capability): - return ["--cap-add=\(capability)"] - case .securityOpt(let option): - return ["--security-opt", option] - } - } - } - - /// Load a Docker image from a tar archive. - @discardableResult - public func load(filePath: String) async throws -> String { - let arguments = [command, "load", "-i", filePath] - return try await Shell.run(arguments) - } - - /// Run a Docker container. - @discardableResult - public func run( - options: [RunOption] = [], - image: String, - command: [String] = [] - ) async throws -> String { - let arguments = [self.command, "run"] + options.flatMap(\.arguments) + [image] + command - return try await Shell.run(arguments) - } -} diff --git a/Sources/edge-agent/EdgeAgent.swift b/Sources/edge-agent/EdgeAgent.swift deleted file mode 100644 index c90b97d..0000000 --- a/Sources/edge-agent/EdgeAgent.swift +++ /dev/null @@ -1,61 +0,0 @@ -import ArgumentParser -import EdgeAgentGRPC -import Foundation -import GRPCHealthService -import GRPCNIOTransportHTTP2 -import GRPCServiceLifecycle -import Logging -import ServiceLifecycle - -@main -struct EdgeAgent: AsyncParsableCommand { - static let configuration = CommandConfiguration( - commandName: "edge-agent", - abstract: "Edge Agent" - ) - - @Option(name: .shortAndLong, help: "The port to listen on for incoming connections.") - var port: Int = 50051 - - func run() async throws { - LoggingSystem.bootstrap { label in - var handler = StreamLogHandler.standardError(label: label) - #if DEBUG - handler.logLevel = .trace - #endif - return handler - } - - let logger = Logger(label: "apache-edge.agent") - - logger.info("Starting Edge Agent on port \(port)") - - let healthService = HealthService() - healthService.provider.updateStatus( - .serving, - forService: Edge_Agent_Services_V1_EdgeAgentService.descriptor - ) - - let services: [any RegistrableRPCService] = [ - healthService, - EdgeAgentService(), - ] - - let grpcServer = GRPCServer( - transport: .http2NIOPosix( - address: .ipv6(host: "::", port: port), - transportSecurity: .plaintext - ), - services: services - ) - - let group = ServiceGroup( - services: [ - grpcServer - ], - logger: logger - ) - - try await group.run() - } -} diff --git a/Sources/edge-agent/Services/EdgeAgentService.swift b/Sources/edge-agent/Services/EdgeAgentService.swift deleted file mode 100644 index aade6a1..0000000 --- a/Sources/edge-agent/Services/EdgeAgentService.swift +++ /dev/null @@ -1,49 +0,0 @@ -import EdgeAgentGRPC - -struct EdgeAgentService: Edge_Agent_Services_V1_EdgeAgentService.ServiceProtocol { - func runContainer( - request: StreamingServerRequest, - context: ServerContext - ) async throws -> StreamingServerResponse { - return StreamingServerResponse { - ( - writer: RPCWriter - ) async throws -> Metadata in - try await withThrowingDiscardingTaskGroup { group in - var handler = RunContainerRequestHandler() - - // Add a task to write outgoing events to the response. - group.addTask { [events = handler.events] in - for try await event in events { - try await writer.write(event.proto) - } - } - - // Iterate over incoming messages, converting each from protobuf before passing it - // to the request handler. - for try await message in request.messages { - switch message.requestType { - case .header(let header): - let header = try RunContainerRequestHandler.Header(validating: header) - try await handler.handle(header) - case .chunk(let chunk): - let chunk = try RunContainerRequestHandler.Chunk(validating: chunk) - try await handler.handle(chunk) - case .control(let control): - let control = try RunContainerRequestHandler.ControlCommand( - validating: control - ) - try await handler.handle(control) - case nil: - throw RPCError( - code: .invalidArgument, - message: "Invalid request: Unknown message type" - ) - } - } - } - - return Metadata() - } - } -} diff --git a/Sources/edge-agent/Services/RunContainerRequestHandler+Proto.swift b/Sources/edge-agent/Services/RunContainerRequestHandler+Proto.swift deleted file mode 100644 index 38b5650..0000000 --- a/Sources/edge-agent/Services/RunContainerRequestHandler+Proto.swift +++ /dev/null @@ -1,48 +0,0 @@ -import EdgeAgentGRPC - -extension RunContainerRequestHandler.Header { - /// Initialize a header from a protobuf header, validating the contents. - init(validating proto: Edge_Agent_Services_V1_RunContainerRequest.Header) throws { - guard !proto.imageName.isEmpty else { - throw RPCError(code: .invalidArgument, message: "Image name cannot be empty") - } - - self.imageName = proto.imageName - } -} - -extension RunContainerRequestHandler.Chunk { - init(validating proto: Edge_Agent_Services_V1_RunContainerRequest.Chunk) throws { - guard !proto.data.isEmpty else { - throw RPCError(code: .invalidArgument, message: "Chunk data cannot be empty") - } - - self.data = proto.data - } -} - -extension RunContainerRequestHandler.ControlCommand { - init(validating proto: Edge_Agent_Services_V1_RunContainerRequest.ControlCommand) throws { - switch proto.command { - case .run(let run): - self = .run(Run(debug: run.debug)) - case nil: - throw RPCError(code: .invalidArgument, message: "Control command cannot be unspecified") - } - } -} - -extension RunContainerRequestHandler.Event { - var proto: Edge_Agent_Services_V1_RunContainerResponse { - .with { - switch self { - case .containerStarted(let containerStarted): - $0.responseType = .started( - .with { - $0.debugPort = containerStarted.debugPort - } - ) - } - } - } -} diff --git a/Sources/edge-agent/Services/RunContainerRequestHandler.swift b/Sources/edge-agent/Services/RunContainerRequestHandler.swift deleted file mode 100644 index 1c069f6..0000000 --- a/Sources/edge-agent/Services/RunContainerRequestHandler.swift +++ /dev/null @@ -1,205 +0,0 @@ -import Foundation -import Logging -import NIOFileSystem -import Shell - -/// A state machine that handles the request to run a container. -struct RunContainerRequestHandler { - enum State { - /// This is the initial state. The handler is waiting for the header. - case waitingForHeader - - /// After the header is received, the handler transitions to the `acceptingChunks`. In this - /// state, a file handle is opened for writing and chunks are being accepted. - case acceptingChunks(AcceptingChunks) - - /// Container is running, with associated data about the running container. - case running(Running) - - struct AcceptingChunks { - let header: Header - var writer: BufferedWriter - var imagePath: FilePath - var fileHandle: WriteFileHandle - } - - struct Running { - let imageName: String - let debugPort: UInt32 - } - } - - /// The header of the request. - struct Header { - let imageName: String - } - - struct Chunk { - let data: Data - } - - enum ControlCommand { - case run(Run) - - struct Run { - var debug: Bool - } - } - - enum Event { - case containerStarted(ContainerStarted) - - struct ContainerStarted { - let debugPort: UInt32 - } - } - - enum Error: Swift.Error { - /// A message was received before the header. - case expectedHeader - - /// A header message was received, but not expected. - case unexpectedHeader - - /// A chunk message was received, but not expected. - case unexpectedChunk - - /// An internal inconsistency was detected. This is a programming error in the agent. - case internalInconsistency - - /// An unexpected control command was received. - case unexpectedControlCommand(ControlCommand) - - /// The container failed to start. - case containerStartFailed(Swift.Error) - } - - public let events: AsyncStream - private let eventsContinuation: AsyncStream.Continuation - - private var state: State = .waitingForHeader - private let dockerCLI = DockerCLI() - private let logger = Logger(label: "edge-agent.run-container") - - init() { - let (stream, continuation) = AsyncStream.makeStream(bufferingPolicy: .unbounded) - self.events = stream - self.eventsContinuation = continuation - } - - mutating func handle(_ header: Header) async throws { - guard case .waitingForHeader = self.state else { - throw Error.unexpectedHeader - } - - // Create a file for writing in the temporary directory. - let uuid = UUID().uuidString - let fileName = "container-\(header.imageName).\(uuid).tar" - let path = try await FileSystem.shared.temporaryDirectory.appending(fileName) - logger.info("Writing container image", metadata: ["path": .string(path.string)]) - let writeHandle = try await FileSystem.shared.openFile( - forWritingAt: path, - options: .newFile(replaceExisting: false) - ) - let writer = writeHandle.bufferedWriter() - - self.state = .acceptingChunks( - State.AcceptingChunks( - header: header, - writer: writer, - imagePath: path, - fileHandle: writeHandle - ) - ) - } - - mutating func handle(_ chunk: Chunk) async throws { - guard case .acceptingChunks(var state) = self.state else { - throw Error.unexpectedChunk - } - - logger.debug("Writing chunk", metadata: ["size": .string("\(chunk.data.count) bytes")]) - try await state.writer.write(contentsOf: chunk.data) - self.state = .acceptingChunks(state) - } - - mutating func handle(_ control: ControlCommand) async throws { - switch (state, control) { - case (.waitingForHeader, _): - throw Error.expectedHeader - - case (.acceptingChunks(var acceptingState), .run(let run)): - // Finalize writing the container image - try await acceptingState.writer.flush() - try await acceptingState.fileHandle.close() - - // Load the container image into Docker - let imagePath = acceptingState.imagePath.string - logger.info( - "Loading container image into Docker", - metadata: ["path": .string(imagePath)] - ) - try await dockerCLI.load(filePath: imagePath) - - let imageName = acceptingState.header.imageName - var runOptions: [DockerCLI.RunOption] = [.rm] - var debugPort: UInt32 = 0 - - if run.debug { - // Configure for debugging - debugPort = 4242 - logger.info( - "Starting container in debug mode", - metadata: ["image": .string(imageName), "port": .string("\(debugPort)")] - ) - runOptions.append(contentsOf: [ - .publishPort(hostPort: UInt16(debugPort), containerPort: 4242), - .capAdd("SYS_PTRACE"), - .securityOpt("seccomp=unconfined"), - ]) - - do { - try await dockerCLI.run( - options: runOptions, - image: imageName, - command: ["ds2", "gdbserver", "0.0.0.0:\(debugPort)", "/bin/\(imageName)"] - ) - logger.info( - "Container started in debug mode successfully", - metadata: ["image": .string(imageName)] - ) - } catch { - logger.error( - "Failed to start container in debug mode", - metadata: ["error": .string("\(error)")] - ) - throw Error.containerStartFailed(error) - } - } else { - // Start the container without debugging - logger.info( - "Starting container without debugging", - metadata: ["image": .string(imageName)] - ) - try await dockerCLI.run(options: runOptions, image: imageName) - logger.info( - "Container started successfully", - metadata: ["image": .string(imageName)] - ) - } - - eventsContinuation.yield(.containerStarted(.init(debugPort: debugPort))) - - // Update state to running - self.state = .running( - State.Running( - imageName: imageName, - debugPort: debugPort - ) - ) - - case (.running, _): - throw Error.unexpectedControlCommand(control) - } - } -}