Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion API/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>TikTokLiveJava</artifactId>
<groupId>io.github.jwdeveloper.tiktok</groupId>
<version>1.11.11-Release</version>
<version>1.11.12-Release</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>API</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ public interface ListenersManager
void addListener(Object listener);

void removeListener(Object listener);

/**
* Releases resources held by this manager (e.g. async listener executor). Default no-op.
* Idempotent.
*/
default void shutdown() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ default void disconnect() {
disconnect(LiveClientStopType.NORMAL);
}

/**
* Shuts down this client permanently: closes the connection, cancels pending reconnect attempts,
* removes event listeners and subscriptions, and releases listener thread-pool resources.
* <p>
* The client must not be used after {@code stop()}; further {@link #connect()} calls will fail.
* Idempotent: safe to call more than once.
*/
void stop();

/**
* Use to manually invoke event
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
import io.github.jwdeveloper.tiktok.data.events.common.TikTokEvent;
import io.github.jwdeveloper.tiktok.live.builder.EventConsumer;

import java.util.HashSet;
import java.util.Optional;

public interface LiveEventsHandler {
void publish(LiveClient tikTokLiveClient, TikTokEvent tikTokEvent);

Expand All @@ -38,4 +35,11 @@ public interface LiveEventsHandler {
<T extends TikTokEvent> void unsubscribe(EventConsumer<T> consumer);

<T extends TikTokEvent> void unsubscribe(Class<?> clazz, EventConsumer<T> consumer);

/**
* Removes all event subscriptions. Default implementation does nothing; custom handlers should
* clear their internal subscriber state when supporting full client shutdown.
*/
default void clearSubscriptions() {
}
}
2 changes: 1 addition & 1 deletion Client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>TikTokLiveJava</artifactId>
<groupId>io.github.jwdeveloper.tiktok</groupId>
<version>1.11.11-Release</version>
<version>1.11.12-Release</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@
import io.github.jwdeveloper.tiktok.messages.webcast.ProtoMessageFetchResult;
import io.github.jwdeveloper.tiktok.models.ConnectionState;
import io.github.jwdeveloper.tiktok.websocket.*;
import lombok.AccessLevel;
import lombok.Getter;

import java.util.ArrayList;
import java.util.Base64;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Logger;

Expand All @@ -57,6 +60,13 @@ public class TikTokLiveClient implements LiveClient
private final GiftsManager giftManager;
private final LiveMessagesHandler messageHandler;

@Getter(AccessLevel.NONE)
private final Object lifecycleLock = new Object();
@Getter(AccessLevel.NONE)
private final AtomicBoolean stopped = new AtomicBoolean(false);
@Getter(AccessLevel.NONE)
private volatile ScheduledFuture<?> pendingReconnect;

