diff --git a/examples/demos/fundamentals/002_production_line_optimisation/.meta.yml b/examples/demos/fundamentals/002_production_line_optimisation/.meta.yml new file mode 100644 index 00000000..986552cc --- /dev/null +++ b/examples/demos/fundamentals/002_production_line_optimisation/.meta.yml @@ -0,0 +1,2 @@ +tags: + - optimisation \ No newline at end of file diff --git a/examples/demos/fundamentals/002_production_line_optimisation/production-line.ipynb b/examples/demos/fundamentals/002_production_line_optimisation/production-line.ipynb new file mode 100644 index 00000000..9a1e1406 --- /dev/null +++ b/examples/demos/fundamentals/002_production_line_optimisation/production-line.ipynb @@ -0,0 +1,367 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# Production process optimisation\n", + "\n", + "[![Open in GitHub Codespaces](https://github.com/codespaces/badge.svg)](https://codespaces.new/plugboard-dev/plugboard)\n", + "\n", + "This production line simulation models the unit cost of a small manufacturing operation with the following components:\n", + "\n", + "### Components:\n", + "\n", + "1. **Input**: Provides a fixed number of input items (10) per simulation step\n", + "2. **InputStockpile**: \n", + " - Tracks inventory levels\n", + " - Decrements based on machine operations (Machine 1: 5 items/step, Machine 2: 8 items/step)\n", + " - Calculates storage costs ($10 per item above 50)\n", + "3. **Controller** (2 instances):\n", + " - Controller 1: Activates Machine 1 when stockpile >= 30 items\n", + " - Controller 2: Activates Machine 2 when stockpile >= 50 items\n", + "4. **MachineCost** (2 instances):\n", + " - Machine 1: $100 per step when running\n", + " - Machine 2: $200 per step when running\n", + "5. **OutputStock**: Tracks total items processed by both machines\n", + "6. **TotalCost**: Maintains running total of all costs (storage + machine operations)\n", + "7. **CostPerUnit**: Calculates the cost per unit produced (total cost รท total output)\n", + "\n", + "### Key Metrics:\n", + "- **Production Efficiency**: Percentage of input items successfully processed\n", + "- **Cost Per Unit**: Total operational cost divided by units produced\n", + "- **Storage Utilization**: How well the system manages inventory levels\n", + "\n", + "The simulation runs for 1000 steps to provide stable long-term cost metrics. We can then use the model to find optimal values for the threshold levels on the two controllers." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "import typing as _t\n", + "\n", + "from plugboard.component import Component, IOController as IO\n", + "from plugboard.schemas import ComponentArgsDict\n", + "from plugboard.connector import AsyncioConnector\n", + "from plugboard.process import LocalProcess\n", + "from plugboard.schemas import ConnectorSpec" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [ + "Define the components for the model." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "class Input(Component):\n", + " \"\"\"Provides a fixed number of input items per step.\"\"\"\n", + "\n", + " io = IO(outputs=[\"items\"])\n", + "\n", + " def __init__(\n", + " self,\n", + " items_per_step: int = 10,\n", + " total_steps: int = 1000,\n", + " **kwargs: _t.Unpack[ComponentArgsDict],\n", + " ) -> None:\n", + " super().__init__(**kwargs)\n", + " self._items_per_step = items_per_step\n", + " self._total_steps = total_steps\n", + "\n", + " async def step(self) -> None:\n", + " self.items = self._items_per_step\n", + " if self._total_steps > 0:\n", + " self._total_steps -= 1\n", + " else:\n", + " self.items = 0\n", + " await self.io.close()\n", + "\n", + "\n", + "class InputStockpile(Component):\n", + " \"\"\"Tracks input stockpile, decrements based on machine operations, and calculates storage costs.\"\"\"\n", + "\n", + " io = IO(\n", + " inputs=[\"incoming_items\", \"machine1_running\", \"machine2_running\"],\n", + " outputs=[\"size\", \"storage_cost\"],\n", + " )\n", + "\n", + " def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:\n", + " super().__init__(**kwargs)\n", + " self._size = 0\n", + "\n", + " async def step(self) -> None:\n", + " # Add incoming items\n", + " self._size += self.incoming_items\n", + "\n", + " # Remove items processed by machines\n", + " if self.machine1_running:\n", + " self._size = max(0, self._size - 5) # Machine 1 processes 5 items\n", + " if self.machine2_running:\n", + " self._size = max(0, self._size - 8) # Machine 2 processes 8 items\n", + "\n", + " # Calculate storage cost: $10 per item above 50\n", + " storage_cost = max(0, self._size - 50) * 10\n", + "\n", + " self.size = self._size\n", + " self.storage_cost = storage_cost\n", + "\n", + "\n", + "class Controller(Component):\n", + " \"\"\"Controls machine operation based on stockpile size.\"\"\"\n", + "\n", + " io = IO(inputs=[\"stockpile_size\"], outputs=[\"should_run\"])\n", + "\n", + " def __init__(self, threshold: int = 30, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:\n", + " super().__init__(**kwargs)\n", + " self._threshold = threshold\n", + "\n", + " async def step(self) -> None:\n", + " self.should_run = self.stockpile_size >= self._threshold\n", + "\n", + "\n", + "class MachineCost(Component):\n", + " \"\"\"Calculates machine running costs.\"\"\"\n", + "\n", + " io = IO(inputs=[\"is_running\"], outputs=[\"cost\"])\n", + "\n", + " def __init__(\n", + " self, cost_per_step: float = 100.0, **kwargs: _t.Unpack[ComponentArgsDict]\n", + " ) -> None:\n", + " super().__init__(**kwargs)\n", + " self._cost_per_step = cost_per_step\n", + "\n", + " async def step(self) -> None:\n", + " self.cost = self._cost_per_step if self.is_running else 0.0\n", + "\n", + "\n", + "class OutputStock(Component):\n", + " \"\"\"Tracks total items processed by both machines.\"\"\"\n", + "\n", + " io = IO(inputs=[\"machine1_running\", \"machine2_running\"], outputs=[\"total_output\"])\n", + "\n", + " def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:\n", + " super().__init__(**kwargs)\n", + " self._total = 0\n", + "\n", + " async def step(self) -> None:\n", + " if self.machine1_running:\n", + " self._total += 5\n", + " if self.machine2_running:\n", + " self._total += 8\n", + " self.total_output = self._total\n", + "\n", + "\n", + "class TotalCost(Component):\n", + " \"\"\"Keeps running total of all costs.\"\"\"\n", + "\n", + " io = IO(inputs=[\"storage_cost\", \"machine1_cost\", \"machine2_cost\"], outputs=[\"total_cost\"])\n", + "\n", + " def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:\n", + " super().__init__(**kwargs)\n", + " self._total = 0.0\n", + "\n", + " async def step(self) -> None:\n", + " step_cost = self.storage_cost + self.machine1_cost + self.machine2_cost\n", + " self._total += step_cost\n", + " self.total_cost = self._total\n", + "\n", + "\n", + "class CostPerUnit(Component):\n", + " \"\"\"Calculates cost per unit produced.\"\"\"\n", + "\n", + " io = IO(inputs=[\"total_cost\", \"total_output\"], outputs=[\"cost_per_unit\"])\n", + "\n", + " async def step(self) -> None:\n", + " if self.total_output > 0:\n", + " self.cost_per_unit = self.total_cost / self.total_output\n", + " else:\n", + " self.cost_per_unit = 0.0" + ] + }, + { + "cell_type": "markdown", + "id": "4", + "metadata": {}, + "source": [ + "Now assemble into a `Process` and make connections between the components." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "connect = lambda in_, out_: AsyncioConnector(spec=ConnectorSpec(source=in_, target=out_))\n", + "\n", + "process = LocalProcess(\n", + " components=[\n", + " # Input component provides items\n", + " Input(name=\"input\", items_per_step=10),\n", + " # Stockpile manages inventory and storage costs\n", + " # We need to include initial values here to resolve a circular graph\n", + " InputStockpile(\n", + " name=\"stockpile\",\n", + " initial_values={\"machine1_running\": [False], \"machine2_running\": [False]},\n", + " ),\n", + " # Controllers decide when machines should run\n", + " Controller(name=\"controller1\", threshold=30), # Machine 1 threshold\n", + " Controller(name=\"controller2\", threshold=50), # Machine 2 threshold\n", + " # Machine cost components\n", + " MachineCost(name=\"machine1_cost\", cost_per_step=100.0),\n", + " MachineCost(name=\"machine2_cost\", cost_per_step=200.0),\n", + " # Output tracking\n", + " OutputStock(name=\"output_stock\"),\n", + " # Cost tracking and calculation\n", + " TotalCost(name=\"total_cost\"),\n", + " CostPerUnit(name=\"cost_per_unit\"),\n", + " ],\n", + " connectors=[\n", + " # Input flow\n", + " connect(\"input.items\", \"stockpile.incoming_items\"),\n", + " # Stockpile to controllers\n", + " connect(\"stockpile.size\", \"controller1.stockpile_size\"),\n", + " connect(\"stockpile.size\", \"controller2.stockpile_size\"),\n", + " # Controllers to machine costs\n", + " connect(\"controller1.should_run\", \"machine1_cost.is_running\"),\n", + " connect(\"controller2.should_run\", \"machine2_cost.is_running\"),\n", + " # Controllers to stockpile (for processing)\n", + " connect(\"controller1.should_run\", \"stockpile.machine1_running\"),\n", + " connect(\"controller2.should_run\", \"stockpile.machine2_running\"),\n", + " # Controllers to output stock\n", + " connect(\"controller1.should_run\", \"output_stock.machine1_running\"),\n", + " connect(\"controller2.should_run\", \"output_stock.machine2_running\"),\n", + " # All costs to total cost\n", + " connect(\"stockpile.storage_cost\", \"total_cost.storage_cost\"),\n", + " connect(\"machine1_cost.cost\", \"total_cost.machine1_cost\"),\n", + " connect(\"machine2_cost.cost\", \"total_cost.machine2_cost\"),\n", + " # Total cost and output to cost per unit\n", + " connect(\"total_cost.total_cost\", \"cost_per_unit.total_cost\"),\n", + " connect(\"output_stock.total_output\", \"cost_per_unit.total_output\"),\n", + " ],\n", + ")\n", + "\n", + "print(\"Production line process created successfully!\")\n", + "print(f\"Process has {len(process.components)} components and {len(process.connectors)} connectors\")" + ] + }, + { + "cell_type": "markdown", + "id": "6", + "metadata": {}, + "source": [ + "We can create a diagram of the process to make a visual check. \n", + "\n", + "![Process Diagram](https://mermaid.ink/img/pako:eNq1llFrgzAUhf9KyKO0D_VRShn0abCxsXVvA0njLYbGROKVPYj_fSbO1jFsQe-ePefky81JsOHSZsATftL2S-bCIXt6-zSMKVPW-NCwKhclJMzZ2mSQrZgWR9AJe_Sft0e3i6KgjCLWsvV6xyq08lwqDXe874OuD7nYfJBfnyInAElr0FmtwW2mo_YXUR8zMv0jUDwHKB6AKDYWgAohc2Vgk0pb3Tjy516270R91i_fX6r5uxtTxTOp4gmqhbOiqjfZmOiBFk7I1tgtl4YFppNegipA9VFjG_mQCJnIngG0KPSdch-85lrtq2WgIbu65EjL7y0d0uKQn3e7wrQEl9ZG4a0-VvgK7qMTDYUc-QYkokbSgvEVL8AVQmU8aTjmUPg_gwxOotbI2_YbJkviBg==)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7", + "metadata": {}, + "outputs": [], + "source": [ + "# Visualize the process flow\n", + "from plugboard.diagram import MermaidDiagram\n", + "\n", + "diagram_url = MermaidDiagram.from_process(process).url\n", + "print(diagram_url)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "# Run the simulation\n", + "async with process:\n", + " await process.run()" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "At the end of the simulation, the final cost per unit is available on the `CostPerUnit` component output." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "final_cost = process.components[\"cost_per_unit\"].cost_per_unit\n", + "print(f\"Final cost per unit: ${final_cost:.2f}\")" + ] + }, + { + "cell_type": "markdown", + "id": "11", + "metadata": {}, + "source": [ + "Now suppose we want to build an optimisation to find the best values of the threshold settings on `controller1` and `controller2`. Specifically we want to minimise `cost_per_unit` by choosing the settings on the two controllers. The easiest way to do this is to convert our model to Python code with an associated YAML config file. See:\n", + "\n", + "* `production_line.py`; and\n", + "* `production-line.yaml`.\n", + "\n", + "The easiest way to launch an optimisation job is via the CLI by running:\n", + "\n", + "```sh\n", + "plugboard process tune production-line.yaml\n", + "```\n", + "This will use Optuna to explore the parameter space and report the controller thresholds that minimise cost per unit at the end of the run, for example:\n", + "\n", + "```\n", + "Best parameters found:\n", + "Config: {'controller1.threshold': 10, 'controller2.threshold': 38} - Metrics: {'cost_per_unit.cost_per_unit': 22.491974317817014, 'timestamp': 1755022664, \n", + "'checkpoint_dir_name': None, 'done': True, 'training_iteration': 1, 'trial_id': '287aff0b', 'date': '2025-08-12_19-17-44', 'time_this_iter_s': 2.224583864212036, \n", + "'time_total_s': 2.224583864212036, 'pid': 94765, 'hostname': 'hostname.local', 'node_ip': '127.0.0.1', 'config': {'controller1.threshold': 10, \n", + "'controller2.threshold': 38}, 'time_since_restore': 2.224583864212036, 'iterations_since_restore': 1, 'experiment_tag': \n", + "'14_controller1_threshold=10,controller2_threshold=38'}\n", + "```\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.8" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/demos/fundamentals/002_production_line_optimisation/production-line.yaml b/examples/demos/fundamentals/002_production_line_optimisation/production-line.yaml new file mode 100644 index 00000000..c3a813bd --- /dev/null +++ b/examples/demos/fundamentals/002_production_line_optimisation/production-line.yaml @@ -0,0 +1,99 @@ +plugboard: + process: + args: + components: + - type: production_line.Input + args: + name: input + items_per_step: 10 + - type: production_line.InputStockpile + args: + name: stockpile + initial_values: + machine1_running: [false] + machine2_running: [false] + - type: production_line.Controller + args: + name: controller1 + threshold: 30 + - type: production_line.Controller + args: + name: controller2 + threshold: 50 + - type: production_line.MachineCost + args: + name: machine1_cost + cost_per_step: 100.0 + - type: production_line.MachineCost + args: + name: machine2_cost + cost_per_step: 200.0 + - type: production_line.OutputStock + args: + name: output_stock + - type: production_line.TotalCost + args: + name: total_cost + - type: production_line.CostPerUnit + args: + name: cost_per_unit + connectors: + # Input flow + - source: input.items + target: stockpile.incoming_items + # Stockpile to controllers + - source: stockpile.size + target: controller1.stockpile_size + - source: stockpile.size + target: controller2.stockpile_size + # Controllers to machine costs + - source: controller1.should_run + target: machine1_cost.is_running + - source: controller2.should_run + target: machine2_cost.is_running + # Controllers to stockpile (for processing) + - source: controller1.should_run + target: stockpile.machine1_running + - source: controller2.should_run + target: stockpile.machine2_running + # Controllers to output stock + - source: controller1.should_run + target: output_stock.machine1_running + - source: controller2.should_run + target: output_stock.machine2_running + # All costs to total cost + - source: stockpile.storage_cost + target: total_cost.storage_cost + - source: machine1_cost.cost + target: total_cost.machine1_cost + - source: machine2_cost.cost + target: total_cost.machine2_cost + # Total cost and output to cost per unit + - source: total_cost.total_cost + target: cost_per_unit.total_cost + - source: output_stock.total_output + target: cost_per_unit.total_output + tune: + args: + objective: + object_name: cost_per_unit + field_type: field + field_name: cost_per_unit + parameters: + - type: ray.tune.randint + object_type: component + object_name: controller1 + field_type: arg + field_name: threshold + lower: 10 + upper: 100 + - type: ray.tune.randint + object_type: component + object_name: controller2 + field_type: arg + field_name: threshold + lower: 10 + upper: 100 + num_samples: 40 + mode: min + max_concurrent: 4 diff --git a/examples/demos/fundamentals/002_production_line_optimisation/production_line.py b/examples/demos/fundamentals/002_production_line_optimisation/production_line.py new file mode 100644 index 00000000..c714a945 --- /dev/null +++ b/examples/demos/fundamentals/002_production_line_optimisation/production_line.py @@ -0,0 +1,131 @@ +"""Defines components used in the production line example.""" + +import typing as _t + +from plugboard.component import Component, IOController as IO +from plugboard.schemas import ComponentArgsDict + + +class Input(Component): + """Provides a fixed number of input items per step.""" + + io = IO(outputs=["items"]) + + def __init__( + self, + items_per_step: int = 10, + total_steps: int = 1000, + **kwargs: _t.Unpack[ComponentArgsDict], + ) -> None: + super().__init__(**kwargs) + self._items_per_step = items_per_step + self._total_steps = total_steps + + async def step(self) -> None: + self.items = self._items_per_step + if self._total_steps > 0: + self._total_steps -= 1 + else: + self.items = 0 + await self.io.close() + + +class InputStockpile(Component): + """Tracks input stockpile, decrements based on machine operations, and calculates storage costs.""" + + io = IO( + inputs=["incoming_items", "machine1_running", "machine2_running"], + outputs=["size", "storage_cost"], + ) + + def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None: + super().__init__(**kwargs) + self._size = 0 + + async def step(self) -> None: + # Add incoming items + self._size += self.incoming_items + + # Remove items processed by machines + if self.machine1_running: + self._size = max(0, self._size - 5) # Machine 1 processes 5 items + if self.machine2_running: + self._size = max(0, self._size - 8) # Machine 2 processes 8 items + + # Calculate storage cost: $10 per item above 50 + storage_cost = max(0, self._size - 50) * 10 + + self.size = self._size + self.storage_cost = storage_cost + + +class Controller(Component): + """Controls machine operation based on stockpile size.""" + + io = IO(inputs=["stockpile_size"], outputs=["should_run"]) + + def __init__(self, threshold: int = 30, **kwargs: _t.Unpack[ComponentArgsDict]) -> None: + super().__init__(**kwargs) + self._threshold = threshold + + async def step(self) -> None: + self.should_run = self.stockpile_size >= self._threshold + + +class MachineCost(Component): + """Calculates machine running costs.""" + + io = IO(inputs=["is_running"], outputs=["cost"]) + + def __init__( + self, cost_per_step: float = 100.0, **kwargs: _t.Unpack[ComponentArgsDict] + ) -> None: + super().__init__(**kwargs) + self._cost_per_step = cost_per_step + + async def step(self) -> None: + self.cost = self._cost_per_step if self.is_running else 0.0 + + +class OutputStock(Component): + """Tracks total items processed by both machines.""" + + io = IO(inputs=["machine1_running", "machine2_running"], outputs=["total_output"]) + + def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None: + super().__init__(**kwargs) + self._total = 0 + + async def step(self) -> None: + if self.machine1_running: + self._total += 5 + if self.machine2_running: + self._total += 8 + self.total_output = self._total + + +class TotalCost(Component): + """Keeps running total of all costs.""" + + io = IO(inputs=["storage_cost", "machine1_cost", "machine2_cost"], outputs=["total_cost"]) + + def __init__(self, **kwargs: _t.Unpack[ComponentArgsDict]) -> None: + super().__init__(**kwargs) + self._total = 0.0 + + async def step(self) -> None: + step_cost = self.storage_cost + self.machine1_cost + self.machine2_cost + self._total += step_cost + self.total_cost = self._total + + +class CostPerUnit(Component): + """Calculates cost per unit produced.""" + + io = IO(inputs=["total_cost", "total_output"], outputs=["cost_per_unit"]) + + async def step(self) -> None: + if self.total_output > 0: + self.cost_per_unit = self.total_cost / self.total_output + else: + self.cost_per_unit = 0.0