diff --git a/.github/workflows/build_docker_dependencies_image.yml b/.github/workflows/build_docker_dependencies_image.yml deleted file mode 100644 index 73c1c316..00000000 --- a/.github/workflows/build_docker_dependencies_image.yml +++ /dev/null @@ -1,52 +0,0 @@ -name: Build microservices dependencies docker image - -on: - push: - branches: - - develop - tags: - - 'v*' - paths: - - dockerfiles/microservices-dependencies.dockerfile - pull_request: - types: [closed] - branches: - - develop - - workflow_dispatch: - -jobs: - build: - runs-on: ubuntu-latest - steps: - - name: Checkout code - uses: actions/checkout@v4 - with: - repository: DUNE-DAQ/microservices - path: microservices - - - name: Log in to the Container registry - uses: docker/login-action@v2 - with: - registry: ghcr.io - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata (tags, labels) for Docker - id: meta - uses: docker/metadata-action@v4 - with: - images: | - ghcr.io/DUNE-DAQ/microservices_dependencies - tags: | - type=raw,value=latest - type=ref,event=branch - type=ref,event=tag - - - name: Build and push Docker images - uses: docker/build-push-action@v3 - with: - context: ${{ github.workspace }}/microservices/ - push: true - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} diff --git a/.github/workflows/build_docker_image.yml b/.github/workflows/build_docker_image.yml deleted file mode 100644 index 6a042ad2..00000000 --- a/.github/workflows/build_docker_image.yml +++ /dev/null @@ -1,52 +0,0 @@ -name: Build microservices docker image - -on: - push: - branches: - - develop - tags: - - 'v*' - paths: - - Dockerfile - pull_request: - types: [closed] - branches: - - develop - - workflow_dispatch: - -jobs: - build: - runs-on: ubuntu-latest - steps: - - name: Checkout code - uses: actions/checkout@v4 - with: - repository: DUNE-DAQ/microservices - path: microservices - - - name: Log in to the Container registry - uses: docker/login-action@v2 - with: - registry: ghcr.io - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata (tags, labels) for Docker - id: meta - uses: docker/metadata-action@v4 - with: - images: | - ghcr.io/DUNE-DAQ/microservices - tags: | - type=raw,value=latest - type=ref,event=branch - type=ref,event=tag - - - name: Build and push Docker images - uses: docker/build-push-action@v3 - with: - context: ${{ github.workspace }}/microservices/ - push: true - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} diff --git a/.github/workflows/build_docker_layers.yaml b/.github/workflows/build_docker_layers.yaml new file mode 100644 index 00000000..f8855f68 --- /dev/null +++ b/.github/workflows/build_docker_layers.yaml @@ -0,0 +1,140 @@ +name: Build microservices docker images +on: + push: + branches: + - develop + tags: + - 'v*' + pull_request: + branches: + - develop + paths: + - 'dockerfiles/requirements.txt' + - 'dockerfiles/Dockerfile.dependencies' + - 'dockerfiles/Dockerfile' + - '.github/workflows/build_docker_layers.yaml' + workflow_dispatch: + inputs: + force_rebuild_dependencies: + description: 'Force rebuild of dependencies image' + required: false + type: boolean + default: false +jobs: + build-dependencies: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + # Only build dependencies: + # if manually triggered with force flag + # on tag push + # on push to default branch AND dependency files changed + if: | + (github.event_name == 'workflow_dispatch' && github.event.inputs.force_rebuild_dependencies == 'true') || + github.ref_type == 'tag' || + (github.event_name == 'push' && github.ref == format('refs/heads/{0}', github.event.repository.default_branch) && + github.event.head_commit != null && + ( + contains(github.event.head_commit.modified, 'dockerfiles/requirements.txt') || + contains(github.event.head_commit.added, 'dockerfiles/requirements.txt') || + contains(github.event.head_commit.modified, 'dockerfiles/Dockerfile.dependencies') || + contains(github.event.head_commit.added, 'dockerfiles/Dockerfile.dependencies') || + contains(github.event.head_commit.modified, 'dockerfiles/Dockerfile') || + contains(github.event.head_commit.added, 'dockerfiles/Dockerfile') + )) + steps: + - name: Checkout code + uses: actions/checkout@v6 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Log in to GHCR + if: github.event_name != 'pull_request' + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Extract Docker metadata for dependencies + id: meta + uses: docker/metadata-action@v5 + with: + images: ghcr.io/DUNE-DAQ/microservices_dependencies + tags: | + type=raw,value=latest,enable={{is_default_branch}} + type=ref,event=branch + type=ref,event=tag + type=sha,format=short + - name: Build and push dependencies image + uses: docker/build-push-action@v6 + with: + context: ./dockerfiles + file: ./dockerfiles/Dockerfile.dependencies + platforms: linux/amd64 + push: ${{ github.event_name != 'pull_request' }} + provenance: true + sbom: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + build-microservices: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + # Always run, but depend on dependencies job if it ran + needs: [build-dependencies] + if: | + always() && + (needs.build-dependencies.result == 'success' || needs.build-dependencies.result == 'skipped') + steps: + - name: Checkout code + uses: actions/checkout@v6 + - name: Get git refs + id: git_refs + run: | + echo "short_sha=$(git rev-parse --short HEAD)" >> "${GITHUB_OUTPUT}" + echo "full_sha=$(git rev-parse HEAD)" >> "${GITHUB_OUTPUT}" + - name: Find microservices_dependency tag + id: find_dep_tag + run: | + if [[ "${{ needs.build-dependencies.result }}" == "success" ]]; then + # Dependencies image was rebuilt for this commit, so use current short SHA + echo "dep_tag=$(git rev-parse --short HEAD)" >> "${GITHUB_OUTPUT}" + else + # Fallback: use 'latest' if no SHA-like tag was found or API call failed + echo "Warning: Could not determine SHA-based dependency tag, falling back to 'latest'" >&2 + echo "dep_tag=latest" >> "${GITHUB_OUTPUT}" + fi + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Log in to GHCR + if: github.event_name != 'pull_request' + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Extract Docker metadata for microservices + id: meta + uses: docker/metadata-action@v5 + with: + images: ghcr.io/DUNE-DAQ/microservices + tags: | + type=raw,value=latest,enable={{is_default_branch}} + type=ref,event=branch + type=ref,event=tag + type=sha,format=short + - name: Build and push microservices image + uses: docker/build-push-action@v6 + with: + context: . + file: ./dockerfiles/Dockerfile + platforms: linux/amd64 + push: ${{ github.event_name != 'pull_request' }} + provenance: true + sbom: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + build-args: | + DEPENDENCY_TAG=${{ steps.find_dep_tag.outputs.dep_tag }} + MICROSERVICES_VERSION=${{ steps.git_refs.outputs.full_sha }} diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 17be6675..00000000 --- a/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -ARG DEPENDENCY_TAG=latest -FROM ghcr.io/dune-daq/microservices_dependencies:$DEPENDENCY_TAG - -COPY . /microservices - -ENTRYPOINT ["/microservices/entrypoint.sh"] \ No newline at end of file diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile new file mode 100644 index 00000000..865bc8db --- /dev/null +++ b/dockerfiles/Dockerfile @@ -0,0 +1,13 @@ +# Must define DEPENDENCY_TAG before it is used +ARG DEPENDENCY_TAG=latest +FROM ghcr.io/dune-daq/microservices_dependencies:$DEPENDENCY_TAG + +ARG MICROSERVICES_VERSION=develop +RUN : "${APP_ROOT:?APP_ROOT variable is required}" \ + && git clone -b ${MICROSERVICES_VERSION} https://github.com/DUNE-DAQ/microservices.git \ + && cp microservices/entrypoint.sh / + +WORKDIR ${APP_ROOT}/microservices + +USER 1234 +ENTRYPOINT ["/entrypoint.sh"] diff --git a/dockerfiles/Dockerfile.dependencies b/dockerfiles/Dockerfile.dependencies new file mode 100644 index 00000000..f5516bba --- /dev/null +++ b/dockerfiles/Dockerfile.dependencies @@ -0,0 +1,70 @@ +FROM docker.io/almalinux:10 + +ARG ERSVERSION=coredaq-v5.5.0 # For issue.proto from ers +ARG ERSKAFKAVERSION=coredaq-v5.5.0 # For ERSSubscriber.py from erskafka +ARG OPMONLIBVERSION=coredaq-v5.5.0 # For opmon_entry.proto from opmonlib +ARG KAFKAOPMONVERSION=coredaq-v5.5.0 # For OpMonSubscriber.py from kafkaopmon + +ARG VENV_PATH=/opt/venv +ENV \ + APP_ROOT=/opt/app \ + APP_DATA=/opt/data \ + HOME=/opt/app \ + PYTHONPYCACHEPREFIX=/tmp/pycache \ + PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 + +ENV PATH="${VENV_PATH}/bin:$PATH" + +RUN mkdir -p ${APP_ROOT} ${APP_DATA} ${VENV_PATH} && chmod 1777 ${APP_DATA} +WORKDIR ${APP_ROOT} + +# Install base python bits +# Install required devel bits +RUN yum clean expire-cache \ + && yum -y install curl git python3-pip python3-pip-wheel \ + && yum -y install make gcc python3-devel protobuf-compiler protobuf-devel krb5-devel libffi-devel libpq-devel postgresql \ + && yum clean all + +# setup venv +RUN python3 -m venv ${VENV_PATH} \ + && ${VENV_PATH}/bin/pip install --no-cache-dir --upgrade pip \ + && rm -rf /root/.cache ${HOME}/.cache ${VENV_PATH}/pip-selfcheck.json + +COPY requirements.txt ${VENV_PATH}/ +RUN ${VENV_PATH}/bin/pip install --no-cache-dir -r ${VENV_PATH}/requirements.txt \ + && rm -rf /root/.cache ${HOME}/.cache ${VENV_PATH}/pip-selfcheck.json ${VENV_PATH}/requirements.txt + +# setup protobuf schemas +RUN echo "Installing https://raw.githubusercontent.com/DUNE-DAQ/ers/${ERSVERSION}/schema/ers/issue.proto" \ + && curl -fSL -O https://raw.githubusercontent.com/DUNE-DAQ/ers/$ERSVERSION/schema/ers/issue.proto \ + && mkdir -p ${VENV_PATH}/ers \ + && touch ${VENV_PATH}/ers/__init__.py \ + && protoc --python_out=${VENV_PATH}/ers issue.proto \ + && rm -f issue.proto \ + && echo "Installing https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/${OPMONLIBVERSION}/schema/opmonlib/opmon_entry.proto" \ + && curl -fSL -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/opmonlib/opmon_entry.proto \ + && mkdir -p ${VENV_PATH}/opmonlib \ + && touch ${VENV_PATH}/opmonlib/__init__.py \ + && protoc --python_out=${VENV_PATH}/opmonlib -I${APP_ROOT} opmon_entry.proto \ + && rm -f opmon_entry.proto + +# fetch ERS python bindings +RUN mkdir -p ${VENV_PATH}/erskafka \ + && touch ${VENV_PATH}/erskafka/__init__.py \ + && echo "Installing https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py" \ + && curl -fSL https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o ${VENV_PATH}/erskafka/ERSSubscriber.py \ + && mkdir -p ${VENV_PATH}/kafkaopmon \ + && touch ${VENV_PATH}/kafkaopmon/__init__.py \ + && echo "Installing https://raw.githubusercontent.com/DUNE-DAQ/kafkaopmon/${KAFKAOPMONVERSION}/python/kafkaopmon/OpMonSubscriber.py" \ + && curl -fSL https://raw.githubusercontent.com/DUNE-DAQ/kafkaopmon/$KAFKAOPMONVERSION/python/kafkaopmon/OpMonSubscriber.py -o ${VENV_PATH}/kafkaopmon/OpMonSubscriber.py + +# elisa_client_api and CERN kerberos needed by the logbook microservice at NP04 +COPY cern.repo /etc/yum.repos.d/ +RUN yum clean expire-cache \ + && yum -y install krb5-workstation cern-krb5-conf \ + && yum clean all + +RUN git clone --depth 1 -b develop https://github.com/DUNE-DAQ/elisa_client_api.git \ + && ${VENV_PATH}/bin/pip install --no-cache-dir ./elisa_client_api \ + && rm -rf /root/.cache ${HOME}/.cache ${VENV_PATH}/pip-selfcheck.json elisa_client_api diff --git a/dockerfiles/cern.repo b/dockerfiles/cern.repo new file mode 100644 index 00000000..9475eff2 --- /dev/null +++ b/dockerfiles/cern.repo @@ -0,0 +1,20 @@ +[cern] +name=AlmaLinux $releasever - CERN +baseurl=https://linuxsoft.cern.ch/cern/alma/$releasever/CERN/$basearch +enabled=1 +gpgcheck=1 +gpgkey=https://gitlab.cern.ch/linuxsupport/rpms/cern-gpg-keys/-/raw/main/src/RPM-GPG-KEY-kojiv3 + +[cern-testing] +name=AlmaLinux $releasever - CERN - testing +baseurl=https://linuxsoft.cern.ch/cern/alma/$releasever-testing/CERN/$basearch +enabled=0 +gpgcheck=1 +gpgkey=https://gitlab.cern.ch/linuxsupport/rpms/cern-gpg-keys/-/raw/main/src/RPM-GPG-KEY-kojiv3 + +[cern-source] +name=AlmaLinux $releasever - CERN Source +baseurl=https://linuxsoft.cern.ch/cern/alma/$releasever/CERN/Source/ +enabled=0 +gpgcheck=1 +gpgkey=https://gitlab.cern.ch/linuxsupport/rpms/cern-gpg-keys/-/raw/main/src/RPM-GPG-KEY-kojiv3 diff --git a/dockerfiles/microservices-dependencies.dockerfile b/dockerfiles/microservices-dependencies.dockerfile deleted file mode 100644 index abf36516..00000000 --- a/dockerfiles/microservices-dependencies.dockerfile +++ /dev/null @@ -1,49 +0,0 @@ -FROM cern/alma9-base - -ARG ERSVERSION=v1.5.2 # For issue.proto from ers -ARG ERSKAFKAVERSION=v1.5.4 # For ERSSubscriber.py from erskafka -ARG OPMONLIBVERSION=v2.0.0 # For opmon_entry.proto from opmonlib -ARG KAFKAOPMONVERSION=v2.0.0 # For OpMonSubscriber.py from kafkaopmon - -ARG LOCALPYDIR=/microservices_python - -RUN yum clean all \ - && yum -y install gcc make git unzip libpq-devel libffi-devel python3-pip python3-wheel krb5-devel python3-devel \ - && yum clean all - -# Can drop when migration to sqlalchemy is complete since we're using the thin client there -RUN curl -O https://download.oracle.com/otn_software/linux/instantclient/1919000/oracle-instantclient19.19-basic-19.19.0.0.0-1.el9.x86_64.rpm \ - && yum -y install libaio libnsl \ - && rpm -iv oracle-instantclient19.19-basic-19.19.0.0.0-1.el9.x86_64.rpm - -COPY requirements.txt / -RUN python3 -m pip install --upgrade setuptools \ - && python3 -m pip install -r requirements.txt \ - && python3 -m pip cache remove \* - -# elisa_client_api needed by the logbook microservice -RUN git clone https://github.com/DUNE-DAQ/elisa_client_api.git \ - && python3 -m pip install --upgrade setuptools \ - && python3 -m pip install ./elisa_client_api - -# protoc-24.3-linux-x86_64.zip is the latest zipfile available as of Sep-15-2023 -# See also https://grpc.io/docs/protoc-installation/#install-pre-compiled-binaries-any-os - -RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v24.3/protoc-24.3-linux-x86_64.zip \ - && unzip protoc-24.3-linux-x86_64.zip \ - && curl -O https://raw.githubusercontent.com/DUNE-DAQ/ers/$ERSVERSION/schema/ers/issue.proto \ - && mkdir -p $LOCALPYDIR/ers \ - && protoc --python_out=$LOCALPYDIR/ers issue.proto \ - && curl -O https://raw.githubusercontent.com/DUNE-DAQ/opmonlib/$OPMONLIBVERSION/schema/opmonlib/opmon_entry.proto \ - && mkdir -p $LOCALPYDIR/opmonlib \ - && protoc --python_out=$LOCALPYDIR/opmonlib -I/ -I/include opmon_entry.proto - -RUN mkdir -p $LOCALPYDIR/erskafka \ - && curl https://raw.githubusercontent.com/DUNE-DAQ/erskafka/$ERSKAFKAVERSION/python/erskafka/ERSSubscriber.py -o $LOCALPYDIR/erskafka/ERSSubscriber.py \ - && mkdir -p $LOCALPYDIR/kafkaopmon \ - && curl https://raw.githubusercontent.com/DUNE-DAQ/kafkaopmon/$KAFKAOPMONVERSION/python/kafkaopmon/OpMonSubscriber.py -o $LOCALPYDIR/kafkaopmon/OpMonSubscriber.py - -ENV PYTHONPATH=$LOCALPYDIR:$PYTHONPATH - -# This ensures the container will run as non-root by default. Hat tip Pat Riehecky. -# USER 60000:0 # [Commented out so various entrypoint.sh scripts as of Sep-12-2023 continue to work, to be addressed later] \ No newline at end of file diff --git a/dockerfiles/requirements.txt b/dockerfiles/requirements.txt index 078c11fe..906cd7fe 100644 --- a/dockerfiles/requirements.txt +++ b/dockerfiles/requirements.txt @@ -1,43 +1,16 @@ -aniso8601==9.0.1 -apispec==5.1.1 -Authlib==1.3.1 -beautifulsoup4==4.12.3 -click==8.1.2 -dataclasses==0.6 -defusedxml==0.7.1 -Flask==2.2.5 -Flask-Caching==2.0.2 -Flask-HTTPAuth==4.6.0 -flask-redis==0.4.0 -Flask-RESTful==0.3.9 -Flask-SQLAlchemy==3.0.5 -future==1.0.0 -gevent==22.10.2 -greenlet==2.0.2 -gunicorn==20.1.0 -importlib-metadata==4.11.3 -influxdb==5.3.1 -itsdangerous==2.0.1 -Jinja2==3.1.1 -kafka-python==2.0.2 -lxml==5.2.2 -MarkupSafe==2.1.1 -psycopg2-binary==2.9.7 -pymongo==4.0.2 -pytz==2022.1 -qrcode==7.4.2 -redis==3.5.3 -requests==2.32.3 -requests-gssapi==1.3.0 -setuptools==39.2.0 -sh==2.0.7 -six==1.16.0 -typing_extensions==4.2.0 -Werkzeug==2.2.2 -zipp==3.8.0 -zope.event==4.6 -zope.interface==5.5.2 -protobuf==4.24.3 -sqlalchemy==2.0.20 -oracledb==2.0.1 -auth-get-sso-cookie==2.2.0 +Flask>=2.3,<3.0 +Flask-Caching>=2.3,<2.4 +Flask-HTTPAuth>=4.8,<4.9 +Flask-RESTful>=0.3,<0.4 +Flask-SQLAlchemy>=3.1,<3.2 +click>=8.3,<8.4 +gunicorn[gevent]>=23.0,<24.0 +influxdb>=5.3,<5.4 +kafka-python>=2.3,<2.4 +protobuf>=6.33,<7.0 +sh>=2.2,<2.3 +sqlalchemy[oracle-oracledb,postgresql-psycopgbinary]>=2.0,<2.1 + +# elisa requires this but doesn't list the requirement +# since it is called via subprocess/sh +auth-get-sso-cookie>=2.2,<2.3 diff --git a/docs/README.md b/docs/README.md index e8794ebc..7cab2a67 100644 --- a/docs/README.md +++ b/docs/README.md @@ -6,7 +6,7 @@ docker run --rm -e MICROSERVICE= ghcr.io/dune-daq/microser ``` There are a couple of points to note: -* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. As of Jul-25-2024, the available subdirectories are: `config-service`, `elisa-logbook`, `ers-dbwriter`, `ers-protobuf-dbwriter`, `opmon-dbwriter` (now deprecated), `opmon-protobuf-dbwriter`, `runnumber-rest` and `runregistry-rest`. +* The value of MICROSERVICE should be the name of a given microservice's subdirectory in this repo. * Most microservices require additional environment variables to be set, which can be passed using the usual docker syntax: `-e VARIABLE_NAME=` * If you don't know what these additional environment variables are, you can just run the `docker` command as above without setting them; the container will exit out almost immediately but only after telling you what variables are missing * The microservices image tag will be `microservices:` or `microservices:`, i.e. `microservices:develop`. @@ -22,16 +22,16 @@ The `microservices_dependencies` image is used as the base image for `microservi If you do not wish to to push your changes to the repo when testing, you can also use: ```bash -docker build -f Dockerfile -t ghcr.io/dune-daq/microservices:user-my-branch . +docker build -f ./dockerfiles/Dockerfile -t ghcr.io/dune-daq/microservices:user-my-branch . docker push ghcr.io/dune-daq/microservices:user-my-branch ``` -and +and ```bash -docker build -f dockerfiles/microservices-dependencies.dockerfile -t ghcr.io/dune-daq/microservices_dependencies:user-my-branch /dockerfiles +docker build -f ./dockerfiles/Dockerfile.dependencies -t ghcr.io/dune-daq/microservices_dependencies:user-my-branch /dockerfiles docker push ghcr.io/dune-daq/microservices_dependencies:user-my-branch ``` -This will copy your current microservices directory. \ No newline at end of file +This will copy your current microservices directory. diff --git a/docs/README_config-service.md b/docs/README_config-service.md deleted file mode 100644 index c9bcdd5c..00000000 --- a/docs/README_config-service.md +++ /dev/null @@ -1,74 +0,0 @@ -# Configuration - -The Configuration services/scripts consist of two main applications: the main configuration service and the archiver. -One can use the service exclusively, without the need of running the archiver. - -## MongoDB Service - -The service application is responsible for DAQling configuration retrieval and storing from/to a MongoDB database. - -### Dependencies - -Apply the following Ansible roles with a playbook: - - install-webdeps - install-mongodb - -### Running the service - - python3 conf-service.py - -The service is located at `scripts/Configuration/`. - -The `service-config.json` configuration file is at `scripts/Configuration/config`. - -### Uploading a configuration collection - -The `uploadConfig` tool, allows to specify the name of a directory in `configs/` whose content will be uploaded to the configuration database as a collection. - -Uploading a configuration folder with the same name, will automatically bump the version of the uploaded document. - - If the directory name contains a trailing "_vN" (version N) string, the latter will be stripped and the configuration files will be added to the collection with the stripped folder name, tagging it with the next free version number in that collection. - -### Checking out a configuration - -The `checkoutConfig` tool, allows to get a list of available configuration collections: - -``` -python3 checkoutConfig.py list -``` - -Checkout locally the latest version of a configuration collection name: - -``` -python3 checkoutConfig.py -``` - -Checkout a specific version of a configuration collection name: - -``` -python3 checkoutConfig.py -``` - -The checked out folder, will have a trailing `_vN` string reporting the version of the configuration. - -## Archiver (experimental) - -The service is responsible for periodic lookup for new configurations in a MongoDB configuration database. -If newer configurations were found, than the last inserted configuration in the Oracle database, the service -reads the new ones and inserts them into the Oracle archives. - -### Running the archiver - -The archiver is meant to be registered as a supervised process, with the following parameters: - - [group:configuration] - programs=archiver - - [program:archiver] - command=//python3.6 //scripts/Configuration/archiver.py --config // - numprocs=1 - autostart=true - autorestart=true - stopsignal=QUIT - stopwaitsecs=10 diff --git a/docs/README_ers-dbwriter.md b/docs/README_ers-dbwriter.md deleted file mode 100644 index b03a1fc6..00000000 --- a/docs/README_ers-dbwriter.md +++ /dev/null @@ -1,67 +0,0 @@ -`dbwriter.py` is the script responsible for taking the ERS messages from kafka -and writing to a postgreSQL database so that the messages can be displayed in a -grafana dashboard. The secrets to connect to the database are obtained from -environment variables. To run it manually do: -```python dbwriter.py``` -and if the env variables are set, it should start printing the messages that it -is receiving and writing to the database. - -Long term, it may be preferable to use `telegraf` as a kafka to postgresql bridge or KafkaConnect for postgresql to avoid longterm code maintance here. - -# Deploying on kubernetes -First, we need to make the secrets. Create a yaml file `ers-secret.yaml` containing the secrets: -``` -apiVersion: v1 -kind: Secret -metadata: - name: ers-secret - namespace: monitoring -type: Opaque -data: - ERS_DBWRITER_HOST: - ERS_DBWRITER_PORT: - ERS_DBWRITER_USER: - ERS_DBWRITER_PASS: - ERS_DBWRITER_NAME: -``` -where after each of the env variables (`ERS_DBWRITER_XXXX`) the secret goes in base64 form (can be obtained by doing `echo -n "secret" | base64`). To add the secrets run -``` -kubectl apply -f ers-secret.yaml -``` -If all went well when we do `kubectl get secrets` we should see something like -``` -NAME TYPE DATA AGE -ers-secret Opaque 5 37m -``` -Once the secrets are set, do `kubectl apply -f ersdbwriter.yaml`. - -We can get the pod name by doing `kubectl -n monitoring get pods` and then it will show something like -``` -NAME READY STATUS RESTARTS AGE -erskafka-7dfdf88864-4mwvd 1/1 Running 0 15m -``` -where the important part is that `STATUS` is `Running` - -If needed, the logs can be accessed by -``` -kubectl logs erskafka-7dfdf88864-4mwvd -n monitoring -``` - -# Running locally -The script can also be run locally which can be useful to debug or start up quickly. After setting up a working area and cloning this repo, run: -``` -pip install -r requirements.txt -export ERS_DBWRITER_HOST=host -export ERS_DBWRITER_PORT=port -export ERS_DBWRITER_USER=user -export ERS_DBWRITER_PASS=pass -export ERS_DBWRITER_NAME=name -python3 dbwriter.python3 -``` -where the values of the env variables have to be substituted by their actual values. While running, the messages received from kafka will be printed as they arrive: -``` -$ python3 dbwriter.py -ConsumerRecord(topic='erskafka-reporting', partition=0, offset=3156, timestamp=1667205932928, timestamp_type=0, key=None, value=b'{"application_name":"trigger","chain":0,"cwd":"/nfs/sw/jcarcell/N22-10-30","file_name":"/tmp/root/spack-stage/spack-stage-trigger-N22-10-30-mt5md7ywovoz27didgrn5grbucv623da/spack-src/plugins/ModuleLevelTrigger.cpp","function_name":"void dunedaq::trigger::ModuleLevelTrigger::do_start(const nlohmann::json&)","group_hash":987365171,"host_name":"np04-srv-022","issue_name":"trigger::TriggerStartOfRun","line_number":130,"message":"Start of run 293287","package_name":"unknown","params":["runno: 293287"],"partition":"jcarcell","process_id":1802216,"qualifiers":["unknown"],"severity":"INFO","thread_id":1802245,"time":1667205932928,"usecs_since_epoch":1667205932928264,"user_id":141008,"user_name":"jcarcell"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=707, serialized_header_size=-1) -``` -It can run at the same time locally and in kubernetes. - diff --git a/docs/README_ers-protobuf-dbwriter.md b/docs/README_ers-protobuf-dbwriter.md index 138267b0..bbbbcf0f 100644 --- a/docs/README_ers-protobuf-dbwriter.md +++ b/docs/README_ers-protobuf-dbwriter.md @@ -16,13 +16,9 @@ metadata: namespace: monitoring type: Opaque data: - ERS_DBWRITER_HOST: - ERS_DBWRITER_PORT: - ERS_DBWRITER_USER: - ERS_DBWRITER_PASS: - ERS_DBWRITER_NAME: + DATABASE_URI: ``` -where after each of the env variables (`ERS_DBWRITER_XXXX`) the secret goes in base64 form (can be obtained by doing `echo -n "secret" | base64`). To add the secrets run +where after each of the env variables the secret goes in base64 form (can be obtained by doing `echo -n "secret" | base64`). To add the secrets run ``` kubectl apply -f ers-secret.yaml ``` @@ -46,7 +42,6 @@ The script can also be run locally which can be useful to debug or start up quic ``` python3 dbwriter.py ``` -Passing the appropriate variables. +Passing the appropriate variables. As this script requires ers and erskafak, it has to be launched by a developing envirnoment. It can run at the same time locally and in kubernetes. - diff --git a/docs/README_opmon-protobuf-dbwriter.md b/docs/README_opmon-protobuf-dbwriter.md index a4fa5e23..15137044 100644 --- a/docs/README_opmon-protobuf-dbwriter.md +++ b/docs/README_opmon-protobuf-dbwriter.md @@ -9,7 +9,6 @@ The script can be run locally which can be useful to debug or start up quickly. ``` python3 dbwriter.py ``` -Passing the appropriate variables. +Passing the appropriate variables. As this script requires opmonlibs and kafkaopmon, it has to be launched by a developing envirnoment. It can run at the same time locally and in kubernetes. - diff --git a/docs/README_runnumber-rest.md b/docs/README_runnumber-rest.md index b9598838..1d7967dd 100644 --- a/docs/README_runnumber-rest.md +++ b/docs/README_runnumber-rest.md @@ -1,17 +1,13 @@ # Installation steps -You need the following package (from the "CERN Only" repository) installed on the host: -``` -oracle-instantclient12.1-devel -oracle-instantclient-tnsnames.ora -``` - -Then, create the virtual environment: +Create the virtual environment: ``` python3 -m venv env source env/bin/activate python -m pip install -r requirements.txt ``` +For NP04 this uses the pure python Oracle thin client. + # Running the server Once in venv: ``` diff --git a/docs/README_runregistry-rest.md b/docs/README_runregistry-rest.md index 9418f7a3..1d7967dd 100644 --- a/docs/README_runregistry-rest.md +++ b/docs/README_runregistry-rest.md @@ -1,27 +1,18 @@ # Installation steps -For Oracle, you need the following packages installed on the host. - -``` -oracle-instantclient12.1-devel -``` - -Then, create the virtual environment: +Create the virtual environment: ``` python3 -m venv env source env/bin/activate python -m pip install -r requirements.txt ``` +For NP04 this uses the pure python Oracle thin client. + # Running the server Once in venv: ``` -python rest.py -``` -or, to use Postgres: +python3 rest.py ``` -python rest.py -p -``` - # Authentication You need the file `credentials.py` in the same directory as `backend.py`, this file needs to be of the form: ``` diff --git a/elisa-logbook/credmgr.py b/elisa-logbook/credmgr.py index a4066900..89e79aca 100644 --- a/elisa-logbook/credmgr.py +++ b/elisa-logbook/credmgr.py @@ -1,73 +1,56 @@ -import sys, os import logging -from getpass import getpass +import os +import shutil import subprocess -import logging -import tempfile - - -def which(program): - # https://stackoverflow.com/a/377028 - def is_exe(fpath): - return os.path.isfile(fpath) and os.access(fpath, os.X_OK) - - fpath, fname = os.path.split(program) - if fpath: - if is_exe(program): - print("Found1", program) - return program - else: - for path in os.environ.get("PATH", "").split(os.pathsep): - exe_file = os.path.join(path, program) - if is_exe(exe_file): - print("Found2", program) - return exe_file - - return None +import sys +from getpass import getpass +from pathlib import Path +from typing import Optional def env_for_kerberos(ticket_dir): - ticket_dir = os.path.expanduser(ticket_dir) - env = {"KRB5CCNAME": f"DIR:{ticket_dir}"} - return env + ticket_dir = Path(ticket_dir).expanduser() + return {"KRB5CCNAME": f"DIR:{ticket_dir}"} def new_kerberos_ticket( - user: str, realm: str, password: str = None, ticket_dir: str = "~/" + user: str, realm: str, password: Optional[str] = None, ticket_dir: str = "~/" ): + kinit_path = shutil.which("kinit") + if kinit_path is None: + raise RuntimeError( + "kinit binary not found in PATH. Please ensure Kerberos client tools are installed." + ) + env = env_for_kerberos(ticket_dir) success = False password_provided = password is not None while not success: - import subprocess - p = subprocess.Popen( - ["kinit", f"{user}@{realm}"], + [kinit_path, f"{user}@{realm}"], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, env=env, ) - if p.poll() is not None and p.returncode != 0: - raise RuntimeError( - f"Could not execute kinit {user}@{realm}\n{stdout_data[-1].decode()}" - ) - if password is None: print(f"Password for {user}@{realm}:") try: - from getpass import getpass - password = getpass() except KeyboardInterrupt: print() return False - stdout_data = p.communicate(password.encode()) - print(stdout_data[-1].decode()) + stdout_data, stderr_data = p.communicate((password + "\n").encode()) + + # Display stderr if present (where kinit typically sends output) + if stderr_data: + print(stderr_data.decode()) + elif stdout_data: + print(stdout_data.decode()) if not password_provided: password = None @@ -75,25 +58,29 @@ def new_kerberos_ticket( success = p.returncode == 0 if not success and password_provided: + error_msg = stderr_data.decode() if stderr_data else stdout_data.decode() raise RuntimeError( - f"Authentication error for {user}@{realm}. The password provided (likely in configuration file) is incorrect" + f"Authentication error for {user}@{realm}. The password provided (likely in configuration file) is incorrect\n{error_msg}" ) return True def get_kerberos_user(silent=False, ticket_dir: str = "~/"): - import logging - log = logging.getLogger("get_kerberos_user") + klist_path = shutil.which("klist") + if klist_path is None: + raise RuntimeError( + "klist binary not found in PATH. Please ensure Kerberos client tools are installed." + ) + env = env_for_kerberos(ticket_dir) args = [ - "klist" + klist_path ] # on my mac, I can specify --json and that gives everything nicely in json format... but... - import subprocess - proc = subprocess.run(args, capture_output=True, text=True, env=env) + proc = subprocess.run(args, check=False, capture_output=True, text=True, env=env) raw_kerb_info = proc.stdout.split("\n") if not silent: @@ -114,10 +101,14 @@ def get_kerberos_user(silent=False, ticket_dir: str = "~/"): def check_kerberos_credentials(against_user: str, silent=False, ticket_dir: str = "~/"): - import logging - log = logging.getLogger("check_kerberos_credentials") + klist_path = shutil.which("klist") + if klist_path is None: + raise RuntimeError( + "klist binary not found in PATH. Please ensure Kerberos client tools are installed." + ) + env = env_for_kerberos(ticket_dir) kerb_user = get_kerberos_user(silent=silent, ticket_dir=ticket_dir) @@ -126,23 +117,20 @@ def check_kerberos_credentials(against_user: str, silent=False, ticket_dir: str if kerb_user: log.info(f"Detected kerberos ticket for user: '{kerb_user}'") else: - log.info(f"No kerberos ticket found") + log.info("No kerberos ticket found") if not kerb_user: if not silent: log.info("No kerberos ticket") return False - elif kerb_user != against_user: # we enforce the user is the same + if kerb_user != against_user: # we enforce the user is the same if not silent: log.info("Another user is logged in") return False - else: - import subprocess - - ticket_is_valid = subprocess.call(["klist", "-s"], env=env) == 0 - if not silent and not ticket_is_valid: - log.info("Kerberos ticket is expired") - return ticket_is_valid + ticket_is_valid = subprocess.call([klist_path, "-s"], env=env) == 0 + if not silent and not ticket_is_valid: + log.info("Kerberos ticket is expired") + return ticket_is_valid class ServiceAccountWithKerberos: @@ -151,23 +139,32 @@ def __init__(self, service: str, username: str, password: str, realm: str): self.username = username self.password = password self.realm = realm + self.log = logging.getLogger(self.__class__.__name__) def generate_cern_sso_cookie(self, website, kerberos_directory, output_directory): + auth_get_sso_cookie_path = shutil.which("auth-get-sso-cookie") + if auth_get_sso_cookie_path is None: + raise RuntimeError( + "auth-get-sso-cookie binary not found in PATH. Please ensure CERN SSO tools are installed." + ) + env = {"KRB5CCNAME": f"DIR:{kerberos_directory}"} - import sh + import sh # noqa:PLC0415 - executable = sh.Command("auth-get-sso-cookie") + executable = sh.Command(auth_get_sso_cookie_path) try: - proc = executable( + executable( "-u", website, "-o", output_directory, _env=env, _new_session=False ) except sh.ErrorReturnCode as error: - self.log.error(error) + self.log.exception( + f"Couldn't get SSO cookie! {error.stdout=} {error.stderr=}" + ) raise RuntimeError( f"Couldn't get SSO cookie! {error.stdout=} {error.stderr=}" - ) from e + ) from error return output_directory @@ -176,57 +173,68 @@ class CredentialManager: def __init__(self): self.log = logging.getLogger(self.__class__.__name__) self.authentications = [] + self.user = None - def add_login(self, service: str, user: str, password: str, realm: str): + def add_login(self, service: str, user: str, password: str, realm: str = "CERN.CH"): self.authentications.append( ServiceAccountWithKerberos(service, user, password, realm) ) def add_login_from_file(self, service: str, file: str): - if not os.path.isfile(os.getcwd() + "/" + file + ".py"): + cwd = Path.cwd() + file_path = cwd / f"{file}.py" + if not file_path.is_file(): self.log.error(f"Couldn't find file {file} in PWD") - raise + raise FileNotFoundError(f"Couldn't find file {file} in PWD") - sys.path.append(os.getcwd()) + sys.path.append(str(cwd)) i = __import__(file, fromlist=[""]) self.add_login(service, i.user, i.password) self.log.info(f"Added login data from file: {file}") - def get_login(self, service: str, user: str): - for auth in self.authentications: - if service == auth.service and user == auth.user: - return auth - self.log.error(f"Couldn't find login for service: {service}, user: {user}") - - def get_login(self, service: str): + def get_login(self, service: str, user: Optional[str] = None): for auth in self.authentications: if service == auth.service: - return auth + if user is None or user == auth.username: + return auth + + if user: + self.log.error(f"Couldn't find login for service: {service}, user: {user}") + raise ValueError( + f"Couldn't find login for service: {service}, user: {user}" + ) self.log.error(f"Couldn't find login for service: {service}") + raise ValueError(f"Couldn't find login for service: {service}") def rm_login(self, service: str, user: str): for auth in self.authentications: - if service == auth.service and user == auth.user: + if service == auth.service and user == auth.username: self.authentications.remove(auth) return def new_kerberos_ticket(self): - success = False - for a in self.authentications: - if a.user == self.user: - password = a.password - break + if self.user is None: + self.log.error("No user set in CredentialManager") + return False - p = subprocess.Popen( - ["kinit", self.user + "@CERN.CH"], - stdout=subprocess.PIPE, - stdin=subprocess.PIPE, - stderr=subprocess.PIPE, + auth = next( + (a for a in self.authentications if a.username == self.user), + None ) - stdout_data = p.communicate(password.encode()) - print(stdout_data[-1].decode()) - success = p.returncode == 0 - return True + + if auth is None: + self.log.error(f"No authentication found for user: {self.user}") + return False + + try: + return new_kerberos_ticket( + user=auth.username, + realm=auth.realm, + password=auth.password + ) + except Exception: + self.log.exception("Failed to create Kerberos ticket") + return False credentials = CredentialManager() @@ -234,8 +242,6 @@ def new_kerberos_ticket(self): class CERNSessionHandler: def __init__(self, username: str): - import logging - self.log = logging.getLogger(self.__class__.__name__) self.elisa_username = username @@ -244,10 +250,7 @@ def __init__(self, username: str): @staticmethod def __get_elisa_kerberos_cache_path(): - import os - from pathlib import Path - - return Path(os.path.expanduser("/tmp/.nanorc_elisakerbcache")) + return Path("/tmp/.nanorc_elisakerbcache") def elisa_user_is_authenticated(self): elisa_user = credentials.get_login("elisa") @@ -260,10 +263,9 @@ def elisa_user_is_authenticated(self): def authenticate_elisa_user(self): elisa_user = credentials.get_login("elisa") elisa_kerb_cache = CERNSessionHandler.__get_elisa_kerberos_cache_path() - import os - if not os.path.isdir(elisa_kerb_cache): - os.mkdir(elisa_kerb_cache) + if not elisa_kerb_cache.is_dir(): + elisa_kerb_cache.mkdir(mode=0o700) if self.elisa_user_is_authenticated(): # we're authenticated, stop here diff --git a/elisa-logbook/elisa-logbook-deployment.yaml b/elisa-logbook/elisa-logbook-deployment.yaml index ff016479..c4437e9c 100644 --- a/elisa-logbook/elisa-logbook-deployment.yaml +++ b/elisa-logbook/elisa-logbook-deployment.yaml @@ -4,11 +4,11 @@ metadata: annotations: kluctl.io/skip-delete-if-tags: "true" labels: - pod-security.kubernetes.io/audit: baseline + pod-security.kubernetes.io/audit: restricted pod-security.kubernetes.io/audit-version: latest - pod-security.kubernetes.io/enforce: baseline # unified image runs as root :( + pod-security.kubernetes.io/enforce: restricted pod-security.kubernetes.io/enforce-version: latest - pod-security.kubernetes.io/warn: baseline + pod-security.kubernetes.io/warn: restricted pod-security.kubernetes.io/warn-version: latest name: elisa-logbook --- @@ -18,6 +18,7 @@ kind: Deployment metadata: labels: app.kubernetes.io/app: elisa-logbook + app.kubernetes.io/name: elisa-logbook app.kubernetes.io/component: elisa-logbook name: elisa-logbook namespace: elisa-logbook @@ -26,11 +27,13 @@ spec: selector: matchLabels: app.kubernetes.io/app: elisa-logbook + app.kubernetes.io/name: elisa-logbook app.kubernetes.io/component: elisa-logbook template: metadata: labels: app.kubernetes.io/app: elisa-logbook + app.kubernetes.io/name: elisa-logbook app.kubernetes.io/component: elisa-logbook spec: affinity: @@ -41,6 +44,7 @@ spec: - key: node-role.kubernetes.io/worker operator: Exists automountServiceAccountToken: false + hostUsers: false containers: - image: ghcr.io/dune-daq/microservices:latest name: elisa-logbook @@ -62,18 +66,55 @@ spec: secretKeyRef: key: password name: elisa-logbook-secret + - name: APP_DATA + value: /opt/data + ports: + - containerPort: 5005 + name: http + protocol: TCP + startupProbe: + tcpSocket: + port: http + initialDelaySeconds: 15 + periodSeconds: 20 + livenessProbe: + tcpSocket: + port: http + initialDelaySeconds: 15 + periodSeconds: 20 + readinessProbe: + tcpSocket: + port: http + initialDelaySeconds: 15 + periodSeconds: 20 resources: limits: memory: 1Gi requests: - memory: 8Mi + cpu: 100m + memory: 64Mi securityContext: + runAsUser: 11000 + runAsGroup: 11000 + runAsNonRoot: true allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + seccompProfile: + type: RuntimeDefault capabilities: drop: - ALL - runAsGroup: 11000 - seccompProfile: - type: RuntimeDefault + volumeMounts: + - mountPath: /opt/data + name: data-volume + - name: tmp-volume + mountPath: /tmp + volumes: # persistence is not required at this time + - name: data-volume + emptyDir: + sizeLimit: 10Gi + - name: tmp-volume + emptyDir: + sizeLimit: 256Mi securityContext: fsGroup: 11000 diff --git a/elisa-logbook/elisa.py b/elisa-logbook/elisa.py index a2078d74..28599426 100644 --- a/elisa-logbook/elisa.py +++ b/elisa-logbook/elisa.py @@ -1,13 +1,12 @@ -import logging, copy, os, tempfile -from flask import Flask -from credmgr import credentials -from credmgr import CERNSessionHandler +import copy +import logging +import tempfile +from credmgr import credentials from elisa_client_api.elisa import Elisa -from elisa_client_api.searchCriteria import SearchCriteria +from elisa_client_api.exception import ElisaError from elisa_client_api.messageInsert import MessageInsert from elisa_client_api.messageReply import MessageReply -from elisa_client_api.exception import * class ElisaLogbook: @@ -26,10 +25,10 @@ def __init__(self, configuration, handler): self.session_handler = handler def start_new_thread( - self, subject: str, body: str, command: str, author: str, systems: [str] + self, subject: str, body: str, command: str, author: str, systems: list[str] ): elisa_arg = copy.deepcopy(self.elisa_arguments) - elisa_user = credentials.get_login("elisa") + credentials.get_login("elisa") with tempfile.NamedTemporaryFile() as tf: try: @@ -52,17 +51,16 @@ def start_new_thread( message.body = body answer = elisa_inst.insertMessage(message) - except ElisaError as ex: - self.log.error(f"ELisA logbook: {str(ex)}") - self.log.error(answer) - raise ex + except ElisaError: + self.log.exception("Error with ELisA logbook") + raise self.log.info(f"ELisA logbook: Sent message (ID{answer.id})") return answer.id - def reply(self, body: str, command: str, author: str, systems: [str], id: int): + def reply(self, body: str, command: str, author: str, systems: list[str], message_id: int): elisa_arg = copy.deepcopy(self.elisa_arguments) - elisa_user = credentials.get_login("elisa") + credentials.get_login("elisa") with tempfile.NamedTemporaryFile() as tf: try: @@ -74,8 +72,8 @@ def reply(self, body: str, command: str, author: str, systems: [str], id: int): elisa_arg.update(sso) elisa_inst = Elisa(**elisa_arg) answer = None - self.log.info(f"ELisA logbook: Answering to message ID{id}") - message = MessageReply(id) + self.log.info(f"ELisA logbook: Answering to message ID{message_id}") + message = MessageReply(message_id) message.author = author message.systemsAffected = systems for attr_name, attr_data in self.message_attributes[command].items(): @@ -84,12 +82,11 @@ def reply(self, body: str, command: str, author: str, systems: [str], id: int): message.body = body answer = elisa_inst.replyToMessage(message) - except ElisaError as ex: - self.log.error(f"ELisA logbook: {str(ex)}") - self.log.error(answer) - raise ex + except ElisaError: + self.log.exception("Error with ELisA logbook") + raise self.log.info( - f"ELisA logbook: Sent message (ID{answer.id}), replying to ID{id}" + f"ELisA logbook: Sent message (ID{answer.id}), replying to ID{message_id}" ) return answer.id diff --git a/elisa-logbook/entrypoint.sh b/elisa-logbook/entrypoint.sh index 0f3108f8..5cd17fd7 100755 --- a/elisa-logbook/entrypoint.sh +++ b/elisa-logbook/entrypoint.sh @@ -1,10 +1,13 @@ #!/bin/bash +set -euo pipefail -cd $(dirname $0) +cd "$(dirname "$0")" || exit 2 +if [[ ! -f ../entrypoint_functions.sh ]]; then + echo "Error: entrypoint_functions.sh not found" >&2 + exit 2 +fi source ../entrypoint_functions.sh ensure_required_variables "USERNAME PASSWORD HARDWARE" -mkdir -p ./logfiles - -python3 ./logbook.py +exec gunicorn -b 0.0.0.0:5005 --workers=1 --worker-class=gevent --timeout 9000 --log-level=debug logbook:app diff --git a/elisa-logbook/logbook.py b/elisa-logbook/logbook.py index 0a8a852d..419cb021 100644 --- a/elisa-logbook/logbook.py +++ b/elisa-logbook/logbook.py @@ -4,18 +4,19 @@ __maintainer__ = "Jonathan Hancock" __email__ = "jonathan.hancock@cern.ch" +import json import os -import argparse import re -import json -from urllib import response +import traceback +from pathlib import Path +from typing import Optional from authentication import auth -from credmgr import credentials, CERNSessionHandler +from credmgr import CERNSessionHandler, credentials from elisa import ElisaLogbook -from flask import Flask, request, jsonify, make_response -from flask_restful import Api +from flask import Flask, jsonify, make_response, request from flask_caching import Cache +from flask_restful import Api # Sets up the app and processes command line inputs app = Flask(__name__) @@ -23,33 +24,60 @@ api = Api(app) # Converts the config json into a dictionary, and gets a list of the keys -with open("elisaconf.json") as json_file: +with Path("elisaconf.json").open() as json_file: elisaconf = json.load(json_file) + keylist = elisaconf.keys() hardware_string = "Please choose from one of the following options:" for key in keylist: hardware_string += " " hardware_string += key + # We use environment variables to pass data -user_var = (os.getenv("USERNAME")).rstrip("\n") -pass_var = (os.getenv("PASSWORD")).rstrip("\n") -hard_var = (os.getenv("HARDWARE")).rstrip("\n") -app.config["USER"] = user_var -app.config["PASSWORD"] = pass_var -app.config["PATH"] = "./logfiles/" +def get_required_env(name: str, default: Optional[str] = None) -> str: + value = os.getenv(name, default) + if not value: + raise RuntimeError(f"Required environment variable {name} is not set") + return value + + +hard_var = get_required_env("HARDWARE") +app.config["PATH"] = get_required_env("APP_DATA", "./logfiles") +app.config["USER"] = get_required_env("USERNAME") +app.config["PASSWORD"] = get_required_env("PASSWORD") + try: - app.config["HARDWARECONF"] = elisaconf[ - hard_var - ] # A dictionary containing all the hardware-dependant configs -except: - bad_string = hard_var + " is not a valid choice!" - raise Exception(bad_string + hardware_string) + app.config["HARDWARECONF"] = elisaconf[hard_var] +except KeyError as exc: + raise KeyError(f"{hard_var} is not a valid choice!{hardware_string}") from exc + +Path(app.config["PATH"]).mkdir(parents=True, exist_ok=True) +if not os.access(app.config["PATH"], os.W_OK): + raise PermissionError( + f"Error: Permission denied to access the file at {app.config['PATH']}" + ) credentials.add_login("elisa", app.config["USER"], app.config["PASSWORD"], "CERN.CH") cern_auth = CERNSessionHandler(username=app.config["USER"]) logbook = ElisaLogbook(app.config["HARDWARECONF"], cern_auth) + + +# Helper function to sanitize run_type to prevent path traversal +def sanitize_run_type(run_type: str) -> str: + """ + Sanitize run_type to prevent path traversal attacks. + Only allows alphanumeric characters, hyphens, and underscores. + The first character must be alphanumeric to avoid interpretation as command-line flags. + """ + if not run_type or not re.match(r"^[a-zA-Z0-9][a-zA-Z0-9_-]*$", run_type): + raise ValueError( + "Invalid run_type: must start with alphanumeric and contain only alphanumeric characters, hyphens, or underscores" + ) + return run_type + + # Main app # The general principle is to replace the methods of each class with API methods # The first type of logging is fileLogbook, which writes logs to a given file in the current working directory. @@ -66,19 +94,29 @@ def index(): def Fmessage_on_start(): try: run_number = int(request.json["run_num"]) - except: + except (ValueError, TypeError, KeyError): error = "Run number is not an integer!" return error, 400 try: - file_path = app.config["PATH"] + f"_{run_number}_{request.json['run_type']}.txt" - f = open(file_path, "w") - f.write( - f"-- User {request.json['author']} started a run {run_number}, of type {request.json['run_type']} --\n" - ) - f.write(request.json["author"] + ": " + request.json["message"] + "\n") - f.close() - rstring = "Logfile started at " + file_path + "\n" + # Security: Sanitize run_type to prevent path traversal + run_type = sanitize_run_type(request.json["run_type"]) + + base_path = Path(app.config["PATH"]) + file_path = base_path / f"_{run_number}_{run_type}.txt" + + # Security: Verify the resolved path is within the base directory + try: + file_path.resolve().relative_to(base_path.resolve()) + except ValueError: + return "Invalid file path", 400 + + with file_path.open("w") as f: + f.write( + f"-- User {request.json['author']} started a run {run_number}, of type {run_type} --\n" + ) + f.write(request.json["author"] + ": " + request.json["message"] + "\n") + rstring = "Logfile started at " + str(file_path) + "\n" return rstring, 201 except Exception as e: return str(e), 400 @@ -89,22 +127,34 @@ def Fmessage_on_start(): @auth.login_required def Fadd_message(): try: - file_path = ( - app.config["PATH"] - + f"_{request.json['run_num']}_{request.json['run_type']}.txt" - ) + run_number = int(request.json["run_num"]) + except (ValueError, TypeError, KeyError): + error = "Run number is not an integer!" + return error, 400 + + try: + # Security: Sanitize run_type to prevent path traversal + run_type = sanitize_run_type(request.json["run_type"]) + + base_path = Path(app.config["PATH"]) + file_path = base_path / f"_{run_number}_{run_type}.txt" + + # Security: Verify the resolved path is within the base directory + try: + file_path.resolve().relative_to(base_path.resolve()) + except ValueError: + return "Invalid file path", 400 except Exception as e: return str(e), 400 - if os.path.exists(file_path): - f = open(file_path, "a") - else: + if not file_path.exists(): error = "File not found!" return error, 404 - f.write(request.json["author"] + ": " + request.json["message"] + "\n") - f.close() - rstring = "Logfile updated at " + file_path + "\n" + with file_path.open("a") as f: + f.write(request.json["author"] + ": " + request.json["message"] + "\n") + + rstring = "Logfile updated at " + str(file_path) + "\n" return rstring, 200 @@ -113,25 +163,37 @@ def Fadd_message(): @auth.login_required def Fmessage_on_stop(): try: - file_path = ( - app.config["PATH"] - + f"_{request.json['run_num']}_{request.json['run_type']}.txt" - ) + run_num = int(request.json["run_num"]) + except (ValueError, TypeError, KeyError): + error = "Run number is not an integer!" + return error, 400 + + try: + # Security: Sanitize run_type to prevent path traversal + run_type = sanitize_run_type(request.json["run_type"]) + + base_path = Path(app.config["PATH"]) + file_path = base_path / f"_{run_num}_{run_type}.txt" + + # Security: Verify the resolved path is within the base directory + try: + file_path.resolve().relative_to(base_path.resolve()) + except ValueError: + return "Invalid file path", 400 except Exception as e: return str(e), 400 - if os.path.exists(file_path): - f = open(file_path, "a") - else: + if not file_path.exists(): error = "File not found!" return error, 404 - f.write( - f"-- User {request.json['author']} stopped the run {request.json['run_num']}, of type {request.json['run_type']} --\n" - ) - f.write(request.json["author"] + ": " + request.json["message"] + "\n") - f.close() - rstring = "Log stopped at " + file_path + "\n" + with file_path.open("a") as f: + f.write( + f"-- User {request.json['author']} stopped the run {run_num}, of type {run_type} --\n" + ) + f.write(request.json["author"] + ": " + request.json["message"] + "\n") + + rstring = "Log stopped at " + str(file_path) + "\n" return rstring, 200 @@ -169,9 +231,7 @@ def new_message(): author=request.json["author"], systems=sys_list, ) - except Exception as e: - import traceback - + except Exception: traceback.print_exc() stack = traceback.format_exc().split("\n") resp = make_response(jsonify(stacktrace=stack)) @@ -216,9 +276,7 @@ def reply_to_message(): systems=sys_list, id=request.json["id"], ) - except Exception as e: - import traceback - + except Exception: traceback.print_exc() stack = traceback.format_exc().split("\n") resp = make_response(jsonify(stacktrace=stack)) diff --git a/entrypoint.sh b/entrypoint.sh index a433f9f7..02a4a904 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -1,22 +1,34 @@ #!/bin/bash +set -euo pipefail -cd $(dirname $0) +if [[ ! -e ./entrypoint_functions.sh ]]; then + echo "This script should be run from the top of the microservices repo" >&2 + exit 2 +fi source ./entrypoint_functions.sh ensure_required_variables "MICROSERVICE" -microservice_dir=$(dirname $0)/$MICROSERVICE - -if [[ ! -e ${microservice_dir}/entrypoint.sh ]]; then - echo "This script sees the MICROSERVICE environment variable set to \"$MICROSERVICE\" but is unable to find the corresponding entrypoint script \"${microservice_dir}/entrypoint.sh\"" >&2 +# Validate MICROSERVICE is a safe directory name +if [[ ! "${MICROSERVICE}" =~ ^[a-zA-Z0-9_-]+$ ]]; then + echo "ERROR: MICROSERVICE must contain only alphanumeric characters, hyphens, and underscores" >&2 exit 2 fi -cd $microservice_dir +microservice_dir="$(pwd)/${MICROSERVICE}" + +# Verify entrypoint.sh exists in target service +if [[ ! -e "${microservice_dir}/entrypoint.sh" ]]; then + echo "This script sees the MICROSERVICE environment variable set to \"${MICROSERVICE}\" but is unable to find the corresponding entrypoint script \"${microservice_dir}/entrypoint.sh\"" >&2 + exit 2 +fi -./entrypoint.sh +# Verify entrypoint.sh is executable +if [[ ! -x "${microservice_dir}/entrypoint.sh" ]]; then + echo "${microservice_dir}/entrypoint.sh is not executable" >&2 + exit 2 +fi -retval=$? -echo "Return value of call to ${microservice_dir}/entrypoint.sh is $retval" +cd "${microservice_dir}" || exit 2 -exit $retval +exec "${microservice_dir}/entrypoint.sh" diff --git a/entrypoint_functions.sh b/entrypoint_functions.sh index 6685b93f..2d781ded 100644 --- a/entrypoint_functions.sh +++ b/entrypoint_functions.sh @@ -1,23 +1,45 @@ +#!/bin/bash +####################################### +# Validates that required environment variables are defined +# Arguments: +# Space-separated string of variable names +# Example usage: +# ensure_required_variables "USER HOME PASSWORD API_TOKEN" +# Returns: +# 0 if all variables are defined +# 3 if any variables are missing +####################################### function ensure_required_variables() { + local vars_as_string="${1}" + local -a vars + local var + local missing_variable=0 - vars_as_string=$1 + # Parse the space-separated variable names + IFS=' ' read -ra vars <<<"${vars_as_string}" - IFS=' ' read -ra vars <<<"$vars_as_string" - - missing_variable=false + echo "Checking for required environment variables..." + echo "----------------------------------------------" + # Check each variable for var in "${vars[@]}"; do - - if [[ -v $var ]]; then - echo "$var is defined as \"${!var}\"." + # Verify if variable is defined + if [[ -n "${var}" && -n "${!var-}" ]]; then + echo " ${var} is defined" else - echo "$var needs to be defined as an environment variable." - missing_variable=true + echo " XXX ${var} is NOT defined or empty" + missing_variable=1 fi done - if $missing_variable; then - echo "One or more required environment variables is undefined; exiting..." >&2 + echo "----------------------------------------------" + + # Exit if any variables are missing + if [[ "${missing_variable}" -ne 0 ]]; then + echo "ERROR: One or more required environment variables are undefined" >&2 + echo "Please define the missing variables and try again" >&2 exit 3 fi + + return 0 } diff --git a/ers-dbwriter/dbwriter.py b/ers-dbwriter/dbwriter.py deleted file mode 100644 index d7114eda..00000000 --- a/ers-dbwriter/dbwriter.py +++ /dev/null @@ -1,127 +0,0 @@ -# @file dbwriter.py Writing ERS info to PostgreSQL database -# This is part of the DUNE DAQ software, copyright 2020. -# Licensing/copyright details are in the COPYING file that you should have -# received with this code. -# - -from kafka import KafkaConsumer -import psycopg2 -import json -import os - - -def clean_database(cursor, connection): - cursor.execute(""" - DROP TABLE public."ErrorReports"; - """) - connection.commit() - - -def create_database(cursor, connection): - cursor.execute(""" - CREATE TABLE public."ErrorReports" ( - partition TEXT, - issue_name TEXT, - message TEXT, - severity TEXT, - usecs_since_epoch BIGINT, - time BIGINT, - qualifiers TEXT, - params TEXT, - cwd TEXT, - file_name TEXT, - function_name TEXT, - host_name TEXT, - package_name TEXT, - user_name TEXT, - application_name TEXT, - user_id INT, - process_id INT, - thread_id INT, - line_number INT, - chain TEXT - ); - """) - connection.commit() - - -def main(): - host = os.environ["ERS_DBWRITER_HOST"] - port = os.environ["ERS_DBWRITER_PORT"] - user = os.environ["ERS_DBWRITER_USER"] - password = os.environ["ERS_DBWRITER_PASS"] - dbname = os.environ["ERS_DBWRITER_NAME"] - kafka_bootstrap = os.environ.get( - "ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER", "monkafka.cern.ch:30092" - ) - - consumer = KafkaConsumer( - "erskafka-reporting", bootstrap_servers=kafka_bootstrap, group_id="ers-dbwriter" - ) - - try: - con = psycopg2.connect( - host=host, port=port, user=user, password=password, dbname=dbname - ) - except: - print("Connection to the database failed, aborting...") - exit() - - # These are the fields in the ERS messages, see erskafka/src/KafkaStream.cpp - fields = [ - "partition", - "issue_name", - "message", - "severity", - "usecs_since_epoch", - "time", - "qualifiers", - "params", - "cwd", - "file_name", - "function_name", - "host_name", - "package_name", - "user_name", - "application_name", - "user_id", - "process_id", - "thread_id", - "line_number", - "chain", - ] - - cur = con.cursor() - - try: # try to make sure tables exist - create_database(cur, con) - except: - # if this errors out it may be because the database is already there - con.rollback() - - # Infinite loop over the kafka messages - for message in consumer: - print(message) - js = json.loads(message.value) - if js == "[]": - continue - ls = [str(js[key]) for key in fields] - - try: - cur.execute( - f'INSERT INTO public."ErrorReports" ({",".join(fields)}) VALUES({("%s, " * len(ls))[:-2]})', - ls, - ) - # Save the insert (or any change) to the database - con.commit() - except psycopg2.errors.UndefinedTable: - con.rollback() - create_database(cur, con) - except psycopg2.errors.UndefinedColumn: - con.rollback() - clean_database(cur, con) - create_database(cur, con) - - -if __name__ == "__main__": - main() diff --git a/ers-dbwriter/entrypoint.sh b/ers-dbwriter/entrypoint.sh deleted file mode 100755 index 7dc0eb2d..00000000 --- a/ers-dbwriter/entrypoint.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/bash - -cd $(dirname $0) -source ../entrypoint_functions.sh - -ensure_required_variables "ERS_DBWRITER_HOST ERS_DBWRITER_PORT ERS_DBWRITER_USER ERS_DBWRITER_PASS ERS_DBWRITER_NAME ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER" - -python3 ./dbwriter.py diff --git a/ers-dbwriter/ers-dbwriter-deployment.yaml b/ers-dbwriter/ers-dbwriter-deployment.yaml deleted file mode 100644 index 13e0e0c7..00000000 --- a/ers-dbwriter/ers-dbwriter-deployment.yaml +++ /dev/null @@ -1,93 +0,0 @@ -apiVersion: v1 -kind: Namespace -metadata: - annotations: - kluctl.io/skip-delete-if-tags: "true" - labels: - pod-security.kubernetes.io/audit: baseline - pod-security.kubernetes.io/audit-version: latest - pod-security.kubernetes.io/enforce: baseline # unified image runs as root :( - pod-security.kubernetes.io/enforce-version: latest - pod-security.kubernetes.io/warn: baseline - pod-security.kubernetes.io/warn-version: latest - name: ers ---- -# You must still deploy kafka and postgresql with their upstream manifests -# You must still create the required secret -apiVersion: apps/v1 -kind: Deployment -metadata: - labels: - app.kubernetes.io/app: ers-dbwriter - app.kubernetes.io/component: ers-dbwriter - name: ers-kafka-dbwriter - namespace: ers -spec: - replicas: 1 - selector: - matchLabels: - app: erskafka-dbwriter - template: - metadata: - labels: - app: erskafka-dbwriter - app.kubernetes.io/app: ers-dbwriter - app.kubernetes.io/component: ers-dbwriter - spec: - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: node-role.kubernetes.io/worker - operator: Exists - automountServiceAccountToken: false - containers: - - image: ghcr.io/dune-daq/microservices:9e36 - imagePullPolicy: Always - name: erskafka-dbwriter - env: - - name: MICROSERVICE - value: ers-dbwriter - - name: ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER - value: dune-daq.kafka.svc.cluster.local:9092 - - name: ERS_DBWRITER_HOST - valueFrom: - secretKeyRef: - key: host - name: ers-postgresql-svcbind-custom-user - - name: ERS_DBWRITER_PORT - valueFrom: - secretKeyRef: - key: port - name: ers-postgresql-svcbind-custom-user - - name: ERS_DBWRITER_USER - valueFrom: - secretKeyRef: - key: username - name: ers-postgresql-svcbind-custom-user - - name: ERS_DBWRITER_PASS - valueFrom: - secretKeyRef: - key: password - name: ers-postgresql-svcbind-custom-user - - name: ERS_DBWRITER_NAME - valueFrom: - secretKeyRef: - key: database - name: ers-postgresql-svcbind-custom-user - resources: - limits: - memory: 1Gi - requests: - memory: 8Mi - securityContext: - allowPrivilegeEscalation: false - capabilities: - drop: - - ALL - runAsGroup: 11000 - seccompProfile: - type: RuntimeDefault - securityContext: - fsGroup: 11000 diff --git a/ers-protobuf-dbwriter/dbwriter.py b/ers-protobuf-dbwriter/dbwriter.py index 691a66c3..f6e02108 100644 --- a/ers-protobuf-dbwriter/dbwriter.py +++ b/ers-protobuf-dbwriter/dbwriter.py @@ -1,20 +1,33 @@ -# @file dbwriter.py Writing ERS schemas info to PostgreSQL database +# @file dbwriter.py Writing ERS schemas info to database using SQLAlchemy # This is part of the DUNE DAQ software, copyright 2020. # Licensing/copyright details are in the COPYING file that you should have # received with this code. # -import erskafka.ERSSubscriber as erssub -import ers.issue_pb2 as ersissue -import google.protobuf.json_format as pb_json -from functools import partial -import psycopg2 -import json -import click import logging +import re +import sys +import time +from functools import partial +import click +import erskafka.ERSSubscriber as erssub +import google.protobuf.json_format as pb_json +from sqlalchemy import ( + BigInteger, + Column, + Integer, + MetaData, + Table, + Text, + create_engine, + inspect, +) +from sqlalchemy.exc import OperationalError, ProgrammingError, SQLAlchemyError CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) +MAX_RETRIES = 3 +logger = logging.getLogger(__name__) @click.command(context_settings=CONTEXT_SETTINGS) @@ -22,7 +35,7 @@ "--subscriber-bootstrap", type=click.STRING, default="monkafka.cern.ch:30092", - help="boostrap server and port of the ERSSubscriber", + help="bootstrap server and port of the ERSSubscriber", ) @click.option( "--subscriber-group", @@ -37,45 +50,23 @@ help="timeout in ms used in the ERSSubscriber", ) @click.option( - "--db-address", - required=True, - type=click.STRING, - help="address of the PostgreSQL db", -) -@click.option( - "--db-port", required=True, type=click.STRING, help="port of the PostgreSQL db" -) -@click.option( - "--db-user", - required=True, - type=click.STRING, - help="user for login to the PostgreSQL db", -) -@click.option( - "--db-password", + "--db-uri", required=True, type=click.STRING, - help="password for login to the PostgreSQL db", -) -@click.option( - "--db-name", required=True, type=click.STRING, help="name of the PostgreSQL db" + help="SQLAlchemy database URI (e.g., postgresql://user:pass@host:port/dbname)", ) @click.option( "--db-table", required=True, type=click.STRING, - help="name of table used in the PostgreSQL db", + help="name of table used in the database", ) @click.option("--debug", type=click.BOOL, default=True, help="Set debug print levels") def cli( subscriber_bootstrap, subscriber_group, subscriber_timeout, - db_address, - db_port, - db_user, - db_password, - db_name, + db_uri, db_table, debug, ): @@ -85,37 +76,28 @@ def cli( datefmt="%Y-%m-%d %H:%M:%S", ) + if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", db_table): + logger.fatal("Invalid --db-table name; use letters/numbers/underscore only") + sys.exit(2) + + metadata = MetaData() try: - con = psycopg2.connect( - host=db_address, - port=db_port, - user=db_user, - password=db_password, - dbname=db_name, + engine = create_engine( + db_uri, + pool_size=5, + max_overflow=10, + pool_pre_ping=True, + pool_recycle=3600, ) - except Exception as e: - logging.error(e) - logging.fatal("Connection to the database failed, aborting...") - exit() - - global table_name - table_name = '"' + db_table + '"' + issues_table = create_database_table(metadata, db_table, engine) + except SQLAlchemyError: + logger.exception("Failed to connect to database") + logger.fatal("Connection to the database failed, aborting...") + sys.exit(1) - cur = con.cursor() + check_tables(engine=engine) - try: # try to make sure tables exist - create_database(cursor=cur, connection=con) - except: - con.rollback() - logging.info("Database was already created") - else: - logging.info("Database creation: Success") - finally: - logging.info("Database is ready") - - check_tables(cursor=cur, connection=con) - - subscriber_conf = json.loads("{}") + subscriber_conf = {} subscriber_conf["bootstrap"] = subscriber_bootstrap subscriber_conf["timeout"] = subscriber_timeout if subscriber_group: @@ -123,138 +105,200 @@ def cli( sub = erssub.ERSSubscriber(subscriber_conf) - callback_function = partial(process_chain, cursor=cur, connection=con) + callback_function = partial(process_chain, engine=engine, issues_table=issues_table) - sub.add_callback(name="postgres", function=callback_function) + sub.add_callback(name="database", function=callback_function) sub.start() -def process_chain(chain, cursor, connection): - logging.debug(chain) +def process_chain(chain, engine, issues_table): + """Process a chain of issues and persist to database with retry logic""" + logger.debug(chain) - counter = 0 - success = False - while not success: - counter += 1 + table_recreated = False # Track if we've already recreated the table + + for attempt in range(1, MAX_RETRIES + 1): try: - for cause in reversed(chain.causes): - process_issue(issue=cause, session=chain.session, cursor=cursor) - - process_issue(issue=chain.final, session=chain.session, cursor=cursor) - connection.commit() - except psycopg2.errors.UndefinedTable as e: - logging.error(e) - logging.error( - "Table was undefined yet it was supposed to be defined at this point" + with engine.connect() as connection: + # Process all causes in reverse order + for cause in reversed(chain.causes): + with connection.begin(): + process_issue( + issue=cause, + session=chain.session, + connection=connection, + issues_table=issues_table, + ) + + # Process final issue + with connection.begin(): + process_issue( + issue=chain.final, + session=chain.session, + connection=connection, + issues_table=issues_table, + ) + + # Success - exit the retry loop + logger.debug(f"Entry sent successfully after {attempt} attempt(s)") + return True + + except ProgrammingError: + # ProgrammingError covers missing tables/columns, schema issues + logger.exception(f"Programming error on attempt {attempt}") + + if not table_recreated: + logger.warning( + "Schema/table issue detected, recreating table (one-time)" + ) + try: + clean_database(issues_table, engine) + issues_table.metadata.create_all(engine) + table_recreated = True + continue # retry after recreation + except Exception: + logger.exception("Failed to recreate table") + if attempt >= MAX_RETRIES: + logger.error( # noqa:TRY400 + "Failed to deliver issue after all retry attempts. Chain:\n%s", + pb_json.MessageToJson(chain), + ) + raise + continue + + # Table already recreated → persistent schema problem + logger.exception("Table already recreated, schema issue persists") + if attempt >= MAX_RETRIES: + logger.error( # noqa:TRY400 + "Failed to deliver issue after all retry attempts. Chain:\n%s", + pb_json.MessageToJson(chain), + ) + raise + + # Exponential backoff for transient issues, but never a huge number + time.sleep(min(0.5 * (2**attempt), 5.0)) + continue + + except OperationalError: + # OperationalError covers connection issues, locks, timeouts + logger.exception(f"Operational error on attempt {attempt}") + + if attempt >= MAX_RETRIES: + logger.error( # noqa:TRY400 + "Failed to deliver issue after all retry attempts. Chain:\n%s", + pb_json.MessageToJson(chain), + ) + raise + + # Exponential backoff for transient issues, but never a huge number + time.sleep(min(0.5 * (2**attempt), 5.0)) + continue + + except SQLAlchemyError: + # Catch-all for other SQLAlchemy errors + logger.exception(f"SQLAlchemy error on attempt {attempt}") + if attempt >= MAX_RETRIES: + logger.error( # noqa:TRY400 + "Failed to deliver issue after all retry attempts. Chain:\n%s", + pb_json.MessageToJson(chain), + ) + raise + + # Exponential backoff for transient issues, but never a huge number + time.sleep(min(0.5 * (2**attempt), 5.0)) + continue + + except Exception: + # Unexpected errors shouldn't be retried + logger.exception( + "Unexpected error on attempt %d\nFailed to deliver issue due to unexpected error\nChain:\n%s", + attempt, + pb_json.MessageToJson(chain), ) - connection.rollback() - create_database(cursor=cursor, connection=connection) - except psycopg2.errors.UndefinedColumn as e: - logging.warning(e) - connection.rollback() - clean_database(cursor=cursor, connection=connection) - create_database(cursor=cursor, connection=connection) - except Exception as e: - logging.error("Something unexpected happened") - logging.error(e) - - else: - success = True - logging.debug(f"Entry sent after {counter} attempts") - - if counter > 2: - if not success: - logging.error("Issue failed to be delivered") - logging.error(pb_json.MessageToJson(chain)) - break - - -def process_issue(issue, session, cursor): - fields = [] - values = [] - - ## top level info - add_entry("session", session, fields, values) - add_entry("issue_name", issue.name, fields, values) - add_entry("severity", issue.severity, fields, values) - add_entry("time", issue.time, fields, values) + # Do not backoff here! + raise - ## context related info - add_entry("cwd", issue.context.cwd, fields, values) - add_entry("file_name", issue.context.file_name, fields, values) - add_entry("function_name", issue.context.function_name, fields, values) - add_entry("host_name", issue.context.host_name, fields, values) - add_entry("line_number", issue.context.line_number, fields, values) - add_entry("package_name", issue.context.package_name, fields, values) - - add_entry("process_id", issue.context.process_id, fields, values) - add_entry("thread_id", issue.context.thread_id, fields, values) - add_entry("user_id", issue.context.user_id, fields, values) - add_entry("user_name", issue.context.user_name, fields, values) - add_entry("application_name", issue.context.application_name, fields, values) + # This should never be reached due to the raise in MAX_RETRIES checks, + # but include as a safety fallback + logger.error("Failed to deliver issue after all retry attempts") + logger.error(pb_json.MessageToJson(chain)) + raise RuntimeError("Failed to deliver issue after all retry attempts") - # heavy information - add_entry("inheritance", "/".join(issue.inheritance), fields, values) - add_entry("message", issue.message, fields, values) - add_entry("params", issue.parameters, fields, values) - command = f"INSERT INTO {table_name} ({','.join(fields)}) VALUES ({('%s, ' * len(values))[:-2]});" - - logging.debug(command) - cursor.execute(command, values) +def process_issue(issue, session, connection, issues_table): + values = {} + ## top level info + values["session"] = str(session) + values["issue_name"] = str(issue.name) + values["severity"] = str(issue.severity) + values["time"] = issue.time -def add_entry(field, value, fields, values): - fields.append(field) - values.append(str(value)) + ## context related info + values["cwd"] = str(issue.context.cwd) + values["file_name"] = str(issue.context.file_name) + values["function_name"] = str(issue.context.function_name) + values["host_name"] = str(issue.context.host_name) + values["line_number"] = issue.context.line_number + values["package_name"] = str(issue.context.package_name) + + values["process_id"] = issue.context.process_id + values["thread_id"] = issue.context.thread_id + values["user_id"] = issue.context.user_id + values["user_name"] = str(issue.context.user_name) + values["application_name"] = str(issue.context.application_name) + # heavy information + values["inheritance"] = "/".join(issue.inheritance) + values["message"] = str(issue.message) + values["params"] = str(issue.parameters) -def clean_database(cursor, connection): - command = f"DROP TABLE {table_name} ;" + ins = issues_table.insert().values(**values) + logger.debug(str(ins)) + connection.execute(ins) - logging.debug(command) - cursor.execute(command) - connection.commit() +def clean_database(issues_table, engine): + issues_table.drop(engine, checkfirst=True) + logger.debug(f"Dropped table {issues_table.name}") -def check_tables(cursor, connection): - command = """SELECT relname FROM pg_class WHERE relkind='r' - AND relname !~ '^(pg_|sql_)';""" - logging.debug(command) - cursor.execute(command) - tables = [i[0] for i in cursor.fetchall()] # A list() of tables. - logging.info(f"Tables: {tables}") +def check_tables(engine): + inspector = inspect(engine) + tables = inspector.get_table_names() + logger.info(f"Tables: {tables}") return tables -def create_database(cursor, connection): - command = f"CREATE TABLE {table_name} (" - command += """ - session TEXT, - issue_name TEXT, - inheritance TEXT, - message TEXT, - params TEXT, - severity TEXT, - time BIGINT, - cwd TEXT, - file_name TEXT, - function_name TEXT, - host_name TEXT, - package_name TEXT, - user_name TEXT, - application_name TEXT, - user_id INT, - process_id INT, - thread_id INT, - line_number INT - ); """ - - logging.debug(command) - cursor.execute(command) - connection.commit() +def create_database_table(metadata, table_name, engine): + issues_table = Table( + table_name, + metadata, + Column("session", Text), + Column("issue_name", Text), + Column("inheritance", Text), + Column("message", Text), + Column("params", Text), + Column("severity", Text), + Column("time", BigInteger), + Column("cwd", Text), + Column("file_name", Text), + Column("function_name", Text), + Column("host_name", Text), + Column("package_name", Text), + Column("user_name", Text), + Column("application_name", Text), + Column("user_id", Integer), + Column("process_id", Integer), + Column("thread_id", Integer), + Column("line_number", Integer), + ) + + metadata.create_all(engine, checkfirst=True) + logger.info("Database is ready") + logger.debug(f"Created table {table_name}") + return issues_table if __name__ == "__main__": diff --git a/ers-protobuf-dbwriter/entrypoint.sh b/ers-protobuf-dbwriter/entrypoint.sh index f56ee1d8..db3bd3fd 100755 --- a/ers-protobuf-dbwriter/entrypoint.sh +++ b/ers-protobuf-dbwriter/entrypoint.sh @@ -1,13 +1,18 @@ #!/bin/bash +set -euo pipefail -cd $(dirname $0) +cd "$(dirname "$0")" || exit 2 +if [[ ! -f ../entrypoint_functions.sh ]]; then + echo "Error: entrypoint_functions.sh not found" >&2 + exit 2 +fi source ../entrypoint_functions.sh -ensure_required_variables "ERS_DBWRITER_HOST ERS_DBWRITER_PORT ERS_DBWRITER_USER ERS_DBWRITER_PASS ERS_DBWRITER_NAME ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER ERS_TABLE_NAME ERS_DBWRITER_KAFKA_TIMEOUT_MS ERS_DBWRITER_KAFKA_GROUP " +ensure_required_variables "ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER ERS_DBWRITER_KAFKA_TIMEOUT_MS ERS_DBWRITER_KAFKA_GROUP DATABASE_URI ERS_DBWRITER_DB_TABLENAME" -python3 ./dbwriter.py --subscriber-bootstrap $ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER \ - --subscriber-group $ERS_DBWRITER_KAFKA_GROUP --subscriber-timeout $ERS_DBWRITER_KAFKA_TIMEOUT_MS \ - --db-address $ERS_DBWRITER_HOST --db-port $ERS_DBWRITER_PORT \ - --db-user $ERS_DBWRITER_USER --db-password $ERS_DBWRITER_PASS \ - --db-name $ERS_DBWRITER_NAME --db-table $ERS_TABLE_NAME \ - --debug False +exec python3 ./dbwriter.py --subscriber-bootstrap "${ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER}" \ + --subscriber-group "${ERS_DBWRITER_KAFKA_GROUP}" \ + --subscriber-timeout "${ERS_DBWRITER_KAFKA_TIMEOUT_MS}" \ + --db-uri "${DATABASE_URI}" \ + --db-table "${ERS_DBWRITER_DB_TABLENAME}" \ + --debug False diff --git a/ers-protobuf-dbwriter/ers-dbwriter-deployment.yaml b/ers-protobuf-dbwriter/ers-dbwriter-deployment.yaml index 44f84b88..1bf40ed3 100644 --- a/ers-protobuf-dbwriter/ers-dbwriter-deployment.yaml +++ b/ers-protobuf-dbwriter/ers-dbwriter-deployment.yaml @@ -4,11 +4,11 @@ metadata: annotations: kluctl.io/skip-delete-if-tags: "true" labels: - pod-security.kubernetes.io/audit: baseline + pod-security.kubernetes.io/audit: restricted pod-security.kubernetes.io/audit-version: latest - pod-security.kubernetes.io/enforce: baseline # unified image runs as root :( + pod-security.kubernetes.io/enforce: restricted pod-security.kubernetes.io/enforce-version: latest - pod-security.kubernetes.io/warn: baseline + pod-security.kubernetes.io/warn: restricted pod-security.kubernetes.io/warn-version: latest name: ers --- @@ -19,6 +19,7 @@ kind: Deployment metadata: labels: app.kubernetes.io/app: ers-protobuf-dbwriter + app.kubernetes.io/name: ers-protobuf-dbwriter app.kubernetes.io/component: ers-protobuf-dbwriter name: ersstream-dbwriter namespace: ers @@ -26,12 +27,14 @@ spec: replicas: 1 selector: matchLabels: - app: ersprotobuf-dbwriter + app.kubernetes.io/app: ers-protobuf-dbwriter + app.kubernetes.io/name: ers-protobuf-dbwriter + app.kubernetes.io/component: ers-protobuf-dbwriter template: metadata: labels: - app: ersprotobuf-dbwriter app.kubernetes.io/app: ers-protobuf-dbwriter + app.kubernetes.io/name: ers-protobuf-dbwriter app.kubernetes.io/component: ers-protobuf-dbwriter spec: affinity: @@ -42,58 +45,50 @@ spec: - key: node-role.kubernetes.io/worker operator: Exists automountServiceAccountToken: false + hostUsers: false containers: - image: ghcr.io/dune-daq/microservices:9685 imagePullPolicy: Always - name: ersprotobuf-dbwriter + name: ers-protobuf-dbwriter env: - name: MICROSERVICE value: ers-protobuf-dbwriter - name: ERS_DBWRITER_KAFKA_BOOTSTRAP_SERVER value: dune-daq.kafka.svc.cluster.local:9092 - - name: ERS_TABLE_NAME - value: ERSstream - name: ERS_DBWRITER_KAFKA_TIMEOUT_MS value: "500" - name: ERS_DBWRITER_KAFKA_GROUP value: ers-protobuf-dbwriter - - name: ERS_DBWRITER_HOST - valueFrom: - secretKeyRef: - key: host - name: ers-postgresql-svcbind-custom-user - - name: ERS_DBWRITER_PORT - valueFrom: - secretKeyRef: - key: port - name: ers-postgresql-svcbind-custom-user - - name: ERS_DBWRITER_USER - valueFrom: - secretKeyRef: - key: username - name: ers-postgresql-svcbind-custom-user - - name: ERS_DBWRITER_PASS - valueFrom: - secretKeyRef: - key: password - name: ers-postgresql-svcbind-custom-user - - name: ERS_DBWRITER_NAME + - name: ERS_DBWRITER_DB_TABLENAME + value: ERSstream + - name: DATABASE_URI valueFrom: secretKeyRef: - key: database + key: uri name: ers-postgresql-svcbind-custom-user resources: limits: memory: 1Gi requests: - memory: 8Mi + cpu: 100m + memory: 64Mi securityContext: + runAsUser: 11000 + runAsGroup: 11000 + runAsNonRoot: true allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + seccompProfile: + type: RuntimeDefault capabilities: drop: - ALL - runAsGroup: 11000 - seccompProfile: - type: RuntimeDefault + volumeMounts: + - name: tmp-volume + mountPath: /tmp + volumes: + - name: tmp-volume + emptyDir: + sizeLimit: 256Mi securityContext: fsGroup: 11000 diff --git a/opmon-dbwriter/entrypoint.sh b/opmon-dbwriter/entrypoint.sh deleted file mode 100755 index 63a6b62d..00000000 --- a/opmon-dbwriter/entrypoint.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/bash - -cd $(dirname $0) -source ../entrypoint_functions.sh - -ensure_required_variables "" - -python3 ./kafka-to-influx.py diff --git a/opmon-dbwriter/kafka-to-influx.py b/opmon-dbwriter/kafka-to-influx.py deleted file mode 100644 index c0f2f820..00000000 --- a/opmon-dbwriter/kafka-to-influx.py +++ /dev/null @@ -1,154 +0,0 @@ -# -# @file kafka-to-influx.py Writing opmon info to influx database -# This is part of the DUNE DAQ software, copyright 2020. -# Licensing/copyright details are in the COPYING file that you should have -# received with this code. -# - -from kafka import KafkaConsumer -from influxdb import InfluxDBClient -import influxdb -import json -import click - -CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) - - -@click.command(context_settings=CONTEXT_SETTINGS) -@click.option( - "--kafka-address", - type=click.STRING, - default="monkafka.cern.ch", - help="address of the kafka broker", -) -@click.option( - "--kafka-port", type=click.INT, default=30092, help="port of the kafka broker" -) -@click.option( - "--kafka-topics", - multiple=True, - default=["opmon"], - help="topics of the kafka broker", -) -@click.option( - "--kafka-consumer-id", - type=click.STRING, - default="microservice", - help="id of the kafka consumer, not really important", -) -@click.option( - "--kafka-consumer-group", - type=click.STRING, - default="opmon_microservice", - help="group ID of the kafka consumer, very important to be unique or information will not be duplicated", -) -@click.option( - "--kafka-timeout", - type=click.INT, - default=1000, - help="batch sizes in ms to send data to influx", -) -@click.option( - "--batch_size", - type=click.INT, - default=1000, - help="batch sizes to send data to influx", -) -@click.option( - "--influxdb-address", - type=click.STRING, - default="opmondb.cern.ch", - help="address of the influx db", -) -@click.option( - "--influxdb-port", type=click.INT, default=31002, help="port of the influxdb" -) -@click.option( - "--influxdb-name", - type=click.STRING, - default="influxv3", - help="name used in the influxdb query", -) -@click.option( - "--influxdb-create", - type=click.BOOL, - default=True, - help="Creates the influxdb if it does not exists", -) -def cli( - kafka_address, - kafka_port, - kafka_topics, - kafka_consumer_id, - kafka_consumer_group, - kafka_timeout, - batch_size, - influxdb_address, - influxdb_port, - influxdb_name, - influxdb_create, -): - bootstrap = f"{kafka_address}:{kafka_port}" - print("From Kafka server:", bootstrap) - - consumer = KafkaConsumer( - bootstrap_servers=bootstrap, - group_id=kafka_consumer_group, - client_id=kafka_consumer_id, - consumer_timeout_ms=kafka_timeout, - ) - - print("Consuming topics:", kafka_topics) - consumer.subscribe(kafka_topics) - - influx = InfluxDBClient(host=influxdb_address, port=influxdb_port) - db_list = influx.get_list_database() - print("Available DBs:", db_list) - if {"name": influxdb_name} not in db_list: - print(influxdb_name, "DB not available") - if influxdb_create: - influx.create_database(influxdb_name) - print("New list of DBs:", influx.get_list_database()) - - influx.switch_database(influxdb_name) - - while True: - # Infinite loop over the kafka messages - batch = [] - timestamp = 0 - try: - message_it = iter(consumer) - message = next(message_it) - # print(message) - js = json.loads(message.value) - batch.append(js) - timestamp = message.timestamp - # print(js) - # print(timestamp) - - except: - print("Nothing found") - - for message in consumer: - js = json.loads(message.value) - batch.append(js) - if message.timestamp != timestamp: - break - - if len(batch) > 0: - print("Sending", len(batch), "points") - ## influxdb implementation - try: - influx.write_points(batch) - except influxdb.exceptions.InfluxDBClientError as e: - print(e) - except: - print("Something went wrong: json batch not sent") - - -# else : -# print("Nothing is received") - - -if __name__ == "__main__": - cli(show_default=True, standalone_mode=True) diff --git a/opmon-protobuf-dbwriter/dbwriter.py b/opmon-protobuf-dbwriter/dbwriter.py index 321ab9a6..2ffe6d61 100644 --- a/opmon-protobuf-dbwriter/dbwriter.py +++ b/opmon-protobuf-dbwriter/dbwriter.py @@ -4,22 +4,20 @@ # received with this code. # -import kafkaopmon.OpMonSubscriber as opmon_sub -import google.protobuf.json_format as pb_json -from google.protobuf.timestamp_pb2 import Timestamp -import opmonlib.opmon_entry_pb2 as opmon_schema - -from influxdb import InfluxDBClient -import influxdb -from functools import partial -import json -import click import logging import queue import threading +from functools import partial +from urllib.parse import urlparse +import click +import influxdb +import kafkaopmon.OpMonSubscriber as opmon_sub +import opmonlib.opmon_entry_pb2 as opmon_schema +from influxdb import InfluxDBClient CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"]) +logger = logging.getLogger(__name__) @click.command(context_settings=CONTEXT_SETTINGS) @@ -51,19 +49,10 @@ ) # influx options @click.option( - "--influxdb-address", - type=click.STRING, - default="monkafka.cern.ch", - help="address of the influx db", -) -@click.option( - "--influxdb-port", type=click.INT, default=31002, help="port of the influxdb" -) -@click.option( - "--influxdb-name", + "--influxdb-uri", type=click.STRING, - default="test_influx", - help="Table name destination inside influxdb", + default="influxdb://localhost:8086/test_influx", + help="URI of the InfluxDB server (e.g., influxdb://user:pass@host:port/dbname)", ) @click.option( "--influxdb-create", @@ -77,31 +66,15 @@ default=500, help="Size in ms of the batches sent to influx", ) -@click.option( - "--influxdb-username", - type=click.STRING, - default=None, - help="Username to acces influxdb", -) -@click.option( - "--influxdb-password", - type=click.STRING, - default=None, - help="Password to acces influxdb", -) @click.option("--debug", type=click.BOOL, default=True, help="Set debug print levels") def cli( subscriber_bootstrap, subscriber_group, subscriber_timeout, subscriber_topic, - influxdb_address, - influxdb_port, - influxdb_name, + influxdb_uri, influxdb_create, influxdb_timeout, - influxdb_username, - influxdb_password, debug, ): logging.basicConfig( @@ -110,19 +83,23 @@ def cli( datefmt="%Y-%m-%d %H:%M:%S", ) - kwargs = dict() - if influxdb_username: - kwargs["username"] = influxdb_username - if influxdb_password: - kwargs["password"] = influxdb_password - influx = InfluxDBClient(host=influxdb_address, port=influxdb_port, **kwargs) + # Create InfluxDB client using from_dsn for URI-based connection + # The from_dsn method extracts database name from the URI path + influx = InfluxDBClient.from_dsn(influxdb_uri) + + # Extract database name from URI using urlparse + parsed_uri = urlparse(influxdb_uri) + influxdb_name = parsed_uri.path.lstrip("/") + if influxdb_name is None or influxdb_name == "": + raise ValueError("No database name in URI") + db_list = influx.get_list_database() - logging.info("Available DBs: %s", db_list) + logger.info("Available DBs: %s", db_list) if {"name": influxdb_name} not in db_list: - logging.warning("%s DB not available", influxdb_name) + logger.warning("%s DB not available", influxdb_name) if influxdb_create: influx.create_database(influxdb_name) - logging.info("New list of DBs: %s", influx.get_list_database()) + logger.info("New list of DBs: %s", influx.get_list_database()) influx.switch_database(influxdb_name) @@ -149,7 +126,7 @@ def cli( def consume(q: queue.Queue, timeout_ms, influx: InfluxDBClient = None): - logging.info("Starting consumer thread") + logger.info("Starting consumer thread") batch = [] batch_ms = 0 while True: @@ -169,7 +146,7 @@ def consume(q: queue.Queue, timeout_ms, influx: InfluxDBClient = None): batch_ms = entry.ms except queue.Empty: - logging.debug("Queue is empty") + logger.debug("Queue is empty") send_batch(batch, influx) batch = [] batch_ms = 0 @@ -177,15 +154,16 @@ def consume(q: queue.Queue, timeout_ms, influx: InfluxDBClient = None): def send_batch(batch: list, influx: InfluxDBClient = None): if len(batch) > 0: - logging.info("Sending %s points", len(batch)) + logger.info("Sending %s points", len(batch)) if influx: try: influx.write_points(batch) - except influxdb.exceptions.InfluxDBClientError as e: - logging.error(e) - except Exception as e: - logging.error("Something went wrong: json batch not sent") - logging.error("Details: {}".format(str(e))) + except influxdb.exceptions.InfluxDBClientError: + logger.exception("InfluxDB client error occurred") + except (ConnectionError, TimeoutError): + logger.exception("Network error while sending batch") + except Exception: + logger.exception("Something went wrong: json batch not sent") else: print(batch) diff --git a/opmon-protobuf-dbwriter/entrypoint.sh b/opmon-protobuf-dbwriter/entrypoint.sh index 0ab30ec7..b0875c0c 100755 --- a/opmon-protobuf-dbwriter/entrypoint.sh +++ b/opmon-protobuf-dbwriter/entrypoint.sh @@ -1,15 +1,20 @@ #!/bin/bash +set -euo pipefail -cd $(dirname $0) +cd "$(dirname "$0")" || exit 2 +if [[ ! -f ../entrypoint_functions.sh ]]; then + echo "Error: entrypoint_functions.sh not found" >&2 + exit 2 +fi source ../entrypoint_functions.sh -ensure_required_variables "OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER OPMON_DBWRITER_KAFKA_GROUP OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS OPMON_DBWRITER_TOPIC OPMON_DBWRITER_INFLUX_HOST OPMON_DBWRITER_INFLUX_PORT OPMON_DBWRITER_TABLE OPMON_DBWRITER_BATCH_SIZE_MS OPMON_DBWRITER_INFLUX_USER OPMON_DBWRITER_INFLUX_PASSWORD" +ensure_required_variables "OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER OPMON_DBWRITER_KAFKA_GROUP OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS OPMON_DBWRITER_TOPIC DATABASE_URI OPMON_DBWRITER_BATCH_SIZE_MS" -python3 ./dbwriter.py --subscriber-bootstrap $OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER \ - --subscriber-group $OPMON_DBWRITER_KAFKA_GROUP --subscriber-timeout $OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS \ - --subscriber-topic $OPMON_DBWRITER_TOPIC \ - --influxdb-address $OPMON_DBWRITER_INFLUX_HOST --influxdb-port $OPMON_DBWRITER_INFLUX_PORT \ - --influxdb-name $OPMON_DBWRITER_TABLE --influxdb-timeout $OPMON_DBWRITER_BATCH_SIZE_MS \ +exec python3 ./dbwriter.py --subscriber-bootstrap "${OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER}" \ + --subscriber-group "${OPMON_DBWRITER_KAFKA_GROUP}" \ + --subscriber-timeout "${OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS}" \ + --subscriber-topic "${OPMON_DBWRITER_TOPIC}" \ + --influxdb-uri "${DATABASE_URI}" \ + --influxdb-timeout "${OPMON_DBWRITER_BATCH_SIZE_MS}" \ --influxdb-create True \ - --debug False \ - --influxdb-username $OPMON_DBWRITER_INFLUX_USER --influxdb-password $OPMON_DBWRITER_INFLUX_PASSWORD + --debug False diff --git a/opmon-protobuf-dbwriter/generate_postgresql_partitions.sh b/opmon-protobuf-dbwriter/generate_postgresql_partitions.sh deleted file mode 100755 index f041ddf2..00000000 --- a/opmon-protobuf-dbwriter/generate_postgresql_partitions.sh +++ /dev/null @@ -1,117 +0,0 @@ -#!/bin/bash - -# -# SELECT c.relname AS table_name -# FROM pg_class c -# JOIN pg_namespace n ON n.oid = c.relnamespace -# WHERE c.relkind IN ('r', 'p') -# AND n.nspname NOT IN ('pg_catalog', 'information_schema') -# AND c.relispartition = false -# ORDER BY c.relname; -# - -TABLE_NAME="${1:-example}" -YEAR="${2:-$(date '+%Y')}" - -START_DATE="${YEAR}-01-01" - -# Calculate cutoff date (9 days ago) -CUTOFF_DATE=$(date -d "9 days ago" +%Y-%m-%d) -CUTOFF_TIMESTAMP=$(date -d "${CUTOFF_DATE}" +%s) - -# Function to check if a date is after the cutoff -is_after_cutoff() { - local check_date=$1 - local check_timestamp=$(date -d "${check_date}" +%s) - [[ ${check_timestamp} -gt ${CUTOFF_TIMESTAMP} ]] -} - -# Function to extract end date from partition SQL -extract_end_date() { - local partition_sql=$1 - echo "${partition_sql}" | grep -oP "TO \('\K[^']+" -} - -# Function to build partition SQL -build_partition() { - local table=$1 - local year=$2 - local week=$3 - local start=$4 - local end=$5 - - echo "CREATE TABLE IF NOT EXISTS ${table}_year${year}_week${week} PARTITION OF ${table} FOR VALUES FROM ('${start}') TO ('${end}');" -} - -# Function to output partition if it passes cutoff check -output_partition_if_valid() { - local partition_sql=$1 - - if [[ -n "${partition_sql}" ]]; then - local end_date=$(extract_end_date "${partition_sql}") - if is_after_cutoff "${end_date}"; then - echo -e "${partition_sql}" - fi - fi -} - -start_week=${START_DATE} -week_number=0 -previous_partition="" - -for count in {1..60}; do - week_number=$((week_number + 1)) - week_count_string=$(printf "%02d" ${week_number}) - - # Calculate end of week (7 days later) - end_week=$(date -d "${start_week} + 7 days" +%Y-%m-%d) - - start_week_year=$(date -d "${start_week}" +%Y) - end_week_year=$(date -d "${end_week}" +%Y) - - # Check if we've crossed into a new year - if [[ ${start_week_year} -ne ${end_week_year} ]]; then - # Calculate how many days remain in the current year - year_end=$(date -d "${start_week_year}-12-31" +%Y-%m-%d) - days_remaining=$((($(date -d "${year_end}" +%s) - $(date -d "${start_week}" +%s)) / 86400 + 1)) - - if [[ ${days_remaining} -le 3 ]]; then - # Merge short final week into previous week by extending to new year boundary - end_week=$(date -d "${start_week_year}-12-31 + 1 day" +%Y-%m-%d) - - # Output the extended previous week if valid - if is_after_cutoff "${end_week}"; then - echo "${previous_partition}" | sed "s/) TO ('[^']*');/) TO ('${end_week}');/" - fi - - # Start next week at the new year, reset week counter - start_week=${end_week} - week_number=0 - previous_partition="" - else - # Output the previous partition if valid - output_partition_if_valid "${previous_partition}" - - # Create final week of year going to year boundary - end_week=$(date -d "${start_week_year}-12-31 + 1 day" +%Y-%m-%d) - - previous_partition=$(build_partition "${TABLE_NAME}" "${start_week_year}" "${week_count_string}" "${start_week}" "${end_week}") - - # Start next week at the new year, reset week counter - start_week=${end_week} - week_number=0 - fi - else - # Output the previous partition if valid - output_partition_if_valid "${previous_partition}" - - # Store current partition - previous_partition=$(build_partition "${TABLE_NAME}" "${start_week_year}" "${week_count_string}" "${start_week}" "${end_week}") - - start_week=${end_week} - fi - -done - -# Output final partition if valid -output_partition_if_valid "${previous_partition}" diff --git a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml index d62a644b..cee632c7 100644 --- a/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml +++ b/opmon-protobuf-dbwriter/opmon-dbwriter-deployment.yaml @@ -4,11 +4,11 @@ metadata: annotations: kluctl.io/skip-delete-if-tags: "true" labels: - pod-security.kubernetes.io/audit: baseline + pod-security.kubernetes.io/audit: restricted pod-security.kubernetes.io/audit-version: latest - pod-security.kubernetes.io/enforce: baseline # unified image runs as root :( + pod-security.kubernetes.io/enforce: restricted pod-security.kubernetes.io/enforce-version: latest - pod-security.kubernetes.io/warn: baseline + pod-security.kubernetes.io/warn: restricted pod-security.kubernetes.io/warn-version: latest name: opmon --- @@ -17,6 +17,7 @@ kind: Deployment metadata: labels: app.kubernetes.io/app: opmon-protobuf-dbwriter + app.kubernetes.io/name: opmon-protobuf-dbwriter app.kubernetes.io/component: opmon-protobuf-dbwriter name: opmonstream-dbwriter namespace: opmon @@ -24,12 +25,14 @@ spec: replicas: 1 selector: matchLabels: - app: opmon-protobuf-dbwriter + app.kubernetes.io/app: opmon-protobuf-dbwriter + app.kubernetes.io/name: opmon-protobuf-dbwriter + app.kubernetes.io/component: opmon-protobuf-dbwriter template: metadata: labels: - app: opmon-protobuf-dbwriter app.kubernetes.io/app: opmon-protobuf-dbwriter + app.kubernetes.io/name: opmon-protobuf-dbwriter app.kubernetes.io/component: opmon-protobuf-dbwriter spec: affinity: @@ -40,6 +43,7 @@ spec: - key: node-role.kubernetes.io/worker operator: Exists automountServiceAccountToken: false + hostUsers: false containers: - image: ghcr.io/dune-daq/microservices:mroda-opmon_protobuf ## TBC imagePullPolicy: Always @@ -49,36 +53,42 @@ spec: value: opmon-protobuf-dbwriter - name: OPMON_DBWRITER_KAFKA_BOOTSTRAP_SERVER value: dune-daq.kafka.svc.cluster.local:9092 - - name: OPMON_DBWRITER_INFLUX_USER - value: user - - name: OPMON_DBWRITER_INFLUX_PASSWORD - value: pass - - name: OPMON_DBWRITER_INFLUX_HOST - value: opmon-influxdb.opmon.svc - - name: OPMON_DBWRITER_INFLUX_PORT - value: "8086" - name: OPMON_DBWRITER_KAFKA_GROUP value: opmon-protobuf-dbwriter - name: OPMON_DBWRITER_SUBSCRIBER_TIMEOUT_MS value: "5000" - name: OPMON_DBWRITER_TOPIC value: opmon_stream - - name: OPMON_DBWRITER_TABLE - value: opmon_protobuf_v1 + - name: DATABASE_URI + valueFrom: + secretKeyRef: + key: uri + name: influxdbv1-readwrite-password - name: OPMON_DBWRITER_BATCH_SIZE_MS value: "800" resources: limits: memory: 1Gi requests: - memory: 8Mi + cpu: 100m + memory: 64Mi securityContext: + runAsUser: 11000 + runAsGroup: 11000 + runAsNonRoot: true allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + seccompProfile: + type: RuntimeDefault capabilities: drop: - ALL - runAsGroup: 11000 - seccompProfile: - type: RuntimeDefault + volumeMounts: + - name: tmp-volume + mountPath: /tmp + volumes: + - name: tmp-volume + emptyDir: + sizeLimit: 256Mi securityContext: fsGroup: 11000 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..2898594b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,48 @@ +[project] +name = "microservices" +requires-python = ">=3.9" + +[tool.ruff.lint] +extend-select = [ + "A", # detect shadowed builtins + "ARG", # unused-arguments + "ASYNC", # async + "B", # flake8-bugbear + "C4", # Catch incorrect use of comprehensions, dict, list, etc + "COM", # enforce trailing comma rules + "DTZ", # require strict timezone manipulation with datetime + "E", # PyCodeStyle errors + "F", # Pyflakes rules + "FBT", # detect boolean traps + "G", # Better usage of built-in logging + "I", # isort - Import sorting + "ICN", # Use common import conventions + "ISC", # Good use of string concatenation + "LOG", # Checks for issues using the standard library logging module. + "N", # enforce naming conventions, e.g. ClassName vs function_name + "PL", # pylint + "PTH", # Use pathlib instead of os.path + "PYI", # Linting rules for type annotations. + "Q", # Linting rules for quotes + "RET", # Good return practices + "RUF", # Ruff specific lint rules + "S", # bandit + "SIM", # flake8-simplify + "TC", # Enforce importing certain types in a TYPE_CHECKING block + "TCH", # Move type only imports to type-checking condition. + "TID", # Helps you write tidier imports. + "TRY", # tryceratops - track exception handling + "UP", # pyupgrade - Warn if certain things can changed due to newer Python versions + "W" # PyCodeStyle warnings +] +ignore = [ + "COM812", # de-dupe trailing commas + "SIM108", # ternary is ugly + "S101", # test cases use assert + "E501", # some lines are just super long + "N801", # names are what they are + "N803", # names are what they are + "N813", # names are what they are + "TRY003", # long messages are handy + "G004" # messages are fine as is +] diff --git a/runnumber-rest/api.py b/runnumber-rest/api.py index 728ab386..dd976bcc 100644 --- a/runnumber-rest/api.py +++ b/runnumber-rest/api.py @@ -14,15 +14,14 @@ ] import os -import datetime as dt import flask +from authentication import auth +from database import RunNumber, db, utc_now from flask_restful import Api, Resource -from flask_sqlalchemy import SQLAlchemy -from sqlalchemy import func, event -import re +from sqlalchemy import func, select -__all__ = ["app", "api", "db"] +__all__ = ["api", "app", "db"] app = flask.Flask(__name__) @@ -37,27 +36,9 @@ ) uri = app.config["SQLALCHEMY_DATABASE_URI"] -db = SQLAlchemy(app) +db.init_app(app) api = Api(app) -from urllib.parse import urlparse - -from authentication import auth -from database import RunNumber - -PARSED_URI = urlparse(app.config["SQLALCHEMY_DATABASE_URI"]) -DB_TYPE = PARSED_URI.scheme - - -@app.before_first_request -def register_event_handlers(): - @event.listens_for(db.engine, "handle_error") - def handle_exception(context): - if not context.is_disconnect and re.match( - r"^(?:DPI-1001|DPI-4011)", str(context.original_exception) - ): - context.is_disconnect = True - # $ curl -u fooUsr:barPass -X GET np04-srv-021:30016//runnumber/get @api.resource("/runnumber/get") @@ -71,9 +52,9 @@ class getRunNumber(Resource): @auth.login_required def get(self): - print("getNewRunNumber: no args") + print("getRunNumber: no args") try: - max_run_number = db.session.query(func.max(RunNumber.rn)).scalar() + max_run_number = db.session.execute(select(func.max(RunNumber.rn))).scalar() # maybe find consumers to see if we can drop the extra nesting print(f"getRunNumber: result {[[[max_run_number]]]}") return flask.make_response(flask.jsonify([[[max_run_number]]])) @@ -99,7 +80,7 @@ def get(self): current_max_run = None with db.session.begin(): current_max_run = ( - db.session.query(func.max(RunNumber.rn)).scalar() + db.session.execute(select(func.max(RunNumber.rn))).scalar() or app.config["RUN_START"] ) + 1 run = RunNumber(rn=current_max_run) @@ -125,8 +106,10 @@ def get(self, runNum): try: run = None with db.session.begin(): - run = db.session.query(RunNumber).filter_by(rn=runNum).one() - run.stop_time = dt.datetime.utcnow() + run = db.session.execute( + select(RunNumber).filter_by(rn=runNum) + ).scalar_one() + run.stop_time = utc_now() print(f"updateStopTimestamp: result {[run.start_time, run.stop_time]}") return flask.make_response( flask.jsonify([[[run.start_time, run.stop_time]]]) @@ -138,7 +121,7 @@ def get(self, runNum): @app.route("/") def index(): - root_text = f""" + return f""" @@ -180,4 +163,3 @@ def index(): """ - return root_text diff --git a/runnumber-rest/database.py b/runnumber-rest/database.py index 6f251097..a26044ca 100644 --- a/runnumber-rest/database.py +++ b/runnumber-rest/database.py @@ -1,8 +1,15 @@ -from datetime import datetime +import datetime -from api import db +from flask_sqlalchemy import SQLAlchemy -__all__ = ["RunNumber"] +__all__ = ["RunNumber", "db", "utc_now"] + +db = SQLAlchemy() + + +def utc_now(): + # Returns naive UTC datetime matching your existing schema (no timezone) + return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) class RunNumber(db.Model): @@ -11,6 +18,6 @@ class RunNumber(db.Model): ) flag = db.Column("flag", db.Boolean, nullable=False, default=False) start_time = db.Column( - "start_time", db.TIMESTAMP(6), nullable=False, default=datetime.utcnow + "start_time", db.TIMESTAMP(6), nullable=False, default=utc_now ) stop_time = db.Column("stop_time", db.TIMESTAMP(6), nullable=True) diff --git a/runnumber-rest/entrypoint.sh b/runnumber-rest/entrypoint.sh index 5b838ede..7c0c43a5 100755 --- a/runnumber-rest/entrypoint.sh +++ b/runnumber-rest/entrypoint.sh @@ -1,8 +1,13 @@ #!/bin/bash +set -euo pipefail -cd $(dirname $0) +cd "$(dirname "$0")" || exit 2 +if [[ ! -f ../entrypoint_functions.sh ]]; then + echo "Error: entrypoint_functions.sh not found" >&2 + exit 2 +fi source ../entrypoint_functions.sh ensure_required_variables "DATABASE_URI" -exec gunicorn -b 0.0.0.0:5000 --workers=1 --worker-class=gevent --timeout 5000000000 --log-level=debug rest:app +exec gunicorn -b 0.0.0.0:5000 --workers=1 --worker-class=gevent --timeout 9000 --log-level=debug rest:app diff --git a/runnumber-rest/rest.py b/runnumber-rest/rest.py old mode 100644 new mode 100755 index 0fe1650d..b96dfb86 --- a/runnumber-rest/rest.py +++ b/runnumber-rest/rest.py @@ -20,4 +20,4 @@ db.create_all() if __name__ == "__main__": - app.run(host="0.0.0.0", port=5005, debug=DEBUG) + app.run(host="0.0.0.0", port=5000, debug=DEBUG) diff --git a/runnumber-rest/runnumber-rest-deployment.yaml b/runnumber-rest/runnumber-rest-deployment.yaml index 7891455c..15d3b262 100644 --- a/runnumber-rest/runnumber-rest-deployment.yaml +++ b/runnumber-rest/runnumber-rest-deployment.yaml @@ -4,11 +4,11 @@ metadata: annotations: kluctl.io/skip-delete-if-tags: "true" labels: - pod-security.kubernetes.io/audit: baseline + pod-security.kubernetes.io/audit: restricted pod-security.kubernetes.io/audit-version: latest - pod-security.kubernetes.io/enforce: baseline # unified image runs as root :( + pod-security.kubernetes.io/enforce: restricted pod-security.kubernetes.io/enforce-version: latest - pod-security.kubernetes.io/warn: baseline + pod-security.kubernetes.io/warn: restricted pod-security.kubernetes.io/warn-version: latest name: runservices --- @@ -19,6 +19,7 @@ kind: Deployment metadata: labels: app.kubernetes.io/app: runnumber-rest + app.kubernetes.io/name: runnumber-rest app.kubernetes.io/component: runnumber-rest name: runnumber-rest namespace: runservices @@ -26,11 +27,13 @@ spec: selector: matchLabels: app.kubernetes.io/app: runnumber-rest + app.kubernetes.io/name: runnumber-rest app.kubernetes.io/component: runnumber-rest template: metadata: labels: app.kubernetes.io/app: runnumber-rest + app.kubernetes.io/name: runnumber-rest app.kubernetes.io/component: runnumber-rest spec: affinity: @@ -41,6 +44,7 @@ spec: - key: node-role.kubernetes.io/worker operator: Exists automountServiceAccountToken: false + hostUsers: false containers: - image: ghcr.io/dune-daq/microservices:latest imagePullPolicy: Always @@ -57,21 +61,47 @@ spec: - containerPort: 5000 protocol: TCP name: http + startupProbe: + tcpSocket: + port: http + initialDelaySeconds: 15 + periodSeconds: 20 + livenessProbe: + tcpSocket: + port: http + initialDelaySeconds: 15 + periodSeconds: 20 + readinessProbe: + tcpSocket: + port: http + initialDelaySeconds: 15 + periodSeconds: 20 resources: limits: memory: 1Gi requests: - memory: 8Mi + cpu: 100m + memory: 64Mi terminationMessagePath: /dev/termination-log terminationMessagePolicy: File securityContext: + runAsUser: 11000 + runAsGroup: 11000 + runAsNonRoot: true allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + seccompProfile: + type: RuntimeDefault capabilities: drop: - ALL - runAsGroup: 11000 - seccompProfile: - type: RuntimeDefault + volumeMounts: + - name: tmp-volume + mountPath: /tmp + volumes: + - name: tmp-volume + emptyDir: + sizeLimit: 256Mi securityContext: fsGroup: 11000 --- @@ -80,6 +110,7 @@ kind: Service metadata: labels: app.kubernetes.io/app: runnumber-rest + app.kubernetes.io/name: runnumber-rest app.kubernetes.io/component: runnumber-rest name: runnumber-rest namespace: runservices @@ -91,5 +122,6 @@ spec: targetPort: http selector: app.kubernetes.io/app: runnumber-rest + app.kubernetes.io/name: runnumber-rest app.kubernetes.io/component: runnumber-rest type: ClusterIP diff --git a/runregistry-rest/api.py b/runregistry-rest/api.py index 51e3910b..87cdef91 100644 --- a/runregistry-rest/api.py +++ b/runregistry-rest/api.py @@ -15,22 +15,30 @@ import io import os +import urllib.parse +from pathlib import Path import flask +from authentication import auth +from database import ( + RunRegistryConfigs, + RunRegistryMeta, + db, + utc_now, +) from flask_caching import Cache from flask_restful import Api, Resource -from flask_sqlalchemy import SQLAlchemy -from sqlalchemy import desc, event -import re +from sqlalchemy import desc, select +from sqlalchemy.exc import NoResultFound -__all__ = ["app", "api", "db"] +__all__ = ["api", "app", "db"] app = flask.Flask(__name__) app.config.update( MAX_CONTENT_LENGTH=32 * 1000 * 1000, UPLOAD_EXTENSIONS={".gz", ".tgz"}, - UPLOAD_PATH="", + UPLOAD_PATH=os.environ.get("APP_DATA", "uploads"), CACHE_TYPE="simple", SQLALCHEMY_DATABASE_URI=os.environ.get( "DATABASE_URI", "sqlite:////tmp/test.sqlite" @@ -42,40 +50,29 @@ ) cache = Cache(app) -db = SQLAlchemy(app) +db.init_app(app) api = Api(app) -import datetime as dt -import urllib -from urllib.parse import urlparse - -from authentication import auth -from database import RunRegistryConfigs, RunRegistryMeta -PARSED_URI = urlparse(app.config["SQLALCHEMY_DATABASE_URI"]) +PARSED_URI = urllib.parse.urlparse(app.config["SQLALCHEMY_DATABASE_URI"]) DB_TYPE = PARSED_URI.scheme - -@app.before_first_request -def register_event_handlers(): - @event.listens_for(db.engine, "handle_error") - def handle_exception(context): - if not context.is_disconnect and re.match( - r"^(?:DPI-1001|DPI-4011)", str(context.original_exception) - ): - context.is_disconnect = True +Path(app.config["UPLOAD_PATH"]).mkdir(parents=True, exist_ok=True) +if not os.access(app.config["UPLOAD_PATH"], os.W_OK): + raise PermissionError( + f"Error: Permission denied to access the file at {app.config['UPLOAD_PATH']}" + ) def cache_key(): args = flask.request.args - key = ( + return ( flask.request.path + "?" - + urllib.urlencode( + + urllib.parse.urlencode( [(k, v) for k in sorted(args) for v in sorted(args.getlist(k))] ) ) - return key # $ curl -u fooUsr:barPass -X GET np04-srv-017:30015/runregistry/getRunMeta/2 @@ -89,18 +86,15 @@ class getRunMeta(Resource): @auth.login_required def get(self, runNum): try: - result = ( - db.session.query( - RunRegistryMeta.run_number, - RunRegistryMeta.start_time, - RunRegistryMeta.stop_time, - RunRegistryMeta.detector_id, - RunRegistryMeta.run_type, - RunRegistryMeta.software_version, - ) - .filter(RunRegistryMeta.run_number == runNum) - .one() - ) + stmt = select( + RunRegistryMeta.run_number, + RunRegistryMeta.start_time, + RunRegistryMeta.stop_time, + RunRegistryMeta.detector_id, + RunRegistryMeta.run_type, + RunRegistryMeta.software_version, + ).filter(RunRegistryMeta.run_number == runNum) + result = db.session.execute(stmt).one() print(f"getRunMeta: result {result}") result = list(result) column_names = RunRegistryMeta.__table__.columns.keys() @@ -109,9 +103,11 @@ def get(self, runNum): ) # Don't like this but only way to stay consistent with Oracle cnu = [name.upper() for name in column_names] return flask.make_response(flask.jsonify(cnu, [[*result]])) + except NoResultFound: + return flask.make_response(flask.jsonify({"error": "Run not found"}), 404) except Exception as err_obj: print(f"Exception:{err_obj}") - return flask.make_response(flask.jsonify({"Exception": f"{err_obj}"})) + return flask.make_response(flask.jsonify({"Exception": f"{err_obj}"}), 500) # $ curl -u fooUsr:barPass -X GET np04-srv-017:30015/runregistry/getRunMetaLast/100 @@ -125,8 +121,8 @@ class getRunMetaLast(Resource): @auth.login_required def get(self, amount): try: - result = ( - db.session.query( + stmt = ( + select( RunRegistryMeta.run_number, RunRegistryMeta.start_time, RunRegistryMeta.stop_time, @@ -136,8 +132,8 @@ def get(self, amount): ) .order_by(desc(RunRegistryMeta.run_number)) .limit(amount) - .all() ) + result = db.session.execute(stmt).all() print(f"getRunMetaLast: result {result}") result = [list(row) for row in result] column_names = RunRegistryMeta.__table__.columns.keys() @@ -147,7 +143,7 @@ def get(self, amount): cnu = [name.upper() for name in column_names] return flask.make_response(flask.jsonify(cnu, [*result])) except Exception as err_obj: - return flask.make_response(flask.jsonify({"Exception": f"{err_obj}"})) + return flask.make_response(flask.jsonify({"Exception": f"{err_obj}"}), 500) # $ curl -u fooUsr:barPass -X GET -O -J np04-srv-017:30015/runregistry/getRunBlob/2 @@ -163,17 +159,24 @@ class getRunBlob(Resource): def get(self, runNum): print(f"getRunBlob: arg {runNum}") try: - blob = ( - db.session.query(RunRegistryConfigs.configuration) - .filter(RunRegistryConfigs.run_number == runNum) - .scalar() + stmt_blob = select(RunRegistryConfigs.configuration).filter( + RunRegistryConfigs.run_number == runNum ) - filename = ( - db.session.query(RunRegistryMeta.filename) - .filter(RunRegistryMeta.run_number == runNum) - .scalar() + blob = db.session.execute(stmt_blob).scalar() + if blob is None: + print(f"No blob found for {runNum}") + return flask.make_response(flask.jsonify({"error": "Configuration not found"}), 404) + + stmt_filename = select(RunRegistryMeta.filename).filter( + RunRegistryMeta.run_number == runNum ) + filename = db.session.execute(stmt_filename).scalar() + if not filename: + print(f"No filename found for {runNum}") + return flask.make_response(flask.jsonify({"error": "Filename not found"}), 404) + print("returning " + filename) + if DB_TYPE == "postgresql": resp = flask.make_response(bytes(blob)) else: @@ -183,7 +186,7 @@ def get(self, runNum): return resp except Exception as err_obj: print(f"Exception:{err_obj}") - return flask.make_response(flask.jsonify({"Exception": f"{err_obj}"})) + return flask.make_response(flask.jsonify({"Exception": f"{err_obj}"}), 500) # $ curl -u fooUsr:barPass -F "run_num=1000" -F "det_id=foo" -F "run_type=bar" -F "software_version=dunedaq-vX.Y.Z" -F "file=@sspconf.tar.gz" -X POST np04-srv-017:30015/runregistry/insertRun/ @@ -196,11 +199,14 @@ class insertRun(Resource): @auth.login_required def post(self): - filename = "" - local_file_name = None + local_file_path = None try: # Ensure form fields - run_number = flask.request.form.get("run_num") + try: + run_number = int(flask.request.form.get("run_num")) + except (KeyError,ValueError): + return flask.make_response("Invalid run_num (must be integer)", 400) + det_id = flask.request.form.get("det_id") run_type = flask.request.form.get("run_type") software_version = flask.request.form.get("software_version") @@ -209,22 +215,40 @@ def post(self): return flask.make_response("Missing required form fields", 400) filename = uploaded_file.filename - if ( - not filename - or os.path.splitext(filename)[1] not in app.config["UPLOAD_EXTENSIONS"] - ): + if not filename: + return flask.make_response("Invalid file or extension", 400) + + # Security: Sanitize filename to prevent path traversal + # Defense in depth: reject obviously malicious filenames early + if "/" in filename or "\\" in filename or ".." in filename: + return flask.make_response( + "Invalid filename: path separators not allowed", 400 + ) + + filename_path = Path(filename) + safe_filename = filename_path.name # Extract only the filename component + + if filename_path.suffix not in app.config["UPLOAD_EXTENSIONS"]: return flask.make_response("Invalid file or extension", 400) - local_file_name = os.path.join(app.config["UPLOAD_PATH"], filename) - if os.path.isfile(local_file_name): + upload_dir = Path(app.config["UPLOAD_PATH"]) + local_file_path = upload_dir / safe_filename + + # Security: Verify the resolved path is within the upload directory + try: + local_file_path.resolve().relative_to(upload_dir.resolve()) + except ValueError: + return flask.make_response("Invalid file path", 400) + + if local_file_path.is_file(): return flask.make_response( "File with the same name is already being processed. Try again later.", 400, ) - uploaded_file.save(local_file_name) + uploaded_file.save(str(local_file_path)) - with open(local_file_name, "rb") as file_in: + with local_file_path.open("rb") as file_in: data = io.BytesIO(file_in.read()) with db.session.begin(): @@ -232,7 +256,7 @@ def post(self): run_number=run_number, detector_id=det_id, run_type=run_type, - filename=filename, + filename=safe_filename, software_version=software_version, ) run_config = RunRegistryConfigs( @@ -242,14 +266,14 @@ def post(self): db.session.add(run_meta) db.session.add(run_config) - resp_data = [run_number, det_id, run_type, software_version, filename] + resp_data = [run_number, det_id, run_type, software_version, safe_filename] return flask.make_response(flask.jsonify([[[resp_data]]])) except Exception as err_obj: print(f"Exception:{err_obj}") - return flask.make_response(str(err_obj), 400) + return flask.make_response(str(err_obj), 500) finally: - if local_file_name and os.path.exists(local_file_name): - os.remove(local_file_name) + if local_file_path and local_file_path.exists(): + local_file_path.unlink() # $ curl -u fooUsr:barPass -X GET np04-srv-017:30015/runregistry/updateStopTime/ @@ -266,22 +290,24 @@ def get(self, runNum): try: run = None with db.session.begin(): - run = ( - db.session.query(RunRegistryMeta).filter_by(run_number=runNum).one() - ) - run.stop_time = dt.datetime.utcnow() + run = db.session.execute( + select(RunRegistryMeta).filter_by(run_number=runNum) + ).scalar_one() + run.stop_time = utc_now() print(f"updateStopTimestamp: result {[run.start_time, run.stop_time]}") return flask.make_response( flask.jsonify([[[run.start_time, run.stop_time]]]) ) + except NoResultFound: + return flask.make_response(flask.jsonify({"error": "Run not found"}), 404) except Exception as err_obj: print(f"Exception:{err_obj}") - return flask.make_response(flask.jsonify({"Exception": f"{err_obj}"})) + return flask.make_response(flask.jsonify({"Exception": f"{err_obj}"}), 500) @app.route("/") def index(): - root_text = f""" + return f""" @@ -346,5 +372,3 @@ def index(): """ - - return root_text diff --git a/runregistry-rest/database.py b/runregistry-rest/database.py index 4dbb1e70..bb69b068 100644 --- a/runregistry-rest/database.py +++ b/runregistry-rest/database.py @@ -1,8 +1,15 @@ -import datetime as dt +import datetime -from api import db +from flask_sqlalchemy import SQLAlchemy -__all__ = ["RunRegistryConfig", "RunRegistryMeta"] +__all__ = ["RunRegistryConfigs", "RunRegistryMeta", "db", "utc_now"] + +db = SQLAlchemy() + + +def utc_now(): + # Returns naive UTC datetime matching your existing schema (no timezone) + return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) class RunRegistryMeta(db.Model): @@ -10,7 +17,7 @@ class RunRegistryMeta(db.Model): "run_number", db.Integer, primary_key=True, autoincrement=True, nullable=False ) start_time = db.Column( - "start_time", db.TIMESTAMP(6), nullable=False, default=dt.datetime.utcnow + "start_time", db.TIMESTAMP(6), nullable=False, default=utc_now ) stop_time = db.Column("stop_time", db.TIMESTAMP(6), nullable=True) detector_id = db.Column("detector_id", db.String(40)) diff --git a/runregistry-rest/entrypoint.sh b/runregistry-rest/entrypoint.sh index 3c622d1b..a5dffa46 100755 --- a/runregistry-rest/entrypoint.sh +++ b/runregistry-rest/entrypoint.sh @@ -1,14 +1,13 @@ #!/bin/bash +set -euo pipefail -cd $(dirname $0) +cd "$(dirname "$0")" || exit 2 +if [[ ! -f ../entrypoint_functions.sh ]]; then + echo "Error: entrypoint_functions.sh not found" >&2 + exit 2 +fi source ../entrypoint_functions.sh -ensure_required_variables "DATABASE_URI" - -if [[ ! -d uploads ]]; then - mkdir --mode=1777 uploads -else - chmod 1777 uploads -fi +ensure_required_variables "DATABASE_URI APP_DATA" -exec gunicorn -b 0.0.0.0:5005 --workers=1 --worker-class=gevent --timeout 5000000000 --log-level=debug rest:app +exec gunicorn -b 0.0.0.0:5005 --workers=1 --worker-class=gevent --timeout 9000 --log-level=debug rest:app diff --git a/runregistry-rest/rest.py b/runregistry-rest/rest.py old mode 100644 new mode 100755 diff --git a/runregistry-rest/runregistry-rest-deployment.yaml b/runregistry-rest/runregistry-rest-deployment.yaml index 977b2753..e3fe8c03 100644 --- a/runregistry-rest/runregistry-rest-deployment.yaml +++ b/runregistry-rest/runregistry-rest-deployment.yaml @@ -4,11 +4,11 @@ metadata: annotations: kluctl.io/skip-delete-if-tags: "true" labels: - pod-security.kubernetes.io/audit: baseline + pod-security.kubernetes.io/audit: restricted pod-security.kubernetes.io/audit-version: latest - pod-security.kubernetes.io/enforce: baseline # unified image runs as root :( + pod-security.kubernetes.io/enforce: restricted pod-security.kubernetes.io/enforce-version: latest - pod-security.kubernetes.io/warn: baseline + pod-security.kubernetes.io/warn: restricted pod-security.kubernetes.io/warn-version: latest name: runservices --- @@ -19,6 +19,7 @@ kind: Deployment metadata: labels: app.kubernetes.io/app: runregistry-rest + app.kubernetes.io/name: runregistry-rest app.kubernetes.io/component: runregistry-rest name: runregistry-rest namespace: runservices @@ -26,11 +27,13 @@ spec: selector: matchLabels: app.kubernetes.io/app: runregistry-rest + app.kubernetes.io/name: runregistry-rest app.kubernetes.io/component: runregistry-rest template: metadata: labels: app.kubernetes.io/app: runregistry-rest + app.kubernetes.io/name: runregistry-rest app.kubernetes.io/component: runregistry-rest spec: affinity: @@ -41,6 +44,7 @@ spec: - key: node-role.kubernetes.io/worker operator: Exists automountServiceAccountToken: false + hostUsers: false containers: - image: ghcr.io/dune-daq/microservices:latest imagePullPolicy: Always @@ -48,6 +52,8 @@ spec: env: - name: MICROSERVICE value: runregistry-rest + - name: APP_DATA + value: /opt/data - name: DATABASE_URI valueFrom: secretKeyRef: @@ -57,28 +63,52 @@ spec: - containerPort: 5005 protocol: TCP name: http + startupProbe: + tcpSocket: + port: http + initialDelaySeconds: 15 + periodSeconds: 20 + livenessProbe: + tcpSocket: + port: http + initialDelaySeconds: 15 + periodSeconds: 20 + readinessProbe: + tcpSocket: + port: http + initialDelaySeconds: 15 + periodSeconds: 20 resources: limits: memory: 1Gi requests: - memory: 8Mi + cpu: 100m + memory: 64Mi terminationMessagePath: /dev/termination-log terminationMessagePolicy: File securityContext: + runAsUser: 11000 + runAsGroup: 11000 + runAsNonRoot: true allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + seccompProfile: + type: RuntimeDefault capabilities: drop: - ALL - runAsGroup: 11000 - seccompProfile: - type: RuntimeDefault volumeMounts: - - mountPath: /microservices/runregistry-rest/uploads - name: uploads-volume - volumes: # persistance is not required at this time - - name: uploads-volume + - mountPath: /opt/data + name: data-volume + - name: tmp-volume + mountPath: /tmp + volumes: # persistence is not required at this time + - name: data-volume emptyDir: sizeLimit: 10Gi + - name: tmp-volume + emptyDir: + sizeLimit: 256Mi securityContext: fsGroup: 11000 --- @@ -87,6 +117,7 @@ kind: Service metadata: labels: app.kubernetes.io/app: runregistry-rest + app.kubernetes.io/name: runregistry-rest app.kubernetes.io/component: runregistry-rest name: runregistry-rest namespace: runservices @@ -98,5 +129,6 @@ spec: targetPort: http selector: app.kubernetes.io/app: runregistry-rest + app.kubernetes.io/name: runregistry-rest app.kubernetes.io/component: runregistry-rest type: ClusterIP diff --git a/tools/postgresql_partitions_for_all_tables.sh b/tools/postgresql_partitions_for_all_tables.sh new file mode 100755 index 00000000..d1de0871 --- /dev/null +++ b/tools/postgresql_partitions_for_all_tables.sh @@ -0,0 +1,125 @@ +#!/bin/bash +set -euo pipefail + +# Orchestrator script to create partitions for all non-system tables (both regular and parent partitioned tables) +# Usage: +# DATABASE_URI="postgresql://user:pass@host/db" ./postgresql_partitions_for_all_tables.sh +# Or with specific year: +# YEAR=2025 DATABASE_URI="postgresql://user:pass@host/db" ./postgresql_partitions_for_all_tables.sh + +SCRIPT_DIR="$(cd "$(dirname "$(readlink -f "${BASH_SOURCE[0]}")")" && pwd)" +YEAR="${YEAR:-$(date '+%Y')}" + +# Array to collect errors +declare -a ERRORS=() + +# Function to find all non-system tables (both regular and parent partitioned tables) +find_tables() { + local query="SELECT n.nspname || '.' || c.relname AS table_name +FROM pg_class c +JOIN pg_namespace n ON n.oid = c.relnamespace +WHERE c.relkind IN ('r', 'p') + AND n.nspname NOT IN ('pg_catalog', 'information_schema') + AND n.nspname NOT LIKE 'pg_temp_%' + AND c.relispartition = false +ORDER BY n.nspname, c.relname;" + + echo "${query}" +} + +# Function to add error with context +add_error() { + local error_msg="$1" + ERRORS+=("${error_msg}") +} + +# Validate DATABASE_URI +if [[ -z "${DATABASE_URI:-}" ]]; then + echo "Error: DATABASE_URI environment variable is not set" >&2 + exit 1 +fi + +# Validate per-table script exists +PARTITION_SCRIPT="${SCRIPT_DIR}/sample_postgresql_partitions_for_table.sh" +if [[ ! -f "${PARTITION_SCRIPT}" ]]; then + echo "Error: Script not found: ${PARTITION_SCRIPT}" >&2 + exit 1 +fi + +if [[ ! -x "${PARTITION_SCRIPT}" ]]; then + echo "Error: Script is not executable: ${PARTITION_SCRIPT}" >&2 + exit 1 +fi + +# Test database connection +if ! psql "${DATABASE_URI}" -c "SELECT 1" >/dev/null 2>&1; then + echo "Error: Cannot connect to database using DATABASE_URI" >&2 + exit 1 +fi + +# Fetch table list +if ! TABLES_OUTPUT=$(find_tables | psql -XAtq -v ON_ERROR_STOP=1 "${DATABASE_URI}" 2>&1); then + echo "Error: Failed to query tables from database" >&2 + if [[ -n "${TABLES_OUTPUT:-}" ]]; then + echo "Error details: ${TABLES_OUTPUT}" >&2 + fi + exit 1 +fi + +# Count total tables for progress tracking +TOTAL_TABLES=$(echo "${TABLES_OUTPUT}" | grep -v '^[[:space:]]*$' | wc -l) +CURRENT=0 + +if [[ ${TOTAL_TABLES} -eq 0 ]]; then + echo "Warning: No tables found in database" >&2 + exit 0 +fi + +echo "Processing ${TOTAL_TABLES} tables for year ${YEAR}..." >&2 + +# Process each table +while IFS= read -r table; do + # Skip empty lines + [[ -z "${table}" ]] && continue + + # Trim whitespace + table="${table#"${table%%[![:space:]]*}"}" + table="${table%"${table##*[![:space:]]}"}" + [[ -z "${table}" ]] && continue + + CURRENT=$((CURRENT + 1)) + echo "[$CURRENT/$TOTAL_TABLES] Processing table: ${table}" >&2 + + # Generate SQL and execute it directly + # Capture both stdout and stderr + if SQL_OUTPUT=$("${PARTITION_SCRIPT}" "${table}" "${YEAR}" 2>&1); then + if [[ -n "${SQL_OUTPUT}" ]]; then + echo " Executing partition DDL..." >&2 + if ! PSQL_OUTPUT=$(printf '%s\n' "${SQL_OUTPUT}" | psql -X "${DATABASE_URI}" -v ON_ERROR_STOP=1 2>&1); then + add_error "Table '${table}': Failed to execute partition DDL - ${PSQL_OUTPUT}" + else + echo " Partitions created successfully" >&2 + fi + fi + else + # SQL_OUTPUT contains the error message (stdout + stderr from failed run) + add_error "Table '${table}': Failed to generate partition DDL - ${SQL_OUTPUT}" + fi +done <<<"${TABLES_OUTPUT}" + +# Report errors at the end +if [[ ${#ERRORS[@]} -gt 0 ]]; then + echo "" >&2 + echo "======================================" >&2 + echo "ERRORS ENCOUNTERED (${#ERRORS[@]} total):" >&2 + echo "======================================" >&2 + for error in "${ERRORS[@]}"; do + echo " • ${error}" >&2 + done + echo "======================================" >&2 + exit 1 +else + echo "" >&2 + echo "Successfully processed all ${TOTAL_TABLES} tables." >&2 + exit 0 +fi diff --git a/tools/sample_postgresql_partitions_for_table.sh b/tools/sample_postgresql_partitions_for_table.sh new file mode 100755 index 00000000..0e1af4f3 --- /dev/null +++ b/tools/sample_postgresql_partitions_for_table.sh @@ -0,0 +1,82 @@ +#!/bin/bash +set -euo pipefail + +# Per-table script to generate partition DDL for a single table +# Outputs SQL to stdout for piping to psql or for use by orchestrator script +# Usage: +# ./sample_postgresql_partitions_for_table.sh +# Examples: +# ./sample_postgresql_partitions_for_table.sh public.users 2025 | psql "${DATABASE_URI}" +# ./sample_postgresql_partitions_for_table.sh myschema.orders 2025 + +# Validate arguments +if [[ $# -lt 2 ]]; then + echo "Error: Missing required arguments" >&2 + echo "Usage: $0 " >&2 + exit 1 +fi + +QUALIFIED_TABLE="$1" +YEAR="$2" + +# Parse schema and table name +if [[ "${QUALIFIED_TABLE}" =~ ^([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_][a-zA-Z0-9_]*)$ ]]; then + SCHEMA_NAME="${BASH_REMATCH[1]}" + TABLE_NAME="${BASH_REMATCH[2]}" +else + echo "Error: Invalid schema-qualified table name: ${QUALIFIED_TABLE}" >&2 + echo "Expected format: schema.table_name" >&2 + echo "Both schema and table must start with letter/underscore and contain only alphanumeric characters and underscores" >&2 + exit 1 +fi + +# Validate year +if [[ ! "${YEAR}" =~ ^[0-9]{4}$ ]]; then + echo "Error: Invalid year: ${YEAR}" >&2 + echo "Year must be a 4-digit number" >&2 + exit 1 +fi + +# Generate partition DDL +# This is a sample implementation - customize based on your partitioning strategy +cat <