public TikTokLiveClient(
LiveMessagesHandler messageHandler,
GiftsManager giftsManager,
Expand All @@ -79,6 +89,11 @@ public TikTokLiveClient(
}

public void connect() {
synchronized (lifecycleLock) {
if (stopped.get()) {
throw new TikTokLiveException("Client has been stopped and cannot connect again");
}
}
try {
if (clientSettings.isUseEulerstreamWebsocket())
tryEulerConnect();
Expand All @@ -90,11 +105,19 @@ public void connect() {
tikTokEventHandler.publish(this, new TikTokDisconnectedEvent("Exception: " + e.getMessage()));

if (e instanceof TikTokLiveOfflineHostException && clientSettings.isRetryOnConnectionFailure()) {
AsyncHandler.getReconnectScheduler().schedule(() -> {
logger.info("Reconnecting");
tikTokEventHandler.publish(this, new TikTokReconnectingEvent());
this.connect();
}, clientSettings.getRetryConnectionTimeout().toMillis(), TimeUnit.MILLISECONDS);
synchronized (lifecycleLock) {
if (!stopped.get()) {
cancelPendingReconnectLocked();
pendingReconnect = AsyncHandler.getReconnectScheduler().schedule(() -> {
if (stopped.get()) {
return;
}
logger.info("Reconnecting");
tikTokEventHandler.publish(this, new TikTokReconnectingEvent());
this.connect();
}, clientSettings.getRetryConnectionTimeout().toMillis(), TimeUnit.MILLISECONDS);
}
}
}
throw e;
} catch (Exception e) {
Expand All @@ -111,6 +134,10 @@ private void tryEulerConnect() {

setState(ConnectionState.CONNECTING);
tikTokEventHandler.publish(this, new TikTokConnectingEvent());
if (stopped.get()) {
setState(ConnectionState.DISCONNECTED);
throw new TikTokLiveException("Connection aborted: client was stopped");
}
webSocketClient.start(null, this);
setState(ConnectionState.CONNECTED);
}
Expand Down Expand Up @@ -161,6 +188,10 @@ public void tryConnect() {

var liveConnectionRequest = new LiveConnectionData.Request(userData.getRoomInfo().getRoomId());
var liveConnectionData = httpClient.fetchLiveConnectionData(liveConnectionRequest);
if (stopped.get()) {
setState(ConnectionState.DISCONNECTED);
throw new TikTokLiveException("Connection aborted: client was stopped");
}
webSocketClient.start(liveConnectionData, this);

setState(ConnectionState.CONNECTED);
Expand All @@ -174,6 +205,29 @@ public void disconnect(LiveClientStopType type) {
setState(ConnectionState.DISCONNECTED);
}

@Override
public void stop() {
synchronized (lifecycleLock) {
if (!stopped.compareAndSet(false, true)) {
return;
}
cancelPendingReconnectLocked();
}
disconnect(LiveClientStopType.DISCONNECT);
for (Object listener : new ArrayList<>(listenersManager.getListeners())) {
listenersManager.removeListener(listener);
}
tikTokEventHandler.clearSubscriptions();
listenersManager.shutdown();
}

private void cancelPendingReconnectLocked() {
if (pendingReconnect != null) {
pendingReconnect.cancel(false);
pendingReconnect = null;
}
}

private void setState(ConnectionState connectionState) {
logger.info("TikTokLive client state: " + connectionState.name());
roomInfo.setConnectionState(connectionState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,29 @@ public TikTokLiveEventHandler() {
events = new HashMap<>();
}

public void publish(LiveClient tikTokLiveClient, TikTokEvent tikTokEvent) {
public synchronized void publish(LiveClient tikTokLiveClient, TikTokEvent tikTokEvent) {
Optional.ofNullable(events.get(TikTokEvent.class)).ifPresent(handlers -> handlers.forEach(handler -> handler.onEvent(tikTokLiveClient, tikTokEvent)));
Optional.ofNullable(events.get(tikTokEvent.getClass())).ifPresent(handlers -> handlers.forEach(handler -> handler.onEvent(tikTokLiveClient, tikTokEvent)));
}

public <T extends TikTokEvent> void subscribe(Class<?> clazz, EventConsumer<T> event) {
public synchronized <T extends TikTokEvent> void subscribe(Class<?> clazz, EventConsumer<T> event) {
events.computeIfAbsent(clazz, e -> new HashSet<>()).add(event);
}

public <T extends TikTokEvent> void unsubscribeAll(Class<?> clazz) {
public synchronized <T extends TikTokEvent> void unsubscribeAll(Class<?> clazz) {
events.remove(clazz);
}

public <T extends TikTokEvent> void unsubscribe(EventConsumer<T> consumer) {
events.forEach((key, value) -> value.remove(consumer));
public synchronized <T extends TikTokEvent> void unsubscribe(EventConsumer<T> consumer) {
events.forEach((key, value) -> value.remove(consumer));
}

public <T extends TikTokEvent> void unsubscribe(Class<?> clazz, EventConsumer<T> consumer) {
public synchronized <T extends TikTokEvent> void unsubscribe(Class<?> clazz, EventConsumer<T> consumer) {
Optional.ofNullable(clazz).map(events::get).ifPresent(consumers -> consumers.remove(consumer));
}
}

@Override
public synchronized void clearSubscriptions() {
events.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class TikTokListenersManager implements ListenersManager {
Expand All @@ -44,6 +45,7 @@ public class TikTokListenersManager implements ListenersManager {
private final LiveEventsHandler eventsHandler;
private final ExecutorService executorService;
private final DependanceContainer dependanceContainer;
private final AtomicBoolean shutdown = new AtomicBoolean(false);


public TikTokListenersManager(LiveEventsHandler tikTokEventHandler,
Expand All @@ -61,6 +63,9 @@ public List<Object> getListeners() {

@Override
public void addListener(Object listener) {
if (shutdown.get()) {
throw new TikTokLiveException("ListenersManager has been shut down");
}
if (listeners.containsKey(listener)) {
throw new TikTokLiveException("Listener " + listener.getClass() + " has already been registered");
}
Expand All @@ -84,6 +89,15 @@ public void removeListener(Object listener) {
listeners.remove(listener);
}

@Override
public void shutdown() {
if (!shutdown.compareAndSet(false, true)) {
return;
}
executorService.shutdownNow();
listeners.clear();
}

private List<ListenerMethodInfo> getMethodsInfo(Object listener) {
return Arrays.stream(listener.getClass().getDeclaredMethods())
.filter(e -> e.isAnnotationPresent(TikTokEventObserver.class))
Expand Down
Loading