Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.vertx.pgclient.PgNotice;
import io.vertx.pgclient.PgNotification;
import io.vertx.pgclient.impl.codec.NoticeResponse;
import io.vertx.pgclient.impl.codec.TxFailedEvent;
import io.vertx.pgclient.impl.codec.TxStatusEvent;
import io.vertx.pgclient.spi.PgDriver;
import io.vertx.sqlclient.codec.SocketConnectionBase;
import io.vertx.sqlclient.internal.SqlConnectionBase;
Expand Down Expand Up @@ -99,9 +99,9 @@ public void handleEvent(Object event) {
} else {
notice.log(SocketConnectionBase.logger);
}
} else if (event instanceof TxFailedEvent) {
} else if (event instanceof TxStatusEvent) {
if (tx != null) {
tx.fail();
tx.status(((TxStatusEvent) event).status());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.vertx.pgclient.impl.codec.NoticeResponse;
import io.vertx.pgclient.impl.codec.PgCodec;
import io.vertx.pgclient.impl.codec.PgCommandMessage;
import io.vertx.pgclient.impl.codec.TxFailedEvent;
import io.vertx.pgclient.impl.codec.TxStatusEvent;
import io.vertx.sqlclient.codec.CommandMessage;
import io.vertx.sqlclient.codec.SocketConnectionBase;
import io.vertx.sqlclient.spi.connection.Connection;
Expand All @@ -41,6 +41,7 @@
import io.vertx.sqlclient.spi.protocol.CommandBase;
import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand;
import io.vertx.sqlclient.spi.protocol.InitCommand;
import io.vertx.sqlclient.spi.protocol.SavepointCommand;
import io.vertx.sqlclient.spi.protocol.SimpleQueryCommand;
import io.vertx.sqlclient.spi.protocol.TxCommand;

Expand Down Expand Up @@ -116,7 +117,7 @@ Future<Void> sendCancelRequestMessage(int processId, int secretKey) {
@Override
protected void handleMessage(Object msg) {
super.handleMessage(msg);
if (msg instanceof Notification || msg instanceof TxFailedEvent || msg instanceof NoticeResponse) {
if (msg instanceof Notification || msg instanceof TxStatusEvent || msg instanceof NoticeResponse) {
handleEvent(msg);
}
}
Expand Down Expand Up @@ -172,6 +173,15 @@ protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
SocketConnectionBase.NULL_COLLECTOR,
QueryResultHandler.NOOP_HANDLER);
super.doSchedule(cmd2, (res, err) -> handler.complete(tx.result(), err));
} else if (cmd instanceof SavepointCommand) {
SavepointCommand<R> savepoint = (SavepointCommand<R>) cmd;
SimpleQueryCommand<Void> cmd2 = new SimpleQueryCommand<>(
savepoint.sql(),
false,
false,
SocketConnectionBase.NULL_COLLECTOR,
QueryResultHandler.NOOP_HANDLER);
super.doSchedule(cmd2, (res, err) -> handler.complete(savepoint.result(), err));
} else {
super.doSchedule(cmd, handler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,11 @@ private void decodeRowDescription(ByteBuf in) {
private void decodeReadyForQuery(ChannelHandlerContext ctx, ByteBuf in) {
byte id = in.readByte();
if (id == I) {
// IDLE
ctx.fireChannelRead(TxStatusEvent.IDLE);
} else if (id == T) {
// ACTIVE
ctx.fireChannelRead(TxStatusEvent.ACTIVE);
} else {
// FAILED
ctx.fireChannelRead(TxFailedEvent.INSTANCE);
ctx.fireChannelRead(TxStatusEvent.FAILED);
}
codec.peek().handleReadyForQuery();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2011-2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.pgclient.impl.codec;

import io.vertx.sqlclient.impl.TransactionState;

public class TxStatusEvent {

public static final TxStatusEvent IDLE = new TxStatusEvent(TransactionState.IDLE);
public static final TxStatusEvent ACTIVE = new TxStatusEvent(TransactionState.ACTIVE);
public static final TxStatusEvent FAILED = new TxStatusEvent(TransactionState.FAILED);

private final TransactionState status;

private TxStatusEvent(TransactionState status) {
this.status = status;
}

public TransactionState status() {
return status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public int appendQueryPlaceholder(StringBuilder queryBuilder, int index, int cur
return index;
}

@Override
public boolean supportsSavepoints() {
return true;
}

@Override
public SqlConnectionInternal wrapConnection(ContextInternal context, ConnectionFactory<PgConnectOptions> factory, Connection connection) {
return new PgConnectionImpl((PgConnectionFactory) factory, context, connection);
Expand Down
Loading