001/*
002 * VM-Operator
003 * Copyright (C) 2023,2024 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.manager;
020
021import com.google.gson.JsonObject;
022import io.kubernetes.client.apimachinery.GroupVersionKind;
023import io.kubernetes.client.openapi.ApiException;
024import io.kubernetes.client.util.Watch;
025import java.io.IOException;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.concurrent.ConcurrentHashMap;
031import org.jdrupes.vmoperator.common.Constants.Crd;
032import org.jdrupes.vmoperator.common.Constants.Status;
033import org.jdrupes.vmoperator.common.K8s;
034import org.jdrupes.vmoperator.common.K8sClient;
035import org.jdrupes.vmoperator.common.K8sDynamicModel;
036import org.jdrupes.vmoperator.common.K8sDynamicModels;
037import org.jdrupes.vmoperator.common.K8sDynamicStub;
038import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
039import org.jdrupes.vmoperator.common.VmDefinition.Assignment;
040import org.jdrupes.vmoperator.common.VmDefinitionStub;
041import org.jdrupes.vmoperator.common.VmPool;
042import org.jdrupes.vmoperator.manager.events.GetPools;
043import org.jdrupes.vmoperator.manager.events.VmPoolChanged;
044import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
045import org.jdrupes.vmoperator.util.GsonPtr;
046import org.jgrapes.core.Channel;
047import org.jgrapes.core.EventPipeline;
048import org.jgrapes.core.annotation.Handler;
049import org.jgrapes.core.events.Attached;
050
051/**
052 * Watches for changes of VM pools. Reports the changes using 
053 * {@link VmPoolChanged} events fired on a special pipeline to
054 * avoid concurrent change informations.
055 */
056public class PoolMonitor extends
057        AbstractMonitor<K8sDynamicModel, K8sDynamicModels, Channel> {
058
059    private final Map<String, VmPool> pools = new ConcurrentHashMap<>();
060    private EventPipeline poolPipeline;
061
062    /**
063     * Instantiates a new VM pool manager.
064     *
065     * @param componentChannel the component channel
066     */
067    public PoolMonitor(Channel componentChannel) {
068        super(componentChannel, K8sDynamicModel.class,
069            K8sDynamicModels.class);
070    }
071
072    /**
073     * On attached.
074     *
075     * @param event the event
076     */
077    @Handler
078    @SuppressWarnings("PMD.CompareObjectsWithEquals")
079    public void onAttached(Attached event) {
080        if (event.node() == this) {
081            poolPipeline = newEventPipeline();
082        }
083    }
084
085    @Override
086    protected void prepareMonitoring() throws IOException, ApiException {
087        client(new K8sClient());
088
089        // Get all our API versions
090        var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM_POOL);
091        if (ctx.isEmpty()) {
092            logger.severe(() -> "Cannot get CRD context.");
093            return;
094        }
095        context(ctx.get());
096    }
097
098    @Override
099    protected void handleChange(K8sClient client,
100            Watch.Response<K8sDynamicModel> response) {
101
102        var type = ResponseType.valueOf(response.type);
103        var poolName = response.object.metadata().getName();
104
105        // When pool is deleted, save VMs in pending
106        if (type == ResponseType.DELETED) {
107            Optional.ofNullable(pools.get(poolName)).ifPresent(pool -> {
108                pool.setUndefined();
109                if (pool.vms().isEmpty()) {
110                    pools.remove(poolName);
111                }
112                poolPipeline.fire(new VmPoolChanged(pool, true));
113            });
114            return;
115        }
116
117        // Get full definition
118        var poolModel = response.object;
119        if (poolModel.data() == null) {
120            // ADDED event does not provide data, see
121            // https://github.com/kubernetes-client/java/issues/3215
122            try {
123                poolModel = K8sDynamicStub.get(client(), context(), namespace(),
124                    poolModel.metadata().getName()).model().orElse(null);
125            } catch (ApiException e) {
126                return;
127            }
128        }
129
130        // Get pool and merge changes
131        var vmPool = pools.computeIfAbsent(poolName, k -> new VmPool(poolName));
132        vmPool.defineFrom(client().getJSON().getGson().fromJson(
133            GsonPtr.to(poolModel.data()).to("spec").get(), VmPool.class));
134        poolPipeline.fire(new VmPoolChanged(vmPool));
135    }
136
137    /**
138     * Track VM definition changes.
139     *
140     * @param event the event
141     * @throws ApiException 
142     */
143    @Handler
144    public void onVmResourceChanged(VmResourceChanged event)
145            throws ApiException {
146        final var vmDef = event.vmDefinition();
147        final String vmName = vmDef.name();
148        switch (event.type()) {
149        case ADDED:
150            vmDef.<List<String>> fromSpec("pools")
151                .orElse(Collections.emptyList()).stream().forEach(p -> {
152                    pools.computeIfAbsent(p, k -> new VmPool(p))
153                        .vms().add(vmName);
154                    poolPipeline.fire(new VmPoolChanged(pools.get(p)));
155                });
156            break;
157        case DELETED:
158            pools.values().stream().forEach(p -> {
159                if (p.vms().remove(vmName)) {
160                    poolPipeline.fire(new VmPoolChanged(p));
161                }
162            });
163            return;
164        default:
165            break;
166        }
167
168        // Sync last usage to console state change if user matches
169        if (vmDef.assignment().map(Assignment::user)
170            .map(at -> at.equals(vmDef.consoleUser().orElse(null)))
171            .orElse(true)) {
172            return;
173        }
174
175        var ccChange = vmDef.condition("ConsoleConnected")
176            .map(cc -> cc.getLastTransitionTime().toInstant());
177        if (ccChange
178            .map(tt -> vmDef.assignment().map(Assignment::lastUsed)
179                .map(alu -> alu.isAfter(tt)).orElse(true))
180            .orElse(true)) {
181            return;
182        }
183        var vmStub = VmDefinitionStub.get(client(),
184            new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
185            vmDef.namespace(), vmDef.name());
186        vmStub.updateStatus(from -> {
187            // TODO
188            JsonObject status = from.statusJson();
189            var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT);
190            assignment.set("lastUsed", ccChange.get().toString());
191            return status;
192        });
193    }
194
195    /**
196     * Return the requested pools.
197     *
198     * @param event the event
199     */
200    @Handler
201    public void onGetPools(GetPools event) {
202        event.setResult(pools.values().stream().filter(VmPool::isDefined)
203            .filter(p -> event.name().isEmpty()
204                || p.name().equals(event.name().get()))
205            .filter(p -> event.forUser().isEmpty() && event.forRoles().isEmpty()
206                || !p.permissionsFor(event.forUser().orElse(null),
207                    event.forRoles()).isEmpty())
208            .toList());
209    }
210}